diff --git a/src/Driver/FlatFile/Driver.php b/src/Driver/FlatFile/Driver.php index c1df5cfc..afd91e0c 100644 --- a/src/Driver/FlatFile/Driver.php +++ b/src/Driver/FlatFile/Driver.php @@ -94,10 +94,7 @@ public function popMessage($queueName, $duration = 5) $runtime = microtime(true) + $duration; $queueDir = $this->getQueueDirectory($queueName); - $it = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME); - $files = array_keys(iterator_to_array($it)); - - natsort($files); + $files = $this->getJobFiles($queueName); while (microtime(true) < $runtime) { if ($files) { @@ -107,6 +104,9 @@ public function popMessage($queueName, $duration = 5) } return $this->processFileOrFail($queueDir, $id); + } else { + // In order to notice that a new message received, update the list. + $files = $this->getJobFiles($queueName); } usleep(1000); @@ -244,4 +244,21 @@ private function getJobFilename($queueName) return $filename; } + + /** + * @param string $queueName + * + * @return string[] + */ + private function getJobFiles($queueName) + { + $it = new \GlobIterator( + $this->getQueueDirectory($queueName) . DIRECTORY_SEPARATOR . '*.job', + \FilesystemIterator::KEY_AS_FILENAME + ); + $files = array_keys(iterator_to_array($it)); + natsort($files); + + return $files; + } } diff --git a/tests/Driver/FlatFile/DriverTest.php b/tests/Driver/FlatFile/DriverTest.php index 7944d2ed..86b14d49 100644 --- a/tests/Driver/FlatFile/DriverTest.php +++ b/tests/Driver/FlatFile/DriverTest.php @@ -92,6 +92,27 @@ public function testPopMessage() } } + public function testPopMessageWhichPushedAfterTheInitialCollect() + { + $this->driver->createQueue('send-newsletter'); + + $pid = pcntl_fork(); + + if ($pid === -1) { + $this->fail('Failed to fork the currently running process: ' . pcntl_strerror(pcntl_get_last_error())); + } elseif ($pid === 0) { + // Child process pushes a message after the initial collect + sleep(5); + $this->driver->pushMessage('send-newsletter', 'test'); + exit; + } + + list($message, ) = $this->driver->popMessage('send-newsletter', 10); + $this->assertSame('test', $message); + + pcntl_waitpid($pid, $status); + } + public function testAcknowledgeMessage() { $this->driver->createQueue('send-newsletter');