![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/old/vendor/magento/module-mysql-mq/Model/Driver/ |
<?php
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
namespace Magento\MysqlMq\Model\Driver;

use Magento\Framework\MessageQueue\EnvelopeInterface;
use Magento\Framework\MessageQueue\QueueInterface;
use Magento\MysqlMq\Model\QueueManagement;
use Magento\Framework\MessageQueue\EnvelopeFactory;
use Psr\Log\LoggerInterface;

/**
 * Queue based on MessageQueue protocol
 *
 * Every operation is delegated to QueueManagement; this class only translates
 * between the rows it returns and message envelopes.
 */
class Queue implements QueueInterface
{
    /**
     * @var QueueManagement
     */
    private $queueManagement;

    /**
     * @var EnvelopeFactory
     */
    private $envelopeFactory;

    /**
     * Name of the queue this instance reads from and pushes to.
     *
     * @var string
     */
    private $queueName;

    /**
     * Number of seconds subscribe() sleeps once the queue has been drained.
     *
     * @var int
     */
    private $interval;

    /**
     * Trial count threshold: reject() only requeues a message whose recorded
     * number of trials is below this value.
     *
     * @var int
     */
    private $maxNumberOfTrials;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Queue constructor.
     *
     * @param QueueManagement $queueManagement
     * @param EnvelopeFactory $envelopeFactory
     * @param LoggerInterface $logger
     * @param string $queueName
     * @param int $interval
     * @param int $maxNumberOfTrials
     */
    public function __construct(
        QueueManagement $queueManagement,
        EnvelopeFactory $envelopeFactory,
        LoggerInterface $logger,
        $queueName,
        $interval = 5,
        $maxNumberOfTrials = 3
    ) {
        $this->queueManagement = $queueManagement;
        $this->envelopeFactory = $envelopeFactory;
        $this->queueName = $queueName;
        $this->interval = $interval;
        $this->maxNumberOfTrials = $maxNumberOfTrials;
        $this->logger = $logger;
    }

    /**
     * Read at most one message from the queue and wrap it in an envelope.
     *
     * Returns null when no message was read.
     *
     * @inheritdoc
     */
    public function dequeue()
    {
        $envelope = null;
        $messages = $this->queueManagement->readMessages($this->queueName, 1);
        if (isset($messages[0])) {
            // The row carries the body alongside the metadata; split the body out
            // so the remaining entries become the envelope properties.
            $properties = $messages[0];
            $body = $properties[QueueManagement::MESSAGE_BODY];
            unset($properties[QueueManagement::MESSAGE_BODY]);

            $envelope = $this->envelopeFactory->create(['body' => $body, 'properties' => $properties]);
        }

        return $envelope;
    }

    /**
     * Mark the message carried by the envelope as complete.
     *
     * @inheritdoc
     */
    public function acknowledge(EnvelopeInterface $envelope)
    {
        $properties = $envelope->getProperties();
        $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

        // Pass the ID as a list, the same way reject() calls changeStatus().
        $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_COMPLETE);
    }

    /**
     * Poll the queue forever, handing every dequeued envelope to the callback.
     *
     * This method never returns. An envelope whose callback throws an
     * \Exception is rejected with the exception message, so the reason is
     * logged if the message ends up in the error status.
     *
     * @inheritdoc
     */
    public function subscribe($callback)
    {
        while (true) {
            while ($envelope = $this->dequeue()) {
                try {
                    // phpcs:ignore Magento2.Functions.DiscouragedFunction
                    call_user_func($callback, $envelope);
                } catch (\Exception $e) {
                    // Forward the failure reason instead of discarding it.
                    $this->reject($envelope, true, $e->getMessage());
                }
            }
            // Queue drained: wait before polling again.
            // phpcs:ignore Magento2.Functions.DiscouragedFunction
            sleep($this->interval);
        }
    }

    /**
     * Requeue the message for another attempt, or mark it as failed.
     *
     * The message is pushed back for retry only when $requeue is true and its
     * recorded number of trials is below the configured maximum. Otherwise its
     * status is set to error and, if a rejection message was supplied, that
     * message is logged at critical level.
     *
     * @inheritdoc
     */
    public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
    {
        $properties = $envelope->getProperties();
        $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

        if ($properties[QueueManagement::MESSAGE_NUMBER_OF_TRIALS] < $this->maxNumberOfTrials && $requeue) {
            $this->queueManagement->pushToQueueForRetry($relationId);
        } else {
            $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_ERROR);
            if ($rejectionMessage !== null) {
                $this->logger->critical(__('Message has been rejected: %1', $rejectionMessage));
            }
        }
    }

    /**
     * Add the envelope body to this queue under the topic named in its properties.
     *
     * @inheritDoc
     */
    public function push(EnvelopeInterface $envelope)
    {
        $properties = $envelope->getProperties();
        $this->queueManagement->addMessageToQueues(
            $properties[QueueManagement::MESSAGE_TOPIC],
            $envelope->getBody(),
            [$this->queueName]
        );
    }
}