Skip to content
22 changes: 15 additions & 7 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

final class Adapter implements AdapterInterface
{
private ?AMQPMessage $message = null;

public function __construct(
private QueueProviderInterface $queueProvider,
private MessageSerializerInterface $serializer,
Expand Down Expand Up @@ -52,10 +54,10 @@ public function status(string $id): JobStatus
public function push(MessageInterface $message): void
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
);
$amqpMessage = $this->getAmqpMessage();
$amqpMessage->setBody($payload);
$amqpMessage->set('message_id', $message->getId());

$exchangeSettings = $this->queueProvider->getExchangeSettings();
$this->queueProvider
->getChannel()
Expand All @@ -66,9 +68,6 @@ public function push(MessageInterface $message): void
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);
}

public function subscribe(callable $handlerCallback): void
Expand Down Expand Up @@ -117,4 +116,13 @@ public function withQueueProvider(QueueProviderInterface $queueProvider): self

return $new;
}

private function getAmqpMessage(): AMQPMessage
{
if ($this->message === null) {
$this->message = new AMQPMessage('', $this->queueProvider->getMessageProperties());
}

return $this->message;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scrutinizer review

The expression return $this->message could return the type null which is incompatible with the type-hinted return PhpAmqpLib\Message\AMQPMessage. Consider adding an additional type-check to rule them out.

}
}
22 changes: 22 additions & 0 deletions src/Middleware/MessageIdGeneratingMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Yii\Queue\AMQP\Middleware;

use Yiisoft\Yii\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Yii\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Yii\Queue\Middleware\Push\PushRequest;

final class MessageIdGeneratingMiddleware implements MiddlewarePushInterface
{
public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
{
$message = $request->getMessage();
if ($message->getId() === null) {
$message->setId(uniqid(more_entropy: true));
}

return $handler->handlePush($request);
}
}
6 changes: 5 additions & 1 deletion tests/Unit/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Yiisoft\Yii\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Yii\Queue\AMQP\MessageSerializer;
use Yiisoft\Yii\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Yii\Queue\AMQP\Middleware\MessageIdGeneratingMiddleware;
use Yiisoft\Yii\Queue\AMQP\QueueProvider;
use Yiisoft\Yii\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Yii\Queue\AMQP\Settings\Exchange as ExchangeSettings;
Expand Down Expand Up @@ -37,11 +38,14 @@ public function testStatus(): void
$message = new Message('ext-simple', null);
$queue->push(
$message,
new MessageIdGeneratingMiddleware()
);

$messageId = $message->getId();
$this->assertNotNull($messageId);
$this->expectException(NotImplementedException::class);
$this->expectExceptionMessage("Status check is not supported by the adapter $adapterClass.");
$adapter->status($message->getId());
$adapter->status($messageId);
}

/**
Expand Down