From e704da396ee6d294c4844f81d866ca7b7af6a740 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Tue, 26 Sep 2017 21:32:34 +0900 Subject: [PATCH 1/7] Fix FlatFileDriver does not notice new messages --- src/Driver/FlatFileDriver.php | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/Driver/FlatFileDriver.php b/src/Driver/FlatFileDriver.php index a904bb9e..ebae8d20 100644 --- a/src/Driver/FlatFileDriver.php +++ b/src/Driver/FlatFileDriver.php @@ -94,10 +94,7 @@ public function popMessage($queueName, $duration = 5) $runtime = microtime(true) + $duration; $queueDir = $this->getQueueDirectory($queueName); - $it = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME); - $files = array_keys(iterator_to_array($it)); - - natsort($files); + $files = $this->getJobFiles($queueName); while (microtime(true) < $runtime) { if ($files) { @@ -105,6 +102,9 @@ public function popMessage($queueName, $duration = 5) if (@rename($queueDir.DIRECTORY_SEPARATOR.$id, $queueDir.DIRECTORY_SEPARATOR.$id.'.proceed')) { return array(file_get_contents($queueDir.DIRECTORY_SEPARATOR.$id.'.proceed'), $id); } + } else { + // In order to notice that a new message received, update the list. + $files = $this->getJobFiles($queueName); } usleep(1000); @@ -225,4 +225,21 @@ private function getJobFilename($queueName) return $filename; } + + /** + * @param string $queueName + * + * @return string[] + */ + private function getJobFiles($queueName) + { + $it = new \GlobIterator( + $this->getQueueDirectory($queueName) . DIRECTORY_SEPARATOR . '*.job', + \FilesystemIterator::KEY_AS_FILENAME + ); + $files = array_keys(iterator_to_array($it)); + natsort($files); + + return $files; + } } From 777e16bf1b76030b52f0661bb487031aa1d47ab1 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Mon, 6 Nov 2017 21:40:15 +0900 Subject: [PATCH 2/7] Add a test which reproduce #330 --- composer.json | 3 ++- tests/Driver/FlatFileDriverTest.php | 34 +++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index f6f9904b..d9ef52c8 100644 --- a/composer.json +++ b/composer.json @@ -27,7 +27,8 @@ "iron-io/iron_mq": "~4.0", "league/container": "~2.3", "queue-interop/queue-interop": "^0.6", - "queue-interop/amqp-interop": "^0.6" + "queue-interop/amqp-interop": "^0.6", + "ackintosh/snidel": "^0.10" }, "suggest": { "php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib", diff --git a/tests/Driver/FlatFileDriverTest.php b/tests/Driver/FlatFileDriverTest.php index f1125334..6c8c5d1f 100644 --- a/tests/Driver/FlatFileDriverTest.php +++ b/tests/Driver/FlatFileDriverTest.php @@ -2,6 +2,7 @@ namespace Bernard\Tests\Driver; +use Ackintosh\Snidel; use Bernard\Driver\FlatFileDriver; /** @@ -92,6 +93,39 @@ public function testPopMessage() } } + public function testPopMessageWhichPushedAfterTheInitialCollect() + { + $this->driver->createQueue('send-newsletter'); + $snidel = new Snidel(); + + // Fork a process which pops message + $snidel->process( + function () { + list($message, ) = $this->driver->popMessage('send-newsletter', 10); + return $message; + }, + [], + 'popMessage' + ); + + // Fork another process which pushes message + $snidel->process( + function () { + // Push a message after the initial collect + sleep(5); + $this->driver->pushMessage('send-newsletter', 'test'); + }, + [], + 'pushMessage' + ); + + foreach ($snidel->results() as $result) { + if ($result->getTask()->getTag() === 'popMessage') { + $this->assertSame('test', $result->getReturn()); + } + } + } + public function testAcknowledgeMessage() { $this->driver->createQueue('send-newsletter'); From 6e7d758de3039c5102e15820d70705c4c5c956b2 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Mon, 6 Nov 2017 22:43:12 +0900 Subject: [PATCH 3/7] Fix test fails with `--prefer-lowest` and hhvm --- composer.json | 2 +- tests/Driver/FlatFileDriverTest.php | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index d9ef52c8..c453841f 100644 --- a/composer.json +++ b/composer.json @@ -28,7 +28,7 @@ "league/container": "~2.3", "queue-interop/queue-interop": "^0.6", "queue-interop/amqp-interop": "^0.6", - "ackintosh/snidel": "^0.10" + "ackintosh/snidel": "^0.10.2" }, "suggest": { "php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib", diff --git a/tests/Driver/FlatFileDriverTest.php b/tests/Driver/FlatFileDriverTest.php index 6c8c5d1f..3c54c38c 100644 --- a/tests/Driver/FlatFileDriverTest.php +++ b/tests/Driver/FlatFileDriverTest.php @@ -95,6 +95,11 @@ public function testPopMessage() public function testPopMessageWhichPushedAfterTheInitialCollect() { + // Snidel is only supported on PHP. + if (defined('HHVM_VERSION')) { + $this->markTestSkipped(); + } + $this->driver->createQueue('send-newsletter'); $snidel = new Snidel(); From fde500e6b9b3185386e80d0734be0d7e5b71713f Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Tue, 7 Nov 2017 00:09:20 +0900 Subject: [PATCH 4/7] Tweak skip message --- tests/Driver/FlatFileDriverTest.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/Driver/FlatFileDriverTest.php b/tests/Driver/FlatFileDriverTest.php index 3c54c38c..10ed2019 100644 --- a/tests/Driver/FlatFileDriverTest.php +++ b/tests/Driver/FlatFileDriverTest.php @@ -95,9 +95,8 @@ public function testPopMessage() public function testPopMessageWhichPushedAfterTheInitialCollect() { - // Snidel is only supported on PHP. if (defined('HHVM_VERSION')) { - $this->markTestSkipped(); + $this->markTestSkipped('Snidel is only supported on PHP.'); } $this->driver->createQueue('send-newsletter'); From 6e6ad0f2fd1a80ddbed2c9515e6aa24354b078e5 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Sun, 12 Nov 2017 12:10:24 +0900 Subject: [PATCH 5/7] Ignore .idea --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 5e3bf393..f021de7c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ vendor phpunit.xml coverage _build +.idea From 7d7dd036d742a97664d0602d01437a9282fa5c59 Mon Sep 17 00:00:00 2001 From: "akihito.nakano" Date: Sun, 12 Nov 2017 12:37:58 +0900 Subject: [PATCH 6/7] Improve make the dependencies minimum --- composer.json | 3 +- tests/Driver/FlatFileDriverTest.php | 47 +++++++++-------------------- 2 files changed, 16 insertions(+), 34 deletions(-) diff --git a/composer.json b/composer.json index c453841f..f6f9904b 100644 --- a/composer.json +++ b/composer.json @@ -27,8 +27,7 @@ "iron-io/iron_mq": "~4.0", "league/container": "~2.3", "queue-interop/queue-interop": "^0.6", - "queue-interop/amqp-interop": "^0.6", - "ackintosh/snidel": "^0.10.2" + "queue-interop/amqp-interop": "^0.6" }, "suggest": { "php-amqplib/php-amqplib": "Allow sending messages to an AMQP server using php-amqplib", diff --git a/tests/Driver/FlatFileDriverTest.php b/tests/Driver/FlatFileDriverTest.php index 10ed2019..28bf4d49 100644 --- a/tests/Driver/FlatFileDriverTest.php +++ b/tests/Driver/FlatFileDriverTest.php @@ -2,7 +2,6 @@ namespace Bernard\Tests\Driver; -use Ackintosh\Snidel; use Bernard\Driver\FlatFileDriver; /** @@ -95,39 +94,23 @@ public function testPopMessage() public function testPopMessageWhichPushedAfterTheInitialCollect() { - if (defined('HHVM_VERSION')) { - $this->markTestSkipped('Snidel is only supported on PHP.'); - } - $this->driver->createQueue('send-newsletter'); - $snidel = new Snidel(); - - // Fork a process which pops message - $snidel->process( - function () { - list($message, ) = $this->driver->popMessage('send-newsletter', 10); - return $message; - }, - [], - 'popMessage' - ); - - // Fork another process which pushes message - $snidel->process( - function () { - // Push a message after the initial collect - sleep(5); - $this->driver->pushMessage('send-newsletter', 'test'); - }, - [], - 'pushMessage' - ); - - foreach ($snidel->results() as $result) { - if ($result->getTask()->getTag() === 'popMessage') { - $this->assertSame('test', $result->getReturn()); - } + + $pid = pcntl_fork(); + + if ($pid === -1) { + $this->fail('Failed to fork the currently running process: ' . pcntl_strerror(pcntl_get_last_error())); + } elseif ($pid === 0) { + // Child process pushes a message after the initial collect + sleep(5); + $this->driver->pushMessage('send-newsletter', 'test'); + exit; } + + list($message, ) = $this->driver->popMessage('send-newsletter', 10); + $this->assertSame('test', $message); + + pcntl_waitpid($pid, $status); } public function testAcknowledgeMessage() From 7c819f19899b6c3402d6cc02ea51f78c36a276d1 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Mon, 13 Nov 2017 13:31:23 +0900 Subject: [PATCH 7/7] Remove ignores which is not project specific --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index f021de7c..5e3bf393 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,3 @@ vendor phpunit.xml coverage _build -.idea