Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
/vendor
/examples
/vendor
55 changes: 55 additions & 0 deletions examples/RMQPoolConnections.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

use Memcrab\Queue\RabbitMQ;
use Memcrab\Queue\RMQPool;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;

RMQPool::declareConnection(
environment: 'dev',
host: '127.0.0.1',
port: '5672',
username: 'dev',
password: 'dev',
ErrorHandler: new Logger('log'),
waitTimeoutPool: 3,
capacity: 1,
heartbeat: 60
);

# Connections
Co::run(function () {
# Start init and ping connections
RMQPool::obj()->setConnections();

# Get connection from Pool
/** @var RabbitMQ $Queue */
$Queue = RMQPool::obj()->get();

$queueName = "test";
$Queue->sendMessage(
messageBody: ['data' => 'test message'],
routingKey: $queueName
);

# Return connection to Pool
RMQPool::obj()->put($Queue);
});

# Register queue, exchanger and receive messages
Co::run(function () {
$queueName = "test";

/** @var RabbitMQ $Queue */
$Queue = RMQPool::obj()->get();

$Queue->registerQueue($queueName, false, true, false, false);
$Queue->registerExchange($Queue->getLetterExchange(), 'direct', false, true, false);
$Queue->queueBind($queueName, $Queue->getLetterExchange(), $Queue->getLetterRoutineKey($queueName));

$Queue->receiveMessage($queueName, '', false, false, false, false, function ($msg) {
$message = $msg->body;
$params = json_decode($message, true);
# message handling
});
});
5 changes: 4 additions & 1 deletion src/RMQPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class RMQPool extends Pool
private static int $port;
private static string $username;
private static string $password;
private static int $heartbeat;

private function __clone()
{
Expand All @@ -36,6 +37,7 @@ public static function declareConnection(
Logger $ErrorHandler,
int $waitTimeoutPool,
int $capacity = self::DEFAULT_CAPACITY,
int $heartbeat = 0,
): void
{
self::$instance = new self($capacity);
Expand All @@ -47,6 +49,7 @@ public static function declareConnection(
self::$port = $port;
self::$username = $username;
self::$password = $password;
self::$heartbeat = $heartbeat;
}

protected function error(\Exception $e): void
Expand All @@ -57,7 +60,7 @@ protected function error(\Exception $e): void
protected function connect(): RabbitMQ|bool
{
try {
return (new RabbitMQ(self::$environment, self::$host, self::$port, self::$username, self::$password, $this->ErrorHandler));
return (new RabbitMQ(self::$environment, self::$host, self::$port, self::$username, self::$password, self::$heartbeat, $this->ErrorHandler));
} catch (\Exception $e) {
$this->error($e);
return false;
Expand Down
53 changes: 37 additions & 16 deletions src/RabbitMQ.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ class RabbitMQ
private string $environment;
private Logger $ErrorHandler;
private bool $connectionError = false;
public AMQPStreamConnection $client;
public AMQPChannel $channel;
private AMQPStreamConnection $client;
private AMQPChannel $channel;

public function __construct(string $environment, string $host, int $port, string $username, string $password, Logger $ErrorHandler)
private static string $letterExchange = "dlx";
private static string $letterRoutineKey = "dlrk";

public function __construct(string $environment, string $host, int $port, string $username, string $password, int $heartbeat, Logger $ErrorHandler)
{
$this->environment = $environment;
$this->ErrorHandler = $ErrorHandler;
try {
$this->client = new AMQPStreamConnection($host, $port, $username, $password);
$this->client = new AMQPStreamConnection($host, $port, $username, $password, heartbeat: $heartbeat);
$this->channel = $this->client->channel();
} catch (\Exception $e) {
throw new \Exception("Cant connect to RabbitMQ. " . $e, 500);
Expand All @@ -40,9 +43,19 @@ private function checkIsConnectionError(\Exception $e): bool
return $e instanceof AMQPConnectionClosedException;
}

private function getQueueNameWithEnv(string $name): string
private function getNameWithEnv(string $name): string
{
return $this->environment . ':' . $name;
}

public function getLetterExchange(): string
{
return $this->environment . ':' . self::$letterExchange;
}

public function getLetterRoutineKey(string $name): string
{
return $this->environment . '-' . $name;
return $this->getNameWithEnv($name) . '-' . self::$letterRoutineKey;
}

public function isConnectionError(): bool
Expand All @@ -52,6 +65,7 @@ public function isConnectionError(): bool

public function heartbeat(): void
{
$this->client->getConnection()->checkHeartBeat();
if (!$this->client()->isConnected()) {
throw new \Exception('RabbitMQ Connection check failed');
}
Expand Down Expand Up @@ -79,7 +93,7 @@ public function close(): void
public function registerQueue(string $name, bool $passive = false, bool $durable = false, bool $exclusive = false, bool $auto_delete = false): array
{
try {
$name = $this->getQueueNameWithEnv($name);
$name = $this->getNameWithEnv($name);
$result = $this->channel->queue_declare($name, $passive, $durable, $exclusive, $auto_delete);
} catch (\Exception $e) {
$this->error($e);
Expand All @@ -102,7 +116,6 @@ public function registerQueue(string $name, bool $passive = false, bool $durable
public function registerExchange(string $name, string $type, bool $passive = false, bool $durable = false, bool $auto_delete = false)
{
try {
$name = $this->getQueueNameWithEnv($name);
$result = $this->channel->exchange_declare($name, $type, $passive, $durable, $auto_delete);
} catch (\Exception $e) {
$this->error($e);
Expand All @@ -120,12 +133,20 @@ public function registerExchange(string $name, string $type, bool $passive = fal
* @return true [type]
* @throws \Exception
*/
public function sendMessage(string $name, array $messageBody, string $exchange = '')
public function sendMessage(array $messageBody, string $routingKey, string $exchange = '')
{
try {
$name = $this->getQueueNameWithEnv($name);
$msg = new AMQPMessage(serialize($messageBody));
$this->channel->basic_publish($msg, $exchange, $name);
if (empty($exchange)) {
$exchange = $this->getLetterExchange();
}

$routingKey = self::getLetterRoutineKey($routingKey);

$message = new AMQPMessage(
json_encode($messageBody, JSON_UNESCAPED_UNICODE),
['delivery_mode' => 2]
);
$this->channel->basic_publish($message, $exchange, $routingKey);
} catch (\Exception $e) {
$this->error($e);
throw $e;
Expand Down Expand Up @@ -155,7 +176,7 @@ public function receiveMessage(
)
{
try {
$name = $this->getQueueNameWithEnv($name);
$name = $this->getNameWithEnv($name);
$result = $this->channel->basic_consume($name, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback);
while ($this->channel->is_open()) {
$this->channel->wait();
Expand All @@ -174,11 +195,11 @@ public function receiveMessage(
* @param string $routing_key
* @return mixed
*/
public function queueBind(string $name, string $exchange, string $routing_key)
public function queueBind(string $queue, string $exchange, string $routing_key)
{
try {
$name = $this->getQueueNameWithEnv($name);
$result = $this->channel->queue_bind($name, $exchange, $routing_key);
$queue = $this->getNameWithEnv($queue);
$result = $this->channel->queue_bind($queue, $exchange, $routing_key);
} catch (\Exception $e) {
$this->error($e);
throw $e;
Expand Down