![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/old/vendor/magento/framework-message-queue/ |
<?php /** * Copyright © Magento, Inc. All rights reserved. * See COPYING.txt for license details. */ namespace Magento\Framework\MessageQueue; /** * Processing messages implementing MergedMessageInterface. */ class MergedMessageProcessor implements MessageProcessorInterface { /** * @var \Magento\Framework\MessageQueue\MessageStatusProcessor */ private $messageStatusProcessor; /** * @param MessageStatusProcessor $messageStatusProcessor */ public function __construct(MessageStatusProcessor $messageStatusProcessor) { $this->messageStatusProcessor = $messageStatusProcessor; } /** * @inheritdoc */ public function process( QueueInterface $queue, ConsumerConfigurationInterface $configuration, array $messages, array $messagesToAcknowledge, array $mergedMessages ) { $this->messageStatusProcessor->acknowledgeMessages($queue, $messagesToAcknowledge); $this->dispatchMessages($queue, $configuration, $mergedMessages, $messages); } /** * Processing decoded messages, invoking callbacks, changing statuses for messages. * * @param QueueInterface $queue * @param ConsumerConfigurationInterface $configuration * @param array $messageList * @param array $originalMessages */ private function dispatchMessages( QueueInterface $queue, ConsumerConfigurationInterface $configuration, array $messageList, array $originalMessages ) { $originalMessagesIds = []; try { foreach ($messageList as $topicName => $messages) { foreach ($messages as $message) { /** * @var \Magento\Framework\MessageQueue\MergedMessageInterface $message */ $callbacks = $configuration->getHandlers($topicName); $originalMessagesIds = $message->getOriginalMessagesIds(); foreach ($callbacks as $callback) { call_user_func($callback, $message->getMergedMessage()); } $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds); $this->messageStatusProcessor->acknowledgeMessages($queue, $originalMessages); } } } catch (\Exception $e) { $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds); $this->messageStatusProcessor->rejectMessages($queue, $originalMessages); } } /** * Get original messages by messages ids. * * @param array $messages * @param array $messagesIds * @return array */ private function getOriginalMessages(array $messages, array $messagesIds) { $originalMessages = []; foreach ($messagesIds as $messageId) { if (isset($messages[$messageId])) { $originalMessages[] = $messages[$messageId]; } } return $originalMessages; } }