diff --git a/src/Adapter.php b/src/Adapter.php index c503559..2d05b9e 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -14,11 +14,12 @@ final class Adapter implements AdapterInterface { public function __construct( - private QueueProviderInterface $provider, + private QueueProviderInterface $provider, private MessageSerializerInterface $serializer, - private LoopInterface $loop, - private int $timeout = 3 - ) { + private LoopInterface $loop, + private int $timeout = 3 + ) + { } public function runExisting(callable $handlerCallback): void @@ -66,15 +67,18 @@ public function push(MessageInterface $message): MessageInterface public function subscribe(callable $handlerCallback): void { - while ($this->loop->canContinue()) { + $continue = true; + while ($continue) { $message = $this->reserve(); if (null === $message) { + $continue = $this->loop->canContinue(); continue; } $result = $handlerCallback($message); - if ($result) { - $this->provider->delete((string) $message->getId()); + $this->provider->delete((string) $message->getId()); + if (!$result) { + $continue = false; } } } @@ -99,4 +103,9 @@ private function reserve(): ?IdEnvelope return $envelope; } + + public function getChannelName(): string + { + return $this->provider->getChannelName(); + } } diff --git a/src/Message/Message.php b/src/Message/Message.php index 2c5cc9d..52c72fd 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -10,10 +10,11 @@ final class Message implements MessageInterface { public function __construct( private string $handlerName, - private mixed $data, - private array $metadata, - private int $delay = 0 //delay in seconds - ) { + private mixed $data, + private array $metadata, + private int $delay = 0 //delay in seconds + ) + { if ($this->delay > 0) { $this->metadata['delay'] = $delay; } @@ -40,4 +41,9 @@ public function getMetadata(): array { return $this->metadata; } + + public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface + { + return new self($handlerName, $data, $metadata, $metadata['delay'] ? (int)$metadata['delay'] : 0); + } } diff --git a/src/QueueProvider.php b/src/QueueProvider.php index 5ed38ce..fe42b65 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -17,7 +17,8 @@ class QueueProvider implements QueueProviderInterface public function __construct( private \Redis $redis, //redis connection, private string $channelName = self::DEFAULT_CHANNEL_NAME - ) { + ) + { } /** @@ -152,4 +153,9 @@ private function checkConnection(): void throw new NotConnectedRedisException('Redis is not connected.'); } } + + public function getChannelName(): string + { + return $this->channelName; + } } diff --git a/src/QueueProviderInterface.php b/src/QueueProviderInterface.php index 1bce6fc..f59b523 100644 --- a/src/QueueProviderInterface.php +++ b/src/QueueProviderInterface.php @@ -20,4 +20,6 @@ public function existInWaiting(int $id): bool; public function existInReserved(int $id): bool; public function withChannelName(string $channelName): self; + + public function getChannelName(): string; } diff --git a/tests/Integration/QueueProviderTest.php b/tests/Integration/QueueProviderTest.php index 09d2281..9b42327 100644 --- a/tests/Integration/QueueProviderTest.php +++ b/tests/Integration/QueueProviderTest.php @@ -44,4 +44,16 @@ public function testDelay(QueueProvider $provider): void $reserv = $provider->reserve($id); $this->assertNotNull($reserv); } + + /** + * @depends test__construct + */ + public function testWithChannelName(QueueProvider $provider): void + { + self::assertEquals('test', $provider->getChannelName()); + $providerOther = $provider->withChannelName('test'); + self::assertEquals($providerOther->getChannelName(), $provider->getChannelName()); + $providerOther = $provider->withChannelName('test1'); + self::assertEquals('test1', $providerOther->getChannelName()); + } } diff --git a/tests/Integration/QueueTest.php b/tests/Integration/QueueTest.php index 2882e40..9276fbc 100644 --- a/tests/Integration/QueueTest.php +++ b/tests/Integration/QueueTest.php @@ -15,6 +15,7 @@ use Yiisoft\Queue\Redis\Adapter; use Yiisoft\Queue\Redis\QueueProvider; use Yiisoft\Queue\Redis\QueueProviderInterface; +use Yiisoft\Queue\Redis\Reserve; use Yiisoft\Queue\Redis\Tests\Support\FileHelper; use Yiisoft\Queue\Redis\Tests\Support\IntegrationTestCase; @@ -85,6 +86,7 @@ public function testListen(): void $mockLoop, ); $queue = $this->getDefaultQueue($adapter); + self::assertEquals('yii-queue', $adapter->getChannelName()); $queue->push( new Message('ext-simple', ['file_name' => 'test-listen' . $time, 'payload' => ['time' => $time]]) @@ -124,7 +126,10 @@ public function testAdapterStatusException() public function testAdapterNullMessage() { $provider = $this->createMock(QueueProviderInterface::class); - $provider->method('reserve')->willReturn(null); + $provider->method('reserve')->willReturnOnConsecutiveCalls( + null, null, null, new Reserve(1, '{"name":"handler"}') + ); + $provider->method('delete'); $mockLoop = $this->createMock(LoopInterface::class); $mockLoop->expects($this->exactly(2))->method('canContinue')->willReturn(true, false); @@ -145,5 +150,11 @@ public function testAdapterNullMessage() $notUseHandler = false; }); $this->assertTrue($notUseHandler); + + $adapter->subscribe(function (MessageInterface $message) use (&$notUseHandler): mixed { + $notUseHandler = false; + return null; + }); + $this->assertFalse($notUseHandler); } } diff --git a/tests/Unit/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php index d982c14..e47c87c 100644 --- a/tests/Unit/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -40,4 +40,15 @@ public function testWithDelay(): void $this->assertNotSame($message, $delayedMessage); $this->assertEquals(5, $delayedMessage->getMetadata()['delay']); } + + public function testFromData(): void + { + $message = Message::fromData('test-handler', ['data' => 'test-data'], ['delay' => 2]); + self::assertEquals('test-handler', $message->getHandlerName()); + self::assertEquals(['data' => 'test-data'], $message->getData()); + self::assertEquals(['delay' => 2], $message->getMetadata()); + + $message = Message::fromData('test-handler', ['data' => 'test-data'], ['delay' => '3']); + self::assertEquals(['delay' => 3], $message->getMetadata()); + } } diff --git a/tests/Unit/QueueProviderTest.php b/tests/Unit/QueueProviderTest.php index 8458639..9d44183 100644 --- a/tests/Unit/QueueProviderTest.php +++ b/tests/Unit/QueueProviderTest.php @@ -20,6 +20,14 @@ public function test__construct() return $provider; } + /** + * @depends test__construct + */ + public function testGetChannelName(QueueProvider $provider) + { + self::assertEquals('test', $provider->getChannelName()); + } + /** * @depends test__construct * @throws \PHPUnit\Framework\MockObject\Exception