![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/old/vendor/magento/module-mysql-mq/Model/ResourceModel/ |
<?php /** * Copyright © Magento, Inc. All rights reserved. * See COPYING.txt for license details. */ namespace Magento\MysqlMq\Model\ResourceModel; use Magento\MysqlMq\Model\QueueManagement; /** * Resource model for queue. */ class Queue extends \Magento\Framework\Model\ResourceModel\Db\AbstractDb { /** * Model initialization * * @return void */ protected function _construct() { $this->_init('queue', 'id'); } /** * Save message to 'queue_message' table. * * @param string $messageTopic * @param string $messageBody * @return int ID of the inserted record */ public function saveMessage($messageTopic, $messageBody) { $this->getConnection()->insert( $this->getMessageTable(), ['topic_name' => $messageTopic, 'body' => $messageBody] ); return $this->getConnection()->lastInsertId($this->getMessageTable()); } /** * Save messages in bulk to 'queue_message' table. * * @param string $messageTopic * @param array $messages * @return array List of IDs of inserted records */ public function saveMessages($messageTopic, array $messages) { $data = []; foreach ($messages as $message) { $data[] = ['topic_name' => $messageTopic, 'body' => $message]; } $rowCount = $this->getConnection()->insertMultiple($this->getMessageTable(), $data); $firstId = $this->getConnection()->lastInsertId($this->getMessageTable()); $select = $this->getConnection()->select() ->from(['qm' => $this->getMessageTable()], ['id']) ->where('qm.id >= ?', $firstId) ->limit($rowCount); return $this->getConnection()->fetchCol($select); } /** * Add associations between the specified message and queues. * * @param int $messageId * @param string[] $queueNames * @return $this */ public function linkQueues($messageId, $queueNames) { return $this->linkMessagesWithQueues([$messageId], $queueNames); } /** * Add associations between the specified messages and queues. * * @param array $messageIds * @param string[] $queueNames * @return $this */ public function linkMessagesWithQueues(array $messageIds, array $queueNames) { $connection = $this->getConnection(); $queueIds = $this->getQueueIdsByNames($queueNames); $data = []; foreach ($messageIds as $messageId) { foreach ($queueIds as $queueId) { $data[] = [ $queueId, $messageId, QueueManagement::MESSAGE_STATUS_NEW ]; } } if (!empty($data)) { $connection->insertArray( $this->getMessageStatusTable(), ['queue_id', 'message_id', 'status'], $data ); } return $this; } /** * Retrieve array of queue IDs corresponding to the specified array of queue names. * * @param string[] $queueNames * @return int[] */ protected function getQueueIdsByNames($queueNames) { $selectObject = $this->getConnection()->select(); $selectObject->from(['queue' => $this->getQueueTable()]) ->columns(['id']) ->where('queue.name IN (?)', $queueNames); return $this->getConnection()->fetchCol($selectObject); } /** * Retrieve messages from the specified queue. * * @param string $queueName * @param int|null $limit * @return array */ public function getMessages($queueName, $limit = null) { $connection = $this->getConnection(); $select = $connection->select() ->from( ['queue_message' => $this->getMessageTable()], [QueueManagement::MESSAGE_TOPIC => 'topic_name', QueueManagement::MESSAGE_BODY => 'body'] )->join( ['queue_message_status' => $this->getMessageStatusTable()], 'queue_message.id = queue_message_status.message_id', [ QueueManagement::MESSAGE_QUEUE_RELATION_ID => 'id', QueueManagement::MESSAGE_QUEUE_ID => 'queue_id', QueueManagement::MESSAGE_ID => 'message_id', QueueManagement::MESSAGE_STATUS => 'status', QueueManagement::MESSAGE_UPDATED_AT => 'updated_at', QueueManagement::MESSAGE_NUMBER_OF_TRIALS => 'number_of_trials' ] )->join( ['queue' => $this->getQueueTable()], 'queue.id = queue_message_status.queue_id', [QueueManagement::MESSAGE_QUEUE_NAME => 'name'] )->where( 'queue_message_status.status IN (?)', [QueueManagement::MESSAGE_STATUS_NEW, QueueManagement::MESSAGE_STATUS_RETRY_REQUIRED] )->where('queue.name = ?', $queueName) ->order(['queue_message_status.updated_at ASC', 'queue_message_status.id ASC']); if ($limit) { $select->limit($limit); } return $connection->fetchAll($select); } /** * Delete messages if there is no queue whrere the message is not in status TO BE DELETED * * @return void */ public function deleteMarkedMessages() { $connection = $this->getConnection(); $select = $connection->select() ->from(['queue_message_status' => $this->getMessageStatusTable()], ['message_id']) ->where('status <> ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED) ->distinct(); $messageIds = $connection->fetchCol($select); $condition = count($messageIds) > 0 ? ['id NOT IN (?)' => $messageIds] : null; $connection->delete($this->getMessageTable(), $condition); } /** * Mark specified messages with 'in progress' status. * * @param int[] $relationIds * @return int[] IDs of messages which should be taken in progress by current process. */ public function takeMessagesInProgress($relationIds) { $takenMessagesRelationIds = []; foreach ($relationIds as $relationId) { $affectedRows = $this->getConnection()->update( $this->getMessageStatusTable(), ['status' => QueueManagement::MESSAGE_STATUS_IN_PROGRESS], ['id = ?' => $relationId] ); if ($affectedRows) { /** * If status was set to 'in progress' by some other process (due to race conditions), * current process should not process the same message. * So message will be processed only if current process was able to change its status. */ $takenMessagesRelationIds[] = $relationId; } } return $takenMessagesRelationIds; } /** * Set status of message to 'retry required' and increment number of processing trials. * * @param int $relationId * @return void */ public function pushBackForRetry($relationId) { $this->getConnection()->update( $this->getMessageStatusTable(), [ 'status' => QueueManagement::MESSAGE_STATUS_RETRY_REQUIRED, 'number_of_trials' => new \Zend_Db_Expr('number_of_trials+1') ], ['id = ?' => $relationId] ); } /** * Change message status. * * @param int[] $relationIds * @param int $status * @return void */ public function changeStatus($relationIds, $status) { $this->getConnection()->update( $this->getMessageStatusTable(), ['status' => $status], ['id IN (?)' => $relationIds] ); } /** * Get name of table storing message statuses and associations to queues. * * @return string */ protected function getMessageStatusTable() { return $this->getTable('queue_message_status'); } /** * Get name of table storing declared queues. * * @return string */ protected function getQueueTable() { return $this->getTable('queue'); } /** * Get name of table storing message body and topic. * * @return string */ protected function getMessageTable() { return $this->getTable('queue_message'); } }