Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
},
"require-dev": {
"ext-redis": "*",
"predis/predis": "^1.1",
"predis/predis": "^3.0",
"phpunit/phpunit": "^8 || ^9",
"squizlabs/php_codesniffer": "~3.5",
"php-amqplib/php-amqplib": "^2.6.3",
"php-amqplib/php-amqplib": "^3.0",
"scrutinizer/ocular": "^1.6.0",
"aws/aws-sdk-php": "3.*",
"phpstan/phpstan": "^0.12.65",
"pepakriz/phpstan-exception-rules": "^0.11.3",
"phpstan/phpstan-strict-rules": "^0.12"
"phpstan/phpstan": "^2",
"rector/rector": "^2.2",
"phpstan/phpstan-strict-rules": "^2.0"
},
"suggest": {
"monolog/monolog": "Basic logger implements psr/logger",
Expand All @@ -56,6 +56,7 @@
"phpunit": "phpunit",
"phpstan": "phpstan analyse",
"phpcs": "phpcs --standard=PSR2 src tests examples -n",
"coverage": "XDEBUG_MODE=coverage phpunit --coverage-clover build/logs/clover.xml --coverage-html build/coverage"
"coverage": "XDEBUG_MODE=coverage phpunit --coverage-clover build/logs/clover.xml --coverage-html build/coverage",
"rector": "php vendor/bin/rector"
}
}
3 changes: 2 additions & 1 deletion examples/rabbitmq/emitter.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

declare(strict_types=1);


Expand All @@ -10,7 +11,7 @@
require_once __DIR__.'/../../vendor/autoload.php';

$queueName = 'hermes_queue';
$connection = new AMQPLazyConnection('localhost', '5672', 'guest', 'guest', '/');
$connection = new AMQPLazyConnection('localhost', 5672, 'guest', 'guest', '/');
$driver = new LazyRabbitMqDriver($connection, $queueName);

$emitter = new Emitter($driver);
Expand Down
5 changes: 3 additions & 2 deletions examples/rabbitmq/processor.php
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
<?php

declare(strict_types=1);

use PhpAmqpLib\Connection\AMQPLazyConnection;
use Tomaj\Hermes\Driver\LazyRabbitMqDriver;
use Tomaj\Hermes\Dispatcher;
use Tomaj\Hermes\Driver\LazyRabbitMqDriver;
use Tomaj\Hermes\Handler\EchoHandler;

require_once __DIR__.'/../../vendor/autoload.php';

$queueName = 'hermes_queue';
$connection = new AMQPLazyConnection('localhost', '5672', 'guest', 'guest', '/');
$connection = new AMQPLazyConnection('localhost', 5672, 'guest', 'guest', '/');
$driver = new LazyRabbitMqDriver($connection, $queueName, [], 0);

$dispatcher = new Dispatcher($driver);
Expand Down
36 changes: 8 additions & 28 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,40 +1,20 @@
includes:
- vendor/pepakriz/phpstan-exception-rules/extension.neon
- vendor/phpstan/phpstan-strict-rules/rules.neon

- vendor/phpstan/phpstan-strict-rules/rules.neon

parameters:
level: max
paths:
- src
- examples
# - tests

treatPhpDocTypesAsCertain: false
checkMissingIterableValueType: true
checkGenericClassInNonGenericObjectType: false
reportUnmatchedIgnoredErrors: false

# Better type checking (compatible with PHPStan 0.12)
checkAlwaysTrueCheckTypeFunctionCall: true
checkAlwaysTrueInstanceof: true
checkAlwaysTrueStrictComparison: true
checkExplicitMixedMissingReturn: true
checkFunctionNameCase: true
checkInternalClassCaseSensitivity: true

# Exception rules
exceptionRules:
reportUnusedCatchesOfUncheckedExceptions: true
reportUnusedCheckedThrowsInSubtypes: false
reportCheckedThrowsInGlobalScope: true
checkedExceptions:
- RuntimeException

# Ignore some legacy driver issues for now
strictRules:
allRules: false

