Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/ProducerInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Amp\Loop;
use Amp\Promise;
use Psr\Log\LoggerInterface;
use Ramsey\Uuid\Uuid;
use Webgriffe\Esb\Model\FlowConfig;
use Webgriffe\Esb\Model\Job;
use Webgriffe\Esb\Model\ProducedJobEvent;
Expand Down Expand Up @@ -164,6 +165,7 @@ function ($watcherId) {
public function produceAndQueueJobs($data = null): Promise
{
return call(function () use ($data) {
$batchId = Uuid::uuid1()->toString();
$jobsCount = 0;
$job = null;
try {
Expand All @@ -172,17 +174,26 @@ public function produceAndQueueJobs($data = null): Promise
/** @var Job $job */
$job = $jobs->getCurrent();
$job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer)));
$jobsCount += yield $this->queueManager->enqueue($job);
if ($this->queueManager instanceof QueueManager) {
$jobsCount += yield $this->queueManager->enqueue($job, $batchId);
} else {
$jobsCount += yield $this->queueManager->enqueue($job);
}
}

$jobsCount += yield $this->queueManager->flush();
if ($this->queueManager instanceof QueueManager) {
$jobsCount += yield $this->queueManager->flush($batchId);
} else {
$jobsCount += yield $this->queueManager->flush();
}
} catch (\Throwable $error) {
$this->logger->error(
'An error occurred producing/queueing jobs.',
[
'producer' => \get_class($this->producer),
'last_job_payload_data' => $job ? NonUtf8Cleaner::clean($job->getPayloadData()) : null,
'error' => $error->getMessage(),
'batch_id' => $batchId,
]
);
}
Expand Down
77 changes: 58 additions & 19 deletions src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueManagerInterface
{
private const DEFAULT_BATCH_ID = 'default';

/**
* @var BeanstalkClient
*/
Expand All @@ -38,9 +40,9 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueMa
private $logger;

/**
* @var JobInterface[]
* @var array<string, array<string, JobInterface>>
*/
private $batch = [];
private $batches = [];

/**
* @TODO This map is static because it must be shared between each QueueManager instance: it could be refactored
Expand Down Expand Up @@ -142,9 +144,13 @@ public function boot(): Promise
/**
* @inheritdoc
*/
public function enqueue(JobInterface $job): Promise
public function enqueue(JobInterface $job, ?string $batchId = null): Promise
{
return call(function () use ($job) {
if ($batchId === null) {
$batchId = self::DEFAULT_BATCH_ID;
}

return call(function () use ($job, $batchId) {
$jobExists = yield $this->jobExists($job->getUuid());
if ($jobExists) {
throw new \RuntimeException(
Expand All @@ -154,27 +160,31 @@ public function enqueue(JobInterface $job): Promise
)
);
}
$this->batch[$job->getUuid()] = $job;
$this->addJobToBatch($batchId, $job);

$count = count($this->batch);
$count = count($this->getBatch($batchId));
if ($count < $this->batchSize) {
return 0; //Number of jobs actually added to the queue
}

yield from $this->processBatch();
yield from $this->processBatch($batchId);
return $count;
});
}

/**
* @inheritdoc
*/
public function flush(): Promise
public function flush(?string $batchId = null): Promise
{
return call(function () {
$jobsCount = count($this->batch);
if ($batchId === null) {
$batchId = self::DEFAULT_BATCH_ID;
}

return call(function () use ($batchId) {
$jobsCount = count($this->getBatch($batchId));
if ($jobsCount > 0) {
yield from $this->processBatch();
yield from $this->processBatch($batchId);
}
return $jobsCount;
});
Expand Down Expand Up @@ -273,33 +283,33 @@ private function jobExists(string $jobUuid): Promise
/**
* @return \Generator<Promise>
*/
private function processBatch(): \Generator
private function processBatch(string $batchId): \Generator
{
$this->logger->debug('Processing batch');
$result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube());
$result = yield $this->elasticSearch->bulkIndexJobs($this->getBatch($batchId), $this->flowConfig->getTube());

if ($result['errors'] === true) {
foreach ($result['items'] as $item) {
if (!array_key_exists('index', $item)) {
$this->logger->error(
'Unexpected response item in bulk index response',
['bulk_index_response_item' => $item]
['bulk_index_response_item' => $item, 'batch_id' => $batchId]
);
continue;
}
$itemStatusCode = $item['index']['status'] ?? null;
if (!$this->isSuccessfulStatusCode($itemStatusCode)) {
$uuid = $item['index']['_id'];
unset($this->batch[$uuid]);
$this->removeJobFromBatch($batchId, $uuid);
$this->logger->error(
'Job could not be indexed in ElasticSearch',
['bulk_index_response_item' => $item]
['bulk_index_response_item' => $item, 'batch_id' => $batchId]
);
}
}
}

foreach ($this->batch as $singleJob) {
foreach ($this->getBatch($batchId) as $singleJob) {
yield $this->beanstalkClient->put(
$singleJob->getUuid(),
$singleJob->getTimeout(),
Expand All @@ -311,12 +321,13 @@ private function processBatch(): \Generator
[
'flow_name' => $this->flowConfig->getName(),
'job_uuid' => $singleJob->getUuid(),
'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData())
'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()),
'batch_id' => $batchId
]
);
}

$this->batch = [];
$this->clearBatch($batchId);
}

/**
Expand Down Expand Up @@ -346,4 +357,32 @@ public function isSuccessfulStatusCode(?int $statusCode): bool
{
return $statusCode !== null && $statusCode >= 200 && $statusCode < 300;
}

/**
* @param string $batchId
* @return array<string, JobInterface>
*/
private function getBatch(string $batchId): array
{
return $this->batches[$batchId] ?? [];
}

private function addJobToBatch(string $batchId, JobInterface $job): void
{
$this->batches[$batchId][$job->getUuid()] = $job;
}

private function removeJobFromBatch(string $batchId, string $jobUuid): void
{
unset($this->batches[$batchId][$jobUuid]);
}

private function clearBatch(string $batchId): void
{
if (!array_key_exists($batchId, $this->batches)) {
return;
}

$this->batches[$batchId] = [];
}
}
63 changes: 62 additions & 1 deletion tests/Integration/HttpRequestProducerAndWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404()
]
);
$httpPort = self::$kernel->getContainer()->getParameter('http_server_port');

Loop::delay(100, function () use ($httpPort) {
yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}");
$payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]);
Expand All @@ -191,6 +191,67 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404()
$this->assertReadyJobsCountInTube(0, self::FLOW_CODE);
}

