Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ change to start working with the queue:
## Differences to yii2-queue

If you have experience with `yiisoft/yii2-queue`, you will find out that this package is similar.
Though, there are some key differences which are described in the "[migrating from yii2-queue](docs/guide/migrating-from-yii2-queue.md)" article.
Though, there are some key differences that are described in the "[migrating from yii2-queue](docs/guide/migrating-from-yii2-queue.md)"
article.

## General usage

Each queue task consists of two parts:

1. A message is a class implementing `MessageInterface`. For simple cases you can use the default implementation,
`Yiisoft\Queue\Message\Message`. For more complex cases you should implement the interface by your own.
`Yiisoft\Queue\Message\Message`. For more complex cases, you should implement the interface by your own.
2. A message handler is a callable called by a `Yiisoft\Queue\Worker\Worker`. The handler handles each queue message.

For example, if you need to download and save a file, your message creation may look like the following:
Expand Down Expand Up @@ -103,7 +104,7 @@ $worker = new \Yiisoft\Queue\Worker\Worker(
);
```

There is the way to run all the messages that are already in the queue, and then exit:
There is a way to run all the messages that are already in the queue, and then exit:

```php
$queue->run(); // this will execute all the existing messages
Expand Down Expand Up @@ -147,7 +148,7 @@ To use a custom handler name before message push, you can pass it as the first a
new Message('handler-name', $data);
```

To use a custom handler name on message consume, you should configure handler mapping for the `Worker` class:
To use a custom handler name on message consumption, you should configure handler mapping for the `Worker` class:
```php
$worker = new \Yiisoft\Queue\Worker\Worker(
['handler-name' => FooHandler::class],
Expand Down Expand Up @@ -175,7 +176,7 @@ Out of the box, there are four implementations of the `QueueProviderInterface`:

### `AdapterFactoryQueueProvider`

Provider based on definition of channel-specific adapters. Definitions are passed in
Provider based on the definition of channel-specific adapters. Definitions are passed in
the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions
for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples:

Expand All @@ -192,7 +193,7 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;
]
```

For more information about a definition formats available see the [factory](https://github.com/yiisoft/factory) documentation.
For more information about the definition formats available, see the [factory](https://github.com/yiisoft/factory) documentation.

### `PrototypeQueueProvider`

Expand Down Expand Up @@ -226,14 +227,14 @@ yii queue:listen

See the documentation for more details about adapter specific console commands and their options.

The component also has the ability to track the status of a job which was pushed into queue.
The component can also track the status of a job which was pushed into queue.

For more details see [the guide](docs/guide/en/README.md).
For more details, see [the guide](docs/guide/en/README.md).

## Middleware pipelines

Any message pushed to a queue or consumed from it passes through two different middleware pipelines: one pipeline
on message push and another - on message consume. The process is the same as for the HTTP request, but it is executed
on message push and another - on a message consume. The process is the same as for the HTTP request, but it is executed
twice for a queue message. That means you can add extra functionality on message pushing and consuming with configuration
of the two classes: `PushMiddlewareDispatcher` and `ConsumeMiddlewareDispatcher` respectively.

Expand Down Expand Up @@ -262,13 +263,13 @@ graph LR
ConsumeMiddleware1[$middleware1] -.-> EndConsume((End))
```

### Push pipeline
### Push a pipeline

When you push a message, you can use middlewares to modify both message and queue adapter.
With message modification you can add extra data, obfuscate data, collect metrics, etc.
With queue adapter modification you can redirect message to another queue, delay message consuming, and so on.
With queue adapter modification you can redirect the message to another queue, delay message consuming, and so on.

To use this feature you have to create a middleware class, which implements `MiddlewarePushInterface`, and
To use this feature, you have to create a middleware class, which implements `MiddlewarePushInterface`, and
return a modified `PushRequest` object from the `processPush` method:

