From 1d797a91a7a563fecd0b27d69fe3f45bc602802a Mon Sep 17 00:00:00 2001 From: Manuele Menozzi Date: Wed, 7 Jan 2026 18:15:12 +0100 Subject: [PATCH 1/8] WIP refactor separating queue backend from batch management --- src/FlowExtension.php | 39 +++-- src/ProducerInstance.php | 114 ++------------ src/Service/BatchManager.php | 66 ++++++++ src/Service/BatchManagerFactory.php | 19 +++ ...nterface.php => BatchManagerInterface.php} | 9 +- ...=> BeanstalkElasticsearchQueueBackend.php} | 141 ++++++------------ ...nterface.php => QueueBackendInterface.php} | 16 +- src/WorkerInstance.php | 113 ++------------ .../HttpRequestProducerAndWorkerTest.php | 63 +++++++- 9 files changed, 262 insertions(+), 318 deletions(-) create mode 100644 src/Service/BatchManager.php create mode 100644 src/Service/BatchManagerFactory.php rename src/Service/{ProducerQueueManagerInterface.php => BatchManagerInterface.php} (84%) rename src/Service/{QueueManager.php => BeanstalkElasticsearchQueueBackend.php} (71%) rename src/Service/{WorkerQueueManagerInterface.php => QueueBackendInterface.php} (81%) diff --git a/src/FlowExtension.php b/src/FlowExtension.php index e648549..27d7db3 100644 --- a/src/FlowExtension.php +++ b/src/FlowExtension.php @@ -13,7 +13,9 @@ use Symfony\Component\DependencyInjection\Extension\ExtensionInterface; use Symfony\Component\DependencyInjection\Reference; use Webgriffe\Esb\Model\FlowConfig; -use Webgriffe\Esb\Service\QueueManager; +use Webgriffe\Esb\Service\BatchManager; +use Webgriffe\Esb\Service\BatchManagerFactory; +use Webgriffe\Esb\Service\BeanstalkElasticsearchQueueBackend; final class FlowExtension implements ExtensionInterface, CompilerPassInterface { @@ -66,10 +68,12 @@ public function process(ContainerBuilder $container): void { //These classes are defined manually. Remove the default definitions otherwise the container generates errors //trying to autowire them - $container->removeDefinition(QueueManager::class); + $container->removeDefinition(BeanstalkElasticsearchQueueBackend::class); $container->removeDefinition(FlowConfig::class); $container->removeDefinition(ProducerInstance::class); $container->removeDefinition(WorkerInstance::class); + $container->removeDefinition(BatchManager::class); + $container->removeDefinition(BatchManagerFactory::class); $flowManagerDefinition = $container->findDefinition(FlowManager::class); foreach ($this->flowsConfig as $flowName => $flowConfigData) { @@ -78,30 +82,39 @@ public function process(ContainerBuilder $container): void $flowDefinition = new Definition(Flow::class); $flowDefinition->setAutowired(true); $flowDefinition->setArgument('$flowConfig', $flowConfig); - $queueManagerId = 'flow.queue_manager.' . $flowName; + $queueBackendId = 'flow.queue_backend.' . $flowName; + $batchManagerFactoryId = 'flow.batch_manager_factory.' . $flowName; try { $producerDefinition = $container->findDefinition($flowConfig->getProducerServiceId()); $producerDefinition->setShared(false); - $queueManagerDefinition = new Definition(); - $queueManagerDefinition + $queueBackendDefinition = new Definition(); + $queueBackendDefinition ->setShared(false) ->setAutowired(true) - ->setClass(QueueManager::class) + ->setClass(BeanstalkElasticsearchQueueBackend::class) ->setArgument('$flowConfig', $flowConfig) + ; + $container->setDefinition($queueBackendId, $queueBackendDefinition); + + $batchManagerFactoryDefinition = new Definition(); + $batchManagerFactoryDefinition + ->setShared(false) + ->setAutowired(true) + ->setClass(BatchManagerFactory::class) + ->setArgument('$queueBackend', new Reference($queueBackendId)) ->setArgument('$batchSize', $flowConfig->getProducerBatchSize()) ; - $container->setDefinition($queueManagerId, $queueManagerDefinition); + $container->setDefinition($batchManagerFactoryId, $batchManagerFactoryDefinition); $producerInstanceDefinition = new Definition(); $producerInstanceDefinition ->setAutowired(true) ->setClass(ProducerInstance::class) - ->setArgument('$producer', new Reference($flowConfig->getProducerServiceId())) ->setArgument('$flowConfig', $flowConfig) - ->setArgument('$queueManager', new Reference($queueManagerId)) - ->setArgument('$beanstalkClient', null) - ->setArgument('$elasticSearch', null) + ->setArgument('$producer', new Reference($flowConfig->getProducerServiceId())) + ->setArgument('$queueBackend', new Reference($queueBackendId)) + ->setArgument('$batchManagerFactory', new Reference($batchManagerFactoryId)) ; $producerInstanceId = 'flow.producer_instance' . $flowName; $container->setDefinition($producerInstanceId, $producerInstanceDefinition); @@ -129,9 +142,7 @@ public function process(ContainerBuilder $container): void ->setArgument('$flowConfig', $flowConfig) ->setArgument('$instanceId', $instanceId) ->setArgument('$worker', new Reference($flowConfig->getWorkerServiceId())) - ->setArgument('$queueManager', new Reference($queueManagerId)) - ->setArgument('$beanstalkClient', null) - ->setArgument('$elasticSearch', null) + ->setArgument('$queueBackend', new Reference($queueBackendId)) ; $workerInstanceId = sprintf('flow.worker_instance.%s.%s', $flowName, $instanceId); $container->setDefinition($workerInstanceId, $workerInstanceDefinition); diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index 5251939..2caddc2 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -4,7 +4,6 @@ namespace Webgriffe\Esb; -use Amp\Beanstalk\BeanstalkClient; use function Amp\call; use Amp\Loop; use Amp\Promise; @@ -12,115 +11,31 @@ use Webgriffe\Esb\Model\FlowConfig; use Webgriffe\Esb\Model\Job; use Webgriffe\Esb\Model\ProducedJobEvent; +use Webgriffe\Esb\Service\BatchManagerFactory; use Webgriffe\Esb\Service\CronProducersServer; -use Webgriffe\Esb\Service\ElasticSearch; + use Webgriffe\Esb\Service\HttpProducersServer; -use Webgriffe\Esb\Service\ProducerQueueManagerInterface; -use Webgriffe\Esb\Service\QueueManager; + +use Webgriffe\Esb\Service\QueueBackendInterface; final class ProducerInstance implements ProducerInstanceInterface { - /** - * @var FlowConfig - */ - private $flowConfig; - - /** - * @var ProducerInterface - */ - private $producer; - - /** - * @var LoggerInterface - */ - private $logger; - - /** - * @var HttpProducersServer - */ - private $httpProducersServer; - - /** - * @var CronProducersServer - */ - private $cronProducersServer; - - /** - * @var ProducerQueueManagerInterface - */ - private $queueManager; - public function __construct( - FlowConfig $flowConfig, - ProducerInterface $producer, - ?BeanstalkClient $beanstalkClient, - LoggerInterface $logger, - HttpProducersServer $httpProducersServer, - CronProducersServer $cronProducersServer, - ?ElasticSearch $elasticSearch, - ?ProducerQueueManagerInterface $queueManager = null + private readonly FlowConfig $flowConfig, + private readonly ProducerInterface $producer, + private readonly LoggerInterface $logger, + private readonly HttpProducersServer $httpProducersServer, + private readonly CronProducersServer $cronProducersServer, + private readonly QueueBackendInterface $queueBackend, + private readonly BatchManagerFactory $batchManagerFactory, ) { - if ($beanstalkClient !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - BeanstalkClient::class, - __CLASS__, - ProducerQueueManagerInterface::class - ); - } - if ($elasticSearch !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - ElasticSearch::class, - __CLASS__, - ProducerQueueManagerInterface::class - ); - } - $this->flowConfig = $flowConfig; - $this->producer = $producer; - $this->logger = $logger; - $this->httpProducersServer = $httpProducersServer; - $this->cronProducersServer = $cronProducersServer; - - if ($queueManager === null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Not passing a "%s" to "%s" is deprecated and will be required in 3.0.', - ProducerQueueManagerInterface::class, - __CLASS__ - ); - - if (!$beanstalkClient) { - throw new \RuntimeException('Cannot create a QueueManager without the Beanstalk client!'); - } - - if (!$elasticSearch) { - throw new \RuntimeException('Cannot create a QueueManager without the ElasticSearch client'); - } - - $queueManager = new QueueManager( - $this->flowConfig, - $beanstalkClient, - $elasticSearch, - $this->logger, - 1000 - ); - } - $this->queueManager = $queueManager; } public function boot(): Promise { return call(function () { yield $this->producer->init(); - yield $this->queueManager->boot(); + yield $this->queueBackend->boot(); $this->logger->info( 'A Producer has been successfully initialized', @@ -164,6 +79,7 @@ function ($watcherId) { public function produceAndQueueJobs($data = null): Promise { return call(function () use ($data) { + $batchManager = $this->batchManagerFactory->create(); $jobsCount = 0; $job = null; try { @@ -172,10 +88,10 @@ public function produceAndQueueJobs($data = null): Promise /** @var Job $job */ $job = $jobs->getCurrent(); $job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer))); - $jobsCount += yield $this->queueManager->enqueue($job); + $jobsCount += yield $batchManager->enqueue($job); } - $jobsCount += yield $this->queueManager->flush(); + $jobsCount += yield $batchManager->flush(); } catch (\Throwable $error) { $this->logger->error( 'An error occurred producing/queueing jobs.', diff --git a/src/Service/BatchManager.php b/src/Service/BatchManager.php new file mode 100644 index 0000000..9dd0152 --- /dev/null +++ b/src/Service/BatchManager.php @@ -0,0 +1,66 @@ +queueBackend->jobExists($job->getUuid()); + if ($jobExists) { + throw new \RuntimeException( + sprintf( + 'A job with UUID "%s" already exists but this should be a new job.', + $job->getUuid() + ) + ); + } + $this->batch[$job->getUuid()] = $job; + + $count = count($this->batch); + if ($count < $this->batchSize) { + return 0; //Number of jobs actually added to the queue + } + + return yield $this->flush(); + }); + } + + /** + * @inheritdoc + */ + public function flush(): Promise + { + return call(function () { + $jobsCount = count($this->batch); + if ($jobsCount > 0) { + yield $this->queueBackend->enqueueJobs($this->batch); + } + $this->batch = []; + + return $jobsCount; + }); + } +} diff --git a/src/Service/BatchManagerFactory.php b/src/Service/BatchManagerFactory.php new file mode 100644 index 0000000..9ee1da7 --- /dev/null +++ b/src/Service/BatchManagerFactory.php @@ -0,0 +1,19 @@ +queueBackend, $this->batchSize); + } +} diff --git a/src/Service/ProducerQueueManagerInterface.php b/src/Service/BatchManagerInterface.php similarity index 84% rename from src/Service/ProducerQueueManagerInterface.php rename to src/Service/BatchManagerInterface.php index 1c2fa00..e790261 100644 --- a/src/Service/ProducerQueueManagerInterface.php +++ b/src/Service/BatchManagerInterface.php @@ -7,15 +7,8 @@ use Amp\Promise; use Webgriffe\Esb\Model\JobInterface; -interface ProducerQueueManagerInterface +interface BatchManagerInterface { - /** - * Initializes this queue manager. Must be called before this can be used - * - * @return Promise - */ - public function boot(): Promise; - /** * Adds a new job to the queue managed by this object. The method returns a promise that resolves to the number of * jobs that were actually added to the underlying queue. In the simplest case this number is 1, but if the diff --git a/src/Service/QueueManager.php b/src/Service/BeanstalkElasticsearchQueueBackend.php similarity index 71% rename from src/Service/QueueManager.php rename to src/Service/BeanstalkElasticsearchQueueBackend.php index 4e12dfd..227b460 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/BeanstalkElasticsearchQueueBackend.php @@ -13,9 +13,10 @@ use Webgriffe\Esb\Model\FlowConfig; use Webgriffe\Esb\Model\Job; use Webgriffe\Esb\Model\JobInterface; + use Webgriffe\Esb\NonUtf8Cleaner; -final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueManagerInterface +final class BeanstalkElasticsearchQueueBackend implements QueueBackendInterface { /** * @var BeanstalkClient @@ -38,33 +39,22 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueMa private $logger; /** - * @var JobInterface[] - */ - private $batch = []; - - /** - * @TODO This map is static because it must be shared between each QueueManager instance: it could be refactored + * @TODO This map is static because it must be shared between each QueueBackend instance: it could be refactored * extracting the mapping service to a dedicated class * @var int[] */ private static $uuidToBeanstalkIdMap = []; - /** - * @var int - */ - private $batchSize; public function __construct( FlowConfig $flowConfig, BeanstalkClient $beanstalkClient, ElasticSearch $elasticSearch, LoggerInterface $logger, - int $batchSize ) { $this->flowConfig = $flowConfig; $this->beanstalkClient = $beanstalkClient; $this->elasticSearch = $elasticSearch; $this->logger = $logger; - $this->batchSize = $batchSize; } /** @@ -139,47 +129,6 @@ public function boot(): Promise }); } - /** - * @inheritdoc - */ - public function enqueue(JobInterface $job): Promise - { - return call(function () use ($job) { - $jobExists = yield $this->jobExists($job->getUuid()); - if ($jobExists) { - throw new \RuntimeException( - sprintf( - 'A job with UUID "%s" already exists but this should be a new job.', - $job->getUuid() - ) - ); - } - $this->batch[$job->getUuid()] = $job; - - $count = count($this->batch); - if ($count < $this->batchSize) { - return 0; //Number of jobs actually added to the queue - } - - yield from $this->processBatch(); - return $count; - }); - } - - /** - * @inheritdoc - */ - public function flush(): Promise - { - return call(function () { - $jobsCount = count($this->batch); - if ($jobsCount > 0) { - yield from $this->processBatch(); - } - return $jobsCount; - }); - } - /** * @inheritdoc */ @@ -258,7 +207,7 @@ public function isEmpty(string $queueName): Promise * @param string $jobUuid * @return Promise */ - private function jobExists(string $jobUuid): Promise + public function jobExists(string $jobUuid): Promise { return call(function () use ($jobUuid) { try { @@ -271,52 +220,52 @@ private function jobExists(string $jobUuid): Promise } /** - * @return \Generator + * @param array $jobs */ - private function processBatch(): \Generator + public function enqueueJobs(array $jobs): Promise { - $this->logger->debug('Processing batch'); - $result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube()); - - if ($result['errors'] === true) { - foreach ($result['items'] as $item) { - if (!array_key_exists('index', $item)) { - $this->logger->error( - 'Unexpected response item in bulk index response', - ['bulk_index_response_item' => $item] - ); - continue; - } - $itemStatusCode = $item['index']['status'] ?? null; - if (!$this->isSuccessfulStatusCode($itemStatusCode)) { - $uuid = $item['index']['_id']; - unset($this->batch[$uuid]); - $this->logger->error( - 'Job could not be indexed in ElasticSearch', - ['bulk_index_response_item' => $item] - ); + return call(function () use ($jobs) { + $this->logger->debug('Enqueuing jobs batch...'); + $result = yield $this->elasticSearch->bulkIndexJobs($jobs, $this->flowConfig->getTube()); + + if ($result['errors'] === true) { + foreach ($result['items'] as $item) { + if (!array_key_exists('index', $item)) { + $this->logger->error( + 'Unexpected response item in bulk index response', + ['bulk_index_response_item' => $item] + ); + continue; + } + $itemStatusCode = $item['index']['status'] ?? null; + if (!$this->isSuccessfulStatusCode($itemStatusCode)) { + $uuid = $item['index']['_id']; + unset($jobs[$uuid]); + $this->logger->error( + 'Job could not be indexed in ElasticSearch', + ['bulk_index_response_item' => $item] + ); + } } } - } - - foreach ($this->batch as $singleJob) { - yield $this->beanstalkClient->put( - $singleJob->getUuid(), - $singleJob->getTimeout(), - $singleJob->getDelay(), - $singleJob->getPriority() - ); - $this->logger->info( - 'Successfully enqueued a new Job', - [ - 'flow_name' => $this->flowConfig->getName(), - 'job_uuid' => $singleJob->getUuid(), - 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()) - ] - ); - } - $this->batch = []; + foreach ($jobs as $singleJob) { + yield $this->beanstalkClient->put( + $singleJob->getUuid(), + $singleJob->getTimeout(), + $singleJob->getDelay(), + $singleJob->getPriority() + ); + $this->logger->info( + 'Successfully enqueued a new Job', + [ + 'flow_name' => $this->flowConfig->getName(), + 'job_uuid' => $singleJob->getUuid(), + 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()) + ] + ); + } + }); } /** diff --git a/src/Service/WorkerQueueManagerInterface.php b/src/Service/QueueBackendInterface.php similarity index 81% rename from src/Service/WorkerQueueManagerInterface.php rename to src/Service/QueueBackendInterface.php index c9e6806..6f654ab 100644 --- a/src/Service/WorkerQueueManagerInterface.php +++ b/src/Service/QueueBackendInterface.php @@ -7,10 +7,10 @@ use Amp\Promise; use Webgriffe\Esb\Model\JobInterface; -interface WorkerQueueManagerInterface +interface QueueBackendInterface { /** - * Initializes this queue manager. Must be called before this can be used + * Initializes this queue backend. Must be called before this can be used * * @return Promise */ @@ -58,4 +58,16 @@ public function dequeue(JobInterface $job): Promise; * @return Promise */ public function isEmpty(string $queueName): Promise; + + /** + * @param string $jobUuid + * @return Promise + */ + public function jobExists(string $jobUuid): Promise; + + /** + * @param JobInterface[] $jobs + * @return Promise + */ + public function enqueueJobs(array $jobs): Promise; } diff --git a/src/WorkerInstance.php b/src/WorkerInstance.php index faa67f8..f3cfe32 100644 --- a/src/WorkerInstance.php +++ b/src/WorkerInstance.php @@ -4,7 +4,6 @@ namespace Webgriffe\Esb; -use Amp\Beanstalk\BeanstalkClient; use function Amp\call; use function Amp\delay; use Amp\Promise; @@ -15,111 +14,29 @@ use Webgriffe\Esb\Model\JobInterface; use Webgriffe\Esb\Model\ReservedJobEvent; use Webgriffe\Esb\Model\WorkedJobEvent; -use Webgriffe\Esb\Service\ElasticSearch; -use Webgriffe\Esb\Service\QueueManager; -use Webgriffe\Esb\Service\WorkerQueueManagerInterface; +use Webgriffe\Esb\Service\QueueBackendInterface; final class WorkerInstance implements WorkerInstanceInterface { - /** - * @var FlowConfig - */ - private $flowConfig; - - /** - * @var int - */ - private $instanceId; - - /** - * @var WorkerInterface - */ - private $worker; - - /** - * @var LoggerInterface - */ - private $logger; - - /** - * @var WorkerQueueManagerInterface - */ - private $queueManager; - /** * @var array */ private static $workCounts = []; public function __construct( - FlowConfig $flowConfig, - int $instanceId, - WorkerInterface $worker, - ?BeanstalkClient $beanstalkClient, - LoggerInterface $logger, - ?ElasticSearch $elasticSearch, - ?WorkerQueueManagerInterface $queueManager = null + private readonly FlowConfig $flowConfig, + private readonly int $instanceId, + private readonly WorkerInterface $worker, + private readonly LoggerInterface $logger, + private readonly QueueBackendInterface $queueBackend ) { - if ($beanstalkClient !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - BeanstalkClient::class, - __CLASS__, - WorkerQueueManagerInterface::class - ); - } - if ($elasticSearch !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - ElasticSearch::class, - __CLASS__, - WorkerQueueManagerInterface::class - ); - } - $this->flowConfig = $flowConfig; - $this->instanceId = $instanceId; - $this->worker = $worker; - $this->logger = $logger; - - if ($queueManager === null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Not passing a "%s" to "%s" is deprecated and will be required in 3.0.', - WorkerQueueManagerInterface::class, - __CLASS__ - ); - - if (!$beanstalkClient) { - throw new \RuntimeException('Cannot create a QueueManager without the Beanstalk client!'); - } - - if (!$elasticSearch) { - throw new \RuntimeException('Cannot create a QueueManager without the ElasticSearch client'); - } - - $queueManager = new QueueManager( - $this->flowConfig, - $beanstalkClient, - $elasticSearch, - $this->logger, - 1000 - ); - } - $this->queueManager = $queueManager; } public function boot(): Promise { return call(function () { yield $this->worker->init(); - yield $this->queueManager->boot(); + yield $this->queueBackend->boot(); $workerFqcn = \get_class($this->worker); $globalLogContext = [ @@ -145,7 +62,7 @@ public function boot(): Promise try { /** @var JobInterface $job */ - if (!($job = yield $this->queueManager->getNextJob())) { + if (!($job = yield $this->queueBackend->getNextJob())) { break; } } catch (FatalQueueException $ex) { @@ -163,7 +80,7 @@ public function boot(): Promise yield $this->waitForDependencies($lastProcessTimestamp, $logContext); $job->addEvent(new ReservedJobEvent(new \DateTime(), $workerFqcn)); - yield $this->queueManager->updateJob($job); + yield $this->queueBackend->updateJob($job); $payloadData = $job->getPayloadData(); $logContext['payload_data'] = NonUtf8Cleaner::clean($payloadData); @@ -178,14 +95,14 @@ public function boot(): Promise yield $this->worker->work($job); $job->addEvent(new WorkedJobEvent(new \DateTime(), $workerFqcn)); - yield $this->queueManager->updateJob($job); + yield $this->queueBackend->updateJob($job); $this->logger->info('Successfully worked a Job', $logContext); - yield $this->queueManager->dequeue($job); + yield $this->queueBackend->dequeue($job); unset(self::$workCounts[$jobUuid]); } catch (\Throwable $e) { $job->addEvent(new ErroredJobEvent(new \DateTime(), $workerFqcn, $e->getMessage())); - yield $this->queueManager->updateJob($job); + yield $this->queueBackend->updateJob($job); $this->logger->notice( 'An error occurred while working a Job.', array_merge( @@ -199,7 +116,7 @@ public function boot(): Promise ); if (self::$workCounts[$jobUuid] >= $this->flowConfig->getWorkerMaxRetry()) { - yield $this->queueManager->dequeue($job); + yield $this->queueBackend->dequeue($job); $this->logger->error( 'A Job reached maximum work retry limit and has been removed from queue.', array_merge( @@ -214,7 +131,7 @@ public function boot(): Promise continue; } - yield $this->queueManager->requeue($job, $this->flowConfig->getWorkerReleaseDelay()); + yield $this->queueBackend->requeue($job, $this->flowConfig->getWorkerReleaseDelay()); $this->logger->info( 'Worker released a Job', array_merge($logContext, ['release_delay' => $this->flowConfig->getWorkerReleaseDelay()]) @@ -265,7 +182,7 @@ private function waitForDependencies(float $lastProcessTimestamp, array $logCont foreach ($this->flowConfig->getDependsOn() as $dependency) { $sleepTime = $this->flowConfig->getInitialPollingInterval(); while (true) { - if (yield $this->queueManager->isEmpty($dependency)) { + if (yield $this->queueBackend->isEmpty($dependency)) { break; } $this->logger->debug( diff --git a/tests/Integration/HttpRequestProducerAndWorkerTest.php b/tests/Integration/HttpRequestProducerAndWorkerTest.php index 0e08399..7fbbe64 100644 --- a/tests/Integration/HttpRequestProducerAndWorkerTest.php +++ b/tests/Integration/HttpRequestProducerAndWorkerTest.php @@ -171,7 +171,7 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404() ] ); $httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); - + Loop::delay(100, function () use ($httpPort) { yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}"); $payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]); @@ -191,6 +191,67 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404() $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); } + public function testConcurrentHttpRequestsForSameFlow() + { + $workerFile = vfsStream::url('root/worker.data'); + self::createKernel( + [ + 'services' => [ + DummyHttpRequestProducer::class => ['arguments' => []], + DummyFilesystemWorker::class => ['arguments' => [$workerFile]], + ], + 'flows' => [ + self::FLOW_CODE => [ + 'description' => 'Http Request Producer And Worker Test Flow', + 'producer' => ['service' => DummyHttpRequestProducer::class], + 'worker' => ['service' => DummyFilesystemWorker::class], + ] + ] + ] + ); + $httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); + + Loop::delay(100, function () use ($httpPort) { + yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}"); + $payload1 = json_encode(['jobs' => ['job1']]); + $client1 = HttpClientBuilder::buildDefault(); + $request1 = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST'); + $request1->setBody($payload1); + $payload2 = json_encode(['jobs' => ['job2']]); + $client2 = HttpClientBuilder::buildDefault(); + $request2 = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST'); + $request2->setBody($payload2); + $responses = yield [$client1->request($request1), $client2->request($request2)]; + $this->assertStringContainsString( + '"Successfully scheduled 1 job(s) to be queued."', + yield $responses[0]->getBody()->read() + ); + $this->assertStringContainsString( + '"Successfully scheduled 1 job(s) to be queued."', + yield $responses[1]->getBody()->read() + ); + }); + $this->stopWhen(function () use ($workerFile) { + return (yield exists($workerFile)) && count($this->getFileLines($workerFile)) === 2; + }); + + self::$kernel->boot(); + + $workerFileLines = $this->getFileLines($workerFile); + $this->assertCount(2, $workerFileLines); + $this->assertStringContainsString('job1', implode(PHP_EOL, $workerFileLines)); + $this->assertStringContainsString('job2', implode(PHP_EOL, $workerFileLines)); + $this->logHandler()->hasRecordThatMatches( + '/Successfully produced a new Job .*? "payload_data":["job1"]/', + Logger::INFO + ); + $this->logHandler()->hasRecordThatMatches( + '/Successfully produced a new Job .*? "payload_data":["job2"]/', + Logger::INFO + ); + $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); + } + private function waitForConnectionAvailable(string $uri): Promise { return call(function () use ($uri) { From 743d1d010f9725c792a7c73cde658f89199db489 Mon Sep 17 00:00:00 2001 From: Andrea Zambon Date: Thu, 8 Jan 2026 17:44:46 +0100 Subject: [PATCH 2/8] Fixes the test errors --- src/FlowExtension.php | 1 - src/ProducerInstance.php | 2 +- src/Service/BatchManagerFactory.php | 5 ++--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/FlowExtension.php b/src/FlowExtension.php index 27d7db3..3e869fd 100644 --- a/src/FlowExtension.php +++ b/src/FlowExtension.php @@ -102,7 +102,6 @@ public function process(ContainerBuilder $container): void ->setShared(false) ->setAutowired(true) ->setClass(BatchManagerFactory::class) - ->setArgument('$queueBackend', new Reference($queueBackendId)) ->setArgument('$batchSize', $flowConfig->getProducerBatchSize()) ; $container->setDefinition($batchManagerFactoryId, $batchManagerFactoryDefinition); diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index 2caddc2..d1252ac 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -79,7 +79,7 @@ function ($watcherId) { public function produceAndQueueJobs($data = null): Promise { return call(function () use ($data) { - $batchManager = $this->batchManagerFactory->create(); + $batchManager = $this->batchManagerFactory->create($this->queueBackend); $jobsCount = 0; $job = null; try { diff --git a/src/Service/BatchManagerFactory.php b/src/Service/BatchManagerFactory.php index 9ee1da7..d13fa3d 100644 --- a/src/Service/BatchManagerFactory.php +++ b/src/Service/BatchManagerFactory.php @@ -7,13 +7,12 @@ final class BatchManagerFactory { public function __construct( - private readonly QueueBackendInterface $queueBackend, private readonly int $batchSize ) { } - public function create(): BatchManagerInterface + public function create(QueueBackendInterface $queueBackend): BatchManagerInterface { - return new BatchManager($this->queueBackend, $this->batchSize); + return new BatchManager($queueBackend, $this->batchSize); } } From 1efdbdfba063ae18a4acb9bc9ea21e39507cfffb Mon Sep 17 00:00:00 2001 From: Andrea Zambon Date: Mon, 12 Jan 2026 12:41:35 +0100 Subject: [PATCH 3/8] Removes the deprecated release_delay configuration parameter --- README.md | 1 - src/FlowConfiguration.php | 1 - src/Model/FlowConfig.php | 3 +-- src/WorkerInstance.php | 2 +- tests/Integration/FailingJobHandlingTest.php | 4 ++-- 5 files changed, 4 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index e8f0e97..8ae57c1 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,6 @@ flows: service: My\Esb\Worker # A worker service ID defined above instances: 1 # The number of worker instances to spawn for this flow error_retry_delay: 0 # The number of seconds to wait before an errored job can be retried. The default is 0 (errored jobs can be retried immediately). Useful when "retrying later" might solve the problem. - release_delay: 0 # (deprecated) older name of the error_retry_delay parameter max_retry: 5 # The number of maximum work retries for a job in this tube/flow before being buried dependencies: # This whole section can be omitted if the current flow has no dependencies flows: ['other_flow_1', 'other_flow_2'] # Optional: dependencies of this flow toward other flow(s) diff --git a/src/FlowConfiguration.php b/src/FlowConfiguration.php index e1c9e0e..b5be518 100644 --- a/src/FlowConfiguration.php +++ b/src/FlowConfiguration.php @@ -38,7 +38,6 @@ public function getConfigTreeBuilder(): TreeBuilder ->scalarNode('service')->isRequired()->end() ->integerNode('instances')->min(1)->defaultValue(1)->end() ->integerNode('error_retry_delay')->min(0)->defaultValue(0)->end() - ->integerNode('release_delay')->min(0)->defaultValue(0)->setDeprecated('Use "error_retry_delay" instead')->end() ->integerNode('max_retry')->min(1)->defaultValue(5)->end() ->end() ->end() diff --git a/src/Model/FlowConfig.php b/src/Model/FlowConfig.php index 4d83910..5a0fee6 100644 --- a/src/Model/FlowConfig.php +++ b/src/Model/FlowConfig.php @@ -61,8 +61,7 @@ public function getWorkerInstancesCount(): int public function getWorkerReleaseDelay(): int { - //error_retry_delay is the new name, so give precedence to that - return $this->config['worker']['error_retry_delay'] ?: $this->config['worker']['release_delay']; + return $this->config['worker']['error_retry_delay']; } public function getWorkerMaxRetry(): int diff --git a/src/WorkerInstance.php b/src/WorkerInstance.php index f3cfe32..346a703 100644 --- a/src/WorkerInstance.php +++ b/src/WorkerInstance.php @@ -134,7 +134,7 @@ public function boot(): Promise yield $this->queueBackend->requeue($job, $this->flowConfig->getWorkerReleaseDelay()); $this->logger->info( 'Worker released a Job', - array_merge($logContext, ['release_delay' => $this->flowConfig->getWorkerReleaseDelay()]) + array_merge($logContext, ['error_retry_delay' => $this->flowConfig->getWorkerReleaseDelay()]) ); } } diff --git a/tests/Integration/FailingJobHandlingTest.php b/tests/Integration/FailingJobHandlingTest.php index 5b756a0..cb5eb18 100644 --- a/tests/Integration/FailingJobHandlingTest.php +++ b/tests/Integration/FailingJobHandlingTest.php @@ -26,7 +26,7 @@ public function testFailingJobIsReleasedWithProperDelayAndThenDeletedAftetProper 'producer' => ['service' => DummyRepeatProducer::class], 'worker' => [ 'service' => AlwaysFailingWorker::class, - 'release_delay' => 1, + 'error_retry_delay' => 1, 'max_retry' => 2 ], ] @@ -50,7 +50,7 @@ function (array $record) { $this->assertTrue( $this->logHandler()->hasInfoThatPasses( function (array $record) { - return $record['message'] === 'Worker released a Job' && $record['context']['release_delay'] === 1; + return $record['message'] === 'Worker released a Job' && $record['context']['error_retry_delay'] === 1; } ) ); From 6a918e9ebacaa9f2abbec5123d60c16f8eacbced Mon Sep 17 00:00:00 2001 From: Andrea Zambon Date: Mon, 12 Jan 2026 12:55:11 +0100 Subject: [PATCH 4/8] Removes a bunch of unused deprecated exception classes --- .../Pager/LessThan1CurrentPageException.php | 20 ------------------- .../Pager/LessThan1MaxPerPageException.php | 17 ---------------- src/Console/Pager/NotBooleanException.php | 19 ------------------ .../Pager/NotIntegerCurrentPageException.php | 19 ------------------ src/Console/Pager/NotIntegerException.php | 19 ------------------ .../Pager/NotIntegerMaxPerPageException.php | 19 ------------------ .../Pager/NotValidCurrentPageException.php | 20 ------------------- .../Pager/NotValidMaxPerPageException.php | 20 ------------------- .../Pager/OutOfRangeCurrentPageException.php | 20 ------------------- 9 files changed, 173 deletions(-) delete mode 100644 src/Console/Pager/LessThan1CurrentPageException.php delete mode 100644 src/Console/Pager/LessThan1MaxPerPageException.php delete mode 100644 src/Console/Pager/NotBooleanException.php delete mode 100644 src/Console/Pager/NotIntegerCurrentPageException.php delete mode 100644 src/Console/Pager/NotIntegerException.php delete mode 100644 src/Console/Pager/NotIntegerMaxPerPageException.php delete mode 100644 src/Console/Pager/NotValidCurrentPageException.php delete mode 100644 src/Console/Pager/NotValidMaxPerPageException.php delete mode 100644 src/Console/Pager/OutOfRangeCurrentPageException.php diff --git a/src/Console/Pager/LessThan1CurrentPageException.php b/src/Console/Pager/LessThan1CurrentPageException.php deleted file mode 100644 index 245dc88..0000000 --- a/src/Console/Pager/LessThan1CurrentPageException.php +++ /dev/null @@ -1,20 +0,0 @@ - Date: Mon, 12 Jan 2026 12:58:20 +0100 Subject: [PATCH 5/8] Removes the deprecated pagination functions --- src/Console/Pager/AsyncPager.php | 45 -------------------------------- 1 file changed, 45 deletions(-) diff --git a/src/Console/Pager/AsyncPager.php b/src/Console/Pager/AsyncPager.php index 3a4eecc..aef91bf 100644 --- a/src/Console/Pager/AsyncPager.php +++ b/src/Console/Pager/AsyncPager.php @@ -224,57 +224,12 @@ public function getMaxPerPage() */ public function setCurrentPage($currentPage) { - $this->useDeprecatedCurrentPageBooleanArguments(func_get_args()); - $this->currentPage = $this->filterCurrentPage($currentPage); $this->resetForCurrentPageChange(); return $this; } - /** - * @param array $arguments - */ - private function useDeprecatedCurrentPageBooleanArguments($arguments): void - { - $this->useDeprecatedCurrentPageAllowOutOfRangePagesBooleanArgument($arguments); - $this->useDeprecatedCurrentPageNormalizeOutOfRangePagesBooleanArgument($arguments); - } - - /** - * @param array $arguments - */ - private function useDeprecatedCurrentPageAllowOutOfRangePagesBooleanArgument($arguments): void - { - $index = 1; - $method = 'setAllowOutOfRangePages'; - - $this->useDeprecatedBooleanArgument($arguments, $index, $method); - } - - /** - * @param array $arguments - */ - private function useDeprecatedCurrentPageNormalizeOutOfRangePagesBooleanArgument($arguments): void - { - $index = 2; - $method = 'setNormalizeOutOfRangePages'; - - $this->useDeprecatedBooleanArgument($arguments, $index, $method); - } - - /** - * @param array $arguments - * @param int $index - * @param string $method - */ - private function useDeprecatedBooleanArgument($arguments, $index, $method): void - { - if (isset($arguments[$index])) { - $this->$method($arguments[$index]); - } - } - /** * @param mixed $currentPage * @return int From 560dfcab328b045f1b87b4ec23527a307e95f087 Mon Sep 17 00:00:00 2001 From: Andrea Zambon Date: Mon, 12 Jan 2026 13:01:02 +0100 Subject: [PATCH 6/8] Fix CS --- src/ProducerInstance.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index ab483e3..d1252ac 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -8,7 +8,6 @@ use Amp\Loop; use Amp\Promise; use Psr\Log\LoggerInterface; -use Ramsey\Uuid\Uuid; use Webgriffe\Esb\Model\FlowConfig; use Webgriffe\Esb\Model\Job; use Webgriffe\Esb\Model\ProducedJobEvent; From e3cf9f6eecef6fc82b08b6c53d7e02675ee955f1 Mon Sep 17 00:00:00 2001 From: Andrea Zambon Date: Mon, 12 Jan 2026 14:12:58 +0100 Subject: [PATCH 7/8] Updates the build to include PHP 8.5 and run on the latest Ubuntu version --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d736925..13113e6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,12 +12,12 @@ jobs: tests: name: "PHP ${{ matrix.php }}" - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 strategy: fail-fast: false matrix: - php: [8.1, 8.2, 8.3, 8.4] + php: [8.1, 8.2, 8.3, 8.4, 8.5] env: ESB_CONSOLE_PORT: 8080 From b041d2fc5383468e4a6ec85ac266f1b2f66275ce Mon Sep 17 00:00:00 2001 From: Andrea Zambon Date: Mon, 12 Jan 2026 14:17:54 +0100 Subject: [PATCH 8/8] Adds PHP 8.5 to the allowed PHP versions in the composer.json file --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 2e1fd06..18926e2 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,7 @@ } ], "require": { - "php": "~8.1.0|~8.2.0|~8.3.0|~8.4.0", + "php": "~8.1.0|~8.2.0|~8.3.0|~8.4.0|~8.5.0", "ext-pcntl": "*", "ext-json": "*", "ext-mbstring": "*",