From 0d2081faef99c52add12949001cadb797bbf9eb5 Mon Sep 17 00:00:00 2001 From: Marc Bischof Date: Mon, 23 Feb 2026 15:38:13 +0100 Subject: [PATCH 1/5] Implement retryable message handling --- .../Command/RetryQueueCommandController.php | 103 +++ Classes/Queue/AzureQueueStorage.php | 290 +++++++-- Classes/Queue/RetryableQueueInterface.php | 18 + Tests/Unit/Queue/AzureStorageQueueTest.php | 615 +++++++++++++++++- 4 files changed, 981 insertions(+), 45 deletions(-) create mode 100644 Classes/Command/RetryQueueCommandController.php create mode 100644 Classes/Queue/RetryableQueueInterface.php diff --git a/Classes/Command/RetryQueueCommandController.php b/Classes/Command/RetryQueueCommandController.php new file mode 100644 index 0000000..acced81 --- /dev/null +++ b/Classes/Command/RetryQueueCommandController.php @@ -0,0 +1,103 @@ +getQueue($queue); + + try { + $rows = [[ + $queueInstance->getName(), + $queueInstance->countReady(), + $queueInstance->countReserved(), + $queueInstance->countFailed(), + ]]; + } catch (Exception $e) { + $rows = [[$queueInstance->getName(), '-', '-', '-']]; + } + + $this->output->outputTable($rows, ['Queue', '# ready', '# reserved', '# failed']); + } + + public function retryAllCommand(string $queue, int $delay = 0): void + { + $queueInstance = $this->getQueue($queue); + + if ($queueInstance->countFailed() === 0) { + $this->outputLine('No messages in poison queue'); + return; + } + + $count = $queueInstance->retryAllFailed(['delay' => $delay]); + $this->outputLine('Retried %d messages from poison queue', [$count]); + } + + /** + * Discard all messages from the poison queue + * + * @param string $queue Name of the base queue + */ + public function discardAllCommand(string $queue): void + { + $queueInstance = $this->getQueue($queue); + + if ($queueInstance->countFailed() === 0) { + $this->outputLine('No messages in poison queue'); + return; + } + + $count = $queueInstance->discardAllFailed(); + $this->outputLine('Discarded %d messages from poison queue', [$count]); + } + + /** + * Show up to $limit messages from the poison queue without consuming them + * + * @param string $queue Name of the base queue + * @param int $limit Number of messages to peek at + */ + public function peekCommand(string $queue, int $limit = 5): void + { + $queueInstance = $this->getQueue($queue); + $messages = $queueInstance->peekFailed($limit); + + if (empty($messages)) { + $this->outputLine('No messages in poison queue'); + return; + } + + $rows = []; + foreach ($messages as $message) { + $rows[] = [$message->getIdentifier(), json_encode($message->getPayload())]; + } + $this->output->outputTable($rows, ['Message ID', 'Payload']); + } + + private function getQueue(string $queue): RetryableQueueInterface + { + $queueInstance = $this->queueManager->getQueue($queue); + if (!$queueInstance instanceof RetryableQueueInterface) { + $this->outputLine('Queue "%s" is not a RetryableQueueInterface queue', [$queue]); + $this->quit(1); + } + return $queueInstance; + } +} diff --git a/Classes/Queue/AzureQueueStorage.php b/Classes/Queue/AzureQueueStorage.php index 7b2ffce..a19a475 100644 --- a/Classes/Queue/AzureQueueStorage.php +++ b/Classes/Queue/AzureQueueStorage.php @@ -23,7 +23,7 @@ * A queue implementation using Azure Storage Queue as the queue backend * with claim check pattern for large messages */ -class AzureQueueStorage implements QueueInterface +class AzureQueueStorage implements QueueInterface, RetryableQueueInterface { protected string $name; @@ -272,15 +272,7 @@ public function submit($payload, array $options = []): string } catch (Exception $e) { // Clean up blob if it was created but queue message failed if ($isClaimCheck && isset($blobName)) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $blobName); - } catch (Exception $cleanupException) { - // Log cleanup failure but don't throw - $this->systemLogger->error('Failed to clean up blob after message submission failure', [ - 'blobName' => $blobName, - 'error' => $cleanupException->getMessage(), - ]); - } + $this->deleteBlobIfPresent($blobName); } throw new JobQueueException('Failed to submit message: ' . $e->getMessage(), 1234567894); } @@ -305,12 +297,8 @@ public function waitAndTake(?int $timeout = null): ?Message ); // Clean up blob if it exists - if ($message->getBlobName()) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $message->getBlobName()); - } catch (Exception $e) { - $this->systemLogger->warning('Failed to delete blob after message take', ['error' => $e->getMessage()]); - } + if ($blobName = $message->getBlobName()) { + $this->deleteBlobIfPresent($blobName); } return $message; @@ -426,14 +414,7 @@ public function abort(string $messageId): void // Clean up blob if it exists if (!empty($messageInfo['blobName'])) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $messageInfo['blobName']); - } catch (Exception $e) { - $this->systemLogger->warning('Failed to delete blob after message abort', [ - 'blobName' => $messageInfo['blobName'], - 'error' => $e->getMessage(), - ]); - } + $this->deleteBlobIfPresent($messageInfo['blobName']); } } @@ -472,14 +453,7 @@ public function finish(string $messageId): bool // Clean up blob if it exists if (!empty($messageInfo['blobName'])) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $messageInfo['blobName']); - } catch (Exception $e) { - $this->systemLogger->warning('Failed to delete blob after message finish', [ - 'blobName' => $messageInfo['blobName'], - 'error' => $e->getMessage(), - ]); - } + $this->deleteBlobIfPresent($messageInfo['blobName']); } unset($this->reservedMessages[$messageId]); @@ -637,17 +611,163 @@ public function flush(): void $listBlobOptions->setPrefix(sprintf('queue-%s/', $this->name)); $blobList = $this->getBlobService()->listBlobs($this->containerName, $listBlobOptions); foreach ($blobList->getBlobs() as $blob) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $blob->getName()); - } catch (Exception $e) { - // Continue with other blobs - } + $this->deleteBlobIfPresent($blob->getName()); } } catch (Exception $e) { throw new JobQueueException('Failed to flush queue: ' . $e->getMessage(), 1234567902); } } + /** + * @inheritdoc + */ + public function peekFailed(int $limit = 1): array + { + if (!$this->usePoisonQueue) { + return []; + } + + $messages = []; + $remaining = $limit; + + // peekMessages is capped at 32 per call and has no cursor, so we can + // only return up to 32 distinct messages per invocation. + while ($remaining > 0) { + $batchSize = min($remaining, $this->peekLimit); + $options = new PeekMessagesOptions(); + $options->setNumberOfMessages($batchSize); + + try { + $result = $this->getQueueService()->peekMessages($this->poisonQueueName, $options); + } catch (Exception $e) { + if ($e->getCode() === 404) { + break; + } + throw new JobQueueException('Failed to peek poison queue: ' . $e->getMessage(), 1234567930); + } + + $batch = $result->getQueueMessages(); + foreach ($batch as $queueMessage) { + $messages[] = $this->createMessageFromQueueMessage($queueMessage); + } + + // Azure peek has no cursor — a second call returns the same messages + break; + } + + return $messages; + } + + /** + * @inheritdoc + */ + public function retryAllFailed(array $options = []): int + { + if (!$this->usePoisonQueue) { + return 0; + } + + $delay = (int)($options['delay'] ?? 0); + $count = 0; + + while (true) { + $listOptions = new ListMessagesOptions(); + $listOptions->setNumberOfMessages(1); + $listOptions->setVisibilityTimeoutInSeconds($this->visibilityTimeout); + + try { + $result = $this->getQueueService()->listMessages($this->poisonQueueName, $listOptions); + } catch (Exception $e) { + if ($e->getCode() === 404) { + break; + } + throw new JobQueueException('Failed to list poison queue: ' . $e->getMessage(), 1234567931); + } + + $queueMessages = $result->getQueueMessages(); + if (empty($queueMessages)) { + break; + } + + $message = $this->createMessageFromQueueMessage($queueMessages[0], $queueMessages[0]->getPopReceipt()); + + try { + $this->requeuePoisonMessage($message, $delay); + $this->getQueueService()->deleteMessage( + $this->poisonQueueName, + $message->getQueueMessageId(), + $message->getPopReceipt() + ); + $count++; + } catch (Exception $e) { + // Release the message back to the poison queue so it isn't silently lost + try { + $this->getQueueService()->updateMessage( + $this->poisonQueueName, + $message->getQueueMessageId(), + $message->getPopReceipt(), + '', + 0 + ); + } catch (Exception $releaseEx) { + $this->systemLogger->error('Failed to release poison message after retry error', [ + 'messageId' => $message->getIdentifier(), + 'error' => $releaseEx->getMessage(), + ]); + } + throw new JobQueueException( + 'Failed to retry poison message ' . $message->getIdentifier() . ': ' . $e->getMessage(), + 1234567932 + ); + } + } + + return $count; + } + /** + * @inheritdoc + */ + public function discardAllFailed(): int + { + if (!$this->usePoisonQueue) { + return 0; + } + + $count = 0; + + while (true) { + $listOptions = new ListMessagesOptions(); + $listOptions->setNumberOfMessages(1); + $listOptions->setVisibilityTimeoutInSeconds($this->visibilityTimeout); + + try { + $result = $this->getQueueService()->listMessages($this->poisonQueueName, $listOptions); + } catch (Exception $e) { + if ($e->getCode() === 404) { + break; + } + throw new JobQueueException('Failed to list poison queue: ' . $e->getMessage(), 1234567933); + } + + $queueMessages = $result->getQueueMessages(); + if (empty($queueMessages)) { + break; + } + + $queueMessage = $queueMessages[0]; + $this->deleteBlobIfPresent($this->extractBlobName($queueMessage->getMessageText())); + $this->getQueueService()->deleteMessage( + $this->poisonQueueName, + $queueMessage->getMessageId(), + $queueMessage->getPopReceipt() + ); + + $count++; + } + + return $count; + } + protected function getQueueService(): IQueue { if ($this->queueService === null) { @@ -896,16 +1016,100 @@ protected function generateBlobName(string $messageId): string /** * @throws JobQueueException */ - protected function createMessageFromQueueMessage(QueueMessage $queueMessage): AzureQueueStorageMessage - { - $payload = $this->extractPayload($queueMessage->getMessageText()); + protected function createMessageFromQueueMessage( + QueueMessage $queueMessage, + ?string $popReceipt = null, + ?string $queueName = null + ): AzureQueueStorageMessage { + // For poison queue messages we want the raw envelope as the payload, + $data = json_decode($queueMessage->getMessageText(), true); + $isPoisonEnvelope = is_array($data) && isset($data['originalQueue']); + + if ($isPoisonEnvelope) { + // Resolve claim-check blob into the envelope if present + if (!empty($data['isClaimCheck']) && !empty($data['blobName'])) { + try { + $blobResult = $this->getBlobService()->getBlob($this->containerName, $data['blobName']); + $data['payload'] = json_decode(stream_get_contents($blobResult->getContentStream()), true); + unset($data['isClaimCheck']); // normalise + } catch (Exception $e) { + throw new JobQueueException( + 'Failed to read claim-check blob for poison message: ' . $e->getMessage(), + 1234567936 + ); + } + } + $payload = $data; // return the full envelope + } else { + $payload = $this->extractPayload($queueMessage->getMessageText()); + } + return new AzureQueueStorageMessage( $this->extractMessageId($queueMessage->getMessageText()), $payload, - 0, // numberOfReleases not available in peek + 0, $queueMessage->getMessageId(), - null, // No pop receipt in peek - $this->extractBlobName($queueMessage->getMessageText()) + $popReceipt, + $isPoisonEnvelope ? null : $this->extractBlobName($queueMessage->getMessageText()), + $queueName ); } + + /** + * Resubmit a poison message back to its original queue. + * + * The original queue is read from the poison envelope stored in the + * message payload: ['originalQueue' => '...', 'payload' => ...]. + * + * When preservePoisonPayload was false the envelope has no 'payload' key + * and the requeued message will carry a null payload. + */ + private function requeuePoisonMessage(AzureQueueStorageMessage $message, int $delay = 0): void + { + $envelope = $message->getPayload(); + $targetQueue = $envelope['originalQueue'] ?? $this->normalPriorityQueueName; + $payload = array_key_exists('payload', $envelope) ? $envelope['payload'] : null; + + $messageContent = json_encode([ + 'messageId' => $message->getIdentifier(), + 'payload' => $payload, + ]); + + if (strlen($messageContent) > $this->claimCheckThreshold) { + $newBlobName = $this->generateBlobName($message->getIdentifier()); + $this->getBlobService()->createBlockBlob($this->containerName, $newBlobName, json_encode($payload)); + $messageContent = json_encode([ + 'isClaimCheck' => true, + 'blobName' => $newBlobName, + 'originalSize' => strlen(json_encode($payload)), + 'messageId' => $message->getIdentifier(), + ]); + } + + $createOptions = new CreateMessageOptions(); + $createOptions->setTimeToLiveInSeconds($this->defaultTtl); + if ($delay > 0) { + $createOptions->setVisibilityTimeoutInSeconds($delay); + } + + $this->getQueueService()->createMessage($targetQueue, $messageContent, $createOptions); + } + + /** + * Delete a blob, swallowing and logging any errors. + */ + private function deleteBlobIfPresent(?string $blobName): void + { + if ($blobName === null) { + return; + } + try { + $this->getBlobService()->deleteBlob($this->containerName, $blobName); + } catch (Exception $e) { + $this->systemLogger->warning('Failed to delete blob', [ + 'blobName' => $blobName, + 'error' => $e->getMessage(), + ]); + } + } } diff --git a/Classes/Queue/RetryableQueueInterface.php b/Classes/Queue/RetryableQueueInterface.php new file mode 100644 index 0000000..e012874 --- /dev/null +++ b/Classes/Queue/RetryableQueueInterface.php @@ -0,0 +1,18 @@ +logger->expects($this->once()) ->method('warning') ->with( - 'Failed to delete blob after message take', - ['error' => 'Failed to delete blob'] + 'Failed to delete blob', + ['error' => 'Failed to delete blob', 'blobName' => 'test-blob'] ); $message = $queue->waitAndTake(); @@ -1133,6 +1133,617 @@ public function dequeueSetsVisibilityTimeout(): void $queue->waitAndReserve(0); } + /** + * @test + */ + public function peekFailedReturnsEmptyArrayWhenPoisonQueueDisabled(): void + { + $queue = $this->createQueue(); + + $this->queueService->expects($this->never())->method('peekMessages'); + + $result = $queue->peekFailed(5); + + $this->assertSame([], $result); + } + + /** + * @test + */ + public function peekFailedReturnsPoisonMessages(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + ]; + $queueMessage = $this->createMock(QueueMessage::class); + $queueMessage->method('getMessageText')->willReturn(json_encode($envelope)); + $queueMessage->method('getMessageId')->willReturn('azure-msg-id'); + $queueMessage->method('getPopReceipt')->willReturn(null); + $queueMessage->method('getDequeueCount')->willReturn(0); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->with('test-queue-poison', $this->callback(function (PeekMessagesOptions $o) { + return $o->getNumberOfMessages() === 1; + })) + ->willReturn($peekResult); + + $messages = $queue->peekFailed(1); + + $this->assertCount(1, $messages); + $this->assertInstanceOf(AzureQueueStorageMessage::class, $messages[0]); + // Poison envelope should be the payload + $payload = $messages[0]->getPayload(); + $this->assertEquals('test-queue', $payload['originalQueue']); + } + + /** + * @test + */ + public function peekFailedRespectsLimit(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->with('test-queue-poison', $this->callback(function (PeekMessagesOptions $o) { + return $o->getNumberOfMessages() === 10; + })) + ->willReturn($peekResult); + + $queue->peekFailed(10); + } + + /** + * @test + */ + public function peekFailedCapsAtPeekLimit(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->with('test-queue-poison', $this->callback(function (PeekMessagesOptions $o) { + return $o->getNumberOfMessages() === 32; // peekLimit + })) + ->willReturn($peekResult); + + $queue->peekFailed(100); // request more than peekLimit + } + + /** + * @test + */ + public function peekFailedReturnsEmptyWhenPoisonQueueDoesNotExist(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->willThrowException(new Exception('Queue not found', 404)); + + $result = $queue->peekFailed(5); + + $this->assertSame([], $result); + } + + /** + * @test + */ + public function peekFailedThrowsOnUnexpectedError(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->willThrowException(new Exception('Service unavailable', 503)); + + $this->expectException(JobQueueException::class); + $this->expectExceptionCode(1234567930); + + $queue->peekFailed(1); + } + + /** + * @test + */ + public function peekFailedResolvesClaimCheckBlobForPoisonMessage(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'isClaimCheck' => true, + 'blobName' => 'poison-blob', + ]; + $queueMessage = $this->createMock(QueueMessage::class); + $queueMessage->method('getMessageText')->willReturn(json_encode($envelope)); + $queueMessage->method('getMessageId')->willReturn('azure-msg-id'); + $queueMessage->method('getPopReceipt')->willReturn(null); + $queueMessage->method('getDequeueCount')->willReturn(0); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->willReturn($peekResult); + + $stream = fopen('php://memory', 'r+'); + fwrite($stream, json_encode(['original' => 'payload'])); + rewind($stream); + + $blobResult = $this->createMock(GetBlobResult::class); + $blobResult->method('getContentStream')->willReturn($stream); + + $this->blobService->expects($this->once()) + ->method('getBlob') + ->with('jobqueue-blobs', 'poison-blob') + ->willReturn($blobResult); + + $messages = $queue->peekFailed(1); + + $this->assertCount(1, $messages); + $payload = $messages[0]->getPayload(); + $this->assertEquals(['original' => 'payload'], $payload['payload']); + $this->assertArrayNotHasKey('isClaimCheck', $payload); + } + + /** + * @test + */ + public function retryAllFailedReturnsZeroWhenPoisonQueueDisabled(): void + { + $queue = $this->createQueue(); + + $this->queueService->expects($this->never())->method('listMessages'); + + $count = $queue->retryAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function retryAllFailedReturnsZeroWhenPoisonQueueEmpty(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->with('test-queue-poison', $this->anything()) + ->willReturn($emptyResult); + + $count = $queue->retryAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function retryAllFailedRequeuesMessageToOriginalQueue(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['retry' => 'me'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + + // Should requeue to original queue + $this->queueService->expects($this->once()) + ->method('createMessage') + ->with( + 'test-queue', + $this->callback(function ($content) { + $data = json_decode($content, true); + return isset($data['messageId']) + && isset($data['payload']) + && $data['payload'] === ['retry' => 'me']; + }), + $this->anything() + ); + + // Should delete from poison queue + $this->queueService->expects($this->once()) + ->method('deleteMessage') + ->with('test-queue-poison', 'azure-msg-id', 'pop-receipt'); + + $count = $queue->retryAllFailed(); + + $this->assertSame(1, $count); + } + + /** + * @test + */ + public function retryAllFailedRequeuesMultipleMessages(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $makeEnvelope = fn (int $i) => json_encode([ + 'messageId' => "msg_$i", + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['index' => $i], + ]); + + $msg1 = $this->createMockQueueMessage($makeEnvelope(1), 'azure-id-1', 'receipt-1'); + $msg2 = $this->createMockQueueMessage($makeEnvelope(2), 'azure-id-2', 'receipt-2'); + + $result1 = $this->createMock(ListMessagesResult::class); + $result1->method('getQueueMessages')->willReturn([$msg1]); + + $result2 = $this->createMock(ListMessagesResult::class); + $result2->method('getQueueMessages')->willReturn([$msg2]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(3)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($result1, $result2, $emptyResult); + + $this->queueService->expects($this->exactly(2))->method('createMessage'); + $this->queueService->expects($this->exactly(2))->method('deleteMessage'); + + $count = $queue->retryAllFailed(); + + $this->assertSame(2, $count); + } + + /** + * @test + */ + public function retryAllFailedReturnsZeroWhenPoisonQueueDoesNotExist(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willThrowException(new Exception('Queue not found', 404)); + + $count = $queue->retryAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function retryAllFailedThrowsAndReleasesOnRequeueError(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['data' => 'x'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willReturn($listResult); + + // createMessage (requeue) fails + $this->queueService->expects($this->once()) + ->method('createMessage') + ->willThrowException(new Exception('Service error', 500)); + + // Should try to release the poison message back + $this->queueService->expects($this->once()) + ->method('updateMessage') + ->with('test-queue-poison', 'azure-msg-id', 'pop-receipt', '', 0); + + $this->expectException(JobQueueException::class); + $this->expectExceptionCode(1234567932); + + $queue->retryAllFailed(); + } + + /** + * @test + */ + public function retryAllFailedLogsErrorWhenReleaseAlsoFails(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['data' => 'x'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->method('listMessages')->willReturn($listResult); + $this->queueService->method('createMessage') + ->willThrowException(new Exception('Service error', 500)); + $this->queueService->method('updateMessage') + ->willThrowException(new Exception('Also failed', 500)); + + $this->logger->expects($this->once()) + ->method('error') + ->with('Failed to release poison message after retry error', $this->arrayHasKey('messageId')); + + $this->expectException(JobQueueException::class); + + $queue->retryAllFailed(); + } + + /** + * @test + */ + public function retryAllFailedWithDelayPassesDelayOption(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['data' => 'x'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $this->queueService->expects($this->once()) + ->method('createMessage') + ->with( + 'test-queue', + $this->anything(), + $this->callback(function ($options) { + return $options->getVisibilityTimeoutInSeconds() === 60; + }) + ); + + $this->queueService->method('deleteMessage'); + + $queue->retryAllFailed(['delay' => 60]); + } + + /** + * @test + */ + public function discardAllFailedReturnsZeroWhenPoisonQueueDisabled(): void + { + $queue = $this->createQueue(); + + $this->queueService->expects($this->never())->method('listMessages'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function discardAllFailedReturnsZeroWhenPoisonQueueEmpty(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->with('test-queue-poison', $this->anything()) + ->willReturn($emptyResult); + + $count = $queue->discardAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function discardAllFailedDeletesMessagesAndBlobs(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $envelope = json_encode([ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'isClaimCheck' => true, + 'blobName' => 'poison-blob', + ]); + + $queueMessage = $this->createMockQueueMessage($envelope, 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $this->blobService->expects($this->once()) + ->method('deleteBlob') + ->with('jobqueue-blobs', 'poison-blob'); + + $this->queueService->expects($this->once()) + ->method('deleteMessage') + ->with('test-queue-poison', 'azure-msg-id', 'pop-receipt'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(1, $count); + } + + /** + * @test + */ + public function discardAllFailedDeletesMultipleMessages(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $makeMsg = fn (int $i) => $this->createMockQueueMessage( + json_encode(['messageId' => "msg_$i", 'originalQueue' => 'test-queue', 'timestamp' => time()]), + "azure-id-$i", + "receipt-$i" + ); + + $result1 = $this->createMock(ListMessagesResult::class); + $result1->method('getQueueMessages')->willReturn([$makeMsg(1)]); + + $result2 = $this->createMock(ListMessagesResult::class); + $result2->method('getQueueMessages')->willReturn([$makeMsg(2)]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(3)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($result1, $result2, $emptyResult); + + $this->queueService->expects($this->exactly(2))->method('deleteMessage'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(2, $count); + } + + /** + * @test + */ + public function discardAllFailedSkipsBlobDeletionWhenNoBlobName(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $envelope = json_encode([ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + ]); + + $queueMessage = $this->createMockQueueMessage($envelope, 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $this->blobService->expects($this->never())->method('deleteBlob'); + $this->queueService->expects($this->once())->method('deleteMessage'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(1, $count); + } + + /** + * @test + */ + public function discardAllFailedReturnsZeroWhenPoisonQueueDoesNotExist(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willThrowException(new Exception('Queue not found', 404)); + + $count = $queue->discardAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function discardAllFailedThrowsOnUnexpectedListError(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willThrowException(new Exception('Service unavailable', 503)); + + $this->expectException(JobQueueException::class); + $this->expectExceptionCode(1234567933); + + $queue->discardAllFailed(); + } + /** * Helper method to create a mock queue message */ From 63d49bf51f7a5c280c979ab19e393c36d9b3149d Mon Sep 17 00:00:00 2001 From: Marc Bischof Date: Mon, 23 Feb 2026 15:43:57 +0100 Subject: [PATCH 2/5] Update command controller docs --- Classes/Command/RetryQueueCommandController.php | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/Classes/Command/RetryQueueCommandController.php b/Classes/Command/RetryQueueCommandController.php index acced81..828e773 100644 --- a/Classes/Command/RetryQueueCommandController.php +++ b/Classes/Command/RetryQueueCommandController.php @@ -19,6 +19,12 @@ class RetryQueueCommandController extends CommandController */ protected $queueManager; + /** + * Show message counts for a queue including reserved and failed messages + * + * @param string $queue Name of the base queue + * @return void + */ public function statusCommand(string $queue): void { $queueInstance = $this->getQueue($queue); @@ -37,6 +43,14 @@ public function statusCommand(string $queue): void $this->output->outputTable($rows, ['Queue', '# ready', '# reserved', '# failed']); } + /** + * Retry all messages from the poison queue + * + * @param string $queue Name of the base queue + * @param int $delay Optional delay in seconds before the retried messages become available again + * @return void + * @throws \Flowpack\JobQueue\Common\Exception + */ public function retryAllCommand(string $queue, int $delay = 0): void { $queueInstance = $this->getQueue($queue); @@ -69,7 +83,7 @@ public function discardAllCommand(string $queue): void } /** - * Show up to $limit messages from the poison queue without consuming them + * Peek messages from the poison queue without modifying them * * @param string $queue Name of the base queue * @param int $limit Number of messages to peek at From a96fcf228ae64212db82f31db0106a0e5996114d Mon Sep 17 00:00:00 2001 From: Marc Bischof Date: Mon, 23 Feb 2026 15:49:05 +0100 Subject: [PATCH 3/5] Update command controller docs --- Classes/Command/RetryQueueCommandController.php | 12 +++++++++--- Classes/Queue/RetryableQueueInterface.php | 17 ++++++++++++++--- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/Classes/Command/RetryQueueCommandController.php b/Classes/Command/RetryQueueCommandController.php index 828e773..7bd68e5 100644 --- a/Classes/Command/RetryQueueCommandController.php +++ b/Classes/Command/RetryQueueCommandController.php @@ -87,8 +87,9 @@ public function discardAllCommand(string $queue): void * * @param string $queue Name of the base queue * @param int $limit Number of messages to peek at + * @param int $previewLength Maximum length of the payload preview */ - public function peekCommand(string $queue, int $limit = 5): void + public function peekCommand(string $queue, int $limit = 5, int $previewLength = 100): void { $queueInstance = $this->getQueue($queue); $messages = $queueInstance->peekFailed($limit); @@ -100,9 +101,14 @@ public function peekCommand(string $queue, int $limit = 5): void $rows = []; foreach ($messages as $message) { - $rows[] = [$message->getIdentifier(), json_encode($message->getPayload())]; + $payload = $message->getPayload(); + $payloadPreview = json_encode($payload); + if (strlen($payloadPreview) > 100) { + $payloadPreview = substr($payloadPreview, 0, $previewLength); + } + $rows[] = [$message->getIdentifier(), $message->getQueueMessageId(), $message->getBlobName(), $message->getNumberOfReleases(), $payloadPreview]; } - $this->output->outputTable($rows, ['Message ID', 'Payload']); + $this->output->outputTable($rows, ['Message ID', 'Queue Message ID', 'Blob Name', 'Releases', 'Payload']); } private function getQueue(string $queue): RetryableQueueInterface diff --git a/Classes/Queue/RetryableQueueInterface.php b/Classes/Queue/RetryableQueueInterface.php index e012874..7a937bf 100644 --- a/Classes/Queue/RetryableQueueInterface.php +++ b/Classes/Queue/RetryableQueueInterface.php @@ -7,12 +7,23 @@ */ interface RetryableQueueInterface { - /** Requeue all poison messages back to the original queue */ + /** + * Requeue all poison messages back to the original queue + * @param array $options Simple key/value array with options that can be interpreted by the concrete implementation (optional) + * @return int The number of messages that were retried + */ public function retryAllFailed(array $options = []): int; - /** Discard all poison messages */ + /** + * Discard all poison messages + * @return int The number of messages that were discarded + */ public function discardAllFailed(): int; - /** Count + peek for inspection */ + /** + * Count + peek for inspection + * @param int $limit Number of messages to peek + * @return AzureQueueStorageMessage[] Array of failed messages, empty if there are no failed messages + */ public function peekFailed(int $limit = 1): array; } From fdcc2669b69a97b2c2705611e80ebbc622334736 Mon Sep 17 00:00:00 2001 From: Marc Bischof Date: Mon, 23 Feb 2026 16:36:31 +0100 Subject: [PATCH 4/5] Add optional limit parameter to retry and discard commands --- .../Command/RetryQueueCommandController.php | 19 ++++++++++++++----- Classes/Queue/AzureQueueStorage.php | 13 ++++++++++++- Classes/Queue/RetryableQueueInterface.php | 8 ++++++-- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/Classes/Command/RetryQueueCommandController.php b/Classes/Command/RetryQueueCommandController.php index 7bd68e5..d1a6a14 100644 --- a/Classes/Command/RetryQueueCommandController.php +++ b/Classes/Command/RetryQueueCommandController.php @@ -5,6 +5,7 @@ use Neos\Flow\Annotations as Flow; use Flowpack\JobQueue\Common\Queue\QueueManager; use Neos\Flow\Cli\CommandController; +use Oniva\JobQueue\AzureQueueStorage\Queue\AzureQueueStorageMessage; use Oniva\JobQueue\AzureQueueStorage\Queue\RetryableQueueInterface; use Exception; @@ -48,10 +49,11 @@ public function statusCommand(string $queue): void * * @param string $queue Name of the base queue * @param int $delay Optional delay in seconds before the retried messages become available again + * @param int $limit Optional limit for the number of messages to retry, if 0 all messages will be retried * @return void * @throws \Flowpack\JobQueue\Common\Exception */ - public function retryAllCommand(string $queue, int $delay = 0): void + public function retryAllCommand(string $queue, int $delay = 0, int $limit = 0): void { $queueInstance = $this->getQueue($queue); @@ -60,7 +62,7 @@ public function retryAllCommand(string $queue, int $delay = 0): void return; } - $count = $queueInstance->retryAllFailed(['delay' => $delay]); + $count = $queueInstance->retryAllFailed(['delay' => $delay, 'limit' => $limit]); $this->outputLine('Retried %d messages from poison queue', [$count]); } @@ -68,8 +70,11 @@ public function retryAllCommand(string $queue, int $delay = 0): void * Discard all messages from the poison queue * * @param string $queue Name of the base queue + * @param int $limit Optional limit for the number of messages to discard, if 0 all messages will be discarded + * @return void + * @throws \Flowpack\JobQueue\Common\Exception */ - public function discardAllCommand(string $queue): void + public function discardAllCommand(string $queue, int $limit = 0): void { $queueInstance = $this->getQueue($queue); @@ -78,7 +83,7 @@ public function discardAllCommand(string $queue): void return; } - $count = $queueInstance->discardAllFailed(); + $count = $queueInstance->discardAllFailed(['limit' => $limit]); $this->outputLine('Discarded %d messages from poison queue', [$count]); } @@ -106,7 +111,11 @@ public function peekCommand(string $queue, int $limit = 5, int $previewLength = if (strlen($payloadPreview) > 100) { $payloadPreview = substr($payloadPreview, 0, $previewLength); } - $rows[] = [$message->getIdentifier(), $message->getQueueMessageId(), $message->getBlobName(), $message->getNumberOfReleases(), $payloadPreview]; + $messageId = $message->getIdentifier(); + $queueMessageId = $message instanceof AzureQueueStorageMessage ? $message->getQueueMessageId() : '-'; + $blobName = $message instanceof AzureQueueStorageMessage ? $message->getBlobName() : '-'; + $numberOfReleases = $message->getNumberOfReleases(); + $rows[] = [$messageId, $queueMessageId, $blobName, $numberOfReleases, $payloadPreview]; } $this->output->outputTable($rows, ['Message ID', 'Queue Message ID', 'Blob Name', 'Releases', 'Payload']); } diff --git a/Classes/Queue/AzureQueueStorage.php b/Classes/Queue/AzureQueueStorage.php index a19a475..6c6d596 100644 --- a/Classes/Queue/AzureQueueStorage.php +++ b/Classes/Queue/AzureQueueStorage.php @@ -668,9 +668,14 @@ public function retryAllFailed(array $options = []): int } $delay = (int)($options['delay'] ?? 0); + $limit = (int)($options['limit'] ?? 0); $count = 0; while (true) { + if ($limit > 0 && $count >= $limit) { + break; + } + $listOptions = new ListMessagesOptions(); $listOptions->setNumberOfMessages(1); $listOptions->setVisibilityTimeoutInSeconds($this->visibilityTimeout); @@ -724,18 +729,24 @@ public function retryAllFailed(array $options = []): int return $count; } + /** * @inheritdoc */ - public function discardAllFailed(): int + public function discardAllFailed(array $options = []): int { if (!$this->usePoisonQueue) { return 0; } + $limit = (int)($options['limit'] ?? 0); $count = 0; while (true) { + if ($limit > 0 && $count >= $limit) { + break; + } + $listOptions = new ListMessagesOptions(); $listOptions->setNumberOfMessages(1); $listOptions->setVisibilityTimeoutInSeconds($this->visibilityTimeout); diff --git a/Classes/Queue/RetryableQueueInterface.php b/Classes/Queue/RetryableQueueInterface.php index 7a937bf..068f194 100644 --- a/Classes/Queue/RetryableQueueInterface.php +++ b/Classes/Queue/RetryableQueueInterface.php @@ -2,6 +2,8 @@ namespace Oniva\JobQueue\AzureQueueStorage\Queue; +use Flowpack\JobQueue\Common\Queue\Message; + /** * Interface for queues that support retryable messages */ @@ -9,6 +11,7 @@ interface RetryableQueueInterface { /** * Requeue all poison messages back to the original queue + * * @param array $options Simple key/value array with options that can be interpreted by the concrete implementation (optional) * @return int The number of messages that were retried */ @@ -16,14 +19,15 @@ public function retryAllFailed(array $options = []): int; /** * Discard all poison messages + * @param array $options Simple key/value array with options that can be interpreted by the concrete implementation (optional) * @return int The number of messages that were discarded */ - public function discardAllFailed(): int; + public function discardAllFailed(array $options = []): int; /** * Count + peek for inspection * @param int $limit Number of messages to peek - * @return AzureQueueStorageMessage[] Array of failed messages, empty if there are no failed messages + * @return Message[] Array of failed messages, empty if there are no failed messages */ public function peekFailed(int $limit = 1): array; } From eedd2a9ac87fc341ef0a171d2bc62fac53e83cba Mon Sep 17 00:00:00 2001 From: Marc Bischof Date: Mon, 23 Feb 2026 16:39:48 +0100 Subject: [PATCH 5/5] Add missing interface --- Classes/Command/RetryQueueCommandController.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Classes/Command/RetryQueueCommandController.php b/Classes/Command/RetryQueueCommandController.php index d1a6a14..4e8dba3 100644 --- a/Classes/Command/RetryQueueCommandController.php +++ b/Classes/Command/RetryQueueCommandController.php @@ -2,6 +2,7 @@ namespace Oniva\JobQueue\AzureQueueStorage\Command; +use Flowpack\JobQueue\Common\Queue\QueueInterface; use Neos\Flow\Annotations as Flow; use Flowpack\JobQueue\Common\Queue\QueueManager; use Neos\Flow\Cli\CommandController; @@ -120,7 +121,7 @@ public function peekCommand(string $queue, int $limit = 5, int $previewLength = $this->output->outputTable($rows, ['Message ID', 'Queue Message ID', 'Blob Name', 'Releases', 'Payload']); } - private function getQueue(string $queue): RetryableQueueInterface + private function getQueue(string $queue): RetryableQueueInterface&QueueInterface { $queueInstance = $this->queueManager->getQueue($queue); if (!$queueInstance instanceof RetryableQueueInterface) {