Skip to content
21 changes: 13 additions & 8 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class Adapter implements AdapterInterface
{
private ?AMQPMessage $amqpMessage = null;

public function __construct(
private QueueProviderInterface $queueProvider,
private readonly MessageSerializerInterface $serializer,
Expand All @@ -27,8 +28,10 @@
public function withChannel(BackedEnum|string $channel): self
{
$instance = clone $this;

$channelName = is_string($channel) ? $channel : (string) $channel->value;
$instance->queueProvider = $this->queueProvider->withChannelName($channelName);
$instance->amqpMessage = null;

return $instance;
}
Expand All @@ -55,12 +58,16 @@

public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
$this->amqpMessage ??= new AMQPMessage(
'',
$this->queueProvider->getMessageProperties(),
);
$amqpMessage = $this->amqpMessage;

$payload = $this->serializer->serialize($message);
$amqpMessage->setBody($payload);
$exchangeSettings = $this->queueProvider->getExchangeSettings();

$this->queueProvider
->getChannel()
->basic_publish(
Expand All @@ -70,10 +77,8 @@
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');

return new IdEnvelope($message, $messageId);
return $message;
}

public function subscribe(callable $handlerCallback): void
Expand All @@ -86,10 +91,10 @@
$this->queueProvider
->getQueueSettings()
->getName(),
false,

Check warning on line 94 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": @@ @@ public function subscribe(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); - $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, false, true, function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { + $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), true, false, false, true, function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { try { $handlerCallback($this->serializer->unserialize($amqpMessage->getBody())); $channel->basic_ack($amqpMessage->getDeliveryTag());
false,
false,

Check warning on line 96 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": @@ @@ public function subscribe(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); - $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, false, true, function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { + $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, true, true, function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { try { $handlerCallback($this->serializer->unserialize($amqpMessage->getBody())); $channel->basic_ack($amqpMessage->getDeliveryTag());
true,

Check warning on line 97 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "TrueValue": @@ @@ public function subscribe(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); - $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, false, true, function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { + $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, false, false, function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { try { $handlerCallback($this->serializer->unserialize($amqpMessage->getBody())); $channel->basic_ack($amqpMessage->getDeliveryTag());
function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
try {
$handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
Expand All @@ -97,7 +102,7 @@
} catch (Throwable $exception) {
$consumerTag = $amqpMessage->getConsumerTag();
if ($consumerTag !== null) {
$channel->basic_cancel($consumerTag);

Check warning on line 105 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ } catch (Throwable $exception) { $consumerTag = $amqpMessage->getConsumerTag(); if ($consumerTag !== null) { - $channel->basic_cancel($consumerTag); + } throw $exception; }
}

throw $exception;
Expand Down
Loading