ignoreErrors:
- '#Only booleans are allowed in#'
- '#Call to function in_array\(\) requires parameter \#3 to be set#'
- '#Anonymous function should have native return typehint#'
- '#While loop condition is always true.#'
-
identifier: offsetAccess.nonOffsetAccessible



Expand Down
31 changes: 31 additions & 0 deletions rector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

use Rector\Config\RectorConfig;
use Rector\Set\ValueObject\LevelSetList;
use Rector\Set\ValueObject\SetList;

return RectorConfig::configure()
->withPaths([
__DIR__ . '/src',
__DIR__ . '/tests',
])
->withSkip([
__DIR__ . '/vendor',
\Rector\CodingStyle\Rector\FuncCall\StrictArraySearchRector::class,
\Rector\DeadCode\Rector\FunctionLike\NarrowWideUnionReturnTypeRector::class,
\Rector\CodingStyle\Rector\String_\SimplifyQuoteEscapeRector::class,
\Rector\DeadCode\Rector\Cast\RecastingRemovalRector::class,
\Rector\DeadCode\Rector\Assign\RemoveUnusedVariableAssignRector::class,
\Rector\CodingStyle\Rector\If_\NullableCompareToNullRector::class,
\Rector\CodingStyle\Rector\Assign\SplitDoubleAssignRector::class,
\Rector\DeadCode\Rector\PropertyProperty\RemoveNullPropertyInitializationRector::class,
\Rector\EarlyReturn\Rector\StmtsAwareInterface\ReturnEarlyIfVariableRector::class,
])
->withSets([
LevelSetList::UP_TO_PHP_71,
SetList::DEAD_CODE,
SetList::CODING_STYLE,
SetList::EARLY_RETURN,
]);
44 changes: 11 additions & 33 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ class Dispatcher implements DispatcherInterface

