diff --git a/src/Adapter.php b/src/Adapter.php index a6f041d..8d7d581 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -14,6 +14,8 @@ final class Adapter implements AdapterInterface { + private ?AMQPMessage $message = null; + public function __construct( private QueueProviderInterface $queueProvider, private MessageSerializerInterface $serializer, @@ -52,10 +54,10 @@ public function status(string $id): JobStatus public function push(MessageInterface $message): void { $payload = $this->serializer->serialize($message); - $amqpMessage = new AMQPMessage( - $payload, - array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties()) - ); + $amqpMessage = $this->getAmqpMessage(); + $amqpMessage->setBody($payload); + $amqpMessage->set('message_id', $message->getId()); + $exchangeSettings = $this->queueProvider->getExchangeSettings(); $this->queueProvider ->getChannel() @@ -66,9 +68,6 @@ public function push(MessageInterface $message): void ->getQueueSettings() ->getName() ); - /** @var string $messageId */ - $messageId = $amqpMessage->get('message_id'); - $message->setId($messageId); } public function subscribe(callable $handlerCallback): void @@ -117,4 +116,13 @@ public function withQueueProvider(QueueProviderInterface $queueProvider): self return $new; } + + private function getAmqpMessage(): AMQPMessage + { + if ($this->message === null) { + $this->message = new AMQPMessage('', $this->queueProvider->getMessageProperties()); + } + + return $this->message; + } } diff --git a/src/Middleware/MessageIdGeneratingMiddleware.php b/src/Middleware/MessageIdGeneratingMiddleware.php new file mode 100644 index 0000000..fee0257 --- /dev/null +++ b/src/Middleware/MessageIdGeneratingMiddleware.php @@ -0,0 +1,22 @@ +getMessage(); + if ($message->getId() === null) { + $message->setId(uniqid(more_entropy: true)); + } + + return $handler->handlePush($request); + } +} diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 642d297..2e7ccdc 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -10,6 +10,7 @@ use Yiisoft\Yii\Queue\AMQP\Exception\NotImplementedException; use Yiisoft\Yii\Queue\AMQP\MessageSerializer; use Yiisoft\Yii\Queue\AMQP\MessageSerializerInterface; +use Yiisoft\Yii\Queue\AMQP\Middleware\MessageIdGeneratingMiddleware; use Yiisoft\Yii\Queue\AMQP\QueueProvider; use Yiisoft\Yii\Queue\AMQP\QueueProviderInterface; use Yiisoft\Yii\Queue\AMQP\Settings\Exchange as ExchangeSettings; @@ -37,11 +38,14 @@ public function testStatus(): void $message = new Message('ext-simple', null); $queue->push( $message, + new MessageIdGeneratingMiddleware() ); + $messageId = $message->getId(); + $this->assertNotNull($messageId); $this->expectException(NotImplementedException::class); $this->expectExceptionMessage("Status check is not supported by the adapter $adapterClass."); - $adapter->status($message->getId()); + $adapter->status($messageId); } /**