diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index 5251939..c6a67c1 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -9,6 +9,7 @@ use Amp\Loop; use Amp\Promise; use Psr\Log\LoggerInterface; +use Ramsey\Uuid\Uuid; use Webgriffe\Esb\Model\FlowConfig; use Webgriffe\Esb\Model\Job; use Webgriffe\Esb\Model\ProducedJobEvent; @@ -164,6 +165,7 @@ function ($watcherId) { public function produceAndQueueJobs($data = null): Promise { return call(function () use ($data) { + $batchId = Uuid::uuid1()->toString(); $jobsCount = 0; $job = null; try { @@ -172,10 +174,18 @@ public function produceAndQueueJobs($data = null): Promise /** @var Job $job */ $job = $jobs->getCurrent(); $job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer))); - $jobsCount += yield $this->queueManager->enqueue($job); + if ($this->queueManager instanceof QueueManager) { + $jobsCount += yield $this->queueManager->enqueue($job, $batchId); + } else { + $jobsCount += yield $this->queueManager->enqueue($job); + } } - $jobsCount += yield $this->queueManager->flush(); + if ($this->queueManager instanceof QueueManager) { + $jobsCount += yield $this->queueManager->flush($batchId); + } else { + $jobsCount += yield $this->queueManager->flush(); + } } catch (\Throwable $error) { $this->logger->error( 'An error occurred producing/queueing jobs.', @@ -183,6 +193,7 @@ public function produceAndQueueJobs($data = null): Promise 'producer' => \get_class($this->producer), 'last_job_payload_data' => $job ? NonUtf8Cleaner::clean($job->getPayloadData()) : null, 'error' => $error->getMessage(), + 'batch_id' => $batchId, ] ); } diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index 4e12dfd..2cf9bb6 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -17,6 +17,8 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueManagerInterface { + private const DEFAULT_BATCH_ID = 'default'; + /** * @var BeanstalkClient */ @@ -38,9 +40,9 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueMa private $logger; /** - * @var JobInterface[] + * @var array> */ - private $batch = []; + private $batches = []; /** * @TODO This map is static because it must be shared between each QueueManager instance: it could be refactored @@ -142,9 +144,13 @@ public function boot(): Promise /** * @inheritdoc */ - public function enqueue(JobInterface $job): Promise + public function enqueue(JobInterface $job, ?string $batchId = null): Promise { - return call(function () use ($job) { + if ($batchId === null) { + $batchId = self::DEFAULT_BATCH_ID; + } + + return call(function () use ($job, $batchId) { $jobExists = yield $this->jobExists($job->getUuid()); if ($jobExists) { throw new \RuntimeException( @@ -154,14 +160,14 @@ public function enqueue(JobInterface $job): Promise ) ); } - $this->batch[$job->getUuid()] = $job; + $this->addJobToBatch($batchId, $job); - $count = count($this->batch); + $count = count($this->getBatch($batchId)); if ($count < $this->batchSize) { return 0; //Number of jobs actually added to the queue } - yield from $this->processBatch(); + yield from $this->processBatch($batchId); return $count; }); } @@ -169,12 +175,16 @@ public function enqueue(JobInterface $job): Promise /** * @inheritdoc */ - public function flush(): Promise + public function flush(?string $batchId = null): Promise { - return call(function () { - $jobsCount = count($this->batch); + if ($batchId === null) { + $batchId = self::DEFAULT_BATCH_ID; + } + + return call(function () use ($batchId) { + $jobsCount = count($this->getBatch($batchId)); if ($jobsCount > 0) { - yield from $this->processBatch(); + yield from $this->processBatch($batchId); } return $jobsCount; }); @@ -273,33 +283,33 @@ private function jobExists(string $jobUuid): Promise /** * @return \Generator */ - private function processBatch(): \Generator + private function processBatch(string $batchId): \Generator { $this->logger->debug('Processing batch'); - $result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube()); + $result = yield $this->elasticSearch->bulkIndexJobs($this->getBatch($batchId), $this->flowConfig->getTube()); if ($result['errors'] === true) { foreach ($result['items'] as $item) { if (!array_key_exists('index', $item)) { $this->logger->error( 'Unexpected response item in bulk index response', - ['bulk_index_response_item' => $item] + ['bulk_index_response_item' => $item, 'batch_id' => $batchId] ); continue; } $itemStatusCode = $item['index']['status'] ?? null; if (!$this->isSuccessfulStatusCode($itemStatusCode)) { $uuid = $item['index']['_id']; - unset($this->batch[$uuid]); + $this->removeJobFromBatch($batchId, $uuid); $this->logger->error( 'Job could not be indexed in ElasticSearch', - ['bulk_index_response_item' => $item] + ['bulk_index_response_item' => $item, 'batch_id' => $batchId] ); } } } - foreach ($this->batch as $singleJob) { + foreach ($this->getBatch($batchId) as $singleJob) { yield $this->beanstalkClient->put( $singleJob->getUuid(), $singleJob->getTimeout(), @@ -311,12 +321,13 @@ private function processBatch(): \Generator [ 'flow_name' => $this->flowConfig->getName(), 'job_uuid' => $singleJob->getUuid(), - 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()) + 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()), + 'batch_id' => $batchId ] ); } - $this->batch = []; + $this->clearBatch($batchId); } /** @@ -346,4 +357,32 @@ public function isSuccessfulStatusCode(?int $statusCode): bool { return $statusCode !== null && $statusCode >= 200 && $statusCode < 300; } + + /** + * @param string $batchId + * @return array + */ + private function getBatch(string $batchId): array + { + return $this->batches[$batchId] ?? []; + } + + private function addJobToBatch(string $batchId, JobInterface $job): void + { + $this->batches[$batchId][$job->getUuid()] = $job; + } + + private function removeJobFromBatch(string $batchId, string $jobUuid): void + { + unset($this->batches[$batchId][$jobUuid]); + } + + private function clearBatch(string $batchId): void + { + if (!array_key_exists($batchId, $this->batches)) { + return; + } + + $this->batches[$batchId] = []; + } } diff --git a/tests/Integration/HttpRequestProducerAndWorkerTest.php b/tests/Integration/HttpRequestProducerAndWorkerTest.php index 0e08399..7fbbe64 100644 --- a/tests/Integration/HttpRequestProducerAndWorkerTest.php +++ b/tests/Integration/HttpRequestProducerAndWorkerTest.php @@ -171,7 +171,7 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404() ] ); $httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); - + Loop::delay(100, function () use ($httpPort) { yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}"); $payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]); @@ -191,6 +191,67 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404() $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); } + public function testConcurrentHttpRequestsForSameFlow() + { + $workerFile = vfsStream::url('root/worker.data'); + self::createKernel( + [ + 'services' => [ + DummyHttpRequestProducer::class => ['arguments' => []], + DummyFilesystemWorker::class => ['arguments' => [$workerFile]], + ], + 'flows' => [ + self::FLOW_CODE => [ + 'description' => 'Http Request Producer And Worker Test Flow', + 'producer' => ['service' => DummyHttpRequestProducer::class], + 'worker' => ['service' => DummyFilesystemWorker::class], + ] + ] + ] + ); + $httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); + + Loop::delay(100, function () use ($httpPort) { + yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}"); + $payload1 = json_encode(['jobs' => ['job1']]); + $client1 = HttpClientBuilder::buildDefault(); + $request1 = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST'); + $request1->setBody($payload1); + $payload2 = json_encode(['jobs' => ['job2']]); + $client2 = HttpClientBuilder::buildDefault(); + $request2 = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST'); + $request2->setBody($payload2); + $responses = yield [$client1->request($request1), $client2->request($request2)]; + $this->assertStringContainsString( + '"Successfully scheduled 1 job(s) to be queued."', + yield $responses[0]->getBody()->read() + ); + $this->assertStringContainsString( + '"Successfully scheduled 1 job(s) to be queued."', + yield $responses[1]->getBody()->read() + ); + }); + $this->stopWhen(function () use ($workerFile) { + return (yield exists($workerFile)) && count($this->getFileLines($workerFile)) === 2; + }); + + self::$kernel->boot(); + + $workerFileLines = $this->getFileLines($workerFile); + $this->assertCount(2, $workerFileLines); + $this->assertStringContainsString('job1', implode(PHP_EOL, $workerFileLines)); + $this->assertStringContainsString('job2', implode(PHP_EOL, $workerFileLines)); + $this->logHandler()->hasRecordThatMatches( + '/Successfully produced a new Job .*? "payload_data":["job1"]/', + Logger::INFO + ); + $this->logHandler()->hasRecordThatMatches( + '/Successfully produced a new Job .*? "payload_data":["job2"]/', + Logger::INFO + ); + $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); + } + private function waitForConnectionAvailable(string $uri): Promise { return call(function () use ($uri) {