Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /home/corals/old/vendor/mageworx/module-open-ai/Model/Queue/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /home/corals/old/vendor/mageworx/module-open-ai/Model/Queue/QueueProcessor.php
<?php
/**
 * Copyright © MageWorx. All rights reserved.
 * See LICENSE.txt for license details.
 */
declare(strict_types = 1);

namespace MageWorx\OpenAI\Model\Queue;

use Magento\Framework\Exception\LocalizedException;
use MageWorx\OpenAI\Api\Data\QueueItemInterface;
use MageWorx\OpenAI\Api\QueueItemErrorHandlerPoolInterface;
use MageWorx\OpenAI\Api\QueueItemPreprocessorInterface;
use MageWorx\OpenAI\Api\QueueItemPreprocessorPoolInterface;
use MageWorx\OpenAI\Api\QueueProcessManagementInterface;
use MageWorx\OpenAI\Api\QueueProcessorInterface;
use MageWorx\OpenAI\Api\QueueRepositoryInterface;
use MageWorx\OpenAI\Api\RequestInterfaceFactory;
use MageWorx\OpenAI\Model\Models\ModelsFactory;

/**
 * Processes a single OpenAI queue item: runs its optional preprocessor, sends the request
 * through the model configured on the item, stores the response on the item and then either
 * marks the item as ready or delegates to the error handler pool.
 */
class QueueProcessor implements QueueProcessorInterface
{
    private ModelsFactory                      $modelsFactory;
    private RequestInterfaceFactory            $requestFactory;
    private QueueProcessManagementInterface    $queueProcessManagement;
    private QueueItemPreprocessorPoolInterface $queueItemPreprocessorPool;
    private QueueRepositoryInterface           $queueRepository;
    private QueueItemErrorHandlerPoolInterface $errorHandlerPool;

    public function __construct(
        ModelsFactory                      $modelsFactory,
        RequestInterfaceFactory            $requestFactory,
        QueueProcessManagementInterface    $queueProcessManagement,
        QueueItemPreprocessorPoolInterface $queueItemPreprocessorPool,
        QueueRepositoryInterface           $queueRepository,
        QueueItemErrorHandlerPoolInterface $errorHandlerPool
    ) {
        $this->modelsFactory             = $modelsFactory;
        $this->requestFactory            = $requestFactory;
        $this->queueProcessManagement    = $queueProcessManagement;
        $this->queueItemPreprocessorPool = $queueItemPreprocessorPool;
        $this->queueRepository           = $queueRepository;
        $this->errorHandlerPool          = $errorHandlerPool;
    }

    /**
     * Process a single queue item.
     *
     * On a non-error response the item is saved with STATUS_READY and, when the item belongs
     * to a process, that process is notified that one more item was processed. On an error
     * response the item is handed to the error handler pool instead.
     *
     * @param QueueItemInterface $item The queue item to be processed.
     * @return void
     * @throws LocalizedException If any step fails; the original exception is attached as the cause.
     */
    public function processItem(QueueItemInterface $item): void
    {
        try {
            $item = $this->preprocessItem($item);

            $modelType = $item->getModel();
            /** @var \MageWorx\OpenAI\Model\Models\AbstractModel $modelEntity */
            $modelEntity = $this->modelsFactory->create($modelType);

            // Getting data
            $content = $item->getContent();
            $context = $item->getContext();
            $options = $item->getOptions();

            // Building request object
            $request = $this->requestFactory->create();
            $request->setContent($content);
            $request->setPath($modelEntity->getPath());
            $request->setContext($context);

            // Send data to OpenAI using model and write the response to the queue item
            $response = $modelEntity->sendRequest($request, $options);
            $item->setResponse($response);

            // If error response received, the error handler decides what happens to the item
            if ($response->getIsError()) {
                $this->callErrorHandlerForItem($item);

                return;
            }

            $this->finalizeItem($item, QueueItemInterface::STATUS_READY);
        } catch (\Exception $e) {
            // Keep the original exception as the cause so its type and stack trace are not lost.
            throw new LocalizedException(__('Error processing item: %1', $e->getMessage()), $e);
        }
    }

    /**
     * Run the preprocessor configured on the queue item, if any.
     *
     * @param QueueItemInterface $item
     * @return QueueItemInterface The same item instance that was passed in.
     * @throws LocalizedException If the preprocessor can not be obtained from the pool.
     */
    private function preprocessItem(QueueItemInterface $item): QueueItemInterface
    {
        $preprocessorType = $item->getPreprocessor();
        if (empty($preprocessorType)) {
            return $item;
        }

        try {
            /** @var QueueItemPreprocessorInterface $preprocessorEntity */
            $preprocessorEntity = $this->queueItemPreprocessorPool->getByType($preprocessorType);
        } catch (\Exception $e) {
            throw new LocalizedException(
                __(
                    'Error creating preprocessor of type %2 for item %3. Error message: %1',
                    $e->getMessage(),
                    $preprocessorType,
                    $item->getId()
                ),
                $e // preserve the underlying failure as the cause
            );
        }

        $preprocessorEntity->execute($item);

        return $item;
    }

    /**
     * When error returned in response we are trying to call error handler of this queue item.
     * If the item has no error handler set (null), the default handler group is used.
     * If the error handler pool reports that the error was not processed, the queue item is
     * marked as fatal error, which means that this queue item can't be processed anymore.
     * If the error was processed, the handler takes control over this queue item status.
     *
     * @param QueueItemInterface $item
     * @return void
     * @throws \Magento\Framework\Exception\CouldNotSaveException
     */
    private function callErrorHandlerForItem(QueueItemInterface $item): void
    {
        $errorHandlerType = $item->getErrorHandler() ?? QueueItemErrorHandlerPoolInterface::DEFAULT_HANDLER_GROUP;

        $successfullyProcessed = $this->errorHandlerPool->process($errorHandlerType, $item);
        if (!$successfullyProcessed) {
            $this->performFatalErrorOfQueueItem($item);
        }
    }

    /**
     * Mark queue item as fatal error (set fatal error status)
     * Fatal error means that this queue item can't be processed anymore.
     *
     * @param QueueItemInterface $item
     * @return void
     * @throws \Magento\Framework\Exception\CouldNotSaveException
     */
    private function performFatalErrorOfQueueItem(QueueItemInterface $item): void
    {
        $this->finalizeItem($item, QueueItemInterface::STATUS_FATAL_ERROR);
    }

    /**
     * Save the queue item with the given status and, when the item has a process id,
     * update the processed items counter of that process.
     *
     * @param QueueItemInterface $item
     * @param mixed $status One of the QueueItemInterface::STATUS_* values.
     * @return void
     * @throws \Magento\Framework\Exception\CouldNotSaveException
     */
    private function finalizeItem(QueueItemInterface $item, $status): void
    {
        $item->setStatus($status);
        $this->queueRepository->save($item);

        // Update process items counter
        $processId = $item->getProcessId();
        if ($processId) {
            $this->queueProcessManagement->setQueueItemProcessed($processId);
        }
    }
}

Spamworldpro Mini