diff --git a/Classes/Command/RetryQueueCommandController.php b/Classes/Command/RetryQueueCommandController.php new file mode 100644 index 0000000..4e8dba3 --- /dev/null +++ b/Classes/Command/RetryQueueCommandController.php @@ -0,0 +1,133 @@ +getQueue($queue); + + try { + $rows = [[ + $queueInstance->getName(), + $queueInstance->countReady(), + $queueInstance->countReserved(), + $queueInstance->countFailed(), + ]]; + } catch (Exception $e) { + $rows = [[$queueInstance->getName(), '-', '-', '-']]; + } + + $this->output->outputTable($rows, ['Queue', '# ready', '# reserved', '# failed']); + } + + /** + * Retry all messages from the poison queue + * + * @param string $queue Name of the base queue + * @param int $delay Optional delay in seconds before the retried messages become available again + * @param int $limit Optional limit for the number of messages to retry, if 0 all messages will be retried + * @return void + * @throws \Flowpack\JobQueue\Common\Exception + */ + public function retryAllCommand(string $queue, int $delay = 0, int $limit = 0): void + { + $queueInstance = $this->getQueue($queue); + + if ($queueInstance->countFailed() === 0) { + $this->outputLine('No messages in poison queue'); + return; + } + + $count = $queueInstance->retryAllFailed(['delay' => $delay, 'limit' => $limit]); + $this->outputLine('Retried %d messages from poison queue', [$count]); + } + + /** + * Discard all messages from the poison queue + * + * @param string $queue Name of the base queue + * @param int $limit Optional limit for the number of messages to discard, if 0 all messages will be discarded + * @return void + * @throws \Flowpack\JobQueue\Common\Exception + */ + public function discardAllCommand(string $queue, int $limit = 0): void + { + $queueInstance = $this->getQueue($queue); + + if ($queueInstance->countFailed() === 0) { + $this->outputLine('No messages in poison queue'); + return; + } + + $count = $queueInstance->discardAllFailed(['limit' => $limit]); + $this->outputLine('Discarded %d messages from poison queue', [$count]); + } + + /** + * Peek messages from the poison queue without modifying them + * + * @param string $queue Name of the base queue + * @param int $limit Number of messages to peek at + * @param int $previewLength Maximum length of the payload preview + */ + public function peekCommand(string $queue, int $limit = 5, int $previewLength = 100): void + { + $queueInstance = $this->getQueue($queue); + $messages = $queueInstance->peekFailed($limit); + + if (empty($messages)) { + $this->outputLine('No messages in poison queue'); + return; + } + + $rows = []; + foreach ($messages as $message) { + $payload = $message->getPayload(); + $payloadPreview = json_encode($payload); + if (strlen($payloadPreview) > 100) { + $payloadPreview = substr($payloadPreview, 0, $previewLength); + } + $messageId = $message->getIdentifier(); + $queueMessageId = $message instanceof AzureQueueStorageMessage ? $message->getQueueMessageId() : '-'; + $blobName = $message instanceof AzureQueueStorageMessage ? $message->getBlobName() : '-'; + $numberOfReleases = $message->getNumberOfReleases(); + $rows[] = [$messageId, $queueMessageId, $blobName, $numberOfReleases, $payloadPreview]; + } + $this->output->outputTable($rows, ['Message ID', 'Queue Message ID', 'Blob Name', 'Releases', 'Payload']); + } + + private function getQueue(string $queue): RetryableQueueInterface&QueueInterface + { + $queueInstance = $this->queueManager->getQueue($queue); + if (!$queueInstance instanceof RetryableQueueInterface) { + $this->outputLine('Queue "%s" is not a RetryableQueueInterface queue', [$queue]); + $this->quit(1); + } + return $queueInstance; + } +} diff --git a/Classes/Queue/AzureQueueStorage.php b/Classes/Queue/AzureQueueStorage.php index 7b2ffce..6c6d596 100644 --- a/Classes/Queue/AzureQueueStorage.php +++ b/Classes/Queue/AzureQueueStorage.php @@ -23,7 +23,7 @@ * A queue implementation using Azure Storage Queue as the queue backend * with claim check pattern for large messages */ -class AzureQueueStorage implements QueueInterface +class AzureQueueStorage implements QueueInterface, RetryableQueueInterface { protected string $name; @@ -272,15 +272,7 @@ public function submit($payload, array $options = []): string } catch (Exception $e) { // Clean up blob if it was created but queue message failed if ($isClaimCheck && isset($blobName)) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $blobName); - } catch (Exception $cleanupException) { - // Log cleanup failure but don't throw - $this->systemLogger->error('Failed to clean up blob after message submission failure', [ - 'blobName' => $blobName, - 'error' => $cleanupException->getMessage(), - ]); - } + $this->deleteBlobIfPresent($blobName); } throw new JobQueueException('Failed to submit message: ' . $e->getMessage(), 1234567894); } @@ -305,12 +297,8 @@ public function waitAndTake(?int $timeout = null): ?Message ); // Clean up blob if it exists - if ($message->getBlobName()) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $message->getBlobName()); - } catch (Exception $e) { - $this->systemLogger->warning('Failed to delete blob after message take', ['error' => $e->getMessage()]); - } + if ($blobName = $message->getBlobName()) { + $this->deleteBlobIfPresent($blobName); } return $message; @@ -426,14 +414,7 @@ public function abort(string $messageId): void // Clean up blob if it exists if (!empty($messageInfo['blobName'])) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $messageInfo['blobName']); - } catch (Exception $e) { - $this->systemLogger->warning('Failed to delete blob after message abort', [ - 'blobName' => $messageInfo['blobName'], - 'error' => $e->getMessage(), - ]); - } + $this->deleteBlobIfPresent($messageInfo['blobName']); } } @@ -472,14 +453,7 @@ public function finish(string $messageId): bool // Clean up blob if it exists if (!empty($messageInfo['blobName'])) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $messageInfo['blobName']); - } catch (Exception $e) { - $this->systemLogger->warning('Failed to delete blob after message finish', [ - 'blobName' => $messageInfo['blobName'], - 'error' => $e->getMessage(), - ]); - } + $this->deleteBlobIfPresent($messageInfo['blobName']); } unset($this->reservedMessages[$messageId]); @@ -637,17 +611,174 @@ public function flush(): void $listBlobOptions->setPrefix(sprintf('queue-%s/', $this->name)); $blobList = $this->getBlobService()->listBlobs($this->containerName, $listBlobOptions); foreach ($blobList->getBlobs() as $blob) { - try { - $this->getBlobService()->deleteBlob($this->containerName, $blob->getName()); - } catch (Exception $e) { - // Continue with other blobs - } + $this->deleteBlobIfPresent($blob->getName()); } } catch (Exception $e) { throw new JobQueueException('Failed to flush queue: ' . $e->getMessage(), 1234567902); } } + /** + * @inheritdoc + */ + public function peekFailed(int $limit = 1): array + { + if (!$this->usePoisonQueue) { + return []; + } + + $messages = []; + $remaining = $limit; + + // peekMessages is capped at 32 per call and has no cursor, so we can + // only return up to 32 distinct messages per invocation. + while ($remaining > 0) { + $batchSize = min($remaining, $this->peekLimit); + $options = new PeekMessagesOptions(); + $options->setNumberOfMessages($batchSize); + + try { + $result = $this->getQueueService()->peekMessages($this->poisonQueueName, $options); + } catch (Exception $e) { + if ($e->getCode() === 404) { + break; + } + throw new JobQueueException('Failed to peek poison queue: ' . $e->getMessage(), 1234567930); + } + + $batch = $result->getQueueMessages(); + foreach ($batch as $queueMessage) { + $messages[] = $this->createMessageFromQueueMessage($queueMessage); + } + + // Azure peek has no cursor — a second call returns the same messages + break; + } + + return $messages; + } + + /** + * @inheritdoc + */ + public function retryAllFailed(array $options = []): int + { + if (!$this->usePoisonQueue) { + return 0; + } + + $delay = (int)($options['delay'] ?? 0); + $limit = (int)($options['limit'] ?? 0); + $count = 0; + + while (true) { + if ($limit > 0 && $count >= $limit) { + break; + } + + $listOptions = new ListMessagesOptions(); + $listOptions->setNumberOfMessages(1); + $listOptions->setVisibilityTimeoutInSeconds($this->visibilityTimeout); + + try { + $result = $this->getQueueService()->listMessages($this->poisonQueueName, $listOptions); + } catch (Exception $e) { + if ($e->getCode() === 404) { + break; + } + throw new JobQueueException('Failed to list poison queue: ' . $e->getMessage(), 1234567931); + } + + $queueMessages = $result->getQueueMessages(); + if (empty($queueMessages)) { + break; + } + + $message = $this->createMessageFromQueueMessage($queueMessages[0], $queueMessages[0]->getPopReceipt()); + + try { + $this->requeuePoisonMessage($message, $delay); + $this->getQueueService()->deleteMessage( + $this->poisonQueueName, + $message->getQueueMessageId(), + $message->getPopReceipt() + ); + $count++; + } catch (Exception $e) { + // Release the message back to the poison queue so it isn't silently lost + try { + $this->getQueueService()->updateMessage( + $this->poisonQueueName, + $message->getQueueMessageId(), + $message->getPopReceipt(), + '', + 0 + ); + } catch (Exception $releaseEx) { + $this->systemLogger->error('Failed to release poison message after retry error', [ + 'messageId' => $message->getIdentifier(), + 'error' => $releaseEx->getMessage(), + ]); + } + throw new JobQueueException( + 'Failed to retry poison message ' . $message->getIdentifier() . ': ' . $e->getMessage(), + 1234567932 + ); + } + } + + return $count; + } + + /** + * @inheritdoc + */ + public function discardAllFailed(array $options = []): int + { + if (!$this->usePoisonQueue) { + return 0; + } + + $limit = (int)($options['limit'] ?? 0); + $count = 0; + + while (true) { + if ($limit > 0 && $count >= $limit) { + break; + } + + $listOptions = new ListMessagesOptions(); + $listOptions->setNumberOfMessages(1); + $listOptions->setVisibilityTimeoutInSeconds($this->visibilityTimeout); + + try { + $result = $this->getQueueService()->listMessages($this->poisonQueueName, $listOptions); + } catch (Exception $e) { + if ($e->getCode() === 404) { + break; + } + throw new JobQueueException('Failed to list poison queue: ' . $e->getMessage(), 1234567933); + } + + $queueMessages = $result->getQueueMessages(); + if (empty($queueMessages)) { + break; + } + + $queueMessage = $queueMessages[0]; + $this->deleteBlobIfPresent($this->extractBlobName($queueMessage->getMessageText())); + $this->getQueueService()->deleteMessage( + $this->poisonQueueName, + $queueMessage->getMessageId(), + $queueMessage->getPopReceipt() + ); + + $count++; + } + + return $count; + } + protected function getQueueService(): IQueue { if ($this->queueService === null) { @@ -896,16 +1027,100 @@ protected function generateBlobName(string $messageId): string /** * @throws JobQueueException */ - protected function createMessageFromQueueMessage(QueueMessage $queueMessage): AzureQueueStorageMessage - { - $payload = $this->extractPayload($queueMessage->getMessageText()); + protected function createMessageFromQueueMessage( + QueueMessage $queueMessage, + ?string $popReceipt = null, + ?string $queueName = null + ): AzureQueueStorageMessage { + // For poison queue messages we want the raw envelope as the payload, + $data = json_decode($queueMessage->getMessageText(), true); + $isPoisonEnvelope = is_array($data) && isset($data['originalQueue']); + + if ($isPoisonEnvelope) { + // Resolve claim-check blob into the envelope if present + if (!empty($data['isClaimCheck']) && !empty($data['blobName'])) { + try { + $blobResult = $this->getBlobService()->getBlob($this->containerName, $data['blobName']); + $data['payload'] = json_decode(stream_get_contents($blobResult->getContentStream()), true); + unset($data['isClaimCheck']); // normalise + } catch (Exception $e) { + throw new JobQueueException( + 'Failed to read claim-check blob for poison message: ' . $e->getMessage(), + 1234567936 + ); + } + } + $payload = $data; // return the full envelope + } else { + $payload = $this->extractPayload($queueMessage->getMessageText()); + } + return new AzureQueueStorageMessage( $this->extractMessageId($queueMessage->getMessageText()), $payload, - 0, // numberOfReleases not available in peek + 0, $queueMessage->getMessageId(), - null, // No pop receipt in peek - $this->extractBlobName($queueMessage->getMessageText()) + $popReceipt, + $isPoisonEnvelope ? null : $this->extractBlobName($queueMessage->getMessageText()), + $queueName ); } + + /** + * Resubmit a poison message back to its original queue. + * + * The original queue is read from the poison envelope stored in the + * message payload: ['originalQueue' => '...', 'payload' => ...]. + * + * When preservePoisonPayload was false the envelope has no 'payload' key + * and the requeued message will carry a null payload. + */ + private function requeuePoisonMessage(AzureQueueStorageMessage $message, int $delay = 0): void + { + $envelope = $message->getPayload(); + $targetQueue = $envelope['originalQueue'] ?? $this->normalPriorityQueueName; + $payload = array_key_exists('payload', $envelope) ? $envelope['payload'] : null; + + $messageContent = json_encode([ + 'messageId' => $message->getIdentifier(), + 'payload' => $payload, + ]); + + if (strlen($messageContent) > $this->claimCheckThreshold) { + $newBlobName = $this->generateBlobName($message->getIdentifier()); + $this->getBlobService()->createBlockBlob($this->containerName, $newBlobName, json_encode($payload)); + $messageContent = json_encode([ + 'isClaimCheck' => true, + 'blobName' => $newBlobName, + 'originalSize' => strlen(json_encode($payload)), + 'messageId' => $message->getIdentifier(), + ]); + } + + $createOptions = new CreateMessageOptions(); + $createOptions->setTimeToLiveInSeconds($this->defaultTtl); + if ($delay > 0) { + $createOptions->setVisibilityTimeoutInSeconds($delay); + } + + $this->getQueueService()->createMessage($targetQueue, $messageContent, $createOptions); + } + + /** + * Delete a blob, swallowing and logging any errors. + */ + private function deleteBlobIfPresent(?string $blobName): void + { + if ($blobName === null) { + return; + } + try { + $this->getBlobService()->deleteBlob($this->containerName, $blobName); + } catch (Exception $e) { + $this->systemLogger->warning('Failed to delete blob', [ + 'blobName' => $blobName, + 'error' => $e->getMessage(), + ]); + } + } } diff --git a/Classes/Queue/RetryableQueueInterface.php b/Classes/Queue/RetryableQueueInterface.php new file mode 100644 index 0000000..068f194 --- /dev/null +++ b/Classes/Queue/RetryableQueueInterface.php @@ -0,0 +1,33 @@ +logger->expects($this->once()) ->method('warning') ->with( - 'Failed to delete blob after message take', - ['error' => 'Failed to delete blob'] + 'Failed to delete blob', + ['error' => 'Failed to delete blob', 'blobName' => 'test-blob'] ); $message = $queue->waitAndTake(); @@ -1133,6 +1133,617 @@ public function dequeueSetsVisibilityTimeout(): void $queue->waitAndReserve(0); } + /** + * @test + */ + public function peekFailedReturnsEmptyArrayWhenPoisonQueueDisabled(): void + { + $queue = $this->createQueue(); + + $this->queueService->expects($this->never())->method('peekMessages'); + + $result = $queue->peekFailed(5); + + $this->assertSame([], $result); + } + + /** + * @test + */ + public function peekFailedReturnsPoisonMessages(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + ]; + $queueMessage = $this->createMock(QueueMessage::class); + $queueMessage->method('getMessageText')->willReturn(json_encode($envelope)); + $queueMessage->method('getMessageId')->willReturn('azure-msg-id'); + $queueMessage->method('getPopReceipt')->willReturn(null); + $queueMessage->method('getDequeueCount')->willReturn(0); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->with('test-queue-poison', $this->callback(function (PeekMessagesOptions $o) { + return $o->getNumberOfMessages() === 1; + })) + ->willReturn($peekResult); + + $messages = $queue->peekFailed(1); + + $this->assertCount(1, $messages); + $this->assertInstanceOf(AzureQueueStorageMessage::class, $messages[0]); + // Poison envelope should be the payload + $payload = $messages[0]->getPayload(); + $this->assertEquals('test-queue', $payload['originalQueue']); + } + + /** + * @test + */ + public function peekFailedRespectsLimit(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->with('test-queue-poison', $this->callback(function (PeekMessagesOptions $o) { + return $o->getNumberOfMessages() === 10; + })) + ->willReturn($peekResult); + + $queue->peekFailed(10); + } + + /** + * @test + */ + public function peekFailedCapsAtPeekLimit(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->with('test-queue-poison', $this->callback(function (PeekMessagesOptions $o) { + return $o->getNumberOfMessages() === 32; // peekLimit + })) + ->willReturn($peekResult); + + $queue->peekFailed(100); // request more than peekLimit + } + + /** + * @test + */ + public function peekFailedReturnsEmptyWhenPoisonQueueDoesNotExist(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->willThrowException(new Exception('Queue not found', 404)); + + $result = $queue->peekFailed(5); + + $this->assertSame([], $result); + } + + /** + * @test + */ + public function peekFailedThrowsOnUnexpectedError(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->willThrowException(new Exception('Service unavailable', 503)); + + $this->expectException(JobQueueException::class); + $this->expectExceptionCode(1234567930); + + $queue->peekFailed(1); + } + + /** + * @test + */ + public function peekFailedResolvesClaimCheckBlobForPoisonMessage(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'isClaimCheck' => true, + 'blobName' => 'poison-blob', + ]; + $queueMessage = $this->createMock(QueueMessage::class); + $queueMessage->method('getMessageText')->willReturn(json_encode($envelope)); + $queueMessage->method('getMessageId')->willReturn('azure-msg-id'); + $queueMessage->method('getPopReceipt')->willReturn(null); + $queueMessage->method('getDequeueCount')->willReturn(0); + + $peekResult = $this->createMock(PeekMessagesResult::class); + $peekResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->expects($this->once()) + ->method('peekMessages') + ->willReturn($peekResult); + + $stream = fopen('php://memory', 'r+'); + fwrite($stream, json_encode(['original' => 'payload'])); + rewind($stream); + + $blobResult = $this->createMock(GetBlobResult::class); + $blobResult->method('getContentStream')->willReturn($stream); + + $this->blobService->expects($this->once()) + ->method('getBlob') + ->with('jobqueue-blobs', 'poison-blob') + ->willReturn($blobResult); + + $messages = $queue->peekFailed(1); + + $this->assertCount(1, $messages); + $payload = $messages[0]->getPayload(); + $this->assertEquals(['original' => 'payload'], $payload['payload']); + $this->assertArrayNotHasKey('isClaimCheck', $payload); + } + + /** + * @test + */ + public function retryAllFailedReturnsZeroWhenPoisonQueueDisabled(): void + { + $queue = $this->createQueue(); + + $this->queueService->expects($this->never())->method('listMessages'); + + $count = $queue->retryAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function retryAllFailedReturnsZeroWhenPoisonQueueEmpty(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->with('test-queue-poison', $this->anything()) + ->willReturn($emptyResult); + + $count = $queue->retryAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function retryAllFailedRequeuesMessageToOriginalQueue(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['retry' => 'me'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + + // Should requeue to original queue + $this->queueService->expects($this->once()) + ->method('createMessage') + ->with( + 'test-queue', + $this->callback(function ($content) { + $data = json_decode($content, true); + return isset($data['messageId']) + && isset($data['payload']) + && $data['payload'] === ['retry' => 'me']; + }), + $this->anything() + ); + + // Should delete from poison queue + $this->queueService->expects($this->once()) + ->method('deleteMessage') + ->with('test-queue-poison', 'azure-msg-id', 'pop-receipt'); + + $count = $queue->retryAllFailed(); + + $this->assertSame(1, $count); + } + + /** + * @test + */ + public function retryAllFailedRequeuesMultipleMessages(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $makeEnvelope = fn (int $i) => json_encode([ + 'messageId' => "msg_$i", + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['index' => $i], + ]); + + $msg1 = $this->createMockQueueMessage($makeEnvelope(1), 'azure-id-1', 'receipt-1'); + $msg2 = $this->createMockQueueMessage($makeEnvelope(2), 'azure-id-2', 'receipt-2'); + + $result1 = $this->createMock(ListMessagesResult::class); + $result1->method('getQueueMessages')->willReturn([$msg1]); + + $result2 = $this->createMock(ListMessagesResult::class); + $result2->method('getQueueMessages')->willReturn([$msg2]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(3)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($result1, $result2, $emptyResult); + + $this->queueService->expects($this->exactly(2))->method('createMessage'); + $this->queueService->expects($this->exactly(2))->method('deleteMessage'); + + $count = $queue->retryAllFailed(); + + $this->assertSame(2, $count); + } + + /** + * @test + */ + public function retryAllFailedReturnsZeroWhenPoisonQueueDoesNotExist(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willThrowException(new Exception('Queue not found', 404)); + + $count = $queue->retryAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function retryAllFailedThrowsAndReleasesOnRequeueError(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['data' => 'x'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willReturn($listResult); + + // createMessage (requeue) fails + $this->queueService->expects($this->once()) + ->method('createMessage') + ->willThrowException(new Exception('Service error', 500)); + + // Should try to release the poison message back + $this->queueService->expects($this->once()) + ->method('updateMessage') + ->with('test-queue-poison', 'azure-msg-id', 'pop-receipt', '', 0); + + $this->expectException(JobQueueException::class); + $this->expectExceptionCode(1234567932); + + $queue->retryAllFailed(); + } + + /** + * @test + */ + public function retryAllFailedLogsErrorWhenReleaseAlsoFails(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['data' => 'x'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + + $this->queueService->method('listMessages')->willReturn($listResult); + $this->queueService->method('createMessage') + ->willThrowException(new Exception('Service error', 500)); + $this->queueService->method('updateMessage') + ->willThrowException(new Exception('Also failed', 500)); + + $this->logger->expects($this->once()) + ->method('error') + ->with('Failed to release poison message after retry error', $this->arrayHasKey('messageId')); + + $this->expectException(JobQueueException::class); + + $queue->retryAllFailed(); + } + + /** + * @test + */ + public function retryAllFailedWithDelayPassesDelayOption(): void + { + $queue = $this->createQueue('test-queue', [ + 'usePoisonQueue' => true, + 'preservePoisonPayload' => true, + ]); + + $envelope = [ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['data' => 'x'], + ]; + + $queueMessage = $this->createMockQueueMessage(json_encode($envelope), 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $this->queueService->expects($this->once()) + ->method('createMessage') + ->with( + 'test-queue', + $this->anything(), + $this->callback(function ($options) { + return $options->getVisibilityTimeoutInSeconds() === 60; + }) + ); + + $this->queueService->method('deleteMessage'); + + $queue->retryAllFailed(['delay' => 60]); + } + + /** + * @test + */ + public function discardAllFailedReturnsZeroWhenPoisonQueueDisabled(): void + { + $queue = $this->createQueue(); + + $this->queueService->expects($this->never())->method('listMessages'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function discardAllFailedReturnsZeroWhenPoisonQueueEmpty(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->with('test-queue-poison', $this->anything()) + ->willReturn($emptyResult); + + $count = $queue->discardAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function discardAllFailedDeletesMessagesAndBlobs(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $envelope = json_encode([ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + 'isClaimCheck' => true, + 'blobName' => 'poison-blob', + ]); + + $queueMessage = $this->createMockQueueMessage($envelope, 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $this->blobService->expects($this->once()) + ->method('deleteBlob') + ->with('jobqueue-blobs', 'poison-blob'); + + $this->queueService->expects($this->once()) + ->method('deleteMessage') + ->with('test-queue-poison', 'azure-msg-id', 'pop-receipt'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(1, $count); + } + + /** + * @test + */ + public function discardAllFailedDeletesMultipleMessages(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $makeMsg = fn (int $i) => $this->createMockQueueMessage( + json_encode(['messageId' => "msg_$i", 'originalQueue' => 'test-queue', 'timestamp' => time()]), + "azure-id-$i", + "receipt-$i" + ); + + $result1 = $this->createMock(ListMessagesResult::class); + $result1->method('getQueueMessages')->willReturn([$makeMsg(1)]); + + $result2 = $this->createMock(ListMessagesResult::class); + $result2->method('getQueueMessages')->willReturn([$makeMsg(2)]); + + $emptyResult = $this->createMock(ListMessagesResult::class); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(3)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($result1, $result2, $emptyResult); + + $this->queueService->expects($this->exactly(2))->method('deleteMessage'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(2, $count); + } + + /** + * @test + */ + public function discardAllFailedSkipsBlobDeletionWhenNoBlobName(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $envelope = json_encode([ + 'messageId' => 'msg_123', + 'originalQueue' => 'test-queue', + 'timestamp' => time(), + ]); + + $queueMessage = $this->createMockQueueMessage($envelope, 'azure-msg-id', 'pop-receipt'); + + $listResult = $this->createMock(ListMessagesResult::class); + $emptyResult = $this->createMock(ListMessagesResult::class); + $listResult->method('getQueueMessages')->willReturn([$queueMessage]); + $emptyResult->method('getQueueMessages')->willReturn([]); + + $this->queueService->expects($this->exactly(2)) + ->method('listMessages') + ->willReturnOnConsecutiveCalls($listResult, $emptyResult); + + $this->blobService->expects($this->never())->method('deleteBlob'); + $this->queueService->expects($this->once())->method('deleteMessage'); + + $count = $queue->discardAllFailed(); + + $this->assertSame(1, $count); + } + + /** + * @test + */ + public function discardAllFailedReturnsZeroWhenPoisonQueueDoesNotExist(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willThrowException(new Exception('Queue not found', 404)); + + $count = $queue->discardAllFailed(); + + $this->assertSame(0, $count); + } + + /** + * @test + */ + public function discardAllFailedThrowsOnUnexpectedListError(): void + { + $queue = $this->createQueue('test-queue', ['usePoisonQueue' => true]); + + $this->queueService->expects($this->once()) + ->method('listMessages') + ->willThrowException(new Exception('Service unavailable', 503)); + + $this->expectException(JobQueueException::class); + $this->expectExceptionCode(1234567933); + + $queue->discardAllFailed(); + } + /** * Helper method to create a mock queue message */