Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /home/corals/old/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /home/corals/old/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php
<?php

namespace PhpAmqpLib\Connection;

use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPSocketException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\Assert;
use PhpAmqpLib\Package;
use PhpAmqpLib\Wire;
use PhpAmqpLib\Wire\AMQPReader;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;

abstract class AbstractConnection extends AbstractChannel
{
    /**
     * @var array
     * @internal
     */
    public static $LIBRARY_PROPERTIES = array(
        'product' => array('S', Package::NAME),
        'platform' => array('S', 'PHP'),
        'version' => array('S', Package::VERSION),
        'information' => array('S', ''),
        'copyright' => array('S', ''),
        'capabilities' => array(
            'F',
            array(
                'authentication_failure_close' => array('t', true),
                'publisher_confirms' => array('t', true),
                'consumer_cancel_notify' => array('t', true),
                'exchange_exchange_bindings' => array('t', true),
                'basic.nack' => array('t', true),
                'connection.blocked' => array('t', true)
            )
        )
    );

    /**
     * @var AMQPChannel[]|AbstractChannel[]
     * @internal
     */
    public $channels = array();

    /** @var int */
    protected $version_major;

    /** @var int */
    protected $version_minor;

    /** @var array */
    protected $server_properties;

    /** @var array */
    protected $mechanisms;

    /** @var array */
    protected $locales;

    /** @var bool */
    protected $wait_tune_ok;

    /** @var string */
    protected $known_hosts;

    /** @var null|AMQPReader */
    protected $input;

    /** @var string */
    protected $vhost;

    /** @var bool */
    protected $insist;

    /** @var string */
    protected $login_method;

    /**
     * @var null|string
     * @deprecated
     */
    protected $login_response;

    /** @var string */
    protected $locale;

    /** @var int */
    protected $heartbeat;

    /** @var float */
    protected $last_frame;

    /** @var int */
    protected $channel_max = 65535;

    /** @var int */
    protected $frame_max = 131072;

     /** @var array Constructor parameters for clone */
    protected $construct_params;

    /** @var bool Close the connection in destructor */
    protected $close_on_destruct = true;

    /** @var bool Maintain connection status */
    protected $is_connected = false;

    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
    protected $io;

    /** @var \PhpAmqpLib\Wire\AMQPReader */
    protected $wait_frame_reader;

    /** @var callable Handles connection blocking from the server */
    private $connection_block_handler;

    /** @var callable Handles connection unblocking from the server */
    private $connection_unblock_handler;

    /** @var int Connection timeout value*/
    protected $connection_timeout ;

    /** @var AMQPConnectionConfig|null */
    protected $config;

    /**
     * Circular buffer to speed up prepare_content().
     * Max size limited by $prepare_content_cache_max_size.
     *
     * @var array
     * @see prepare_content()
     */
    private $prepare_content_cache = array();

    /** @var int Maximal size of $prepare_content_cache */
    private $prepare_content_cache_max_size = 100;

    /**
     * Maximum time to wait for channel operations, in seconds
     * @var float $channel_rpc_timeout
     */
    private $channel_rpc_timeout;

    /**
     * If connection is blocked due to the broker running low on resources.
     * @var bool
     */
    protected $blocked = false;

    /**
     * If a frame is currently being written
     * @var bool
     */
    protected $writing = false;

    /**
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param null $login_response @deprecated
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @param int|float $connection_timeout
     * @param int|float $channel_rpc_timeout
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io = null,
        $heartbeat = 0,
        $connection_timeout = 0,
        $channel_rpc_timeout = 0.0,
        ?AMQPConnectionConfig $config = null
    ) {
        if (is_null($io)) {
            throw new \InvalidArgumentException('Argument $io cannot be null');
        }

        if ($config) {
            $this->config = clone $config;
        }

        // save the params for the use of __clone
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->login_method = $login_method;
        $this->locale = $locale;
        $this->io = $io;
        $this->heartbeat = $heartbeat;
        $this->connection_timeout = $connection_timeout;
        $this->channel_rpc_timeout = $channel_rpc_timeout;

        if ($user && $password) {
            if ($login_method === 'PLAIN') {
                $this->login_response = sprintf("\0%s\0%s", $user, $password);
            } elseif ($login_method === 'AMQPLAIN') {
                $login_response = new AMQPWriter();
                $login_response->write_table(array(
                    'LOGIN' => array('S', $user),
                    'PASSWORD' => array('S', $password)
                ));

                // Skip the length
                $responseValue = $login_response->getvalue();
                $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
            } else {
                throw new \InvalidArgumentException('Unknown login method: ' . $login_method);
            }
        } else {
            $this->login_response = null;
        }

        // Lazy Connection waits on connecting
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }

    /**
     * Connects to the AMQP server
     */
    protected function connect()
    {
        $this->blocked = false;
        try {
            // Loop until we connect
            while (!$this->isConnected()) {
                // Assume we will connect, until we dont
                $this->setIsConnected(true);

                // Connect the socket
                $this->io->connect();

                $this->channels = array();
                // The connection object itself is treated as channel 0
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->io);

                $this->write($this->constants->getHeader());
                // assume frame was sent successfully, used in $this->wait_channel()
                $this->last_frame = microtime(true);
                $this->wait(array($this->waitHelper->get_wait('connection.start')), false, $this->connection_timeout);
                $this->x_start_ok(
                    $this->getLibraryProperties(),
                    $this->login_method,
                    $this->login_response,
                    $this->locale
                );

                $this->wait_tune_ok = true;
                while ($this->wait_tune_ok) {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ), false, $this->connection_timeout);
                }

