Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /home/corals/cartforge.co/app/code/Webkul/PrivateShop/Console/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /home/corals/cartforge.co/app/code/Webkul/PrivateShop/Console/StartConsumerCommand.php
<?php
/**
 * Webkul Software
 *
 * @category  Webkul
 * @package   Webkul_PrivateShop
 * @author    Webkul Software Private Limited
 * @copyright Webkul Software Private Limited (https://webkul.com)
 * @license   https://store.webkul.com/license.html
 */

namespace Webkul\PrivateShop\Console;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Webkul\PrivateShop\Api\Config\ConfigInterface as QueueConfig;
use Webkul\PrivateShop\Api\Queue\PublisherInterface;
use Webkul\PrivateShop\Api\Queue\EncoderInterface;
use Magento\Framework\App\State;

class StartConsumerCommand extends Command
{
    /**
     * Constant for Starting queue consumer
     */
    public const COMMAND_CONSUMERS_START = 'wk_mq:consumers:start';
    /**
     * Constant Queue Name
     */
    public const QUEUE_NAME = 'queue';
    /**
     * Constant for Polling interval
     */
    public const POLL_INTERVAL = 'interval';
    /**
     * Constant for Maximum number of messages to process
     */
    public const MESSAGE_LIMIT = 'limit';

    /**
     * @var QueueConfig
     */
    private $queueConfig;

    /**
     * @var EncoderInterface
     */
    private $encoder;

    /**
     * @var \Magento\Framework\App\State
     */
    protected $state;

    /**
     * @var \Psr\Log\LoggerInterface
     */
    protected $logger;

    /**
     * This is construct
     *
     * @param State $state
     * @param QueueConfig $queueConfig
     * @param EncoderInterface $encoder
     * @param \Psr\Log\LoggerInterface $logger
     * @param string|null $name
     */
    public function __construct(
        State $state,
        QueueConfig $queueConfig,
        EncoderInterface $encoder,
        \Psr\Log\LoggerInterface $logger,
        $name = null
    ) {
        $this->state = $state;
        $this->queueConfig = $queueConfig;
        $this->encoder = $encoder;
        $this->logger = $logger;
        parent::__construct($name);
    }

    /**
     * @inheritdoc
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        try {
            $this->state->getAreaCode();
        } catch (\Exception $e) {
            $this->state->setAreaCode('adminhtml');
        }
        $logger = $this->logger;

        // Load input arguments
        $queueName = $input->getArgument(self::QUEUE_NAME);
        $interval = $input->getOption(self::POLL_INTERVAL);
        $limit = $input->getOption(self::MESSAGE_LIMIT);

        // initiate consumer and broker
        $broker = $this->queueConfig->getQueueBrokerInstance($queueName);
        $consumer = $this->queueConfig->getQueueConsumerInstance($queueName);
        $logger->critical("consumer running");
        do {
          // Get next message in queue
            $message = $broker->next();
            
            if ($message) {
                // Try to process the message
                try {
                    $consumer->process(
                        $this->encoder->decode($queueName, $message->getContent())
                    );
                    $broker->acknowledge($message);
                } catch (\Exception $ex) {
                    $broker->reject($message);
                    $output->writeln('Error: ' . $ex->getMessage());
                }
            } else {
                // No message found, wait before checking again
                usleep($interval * 1000);
            }
          
            $limit--;
        } while ($limit != 0);
    }

    /**
     * @inheritdoc
     */
    protected function configure()
    {
        $this->setName(self::COMMAND_CONSUMERS_START);
        $this->setDescription('Start queue consumer');

        $this->addArgument(
            self::QUEUE_NAME,
            InputArgument::REQUIRED,
            'The queue name.'
        );
        $this->addOption(
            self::POLL_INTERVAL,
            null,
            InputOption::VALUE_REQUIRED,
            'Polling interval in ms (default is 200).',
            200
        );
        $this->addOption(
            self::MESSAGE_LIMIT,
            null,
            InputOption::VALUE_REQUIRED,
            'Maximum number of messages to process (default is 0, unlimited).',
            0
        );

        parent::configure();
    }
}

Spamworldpro Mini