Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /home/corals/old/vendor/magento/module-mysql-mq/Model/Driver/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /home/corals/old/vendor/magento/module-mysql-mq/Model/Driver/Queue.php
<?php
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
namespace Magento\MysqlMq\Model\Driver;

use Magento\Framework\MessageQueue\EnvelopeInterface;
use Magento\Framework\MessageQueue\QueueInterface;
use Magento\MysqlMq\Model\QueueManagement;
use Magento\Framework\MessageQueue\EnvelopeFactory;
use Psr\Log\LoggerInterface;

/**
 * Queue based on MessageQueue protocol
 */
class Queue implements QueueInterface
{
    /**
     * @var QueueManagement
     */
    private $queueManagement;

    /**
     * @var EnvelopeFactory
     */
    private $envelopeFactory;

    /**
     * @var string
     */
    private $queueName;

    /**
     * @var int
     */
    private $interval;

    /**
     * @var int
     */
    private $maxNumberOfTrials;

    /**
     * @var LoggerInterface $logger
     */
    private $logger;

    /**
     * Queue constructor.
     *
     * @param QueueManagement $queueManagement
     * @param EnvelopeFactory $envelopeFactory
     * @param LoggerInterface $logger
     * @param string $queueName
     * @param int $interval
     * @param int $maxNumberOfTrials
     */
    public function __construct(
        QueueManagement $queueManagement,
        EnvelopeFactory $envelopeFactory,
        LoggerInterface $logger,
        $queueName,
        $interval = 5,
        $maxNumberOfTrials = 3
    ) {
        $this->queueManagement = $queueManagement;
        $this->envelopeFactory = $envelopeFactory;
        $this->queueName = $queueName;
        $this->interval = $interval;
        $this->maxNumberOfTrials = $maxNumberOfTrials;
        $this->logger = $logger;
    }

    /**
     * @inheritdoc
     */
    public function dequeue()
    {
        $envelope = null;
        $messages = $this->queueManagement->readMessages($this->queueName, 1);
        if (isset($messages[0])) {
            $properties = $messages[0];

            $body = $properties[QueueManagement::MESSAGE_BODY];
            unset($properties[QueueManagement::MESSAGE_BODY]);

            $envelope = $this->envelopeFactory->create(['body' => $body, 'properties' => $properties]);
        }

        return $envelope;
    }

    /**
     * @inheritdoc
     */
    public function acknowledge(EnvelopeInterface $envelope)
    {
        $properties = $envelope->getProperties();
        $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

        $this->queueManagement->changeStatus($relationId, QueueManagement::MESSAGE_STATUS_COMPLETE);
    }

    /**
     * @inheritdoc
     */
    public function subscribe($callback)
    {
        while (true) {
            while ($envelope = $this->dequeue()) {
                try {
                    // phpcs:ignore Magento2.Functions.DiscouragedFunction
                    call_user_func($callback, $envelope);
                } catch (\Exception $e) {
                    $this->reject($envelope);
                }
            }
            // phpcs:ignore Magento2.Functions.DiscouragedFunction
            sleep($this->interval);
        }
    }

    /**
     * @inheritdoc
     */
    public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
    {
        $properties = $envelope->getProperties();
        $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

        if ($properties[QueueManagement::MESSAGE_NUMBER_OF_TRIALS] < $this->maxNumberOfTrials && $requeue) {
            $this->queueManagement->pushToQueueForRetry($relationId);
        } else {
            $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_ERROR);
            if ($rejectionMessage !== null) {
                $this->logger->critical(__('Message has been rejected: %1', $rejectionMessage));
            }
        }
    }

    /**
     * @inheritDoc
     */
    public function push(EnvelopeInterface $envelope)
    {
        $properties = $envelope->getProperties();
        $this->queueManagement->addMessageToQueues(
            $properties[QueueManagement::MESSAGE_TOPIC],
            $envelope->getBody(),
            [$this->queueName]
        );
    }
}

Spamworldpro Mini