From d020b843d6bbebe673b27038c84e530fab5d7759 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Tue, 10 Jun 2025 19:40:27 +0500 Subject: [PATCH 1/4] Optimize message publishing --- src/Adapter.php | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 5b25660..c205f36 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -17,6 +17,8 @@ final class Adapter implements AdapterInterface { + private ?AMQPMessage $amqpMessage = null; + public function __construct( private QueueProviderInterface $queueProvider, private readonly MessageSerializerInterface $serializer, @@ -27,8 +29,10 @@ public function __construct( public function withChannel(BackedEnum|string $channel): self { $instance = clone $this; + $channelName = is_string($channel) ? $channel : (string) $channel->value; $instance->queueProvider = $this->queueProvider->withChannelName($channelName); + $instance->amqpMessage = null; return $instance; } @@ -55,12 +59,20 @@ public function status(int|string $id): JobStatus public function push(MessageInterface $message): MessageInterface { - $payload = $this->serializer->serialize($message); - $amqpMessage = new AMQPMessage( - $payload, - array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties()) + if (empty($message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY])) { + $message = new IdEnvelope($message, uniqid(more_entropy: true)); + } + + $this->amqpMessage = $this->amqpMessage ?? new AMQPMessage( + '', + $this->queueProvider->getMessageProperties(), ); + $amqpMessage = $this->amqpMessage; + + $payload = $this->serializer->serialize($message); + $amqpMessage->setBody($payload); $exchangeSettings = $this->queueProvider->getExchangeSettings(); + $this->queueProvider ->getChannel() ->basic_publish( @@ -70,10 +82,8 @@ public function push(MessageInterface $message): MessageInterface ->getQueueSettings() ->getName() ); - /** @var string $messageId */ - $messageId = $amqpMessage->get('message_id'); - return new IdEnvelope($message, $messageId); + return $message; } public function subscribe(callable $handlerCallback): void From bba8731062d6fd66a68b0dc807a4263c4fc9bab7 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Wed, 11 Jun 2025 17:38:25 +0500 Subject: [PATCH 2/4] Remove id generation from the adapter --- src/Adapter.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index c205f36..0455bf1 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -59,10 +59,6 @@ public function status(int|string $id): JobStatus public function push(MessageInterface $message): MessageInterface { - if (empty($message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY])) { - $message = new IdEnvelope($message, uniqid(more_entropy: true)); - } - $this->amqpMessage = $this->amqpMessage ?? new AMQPMessage( '', $this->queueProvider->getMessageProperties(), From f04904dc3758ae60e135033587972fca2f323be7 Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Wed, 11 Jun 2025 12:38:42 +0000 Subject: [PATCH 3/4] Apply fixes from StyleCI --- src/Adapter.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Adapter.php b/src/Adapter.php index 0455bf1..8ca0d7b 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -11,7 +11,6 @@ use Yiisoft\Queue\AMQP\Exception\NotImplementedException; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\JobStatus; -use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Message\MessageSerializerInterface; From efe8c32e775d33b6dfdb6feb829f212759101484 Mon Sep 17 00:00:00 2001 From: viktorprogger <7670669+viktorprogger@users.noreply.github.com> Date: Sun, 22 Jun 2025 12:05:51 +0000 Subject: [PATCH 4/4] Apply Rector changes (CI) --- src/Adapter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Adapter.php b/src/Adapter.php index 8ca0d7b..bb5ed4f 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -58,7 +58,7 @@ public function status(int|string $id): JobStatus public function push(MessageInterface $message): MessageInterface { - $this->amqpMessage = $this->amqpMessage ?? new AMQPMessage( + $this->amqpMessage ??= new AMQPMessage( '', $this->queueProvider->getMessageProperties(), );