                $host = $this->x_open($this->vhost, '', $this->insist);
                if (!$host) {
                    //Reconnected
                    $this->io->reenableHeartbeat();
                    return null; // we weren't redirected
                }

                $this->setIsConnected(false);
                $this->closeChannels();

                // we were redirected, close the socket, loop and try again
                $this->close_socket();
            }
        } catch (\Exception $e) {
            // Something went wrong, set the connection status
            $this->setIsConnected(false);
            $this->closeChannels();
            $this->close_input();
            $this->close_socket();
            throw $e; // Rethrow exception
        }
    }

    /**
     * Reconnects using the original connection settings.
     * This will not recreate any channels that were established previously
     */
    public function reconnect()
    {
        // Try to close the AMQP connection
        $this->safeClose();
        // Reconnect the socket/stream then AMQP
        $this->io->close();
        // getIO can initiate the connection setting via LazyConnection, set it here to be sure
        $this->setIsConnected(false);
        $this->connect();
    }

    /**
     * Cloning will use the old properties to make a new connection to the same server
     */
    public function __clone()
    {
        if ($this->config) {
            $this->config = clone $this->config;
        }
        call_user_func_array(array($this, '__construct'), $this->construct_params);
    }

    public function __destruct()
    {
        if ($this->close_on_destruct) {
            $this->safeClose();
        }
    }

    /**
     * Attempts to close the connection safely
     */
    protected function safeClose()
    {
        try {
            if (null !== $this->input) {
                $this->close();
            }
        } catch (\Exception $e) {
            // Nothing here
        }
    }

    /**
     * @param int|null $sec
     * @param int $usec
     * @return int
     * @throws AMQPIOException
     * @throws AMQPRuntimeException
     * @throws AMQPConnectionClosedException
     * @throws AMQPRuntimeException
     */
    public function select(?int $sec, int $usec = 0): int
    {
        try {
            return $this->io->select($sec, $usec);
        } catch (AMQPConnectionClosedException $e) {
            $this->do_close();
            throw $e;
        } catch (AMQPRuntimeException $e) {
            $this->setIsConnected(false);
            throw $e;
        }
    }

    /**
     * Allows to not close the connection
     * it's useful after the fork when you don't want to close parent process connection
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $this->close_on_destruct = (bool) $close;
    }

    protected function close_input()
    {
        $this->debug && $this->debug->debug_msg('closing input');

        if (null !== $this->input) {
            $this->input->close();
            $this->input = null;
        }
    }

    protected function close_socket()
    {
        $this->debug && $this->debug->debug_msg('closing socket');
        $this->io->close();
    }

    /**
     * @param string $data
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $this->writing = true;
            $this->io->write($data);
        } catch (AMQPConnectionClosedException $e) {
            $this->do_close();
            throw $e;
        } catch (AMQPRuntimeException $e) {
            $this->setIsConnected(false);
            throw $e;
        } finally {
            $this->writing = false;
        }
    }

    protected function do_close()
    {
        $this->frame_queue = [];
        $this->method_queue = [];
        $this->setIsConnected(false);
        $this->close_input();
        $this->close_socket();
    }

    /**
     * @return int
     * @throws AMQPRuntimeException
     */
    public function get_free_channel_id()
    {
        for ($i = 1; $i <= $this->channel_max; $i++) {
            if (!isset($this->channels[$i])) {
                return $i;
            }
        }

        throw new AMQPRuntimeException('No free channel ids');
    }

    /**
     * @param int $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
    {
        $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }

    /**
     * Returns a new AMQPWriter or mutates the provided $pkt
     *
     * @param int $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
    {
        $pkt = $pkt ?: new AMQPWriter();

        // Content already prepared ?
        $key_cache = sprintf(
            '%s|%s|%s|%s',
            $channel,
            $packed_properties,
            $class_id,
            $weight
        );

        if (!isset($this->prepare_content_cache[$key_cache])) {
            $w = new AMQPWriter();
            $w->write_octet(2);
            $w->write_short($channel);
            $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $w->write_short($class_id);
            $w->write_short($weight);
            $this->prepare_content_cache[$key_cache] = $w->getvalue();
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $old_key = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$old_key]);
            }
        }
        $pkt->write($this->prepare_content_cache[$key_cache]);

        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);

        $pkt->write_octet(0xCE);


        // memory efficiency: walk the string instead of biting
        // it. good for very large packets (close in size to
        // memory_limit setting)
        $position = 0;
        $bodyLength = mb_strlen($body, 'ASCII');
        while ($position < $bodyLength) {
            $payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII');
            $position += $this->frame_max - 8;

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($payload, 'ASCII'));

            $pkt->write($payload);

            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }

    /**
     * @param int $channel
     * @param array $method_sig
     * @param AMQPWriter|string $args
     * @param null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
        $this->write($pkt->getvalue());
        $this->debug->debug_method_signature1($method_sig);
    }

    /**
     * Returns a new AMQPWriter or mutates the provided $pkt
     *
     * @param int $channel
     * @param array $method_sig
     * @param AMQPWriter|string $args
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        if ($args instanceof AMQPWriter) {
            $args = $args->getvalue();
        }

        $pkt = $pkt ?: new AMQPWriter();

        $pkt->write_octet(1);
        $pkt->write_short($channel);
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
        // in payload

        $pkt->write_short($method_sig[0]); // class_id
        $pkt->write_short($method_sig[1]); // method_id
        $pkt->write($args);

        $pkt->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }

    /**
     * Waits for a frame from the server
     *
     * @param int|float|null $timeout
     * @return array
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws AMQPRuntimeException
     */
    protected function wait_frame($timeout = 0)
    {
        if (null === $this->input) {
            $this->setIsConnected(false);
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $currentTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $this->wait_frame_reader->reuse(
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
            );

            $frame_type = $this->wait_frame_reader->read_octet();
            if (!$this->constants->isFrameType($frame_type)) {
                throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
            }
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + ch
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $ch = $this->wait_frame_reader->read_octet();
        } catch (AMQPTimeoutException $e) {
            if ($this->input) {
                $this->input->setTimeout($currentTimeout);
            }
            throw $e;
        } catch (AMQPNoDataException $e) {
            if ($this->input) {
                $this->input->setTimeout($currentTimeout);
            }
            throw $e;
        } catch (AMQPConnectionClosedException $exception) {
            $this->do_close();
            throw $exception;
        }

        $this->input->setTimeout($currentTimeout);

        if ($ch !== 0xCE) {
            throw new AMQPInvalidFrameException(sprintf(
                'Framing error, unexpected byte: %x',
                $ch
            ));
        }

        return array($frame_type, $channel, $payload);
    }

    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * @param int $channel_id
     * @param int|float|null $timeout
     * @return array
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Keeping the original timeout unchanged.
        $_timeout = $timeout;
        while (true) {
            $start = microtime(true);
            try {
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
            } catch (AMQPTimeoutException $e) {
                if (
                    $this->heartbeat && $this->last_frame
                    && microtime(true) - ($this->heartbeat * 2) > $this->last_frame
                ) {
                    $this->debug->debug_msg('missed server heartbeat (at threshold * 2)');
                    $this->setIsConnected(false);
                    throw new AMQPHeartbeatMissedException('Missed server heartbeat');
                }

                throw $e;
            }

            $this->last_frame = microtime(true);

            if ($frame_channel === 0 && $frame_type === 8) {
                // skip heartbeat frames and reduce the timeout by the time passed
                $this->debug->debug_msg('received server heartbeat');
                if ($_timeout > 0) {
                    $_timeout -= $this->last_frame - $start;
                    if ($_timeout <= 0) {
                        // If timeout has been reached, throw the exception without calling wait_frame
                        throw new AMQPTimeoutException('Timeout waiting on channel');
                    }
                }
                continue;
            }

            if ($frame_channel === $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for.  Queue this frame
            //for later, when the other channel is looking for frames.
            // Make sure the channel still exists, it could have been
            // closed by a previous Exception.
            if (isset($this->channels[$frame_channel])) {
                $this->channels[$frame_channel]->frame_queue[] = [$frame_type, $payload];
            }

            // If we just queued up a method for channel 0 (the Connection
            // itself) it's probably a close method in reaction to some
            // error, so deal with it right away.
            if ($frame_type === 1 && $frame_channel === 0) {
                $this->wait();
            }
        }
    }

    /**
     * Fetches a channel object identified by the numeric channel_id, or
     * create that object if it doesn't already exist.
     *
     * @param int|null $channel_id
     * @return AMQPChannel
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        $channel_id = $channel_id ?: $this->get_free_channel_id();
        $ch = new AMQPChannel($this, $channel_id, true, $this->channel_rpc_timeout);
        $this->channels[$channel_id] = $ch;

        return $ch;
    }

    /**
     * Requests a connection close
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $this->io->disableHeartbeat();
        if (empty($this->protocolWriter) || !$this->isConnected()) {
            return null;
        }

        $result = null;
        try {
            $this->closeChannels();
            list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
                $reply_code,
                $reply_text,
                $method_sig[0],
                $method_sig[1]
            );
            $this->send_method_frame(array($class_id, $method_id), $args);
            $result = $this->wait(
                array($this->waitHelper->get_wait('connection.close_ok')),
                false,
                $this->connection_timeout
            );
        } catch (\Exception $exception) {
            $this->do_close();
            throw $exception;
        }

        $this->setIsConnected(false);

        return $result;
    }

    /**
     * @param AMQPReader $reader
     * @throws AMQPConnectionClosedException
     */
    protected function connection_close(AMQPReader $reader)
    {
        $code = (int)$reader->read_short();
        $reason = $reader->read_shortstr();
        $class = $reader->read_short();
        $method = $reader->read_short();
        $reason .= sprintf('(%s, %s)', $class, $method);

        $this->x_close_ok();

        throw new AMQPConnectionClosedException($reason, $code);
    }

    /**
     * Confirms a connection close
     */
    protected function x_close_ok()
    {
        $this->send_method_frame(
            explode(',', $this->waitHelper->get_wait('connection.close_ok'))
        );
        $this->do_close();
    }

    /**
     * Confirm a connection close
     */
    protected function connection_close_ok()
    {
        $this->do_close();
    }

    /**
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $args = new AMQPWriter();
        $args->write_shortstr($virtual_host);
        $args->write_shortstr($capabilities);
        $args->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $args);

        $wait = array(
            $this->waitHelper->get_wait('connection.open_ok')
        );

        if ($this->protocolVersion === Wire\Constants080::VERSION) {
            $wait[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($wait, false, $this->connection_timeout);
    }

    /**
     * Signals that the connection is ready
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $this->known_hosts = $args->read_shortstr();
        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
    }

    /**
     * Asks the client to use a different server
     *
     * @param AMQPReader $args
     * @return string
     */
    protected function connection_redirect($args)
    {
        $host = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();
        $this->debug->debug_msg(sprintf(
            'Redirected to [%s], known_hosts [%s]',
            $host,
            $this->known_hosts
        ));

        return $host;
    }

    /**
     * Security mechanism challenge
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        $args->read_longstr();
    }

    /**
     * Security mechanism response
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $args = new AMQPWriter();
        $args->write_longstr($response);
        $this->send_method_frame(array(10, 21), $args);
    }

    /**
     * Starts connection negotiation
     *
     * @param AMQPReader $args
     */
    protected function connection_start($args)
    {
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        $this->server_properties = $args->read_table();
        $this->mechanisms = explode(' ', $args->read_longstr());
        $this->locales = explode(' ', $args->read_longstr());

        $this->debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }

    /**
     * @param AMQPTable|array $clientProperties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        $args = new AMQPWriter();
        $args->write_table($clientProperties);
        $args->write_shortstr($mechanism);
        $args->write_longstr($response);
        $args->write_shortstr($locale);
        $this->send_method_frame(array(10, 11), $args);
    }

    /**
     * Proposes connection tuning parameters
     *
     * @param AMQPReader $args
     */
    protected function connection_tune($args)
    {
        $v = $args->read_short();
        if ($v) {
            $this->channel_max = $v;
        }

        $v = $args->read_long();
        if ($v) {
            $this->frame_max = (int)$v;
        }

        // use server proposed value if not set
        if ($this->heartbeat === null) {
            $this->heartbeat = $args->read_short();
        }

        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }

    /**
     * Negotiates connection tuning parameters
     *
     * @param int $channel_max
     * @param int $frame_max
     * @param int $heartbeat
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $args = new AMQPWriter();
        $args->write_short($channel_max);
        $args->write_long($frame_max);
        $args->write_short($heartbeat);
        $this->send_method_frame(array(10, 31), $args);
        $this->wait_tune_ok = false;
    }

    /**
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
     * @deprecated
     */
    public function getIO()
    {
        return $this->io;
    }

    /**
     * Check connection heartbeat if enabled.
     * @throws AMQPHeartbeatMissedException If too much time passed since last connection activity.
     * @throws AMQPConnectionClosedException If connection was closed due to network issues or timeouts.
     * @throws AMQPSocketException If connection was already closed.
     * @throws AMQPTimeoutException If heartbeat write takes too much time.
     * @throws AMQPIOException If other connection problems occurred.
     */
    public function checkHeartBeat()
    {
        $this->io->check_heartbeat();
    }

    /**
     * @return float|int
     */
    public function getLastActivity()
    {
        return $this->io->getLastActivity();
    }

    /**
     * @return float
     * @since 3.2.0
     */
    public function getReadTimeout(): float
    {
        return $this->io->getReadTimeout();
    }

    /**
     * Handles connection blocked notifications
     *
     * @param AMQPReader $args
     */
    protected function connection_blocked(AMQPReader $args)
    {
        $this->blocked = true;
        // Call the block handler and pass in the reason
        $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
    }

    /**
     * Handles connection unblocked notifications
     */
    protected function connection_unblocked()
    {
        $this->blocked = false;
        // No args to an unblock event
        $this->dispatch_to_handler($this->connection_unblock_handler);
    }

    /**
     * Sets a handler which is called whenever a connection.block is sent from the server
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_connection_block_handler($callback)
    {
        Assert::isCallable($callback);
        $this->connection_block_handler = $callback;
    }

    /**
     * Sets a handler which is called whenever a connection.block is sent from the server
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_connection_unblock_handler($callback)
    {
        Assert::isCallable($callback);
        $this->connection_unblock_handler = $callback;
    }

    /**
     * Gets the connection status
     *
     * @return bool
     */
    public function isConnected()
    {
        return $this->is_connected;
    }

    /**
     * Get the connection blocked state.
     * @return bool
     * @since 2.12.0
     */
    public function isBlocked()
    {
        return $this->blocked;
    }

    /**
     * Get the io writing state.
     * @return bool
     */
    public function isWriting()
    {
        return $this->writing;
    }

    /**
     * Set the connection status
     *
     * @param bool $is_connected
     */
    protected function setIsConnected($is_connected)
    {
        $this->is_connected = (bool) $is_connected;
    }

    /**
     * Closes all available channels
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $key => $channel) {
            // channels[0] is this connection object, so don't close it yet
            if ($key === 0) {
                continue;
            }
            try {
                $channel->close();
            } catch (\Exception $e) {
                /* Ignore closing errors */
            }
        }
    }

    /**
     * Should the connection be attempted during construction?
     *
     * @return bool
     */
    public function connectOnConstruct()
    {
        return true;
    }

    /**
     * @return array
     */
    public function getServerProperties()
    {
        return $this->server_properties;
    }

    /**
     * @return int
     */
    public function getHeartbeat()
    {
        return $this->heartbeat;
    }

    /**
     * Get the library properties for populating the client protocol information
     *
     * @return array
     */
    public function getLibraryProperties()
    {
        return self::$LIBRARY_PROPERTIES;
    }

    /**
     * @param array $hosts
     * @param array $options
     *
     * @return mixed
     * @throws \Exception
     */
    public static function create_connection($hosts, $options = array())
    {
        if (!is_array($hosts) || count($hosts) < 1) {
            throw new \InvalidArgumentException(
                'An array of hosts are required when attempting to create a connection'
            );
        }

        foreach ($hosts as $hostdef) {
            self::validate_host($hostdef);
            $host = $hostdef['host'];
            $port = $hostdef['port'];
            $user = $hostdef['user'];
            $password = $hostdef['password'];
            $vhost = isset($hostdef['vhost']) ? $hostdef['vhost'] : '/';
            try {
                $conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);
                return $conn;
            } catch (\Exception $e) {
                $latest_exception = $e;
            }
        }
        throw $latest_exception;
    }

    public static function validate_host($host)
    {
        if (!isset($host['host'])) {
            throw new \InvalidArgumentException("'host' key is required.");
        }
        if (!isset($host['port'])) {
            throw new \InvalidArgumentException("'port' key is required.");
        }
        if (!isset($host['user'])) {
            throw new \InvalidArgumentException("'user' key is required.");
        }
        if (!isset($host['password'])) {
            throw new \InvalidArgumentException("'password' key is required.");
        }
    }
}

Spamworldpro Mini