diff --git a/.gitignore b/.gitignore index a0a271e..49ce3c1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1 @@ -/vendor -/examples \ No newline at end of file +/vendor \ No newline at end of file diff --git a/examples/RMQPoolConnections.php b/examples/RMQPoolConnections.php new file mode 100644 index 0000000..8fcbe2b --- /dev/null +++ b/examples/RMQPoolConnections.php @@ -0,0 +1,55 @@ +setConnections(); + + # Get connection from Pool + /** @var RabbitMQ $Queue */ + $Queue = RMQPool::obj()->get(); + + $queueName = "test"; + $Queue->sendMessage( + messageBody: ['data' => 'test message'], + routingKey: $queueName + ); + + # Return connection to Pool + RMQPool::obj()->put($Queue); +}); + +# Register queue, exchanger and receive messages +Co::run(function () { + $queueName = "test"; + + /** @var RabbitMQ $Queue */ + $Queue = RMQPool::obj()->get(); + + $Queue->registerQueue($queueName, false, true, false, false); + $Queue->registerExchange($Queue->getLetterExchange(), 'direct', false, true, false); + $Queue->queueBind($queueName, $Queue->getLetterExchange(), $Queue->getLetterRoutineKey($queueName)); + + $Queue->receiveMessage($queueName, '', false, false, false, false, function ($msg) { + $message = $msg->body; + $params = json_decode($message, true); + # message handling + }); +}); \ No newline at end of file diff --git a/src/RMQPool.php b/src/RMQPool.php index 3f18409..cf27e17 100644 --- a/src/RMQPool.php +++ b/src/RMQPool.php @@ -13,6 +13,7 @@ class RMQPool extends Pool private static int $port; private static string $username; private static string $password; + private static int $heartbeat; private function __clone() { @@ -36,6 +37,7 @@ public static function declareConnection( Logger $ErrorHandler, int $waitTimeoutPool, int $capacity = self::DEFAULT_CAPACITY, + int $heartbeat = 0, ): void { self::$instance = new self($capacity); @@ -47,6 +49,7 @@ public static function declareConnection( self::$port = $port; self::$username = $username; self::$password = $password; + self::$heartbeat = $heartbeat; } protected function error(\Exception $e): void @@ -57,7 +60,7 @@ protected function error(\Exception $e): void protected function connect(): RabbitMQ|bool { try { - return (new RabbitMQ(self::$environment, self::$host, self::$port, self::$username, self::$password, $this->ErrorHandler)); + return (new RabbitMQ(self::$environment, self::$host, self::$port, self::$username, self::$password, self::$heartbeat, $this->ErrorHandler)); } catch (\Exception $e) { $this->error($e); return false; diff --git a/src/RabbitMQ.php b/src/RabbitMQ.php index f9e5da7..f328f5f 100644 --- a/src/RabbitMQ.php +++ b/src/RabbitMQ.php @@ -14,15 +14,18 @@ class RabbitMQ private string $environment; private Logger $ErrorHandler; private bool $connectionError = false; - public AMQPStreamConnection $client; - public AMQPChannel $channel; + private AMQPStreamConnection $client; + private AMQPChannel $channel; - public function __construct(string $environment, string $host, int $port, string $username, string $password, Logger $ErrorHandler) + private static string $letterExchange = "dlx"; + private static string $letterRoutineKey = "dlrk"; + + public function __construct(string $environment, string $host, int $port, string $username, string $password, int $heartbeat, Logger $ErrorHandler) { $this->environment = $environment; $this->ErrorHandler = $ErrorHandler; try { - $this->client = new AMQPStreamConnection($host, $port, $username, $password); + $this->client = new AMQPStreamConnection($host, $port, $username, $password, heartbeat: $heartbeat); $this->channel = $this->client->channel(); } catch (\Exception $e) { throw new \Exception("Cant connect to RabbitMQ. " . $e, 500); @@ -40,9 +43,19 @@ private function checkIsConnectionError(\Exception $e): bool return $e instanceof AMQPConnectionClosedException; } - private function getQueueNameWithEnv(string $name): string + private function getNameWithEnv(string $name): string + { + return $this->environment . ':' . $name; + } + + public function getLetterExchange(): string + { + return $this->environment . ':' . self::$letterExchange; + } + + public function getLetterRoutineKey(string $name): string { - return $this->environment . '-' . $name; + return $this->getNameWithEnv($name) . '-' . self::$letterRoutineKey; } public function isConnectionError(): bool @@ -52,6 +65,7 @@ public function isConnectionError(): bool public function heartbeat(): void { + $this->client->getConnection()->checkHeartBeat(); if (!$this->client()->isConnected()) { throw new \Exception('RabbitMQ Connection check failed'); } @@ -79,7 +93,7 @@ public function close(): void public function registerQueue(string $name, bool $passive = false, bool $durable = false, bool $exclusive = false, bool $auto_delete = false): array { try { - $name = $this->getQueueNameWithEnv($name); + $name = $this->getNameWithEnv($name); $result = $this->channel->queue_declare($name, $passive, $durable, $exclusive, $auto_delete); } catch (\Exception $e) { $this->error($e); @@ -102,7 +116,6 @@ public function registerQueue(string $name, bool $passive = false, bool $durable public function registerExchange(string $name, string $type, bool $passive = false, bool $durable = false, bool $auto_delete = false) { try { - $name = $this->getQueueNameWithEnv($name); $result = $this->channel->exchange_declare($name, $type, $passive, $durable, $auto_delete); } catch (\Exception $e) { $this->error($e); @@ -120,12 +133,20 @@ public function registerExchange(string $name, string $type, bool $passive = fal * @return true [type] * @throws \Exception */ - public function sendMessage(string $name, array $messageBody, string $exchange = '') + public function sendMessage(array $messageBody, string $routingKey, string $exchange = '') { try { - $name = $this->getQueueNameWithEnv($name); - $msg = new AMQPMessage(serialize($messageBody)); - $this->channel->basic_publish($msg, $exchange, $name); + if (empty($exchange)) { + $exchange = $this->getLetterExchange(); + } + + $routingKey = self::getLetterRoutineKey($routingKey); + + $message = new AMQPMessage( + json_encode($messageBody, JSON_UNESCAPED_UNICODE), + ['delivery_mode' => 2] + ); + $this->channel->basic_publish($message, $exchange, $routingKey); } catch (\Exception $e) { $this->error($e); throw $e; @@ -155,7 +176,7 @@ public function receiveMessage( ) { try { - $name = $this->getQueueNameWithEnv($name); + $name = $this->getNameWithEnv($name); $result = $this->channel->basic_consume($name, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback); while ($this->channel->is_open()) { $this->channel->wait(); @@ -174,11 +195,11 @@ public function receiveMessage( * @param string $routing_key * @return mixed */ - public function queueBind(string $name, string $exchange, string $routing_key) + public function queueBind(string $queue, string $exchange, string $routing_key) { try { - $name = $this->getQueueNameWithEnv($name); - $result = $this->channel->queue_bind($name, $exchange, $routing_key); + $queue = $this->getNameWithEnv($queue); + $result = $this->channel->queue_bind($queue, $exchange, $routing_key); } catch (\Exception $e) { $this->error($e); throw $e;