![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/cartforge.co/app/code/Webkul/PrivateShop/Console/ |
<?php
/**
 * Webkul Software
 *
 * @category  Webkul
 * @package   Webkul_PrivateShop
 * @author    Webkul Software Private Limited
 * @copyright Webkul Software Private Limited (https://webkul.com)
 * @license   https://store.webkul.com/license.html
 */
namespace Webkul\PrivateShop\Console;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Webkul\PrivateShop\Api\Config\ConfigInterface as QueueConfig;
use Webkul\PrivateShop\Api\Queue\PublisherInterface;
use Webkul\PrivateShop\Api\Queue\EncoderInterface;
use Magento\Framework\App\State;

/**
 * Console command that polls a named queue and hands each message to the
 * consumer configured for that queue.
 *
 * Usage: bin/magento wk_mq:consumers:start <queue> [--interval=<ms>] [--limit=<n>]
 */
class StartConsumerCommand extends Command
{
    /**
     * Constant for Starting queue consumer
     */
    public const COMMAND_CONSUMERS_START = 'wk_mq:consumers:start';

    /**
     * Constant Queue Name
     */
    public const QUEUE_NAME = 'queue';

    /**
     * Constant for Polling interval
     */
    public const POLL_INTERVAL = 'interval';

    /**
     * Constant for Maximum number of messages to process
     */
    public const MESSAGE_LIMIT = 'limit';

    /**
     * @var QueueConfig
     */
    private $queueConfig;

    /**
     * @var EncoderInterface
     */
    private $encoder;

    /**
     * @var \Magento\Framework\App\State
     */
    protected $state;

    /**
     * @var \Psr\Log\LoggerInterface
     */
    protected $logger;

    /**
     * Store the collaborators and register the command with Symfony.
     *
     * @param State $state
     * @param QueueConfig $queueConfig
     * @param EncoderInterface $encoder
     * @param \Psr\Log\LoggerInterface $logger
     * @param string|null $name
     */
    public function __construct(
        State $state,
        QueueConfig $queueConfig,
        EncoderInterface $encoder,
        \Psr\Log\LoggerInterface $logger,
        $name = null
    ) {
        $this->state = $state;
        $this->queueConfig = $queueConfig;
        $this->encoder = $encoder;
        $this->logger = $logger;
        parent::__construct($name);
    }

    /**
     * Poll the queue and process messages until the iteration limit is reached.
     *
     * Each iteration fetches the next message from the broker. A message is
     * decoded, passed to the consumer and acknowledged; if anything throws while
     * doing so the message is rejected and the error is reported. When no message
     * is available the command sleeps for the polling interval.
     *
     * Note: the limit counter is decremented on every iteration, including idle
     * polls, so a positive limit bounds the number of polls rather than only the
     * number of processed messages. With the default of 0 the counter never
     * returns to zero and the loop runs until the process is terminated.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int Exit code (0 on normal completion)
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // CLI runs may start without an area code; fall back to adminhtml.
        try {
            $this->state->getAreaCode();
        } catch (\Magento\Framework\Exception\LocalizedException $e) {
            $this->state->setAreaCode('adminhtml');
        }

        // Load input arguments. Options arrive as strings, so normalise them
        // to integers before using them in arithmetic.
        $queueName = $input->getArgument(self::QUEUE_NAME);
        $interval = max(0, (int) $input->getOption(self::POLL_INTERVAL));
        $limit = (int) $input->getOption(self::MESSAGE_LIMIT);

        // Initiate consumer and broker
        $broker = $this->queueConfig->getQueueBrokerInstance($queueName);
        $consumer = $this->queueConfig->getQueueConsumerInstance($queueName);

        $this->logger->info("consumer running");

        do {
            // Get next message in queue
            $message = $broker->next();

            if ($message) {
                // Try to process the message
                try {
                    $consumer->process(
                        $this->encoder->decode($queueName, $message->getContent())
                    );
                    $broker->acknowledge($message);
                } catch (\Throwable $ex) {
                    // Catch errors as well as exceptions so a failing message
                    // is always rejected instead of being left unsettled.
                    $broker->reject($message);
                    $this->logger->error(
                        'Error: ' . $ex->getMessage(),
                        ['exception' => $ex]
                    );
                    $output->writeln('Error: ' . $ex->getMessage());
                }
            } else {
                // No message found, wait before checking again (ms -> µs)
                usleep($interval * 1000);
            }

            $limit--;
        } while ($limit !== 0);

        // Symfony Console expects an integer exit status from execute().
        return 0;
    }

    /**
     * Declare the command name, description, the required queue argument and
     * the optional polling interval / limit options.
     *
     * @return void
     */
    protected function configure()
    {
        $this->setName(self::COMMAND_CONSUMERS_START);
        $this->setDescription('Start queue consumer');
        $this->addArgument(
            self::QUEUE_NAME,
            InputArgument::REQUIRED,
            'The queue name.'
        );
        $this->addOption(
            self::POLL_INTERVAL,
            null,
            InputOption::VALUE_REQUIRED,
            'Polling interval in ms (default is 200).',
            200
        );
        $this->addOption(
            self::MESSAGE_LIMIT,
            null,
            InputOption::VALUE_REQUIRED,
            'Maximum number of messages to process (default is 0, unlimited).',
            0
        );
        parent::configure();
    }
}