![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/old/vendor/magento/framework-message-queue/ |
<?php /** * Copyright © Magento, Inc. All rights reserved. * See COPYING.txt for license details. */ namespace Magento\Framework\MessageQueue; use Exception; use Magento\Framework\App\ObjectManager; use Magento\Framework\App\ResourceConnection; use Magento\Framework\Communication\ConfigInterface as CommunicationConfig; use Magento\Framework\Exception\LocalizedException; use Magento\Framework\Exception\NotFoundException; use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig; use Magento\Framework\Phrase; use Psr\Log\LoggerInterface; /** * Class Consumer used to process a single message, unlike batch consumer. * * This could be used for both synchronous and asynchronous processing, depending on topic. * * @SuppressWarnings(PHPMD.CouplingBetweenObjects) */ class Consumer implements ConsumerInterface { /** * @var ConsumerConfigurationInterface */ private $configuration; /** * @var ResourceConnection */ private $resource; /** * @var MessageEncoder */ private $messageEncoder; /** * @var CallbackInvokerInterface */ private $invoker; /** * @var MessageController */ private $messageController; /** * @var QueueRepository */ private $queueRepository; /** * @var EnvelopeFactory */ private $envelopeFactory; /** * @var MessageValidator */ private $messageValidator; /** * @var ConsumerConfig */ private $consumerConfig; /** * @var CommunicationConfig */ private $communicationConfig; /** * @var LoggerInterface */ private $logger; /** * Initialize dependencies. * * @param CallbackInvokerInterface $invoker * @param MessageEncoder $messageEncoder * @param ResourceConnection $resource * @param ConsumerConfigurationInterface $configuration * @param LoggerInterface|null $logger * @param ConsumerConfig|null $consumerConfig * @param CommunicationConfig|null $communicationConfig * @param QueueRepository|null $queueRepository * @param MessageController|null $messageController * @param MessageValidator|null $messageValidator * @param EnvelopeFactory|null $envelopeFactory * @SuppressWarnings(PHPMD.ExcessiveParameterList) */ public function __construct( CallbackInvokerInterface $invoker, MessageEncoder $messageEncoder, ResourceConnection $resource, ConsumerConfigurationInterface $configuration, LoggerInterface $logger = null, ConsumerConfig $consumerConfig = null, CommunicationConfig $communicationConfig = null, QueueRepository $queueRepository = null, MessageController $messageController = null, MessageValidator $messageValidator = null, EnvelopeFactory $envelopeFactory = null ) { $this->invoker = $invoker; $this->messageEncoder = $messageEncoder; $this->resource = $resource; $this->configuration = $configuration; $this->logger = $logger ?: ObjectManager::getInstance()->get(LoggerInterface::class); $this->consumerConfig = $consumerConfig ?: ObjectManager::getInstance()->get(ConsumerConfig::class); $this->communicationConfig = $communicationConfig ?: ObjectManager::getInstance()->get(CommunicationConfig::class); $this->queueRepository = $queueRepository ?: ObjectManager::getInstance()->get(QueueRepository::class); $this->messageController = $messageController ?: ObjectManager::getInstance()->get(MessageController::class); $this->messageValidator = $messageValidator ?: ObjectManager::getInstance()->get(MessageValidator::class); $this->envelopeFactory = $envelopeFactory ?: ObjectManager::getInstance()->get(EnvelopeFactory::class); } /** * @inheritdoc */ public function process($maxNumberOfMessages = null) { $queue = $this->configuration->getQueue(); $maxIdleTime = $this->configuration->getMaxIdleTime(); $sleep = $this->configuration->getSleep(); if (!isset($maxNumberOfMessages)) { $queue->subscribe($this->getTransactionCallback($queue)); } else { $this->invoker->invoke( $queue, $maxNumberOfMessages, $this->getTransactionCallback($queue), $maxIdleTime, $sleep ); } } /** * Decode message and invoke callback method, return reply back for sync processing. * * @param EnvelopeInterface $message * @param boolean $isSync * @return string|null * @throws LocalizedException */ private function dispatchMessage(EnvelopeInterface $message, $isSync = false) { $properties = $message->getProperties(); $topicName = $properties['topic_name']; $handlers = $this->configuration->getHandlers($topicName); $decodedMessage = $this->messageEncoder->decode($topicName, $message->getBody()); if (isset($decodedMessage)) { $messageSchemaType = $this->configuration->getMessageSchemaType($topicName); if ($messageSchemaType == CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD) { foreach ($handlers as $callback) { // The `array_values` is a workaround to ensure the same behavior in PHP 7 and 8. $result = call_user_func_array($callback, array_values($decodedMessage)); return $this->processSyncResponse($topicName, $result); } } else { foreach ($handlers as $callback) { $result = call_user_func($callback, $decodedMessage); if ($isSync) { return $this->processSyncResponse($topicName, $result); } } } } return null; } /** * Validate and encode synchronous handler output. * * @param string $topicName * @param mixed $result * @return string * @throws LocalizedException */ private function processSyncResponse($topicName, $result) { if (isset($result)) { $this->messageValidator->validate($topicName, $result, false); return $this->messageEncoder->encode($topicName, $result, false); } else { throw new LocalizedException(new Phrase('No reply message resulted in RPC.')); } } /** * Send RPC response message. * * @param EnvelopeInterface $envelope * * @return void * @throws LocalizedException */ private function sendResponse(EnvelopeInterface $envelope) { $messageProperties = $envelope->getProperties(); $connectionName = $this->consumerConfig->getConsumer($this->configuration->getConsumerName())->getConnection(); $queue = $this->queueRepository->get($connectionName, $messageProperties['reply_to']); $queue->push($envelope); } /** * Get transaction callback. This handles the case of both sync and async. * * @param QueueInterface $queue * @return \Closure */ private function getTransactionCallback(QueueInterface $queue) { return function (EnvelopeInterface $message) use ($queue) { /** @var LockInterface $lock */ $lock = null; try { $topicName = $message->getProperties()['topic_name']; $topicConfig = $this->communicationConfig->getTopic($topicName); $lock = $this->messageController->lock($message, $this->configuration->getConsumerName()); if ($topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]) { $responseBody = $this->dispatchMessage($message, true); $responseMessage = $this->envelopeFactory->create( ['body' => $responseBody, 'properties' => $message->getProperties()] ); $this->sendResponse($responseMessage); } else { $allowedTopics = $this->configuration->getTopicNames(); if (in_array($topicName, $allowedTopics)) { $this->dispatchMessage($message); } else { $queue->reject($message); return; } } $queue->acknowledge($message); } catch (MessageLockException $exception) { $queue->acknowledge($message); } catch (ConnectionLostException $exception) { if ($lock) { $this->resource->getConnection()->delete( $this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()] ); } } catch (NotFoundException $exception) { $queue->acknowledge($message); $this->logger->warning($exception->getMessage()); } catch (Exception $exception) { $queue->reject($message, false, $exception->getMessage()); if ($lock) { $this->resource->getConnection()->delete( $this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()] ); } } }; } }