diff --git a/composer.json b/composer.json index 27e775f..08ab44f 100644 --- a/composer.json +++ b/composer.json @@ -25,15 +25,15 @@ }, "require-dev": { "ext-redis": "*", - "predis/predis": "^1.1", + "predis/predis": "^3.0", "phpunit/phpunit": "^8 || ^9", "squizlabs/php_codesniffer": "~3.5", - "php-amqplib/php-amqplib": "^2.6.3", + "php-amqplib/php-amqplib": "^3.0", "scrutinizer/ocular": "^1.6.0", "aws/aws-sdk-php": "3.*", - "phpstan/phpstan": "^0.12.65", - "pepakriz/phpstan-exception-rules": "^0.11.3", - "phpstan/phpstan-strict-rules": "^0.12" + "phpstan/phpstan": "^2", + "rector/rector": "^2.2", + "phpstan/phpstan-strict-rules": "^2.0" }, "suggest": { "monolog/monolog": "Basic logger implements psr/logger", @@ -56,6 +56,7 @@ "phpunit": "phpunit", "phpstan": "phpstan analyse", "phpcs": "phpcs --standard=PSR2 src tests examples -n", - "coverage": "XDEBUG_MODE=coverage phpunit --coverage-clover build/logs/clover.xml --coverage-html build/coverage" + "coverage": "XDEBUG_MODE=coverage phpunit --coverage-clover build/logs/clover.xml --coverage-html build/coverage", + "rector": "php vendor/bin/rector" } } diff --git a/examples/rabbitmq/emitter.php b/examples/rabbitmq/emitter.php index fece74c..1b0d86f 100644 --- a/examples/rabbitmq/emitter.php +++ b/examples/rabbitmq/emitter.php @@ -1,4 +1,5 @@ withPaths([ + __DIR__ . '/src', + __DIR__ . '/tests', + ]) + ->withSkip([ + __DIR__ . '/vendor', + \Rector\CodingStyle\Rector\FuncCall\StrictArraySearchRector::class, + \Rector\DeadCode\Rector\FunctionLike\NarrowWideUnionReturnTypeRector::class, + \Rector\CodingStyle\Rector\String_\SimplifyQuoteEscapeRector::class, + \Rector\DeadCode\Rector\Cast\RecastingRemovalRector::class, + \Rector\DeadCode\Rector\Assign\RemoveUnusedVariableAssignRector::class, + \Rector\CodingStyle\Rector\If_\NullableCompareToNullRector::class, + \Rector\CodingStyle\Rector\Assign\SplitDoubleAssignRector::class, + \Rector\DeadCode\Rector\PropertyProperty\RemoveNullPropertyInitializationRector::class, + \Rector\EarlyReturn\Rector\StmtsAwareInterface\ReturnEarlyIfVariableRector::class, + ]) + ->withSets([ + LevelSetList::UP_TO_PHP_71, + SetList::DEAD_CODE, + SetList::CODING_STYLE, + SetList::EARLY_RETURN, + ]); diff --git a/src/Dispatcher.php b/src/Dispatcher.php index 5990b81..8a795fb 100644 --- a/src/Dispatcher.php +++ b/src/Dispatcher.php @@ -34,10 +34,6 @@ class Dispatcher implements DispatcherInterface /** * Create new Dispatcher - * - * @param DriverInterface $driver - * @param LoggerInterface|null $logger - * @param ShutdownInterface|null $shutdown */ public function __construct(DriverInterface $driver, ?LoggerInterface $logger = null, ?ShutdownInterface $shutdown = null) { @@ -53,9 +49,6 @@ public function __construct(DriverInterface $driver, ?LoggerInterface $logger = } /** - * @param MessageInterface $message - * @param int $priority - * @return DispatcherInterface * @deprecated - use Emitter::emit method instead */ public function emit(MessageInterface $message, int $priority = self::DEFAULT_PRIORITY): DispatcherInterface @@ -64,7 +57,7 @@ public function emit(MessageInterface $message, int $priority = self::DEFAULT_PR $this->log( LogLevel::INFO, - "Dispatcher send message #{$message->getId()} with priority {$priority} to driver " . get_class($this->driver), + sprintf('Dispatcher send message #%s with priority %d to driver ', $message->getId(), $priority) . get_class($this->driver), $this->messageLoggerContext($message) ); return $this; @@ -78,8 +71,6 @@ public function emit(MessageInterface $message, int $priority = self::DEFAULT_PR * WARNING! Don't use it on web server calls. Run it only with cli. * * @param int[] $priorities - * - * @return void */ public function handle(array $priorities = []): void { @@ -87,7 +78,7 @@ public function handle(array $priorities = []): void $this->driver->wait(function (MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool { $this->log( LogLevel::INFO, - "Start handle message #{$message->getId()} ({$message->getType()}) priority:{$priority}", + sprintf('Start handle message #%s (%s) priority:%d', $message->getId(), $message->getType(), $priority), $this->messageLoggerContext($message) ); @@ -111,9 +102,7 @@ public function handle(array $priorities = []): void /** * Dispatch message * - * @param MessageInterface $message * - * @return bool */ private function dispatch(MessageInterface $message): bool { @@ -139,10 +128,7 @@ private function dispatch(MessageInterface $message): bool /** * Handle given message with given handler * - * @param HandlerInterface $handler - * @param MessageInterface $message * - * @return bool */ private function handleMessage(HandlerInterface $handler, MessageInterface $message): bool { @@ -156,31 +142,29 @@ private function handleMessage(HandlerInterface $handler, MessageInterface $mess $this->log( LogLevel::INFO, - "End handle message #{$message->getId()} ({$message->getType()})", + sprintf('End handle message #%s (%s)', $message->getId(), $message->getType()), $this->messageLoggerContext($message) ); - } catch (Exception $e) { + } catch (Exception $exception) { $this->log( LogLevel::ERROR, - "Handler " . get_class($handler) . " throws exception - {$e->getMessage()}", - ['error' => $e, 'message' => $this->messageLoggerContext($message), 'exception' => $e] + "Handler " . get_class($handler) . (' throws exception - ' . $exception->getMessage()), + ['error' => $exception, 'message' => $this->messageLoggerContext($message), 'exception' => $exception] ); if (Debugger::isEnabled()) { - Debugger::log($e, Debugger::EXCEPTION); + Debugger::log($exception, Debugger::EXCEPTION); } $this->retryMessage($message, $handler); $result = false; } + return $result; } /** * Helper function for sending retrying message back to driver - * - * @param MessageInterface $message - * @param HandlerInterface $handler */ private function retryMessage(MessageInterface $message, HandlerInterface $handler): void { @@ -197,21 +181,16 @@ private function retryMessage(MessageInterface $message, HandlerInterface $handl * Calculate next retry * * Inspired by ruby sidekiq (https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry) - * - * @param MessageInterface $message - * @return float */ private function nextRetry(MessageInterface $message): float { - return microtime(true) + pow($message->getRetries(), 4) + 15 + (rand(1, 30) * ($message->getRetries() + 1)); + return microtime(true) + $message->getRetries() ** 4 + 15 + (random_int(1, 30) * ($message->getRetries() + 1)); } /** * Check if actual dispatcher has handler for given type * - * @param string $type * - * @return bool */ private function hasHandlers(string $type): bool { @@ -239,6 +218,7 @@ public function registerHandlers(string $type, array $handlers): DispatcherInter foreach ($handlers as $handler) { $this->registerHandler($type, $handler); } + return $this; } @@ -259,6 +239,7 @@ public function unregisterHandler(string $type, HandlerInterface $handler): Disp if (!isset($this->handlers[$type])) { return $this; } + $this->handlers[$type] = array_filter( $this->handlers[$type], fn(HandlerInterface $registeredHandler): bool => $registeredHandler !== $handler @@ -270,7 +251,6 @@ public function unregisterHandler(string $type, HandlerInterface $handler): Disp /** * Serialize message to logger context * - * @param MessageInterface $message * * @return array */ @@ -290,10 +270,8 @@ private function messageLoggerContext(MessageInterface $message): array * Interal log method wrapper * * @param mixed $level - * @param string $message * @param array $context * - * @return void */ private function log($level, string $message, array $context = []): void { diff --git a/src/DispatcherInterface.php b/src/DispatcherInterface.php index 09389ed..82e194c 100644 --- a/src/DispatcherInterface.php +++ b/src/DispatcherInterface.php @@ -14,36 +14,26 @@ interface DispatcherInterface * This handler will be called in background job when event * of registered $type will be emitted. * - * @param string $type - * @param HandlerInterface $handler * - * @return $this */ public function registerHandler(string $type, HandlerInterface $handler): DispatcherInterface; /** * Register multiple handlers for same type. * - * @param string $type * @param HandlerInterface[] $handler - * @return DispatcherInterface */ public function registerHandlers(string $type, array $handler): DispatcherInterface; /** * Will unregister all handlers. This method is useful for testing. - * - * @return DispatcherInterface */ public function unregisterAllHandlers(): DispatcherInterface; /** * Will unregister a specific handler. * - * @param string $type - * @param HandlerInterface $handler * - * @return DispatcherInterface */ public function unregisterHandler(string $type, HandlerInterface $handler): DispatcherInterface; diff --git a/src/Driver/AmazonSqsDriver.php b/src/Driver/AmazonSqsDriver.php index c0a39e9..14ab3da 100644 --- a/src/Driver/AmazonSqsDriver.php +++ b/src/Driver/AmazonSqsDriver.php @@ -1,13 +1,14 @@ $queueAttributes */ public function __construct(SqsClient $client, string $queueName, array $queueAttributes = []) @@ -78,11 +77,15 @@ public function __construct(SqsClient $client, string $queueName, array $queueAt $this->serializer = new MessageSerializer(); $result = $client->createQueue([ - 'QueueName' => $queueName, + 'QueueName' => $this->queueName, 'Attributes' => $queueAttributes, ]); + $url = $result->get('QueueUrl'); + if (is_string($url) === false) { + throw new \RuntimeException('Cannot get queue URL for queue ' . $this->queueName); + } - $this->queueUrl = $result->get('QueueUrl'); + $this->queueUrl = $url; } /** @@ -98,8 +101,6 @@ public function send(MessageInterface $message, int $priority = Dispatcher::DEFA } /** - * @param string $name - * @param int $priority * * @throws NotSupportedException */ @@ -126,15 +127,16 @@ public function wait(Closure $callback, array $priorities = []): void $messages = $result['Messages']; - if ($messages) { + if (is_array($messages) && $messages) { $hermesMessages = []; foreach ($messages as $message) { $this->client->deleteMessage([ 'QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $message['ReceiptHandle'], ]); - $hermesMessages[] = $this->serializer->unserialize($message['Body']); + $hermesMessages[] = $this->serializer->unserialize(is_string($message['Body']) ? $message['Body'] : ''); } + foreach ($hermesMessages as $hermesMessage) { $callback($hermesMessage); $this->incrementProcessedItems(); diff --git a/src/Driver/DriverInterface.php b/src/Driver/DriverInterface.php index b1132e9..46a1f0b 100644 --- a/src/Driver/DriverInterface.php +++ b/src/Driver/DriverInterface.php @@ -18,13 +18,10 @@ interface DriverInterface * This method has to be as fast as possible because it will be called in * web server threads. * - * @param MessageInterface $message - * @param int $priority * * @throws SerializeException * @throws UnknownPriorityException * - * @return bool */ public function send(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool; @@ -34,9 +31,7 @@ public function send(MessageInterface $message, int $priority = Dispatcher::DEFA * In general you messages will be processed based on queue (higher priority will be processed first) * Or you can define dispatcher that will handle only specific priority (queue)s * - * @param string $name * @param int $priority higher priority will be processed first - * * @throws NotSupportedException */ public function setupPriorityQueue(string $name, int $priority): void; @@ -49,13 +44,11 @@ public function setupPriorityQueue(string $name, int $priority): void; * or can be implemented as callback for driver emit method (like rabbitmq or redis pubsub). * When driver receive new message, you have to call $callback with this message like $callback($message) * - * @param Closure $callback * @param int[] $priorities * * @throws UnknownPriorityException * @throws ShutdownException * - * @return void */ public function wait(Closure $callback, array $priorities): void; } diff --git a/src/Driver/LazyRabbitMqDriver.php b/src/Driver/LazyRabbitMqDriver.php index ef4180c..70101d2 100755 --- a/src/Driver/LazyRabbitMqDriver.php +++ b/src/Driver/LazyRabbitMqDriver.php @@ -1,4 +1,5 @@ $amqpMessageProperties - * @param int $refreshInterval - * @param string $consumerTag */ public function __construct(AMQPLazyConnection $connection, string $queue, array $amqpMessageProperties = [], int $refreshInterval = 0, string $consumerTag = 'hermes') { @@ -69,8 +66,6 @@ public function send(MessageInterface $message, int $priority = Dispatcher::DEFA } /** - * @param string $name - * @param int $priority * * @throws NotSupportedException */ @@ -99,6 +94,7 @@ public function wait(Closure $callback, array $priorities = []): void false, false, function ($rabbitMessage) use ($callback) { + /** @var AMQPMessage $rabbitMessage */ $message = $this->serializer->unserialize($rabbitMessage->body); $callback($message); } @@ -110,6 +106,7 @@ function ($rabbitMessage) use ($callback) { if (!$this->shouldProcessNext()) { break 2; } + if ($this->refreshInterval) { sleep($this->refreshInterval); } @@ -122,13 +119,13 @@ function ($rabbitMessage) use ($callback) { /** * @throws \PhpAmqpLib\Exception\AMQPTimeoutException - * @return AMQPChannel */ private function getChannel(): AMQPChannel { if ($this->channel !== null) { return $this->channel; } + $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->queue, false, false, false, false); return $this->channel; diff --git a/src/Driver/MaxItemsTrait.php b/src/Driver/MaxItemsTrait.php index 835a7be..39f6b56 100644 --- a/src/Driver/MaxItemsTrait.php +++ b/src/Driver/MaxItemsTrait.php @@ -30,9 +30,11 @@ public function shouldProcessNext(): bool if ($this->maxProcessItems === 0) { return true; } + if ($this->processed >= $this->maxProcessItems) { return false; } + return true; } } diff --git a/src/Driver/PredisSetDriver.php b/src/Driver/PredisSetDriver.php index c1203f6..9d8fe63 100644 --- a/src/Driver/PredisSetDriver.php +++ b/src/Driver/PredisSetDriver.php @@ -1,4 +1,5 @@ */ - private $queues = []; + private array $queues = []; - /** - * @var string - */ - private $scheduleKey; + private string $scheduleKey; - /** - * @var Client - */ - private $redis; + private Client $redis; - /** - * @var integer - */ - private $refreshInterval; + private int $refreshInterval; /** * Create new PredisSetDriver @@ -45,10 +37,6 @@ class PredisSetDriver implements DriverInterface * Managing connection to redis is up to you and you have to create it outsite * of this class. You will need to install predis php package. * - * @param Client $redis - * @param string $key - * @param integer $refreshInterval - * @param string $scheduleKey * @see examples/redis * * @throws NotSupportedException @@ -77,6 +65,7 @@ public function send(MessageInterface $message, int $priority = Dispatcher::DEFA $key = $this->getKey($priority); $this->redis->sadd($key, [$this->serializer->serialize($message)]); } + return true; } @@ -89,16 +78,15 @@ public function setupPriorityQueue(string $name, int $priority): void } /** - * @param int $priority - * @return string * * @throws UnknownPriorityException */ private function getKey(int $priority): string { if (!isset($this->queues[$priority])) { - throw new UnknownPriorityException("Unknown priority {$priority}"); + throw new UnknownPriorityException('Unknown priority ' . $priority); } + return $this->queues[$priority]; } @@ -119,10 +107,12 @@ public function wait(Closure $callback, array $priorities = []): void } // check schedule - $messagesString = $this->redis->zrangebyscore($this->scheduleKey, '-inf', microtime(true), ['LIMIT' => ['OFFSET' => 0, 'COUNT' => 1]]); + $messagesString = $this->redis->zrangebyscore($this->scheduleKey, '-inf', microtime(false), ['LIMIT' => ['OFFSET' => 0, 'COUNT' => 1]]); if (count($messagesString)) { foreach ($messagesString as $messageString) { + /** @var string $messageString */ $this->redis->zrem($this->scheduleKey, $messageString); + /** @var string $messageString */ $this->send($this->serializer->unserialize($messageString)); } } @@ -131,9 +121,10 @@ public function wait(Closure $callback, array $priorities = []): void $foundPriority = null; foreach ($queues as $priority => $name) { - if (count($priorities) > 0 && !in_array($priority, $priorities)) { + if ($priorities !== [] && !in_array($priority, $priorities)) { continue; } + if ($messageString !== null) { break; } @@ -144,7 +135,7 @@ public function wait(Closure $callback, array $priorities = []): void $foundPriority = $priority; } - if ($messageString !== null) { + if ($messageString !== null && is_string($messageString)) { $message = $this->serializer->unserialize($messageString); $callback($message, $foundPriority); $this->incrementProcessedItems(); diff --git a/src/Driver/RedisSetDriver.php b/src/Driver/RedisSetDriver.php index 8c03acd..84635ee 100644 --- a/src/Driver/RedisSetDriver.php +++ b/src/Driver/RedisSetDriver.php @@ -46,11 +46,6 @@ class RedisSetDriver implements DriverInterface * of this class. You have to have enabled native Redis php extension. * * @see examples/redis - * - * @param Redis $redis - * @param string $key - * @param integer $refreshInterval - * @param string $scheduleKey */ public function __construct(Redis $redis, string $key = 'hermes', int $refreshInterval = 1, string $scheduleKey = 'hermes_schedule') { @@ -75,6 +70,7 @@ public function send(MessageInterface $message, int $priority = Dispatcher::DEFA $key = $this->getKey($priority); $this->redis->sAdd($key, $this->serializer->serialize($message)); } + return true; } @@ -87,16 +83,15 @@ public function setupPriorityQueue(string $name, int $priority): void } /** - * @param int $priority - * @return string * * @throws UnknownPriorityException */ private function getKey(int $priority): string { if (!isset($this->queues[$priority])) { - throw new UnknownPriorityException("Unknown priority {$priority}"); + throw new UnknownPriorityException('Unknown priority ' . $priority); } + return $this->queues[$priority]; } @@ -124,6 +119,7 @@ public function wait(Closure $callback, array $priorities = []): void if (!$messageString) { break; } + $scheduledMessage = $this->serializer->unserialize($messageString); $this->send($scheduledMessage); @@ -136,9 +132,10 @@ public function wait(Closure $callback, array $priorities = []): void $foundPriority = null; foreach ($queues as $priority => $name) { - if (count($priorities) > 0 && !in_array($priority, $priorities)) { + if ($priorities !== [] && !in_array($priority, $priorities)) { continue; } + if ($messageString !== null) { break; } diff --git a/src/Driver/SerializerAwareTrait.php b/src/Driver/SerializerAwareTrait.php index f0edbeb..d457494 100644 --- a/src/Driver/SerializerAwareTrait.php +++ b/src/Driver/SerializerAwareTrait.php @@ -15,9 +15,7 @@ trait SerializerAwareTrait * You can this trait to set serializer from outsite to your driver * if you need your custom serialization for your objects. * - * @param SerializerInterface $serializer * - * @return void */ public function setSerializer(SerializerInterface $serializer): void { diff --git a/src/Emitter.php b/src/Emitter.php index d5a7819..4d8722c 100644 --- a/src/Emitter.php +++ b/src/Emitter.php @@ -15,9 +15,6 @@ class Emitter implements EmitterInterface /** * Create new Dispatcher - * - * @param DriverInterface $driver - * @param LoggerInterface|null $logger */ public function __construct(DriverInterface $driver, ?LoggerInterface $logger = null) { @@ -36,7 +33,7 @@ public function emit(MessageInterface $message, int $priority = Dispatcher::DEFA $this->log( LogLevel::INFO, - "Dispatcher send message #{$message->getId()} to driver " . get_class($this->driver), + sprintf('Dispatcher send message #%s to driver ', $message->getId()) . get_class($this->driver), $this->messageLoggerContext($message) ); return $this; @@ -45,7 +42,6 @@ public function emit(MessageInterface $message, int $priority = Dispatcher::DEFA /** * Serialize message to logger context * - * @param MessageInterface $message * * @return array */ @@ -63,10 +59,8 @@ private function messageLoggerContext(MessageInterface $message): array * Internal log method wrapper * * @param mixed $level - * @param string $message * @param array $context * - * @return void */ private function log($level, string $message, array $context = []): void { diff --git a/src/EmitterInterface.php b/src/EmitterInterface.php index e68832f..e24fdd6 100644 --- a/src/EmitterInterface.php +++ b/src/EmitterInterface.php @@ -10,11 +10,8 @@ interface EmitterInterface /** * Emit new message * - * @param MessageInterface $message - * @param int $priority * * @throws UnknownPriorityException - * @return $this */ public function emit(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): EmitterInterface; } diff --git a/src/Handler/EchoHandler.php b/src/Handler/EchoHandler.php index f9d0d8b..173f937 100644 --- a/src/Handler/EchoHandler.php +++ b/src/Handler/EchoHandler.php @@ -16,7 +16,7 @@ public function handle(MessageInterface $message): bool { echo "Received message: #{$message->getId()} (type {$message->getType()})\n"; $payload = json_encode($message->getPayload()); - echo "Payload: {$payload}\n"; + echo sprintf('Payload: %s%s', $payload, PHP_EOL); return true; } } diff --git a/src/Handler/HandlerInterface.php b/src/Handler/HandlerInterface.php index 8ac2268..178dd91 100644 --- a/src/Handler/HandlerInterface.php +++ b/src/Handler/HandlerInterface.php @@ -15,10 +15,8 @@ interface HandlerInterface * long processing jobs that will be executed by dispatcher. * You have to register all your handlers to Dispatcher for specified types. * - * @param MessageInterface $message * @throws \Exception * - * @return bool */ public function handle(MessageInterface $message): bool; } diff --git a/src/Message.php b/src/Message.php index f373256..c190e52 100644 --- a/src/Message.php +++ b/src/Message.php @@ -25,12 +25,9 @@ class Message implements MessageInterface /** * Native implementation of message. * - * @param string $type * @param array|null $payload - * @param string|null $messageId * @param float|null $created timestamp (microtime(true)) * @param float|null $executeAt timestamp (microtime(true)) - * @param int $retries */ public function __construct(string $type, ?array $payload = null, ?string $messageId = null, ?float $created = null, ?float $executeAt = null, int $retries = 0) { diff --git a/src/MessageInterface.php b/src/MessageInterface.php index 2851d55..a61cb2c 100644 --- a/src/MessageInterface.php +++ b/src/MessageInterface.php @@ -11,15 +11,11 @@ interface MessageInterface * This identifier should be unique all the time. * Recommendation is to use UUIDv4 (Included Message implementation * generating UUIDv4 identifiers) - * - * @return string */ public function getId(): string; /** * Message creation date - microtime(true) - * - * @return float */ public function getCreated(): float; @@ -35,8 +31,6 @@ public function getExecuteAt(): ?float; * * Based on this field, message will be dispatched and will be sent to * appropriate handler. - * - * @return string */ public function getType(): string; @@ -53,8 +47,6 @@ public function getPayload(): ?array; /** * Total retries for message - * - * @return int */ public function getRetries(): int; } diff --git a/src/MessageSerializer.php b/src/MessageSerializer.php index dbd9523..a0f8e16 100644 --- a/src/MessageSerializer.php +++ b/src/MessageSerializer.php @@ -1,4 +1,5 @@ getId()}"); + throw new SerializeException('Cannot serialize message ' . $message->getId()); } + return $result; } @@ -33,28 +35,43 @@ public function unserialize(string $string): MessageInterface { $data = json_decode($string, true); if (!is_array($data) || !isset($data['message'])) { - throw new SerializeException("Cannot unserialize message from '{$string}'"); + throw new SerializeException(sprintf("Cannot unserialize message from '%s'", $string)); } + $message = $data['message']; if (!is_array($message) || !isset($message['type'], $message['id'], $message['created'])) { - throw new SerializeException("Invalid message format in '{$string}'"); + throw new SerializeException(sprintf("Invalid message format in '%s'", $string)); } - + $executeAt = null; if (isset($message['execute_at']) && is_numeric($message['execute_at'])) { $executeAt = (float) $message['execute_at']; } - + $retries = 0; if (isset($message['retries']) && is_numeric($message['retries'])) { $retries = (int) $message['retries']; } - + $payload = null; if (isset($message['payload']) && is_array($message['payload'])) { $payload = $message['payload']; } - - return new Message($message['type'], $payload, $message['id'], (float) $message['created'], $executeAt, $retries); + + if (!is_string($message['type'])) { + throw new SerializeException(sprintf("Invalid message type in '%s'", $string)); + } + + $created = null; + if (is_numeric($message['created'])) { + $created = (float) $message['created']; + } + + $id = null; + if (is_string($message['id'])) { + $id = $message['id']; + } + + return new Message($message['type'], $payload, $id, $created, $executeAt, $retries); } } diff --git a/src/SerializerInterface.php b/src/SerializerInterface.php index 11781a7..7eec8d4 100644 --- a/src/SerializerInterface.php +++ b/src/SerializerInterface.php @@ -11,10 +11,8 @@ interface SerializerInterface * Method is used when serializing $message to driver. * Message has to be serializable to string. * - * @param MessageInterface $message * * @throws SerializeException - * @return string */ public function serialize(MessageInterface $message): string; @@ -24,10 +22,8 @@ public function serialize(MessageInterface $message): string; * Message needs to be un-serialized from input string. * This string will be received from driver. * - * @param string $string * * @throws SerializeException - * @return MessageInterface */ public function unserialize(string $string): MessageInterface; } diff --git a/src/Shutdown/PredisShutdown.php b/src/Shutdown/PredisShutdown.php index 8fffb75..9b9b58a 100644 --- a/src/Shutdown/PredisShutdown.php +++ b/src/Shutdown/PredisShutdown.php @@ -39,6 +39,7 @@ public function shouldShutdown(DateTime $startTime): bool if ($shutdownTime === null) { return false; } + $shutdownTime = (int) $shutdownTime; // do not shutdown if shutdown time is in future diff --git a/src/Shutdown/RedisShutdown.php b/src/Shutdown/RedisShutdown.php index 772c6c6..7ebd1ec 100644 --- a/src/Shutdown/RedisShutdown.php +++ b/src/Shutdown/RedisShutdown.php @@ -1,10 +1,11 @@ redis->get($this->key); - if ($shutdownTime === false) { + if ($shutdownTime === false || !is_numeric($shutdownTime)) { return false; } + $shutdownTime = (int) $shutdownTime; // do not shutdown if shutdown time is in future diff --git a/src/Shutdown/ShutdownInterface.php b/src/Shutdown/ShutdownInterface.php index a6dbf74..42f6647 100644 --- a/src/Shutdown/ShutdownInterface.php +++ b/src/Shutdown/ShutdownInterface.php @@ -12,9 +12,6 @@ interface ShutdownInterface * * You have to return true or false if hermes worker should shutdown. * This method is called from Dispatcher always after messages were processed - * - * @param DateTime $startTime - * @return bool */ public function shouldShutdown(DateTime $startTime): bool; @@ -24,7 +21,6 @@ public function shouldShutdown(DateTime $startTime): bool; * This function performs necessary operations required to shutdown Hermes through used implementation. * * @param DateTime|null $shutdownTime (Optional) DateTime when should be Hermes shutdown. If null, current datetime should be used. - * @return bool */ public function shutdown(?DateTime $shutdownTime = null): bool; } diff --git a/tests/Driver/AmazonSqsDriverTest.php b/tests/Driver/AmazonSqsDriverTest.php index 42efb8b..58bc58d 100644 --- a/tests/Driver/AmazonSqsDriverTest.php +++ b/tests/Driver/AmazonSqsDriverTest.php @@ -126,7 +126,7 @@ public function testSendMessage(): void private function createMockResult(array $data): object { - $result = new class($data) { + return new class($data) { private $data; public function __construct($data) @@ -139,7 +139,5 @@ public function get($key) return $this->data[$key] ?? null; } }; - - return $result; } } diff --git a/tests/Driver/DummyDriver.php b/tests/Driver/DummyDriver.php index 2fdfff6..430030c 100644 --- a/tests/Driver/DummyDriver.php +++ b/tests/Driver/DummyDriver.php @@ -45,16 +45,15 @@ public function __construct(array $events = []) } /** - * @param string $event - * @param int $priority * * @throws UnknownPriorityException */ private function addEvent(string $event, int $priority): void { if (!isset($this->events[$priority])) { - throw new UnknownPriorityException("Unknown priority {$priority} - you have to setupPriorityQueue before"); + throw new UnknownPriorityException(sprintf('Unknown priority %d - you have to setupPriorityQueue before', $priority)); } + $this->events[$priority][] = $event; } @@ -88,11 +87,11 @@ public function getMessage(): ?MessageInterface if ($message === null) { return null; } + return $this->serializer->unserialize($message); } /** - * @param Closure $callback * @param int[] $priorities */ public function wait(Closure $callback, array $priorities = []): void @@ -104,6 +103,7 @@ public function wait(Closure $callback, array $priorities = []): void if (!$this->shouldProcessNext()) { return; } + $message = $this->serializer->unserialize($event); $this->waitResult = $callback($message); $this->incrementProcessedItems(); diff --git a/tests/EmitTest.php b/tests/EmitTest.php index 9f8e227..11dba9d 100644 --- a/tests/EmitTest.php +++ b/tests/EmitTest.php @@ -1,12 +1,13 @@ getMessage(); $this->assertNotNull($message); - if ($message !== null) { - $this->assertEquals('event-type', $message->getType()); - $this->assertEquals(['content'], $message->getPayload()); - } + $this->assertEquals('event-type', $message->getType()); + $this->assertEquals(['content'], $message->getPayload()); } } diff --git a/tests/HandleTest.php b/tests/HandleTest.php index c2005e1..abae863 100644 --- a/tests/HandleTest.php +++ b/tests/HandleTest.php @@ -215,13 +215,14 @@ public function testEmitMethod(): void { $driver = new DummyDriver([]); $driver->setupPriorityQueue('high', 200); + $dispatcher = new Dispatcher($driver); $message = new Message('test-event', ['test' => 'data']); $result = $dispatcher->emit($message, 200); $this->assertSame($dispatcher, $result); - + // Check that the message was sent via the driver $sentMessage = $driver->getMessage(); $this->assertNotNull($sentMessage); @@ -255,7 +256,7 @@ public function testRetryMessage(): void // Create a handler that uses RetryTrait $handler = new class implements \Tomaj\Hermes\Handler\HandlerInterface { use \Tomaj\Hermes\Handler\RetryTrait; - + public function handle(\Tomaj\Hermes\MessageInterface $message): bool { // Throw exception to trigger retry @@ -274,16 +275,17 @@ public function handle(\Tomaj\Hermes\MessageInterface $message): bool // Need to check both messages that might be in the queue $message1 = $driver->getMessage(); $message2 = $driver->getMessage(); - + // One should be the retry message with retries=6 $foundRetryMessage = false; if ($message1 && $message1->getRetries() === 6) { $foundRetryMessage = true; } + if ($message2 && $message2->getRetries() === 6) { $foundRetryMessage = true; } - + $this->assertTrue($foundRetryMessage, 'Should have found a retry message with retries=6'); } @@ -292,12 +294,12 @@ public function testRetryMessageMaxRetriesReached(): void // Create a handler that uses RetryTrait with custom maxRetry $handler = new class implements \Tomaj\Hermes\Handler\HandlerInterface { use \Tomaj\Hermes\Handler\RetryTrait; - + public function maxRetry(): int { return 5; // Lower max retry for this test } - + public function handle(\Tomaj\Hermes\MessageInterface $message): bool { // Throw exception to trigger retry check @@ -316,16 +318,17 @@ public function handle(\Tomaj\Hermes\MessageInterface $message): bool // But the original message should still be there $message1 = $driver->getMessage(); $message2 = $driver->getMessage(); - + // Should not find any message with retries=6 (no retry should have been sent) $foundRetryMessage = false; if ($message1 && $message1->getRetries() === 6) { $foundRetryMessage = true; } + if ($message2 && $message2->getRetries() === 6) { $foundRetryMessage = true; } - + $this->assertFalse($foundRetryMessage, 'Should not have found a retry message with retries=6'); } @@ -351,16 +354,17 @@ public function handle(\Tomaj\Hermes\MessageInterface $message): bool // But the original message should still be there $message1 = $driver->getMessage(); $message2 = $driver->getMessage(); - + // Should not find any message with retries=6 (no retry should have been sent) $foundRetryMessage = false; if ($message1 && $message1->getRetries() === 6) { $foundRetryMessage = true; } + if ($message2 && $message2->getRetries() === 6) { $foundRetryMessage = true; } - + $this->assertFalse($foundRetryMessage, 'Should not have found a retry message with retries=6'); } } diff --git a/tests/Logger/LoggerTest.php b/tests/Logger/LoggerTest.php index b7b576f..7fef5e1 100644 --- a/tests/Logger/LoggerTest.php +++ b/tests/Logger/LoggerTest.php @@ -57,9 +57,9 @@ public function testHandlerLogger(): void $priority = Dispatcher::DEFAULT_PRIORITY; $this->assertEquals('info', $logs[0]['level']); - $this->assertEquals("Start handle message #{$message1->getId()} ({$message1->getType()}) priority:{$priority}", $logs[0]['message']); + $this->assertEquals(sprintf('Start handle message #%s (%s) priority:%d', $message1->getId(), $message1->getType(), $priority), $logs[0]['message']); $this->assertEquals('info', $logs[1]['level']); - $this->assertEquals("End handle message #{$message1->getId()} ({$message1->getType()})", $logs[1]['message']); + $this->assertEquals(sprintf('End handle message #%s (%s)', $message1->getId(), $message1->getType()), $logs[1]['message']); } } diff --git a/tests/MessageSerializerTest.php b/tests/MessageSerializerTest.php index 97cb330..775c00e 100644 --- a/tests/MessageSerializerTest.php +++ b/tests/MessageSerializerTest.php @@ -1,4 +1,5 @@ expectException(SerializeException::class); $this->expectExceptionMessage("Cannot unserialize message from 'invalid json'"); - + $serializer->unserialize('invalid json'); } - + public function testSerializeWithNonSerializableData(): void { $serializer = new MessageSerializer(); - + // Create a message with non-serializable data (like a resource) $resource = fopen('php://memory', 'r'); $message = $this->createMock(\Tomaj\Hermes\MessageInterface::class); @@ -35,32 +36,42 @@ public function testSerializeWithNonSerializableData(): void $message->method('getCreated')->willReturn(microtime(true)); $message->method('getExecuteAt')->willReturn(null); $message->method('getRetries')->willReturn(0); - + $this->expectException(SerializeException::class); $this->expectExceptionMessage("Cannot serialize message test-id"); - + $serializer->serialize($message); - + fclose($resource); } - + public function testUnserializeWithMissingMessageKey(): void { $serializer = new MessageSerializer(); - + $this->expectException(SerializeException::class); $this->expectExceptionMessage("Cannot unserialize message from '{\"data\":\"test\"}'"); - + $serializer->unserialize('{"data":"test"}'); } - + public function testUnserializeWithInvalidMessageFormat(): void { $serializer = new MessageSerializer(); - + $this->expectException(SerializeException::class); $this->expectExceptionMessage("Invalid message format in '{\"message\":{\"type\":\"test\"}}'"); - + $serializer->unserialize('{"message":{"type":"test"}}'); } + + public function testUnserializeWithInvalidTypeFormat(): void + { + $serializer = new MessageSerializer(); + + $this->expectException(SerializeException::class); + $this->expectExceptionMessage("Invalid message type in '{\"message\":{\"type\": 123, \"id\":\"test-id\", \"created\":1620000000}}'"); + + $serializer->unserialize('{"message":{"type": 123, "id":"test-id", "created":1620000000}}'); + } } diff --git a/tests/Shutdown/DispatcherShutdownTest.php b/tests/Shutdown/DispatcherShutdownTest.php index d733513..043c785 100644 --- a/tests/Shutdown/DispatcherShutdownTest.php +++ b/tests/Shutdown/DispatcherShutdownTest.php @@ -49,6 +49,7 @@ public function testEmitWithDummyDriverWithShutdown(): void $driver = new DummyDriver([$message1, $message2]); $stopShutdown = new StopShutdown(); $stopShutdown->shutdown(new \DateTime()); + $dispatcher = new Dispatcher($driver, null, $stopShutdown); $handler = new TestHandler(); diff --git a/tests/Shutdown/SharedFileShutdownTest.php b/tests/Shutdown/SharedFileShutdownTest.php index 9d1aa8a..4a88e1b 100644 --- a/tests/Shutdown/SharedFileShutdownTest.php +++ b/tests/Shutdown/SharedFileShutdownTest.php @@ -104,7 +104,7 @@ public function testShutdownWithNullTime(): void $shutdown = new SharedFileShutdown($this->tempFile); $beforeTime = time(); - $result = $shutdown->shutdown(null); + $result = $shutdown->shutdown(); $afterTime = time(); $this->assertTrue($result); diff --git a/tests/Shutdown/StopShutdown.php b/tests/Shutdown/StopShutdown.php index c4288af..99e0335 100644 --- a/tests/Shutdown/StopShutdown.php +++ b/tests/Shutdown/StopShutdown.php @@ -16,9 +16,11 @@ public function shouldShutdown(DateTime $startTime): bool if (!isset(self::$eventsStop)) { return false; } + if (self::$eventsStop === 1) { return true; } + self::$eventsStop--; return false; } diff --git a/tests/SimpleCoverageImprovements.php b/tests/SimpleCoverageImprovements.php index 10250f8..7756f4a 100644 --- a/tests/SimpleCoverageImprovements.php +++ b/tests/SimpleCoverageImprovements.php @@ -46,38 +46,38 @@ public function testRedisSetDriverScheduledMessageHandling(): void ->disableOriginalConstructor() ->onlyMethods(['zAdd', 'sAdd', 'zRangeByScore', 'zRem']) ->getMock(); - + $driver = new RedisSetDriver($redis, 'test_queue'); $serializer = new MessageSerializer(); $driver->setSerializer($serializer); - + // Test scheduled message (future) $futureTime = microtime(true) + 3600; $scheduledMessage = new Message('scheduled.task', ['data' => 'future'], 'sched1', microtime(true), $futureTime); - + $redis->expects($this->once()) ->method('zAdd') ->with('hermes_schedule', $futureTime, $this->anything()) ->willReturn(1); - + $this->assertTrue($driver->send($scheduledMessage)); - + // Test scheduled message processing $processedMessages = []; $redis->method('zRangeByScore') ->willReturn(['serialized_message_data']); - + $redis->method('zRem') ->willReturn(1); - + $callback = function (MessageInterface $message) use (&$processedMessages) { $processedMessages[] = $message; return true; }; - + // This should call wait() which processes scheduled messages $driver->wait($callback); - + $this->assertTrue(true); // Just verify no exceptions } @@ -171,32 +171,32 @@ public function testDispatcherUnregisterMethods(): void { $driver = new DummyDriver([]); $dispatcher = new Dispatcher($driver); - + $handler1 = new class implements HandlerInterface { public function handle(MessageInterface $message): bool { return true; } }; - + $handler2 = new class implements HandlerInterface { public function handle(MessageInterface $message): bool { return true; } }; - + // Register handlers $dispatcher->registerHandler('test.event', $handler1); $dispatcher->registerHandler('test.event', $handler2); $dispatcher->registerHandler('other.event', $handler1); - + // Test unregisterHandler $dispatcher->unregisterHandler('test.event', $handler1); - + // Test unregisterAllHandlers - $dispatcher->unregisterAllHandlers('test.event'); - + $dispatcher->unregisterAllHandlers(); + $this->assertTrue(true); // No exceptions }