diff --git a/Dockerfile b/Dockerfile index f096b4c..741f3d3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM composer:2.0 as composer +FROM composer as composer WORKDIR /usr/local/src/ @@ -7,7 +7,7 @@ COPY composer.json /usr/local/src/ RUN composer install --ignore-platform-reqs -FROM phpswoole/swoole:php8.1-alpine +FROM phpswoole/swoole:php8.3-alpine WORKDIR /usr/local/src/ diff --git a/composer.json b/composer.json index 31e7c97..2bab812 100644 --- a/composer.json +++ b/composer.json @@ -24,12 +24,15 @@ "lint": "vendor/bin/pint --test" }, "require": { - "php": ">=8.1", + "php": ">=8.3", + "php-amqplib/php-amqplib": "^3.7", "utopia-php/cli": "0.15.*", - "utopia-php/framework": "0.*.*", - "utopia-php/telemetry": "0.1.*" + "utopia-php/framework": "0.33.*", + "utopia-php/telemetry": "0.1.*", + "utopia-php/fetch": "^0.3.0" }, "require-dev": { + "ext-redis": "*", "swoole/ide-helper": "4.8.8", "phpunit/phpunit": "^9.5.5", "laravel/pint": "^0.2.3", diff --git a/composer.lock b/composer.lock index 92c67d8..579c374 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "74b58f35a2196824cfc8ab396d78db83", + "content-hash": "6e409e8e218225d898b5bcee19fecf83", "packages": [ { "name": "brick/math", @@ -149,16 +149,16 @@ }, { "name": "google/protobuf", - "version": "v4.28.3", + "version": "v4.29.3", "source": { "type": "git", "url": "https://github.com/protocolbuffers/protobuf-php.git", - "reference": "c5c311e0f3d89928251ac5a2f0e3db283612c100" + "reference": "ab5077c2cfdd1f415f42d11fdbdf903ba8e3d9b7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/c5c311e0f3d89928251ac5a2f0e3db283612c100", - "reference": "c5c311e0f3d89928251ac5a2f0e3db283612c100", + "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/ab5077c2cfdd1f415f42d11fdbdf903ba8e3d9b7", + "reference": "ab5077c2cfdd1f415f42d11fdbdf903ba8e3d9b7", "shasum": "" }, "require": { @@ -187,9 +187,9 @@ "proto" ], "support": { - "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.28.3" + "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.29.3" }, - "time": "2024-10-22T22:27:17+00:00" + "time": "2025-01-08T21:00:13+00:00" }, { "name": "nyholm/psr7", @@ -337,16 +337,16 @@ }, { "name": "open-telemetry/api", - "version": "1.1.1", + "version": "1.2.1", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/api.git", - "reference": "542064815d38a6df55af7957cd6f1d7d967c99c6" + "reference": "74b1a03263be8c5acb578f41da054b4bac3af4a0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/542064815d38a6df55af7957cd6f1d7d967c99c6", - "reference": "542064815d38a6df55af7957cd6f1d7d967c99c6", + "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/74b1a03263be8c5acb578f41da054b4bac3af4a0", + "reference": "74b1a03263be8c5acb578f41da054b4bac3af4a0", "shasum": "" }, "require": { @@ -360,13 +360,13 @@ }, "type": "library", "extra": { - "branch-alias": { - "dev-main": "1.1.x-dev" - }, "spi": { "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\HookManagerInterface": [ "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\ExtensionHookManager" ] + }, + "branch-alias": { + "dev-main": "1.1.x-dev" } }, "autoload": { @@ -403,7 +403,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2024-10-15T22:42:37+00:00" + "time": "2025-01-20T23:35:16+00:00" }, { "name": "open-telemetry/context", @@ -466,16 +466,16 @@ }, { "name": "open-telemetry/exporter-otlp", - "version": "1.1.0", + "version": "1.2.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "9b6de12204f25f8ab9540b46d6e7b5151897ce18" + "reference": "243d9657c44a06f740cf384f486afe954c2b725f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/9b6de12204f25f8ab9540b46d6e7b5151897ce18", - "reference": "9b6de12204f25f8ab9540b46d6e7b5151897ce18", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/243d9657c44a06f740cf384f486afe954c2b725f", + "reference": "243d9657c44a06f740cf384f486afe954c2b725f", "shasum": "" }, "require": { @@ -526,20 +526,20 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2024-04-30T18:28:30+00:00" + "time": "2025-01-08T23:50:03+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", - "version": "1.2.1", + "version": "1.5.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/gen-otlp-protobuf.git", - "reference": "66c3b98e998a726691c92e6405a82e6e7b8b169d" + "reference": "585bafddd4ae6565de154610b10a787a455c9ba0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/gen-otlp-protobuf/zipball/66c3b98e998a726691c92e6405a82e6e7b8b169d", - "reference": "66c3b98e998a726691c92e6405a82e6e7b8b169d", + "url": "https://api.github.com/repos/opentelemetry-php/gen-otlp-protobuf/zipball/585bafddd4ae6565de154610b10a787a455c9ba0", + "reference": "585bafddd4ae6565de154610b10a787a455c9ba0", "shasum": "" }, "require": { @@ -589,20 +589,20 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2024-10-30T11:49:49+00:00" + "time": "2025-01-15T23:07:07+00:00" }, { "name": "open-telemetry/sdk", - "version": "1.1.2", + "version": "1.2.1", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "fb0ff8d8279a3776bd604791e2531dd0cc147e8b" + "reference": "96aeaee5b7cb8c0bc4af7ff4717b429f2d9f67e1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/fb0ff8d8279a3776bd604791e2531dd0cc147e8b", - "reference": "fb0ff8d8279a3776bd604791e2531dd0cc147e8b", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/96aeaee5b7cb8c0bc4af7ff4717b429f2d9f67e1", + "reference": "96aeaee5b7cb8c0bc4af7ff4717b429f2d9f67e1", "shasum": "" }, "require": { @@ -630,13 +630,13 @@ }, "type": "library", "extra": { - "branch-alias": { - "dev-main": "1.0.x-dev" - }, "spi": { "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\HookManagerInterface": [ "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\ExtensionHookManager" ] + }, + "branch-alias": { + "dev-main": "1.0.x-dev" } }, "autoload": { @@ -679,7 +679,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2024-10-18T21:01:35+00:00" + "time": "2025-01-09T23:17:14+00:00" }, { "name": "open-telemetry/sem-conv", @@ -738,6 +738,204 @@ }, "time": "2024-08-28T09:20:31+00:00" }, + { + "name": "paragonie/constant_time_encoding", + "version": "v3.0.0", + "source": { + "type": "git", + "url": "https://github.com/paragonie/constant_time_encoding.git", + "reference": "df1e7fde177501eee2037dd159cf04f5f301a512" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/paragonie/constant_time_encoding/zipball/df1e7fde177501eee2037dd159cf04f5f301a512", + "reference": "df1e7fde177501eee2037dd159cf04f5f301a512", + "shasum": "" + }, + "require": { + "php": "^8" + }, + "require-dev": { + "phpunit/phpunit": "^9", + "vimeo/psalm": "^4|^5" + }, + "type": "library", + "autoload": { + "psr-4": { + "ParagonIE\\ConstantTime\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Paragon Initiative Enterprises", + "email": "security@paragonie.com", + "homepage": "https://paragonie.com", + "role": "Maintainer" + }, + { + "name": "Steve 'Sc00bz' Thomas", + "email": "steve@tobtu.com", + "homepage": "https://www.tobtu.com", + "role": "Original Developer" + } + ], + "description": "Constant-time Implementations of RFC 4648 Encoding (Base-64, Base-32, Base-16)", + "keywords": [ + "base16", + "base32", + "base32_decode", + "base32_encode", + "base64", + "base64_decode", + "base64_encode", + "bin2hex", + "encoding", + "hex", + "hex2bin", + "rfc4648" + ], + "support": { + "email": "info@paragonie.com", + "issues": "https://github.com/paragonie/constant_time_encoding/issues", + "source": "https://github.com/paragonie/constant_time_encoding" + }, + "time": "2024-05-08T12:36:18+00:00" + }, + { + "name": "paragonie/random_compat", + "version": "v9.99.100", + "source": { + "type": "git", + "url": "https://github.com/paragonie/random_compat.git", + "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/paragonie/random_compat/zipball/996434e5492cb4c3edcb9168db6fbb1359ef965a", + "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a", + "shasum": "" + }, + "require": { + "php": ">= 7" + }, + "require-dev": { + "phpunit/phpunit": "4.*|5.*", + "vimeo/psalm": "^1" + }, + "suggest": { + "ext-libsodium": "Provides a modern crypto API that can be used to generate random bytes." + }, + "type": "library", + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Paragon Initiative Enterprises", + "email": "security@paragonie.com", + "homepage": "https://paragonie.com" + } + ], + "description": "PHP 5.x polyfill for random_bytes() and random_int() from PHP 7", + "keywords": [ + "csprng", + "polyfill", + "pseudorandom", + "random" + ], + "support": { + "email": "info@paragonie.com", + "issues": "https://github.com/paragonie/random_compat/issues", + "source": "https://github.com/paragonie/random_compat" + }, + "time": "2020-10-15T08:29:30+00:00" + }, + { + "name": "php-amqplib/php-amqplib", + "version": "v3.7.2", + "source": { + "type": "git", + "url": "https://github.com/php-amqplib/php-amqplib.git", + "reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199", + "reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199", + "shasum": "" + }, + "require": { + "ext-mbstring": "*", + "ext-sockets": "*", + "php": "^7.2||^8.0", + "phpseclib/phpseclib": "^2.0|^3.0" + }, + "conflict": { + "php": "7.4.0 - 7.4.1" + }, + "replace": { + "videlalvaro/php-amqplib": "self.version" + }, + "require-dev": { + "ext-curl": "*", + "nategood/httpful": "^0.2.20", + "phpunit/phpunit": "^7.5|^9.5", + "squizlabs/php_codesniffer": "^3.6" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.0-dev" + } + }, + "autoload": { + "psr-4": { + "PhpAmqpLib\\": "PhpAmqpLib/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "LGPL-2.1-or-later" + ], + "authors": [ + { + "name": "Alvaro Videla", + "role": "Original Maintainer" + }, + { + "name": "Raúl Araya", + "email": "nubeiro@gmail.com", + "role": "Maintainer" + }, + { + "name": "Luke Bakken", + "email": "luke@bakken.io", + "role": "Maintainer" + }, + { + "name": "Ramūnas Dronga", + "email": "github@ramuno.lt", + "role": "Maintainer" + } + ], + "description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.", + "homepage": "https://github.com/php-amqplib/php-amqplib/", + "keywords": [ + "message", + "queue", + "rabbitmq" + ], + "support": { + "issues": "https://github.com/php-amqplib/php-amqplib/issues", + "source": "https://github.com/php-amqplib/php-amqplib/tree/v3.7.2" + }, + "time": "2024-11-21T09:21:41+00:00" + }, { "name": "php-http/discovery", "version": "1.20.0", @@ -817,6 +1015,116 @@ }, "time": "2024-10-02T11:20:13+00:00" }, + { + "name": "phpseclib/phpseclib", + "version": "3.0.43", + "source": { + "type": "git", + "url": "https://github.com/phpseclib/phpseclib.git", + "reference": "709ec107af3cb2f385b9617be72af8cf62441d02" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/709ec107af3cb2f385b9617be72af8cf62441d02", + "reference": "709ec107af3cb2f385b9617be72af8cf62441d02", + "shasum": "" + }, + "require": { + "paragonie/constant_time_encoding": "^1|^2|^3", + "paragonie/random_compat": "^1.4|^2.0|^9.99.99", + "php": ">=5.6.1" + }, + "require-dev": { + "phpunit/phpunit": "*" + }, + "suggest": { + "ext-dom": "Install the DOM extension to load XML formatted public keys.", + "ext-gmp": "Install the GMP (GNU Multiple Precision) extension in order to speed up arbitrary precision integer arithmetic operations.", + "ext-libsodium": "SSH2/SFTP can make use of some algorithms provided by the libsodium-php extension.", + "ext-mcrypt": "Install the Mcrypt extension in order to speed up a few other cryptographic operations.", + "ext-openssl": "Install the OpenSSL extension in order to speed up a wide variety of cryptographic operations." + }, + "type": "library", + "autoload": { + "files": [ + "phpseclib/bootstrap.php" + ], + "psr-4": { + "phpseclib3\\": "phpseclib/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Jim Wigginton", + "email": "terrafrost@php.net", + "role": "Lead Developer" + }, + { + "name": "Patrick Monnerat", + "email": "pm@datasphere.ch", + "role": "Developer" + }, + { + "name": "Andreas Fischer", + "email": "bantu@phpbb.com", + "role": "Developer" + }, + { + "name": "Hans-Jürgen Petrich", + "email": "petrich@tronic-media.com", + "role": "Developer" + }, + { + "name": "Graham Campbell", + "email": "graham@alt-three.com", + "role": "Developer" + } + ], + "description": "PHP Secure Communications Library - Pure-PHP implementations of RSA, AES, SSH2, SFTP, X.509 etc.", + "homepage": "http://phpseclib.sourceforge.net", + "keywords": [ + "BigInteger", + "aes", + "asn.1", + "asn1", + "blowfish", + "crypto", + "cryptography", + "encryption", + "rsa", + "security", + "sftp", + "signature", + "signing", + "ssh", + "twofish", + "x.509", + "x509" + ], + "support": { + "issues": "https://github.com/phpseclib/phpseclib/issues", + "source": "https://github.com/phpseclib/phpseclib/tree/3.0.43" + }, + "funding": [ + { + "url": "https://github.com/terrafrost", + "type": "github" + }, + { + "url": "https://www.patreon.com/phpseclib", + "type": "patreon" + }, + { + "url": "https://tidelift.com/funding/github/packagist/phpseclib/phpseclib", + "type": "tidelift" + } + ], + "time": "2024-12-14T21:12:59+00:00" + }, { "name": "psr/container", "version": "2.0.2", @@ -1263,16 +1571,16 @@ }, { "name": "symfony/deprecation-contracts", - "version": "v3.5.0", + "version": "v3.5.1", "source": { "type": "git", "url": "https://github.com/symfony/deprecation-contracts.git", - "reference": "0e0d29ce1f20deffb4ab1b016a7257c4f1e789a1" + "reference": "74c71c939a79f7d5bf3c1ce9f5ea37ba0114c6f6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/0e0d29ce1f20deffb4ab1b016a7257c4f1e789a1", - "reference": "0e0d29ce1f20deffb4ab1b016a7257c4f1e789a1", + "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/74c71c939a79f7d5bf3c1ce9f5ea37ba0114c6f6", + "reference": "74c71c939a79f7d5bf3c1ce9f5ea37ba0114c6f6", "shasum": "" }, "require": { @@ -1280,12 +1588,12 @@ }, "type": "library", "extra": { + "thanks": { + "url": "https://github.com/symfony/contracts", + "name": "symfony/contracts" + }, "branch-alias": { "dev-main": "3.5-dev" - }, - "thanks": { - "name": "symfony/contracts", - "url": "https://github.com/symfony/contracts" } }, "autoload": { @@ -1310,7 +1618,7 @@ "description": "A generic function and convention to trigger deprecation notices", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/deprecation-contracts/tree/v3.5.0" + "source": "https://github.com/symfony/deprecation-contracts/tree/v3.5.1" }, "funding": [ { @@ -1326,30 +1634,31 @@ "type": "tidelift" } ], - "time": "2024-04-18T09:32:20+00:00" + "time": "2024-09-25T14:20:29+00:00" }, { "name": "symfony/http-client", - "version": "v7.1.7", + "version": "v7.2.3", "source": { "type": "git", "url": "https://github.com/symfony/http-client.git", - "reference": "90ab2a4992dcf5d1f19a9b8737eba36a7c305fd0" + "reference": "7ce6078c79a4a7afff931c413d2959d3bffbfb8d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client/zipball/90ab2a4992dcf5d1f19a9b8737eba36a7c305fd0", - "reference": "90ab2a4992dcf5d1f19a9b8737eba36a7c305fd0", + "url": "https://api.github.com/repos/symfony/http-client/zipball/7ce6078c79a4a7afff931c413d2959d3bffbfb8d", + "reference": "7ce6078c79a4a7afff931c413d2959d3bffbfb8d", "shasum": "" }, "require": { "php": ">=8.2", "psr/log": "^1|^2|^3", "symfony/deprecation-contracts": "^2.5|^3", - "symfony/http-client-contracts": "^3.4.1", + "symfony/http-client-contracts": "~3.4.4|^3.5.2", "symfony/service-contracts": "^2.5|^3" }, "conflict": { + "amphp/amp": "<2.5", "php-http/discovery": "<1.15", "symfony/http-foundation": "<6.4" }, @@ -1360,14 +1669,14 @@ "symfony/http-client-implementation": "3.0" }, "require-dev": { - "amphp/amp": "^2.5", - "amphp/http-client": "^4.2.1", - "amphp/http-tunnel": "^1.0", + "amphp/http-client": "^4.2.1|^5.0", + "amphp/http-tunnel": "^1.0|^2.0", "amphp/socket": "^1.1", "guzzlehttp/promises": "^1.4|^2.0", "nyholm/psr7": "^1.0", "php-http/httplug": "^1.0|^2.0", "psr/http-client": "^1.0", + "symfony/amphp-http-client-meta": "^1.0|^2.0", "symfony/dependency-injection": "^6.4|^7.0", "symfony/http-kernel": "^6.4|^7.0", "symfony/messenger": "^6.4|^7.0", @@ -1404,7 +1713,7 @@ "http" ], "support": { - "source": "https://github.com/symfony/http-client/tree/v7.1.7" + "source": "https://github.com/symfony/http-client/tree/v7.2.3" }, "funding": [ { @@ -1420,20 +1729,20 @@ "type": "tidelift" } ], - "time": "2024-11-05T16:45:54+00:00" + "time": "2025-01-28T15:51:35+00:00" }, { "name": "symfony/http-client-contracts", - "version": "v3.5.0", + "version": "v3.5.2", "source": { "type": "git", "url": "https://github.com/symfony/http-client-contracts.git", - "reference": "20414d96f391677bf80078aa55baece78b82647d" + "reference": "ee8d807ab20fcb51267fdace50fbe3494c31e645" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/20414d96f391677bf80078aa55baece78b82647d", - "reference": "20414d96f391677bf80078aa55baece78b82647d", + "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/ee8d807ab20fcb51267fdace50fbe3494c31e645", + "reference": "ee8d807ab20fcb51267fdace50fbe3494c31e645", "shasum": "" }, "require": { @@ -1441,12 +1750,12 @@ }, "type": "library", "extra": { + "thanks": { + "url": "https://github.com/symfony/contracts", + "name": "symfony/contracts" + }, "branch-alias": { "dev-main": "3.5-dev" - }, - "thanks": { - "name": "symfony/contracts", - "url": "https://github.com/symfony/contracts" } }, "autoload": { @@ -1482,7 +1791,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/http-client-contracts/tree/v3.5.0" + "source": "https://github.com/symfony/http-client-contracts/tree/v3.5.2" }, "funding": [ { @@ -1498,7 +1807,7 @@ "type": "tidelift" } ], - "time": "2024-04-18T09:32:20+00:00" + "time": "2024-12-07T08:49:48+00:00" }, { "name": "symfony/polyfill-mbstring", @@ -1526,8 +1835,8 @@ "type": "library", "extra": { "thanks": { - "name": "symfony/polyfill", - "url": "https://github.com/symfony/polyfill" + "url": "https://github.com/symfony/polyfill", + "name": "symfony/polyfill" } }, "autoload": { @@ -1600,8 +1909,8 @@ "type": "library", "extra": { "thanks": { - "name": "symfony/polyfill", - "url": "https://github.com/symfony/polyfill" + "url": "https://github.com/symfony/polyfill", + "name": "symfony/polyfill" } }, "autoload": { @@ -1658,16 +1967,16 @@ }, { "name": "symfony/service-contracts", - "version": "v3.5.0", + "version": "v3.5.1", "source": { "type": "git", "url": "https://github.com/symfony/service-contracts.git", - "reference": "bd1d9e59a81d8fa4acdcea3f617c581f7475a80f" + "reference": "e53260aabf78fb3d63f8d79d69ece59f80d5eda0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/service-contracts/zipball/bd1d9e59a81d8fa4acdcea3f617c581f7475a80f", - "reference": "bd1d9e59a81d8fa4acdcea3f617c581f7475a80f", + "url": "https://api.github.com/repos/symfony/service-contracts/zipball/e53260aabf78fb3d63f8d79d69ece59f80d5eda0", + "reference": "e53260aabf78fb3d63f8d79d69ece59f80d5eda0", "shasum": "" }, "require": { @@ -1680,12 +1989,12 @@ }, "type": "library", "extra": { + "thanks": { + "url": "https://github.com/symfony/contracts", + "name": "symfony/contracts" + }, "branch-alias": { "dev-main": "3.5-dev" - }, - "thanks": { - "name": "symfony/contracts", - "url": "https://github.com/symfony/contracts" } }, "autoload": { @@ -1721,7 +2030,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/service-contracts/tree/v3.5.0" + "source": "https://github.com/symfony/service-contracts/tree/v3.5.1" }, "funding": [ { @@ -1737,7 +2046,7 @@ "type": "tidelift" } ], - "time": "2024-04-18T09:32:20+00:00" + "time": "2024-09-25T14:20:29+00:00" }, { "name": "tbachert/spi", @@ -1766,10 +2075,10 @@ }, "type": "composer-plugin", "extra": { + "class": "Nevay\\SPI\\Composer\\Plugin", "branch-alias": { "dev-main": "0.2.x-dev" }, - "class": "Nevay\\SPI\\Composer\\Plugin", "plugin-optional": true }, "autoload": { @@ -1840,54 +2149,137 @@ }, "time": "2024-10-04T13:55:36+00:00" }, + { + "name": "utopia-php/compression", + "version": "0.1.3", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/compression.git", + "reference": "66f093557ba66d98245e562036182016c7dcfe8a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/compression/zipball/66f093557ba66d98245e562036182016c7dcfe8a", + "reference": "66f093557ba66d98245e562036182016c7dcfe8a", + "shasum": "" + }, + "require": { + "php": ">=8.0" + }, + "require-dev": { + "laravel/pint": "1.2.*", + "phpunit/phpunit": "^9.3", + "vimeo/psalm": "4.0.1" + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Compression\\": "src/Compression" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "A simple Compression library to handle file compression", + "keywords": [ + "compression", + "framework", + "php", + "upf", + "utopia" + ], + "support": { + "issues": "https://github.com/utopia-php/compression/issues", + "source": "https://github.com/utopia-php/compression/tree/0.1.3" + }, + "time": "2025-01-15T15:15:51+00:00" + }, + { + "name": "utopia-php/fetch", + "version": "0.3.0", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/fetch.git", + "reference": "02b12c05aec13399dcc2da8d51f908e328ab63f4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/fetch/zipball/02b12c05aec13399dcc2da8d51f908e328ab63f4", + "reference": "02b12c05aec13399dcc2da8d51f908e328ab63f4", + "shasum": "" + }, + "require": { + "php": ">=8.0" + }, + "require-dev": { + "laravel/pint": "^1.5.0", + "phpstan/phpstan": "^1.10", + "phpunit/phpunit": "^9.5" + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Fetch\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "A simple library that provides an interface for making HTTP Requests.", + "support": { + "issues": "https://github.com/utopia-php/fetch/issues", + "source": "https://github.com/utopia-php/fetch/tree/0.3.0" + }, + "time": "2025-01-17T06:11:10+00:00" + }, { "name": "utopia-php/framework", - "version": "0.34.3", + "version": "0.33.16", "source": { "type": "git", "url": "https://github.com/utopia-php/http.git", - "reference": "e3bbca07c1df4e908ea9d3ce4f59367a7696b66b" + "reference": "e91d4c560d1b809e25faa63d564fef034363b50f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/http/zipball/e3bbca07c1df4e908ea9d3ce4f59367a7696b66b", - "reference": "e3bbca07c1df4e908ea9d3ce4f59367a7696b66b", + "url": "https://api.github.com/repos/utopia-php/http/zipball/e91d4c560d1b809e25faa63d564fef034363b50f", + "reference": "e91d4c560d1b809e25faa63d564fef034363b50f", "shasum": "" }, "require": { - "ext-swoole": "*", - "php": ">=8.0" + "php": ">=8.1", + "utopia-php/compression": "0.1.*", + "utopia-php/telemetry": "0.1.*" }, "require-dev": { "laravel/pint": "^1.2", "phpbench/phpbench": "^1.2", "phpstan/phpstan": "^1.10", - "phpunit/phpunit": "^9.5.25", - "swoole/ide-helper": "4.8.3" + "phpunit/phpunit": "^9.5.25" }, "type": "library", "autoload": { "psr-4": { - "Utopia\\": "src/", - "Tests\\E2E\\": "tests/e2e" + "Utopia\\": "src/" } }, "notification-url": "https://packagist.org/downloads/", "license": [ "MIT" ], - "description": "A simple, light and advanced PHP HTTP framework", + "description": "A simple, light and advanced PHP framework", "keywords": [ "framework", - "http", "php", "upf" ], "support": { "issues": "https://github.com/utopia-php/http/issues", - "source": "https://github.com/utopia-php/http/tree/0.34.3" + "source": "https://github.com/utopia-php/http/tree/0.33.16" }, - "time": "2024-07-02T15:08:46+00:00" + "time": "2025-01-16T15:58:50+00:00" }, { "name": "utopia-php/telemetry", @@ -2139,16 +2531,16 @@ }, { "name": "nikic/php-parser", - "version": "v5.3.1", + "version": "v5.4.0", "source": { "type": "git", "url": "https://github.com/nikic/PHP-Parser.git", - "reference": "8eea230464783aa9671db8eea6f8c6ac5285794b" + "reference": "447a020a1f875a434d62f2a401f53b82a396e494" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/8eea230464783aa9671db8eea6f8c6ac5285794b", - "reference": "8eea230464783aa9671db8eea6f8c6ac5285794b", + "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/447a020a1f875a434d62f2a401f53b82a396e494", + "reference": "447a020a1f875a434d62f2a401f53b82a396e494", "shasum": "" }, "require": { @@ -2191,9 +2583,9 @@ ], "support": { "issues": "https://github.com/nikic/PHP-Parser/issues", - "source": "https://github.com/nikic/PHP-Parser/tree/v5.3.1" + "source": "https://github.com/nikic/PHP-Parser/tree/v5.4.0" }, - "time": "2024-10-08T18:51:32+00:00" + "time": "2024-12-30T11:07:19+00:00" }, { "name": "phar-io/manifest", @@ -2315,16 +2707,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.12.10", + "version": "1.12.16", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "fc463b5d0fe906dcf19689be692c65c50406a071" + "reference": "e0bb5cb78545aae631220735aa706eac633a6be9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/fc463b5d0fe906dcf19689be692c65c50406a071", - "reference": "fc463b5d0fe906dcf19689be692c65c50406a071", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e0bb5cb78545aae631220735aa706eac633a6be9", + "reference": "e0bb5cb78545aae631220735aa706eac633a6be9", "shasum": "" }, "require": { @@ -2369,7 +2761,7 @@ "type": "github" } ], - "time": "2024-11-11T15:37:09+00:00" + "time": "2025-01-21T14:50:05+00:00" }, { "name": "phpunit/php-code-coverage", @@ -2692,16 +3084,16 @@ }, { "name": "phpunit/phpunit", - "version": "9.6.21", + "version": "9.6.22", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa" + "reference": "f80235cb4d3caa59ae09be3adf1ded27521d1a9c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa", - "reference": "de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/f80235cb4d3caa59ae09be3adf1ded27521d1a9c", + "reference": "f80235cb4d3caa59ae09be3adf1ded27521d1a9c", "shasum": "" }, "require": { @@ -2712,7 +3104,7 @@ "ext-mbstring": "*", "ext-xml": "*", "ext-xmlwriter": "*", - "myclabs/deep-copy": "^1.12.0", + "myclabs/deep-copy": "^1.12.1", "phar-io/manifest": "^2.0.4", "phar-io/version": "^3.2.1", "php": ">=7.3", @@ -2775,7 +3167,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.21" + "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.22" }, "funding": [ { @@ -2791,7 +3183,7 @@ "type": "tidelift" } ], - "time": "2024-09-19T10:50:18+00:00" + "time": "2024-12-05T13:48:26+00:00" }, { "name": "sebastian/cli-parser", @@ -3850,16 +4242,16 @@ }, { "name": "workerman/workerman", - "version": "v4.2.0", + "version": "v4.2.1", "source": { "type": "git", "url": "https://github.com/walkor/workerman.git", - "reference": "df513f3fd274811ebb8358d05d7cec19ee8bd3e1" + "reference": "cafb5a43d93d7d30a16b32a57948581cca993562" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/walkor/workerman/zipball/df513f3fd274811ebb8358d05d7cec19ee8bd3e1", - "reference": "df513f3fd274811ebb8358d05d7cec19ee8bd3e1", + "url": "https://api.github.com/repos/walkor/workerman/zipball/cafb5a43d93d7d30a16b32a57948581cca993562", + "reference": "cafb5a43d93d7d30a16b32a57948581cca993562", "shasum": "" }, "require": { @@ -3909,7 +4301,7 @@ "type": "patreon" } ], - "time": "2024-11-07T08:31:33+00:00" + "time": "2024-11-24T11:45:37+00:00" } ], "aliases": [], @@ -3918,8 +4310,10 @@ "prefer-stable": false, "prefer-lowest": false, "platform": { - "php": ">=8.1" + "php": ">=8.3" + }, + "platform-dev": { + "ext-redis": "*" }, - "platform-dev": [], "plugin-api-version": "2.6.0" } diff --git a/docker-compose.yml b/docker-compose.yml index fe8d540..09c2320 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,12 +4,15 @@ services: tests: container_name: tests build: . + command: + - vendor/bin/phpunit + - tests volumes: - ./:/usr/local/src depends_on: - - redis - - redis-cluster-0 - swoole + - swoole-amqp + - swoole-redis-cluster - workerman swoole: @@ -28,7 +31,18 @@ services: volumes: - ./:/usr/src/code depends_on: - - redis-cluster-0 + redis-cluster-0: + condition: service_healthy + + swoole-amqp: + container_name: swoole-amqp + build: ./tests/Queue/servers/AMQP/. + command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php + volumes: + - ./:/usr/src/code + depends_on: + amqp: + condition: service_healthy workerman: container_name: workerman @@ -54,9 +68,15 @@ services: - REDIS_CLUSTER_CREATOR=yes - REDIS_CLUSTER_REPLICAS=0 depends_on: - - redis-cluster-1 - - redis-cluster-2 - - redis-cluster-3 + redis-cluster-1: + condition: service_started + redis-cluster-2: + condition: service_started + redis-cluster-3: + condition: service_started + healthcheck: + test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] + start_interval: 1s redis-cluster-1: image: docker.io/bitnami/redis-cluster:7.4 @@ -64,6 +84,9 @@ services: - ALLOW_EMPTY_PASSWORD=yes - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 + healthcheck: + test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] + start_interval: 1s redis-cluster-2: image: docker.io/bitnami/redis-cluster:7.4 @@ -71,10 +94,26 @@ services: - ALLOW_EMPTY_PASSWORD=yes - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 + healthcheck: + test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] + start_interval: 1s redis-cluster-3: image: docker.io/bitnami/redis-cluster:7.4 environment: - ALLOW_EMPTY_PASSWORD=yes - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL - - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 \ No newline at end of file + - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 + healthcheck: + test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] + start_interval: 1s + + amqp: + image: rabbitmq:4 + environment: + RABBITMQ_DEFAULT_USER: amqp + RABBITMQ_DEFAULT_PASS: amqp + RABBITMQ_DEFAULT_VHOST: "/" + healthcheck: + test: [ "CMD", "rabbitmqctl", "node_health_check"] + start_interval: 1s \ No newline at end of file diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index 756e67a..64c378d 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -5,15 +5,14 @@ abstract class Adapter { public int $workerNum; - public string $queue; + public Queue $queue; public string $namespace; - public Connection $connection; + public Consumer $consumer; public function __construct(int $workerNum, string $queue, string $namespace = 'utopia-queue') { $this->workerNum = $workerNum; - $this->queue = $queue; - $this->namespace = $namespace; + $this->queue = new Queue($queue, $namespace); } /** diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index 522e73a..fef30da 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -4,17 +4,17 @@ use Swoole\Process\Pool; use Utopia\Queue\Adapter; -use Utopia\Queue\Connection; +use Utopia\Queue\Consumer; class Swoole extends Adapter { protected Pool $pool; - public function __construct(Connection $connection, int $workerNum, string $queue, string $namespace = 'utopia-queue') + public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') { parent::__construct($workerNum, $queue, $namespace); - $this->connection = $connection; + $this->consumer = $consumer; $this->pool = new Pool($workerNum); } diff --git a/src/Queue/Adapter/Workerman.php b/src/Queue/Adapter/Workerman.php index bd6255d..8c45d6f 100644 --- a/src/Queue/Adapter/Workerman.php +++ b/src/Queue/Adapter/Workerman.php @@ -3,20 +3,20 @@ namespace Utopia\Queue\Adapter; use Utopia\Queue\Adapter; -use Utopia\Queue\Connection; +use Utopia\Queue\Consumer; use Workerman\Worker; class Workerman extends Adapter { protected Worker $worker; - public function __construct(Connection $connection, int $workerNum, string $queue, string $namespace = 'utopia-queue') + public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') { parent::__construct($workerNum, $queue, $namespace); $this->worker = new Worker(); $this->worker->count = $workerNum; - $this->connection = $connection; + $this->consumer = $consumer; } public function start(): self diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php new file mode 100644 index 0000000..f105136 --- /dev/null +++ b/src/Queue/Broker/AMQP.php @@ -0,0 +1,138 @@ +getBody(), associative: true) ?? false; + if (!$nextMessage) { + $amqpMessage->nack(requeue: false); + return; + } + + $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; + $message = new Message($nextMessage); + + $messageCallback($message); + $amqpMessage->ack(); + $successCallback($message); + } catch (Retryable $e) { + $amqpMessage->nack(requeue: true); + $errorCallback($message, $e); + } catch (\Throwable $th) { + $amqpMessage->nack(requeue: false); + $errorCallback($message, $th); + } + }; + + $channel = $this->getChannel(); + + // It's good practice for the consumer to set up exchange and queues. + // This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and + // dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages. + + // 1. Declare the exchange and a dead-letter-exchange. + $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false); + $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false); + + // 2. Declare the working queue and configure the DLX for receiving rejected messages. + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(["x-dead-letter-exchange" => "{$queue->namespace}.failed"])); + $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); + + // 3. Declare the dead-letter-queue and bind it to the DLX. + $channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false); + $channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name); + + // 4. Instruct to consume on the working queue. + $channel->basic_consume($queue->name, callback: $processMessage); + + // 5. Consume. This blocks until the connection gets closed. + $channel->consume(); + } + + public function close(): void + { + if ($this->channel) { + $this->channel->getConnection()?->close(); + } + } + + public function enqueue(Queue $queue, array $payload): bool + { + $payload = [ + 'pid' => \uniqid(more_entropy: true), + 'queue' => $queue->name, + 'timestamp' => time(), + 'payload' => $payload + ]; + $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); + $this->getChannel()->basic_publish($message, $queue->namespace, routing_key: $queue->name); + return true; + } + + public function retry(Queue $queue, int $limit = null): void + { + // This is a no-op for AMQP + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + $queueName = $queue->name; + if ($failedJobs) { + $queueName = $queueName . ".failed"; + } + + $client = new Client(); + $response = $client->fetch(sprintf('http://%s:%s@%s:%s/api/queues/%s/%s', $this->user, $this->password, $this->host, $this->httpPort, urlencode($this->vhost), $queueName)); + + // If this queue does not exist (yet), the queue size is 0. + if ($response->getStatusCode() === 404) { + return 0; + } + + if ($response->getStatusCode() !== 200) { + throw new \Exception(sprintf('Invalid status code %d: %s', $response->getStatusCode(), $response->getBody())); + } + + $data = $response->json(); + return $data['messages']; + } + + private function getChannel(): AMQPChannel + { + if ($this->channel == null) { + $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost); + $this->channel = $connection->channel(); + } + + return $this->channel; + } +} diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php new file mode 100644 index 0000000..34ee22e --- /dev/null +++ b/src/Queue/Broker/Redis.php @@ -0,0 +1,167 @@ +closed) { + /** + * Waiting for next Job. + */ + $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", 5); + + if (!$nextMessage) { + continue; + } + + $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; + + $message = new Message($nextMessage); + + /** + * Move Job to Jobs and it's PID to the processing list. + */ + $this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage); + $this->connection->leftPush("{$queue->namespace}.processing.{$queue->name}", $message->getPid()); + + /** + * Increment Total Jobs Received from Stats. + */ + $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.total"); + + try { + /** + * Increment Processing Jobs from Stats. + */ + $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.processing"); + + $messageCallback($message); + + /** + * Remove Jobs if successful. + */ + $this->connection->remove("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}"); + + /** + * Increment Successful Jobs from Stats. + */ + $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.success"); + + + $successCallback($message); + } catch (\Throwable $th) { + /** + * Move failed Job to Failed list. + */ + $this->connection->leftPush("{$queue->namespace}.failed.{$queue->name}", $message->getPid()); + + /** + * Increment Failed Jobs from Stats. + */ + $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.failed"); + + $errorCallback($message, $th); + } finally { + /** + * Remove Job from Processing. + */ + $this->connection->listRemove("{$queue->namespace}.processing.{$queue->name}", $message->getPid()); + + /** + * Decrease Processing Jobs from Stats. + */ + $this->connection->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); + } + } + } + + public function close(): void + { + $this->closed = true; + } + + public function enqueue(Queue $queue, array $payload): bool + { + $payload = [ + 'pid' => \uniqid(more_entropy: true), + 'queue' => $queue->name, + 'timestamp' => time(), + 'payload' => $payload + ]; + return $this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + } + + /** + * Take all jobs from the failed queue and re-enqueue them. + * @param int|null $limit The amount of jobs to retry + */ + public function retry(Queue $queue, int $limit = null): void + { + $start = \time(); + $processed = 0; + + while (true) { + $pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", 5); + + // No more jobs to retry + if ($pid === false) { + break; + } + + $job = $this->getJob($queue, $pid); + + // Job doesn't exist + if ($job === false) { + break; + } + + // Job was already retried + if ($job->getTimestamp() >= $start) { + break; + } + + // We're reached the max amount of jobs to retry + if ($limit !== null && $processed >= $limit) { + break; + } + + $this->enqueue($queue, $job->getPayload()); + $processed++; + } + } + + private function getJob(Queue $queue, string $pid): Message|false + { + $value = $this->connection->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + + if ($value === false) { + return false; + } + + $job = json_decode($value, true); + return new Message($job); + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + $queueName = "{$queue->namespace}.queue.{$queue->name}"; + if ($failedJobs) { + $queueName = "{$queue->namespace}.failed.{$queue->name}"; + } + return $this->connection->listSize($queueName); + } +} diff --git a/src/Queue/Client.php b/src/Queue/Client.php deleted file mode 100644 index ac55bf8..0000000 --- a/src/Queue/Client.php +++ /dev/null @@ -1,118 +0,0 @@ -queue = $queue; - $this->namespace = $namespace; - $this->connection = $connection; - } - - public function enqueue(array $payload): bool - { - $payload = [ - 'pid' => \uniqid(more_entropy: true), - 'queue' => $this->queue, - 'timestamp' => time(), - 'payload' => $payload - ]; - - return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload); - } - - /** - * Take all jobs from the failed queue and re-enqueue them. - * @param int|null $limit The amount of jobs to retry - */ - public function retry(int $limit = null): void - { - $start = \time(); - $processed = 0; - - while (true) { - $pid = $this->connection->rightPop("{$this->namespace}.failed.{$this->queue}", 5); - - // No more jobs to retry - if ($pid === false) { - break; - } - - $job = $this->getJob($pid); - - // Job doesn't exist - if ($job === false) { - break; - } - - // Job was already retried - if ($job->getTimestamp() >= $start) { - break; - } - - // We're reached the max amount of jobs to retry - if ($limit !== null && $processed >= $limit) { - break; - } - - $this->enqueue($job->getPayload()); - $processed++; - } - } - - public function getJob(string $pid): Message|false - { - $value = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}"); - - if ($value === false) { - return false; - } - - $job = json_decode($value, true); - - return new Message($job); - } - - public function listJobs(int $total = 50, int $offset = 0): array - { - return $this->connection->listRange("{$this->namespace}.queue.{$this->queue}", $total, $offset); - } - - public function getQueueSize(): int - { - return $this->connection->listSize("{$this->namespace}.queue.{$this->queue}"); - } - - public function countTotalJobs(): int - { - return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.total") ?? 0); - } - - public function countSuccessfulJobs(): int - { - return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.success") ?? 0); - } - - public function countFailedJobs(): int - { - return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.failed") ?? 0); - } - - public function countProcessingJobs(): int - { - return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.processing") ?? 0); - } - - public function resetStats(): void - { - $this->connection->set("{$this->namespace}.stats.{$this->queue}.total", 0); - $this->connection->set("{$this->namespace}.stats.{$this->queue}.success", 0); - $this->connection->set("{$this->namespace}.stats.{$this->queue}.failed", 0); - $this->connection->set("{$this->namespace}.stats.{$this->queue}.processing", 0); - } -} diff --git a/src/Queue/Consumer.php b/src/Queue/Consumer.php new file mode 100644 index 0000000..8c55c6b --- /dev/null +++ b/src/Queue/Consumer.php @@ -0,0 +1,20 @@ +name)) { + throw new \InvalidArgumentException("Cannot create queue with empty name."); + } + } +} diff --git a/src/Queue/Server.php b/src/Queue/Server.php index ce6b573..1e389a0 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -2,9 +2,9 @@ namespace Utopia\Queue; +use Exception; use Throwable; use Utopia\CLI\Console; -use Exception; use Utopia\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; @@ -145,7 +145,7 @@ public static function setResource(string $name, callable $callback, array $inje self::$resourcesCallbacks[$name] = ['callback' => $callback, 'injections' => $injections, 'reset' => true]; } - public function setTelemetry(Telemetry $telemetry) + public function setTelemetry(Telemetry $telemetry): void { $this->jobWaitTime = $telemetry->createHistogram( 'messaging.process.wait.duration', @@ -218,76 +218,43 @@ public function start(): self if (!is_null($this->workerStartHook)) { call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } - while (true) { - /** - * Waiting for next Job. - */ - $nextMessage = $this->adapter->connection->rightPopArray("{$this->adapter->namespace}.queue.{$this->adapter->queue}", 5); - - if (!$nextMessage) { - continue; - } - - $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; - - $message = new Message($nextMessage); - - self::setResource('message', fn () => $message); - - $receivedAtTimestamp = microtime(true); - - Console::info("[Job] Received Job ({$message->getPid()})."); - $waitDuration = microtime(true) - $message->getTimestamp(); - $this->jobWaitTime->record($waitDuration); - - /** - * Move Job to Jobs and it's PID to the processing list. - */ - $this->adapter->connection->setArray("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}", $nextMessage); - $this->adapter->connection->leftPush("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid()); - - /** - * Increment Total Jobs Received from Stats. - */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.total"); - - try { - /** - * Increment Processing Jobs from Stats. - */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing"); - - if ($this->job->getHook()) { - foreach ($this->initHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); + $this->adapter->consumer->consume( + $this->adapter->queue, + function (Message $message) { + $receivedAtTimestamp = microtime(true); + Console::info("[Job] Received Job ({$message->getPid()})."); + try { + $waitDuration = microtime(true) - $message->getTimestamp(); + $this->jobWaitTime->record($waitDuration); + + $this->resources = []; + self::setResource('message', fn () => $message); + if ($this->job->getHook()) { + foreach ($this->initHooks as $hook) { // Global init hooks + if (in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } } } - } - foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); + foreach ($this->job->getGroups() as $group) { + foreach ($this->initHooks as $hook) { // Group init hooks + if (in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } } } - } - - \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); - - /** - * Remove Jobs if successful. - */ - $this->adapter->connection->remove("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}"); - - /** - * Increment Successful Jobs from Stats. - */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.success"); + \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); + } finally { + $processDuration = microtime(true) - $receivedAtTimestamp; + $this->processDuration->record($processDuration); + } + }, + function (Message $message) { if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { // Global init hooks if (in_array('*', $hook->getGroups())) { @@ -307,17 +274,8 @@ public function start(): self } Console::success("[Job] ({$message->getPid()}) successfully run."); - } catch (\Throwable $th) { - /** - * Move failed Job to Failed list. - */ - $this->adapter->connection->leftPush("{$this->adapter->namespace}.failed.{$this->adapter->queue}", $message->getPid()); - - /** - * Increment Failed Jobs from Stats. - */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.failed"); - + }, + function (Message $message, Throwable $th) { Console::error("[Job] ({$message->getPid()}) failed to run."); Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}"); @@ -325,23 +283,8 @@ public function start(): self foreach ($this->errorHooks as $hook) { call_user_func_array($hook->getAction(), $this->getArguments($hook)); } - } finally { - $processDuration = microtime(true) - $receivedAtTimestamp; - $this->processDuration->record($processDuration); - - /** - * Remove Job from Processing. - */ - $this->adapter->connection->listRemove("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid()); - - /** - * Decrease Processing Jobs from Stats. - */ - $this->adapter->connection->decrement("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing"); - } - - $this->resources = []; - } + }, + ); }); $this->adapter->start(); diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php new file mode 100644 index 0000000..dac71bd --- /dev/null +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -0,0 +1,21 @@ +getClient(); - $client->resetStats(); + $publisher = $this->getPublisher(); foreach ($this->payloads as $payload) { - $this->assertTrue($client->enqueue($payload)); + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); } sleep(1); - - $this->assertEquals(7, $client->countTotalJobs()); - $this->assertEquals(0, $client->getQueueSize()); - $this->assertEquals(0, $client->countProcessingJobs()); - $this->assertEquals(0, $client->countFailedJobs()); - $this->assertEquals(7, $client->countSuccessfulJobs()); } protected function testConcurrency(): void { run(function () { - $client = $this->getClient(); - go(function () use ($client) { - $client->resetStats(); - + $publisher = $this->getPublisher(); + go(function () use ($publisher) { foreach ($this->payloads as $payload) { - $this->assertTrue($client->enqueue($payload)); + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); } sleep(1); - - $this->assertEquals(7, $client->countTotalJobs()); - $this->assertEquals(0, $client->countProcessingJobs()); - $this->assertEquals(0, $client->countFailedJobs()); - $this->assertEquals(7, $client->countSuccessfulJobs()); }); }); } @@ -102,54 +91,29 @@ protected function testConcurrency(): void */ public function testRetry(): void { - $client = $this->getClient(); - $client->resetStats(); + $publisher = $this->getPublisher(); - $client->enqueue([ + $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 1 ]); - $client->enqueue([ + $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 2 ]); - $client->enqueue([ + $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 3 ]); - $client->enqueue([ + $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 4 ]); sleep(1); - - $this->assertEquals(4, $client->countTotalJobs()); - $this->assertEquals(0, $client->countProcessingJobs()); - $this->assertEquals(4, $client->countFailedJobs()); - $this->assertEquals(0, $client->countSuccessfulJobs()); - - $client->resetStats(); - - $client->retry(); - + $publisher->retry($this->getQueue()); sleep(1); - - // Retry will retry ALL failed jobs regardless of if they are still tracked in stats - $this->assertEquals(4, $client->countTotalJobs()); - $this->assertEquals(0, $client->countProcessingJobs()); - $this->assertEquals(4, $client->countFailedJobs()); - $this->assertEquals(0, $client->countSuccessfulJobs()); - - $client->resetStats(); - - $client->retry(2); - + $publisher->retry($this->getQueue(), 2); sleep(1); - - $this->assertEquals(2, $client->countTotalJobs()); - $this->assertEquals(0, $client->countProcessingJobs()); - $this->assertEquals(2, $client->countFailedJobs()); - $this->assertEquals(0, $client->countSuccessfulJobs()); } } diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php index 1e4d34c..2e02744 100644 --- a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -3,16 +3,21 @@ namespace Queue\E2E\Adapter; use Tests\E2E\Adapter\Base; -use Utopia\Queue\Client; +use Utopia\Queue\Broker\Redis; use Utopia\Queue\Connection\RedisCluster; +use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; class SwooleRedisClusterTest extends Base { - protected function getClient(): Client + protected function getPublisher(): Publisher { $connection = new RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); - $client = new Client('swoole-redis-cluster', $connection); + return new Redis($connection); + } - return $client; + protected function getQueue(): Queue + { + return new Queue('swoole-redis-cluster'); } } diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php index e940c98..9a3f183 100644 --- a/tests/Queue/E2E/Adapter/SwooleTest.php +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -2,16 +2,21 @@ namespace Tests\E2E\Adapter; -use Utopia\Queue\Client; +use Utopia\Queue\Broker\Redis as RedisBroker; use Utopia\Queue\Connection\Redis; +use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; class SwooleTest extends Base { - protected function getClient(): Client + protected function getPublisher(): Publisher { $connection = new Redis('redis', 6379); - $client = new Client('swoole', $connection); + return new RedisBroker($connection); + } - return $client; + protected function getQueue(): Queue + { + return new Queue('swoole'); } } diff --git a/tests/Queue/E2E/Adapter/WorkermanTest.php b/tests/Queue/E2E/Adapter/WorkermanTest.php index e0cd1ed..a9c7899 100644 --- a/tests/Queue/E2E/Adapter/WorkermanTest.php +++ b/tests/Queue/E2E/Adapter/WorkermanTest.php @@ -2,16 +2,21 @@ namespace Tests\E2E\Adapter; -use Utopia\Queue\Client; +use Utopia\Queue\Broker\Redis as RedisPublisher; use Utopia\Queue\Connection\Redis; +use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; class WorkermanTest extends Base { - protected function getClient(): Client + protected function getPublisher(): Publisher { $connection = new Redis('redis', 6379); - $client = new Client('workerman', $connection); + return new RedisPublisher($connection); + } - return $client; + protected function getQueue(): Queue + { + return new Queue('workerman'); } } diff --git a/tests/Queue/servers/AMQP/Dockerfile b/tests/Queue/servers/AMQP/Dockerfile new file mode 100644 index 0000000..8643629 --- /dev/null +++ b/tests/Queue/servers/AMQP/Dockerfile @@ -0,0 +1,3 @@ +FROM phpswoole/swoole:php8.3-alpine + +RUN apk add autoconf build-base \ No newline at end of file diff --git a/tests/Queue/servers/AMQP/worker.php b/tests/Queue/servers/AMQP/worker.php new file mode 100644 index 0000000..d590d46 --- /dev/null +++ b/tests/Queue/servers/AMQP/worker.php @@ -0,0 +1,32 @@ +job() + ->inject('message') + ->action(function (Message $message) { + handleRequest($message); + }); + +$server + ->error() + ->inject('error') + ->action(function ($th) { + echo $th->getMessage() . PHP_EOL; + }); + +$server + ->workerStart() + ->action(function () { + echo "Worker Started" . PHP_EOL; + }); + +$server->start(); diff --git a/tests/Queue/servers/Swoole/Dockerfile b/tests/Queue/servers/Swoole/Dockerfile index 093cfb8..7857498 100644 --- a/tests/Queue/servers/Swoole/Dockerfile +++ b/tests/Queue/servers/Swoole/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.1-alpine +FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 972aba9..3645a1d 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -7,7 +7,8 @@ use Utopia\Queue\Message; $connection = new Queue\Connection\Redis('redis'); -$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole'); +$consumer = new Queue\Broker\Redis($connection); +$adapter = new Queue\Adapter\Swoole($consumer, 12, 'swoole'); $server = new Queue\Server($adapter); $server->job() diff --git a/tests/Queue/servers/SwooleRedisCluster/Dockerfile b/tests/Queue/servers/SwooleRedisCluster/Dockerfile index 093cfb8..7857498 100644 --- a/tests/Queue/servers/SwooleRedisCluster/Dockerfile +++ b/tests/Queue/servers/SwooleRedisCluster/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.1-alpine +FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/SwooleRedisCluster/worker.php b/tests/Queue/servers/SwooleRedisCluster/worker.php index 9269807..d120b24 100644 --- a/tests/Queue/servers/SwooleRedisCluster/worker.php +++ b/tests/Queue/servers/SwooleRedisCluster/worker.php @@ -7,7 +7,8 @@ use Utopia\Queue\Message; $connection = new Queue\Connection\RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); -$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole-redis-cluster'); +$consumer = new Queue\Broker\Redis($connection); +$adapter = new Queue\Adapter\Swoole($consumer, 12, 'swoole-redis-cluster'); $server = new Queue\Server($adapter); $server->job() diff --git a/tests/Queue/servers/Workerman/Dockerfile b/tests/Queue/servers/Workerman/Dockerfile index 80d7b82..6dd16ab 100644 --- a/tests/Queue/servers/Workerman/Dockerfile +++ b/tests/Queue/servers/Workerman/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.1-alpine +FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index ffc667e..5a093ec 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -7,7 +7,8 @@ use Utopia\Queue\Message; $connection = new Queue\Connection\Redis('redis'); -$adapter = new Queue\Adapter\Workerman($connection, 12, 'workerman'); +$consumer = new Queue\Broker\Redis($connection); +$adapter = new Queue\Adapter\Workerman($consumer, 12, 'wokerman'); $server = new Queue\Server($adapter); $server->job() ->inject('message') diff --git a/tests/Queue/servers/tests.php b/tests/Queue/servers/tests.php index f663d86..628bdf0 100644 --- a/tests/Queue/servers/tests.php +++ b/tests/Queue/servers/tests.php @@ -4,7 +4,8 @@ function handleRequest(Queue\Message $job): void { - ['type' => $type, 'value' => $value] = $job->getPayload(); + $type = $job->getPayload()['type']; + $value = $job->getPayload()['value'] ?? null; if (empty($job->getTimestamp())) { throw new Exception();