public function testConcurrentHttpRequestsForSameFlow()
{
$workerFile = vfsStream::url('root/worker.data');
self::createKernel(
[
'services' => [
DummyHttpRequestProducer::class => ['arguments' => []],
DummyFilesystemWorker::class => ['arguments' => [$workerFile]],
],
'flows' => [
self::FLOW_CODE => [
'description' => 'Http Request Producer And Worker Test Flow',
'producer' => ['service' => DummyHttpRequestProducer::class],
'worker' => ['service' => DummyFilesystemWorker::class],
]
]
]
);
$httpPort = self::$kernel->getContainer()->getParameter('http_server_port');

Loop::delay(100, function () use ($httpPort) {
yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$httpPort}");
$payload1 = json_encode(['jobs' => ['job1']]);
$client1 = HttpClientBuilder::buildDefault();
$request1 = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST');
$request1->setBody($payload1);
$payload2 = json_encode(['jobs' => ['job2']]);
$client2 = HttpClientBuilder::buildDefault();
$request2 = new Request("http://127.0.0.1:{$httpPort}/dummy", 'POST');
$request2->setBody($payload2);
$responses = yield [$client1->request($request1), $client2->request($request2)];
$this->assertStringContainsString(
'"Successfully scheduled 1 job(s) to be queued."',
yield $responses[0]->getBody()->read()
);
$this->assertStringContainsString(
'"Successfully scheduled 1 job(s) to be queued."',
yield $responses[1]->getBody()->read()
);
});
$this->stopWhen(function () use ($workerFile) {
return (yield exists($workerFile)) && count($this->getFileLines($workerFile)) === 2;
});

self::$kernel->boot();

$workerFileLines = $this->getFileLines($workerFile);
$this->assertCount(2, $workerFileLines);
$this->assertStringContainsString('job1', implode(PHP_EOL, $workerFileLines));
$this->assertStringContainsString('job2', implode(PHP_EOL, $workerFileLines));
$this->logHandler()->hasRecordThatMatches(
'/Successfully produced a new Job .*? "payload_data":["job1"]/',
Logger::INFO
);
$this->logHandler()->hasRecordThatMatches(
'/Successfully produced a new Job .*? "payload_data":["job2"]/',
Logger::INFO
);
$this->assertReadyJobsCountInTube(0, self::FLOW_CODE);
}

private function waitForConnectionAvailable(string $uri): Promise
{
return call(function () use ($uri) {
Expand Down
Loading