diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d736925..13113e6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,12 +12,12 @@ jobs: tests: name: "PHP ${{ matrix.php }}" - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 strategy: fail-fast: false matrix: - php: [8.1, 8.2, 8.3, 8.4] + php: [8.1, 8.2, 8.3, 8.4, 8.5] env: ESB_CONSOLE_PORT: 8080 diff --git a/README.md b/README.md index e8f0e97..8ae57c1 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,6 @@ flows: service: My\Esb\Worker # A worker service ID defined above instances: 1 # The number of worker instances to spawn for this flow error_retry_delay: 0 # The number of seconds to wait before an errored job can be retried. The default is 0 (errored jobs can be retried immediately). Useful when "retrying later" might solve the problem. - release_delay: 0 # (deprecated) older name of the error_retry_delay parameter max_retry: 5 # The number of maximum work retries for a job in this tube/flow before being buried dependencies: # This whole section can be omitted if the current flow has no dependencies flows: ['other_flow_1', 'other_flow_2'] # Optional: dependencies of this flow toward other flow(s) diff --git a/composer.json b/composer.json index 2e1fd06..18926e2 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,7 @@ } ], "require": { - "php": "~8.1.0|~8.2.0|~8.3.0|~8.4.0", + "php": "~8.1.0|~8.2.0|~8.3.0|~8.4.0|~8.5.0", "ext-pcntl": "*", "ext-json": "*", "ext-mbstring": "*", diff --git a/src/Console/Pager/AsyncPager.php b/src/Console/Pager/AsyncPager.php index 3a4eecc..aef91bf 100644 --- a/src/Console/Pager/AsyncPager.php +++ b/src/Console/Pager/AsyncPager.php @@ -224,57 +224,12 @@ public function getMaxPerPage() */ public function setCurrentPage($currentPage) { - $this->useDeprecatedCurrentPageBooleanArguments(func_get_args()); - $this->currentPage = $this->filterCurrentPage($currentPage); $this->resetForCurrentPageChange(); return $this; } - /** - * @param array $arguments - */ - private function useDeprecatedCurrentPageBooleanArguments($arguments): void - { - $this->useDeprecatedCurrentPageAllowOutOfRangePagesBooleanArgument($arguments); - $this->useDeprecatedCurrentPageNormalizeOutOfRangePagesBooleanArgument($arguments); - } - - /** - * @param array $arguments - */ - private function useDeprecatedCurrentPageAllowOutOfRangePagesBooleanArgument($arguments): void - { - $index = 1; - $method = 'setAllowOutOfRangePages'; - - $this->useDeprecatedBooleanArgument($arguments, $index, $method); - } - - /** - * @param array $arguments - */ - private function useDeprecatedCurrentPageNormalizeOutOfRangePagesBooleanArgument($arguments): void - { - $index = 2; - $method = 'setNormalizeOutOfRangePages'; - - $this->useDeprecatedBooleanArgument($arguments, $index, $method); - } - - /** - * @param array $arguments - * @param int $index - * @param string $method - */ - private function useDeprecatedBooleanArgument($arguments, $index, $method): void - { - if (isset($arguments[$index])) { - $this->$method($arguments[$index]); - } - } - /** * @param mixed $currentPage * @return int diff --git a/src/Console/Pager/LessThan1CurrentPageException.php b/src/Console/Pager/LessThan1CurrentPageException.php deleted file mode 100644 index 245dc88..0000000 --- a/src/Console/Pager/LessThan1CurrentPageException.php +++ /dev/null @@ -1,20 +0,0 @@ -scalarNode('service')->isRequired()->end() ->integerNode('instances')->min(1)->defaultValue(1)->end() ->integerNode('error_retry_delay')->min(0)->defaultValue(0)->end() - ->integerNode('release_delay')->min(0)->defaultValue(0)->setDeprecated('Use "error_retry_delay" instead')->end() ->integerNode('max_retry')->min(1)->defaultValue(5)->end() ->end() ->end() diff --git a/src/FlowExtension.php b/src/FlowExtension.php index e648549..3e869fd 100644 --- a/src/FlowExtension.php +++ b/src/FlowExtension.php @@ -13,7 +13,9 @@ use Symfony\Component\DependencyInjection\Extension\ExtensionInterface; use Symfony\Component\DependencyInjection\Reference; use Webgriffe\Esb\Model\FlowConfig; -use Webgriffe\Esb\Service\QueueManager; +use Webgriffe\Esb\Service\BatchManager; +use Webgriffe\Esb\Service\BatchManagerFactory; +use Webgriffe\Esb\Service\BeanstalkElasticsearchQueueBackend; final class FlowExtension implements ExtensionInterface, CompilerPassInterface { @@ -66,10 +68,12 @@ public function process(ContainerBuilder $container): void { //These classes are defined manually. Remove the default definitions otherwise the container generates errors //trying to autowire them - $container->removeDefinition(QueueManager::class); + $container->removeDefinition(BeanstalkElasticsearchQueueBackend::class); $container->removeDefinition(FlowConfig::class); $container->removeDefinition(ProducerInstance::class); $container->removeDefinition(WorkerInstance::class); + $container->removeDefinition(BatchManager::class); + $container->removeDefinition(BatchManagerFactory::class); $flowManagerDefinition = $container->findDefinition(FlowManager::class); foreach ($this->flowsConfig as $flowName => $flowConfigData) { @@ -78,30 +82,38 @@ public function process(ContainerBuilder $container): void $flowDefinition = new Definition(Flow::class); $flowDefinition->setAutowired(true); $flowDefinition->setArgument('$flowConfig', $flowConfig); - $queueManagerId = 'flow.queue_manager.' . $flowName; + $queueBackendId = 'flow.queue_backend.' . $flowName; + $batchManagerFactoryId = 'flow.batch_manager_factory.' . $flowName; try { $producerDefinition = $container->findDefinition($flowConfig->getProducerServiceId()); $producerDefinition->setShared(false); - $queueManagerDefinition = new Definition(); - $queueManagerDefinition + $queueBackendDefinition = new Definition(); + $queueBackendDefinition ->setShared(false) ->setAutowired(true) - ->setClass(QueueManager::class) + ->setClass(BeanstalkElasticsearchQueueBackend::class) ->setArgument('$flowConfig', $flowConfig) + ; + $container->setDefinition($queueBackendId, $queueBackendDefinition); + + $batchManagerFactoryDefinition = new Definition(); + $batchManagerFactoryDefinition + ->setShared(false) + ->setAutowired(true) + ->setClass(BatchManagerFactory::class) ->setArgument('$batchSize', $flowConfig->getProducerBatchSize()) ; - $container->setDefinition($queueManagerId, $queueManagerDefinition); + $container->setDefinition($batchManagerFactoryId, $batchManagerFactoryDefinition); $producerInstanceDefinition = new Definition(); $producerInstanceDefinition ->setAutowired(true) ->setClass(ProducerInstance::class) - ->setArgument('$producer', new Reference($flowConfig->getProducerServiceId())) ->setArgument('$flowConfig', $flowConfig) - ->setArgument('$queueManager', new Reference($queueManagerId)) - ->setArgument('$beanstalkClient', null) - ->setArgument('$elasticSearch', null) + ->setArgument('$producer', new Reference($flowConfig->getProducerServiceId())) + ->setArgument('$queueBackend', new Reference($queueBackendId)) + ->setArgument('$batchManagerFactory', new Reference($batchManagerFactoryId)) ; $producerInstanceId = 'flow.producer_instance' . $flowName; $container->setDefinition($producerInstanceId, $producerInstanceDefinition); @@ -129,9 +141,7 @@ public function process(ContainerBuilder $container): void ->setArgument('$flowConfig', $flowConfig) ->setArgument('$instanceId', $instanceId) ->setArgument('$worker', new Reference($flowConfig->getWorkerServiceId())) - ->setArgument('$queueManager', new Reference($queueManagerId)) - ->setArgument('$beanstalkClient', null) - ->setArgument('$elasticSearch', null) + ->setArgument('$queueBackend', new Reference($queueBackendId)) ; $workerInstanceId = sprintf('flow.worker_instance.%s.%s', $flowName, $instanceId); $container->setDefinition($workerInstanceId, $workerInstanceDefinition); diff --git a/src/Model/FlowConfig.php b/src/Model/FlowConfig.php index 4d83910..5a0fee6 100644 --- a/src/Model/FlowConfig.php +++ b/src/Model/FlowConfig.php @@ -61,8 +61,7 @@ public function getWorkerInstancesCount(): int public function getWorkerReleaseDelay(): int { - //error_retry_delay is the new name, so give precedence to that - return $this->config['worker']['error_retry_delay'] ?: $this->config['worker']['release_delay']; + return $this->config['worker']['error_retry_delay']; } public function getWorkerMaxRetry(): int diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index c6a67c1..d1252ac 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -4,124 +4,38 @@ namespace Webgriffe\Esb; -use Amp\Beanstalk\BeanstalkClient; use function Amp\call; use Amp\Loop; use Amp\Promise; use Psr\Log\LoggerInterface; -use Ramsey\Uuid\Uuid; use Webgriffe\Esb\Model\FlowConfig; use Webgriffe\Esb\Model\Job; use Webgriffe\Esb\Model\ProducedJobEvent; +use Webgriffe\Esb\Service\BatchManagerFactory; use Webgriffe\Esb\Service\CronProducersServer; -use Webgriffe\Esb\Service\ElasticSearch; + use Webgriffe\Esb\Service\HttpProducersServer; -use Webgriffe\Esb\Service\ProducerQueueManagerInterface; -use Webgriffe\Esb\Service\QueueManager; + +use Webgriffe\Esb\Service\QueueBackendInterface; final class ProducerInstance implements ProducerInstanceInterface { - /** - * @var FlowConfig - */ - private $flowConfig; - - /** - * @var ProducerInterface - */ - private $producer; - - /** - * @var LoggerInterface - */ - private $logger; - - /** - * @var HttpProducersServer - */ - private $httpProducersServer; - - /** - * @var CronProducersServer - */ - private $cronProducersServer; - - /** - * @var ProducerQueueManagerInterface - */ - private $queueManager; - public function __construct( - FlowConfig $flowConfig, - ProducerInterface $producer, - ?BeanstalkClient $beanstalkClient, - LoggerInterface $logger, - HttpProducersServer $httpProducersServer, - CronProducersServer $cronProducersServer, - ?ElasticSearch $elasticSearch, - ?ProducerQueueManagerInterface $queueManager = null + private readonly FlowConfig $flowConfig, + private readonly ProducerInterface $producer, + private readonly LoggerInterface $logger, + private readonly HttpProducersServer $httpProducersServer, + private readonly CronProducersServer $cronProducersServer, + private readonly QueueBackendInterface $queueBackend, + private readonly BatchManagerFactory $batchManagerFactory, ) { - if ($beanstalkClient !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - BeanstalkClient::class, - __CLASS__, - ProducerQueueManagerInterface::class - ); - } - if ($elasticSearch !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - ElasticSearch::class, - __CLASS__, - ProducerQueueManagerInterface::class - ); - } - $this->flowConfig = $flowConfig; - $this->producer = $producer; - $this->logger = $logger; - $this->httpProducersServer = $httpProducersServer; - $this->cronProducersServer = $cronProducersServer; - - if ($queueManager === null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Not passing a "%s" to "%s" is deprecated and will be required in 3.0.', - ProducerQueueManagerInterface::class, - __CLASS__ - ); - - if (!$beanstalkClient) { - throw new \RuntimeException('Cannot create a QueueManager without the Beanstalk client!'); - } - - if (!$elasticSearch) { - throw new \RuntimeException('Cannot create a QueueManager without the ElasticSearch client'); - } - - $queueManager = new QueueManager( - $this->flowConfig, - $beanstalkClient, - $elasticSearch, - $this->logger, - 1000 - ); - } - $this->queueManager = $queueManager; } public function boot(): Promise { return call(function () { yield $this->producer->init(); - yield $this->queueManager->boot(); + yield $this->queueBackend->boot(); $this->logger->info( 'A Producer has been successfully initialized', @@ -165,7 +79,7 @@ function ($watcherId) { public function produceAndQueueJobs($data = null): Promise { return call(function () use ($data) { - $batchId = Uuid::uuid1()->toString(); + $batchManager = $this->batchManagerFactory->create($this->queueBackend); $jobsCount = 0; $job = null; try { @@ -174,18 +88,10 @@ public function produceAndQueueJobs($data = null): Promise /** @var Job $job */ $job = $jobs->getCurrent(); $job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer))); - if ($this->queueManager instanceof QueueManager) { - $jobsCount += yield $this->queueManager->enqueue($job, $batchId); - } else { - $jobsCount += yield $this->queueManager->enqueue($job); - } + $jobsCount += yield $batchManager->enqueue($job); } - if ($this->queueManager instanceof QueueManager) { - $jobsCount += yield $this->queueManager->flush($batchId); - } else { - $jobsCount += yield $this->queueManager->flush(); - } + $jobsCount += yield $batchManager->flush(); } catch (\Throwable $error) { $this->logger->error( 'An error occurred producing/queueing jobs.', @@ -193,7 +99,6 @@ public function produceAndQueueJobs($data = null): Promise 'producer' => \get_class($this->producer), 'last_job_payload_data' => $job ? NonUtf8Cleaner::clean($job->getPayloadData()) : null, 'error' => $error->getMessage(), - 'batch_id' => $batchId, ] ); } diff --git a/src/Service/BatchManager.php b/src/Service/BatchManager.php new file mode 100644 index 0000000..9dd0152 --- /dev/null +++ b/src/Service/BatchManager.php @@ -0,0 +1,66 @@ +queueBackend->jobExists($job->getUuid()); + if ($jobExists) { + throw new \RuntimeException( + sprintf( + 'A job with UUID "%s" already exists but this should be a new job.', + $job->getUuid() + ) + ); + } + $this->batch[$job->getUuid()] = $job; + + $count = count($this->batch); + if ($count < $this->batchSize) { + return 0; //Number of jobs actually added to the queue + } + + return yield $this->flush(); + }); + } + + /** + * @inheritdoc + */ + public function flush(): Promise + { + return call(function () { + $jobsCount = count($this->batch); + if ($jobsCount > 0) { + yield $this->queueBackend->enqueueJobs($this->batch); + } + $this->batch = []; + + return $jobsCount; + }); + } +} diff --git a/src/Service/BatchManagerFactory.php b/src/Service/BatchManagerFactory.php new file mode 100644 index 0000000..d13fa3d --- /dev/null +++ b/src/Service/BatchManagerFactory.php @@ -0,0 +1,18 @@ +batchSize); + } +} diff --git a/src/Service/ProducerQueueManagerInterface.php b/src/Service/BatchManagerInterface.php similarity index 84% rename from src/Service/ProducerQueueManagerInterface.php rename to src/Service/BatchManagerInterface.php index 1c2fa00..e790261 100644 --- a/src/Service/ProducerQueueManagerInterface.php +++ b/src/Service/BatchManagerInterface.php @@ -7,15 +7,8 @@ use Amp\Promise; use Webgriffe\Esb\Model\JobInterface; -interface ProducerQueueManagerInterface +interface BatchManagerInterface { - /** - * Initializes this queue manager. Must be called before this can be used - * - * @return Promise - */ - public function boot(): Promise; - /** * Adds a new job to the queue managed by this object. The method returns a promise that resolves to the number of * jobs that were actually added to the underlying queue. In the simplest case this number is 1, but if the diff --git a/src/Service/QueueManager.php b/src/Service/BeanstalkElasticsearchQueueBackend.php similarity index 64% rename from src/Service/QueueManager.php rename to src/Service/BeanstalkElasticsearchQueueBackend.php index 2cf9bb6..a21a374 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/BeanstalkElasticsearchQueueBackend.php @@ -13,9 +13,10 @@ use Webgriffe\Esb\Model\FlowConfig; use Webgriffe\Esb\Model\Job; use Webgriffe\Esb\Model\JobInterface; + use Webgriffe\Esb\NonUtf8Cleaner; -final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueManagerInterface +final class BeanstalkElasticsearchQueueBackend implements QueueBackendInterface { private const DEFAULT_BATCH_ID = 'default'; @@ -40,33 +41,23 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueMa private $logger; /** - * @var array> - */ - private $batches = []; - - /** - * @TODO This map is static because it must be shared between each QueueManager instance: it could be refactored + * @TODO This map is static because it must be shared between each QueueBackend instance: it could be refactored * extracting the mapping service to a dedicated class + * * @var int[] */ private static $uuidToBeanstalkIdMap = []; - /** - * @var int - */ - private $batchSize; public function __construct( FlowConfig $flowConfig, BeanstalkClient $beanstalkClient, ElasticSearch $elasticSearch, LoggerInterface $logger, - int $batchSize ) { $this->flowConfig = $flowConfig; $this->beanstalkClient = $beanstalkClient; $this->elasticSearch = $elasticSearch; $this->logger = $logger; - $this->batchSize = $batchSize; } /** @@ -141,55 +132,6 @@ public function boot(): Promise }); } - /** - * @inheritdoc - */ - public function enqueue(JobInterface $job, ?string $batchId = null): Promise - { - if ($batchId === null) { - $batchId = self::DEFAULT_BATCH_ID; - } - - return call(function () use ($job, $batchId) { - $jobExists = yield $this->jobExists($job->getUuid()); - if ($jobExists) { - throw new \RuntimeException( - sprintf( - 'A job with UUID "%s" already exists but this should be a new job.', - $job->getUuid() - ) - ); - } - $this->addJobToBatch($batchId, $job); - - $count = count($this->getBatch($batchId)); - if ($count < $this->batchSize) { - return 0; //Number of jobs actually added to the queue - } - - yield from $this->processBatch($batchId); - return $count; - }); - } - - /** - * @inheritdoc - */ - public function flush(?string $batchId = null): Promise - { - if ($batchId === null) { - $batchId = self::DEFAULT_BATCH_ID; - } - - return call(function () use ($batchId) { - $jobsCount = count($this->getBatch($batchId)); - if ($jobsCount > 0) { - yield from $this->processBatch($batchId); - } - return $jobsCount; - }); - } - /** * @inheritdoc */ @@ -268,7 +210,7 @@ public function isEmpty(string $queueName): Promise * @param string $jobUuid * @return Promise */ - private function jobExists(string $jobUuid): Promise + public function jobExists(string $jobUuid): Promise { return call(function () use ($jobUuid) { try { @@ -281,53 +223,52 @@ private function jobExists(string $jobUuid): Promise } /** - * @return \Generator + * @param array $jobs */ - private function processBatch(string $batchId): \Generator + public function enqueueJobs(array $jobs): Promise { - $this->logger->debug('Processing batch'); - $result = yield $this->elasticSearch->bulkIndexJobs($this->getBatch($batchId), $this->flowConfig->getTube()); - - if ($result['errors'] === true) { - foreach ($result['items'] as $item) { - if (!array_key_exists('index', $item)) { - $this->logger->error( - 'Unexpected response item in bulk index response', - ['bulk_index_response_item' => $item, 'batch_id' => $batchId] - ); - continue; - } - $itemStatusCode = $item['index']['status'] ?? null; - if (!$this->isSuccessfulStatusCode($itemStatusCode)) { - $uuid = $item['index']['_id']; - $this->removeJobFromBatch($batchId, $uuid); - $this->logger->error( - 'Job could not be indexed in ElasticSearch', - ['bulk_index_response_item' => $item, 'batch_id' => $batchId] - ); + return call(function () use ($jobs) { + $this->logger->debug('Enqueuing jobs batch...'); + $result = yield $this->elasticSearch->bulkIndexJobs($jobs, $this->flowConfig->getTube()); + + if ($result['errors'] === true) { + foreach ($result['items'] as $item) { + if (!array_key_exists('index', $item)) { + $this->logger->error( + 'Unexpected response item in bulk index response', + ['bulk_index_response_item' => $item] + ); + continue; + } + $itemStatusCode = $item['index']['status'] ?? null; + if (!$this->isSuccessfulStatusCode($itemStatusCode)) { + $uuid = $item['index']['_id']; + unset($jobs[$uuid]); + $this->logger->error( + 'Job could not be indexed in ElasticSearch', + ['bulk_index_response_item' => $item] + ); + } } } - } - foreach ($this->getBatch($batchId) as $singleJob) { - yield $this->beanstalkClient->put( - $singleJob->getUuid(), - $singleJob->getTimeout(), - $singleJob->getDelay(), - $singleJob->getPriority() - ); - $this->logger->info( - 'Successfully enqueued a new Job', - [ - 'flow_name' => $this->flowConfig->getName(), - 'job_uuid' => $singleJob->getUuid(), - 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()), - 'batch_id' => $batchId - ] - ); - } - - $this->clearBatch($batchId); + foreach ($jobs as $singleJob) { + yield $this->beanstalkClient->put( + $singleJob->getUuid(), + $singleJob->getTimeout(), + $singleJob->getDelay(), + $singleJob->getPriority() + ); + $this->logger->info( + 'Successfully enqueued a new Job', + [ + 'flow_name' => $this->flowConfig->getName(), + 'job_uuid' => $singleJob->getUuid(), + 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()) + ] + ); + } + }); } /** @@ -353,36 +294,8 @@ private function getJobBeanstalkId(JobInterface $job): int throw new \RuntimeException("Unknown Beanstalk id for job {$uuid}"); } - public function isSuccessfulStatusCode(?int $statusCode): bool + private function isSuccessfulStatusCode(?int $statusCode): bool { return $statusCode !== null && $statusCode >= 200 && $statusCode < 300; } - - /** - * @param string $batchId - * @return array - */ - private function getBatch(string $batchId): array - { - return $this->batches[$batchId] ?? []; - } - - private function addJobToBatch(string $batchId, JobInterface $job): void - { - $this->batches[$batchId][$job->getUuid()] = $job; - } - - private function removeJobFromBatch(string $batchId, string $jobUuid): void - { - unset($this->batches[$batchId][$jobUuid]); - } - - private function clearBatch(string $batchId): void - { - if (!array_key_exists($batchId, $this->batches)) { - return; - } - - $this->batches[$batchId] = []; - } } diff --git a/src/Service/WorkerQueueManagerInterface.php b/src/Service/QueueBackendInterface.php similarity index 81% rename from src/Service/WorkerQueueManagerInterface.php rename to src/Service/QueueBackendInterface.php index c9e6806..6f654ab 100644 --- a/src/Service/WorkerQueueManagerInterface.php +++ b/src/Service/QueueBackendInterface.php @@ -7,10 +7,10 @@ use Amp\Promise; use Webgriffe\Esb\Model\JobInterface; -interface WorkerQueueManagerInterface +interface QueueBackendInterface { /** - * Initializes this queue manager. Must be called before this can be used + * Initializes this queue backend. Must be called before this can be used * * @return Promise */ @@ -58,4 +58,16 @@ public function dequeue(JobInterface $job): Promise; * @return Promise */ public function isEmpty(string $queueName): Promise; + + /** + * @param string $jobUuid + * @return Promise + */ + public function jobExists(string $jobUuid): Promise; + + /** + * @param JobInterface[] $jobs + * @return Promise + */ + public function enqueueJobs(array $jobs): Promise; } diff --git a/src/WorkerInstance.php b/src/WorkerInstance.php index faa67f8..346a703 100644 --- a/src/WorkerInstance.php +++ b/src/WorkerInstance.php @@ -4,7 +4,6 @@ namespace Webgriffe\Esb; -use Amp\Beanstalk\BeanstalkClient; use function Amp\call; use function Amp\delay; use Amp\Promise; @@ -15,111 +14,29 @@ use Webgriffe\Esb\Model\JobInterface; use Webgriffe\Esb\Model\ReservedJobEvent; use Webgriffe\Esb\Model\WorkedJobEvent; -use Webgriffe\Esb\Service\ElasticSearch; -use Webgriffe\Esb\Service\QueueManager; -use Webgriffe\Esb\Service\WorkerQueueManagerInterface; +use Webgriffe\Esb\Service\QueueBackendInterface; final class WorkerInstance implements WorkerInstanceInterface { - /** - * @var FlowConfig - */ - private $flowConfig; - - /** - * @var int - */ - private $instanceId; - - /** - * @var WorkerInterface - */ - private $worker; - - /** - * @var LoggerInterface - */ - private $logger; - - /** - * @var WorkerQueueManagerInterface - */ - private $queueManager; - /** * @var array */ private static $workCounts = []; public function __construct( - FlowConfig $flowConfig, - int $instanceId, - WorkerInterface $worker, - ?BeanstalkClient $beanstalkClient, - LoggerInterface $logger, - ?ElasticSearch $elasticSearch, - ?WorkerQueueManagerInterface $queueManager = null + private readonly FlowConfig $flowConfig, + private readonly int $instanceId, + private readonly WorkerInterface $worker, + private readonly LoggerInterface $logger, + private readonly QueueBackendInterface $queueBackend ) { - if ($beanstalkClient !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - BeanstalkClient::class, - __CLASS__, - WorkerQueueManagerInterface::class - ); - } - if ($elasticSearch !== null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' . - 'Please pass a "%s" instead.', - ElasticSearch::class, - __CLASS__, - WorkerQueueManagerInterface::class - ); - } - $this->flowConfig = $flowConfig; - $this->instanceId = $instanceId; - $this->worker = $worker; - $this->logger = $logger; - - if ($queueManager === null) { - trigger_deprecation( - 'webgriffe/esb', - '2.2', - 'Not passing a "%s" to "%s" is deprecated and will be required in 3.0.', - WorkerQueueManagerInterface::class, - __CLASS__ - ); - - if (!$beanstalkClient) { - throw new \RuntimeException('Cannot create a QueueManager without the Beanstalk client!'); - } - - if (!$elasticSearch) { - throw new \RuntimeException('Cannot create a QueueManager without the ElasticSearch client'); - } - - $queueManager = new QueueManager( - $this->flowConfig, - $beanstalkClient, - $elasticSearch, - $this->logger, - 1000 - ); - } - $this->queueManager = $queueManager; } public function boot(): Promise { return call(function () { yield $this->worker->init(); - yield $this->queueManager->boot(); + yield $this->queueBackend->boot(); $workerFqcn = \get_class($this->worker); $globalLogContext = [ @@ -145,7 +62,7 @@ public function boot(): Promise try { /** @var JobInterface $job */ - if (!($job = yield $this->queueManager->getNextJob())) { + if (!($job = yield $this->queueBackend->getNextJob())) { break; } } catch (FatalQueueException $ex) { @@ -163,7 +80,7 @@ public function boot(): Promise yield $this->waitForDependencies($lastProcessTimestamp, $logContext); $job->addEvent(new ReservedJobEvent(new \DateTime(), $workerFqcn)); - yield $this->queueManager->updateJob($job); + yield $this->queueBackend->updateJob($job); $payloadData = $job->getPayloadData(); $logContext['payload_data'] = NonUtf8Cleaner::clean($payloadData); @@ -178,14 +95,14 @@ public function boot(): Promise yield $this->worker->work($job); $job->addEvent(new WorkedJobEvent(new \DateTime(), $workerFqcn)); - yield $this->queueManager->updateJob($job); + yield $this->queueBackend->updateJob($job); $this->logger->info('Successfully worked a Job', $logContext); - yield $this->queueManager->dequeue($job); + yield $this->queueBackend->dequeue($job); unset(self::$workCounts[$jobUuid]); } catch (\Throwable $e) { $job->addEvent(new ErroredJobEvent(new \DateTime(), $workerFqcn, $e->getMessage())); - yield $this->queueManager->updateJob($job); + yield $this->queueBackend->updateJob($job); $this->logger->notice( 'An error occurred while working a Job.', array_merge( @@ -199,7 +116,7 @@ public function boot(): Promise ); if (self::$workCounts[$jobUuid] >= $this->flowConfig->getWorkerMaxRetry()) { - yield $this->queueManager->dequeue($job); + yield $this->queueBackend->dequeue($job); $this->logger->error( 'A Job reached maximum work retry limit and has been removed from queue.', array_merge( @@ -214,10 +131,10 @@ public function boot(): Promise continue; } - yield $this->queueManager->requeue($job, $this->flowConfig->getWorkerReleaseDelay()); + yield $this->queueBackend->requeue($job, $this->flowConfig->getWorkerReleaseDelay()); $this->logger->info( 'Worker released a Job', - array_merge($logContext, ['release_delay' => $this->flowConfig->getWorkerReleaseDelay()]) + array_merge($logContext, ['error_retry_delay' => $this->flowConfig->getWorkerReleaseDelay()]) ); } } @@ -265,7 +182,7 @@ private function waitForDependencies(float $lastProcessTimestamp, array $logCont foreach ($this->flowConfig->getDependsOn() as $dependency) { $sleepTime = $this->flowConfig->getInitialPollingInterval(); while (true) { - if (yield $this->queueManager->isEmpty($dependency)) { + if (yield $this->queueBackend->isEmpty($dependency)) { break; } $this->logger->debug( diff --git a/tests/Integration/FailingJobHandlingTest.php b/tests/Integration/FailingJobHandlingTest.php index 5b756a0..cb5eb18 100644 --- a/tests/Integration/FailingJobHandlingTest.php +++ b/tests/Integration/FailingJobHandlingTest.php @@ -26,7 +26,7 @@ public function testFailingJobIsReleasedWithProperDelayAndThenDeletedAftetProper 'producer' => ['service' => DummyRepeatProducer::class], 'worker' => [ 'service' => AlwaysFailingWorker::class, - 'release_delay' => 1, + 'error_retry_delay' => 1, 'max_retry' => 2 ], ] @@ -50,7 +50,7 @@ function (array $record) { $this->assertTrue( $this->logHandler()->hasInfoThatPasses( function (array $record) { - return $record['message'] === 'Worker released a Job' && $record['context']['release_delay'] === 1; + return $record['message'] === 'Worker released a Job' && $record['context']['error_retry_delay'] === 1; } ) );