```php
Expand Down Expand Up @@ -297,16 +298,16 @@ along with them.

### Consume pipeline

You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Unless Push pipeline, you have only one place to define the middleware stack: in the `ConsumeMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-consume` key of the `yiisoft/queue` array in the `params`.
You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In a pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Except push pipeline, you have only one place to define the middleware stack: in the `ConsumeMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-consume` key of the `yiisoft/queue` array in the `params`.

### Error handling pipeline

Often when some job is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/queue` with Failure middleware pipeline. They are triggered each time message processing via the Consume middleware pipeline is interrupted with any `Throwable`. The key differences from the previous two pipelines:
Often when some job is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/queue` with a Failure middleware pipeline. They are triggered each time message processing via the Consume middleware pipeline is interrupted with any `Throwable`. The key differences from the previous two pipelines:

- You should set up the middleware pipeline separately for each queue channel. That means, the format should be `['channel-name' => [FooMiddleware::class]]` instead of `[FooMiddleware::class]`, like for the other two pipelines. There is also a default key, which will be used for those channels without their own one: `FailureMiddlewareDispatcher::DEFAULT_PIPELINE`.
- The last middleware will throw the exception, which will come with the `FailureHandlingRequest` object. If you don't want the exception to be thrown, your middlewares should `return` a request without calling `$handler->handleFailure()`.

You can declare error handling middleware pipeline in the `FailureMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-fail` key of the `yiisoft/queue` array in the `params`.
You can declare error handling a middleware pipeline in the `FailureMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-fail` key of the `yiisoft/queue` array in the `params`.

See [error handling docs](docs/guide/error-handling.md) for details.

Expand Down
6 changes: 3 additions & 3 deletions docs/guide/en/error-handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Often when some message handling is failing, we want to retry its execution a co

## Configuration

Here below is configuration via `yiisoft/config`. If you don't use it - you should add middleware definition list (in the `middlewares-fail` key here) to the `FailureMiddlewareDispatcher` by your own.
Here below is configuration via `yiisoft/config`. If you don't use it, you should add a middleware definition list (in the `middlewares-fail` key here) to the `FailureMiddlewareDispatcher` by your own.

Configuration should be passed to the `yiisoft/queue.fail-strategy-pipelines` key of the `params` config to work with the `yiisoft/config`. You can define different failure handling pipelines for each queue channel. Let's see and describe an example:

Expand Down Expand Up @@ -39,10 +39,10 @@ Configuration should be passed to the `yiisoft/queue.fail-strategy-pipelines` ke
]
```