/**
* Create new Dispatcher
*
* @param DriverInterface $driver
* @param LoggerInterface|null $logger
* @param ShutdownInterface|null $shutdown
*/
public function __construct(DriverInterface $driver, ?LoggerInterface $logger = null, ?ShutdownInterface $shutdown = null)
{
Expand All @@ -53,9 +49,6 @@ public function __construct(DriverInterface $driver, ?LoggerInterface $logger =
}

/**
* @param MessageInterface $message
* @param int $priority
* @return DispatcherInterface
* @deprecated - use Emitter::emit method instead
*/
public function emit(MessageInterface $message, int $priority = self::DEFAULT_PRIORITY): DispatcherInterface
Expand All @@ -64,7 +57,7 @@ public function emit(MessageInterface $message, int $priority = self::DEFAULT_PR

$this->log(
LogLevel::INFO,
"Dispatcher send message #{$message->getId()} with priority {$priority} to driver " . get_class($this->driver),
sprintf('Dispatcher send message #%s with priority %d to driver ', $message->getId(), $priority) . get_class($this->driver),
$this->messageLoggerContext($message)
);
return $this;
Expand All @@ -78,16 +71,14 @@ public function emit(MessageInterface $message, int $priority = self::DEFAULT_PR
* WARNING! Don't use it on web server calls. Run it only with cli.
*
* @param int[] $priorities
*
* @return void
*/
public function handle(array $priorities = []): void
{
try {
$this->driver->wait(function (MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool {
$this->log(
LogLevel::INFO,
"Start handle message #{$message->getId()} ({$message->getType()}) priority:{$priority}",
sprintf('Start handle message #%s (%s) priority:%d', $message->getId(), $message->getType(), $priority),
$this->messageLoggerContext($message)
);

Expand All @@ -111,9 +102,7 @@ public function handle(array $priorities = []): void
/**
* Dispatch message
*
* @param MessageInterface $message
*
* @return bool
*/
private function dispatch(MessageInterface $message): bool
{
Expand All @@ -139,10 +128,7 @@ private function dispatch(MessageInterface $message): bool
/**
* Handle given message with given handler
*
* @param HandlerInterface $handler
* @param MessageInterface $message
*
* @return bool
*/
private function handleMessage(HandlerInterface $handler, MessageInterface $message): bool
{
Expand All @@ -156,31 +142,29 @@ private function handleMessage(HandlerInterface $handler, MessageInterface $mess

$this->log(
LogLevel::INFO,
"End handle message #{$message->getId()} ({$message->getType()})",
sprintf('End handle message #%s (%s)', $message->getId(), $message->getType()),
$this->messageLoggerContext($message)
);
} catch (Exception $e) {
} catch (Exception $exception) {
$this->log(
LogLevel::ERROR,
"Handler " . get_class($handler) . " throws exception - {$e->getMessage()}",
['error' => $e, 'message' => $this->messageLoggerContext($message), 'exception' => $e]
"Handler " . get_class($handler) . (' throws exception - ' . $exception->getMessage()),
['error' => $exception, 'message' => $this->messageLoggerContext($message), 'exception' => $exception]
);
if (Debugger::isEnabled()) {
Debugger::log($e, Debugger::EXCEPTION);
Debugger::log($exception, Debugger::EXCEPTION);
}

$this->retryMessage($message, $handler);

$result = false;
}

return $result;
}

/**
* Helper function for sending retrying message back to driver
*
* @param MessageInterface $message
* @param HandlerInterface $handler
*/
private function retryMessage(MessageInterface $message, HandlerInterface $handler): void
{
Expand All @@ -197,21 +181,16 @@ private function retryMessage(MessageInterface $message, HandlerInterface $handl
* Calculate next retry
*
* Inspired by ruby sidekiq (https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry)
*
* @param MessageInterface $message
* @return float
*/
private function nextRetry(MessageInterface $message): float
{
return microtime(true) + pow($message->getRetries(), 4) + 15 + (rand(1, 30) * ($message->getRetries() + 1));
return microtime(true) + $message->getRetries() ** 4 + 15 + (random_int(1, 30) * ($message->getRetries() + 1));
}

/**
* Check if actual dispatcher has handler for given type
*
* @param string $type
*
* @return bool
*/
private function hasHandlers(string $type): bool
{
Expand Down Expand Up @@ -239,6 +218,7 @@ public function registerHandlers(string $type, array $handlers): DispatcherInter
foreach ($handlers as $handler) {
$this->registerHandler($type, $handler);
}

return $this;
}

Expand All @@ -259,6 +239,7 @@ public function unregisterHandler(string $type, HandlerInterface $handler): Disp
if (!isset($this->handlers[$type])) {
return $this;
}

$this->handlers[$type] = array_filter(
$this->handlers[$type],
fn(HandlerInterface $registeredHandler): bool => $registeredHandler !== $handler
Expand All @@ -270,7 +251,6 @@ public function unregisterHandler(string $type, HandlerInterface $handler): Disp
/**
* Serialize message to logger context
*
* @param MessageInterface $message
*
* @return array<string, mixed>
*/
Expand All @@ -290,10 +270,8 @@ private function messageLoggerContext(MessageInterface $message): array
* Interal log method wrapper
*
* @param mixed $level
* @param string $message
* @param array<mixed> $context
*
* @return void
*/
private function log($level, string $message, array $context = []): void
{
Expand Down
10 changes: 0 additions & 10 deletions src/DispatcherInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,26 @@ interface DispatcherInterface
* This handler will be called in background job when event
* of registered $type will be emitted.
*
* @param string $type
* @param HandlerInterface $handler
*
* @return $this
*/
public function registerHandler(string $type, HandlerInterface $handler): DispatcherInterface;

/**
* Register multiple handlers for same type.
*
* @param string $type
* @param HandlerInterface[] $handler
* @return DispatcherInterface
*/
public function registerHandlers(string $type, array $handler): DispatcherInterface;

/**
* Will unregister all handlers. This method is useful for testing.
*
* @return DispatcherInterface
*/
public function unregisterAllHandlers(): DispatcherInterface;

/**
* Will unregister a specific handler.
*
* @param string $type
* @param HandlerInterface $handler
*
* @return DispatcherInterface
*/
public function unregisterHandler(string $type, HandlerInterface $handler): DispatcherInterface;

Expand Down
Loading