diff --git a/src/Adapter.php b/src/Adapter.php index 5b25660..bb5ed4f 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -11,12 +11,13 @@ use Yiisoft\Queue\AMQP\Exception\NotImplementedException; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\JobStatus; -use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Message\MessageSerializerInterface; final class Adapter implements AdapterInterface { + private ?AMQPMessage $amqpMessage = null; + public function __construct( private QueueProviderInterface $queueProvider, private readonly MessageSerializerInterface $serializer, @@ -27,8 +28,10 @@ public function __construct( public function withChannel(BackedEnum|string $channel): self { $instance = clone $this; + $channelName = is_string($channel) ? $channel : (string) $channel->value; $instance->queueProvider = $this->queueProvider->withChannelName($channelName); + $instance->amqpMessage = null; return $instance; } @@ -55,12 +58,16 @@ public function status(int|string $id): JobStatus public function push(MessageInterface $message): MessageInterface { - $payload = $this->serializer->serialize($message); - $amqpMessage = new AMQPMessage( - $payload, - array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties()) + $this->amqpMessage ??= new AMQPMessage( + '', + $this->queueProvider->getMessageProperties(), ); + $amqpMessage = $this->amqpMessage; + + $payload = $this->serializer->serialize($message); + $amqpMessage->setBody($payload); $exchangeSettings = $this->queueProvider->getExchangeSettings(); + $this->queueProvider ->getChannel() ->basic_publish( @@ -70,10 +77,8 @@ public function push(MessageInterface $message): MessageInterface ->getQueueSettings() ->getName() ); - /** @var string $messageId */ - $messageId = $amqpMessage->get('message_id'); - return new IdEnvelope($message, $messageId); + return $message; } public function subscribe(callable $handlerCallback): void