Keys here except `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` are queue channel names, and values are lists of `FailureMiddlewareInterface` definitions. `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` defines a default pipeline to apply to channels without explicitly defined failure strategy pipeline. Each middleware definition must be one of:
Keys here except `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` are queue channel names, and values are lists of `FailureMiddlewareInterface` definitions. `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` defines a default pipeline to apply to channels without an explicitly defined failure strategy pipeline. Each middleware definition must be one of:
- A ready-to-use `MiddlewareFailureInterface` object like `new FooMiddleware()`.
- A valid definition for the [yiisoft/definitions](https://github.com/yiisoft/definitions). It must describe an object, implementing the `MiddlewareFailureInterface`.
- A callable: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the `yiisoft/injector`, so all the dependencies of your callable will be resolved. You can also define a "callable-looking" array, where object will be instantiated with a DI container: `[FooMiddleware::class, 'handle']`.
- A callable: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the `yiisoft/injector`, so all the dependencies of your callable will be resolved. You can also define a "callable-looking" array, where an object will be instantiated with a DI container: `[FooMiddleware::class, 'handle']`.
- A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class`.

In the example above failures will be handled this way (look the concrete middleware description below):
Expand Down
12 changes: 6 additions & 6 deletions docs/guide/en/migrating-from-yii2-queue.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Migrating from yii2-queue

This package is similar to [yiisoft/yii2-queue] but with improved design and code style. The new package less coupled
This package is similar to [yiisoft/yii2-queue] but with improved design and code style. The new package is less coupled
and more structured than the old one allowing better maintenance.

## Adapters

- Individual adapters are now separate packages. This means each adapter must be `require`d with composer in order to
- Individual adapters are now separate packages. This means each adapter must be `require`d with composer to
be available in your application.
- Adapter may be any class which implements `AdapterInterface`. This means you can replace one adapter with another without
changing any code in your app. For example, you can use `db` adapter in development while using `amqp` in production,
Expand All @@ -15,11 +15,11 @@ and more structured than the old one allowing better maintenance.
## Jobs (Messages and Handlers)

There was a concept in [yiisoft/yii2-queue] called `Job`: you had to push it to the queue, and it was executed after
being consumed. In the new package it is divided into two different concepts: a message and a handler.
being consumed. In the new package, it is divided into two different concepts: a message and a handler.

- A `Message` is a class implementing `MessageInterface`. It contains 2 types of data:
- Name. Worker uses it to find the right handler for a message.
- Data. Any serializable data which should be used by the message handler.
- A `Message` is a class implementing `MessageInterface`. It contains two types of data:
- Name. The worker uses it to find the right handler for a message.
- Data. Any serializable data that should be used by the message handler.

All the message data is fully serializable (that means message `data` must be serializable too). It allows you to
freely choose where and how to send and process jobs. Both can be implemented in a single application, or
Expand Down
6 changes: 3 additions & 3 deletions docs/guide/en/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ See also the documentation for concrete adapters ([synchronous adapter](adapter-
## Usage

Each job sent to the queue should be defined as a separate class.
For example, if you need to download and save a file the class may look like the following:
For example, if you need to download and save a file, the class may look like the following:

```php
$data = [
Expand Down Expand Up @@ -54,7 +54,7 @@ To push a job into the queue that should run after 5 minutes:
## Queue handling

The exact way how a job is executed depends on the adapter used. Most adapters can be run using
console commands, which the component registers in your application. For more details check the respective
console commands, which the component registers in your application. For more details, check the respective
adapter documentation.


Expand All @@ -79,7 +79,7 @@ $status->isDone($id);

## Limitations

When using queues it is important to remember that tasks are put into and obtained from the queue in separate
When using queues, it is important to remember that tasks are put into and obtained from the queue in separate
processes. Therefore, avoid external dependencies when executing a task if you're not sure if they are available in
the environment where the worker does its job.

Expand Down
12 changes: 6 additions & 6 deletions docs/guide/en/worker.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Configuration

To use a worker, you should resolve its dependencies (e.g. through DI container) and define handlers for each message
which will be consumed by this worker;
To use a worker, you should resolve its dependencies (e.g., through DI container) and define handlers for each message
that will be consumed by this worker;

Handlers are callables indexed by payload names. When a message is consumed from the queue, a callable associated with
its payload name is called.
Expand All @@ -10,7 +10,7 @@ its payload name is called.

Handler can be any callable with a couple of additions:

- If handler is provided as an array of two strings, it will be treated as a DI container service id and its method.
- If a handler is provided as an array of two strings, it will be treated as a DI container service id and its method.
E.g. `[ClassName::class, 'handle']` will be resolved to:
```php
$container
Expand Down Expand Up @@ -69,7 +69,7 @@ stdout_logfile=/var/www/my_project/log/yii-queue-worker.log
In this case Supervisor should start 4 `queue:listen` workers. The worker output will be written
to the specified log file.

For more info about Supervisor's configuration and usage see its [documentation](http://supervisord.org).
For more info about Supervisor's configuration and usage, see its [documentation](http://supervisord.org).

### Systemd

Expand All @@ -96,7 +96,7 @@ Restart=on-failure
WantedBy=multi-user.target
```

You need to reload systemd in order to re-read its configuration:
You need to reload systemd to re-read its configuration:

```shell
systemctl daemon-reload
Expand Down Expand Up @@ -133,4 +133,4 @@ Config example:
* * * * * /usr/bin/php /var/www/my_project/yii queue:run
```

In this case cron will run the command every minute.
In this case, cron will run the command every minute.
4 changes: 2 additions & 2 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ final class SynchronousAdapter implements AdapterInterface
private string $channel;

public function __construct(
private WorkerInterface $worker,
private QueueInterface $queue,
private readonly WorkerInterface $worker,
private readonly QueueInterface $queue,
string|BackedEnum $channel = QueueInterface::DEFAULT_CHANNEL,
) {
$this->channel = ChannelNormalizer::normalize($channel);
Expand Down
4 changes: 2 additions & 2 deletions src/Cli/SignalLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@

/**
* @param int $memorySoftLimit Soft RAM limit in bytes. The loop won't let you continue to execute the program if
* soft limit is reached. Zero means no limit.
* soft limit is reached. Zero means no limit.
*/
public function __construct(protected int $memorySoftLimit = 0)
{
foreach (self::SIGNALS_EXIT as $signal) {
pcntl_signal($signal, fn () => $this->exit = true);

Check warning on line 36 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ public function __construct(protected int $memorySoftLimit = 0) { foreach (self::SIGNALS_EXIT as $signal) { - pcntl_signal($signal, fn() => $this->exit = true); + } foreach (self::SIGNALS_SUSPEND as $signal) { pcntl_signal($signal, fn() => $this->pause = true);

Check warning on line 36 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "TrueValue": @@ @@ public function __construct(protected int $memorySoftLimit = 0) { foreach (self::SIGNALS_EXIT as $signal) { - pcntl_signal($signal, fn() => $this->exit = true); + pcntl_signal($signal, fn() => $this->exit = false); } foreach (self::SIGNALS_SUSPEND as $signal) { pcntl_signal($signal, fn() => $this->pause = true);
}
foreach (self::SIGNALS_SUSPEND as $signal) {
pcntl_signal($signal, fn () => $this->pause = true);

Check warning on line 39 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ pcntl_signal($signal, fn() => $this->exit = true); } foreach (self::SIGNALS_SUSPEND as $signal) { - pcntl_signal($signal, fn() => $this->pause = true); + } foreach (self::SIGNALS_RESUME as $signal) { pcntl_signal($signal, fn() => $this->pause = false);

Check warning on line 39 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "TrueValue": @@ @@ pcntl_signal($signal, fn() => $this->exit = true); } foreach (self::SIGNALS_SUSPEND as $signal) { - pcntl_signal($signal, fn() => $this->pause = true); + pcntl_signal($signal, fn() => $this->pause = false); } foreach (self::SIGNALS_RESUME as $signal) { pcntl_signal($signal, fn() => $this->pause = false);
}
foreach (self::SIGNALS_RESUME as $signal) {
pcntl_signal($signal, fn () => $this->pause = false);

Check warning on line 42 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ pcntl_signal($signal, fn() => $this->pause = true); } foreach (self::SIGNALS_RESUME as $signal) { - pcntl_signal($signal, fn() => $this->pause = false); + } } /**

Check warning on line 42 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": @@ @@ pcntl_signal($signal, fn() => $this->pause = true); } foreach (self::SIGNALS_RESUME as $signal) { - pcntl_signal($signal, fn() => $this->pause = false); + pcntl_signal($signal, fn() => $this->pause = true); } } /**
}
}

Expand All @@ -59,10 +59,10 @@

protected function dispatchSignals(): bool
{
pcntl_signal_dispatch();

Check warning on line 62 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ } protected function dispatchSignals(): bool { - pcntl_signal_dispatch(); + // Wait for resume signal until the loop is suspended while ($this->pause && !$this->exit) { usleep(10000);

// Wait for resume signal until loop is suspended
// Wait for resume signal until the loop is suspended
while ($this->pause && !$this->exit) {

Check warning on line 65 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "While_": @@ @@ { pcntl_signal_dispatch(); // Wait for resume signal until the loop is suspended - while ($this->pause && !$this->exit) { + while (false) { usleep(10000); pcntl_signal_dispatch(); }

Check warning on line 65 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "LogicalAndAllSubExprNegation": @@ @@ { pcntl_signal_dispatch(); // Wait for resume signal until the loop is suspended - while ($this->pause && !$this->exit) { + while (!$this->pause && $this->exit) { usleep(10000); pcntl_signal_dispatch(); }

Check warning on line 65 in src/Cli/SignalLoop.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "LogicalNot": @@ @@ { pcntl_signal_dispatch(); // Wait for resume signal until the loop is suspended - while ($this->pause && !$this->exit) { + while ($this->pause && $this->exit) { usleep(10000); pcntl_signal_dispatch(); }
usleep(10000);
pcntl_signal_dispatch();
}
Expand Down
4 changes: 2 additions & 2 deletions src/Debug/QueueCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public function getSummary(): array
return [];
}

$countPushes = array_sum(array_map(fn ($messages) => is_countable($messages) ? count($messages) : 0, $this->pushes));
$countPushes = array_sum(array_map(static fn ($messages) => is_countable($messages) ? count($messages) : 0, $this->pushes));
$countStatuses = count($this->statuses);
$countProcessingMessages = array_sum(array_map(fn ($messages) => is_countable($messages) ? count($messages) : 0, $this->processingMessages));
$countProcessingMessages = array_sum(array_map(static fn ($messages) => is_countable($messages) ? count($messages) : 0, $this->processingMessages));

return [
'countPushes' => $countPushes,
Expand Down
4 changes: 2 additions & 2 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
final class QueueDecorator implements QueueInterface
{
public function __construct(
private QueueInterface $queue,
private QueueCollector $collector,
private readonly QueueInterface $queue,
private readonly QueueCollector $collector,
) {
}

Expand Down
4 changes: 2 additions & 2 deletions src/Debug/QueueWorkerInterfaceProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
final class QueueWorkerInterfaceProxy implements WorkerInterface
{
public function __construct(
private WorkerInterface $worker,
private QueueCollector $collector,
private readonly WorkerInterface $worker,
private readonly QueueCollector $collector,
) {
}

Expand Down
6 changes: 4 additions & 2 deletions src/Exception/JobFailureException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

final class JobFailureException extends RuntimeException
{
public function __construct(private MessageInterface $queueMessage, Throwable $previous)
{
public function __construct(
private readonly MessageInterface $queueMessage,
Throwable $previous
) {
$error = $previous->getMessage();
$messageId = $queueMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
$messageText = "Processing of message #$messageId is stopped because of an exception:\n$error.";
Expand Down
2 changes: 1 addition & 1 deletion src/Message/EnvelopeTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait EnvelopeTrait
/**
* A mirror of {@see MessageInterface::fromData()}
*/
abstract public static function fromMessage(MessageInterface $message): self;
abstract public static function fromMessage(MessageInterface $message): MessageInterface;

public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface
{
Expand Down
Loading
Loading