diff --git a/composer.json b/composer.json index 362a515..6fb7dfa 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,8 @@ ], "require": { "composer/installers": "~1.0", - "php-amqplib/php-amqplib": "2.8.0" + "php-amqplib/php-amqplib": "2.8.0", + "aws/aws-sdk-php": "^3.63" }, "require-dev": { "phpunit/phpunit": "~3.7", diff --git a/composer.lock b/composer.lock index d310b80..15dd05b 100644 --- a/composer.lock +++ b/composer.lock @@ -4,20 +4,103 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "579b825ca4a8572eec781af18d9ab14c", + "content-hash": "9b1232c657fcb82858c213a9ef437b29", "packages": [ + { + "name": "aws/aws-sdk-php", + "version": "3.112.7", + "source": { + "type": "git", + "url": "https://github.com/aws/aws-sdk-php.git", + "reference": "5167704e39f4e139152906b3d962aa15692bff07" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/aws/aws-sdk-php/zipball/5167704e39f4e139152906b3d962aa15692bff07", + "reference": "5167704e39f4e139152906b3d962aa15692bff07", + "shasum": "" + }, + "require": { + "ext-json": "*", + "ext-pcre": "*", + "ext-simplexml": "*", + "guzzlehttp/guzzle": "^5.3.3|^6.2.1", + "guzzlehttp/promises": "~1.0", + "guzzlehttp/psr7": "^1.4.1", + "mtdowling/jmespath.php": "~2.2", + "php": ">=5.5" + }, + "require-dev": { + "andrewsville/php-token-reflection": "^1.4", + "aws/aws-php-sns-message-validator": "~1.0", + "behat/behat": "~3.0", + "doctrine/cache": "~1.4", + "ext-dom": "*", + "ext-openssl": "*", + "ext-pcntl": "*", + "ext-sockets": "*", + "nette/neon": "^2.3", + "phpunit/phpunit": "^4.8.35|^5.4.3", + "psr/cache": "^1.0", + "psr/simple-cache": "^1.0" + }, + "suggest": { + "aws/aws-php-sns-message-validator": "To validate incoming SNS notifications", + "doctrine/cache": "To use the DoctrineCacheAdapter", + "ext-curl": "To send requests using cURL", + "ext-openssl": "Allows working with CloudFront private distributions and verifying received SNS messages", + "ext-sockets": "To use client-side monitoring" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.0-dev" + } + }, + "autoload": { + "psr-4": { + "Aws\\": "src/" + }, + "files": [ + "src/functions.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "Apache-2.0" + ], + "authors": [ + { + "name": "Amazon Web Services", + "homepage": "http://aws.amazon.com" + } + ], + "description": "AWS SDK for PHP - Use Amazon Web Services in your PHP project", + "homepage": "http://aws.amazon.com/sdkforphp", + "keywords": [ + "amazon", + "aws", + "cloud", + "dynamodb", + "ec2", + "glacier", + "s3", + "sdk" + ], + "time": "2019-09-24T18:13:32+00:00" + }, { "name": "composer/installers", - "version": "v1.6.0", + "version": "v1.7.0", "source": { "type": "git", "url": "https://github.com/composer/installers.git", - "reference": "cfcca6b1b60bc4974324efb5783c13dca6932b5b" + "reference": "141b272484481432cda342727a427dc1e206bfa0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/composer/installers/zipball/cfcca6b1b60bc4974324efb5783c13dca6932b5b", - "reference": "cfcca6b1b60bc4974324efb5783c13dca6932b5b", + "url": "https://api.github.com/repos/composer/installers/zipball/141b272484481432cda342727a427dc1e206bfa0", + "reference": "141b272484481432cda342727a427dc1e206bfa0", "shasum": "" }, "require": { @@ -73,6 +156,7 @@ "RadPHP", "SMF", "Thelia", + "Whmcs", "WolfCMS", "agl", "aimeos", @@ -95,6 +179,7 @@ "installer", "itop", "joomla", + "known", "kohana", "laravel", "lavalite", @@ -124,7 +209,249 @@ "zend", "zikula" ], - "time": "2018-08-27T06:10:37+00:00" + "time": "2019-08-12T15:00:31+00:00" + }, + { + "name": "guzzlehttp/guzzle", + "version": "6.3.3", + "source": { + "type": "git", + "url": "https://github.com/guzzle/guzzle.git", + "reference": "407b0cb880ace85c9b63c5f9551db498cb2d50ba" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/guzzle/zipball/407b0cb880ace85c9b63c5f9551db498cb2d50ba", + "reference": "407b0cb880ace85c9b63c5f9551db498cb2d50ba", + "shasum": "" + }, + "require": { + "guzzlehttp/promises": "^1.0", + "guzzlehttp/psr7": "^1.4", + "php": ">=5.5" + }, + "require-dev": { + "ext-curl": "*", + "phpunit/phpunit": "^4.8.35 || ^5.7 || ^6.4 || ^7.0", + "psr/log": "^1.0" + }, + "suggest": { + "psr/log": "Required for using the Log middleware" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "6.3-dev" + } + }, + "autoload": { + "files": [ + "src/functions_include.php" + ], + "psr-4": { + "GuzzleHttp\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + } + ], + "description": "Guzzle is a PHP HTTP client library", + "homepage": "http://guzzlephp.org/", + "keywords": [ + "client", + "curl", + "framework", + "http", + "http client", + "rest", + "web service" + ], + "time": "2018-04-22T15:46:56+00:00" + }, + { + "name": "guzzlehttp/promises", + "version": "v1.3.1", + "source": { + "type": "git", + "url": "https://github.com/guzzle/promises.git", + "reference": "a59da6cf61d80060647ff4d3eb2c03a2bc694646" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/promises/zipball/a59da6cf61d80060647ff4d3eb2c03a2bc694646", + "reference": "a59da6cf61d80060647ff4d3eb2c03a2bc694646", + "shasum": "" + }, + "require": { + "php": ">=5.5.0" + }, + "require-dev": { + "phpunit/phpunit": "^4.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.4-dev" + } + }, + "autoload": { + "psr-4": { + "GuzzleHttp\\Promise\\": "src/" + }, + "files": [ + "src/functions_include.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + } + ], + "description": "Guzzle promises library", + "keywords": [ + "promise" + ], + "time": "2016-12-20T10:07:11+00:00" + }, + { + "name": "guzzlehttp/psr7", + "version": "1.6.1", + "source": { + "type": "git", + "url": "https://github.com/guzzle/psr7.git", + "reference": "239400de7a173fe9901b9ac7c06497751f00727a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/psr7/zipball/239400de7a173fe9901b9ac7c06497751f00727a", + "reference": "239400de7a173fe9901b9ac7c06497751f00727a", + "shasum": "" + }, + "require": { + "php": ">=5.4.0", + "psr/http-message": "~1.0", + "ralouphie/getallheaders": "^2.0.5 || ^3.0.0" + }, + "provide": { + "psr/http-message-implementation": "1.0" + }, + "require-dev": { + "ext-zlib": "*", + "phpunit/phpunit": "~4.8.36 || ^5.7.27 || ^6.5.8" + }, + "suggest": { + "zendframework/zend-httphandlerrunner": "Emit PSR-7 responses" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.6-dev" + } + }, + "autoload": { + "psr-4": { + "GuzzleHttp\\Psr7\\": "src/" + }, + "files": [ + "src/functions_include.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + }, + { + "name": "Tobias Schultze", + "homepage": "https://github.com/Tobion" + } + ], + "description": "PSR-7 message implementation that also provides common utility methods", + "keywords": [ + "http", + "message", + "psr-7", + "request", + "response", + "stream", + "uri", + "url" + ], + "time": "2019-07-01T23:21:34+00:00" + }, + { + "name": "mtdowling/jmespath.php", + "version": "2.4.0", + "source": { + "type": "git", + "url": "https://github.com/jmespath/jmespath.php.git", + "reference": "adcc9531682cf87dfda21e1fd5d0e7a41d292fac" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/jmespath/jmespath.php/zipball/adcc9531682cf87dfda21e1fd5d0e7a41d292fac", + "reference": "adcc9531682cf87dfda21e1fd5d0e7a41d292fac", + "shasum": "" + }, + "require": { + "php": ">=5.4.0" + }, + "require-dev": { + "phpunit/phpunit": "~4.0" + }, + "bin": [ + "bin/jp.php" + ], + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0-dev" + } + }, + "autoload": { + "psr-4": { + "JmesPath\\": "src/" + }, + "files": [ + "src/JmesPath.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + } + ], + "description": "Declaratively specify how to extract elements from a JSON document", + "keywords": [ + "json", + "jsonpath" + ], + "time": "2016-12-03T22:08:25+00:00" }, { "name": "php-amqplib/php-amqplib", @@ -195,6 +522,96 @@ "rabbitmq" ], "time": "2018-10-23T18:48:24+00:00" + }, + { + "name": "psr/http-message", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-message.git", + "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-message/zipball/f6561bf28d520154e4b0ec72be95418abe6d9363", + "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363", + "shasum": "" + }, + "require": { + "php": ">=5.3.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for HTTP messages", + "homepage": "https://github.com/php-fig/http-message", + "keywords": [ + "http", + "http-message", + "psr", + "psr-7", + "request", + "response" + ], + "time": "2016-08-06T14:39:51+00:00" + }, + { + "name": "ralouphie/getallheaders", + "version": "3.0.3", + "source": { + "type": "git", + "url": "https://github.com/ralouphie/getallheaders.git", + "reference": "120b605dfeb996808c31b6477290a714d356e822" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/ralouphie/getallheaders/zipball/120b605dfeb996808c31b6477290a714d356e822", + "reference": "120b605dfeb996808c31b6477290a714d356e822", + "shasum": "" + }, + "require": { + "php": ">=5.6" + }, + "require-dev": { + "php-coveralls/php-coveralls": "^2.1", + "phpunit/phpunit": "^5 || ^6.5" + }, + "type": "library", + "autoload": { + "files": [ + "src/getallheaders.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Ralph Khattar", + "email": "ralph.khattar@gmail.com" + } + ], + "description": "A polyfill for getallheaders.", + "time": "2019-03-08T08:55:37+00:00" } ], "packages-dev": [ @@ -245,16 +662,16 @@ }, { "name": "mockery/mockery", - "version": "0.9.9", + "version": "0.9.11", "source": { "type": "git", "url": "https://github.com/mockery/mockery.git", - "reference": "6fdb61243844dc924071d3404bb23994ea0b6856" + "reference": "be9bf28d8e57d67883cba9fcadfcff8caab667f8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/mockery/mockery/zipball/6fdb61243844dc924071d3404bb23994ea0b6856", - "reference": "6fdb61243844dc924071d3404bb23994ea0b6856", + "url": "https://api.github.com/repos/mockery/mockery/zipball/be9bf28d8e57d67883cba9fcadfcff8caab667f8", + "reference": "be9bf28d8e57d67883cba9fcadfcff8caab667f8", "shasum": "" }, "require": { @@ -306,7 +723,7 @@ "test double", "testing" ], - "time": "2017-02-28T12:52:32+00:00" + "time": "2019-02-12T16:07:13+00:00" }, { "name": "phpunit/php-code-coverage", @@ -680,16 +1097,16 @@ }, { "name": "symfony/polyfill-ctype", - "version": "v1.10.0", + "version": "v1.12.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-ctype.git", - "reference": "e3d826245268269cd66f8326bd8bc066687b4a19" + "reference": "550ebaac289296ce228a706d0867afc34687e3f4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/e3d826245268269cd66f8326bd8bc066687b4a19", - "reference": "e3d826245268269cd66f8326bd8bc066687b4a19", + "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/550ebaac289296ce228a706d0867afc34687e3f4", + "reference": "550ebaac289296ce228a706d0867afc34687e3f4", "shasum": "" }, "require": { @@ -701,7 +1118,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "1.9-dev" + "dev-master": "1.12-dev" } }, "autoload": { @@ -717,13 +1134,13 @@ "MIT" ], "authors": [ - { - "name": "Symfony Community", - "homepage": "https://symfony.com/contributors" - }, { "name": "Gert de Pagter", "email": "BackEndTea@gmail.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" } ], "description": "Symfony polyfill for ctype functions", @@ -734,20 +1151,20 @@ "polyfill", "portable" ], - "time": "2018-08-06T14:22:27+00:00" + "time": "2019-08-06T08:03:45+00:00" }, { "name": "symfony/yaml", - "version": "v2.8.47", + "version": "v2.8.50", "source": { "type": "git", "url": "https://github.com/symfony/yaml.git", - "reference": "0e16589861f192dbffb19b06683ce3ef58f7f99d" + "reference": "02c1859112aa779d9ab394ae4f3381911d84052b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/yaml/zipball/0e16589861f192dbffb19b06683ce3ef58f7f99d", - "reference": "0e16589861f192dbffb19b06683ce3ef58f7f99d", + "url": "https://api.github.com/repos/symfony/yaml/zipball/02c1859112aa779d9ab394ae4f3381911d84052b", + "reference": "02c1859112aa779d9ab394ae4f3381911d84052b", "shasum": "" }, "require": { @@ -784,7 +1201,7 @@ ], "description": "Symfony Yaml Component", "homepage": "https://symfony.com", - "time": "2018-10-02T16:27:16+00:00" + "time": "2018-11-11T11:18:13+00:00" } ], "aliases": [], diff --git a/includes/WpMinions/Plugin.php b/includes/WpMinions/Plugin.php index f0c25bd..ad79eb7 100644 --- a/includes/WpMinions/Plugin.php +++ b/includes/WpMinions/Plugin.php @@ -187,6 +187,8 @@ function build_client() { return new \WpMinions\Gearman\Client(); } elseif ( 'rabbitmq' === strtolower( $backend ) ) { return new \WpMinions\RabbitMQ\Client(); + } elseif ( 'sqs' === strtolower( $backend ) ) { + return new \WpMinions\SQS\Client(); } else { return new \WpMinions\Cron\Client(); } @@ -217,6 +219,8 @@ function build_worker() { return new \WpMinions\Gearman\Worker(); } elseif ( 'rabbitmq' === strtolower( $backend ) ) { return new \WpMinions\RabbitMQ\Worker(); + } elseif ( 'sqs' === strtolower( $backend ) ) { + return new \WpMinions\SQS\Worker(); } else { return new \WpMinions\Cron\Worker(); } diff --git a/includes/WpMinions/SQS/Client.php b/includes/WpMinions/SQS/Client.php new file mode 100644 index 0000000..5a3ad6b --- /dev/null +++ b/includes/WpMinions/SQS/Client.php @@ -0,0 +1,99 @@ +connect() ) { + return false; + } + $group_id = null; + $message_id = null; + if ( isset( $args['sqs_extra'] ) && ! empty( $args['sqs_extra'] ) ) { + if ( isset( $args['sqs_extra']['group_id'] ) ) { + $group_id = $args['sqs_extra']['group_id']; + } + if ( isset( $args['sqs_extra']['id'] ) ) { + $message_id = $args['sqs_extra']['id']; + } + unset( $args['sqs_extra'] ); + } + $job_data = array( + 'hook' => $hook, + 'args' => $args, + 'blog_id' => get_current_blog_id(), + ); + try { + $job_args = array( + 'QueueUrl' => $this->connection->get_channel(), + 'MessageBody' => json_encode( $job_data ), + ); + if ( ! empty( $group_id ) ) { + $job_args['MessageGroupId'] = $group_id; + } + if ( ! empty( $message_id ) ) { + $job_args['MessageDeduplicationId'] = $message_id; + } + $this->connection->get_connection()->sendMessage( $job_args ); + + return true; + } catch ( \Exception $e ) { + error_log( + 'SQSClient->add failed: ' . $e->getMessage() + ); + + return false; + } + } + + /** + * Connect to host and channel. + */ + private function connect() { + if ( null !== $this->connection ) { + return $this->connection; + } + + try { + $this->connection = new Connection(); + } catch ( \Exception $e ) { + error_log( + 'SQSClient->connect failed: ' . $e->getMessage() + ); + + return false; + } + + return $this->connection; + } +} diff --git a/includes/WpMinions/SQS/Connection.php b/includes/WpMinions/SQS/Connection.php new file mode 100644 index 0000000..f8f17d8 --- /dev/null +++ b/includes/WpMinions/SQS/Connection.php @@ -0,0 +1,61 @@ +queue_name = defined( 'WP_ASYNC_SQS_QUEUE' ) ? WP_ASYNC_SQS_QUEUE : 'wordpress'; + $this->aws_credentials = $aws_credentials; + $this->connection = new \Aws\Sqs\SqsClient( $this->aws_credentials ); + + } else { + throw new \Exception( 'Could not create connection.' ); + } + } + + /** + * Return queue url. + * + * @return mixed|null Queue url. + */ + public function get_channel() { + return $this->connection->getQueueUrl( array( 'QueueName' => $this->queue_name ) )->get( 'QueueUrl' ); + } + + /** + * Return connection client. + * + * @return \Aws\Sqs\SqsClient Client. + */ + public function get_connection() { + return $this->connection; + } + +} diff --git a/includes/WpMinions/SQS/Worker.php b/includes/WpMinions/SQS/Worker.php new file mode 100644 index 0000000..9748374 --- /dev/null +++ b/includes/WpMinions/SQS/Worker.php @@ -0,0 +1,148 @@ +connect() ) { + return false; + } + $switched = false; + + $message = $this->get_message(); + if ( empty( $message ) ) { + //Wait 3s to avoid too many requests to AWS. + sleep( 3 ); + + return true; + } + + try { + $job_data = json_decode( $message['Body'], true ); + $hook = $job_data['hook']; + $args = $job_data['args']; + + if ( function_exists( 'is_multisite' ) && is_multisite() && $job_data['blog_id'] ) { + $blog_id = $job_data['blog_id']; + + if ( get_current_blog_id() !== $blog_id ) { + switch_to_blog( $blog_id ); + $switched = true; + } else { + $switched = false; + } + } else { + $switched = false; + } + + do_action( 'wp_async_task_before_job', $hook, $message ); + do_action( 'wp_async_task_before_job_' . $hook, $message ); + + do_action( $hook, $args, $message ); + + do_action( 'wp_async_task_after_job', $hook, $message ); + do_action( 'wp_async_task_after_job_' . $hook, $message ); + + //Delete message from the queue as we have procesessed. + $this->connection->get_connection()->deleteMessage( array( + 'QueueUrl' => $this->connection->get_channel(), + 'ReceiptHandle' => $message['ReceiptHandle'] + ) ); + $result = true; + } catch ( \Exception $e ) { + error_log( + 'SQSWorker->do_job failed: ' . $e->getMessage() + ); + //Make the message available back into the queue right away. + $this->connection->get_connection()->changeMessageVisibility( array( + 'QueueUrl' => $this->connection->get_channel(), + 'ReceiptHandle' => $message['ReceiptHandle'], + 'VisibilityTimeout' => 0 + ) ); + + $result = true; + } + + if ( $switched ) { + restore_current_blog(); + } + //Wait 3s to avoid too many requests to AWS. + sleep( 3 ); + + return $result; + } + + /** + * Connect to Aws SQS queue. + */ + private function connect() { + if ( null !== $this->connection ) { + return $this->connection; + } + + try { + $this->connection = new Connection(); + } catch ( \Exception $e ) { + return false; + } + + return $this->connection; + } + + /** + * Get next message from the queue. + * + * @return \Aws\Result|bool|mixed + */ + private function get_message() { + if ( empty( $this->connection ) ) { + return false; + } + try { + // Receive a message from the queue + $message = $this->connection->get_connection()->receiveMessage( array( + 'QueueUrl' => $this->connection->get_channel() + ) ); + + if ( $message['Messages'] == null ) { + // No message to process + return false; + } + // Get the message and return it + $message = array_pop( $message['Messages'] ); + + return $message; + } catch ( \Exception $e ) { + error_log( + 'SQSWorker->get_message failed: ' . $e->getMessage() + ); + + return false; + } + } +} diff --git a/system-tests/sqs/config.php b/system-tests/sqs/config.php new file mode 100644 index 0000000..d2c15e6 --- /dev/null +++ b/system-tests/sqs/config.php @@ -0,0 +1,11 @@ + 'us-east-1', + 'version' => 'latest', + 'credentials' => array( + 'key' => '', //Testing AWS key. + 'secret' => ' ' //Testing AWS secret. + ) +); diff --git a/system-tests/sqs/test-client.php b/system-tests/sqs/test-client.php new file mode 100644 index 0000000..18a5fe8 --- /dev/null +++ b/system-tests/sqs/test-client.php @@ -0,0 +1,14 @@ +getQueueUrl( array( 'QueueName' => 'wordpress' ) )->get( 'QueueUrl' ); + +$connection->sendMessage( array( + 'QueueUrl' => $queue_url, + 'MessageBody' => 'Hello World!' +) ); \ No newline at end of file diff --git a/system-tests/sqs/test-worker.php b/system-tests/sqs/test-worker.php new file mode 100644 index 0000000..fce2af9 --- /dev/null +++ b/system-tests/sqs/test-worker.php @@ -0,0 +1,43 @@ +getQueueUrl( array( 'QueueName' => 'wordpress' ) )->get( 'QueueUrl' ); + +echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; + +$callback = function ( $msg ) { + echo " [x] Received ", $msg, "\n"; +}; + +while ( true ) { + $message = $connection->receiveMessage( array( + 'QueueUrl' => $queue_url + ) ); + if ( ! empty( $message ) && ! empty( $message['Messages'] ) ) { + + $message = array_pop( $message['Messages'] ); + + try { + + call_user_func( $callback, $message['Body'] ); + $connection->deleteMessage( array( + 'QueueUrl' => $queue_url, + 'ReceiptHandle' => $message['ReceiptHandle'] + ) ); + } catch ( Exception $e ) { + $connection->changeMessageVisibility( array( + 'QueueUrl' => $queue_url, + 'ReceiptHandle' => $message['ReceiptHandle'], + 'VisibilityTimeout' => 0 + ) ); + echo $e->getMessage(); + } + + } + + sleep( 3 ); +}