diff --git a/docs/source/components/dataset/operators/extend.rst b/docs/source/components/dataset/operators/extend.rst new file mode 100644 index 0000000..b7489d9 --- /dev/null +++ b/docs/source/components/dataset/operators/extend.rst @@ -0,0 +1,292 @@ +======================= +Write your own operator +======================= + +To create your own operator, you need to define a class that implements +``RunOpenCode\Component\Dataset\Contract\OperatorInterface``. The library also +provides a base class, ``RunOpenCode\Component\Dataset\AbstractStream``, +which serves as a prototype for custom operators and significantly simplifies +their implementation. + +.. _PHPStan: https://phpstan.org + +.. note:: This tutorial assumes that you are familiar with `PHPStan`_ and + generics. + +For the purpose of this tutorial, we will implement a log() operator. The goal +of this operator is to monitor a stream and write each item (key and value) to a +file. + +.. note:: Security, usability, bugs, and edge cases are intentionally ignored + in this example. The goal of the tutorial is to demonstrate the + process of writing and using a custom operator. + +Creating the operator class +--------------------------- + +First, we need to create our operator class and define its signature. + +.. code-block:: php + :linenos: + + + * @implements OperatorInterface + */ + final class Log extends AbstractStream implements OperatorInterface + { + /** + * @param iterable $source Stream source to iterate over. + * @param non-empty-string $file File where items should be stored. + */ + public function __construct( + iterable $source, + private readonly string $file, + ) { + parent::__construct($source); + } + } + +The operator class extends ``RunOpenCode\Component\Dataset\AbstractStream`` and +implements ``RunOpenCode\Component\Dataset\Contract\OperatorInterface``. + +The class defines two generic template parameters, ``TKey`` and ``TValue``, +which describe the key and value types yielded by the operator. This information +is required by PHPStan for correct type inference. + +The constructor accepts two arguments: + +* The source stream to iterate over. +* A non-empty string representing the path to the file where stream items + will be written. + +From the declared generic types and constructor signature, it is clear that this +operator does not modify the original stream. + +.. note:: If you need to implement an operator that modifies the stream, you can + refer to existing implementations such as + ``RunOpenCode\Component\Dataset\Operator\Map`` or + ``RunOpenCode\Component\Dataset\Operator\Merge``. + +The constructor must pass the source stream to the parent ``AbstractStream`` +implementation. This is required so that the library can correctly track +upstreams and provide proper support for aggregators. + +Lines 12 and 13 are required in order to provide information to the PHPStan what +will operator yield. Line 22 provides information about input stream source and +what input source streams. Line 23 defines required parameter for operator - a +path to a file where stream items will be stored. + +It is clear from lines 12, 13 and 22 that operator does not modifies the +original stream. + +Line 25 is required and in its essence, it will provide source stream to +prototype stream implementation ``RunOpenCode\Component\Dataset\AbstractStream`` +which will track upstreams and provide correct support for aggregators. + +Implementing the operator logic +------------------------------- + +Next, implement the ``iterate(): \Traversable`` method. +This method is executed when the stream is iterated and contains the actual +operator logic. + +.. code-block:: php + :linenos: + + + * @implements OperatorInterface + */ + final class Log extends AbstractStream implements OperatorInterface + { + // Code omitted for the sake of readability. + + /** + * {@inheritdoc} + */ + public function iterate(): \Traversable + { + $handler = \Safe\fopen($this->file, 'wb'); // @see https://github.com/thecodingmachine/safe + + try { + foreach($this->source as $key => $value) { + \Safe\fwrite($handler, \sprintf( + 'Key: "%s", Value: "%s"', + self::stringify($key), + self::stringify($value), + )); + + yield $key => $value; + } + } finally { + \Safe\fclose($handler); + } + } + + /** + * Cast anything to its string representation. + */ + private static function stringify(mixed $value): string + { + // Implementation omitted. + } + } + +Inside this method, a file handler is opened using the provided file path. +The source stream is then iterated item by item. + +For each key–value pair: + +* The key and value are converted to their string representations + (implementation of method ``Log::stringify()`` is omitted for the sake of + readability). +* The formatted output is written to the file. +* The original key and value are yielded back to the stream. + +**Yielding the original key and value is the most important step**, as it allows +the stream to continue flowing to downstream operators or consumers. + +The file handler is closed in a finally block to ensure that resources are +released even if an error occurs during iteration. + +.. warning:: This implementation assumes that the stream will be fully iterated, + which is not always the case. Consumer of the stream can break + iteration (either by using ``break`` in loop, or by using operator + which limits number of iterations, such as ``take()``). When + writing your own operators, always account for early termination + and ensure that resources are handled correctly. + +Testing the operator +-------------------- + +Once the operator is complete, it can be unit tested without requiring any +external dependencies. + +.. code-block:: php + :linenos: + + assertFileDoesNotExist('/tmp/test_logs.log'); + + $operator = new Log([1, 2, 3], '/tmp/test_logs.log'); + + \iterator_to_array($operator); + + $this->assertSame( + \Safe\file_get_contents('/path/to/expected/output/file.log'), + \Safe\file_get_contents('/tmp/test_logs.log'), + ); + + \Safe\unlink('/tmp/test_logs.log'); + } + } + +Using the operator +------------------ + +When using streams in an object-oriented style, you can call the ``operator()`` +method on an instance of ``RunOpenCode\Component\Dataset\Stream`` to apply your +operator. + + +.. code-block:: php + :linenos: + + operator(Log::class, '/path/to/file.log'); + +When using the functional style in PHP 8.5 or later, the ``operator()`` function +is also available. + +.. code-block:: php + :linenos: + + stream(...) + |> static fn(iterable $stream): Stream => operator($stream, Log::class, '/path/to/file.log'); + +General advices for implementing operators +------------------------------------------ + +* **Keep your operators simple.** The general idea of each operator is to + perform one simple, fundamental operation. Any data-processing complexity + should be achieved by composing multiple simple operators, not by implementing + a single complex operator that performs multiple tasks simultaneously. +* **Reuse existing operators whenever possible.** If your operator can be + expressed as a composition of existing operators, with only a small amount of + custom stream-specific logic, do not reimplement existing functionality. + Existing operators are already unit-tested, which makes it easier to reason + about their composition than to write everything from scratch. +* **Keys can be anything — do not break this assumption.** A generator may emit + values with any type of key. +* **Keys are not unique — do not break this assumption.** A generator may emit + multiple items with the same key. +* **Streams are not rewindable — do not break this assumption.** Although + ``array`` is ``iterable``, ``\Generator`` is also ``iterable``, and generators + cannot be rewound. +* **Streams may not be fully iterated** — do not assume that users of your + operator will consume the entire stream. They may use ``break`` or ``take()`` + in their code. +* **Do not lock or hold resources.** If you do, release them when the operator + finishes streaming. Since there is no reliable way to detect whether streaming + was interrupted (for example, via ``break`` or another operator), ensure that + your operator implements the ``__destruct()`` method to properly release any + resources on garbage collection. + +If you believe your operator would be valuable to the wider community, feel free +to submit a pull request! \ No newline at end of file diff --git a/docs/source/components/dataset/operators/index.rst b/docs/source/components/dataset/operators/index.rst index 12a5d9c..b6b8291 100644 --- a/docs/source/components/dataset/operators/index.rst +++ b/docs/source/components/dataset/operators/index.rst @@ -1,3 +1,70 @@ ========= Operators ========= + +Operator, also known as intermediate operations, are functions which allows you +to transform, combine and/or control streams of data. Each time when you apply +operator on stream, you get new instance of +``RunOpenCode\Component\Dataset\Stream``. That means that you can chain +operators describing in declarative way process of data processing from the +start to the end on each individual streamed record. + +.. toctree:: + :maxdepth: 1 + :hidden: + + list/index + extend + + +As already mentioned, applying operators to a stream is essentially a +composition of functions, presented in a more developer-friendly way: + +.. code-block:: php + :linenos: + + map(/* ... */) + ->tap(/* ... */) + ->takeUntil(/* ... */) + ->finally(/* ... */); + +which maps to: + +.. code-block:: php + :linenos: + + map(/* ... */) + |> tap(/* ... */) + |> takeUntil(/* ... */) + |> finally(/* ... */); + diff --git a/docs/source/components/dataset/operators/list/buffer_count.rst b/docs/source/components/dataset/operators/list/buffer_count.rst new file mode 100644 index 0000000..18fca57 --- /dev/null +++ b/docs/source/components/dataset/operators/list/buffer_count.rst @@ -0,0 +1,75 @@ +============= +bufferCount() +============= + +Buffers the stream of data until buffer reaches predefined number of items (or +stream is exhausted) and yields instance of +``RunOpenCode\Component\Dataset\Model\Buffer`` for batch processing. + +Memory consumption depends on the size of the buffer; however, the operator is +still considered **memory-safe**. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: BufferCount + + .. php:method:: __construct(iterable $source, int $count = 1000) + + :param $source: ``iterable`` Stream source to iterate over. + :param $count: ``positive-int`` Number of items to store into buffer. + + + .. php:method:: getIterator() + + :returns: ``\Traversable>`` Stream of buffers. + + +Use cases +--------- + +* You want to load large dataset (per example, from file or from database) and + process it in batches (batch import). + +Example +------- + +Load data from legacy database and execute batch import of entities into new +system. Notify rest of the application that new entity has been created. + + +.. code-block:: php + :linenos: + + executeQuery('...'); + + new Stream($dataset) + ->bufferCount(100) + ->map(function(Buffer $buffer) use ($orm): Buffer { + $entities = new Stream($buffer) + ->map(function(array $row): Entity { + return Entity::fromLegacy($row); + }) + ->tap(function(Entity $entity) use ($orm): void { + $orm->persist($entity); + }) + ->collect(ArrayCollector::class); + + $orm->flush(); + $orm->clear(); + + return $entities; + }) + ->flatten() + ->tap(function(Entity $entity) use ($dispatcher): void { + $dispatcher->notify(new EntityCreated($entity::class, $entity->id)); + }) + ->flush(); diff --git a/docs/source/components/dataset/operators/list/buffer_while.rst b/docs/source/components/dataset/operators/list/buffer_while.rst new file mode 100644 index 0000000..1b52c28 --- /dev/null +++ b/docs/source/components/dataset/operators/list/buffer_while.rst @@ -0,0 +1,90 @@ +============= +bufferWhile() +============= + +Buffers stream items while the predicate returns ``true``. When the predicate +evaluates to ``false`` (or the stream ends), a +``RunOpenCode\Component\Dataset\Model\Buffer`` instance is emitted and buffering +restarts. + +.. warning:: Memory usage depends on the predicate and data distribution, so + memory safety depends on the specific use case. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: BufferWhile + + .. php:method:: __construct(iterable $source, callable(Buffer, TKey=, TValue=): bool $predicate) + + :param $source: ``iterable`` Stream source to iterate over. + :param $predicate: ``callable(Buffer, TKey=, TValue=): bool`` Predicate function to evaluate if current item should be placed into existing buffer, + or existing buffer should be yielded and new one should be created with current item. + + + .. php:method:: getIterator() + + :returns: ``\Traversable>`` Stream of buffers. + + +Use cases +--------- + +* You want to load large dataset (per example, from file or from database) and + process it in batches (batch import). However, you want to cluster stream data + based on yielded keys/values from data stream. + +Example +------- + +Model defines ``Person`` entity and ``Address`` entity which needs to be +migrated from legacy system. In the first iteration, all legacy data for +``Person`` is migrated into new system. + +In the next iteration, data for ``Address`` is loaded, ordered by ``person_id`` +column, and for each related ``Person``, addresses are stored. + +On each processed ``Person`` and its addresses, system is notified about +successful migration of ``Person`` addresses. + + +.. code-block:: php + :linenos: + + executeQuery('...'); + + new Stream($dataset) + ->bufferWhile(function(Buffer $buffer, array $values): bool { + return $buffer->first()->value()['person_id'] === $values['person_id']; + }) + ->map(function(Buffer $buffer) use ($orm): Buffer { + $person = $orm->fetchOne(Person::class, $buffer->first()->value()['person_id']); + $addresses = new Stream($buffer) + ->map(function(array $row): Address { + return Address::fromLegacy($row); + }) + ->tap(function(Address $entity) use ($orm): void { + $orm->persist($entity); + }) + ->collect(ArrayCollector::class); + + $person->setAddresses($addresses); + + $orm->flush(); + $orm->clear(); + + return $person; + }) + ->tap(function(Person $person) use ($dispatcher): void { + $dispatcher->notify(new PersonAddressesMigrated($person::class, $person->id)); + }) + ->flush(); diff --git a/docs/source/components/dataset/operators/list/distinct.rst b/docs/source/components/dataset/operators/list/distinct.rst new file mode 100644 index 0000000..e5f086a --- /dev/null +++ b/docs/source/components/dataset/operators/list/distinct.rst @@ -0,0 +1,55 @@ +========== +distinct() +========== + +The distinct operator emits only items that are unique, preserving FIFO order. +By default, items are compared using the strict equality operator (===). + +Optionally, you may provide a callable that computes an identity for each item +based on its value and key. When provided, distinctness is determined by +performing strict equality comparisons on the computed identities instead of the +original items. + +.. warning:: The memory consumption of this operator depends on the number of + distinct items emitted by the upstream. As a result, it is + considered memory-unsafe, since memory usage can grow without bound + for unbounded streams. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Distinct + + .. php:method:: __construct(iterable $source, ?callable(TValue, TKey=): string $identity = null) + + :param $source: ``iterable`` Stream to iterate over. + :param $identity: ``?callable(TValue, TKey=): string`` User defined callable to determine item identity. If null, strict comparison (===) of values is used. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Distinct items from the stream source (distinct by value or by identity). + +Use cases +--------- + +* Use this operator to eliminate duplicate items from a stream. + +Example +------- + +.. code-block:: php + :linenos: + + 1; + yield 'a' => 2; + yield 'a' => 1; + yield 'b' => 1; + yield 'b' => 1; + })()) + ->distinct(static fn(int $value, string $key): string => \sprintf('%s_%d', $key, $value)); + + // preserves: 'a' => 1, 'a' => 2, 'b' => 1 \ No newline at end of file diff --git a/docs/source/components/dataset/operators/list/filter.rst b/docs/source/components/dataset/operators/list/filter.rst new file mode 100644 index 0000000..eb63653 --- /dev/null +++ b/docs/source/components/dataset/operators/list/filter.rst @@ -0,0 +1,39 @@ +======== +filter() +======== + +Filter operator iterates over given stream source and yields only those items +for which user defined callable returns ``true``. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Filter + + .. php:method:: __construct(iterable $source, callable(TValue, TKey=): bool $filter) + + :param $source: ``iterable`` Stream source to iterate over. + :param $filter: ``callable(TValue, TKey=): bool`` User defined callable to filter items. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Filtered items from the stream source. + +Use cases +--------- + +* Use this operator to eliminate items according to some filtering criteria. + +Example +------- + +.. code-block:: php + :linenos: + + filter(static fn(int $value): bool => 0 === $value % 2); + + // preserves: 2, 4, 6 \ No newline at end of file diff --git a/docs/source/components/dataset/operators/list/finalize.rst b/docs/source/components/dataset/operators/list/finalize.rst new file mode 100644 index 0000000..25517eb --- /dev/null +++ b/docs/source/components/dataset/operators/list/finalize.rst @@ -0,0 +1,69 @@ +========== +finalize() +========== + +Iterates over the given stream source and yields its items. When iteration +completes or an exception occurs, the finalization function is invoked. + +This is equivalent to executing the iteration inside a try/finally block. + +If iteration of the stream source ends prematurely (for example, via a ``break`` +statement), the finalization function is invoked when the operator instance +is garbage-collected. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Finalize + + .. php:method:: __construct(iterable $source, callable(): void $finalizer) + + :param $source: ``iterable`` Stream source to iterate over. + :param $finalizer: ``callable(): void`` User defined callable to invoke when iterator is depleted, or exception is thrown, + or operator instance is garbage collected. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Items from the stream source. + +Use cases +--------- + +* Use this operator when you want to express finalization logic in a declarative + manner. + +Example +------- + +Finalization logic for stream processing may be provided by wrapping stream +processing with try/finally block: + +.. code-block:: php + :linenos: + + filter(...) + ->map(...) + ->tap(...); + } finally { + // Finalization logic... + } + +However, with this operator, same result can be achieved in declarative manner: + +.. code-block:: php + :linenos: + + filter(...) + ->map(...) + ->tap(...) + ->finalize(function(): void { + // Finalization logic + }); \ No newline at end of file diff --git a/docs/source/components/dataset/operators/list/flatten.rst b/docs/source/components/dataset/operators/list/flatten.rst new file mode 100644 index 0000000..b99d76d --- /dev/null +++ b/docs/source/components/dataset/operators/list/flatten.rst @@ -0,0 +1,44 @@ +========= +flatten() +========= + +Flatten operator iterates over given stream of iterables and yields each item +from each iterable in a single flat sequence. + +By default, keys from inner iterables are not preserved, which can be overridden +in constructor. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Flatten + + .. php:method:: __construct(iterable> $source, bool $preserveKeys = false) + + :param $source: ``iterable>`` Stream of streams to iterate over. + :param $preserveKeys: ``bool`` Should keys be preserved from the flattened stream, false by default. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` or ``\Traversable`` Flattened stream. + +Use cases +--------- + +* Use this operator to flatten nested structures, per example, to flatten buffer. + +Example +------- + +.. code-block:: php + :linenos: + + bufferCount(100) + ->tap(function(Buffer $buffer): void { + // Do a batch processing... + }) + ->flatten(); // Continue with original stream. diff --git a/docs/source/components/dataset/operators/list/if_empty.rst b/docs/source/components/dataset/operators/list/if_empty.rst new file mode 100644 index 0000000..a8860c5 --- /dev/null +++ b/docs/source/components/dataset/operators/list/if_empty.rst @@ -0,0 +1,59 @@ +========= +ifEmpty() +========= + +IfEmpty operator tracks the number of yielded items. If the stream is empty, it +will invoke the provided callable and yield from it as an alternative source of +data. + +If exception is provided instead of alternative source of data, that exception +will be thrown. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: IfEmpty + + .. php:method:: __construct(iterable $source, \Throwable|callable()|null: iterable $fallback = null) + + :param $source: ``iterable`` Stream source to iterate over. + :param $fallback: ``\Throwable|callable()|null: iterable`` Fallback stream source, or exception to + throw, or ``null`` to use the default empty-stream exception. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Provided stream source or fallback stream source. + +Use cases +--------- + +* Provide an alternative source of data if the original is empty. +* Assert that the stream is not empty and throw an exception if it is. + +Example +------- + +Try with dataset stored in cache first. If cache is empty, load data from +database. + +.. code-block:: php + :linenos: + + ifEmpty(function() use ($database): iterable { + return $database->fetch(); + }); + +Expect that the query will return at least one result; otherwise, throw an +exception. + +.. code-block:: php + :linenos: + + ifEmpty(new \RuntimeException('At least one record is expected.')); diff --git a/docs/source/components/dataset/operators/list/index.rst b/docs/source/components/dataset/operators/list/index.rst new file mode 100644 index 0000000..af3455a --- /dev/null +++ b/docs/source/components/dataset/operators/list/index.rst @@ -0,0 +1,66 @@ +=================== +Available operators +=================== + +.. toctree:: + :maxdepth: 1 + :hidden: + + buffer_count + buffer_while + distinct + filter + finalize + flatten + if_empty + map + merge + overflow + reverse + skip + sort + take + take_until + tap + +Transformation operators +------------------------ + +* ``bufferCount()`` +* ``bufferWhile()`` +* ``flatten()`` +* ``map()`` +* ``merge()`` +* ``reverse()`` +* ``sort()`` + +Filtering operators +------------------- + +* ``distinct()`` +* ``filter()`` +* ``skip()`` +* ``take()`` +* ``takeUntil()`` + +Asserting operators +------------------- + +* ``ifEmpty()`` +* ``overflow()`` + +Error handling operators +------------------------ + +* ``finalize()`` + +Aggregate operators +------------------- + +* ``reduce()`` + +Utility operators +----------------- + +* ``tap()`` + diff --git a/docs/source/components/dataset/operators/list/map.rst b/docs/source/components/dataset/operators/list/map.rst new file mode 100644 index 0000000..6079814 --- /dev/null +++ b/docs/source/components/dataset/operators/list/map.rst @@ -0,0 +1,53 @@ +===== +map() +===== + +Map operator iterates over given stream source and applies transformation +functions on keys/values before yielding. + +Operator may be used to transform only keys, or only values, or both. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Map + + .. php:method:: __construct(iterable $source, ?callable(TValue, TKey=): TModifiedValue $valueTransform = null, ?callable(TKey, TValue=): TModifiedKey $keyTransform = null) + + :param $source: ``iterable`` Stream source to iterate over. + :param $valueTransform: ``?callable(TValue, TKey=): TModifiedValue`` Optional transformation function for transforming values. + :param $keyTransform: ``?callable(TKey, TValue=): TModifiedKey`` Optional transformation function for transforming keys. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Modified keys and values from the stream source. + +Use cases +--------- + +* Modify values and/or keys. +* Index stream records by value. + +Example +------- + +Transform rows from database into entity objects, collect them into an array +indexed by identifier. + +.. code-block:: php + :linenos: + + execute('SELECT ...'); + + new Stream($dataset) + ->map( + valueTransform: static function(array $row): Entity { + return Entity::fromArray($row); + }, + keyTransform: static function(int $key, array $row): string { + return $row['id']; + } + ); diff --git a/docs/source/components/dataset/operators/list/merge.rst b/docs/source/components/dataset/operators/list/merge.rst new file mode 100644 index 0000000..f7820fe --- /dev/null +++ b/docs/source/components/dataset/operators/list/merge.rst @@ -0,0 +1,43 @@ +======= +merge() +======= + +Merges two streaming sources into a single stream, yielding items from both +sources. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Merge + + .. php:method:: __construct(iterable $first, iterable $second) + + :param $first: ``iterable`` First stream source to iterate over. + :param $second: ``iterable`` Second stream source to iterate over. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Stream containing keys and values from both sources. + +Use cases +--------- + +* Combine two stream sources into one. + +Example +------- + +Combine client records from a sharded database to produce a single consolidated +report for all clients. + +.. code-block:: php + :linenos: + + execute('SELECT ...'); + $euClients = $euDbConnection->execute('SELECT ...'); + + new Stream($usClients) + ->merge($euClients); diff --git a/docs/source/components/dataset/operators/list/overflow.rst b/docs/source/components/dataset/operators/list/overflow.rst new file mode 100644 index 0000000..482a75f --- /dev/null +++ b/docs/source/components/dataset/operators/list/overflow.rst @@ -0,0 +1,46 @@ +========== +overflow() +========== + +Monitors the number of items yielded by the stream and raises an exception when +the allowed limit is exceeded. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Overflow + + .. php:method:: __construct(iterable $source, positive-int $capacity, \Throwable|(callable(StreamOverflowException=): \Throwable)|null $throw = null) + + :param $source: ``iterable`` Stream source to iterate over. + :param $capacity: ``positive-int`` Maximum number of items to iterate over. + :param $throw: ``\Throwable|(callable(StreamOverflowException=): \Throwable)|null`` Exception to throw if stream yielded more items then capacity allows. + If ``null`` is provided, ``StreamOverflowException`` is thrown. Otherwise, + provided exception will be thrown, or callable invoked to create exception to throw. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Items from the stream source. + +Use cases +--------- + +* When number of yielded items is expected not to exceed given capacity. + +Example +------- + +Execute query which should yield number of items not more than expected count +(per example, business logic allows for user to have up to 10 bank accounts). + +.. code-block:: php + :linenos: + + execute('SELECT * FROM accounts WHERE accounts.user_id = :user'); + + new Stream($accounts) + ->overflow(10) + ->collect(ArrayCollector::class); diff --git a/docs/source/components/dataset/operators/list/reverse.rst b/docs/source/components/dataset/operators/list/reverse.rst new file mode 100644 index 0000000..c3832bf --- /dev/null +++ b/docs/source/components/dataset/operators/list/reverse.rst @@ -0,0 +1,38 @@ +========= +reverse() +========= + +Reverse operator iterates over given stream source and yields items in reverse +order. + +.. warning:: The memory consumption of this operator depends on the number of + items in the stream and it is considered as **memory unsafe**. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Reverse + + .. php:method:: __construct(iterable $source) + + :param $source: ``iterable`` Stream source to iterate over in reverse order. + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Items from the stream source yielded in reverse order. + +Use cases +--------- + +* When reverse order of items in stream is required. + +Example +------- + +.. code-block:: php + :linenos: + + 1, 'b' => 2, 'c' => 3]) + ->reverse(); // yields 'c' => 3, 'b' => 2, 'a' => 1 diff --git a/docs/source/components/dataset/operators/list/skip.rst b/docs/source/components/dataset/operators/list/skip.rst new file mode 100644 index 0000000..52646fd --- /dev/null +++ b/docs/source/components/dataset/operators/list/skip.rst @@ -0,0 +1,36 @@ +====== +skip() +====== + +The skip operator processes a stream source by discarding the first N items +and yielding all subsequent items. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Skip + + .. php:method:: __construct(iterable $source, positive-int $count) + + :param $source: ``iterable`` Stream source to iterate over. + :param $count: ``positive-int`` Number of items to skip. + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Items from the stream source after first ``$count`` items. + +Use cases +--------- + +* When first N items needs to be skipped. + +Example +------- + +.. code-block:: php + :linenos: + + 1, 'b' => 2, 'c' => 3]) + ->skip(2); // yields 'c' => 3 diff --git a/docs/source/components/dataset/operators/list/sort.rst b/docs/source/components/dataset/operators/list/sort.rst new file mode 100644 index 0000000..c573ca6 --- /dev/null +++ b/docs/source/components/dataset/operators/list/sort.rst @@ -0,0 +1,53 @@ +====== +sort() +====== + +The sort operator processes a stream source and yields items ordered by keys or +values. Ordering may be defined by a user-supplied comparator or by the default +comparator, which relies on the spaceship operator (<=>) for key or value +comparison. + +.. warning:: The memory consumption of this operator depends on the number of + items in the stream and it is considered as **memory unsafe**. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Sort + + .. php:method:: __construct(iterable $source, ?callable(TKey|TValue, TKey|TValue): int $comparator = null, bool $byKeys = false) + + :param $source: ``iterable`` Stream source to iterate over. + :param $comparator: ``?callable(TKey|TValue, TKey|TValue): int`` User defined callable to compare two items. If ``null``, spaceship operator (``<=>``) is used. + :param $byKeys: ``bool`` If ``$byKeys`` is ``true``, keys will be compared instead of values. + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Sorted items from the stream source. + +Use cases +--------- + +* When items from stream needs to be sorted, either by keys or by values. + +Example +------- + +.. code-block:: php + :linenos: + + 3, 'b' => 1, 'c' => 2]) + ->sort( + comparator: static fn(int $first, int $second): int => $first <=> $second, + byKeys: false, + ); + + // Sort by keys. + new Stream(['a' => 3, 'b' => 1, 'c' => 2]) + ->sort( + comparator: static fn(string $first, string $second): int => \strcmp($first, $second), + byKeys: true, + ); \ No newline at end of file diff --git a/docs/source/components/dataset/operators/list/take.rst b/docs/source/components/dataset/operators/list/take.rst new file mode 100644 index 0000000..eb0bb3e --- /dev/null +++ b/docs/source/components/dataset/operators/list/take.rst @@ -0,0 +1,36 @@ +====== +take() +====== + +Take operator iterates over given stream source and yields only the first N +items. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Take + + .. php:method:: __construct(iterable $source, positive-int $count) + + :param $source: ``iterable`` Stream source to iterate over. + :param $count: ``positive-int`` Number of items to take. + + .. php:method:: getIterator() + + :returns: ``\Traversable`` First N items from the stream source. + +Use cases +--------- + +* When only first N items are needed to be iterated. + +Example +------- + +.. code-block:: php + :linenos: + + 1, 'b' => 2, 'c' => 3]) + ->take(2); // yields 'a' => 1, 'b' => 2 diff --git a/docs/source/components/dataset/operators/list/take_until.rst b/docs/source/components/dataset/operators/list/take_until.rst new file mode 100644 index 0000000..7be986f --- /dev/null +++ b/docs/source/components/dataset/operators/list/take_until.rst @@ -0,0 +1,37 @@ +=========== +takeUntil() +=========== + +The take operator processes a stream source and yields items until the predicate +callable indicates that iteration should stop. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: TakeUntil + + .. php:method:: __construct(iterable $source, callable(TValue, TKey=): bool $predicate) + + :param $source: ``iterable`` Stream source to iterate over. + :param $predicate: ``callable(TValue, TKey=): bool`` Predicate callable to evaluate stop condition. + + .. php:method:: getIterator() + + :returns: ``\Traversable`` First N items from the stream source until predicate callable was satisfied. + +Use cases +--------- + +* When only the first N items need to be yielded until some condition is met. + +Example +------- + +.. code-block:: php + :linenos: + + 1, 'b' => 2, 'c' => 3]) + ->takeUntil(static fn(int $value, string $key): bool => $value > 2); + // yields 'a' => 1, 'b' => 2 diff --git a/docs/source/components/dataset/operators/list/tap.rst b/docs/source/components/dataset/operators/list/tap.rst new file mode 100644 index 0000000..a4edce8 --- /dev/null +++ b/docs/source/components/dataset/operators/list/tap.rst @@ -0,0 +1,36 @@ +===== +tap() +===== + +Provides a way to observe the stream by executing a callback for each item. + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: Tap + + .. php:method:: __construct(iterable $source, callable(TValue, TKey=): void $callback) + + :param $source: ``iterable`` Stream source to iterate over. + :param $callback: ``callable(TValue, TKey=): void`` Callable to execute for each item. + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Items from the stream source. + +Use cases +--------- + +* Monitor yielded items for debugging/logging purposes. +* Execute processing logic for each yielded item. + +Example +------- + +.. code-block:: php + :linenos: + + 1, 'b' => 2, 'c' => 3]) + ->tap(static fn(int $value, string $key): bool => print("Key: $key, Value: $value\n")); diff --git a/docs/source/components/dataset/stream/index.rst b/docs/source/components/dataset/stream/index.rst index 29ed628..9125003 100644 --- a/docs/source/components/dataset/stream/index.rst +++ b/docs/source/components/dataset/stream/index.rst @@ -14,9 +14,9 @@ or collect data into some convenient data structure using collectors. Fluent API ---------- -With an instance of ``RunOpenCode\Component\Dataset\Stream``, you can apply a -variety of operators to transform or filter the data. Operators process each -item in the stream lazily, meaning that no computation is performed until the +With an instance of ``RunOpenCode\Component\Dataset\Stream``, you can apply a +variety of operators to transform or filter data. Operators process each item +in the stream lazily, which means that no computation is performed until the stream is iterated. Examples of some of the available operators include: @@ -24,18 +24,21 @@ Examples of some of the available operators include: * ``map()`` – transform each value in the stream. * ``filter()`` – include only values that meet a given condition. * ``take()``, ``skip()``, ``distinct()`` – control which items are emitted. -* ``sort()``, ``reverse()`` – ordering operators (note: these load the entire stream into memory). +* ``sort()``, ``reverse()`` – ordering operators (note: these load the entire + stream into memory). * etc. -Additionally, ``RunOpenCode\Component\Dataset\Stream`` supports aggregators, -which are attached reducers that compute a reduced value while the stream is -being iterated. This allows you to process a stream and calculate totals, -averages, or other summary values simultaneously without breaking the data flow. +In addition, ``RunOpenCode\Component\Dataset\Stream`` supports aggregators. +Aggregators are attached reducers that compute a reduced value while the stream +is being iterated. This makes it possible to process a stream and compute totals, +averages, or other summary values at the same time, without interrupting the +data flow. - **Applying operators and aggregators on stream DOES NOT break the stream and - its fluent API.** + **NOTE**: Applying operators and aggregators to a stream **does NOT** break + the stream or its fluent API. However, the fluent API may end with either a + reducer or a collector, which will break fluent API. -With the fluent API, you are able to write stream processing code in declarative +Using the fluent API, you can write stream-processing code in a declarative manner: .. code-block:: php @@ -45,14 +48,122 @@ manner: use RunOpenCode\Component\Dataset\Stream; - Stream::create(/* ... */) + new Stream(/* ... */) ->map(/* ... */) ->tap(/* ... */) ->takeUntil(/* ... */) ->finally(/* ... */); +.. _pipe operator: https://wiki.php.net/rfc/pipe-operator-v3 + +If you are using PHP 8.5 or higher, you can leverage the `pipe operator`_ and +write stream-processing code in a functional style using functions. + +.. code-block:: php + :linenos: + + map(/* ... */) + |> tap(/* ... */) + |> takeUntil(/* ... */) + |> finally(/* ... */); + Internals --------- -Class ``RunOpenCode\Component\Dataset\Stream`` implements -``RunOpenCode\Component\Dataset\Contract\StreamInterface``. While \ No newline at end of file +Class ``RunOpenCode\Component\Dataset\Stream`` implements +``RunOpenCode\Component\Dataset\Contract\StreamInterface``. However, this +interface **is not intended to be implemented** directly. The library provides +different extension mechanisms, which are discussed in detail in separate +chapters of this documentation. + +The interface defines the following: + +* An extension of the ``\IteratorAggregate`` interface, which is + required for the stream to be iterable. +* A property ``bool $closed`` that indicates whether the stream has been + iterated. It does not indicate whether the stream has been fully iterated, as + iteration may be terminated early. +* A property ``array $aggregated`` that contains a list + of all aggregators attached to the stream, along with the values aggregated + during the current iteration. This means that after each iteration, you can + retrieve the current aggregated values. + +The interface also defines additional properties; however, they are annotated as +``@internal``, which means they should not be used directly in your code. They +are described here only to provide a better understanding of the internal +implementation: + +* A property ``list> $upstreams`` that contains a list of + streams preceding the current stream. +* A property ``array`` that contains a + list of all attached aggregators from the current stream or any upstream + stream. You will not interact with this property directly, as the + ``array $aggregated`` property provides a more + developer-friendly way to access aggregated values. + +Upstreams +~~~~~~~~~ + +Applying operators to a stream is essentially a composition of functions, +presented in a more developer-friendly way. Consider the following example: + +.. code-block:: php + :linenos: + + map(/* ... */) + ->tap(/* ... */) + ->takeUntil(/* ... */) + ->finally(/* ... */); + +Under the hood, the library creates a composition of function calls, where each +subsequent call decorates the previous one. Illustratively, this can be written +as: + +.. code-block:: php + :linenos: + + map(/* ... */) // This is stream B + ->tap(/* ... */) // This is stream C + ->takeUntil(/* ... */) // This is stream D + ->finally(/* ... */); // This is stream E + +We can form a mental model of the stream composition as follows: + + A -> B -> C -> D -> E + +This can be interpreted as "*data flows from A into B, then into C*", and so on. +However, since we always interact with the last stream instance in the chain, +the model used by this library is based on *upstreams* rather than +*downstreams*. This means that each stream holds a reference to its preceding +stream (or streams), not to its child stream. + +This data structure enables support for aggregators (reducers applied to the +stream) that can be attached at any point in the stream composition and later +accessed from the final stream instance or collector. diff --git a/src/RunOpenCode/Component/Dataset/src/AbstractStream.php b/src/RunOpenCode/Component/Dataset/src/AbstractStream.php index 0743570..3e40d4a 100644 --- a/src/RunOpenCode/Component/Dataset/src/AbstractStream.php +++ b/src/RunOpenCode/Component/Dataset/src/AbstractStream.php @@ -10,7 +10,7 @@ use RunOpenCode\Component\Dataset\Exception\LogicException; /** - * Prototype for dataset streams. + * Prototype for streams. * * @template TKey * @template TValue diff --git a/src/RunOpenCode/Component/Dataset/src/Collector/ArrayCollector.php b/src/RunOpenCode/Component/Dataset/src/Collector/ArrayCollector.php index 17a2484..b38d660 100644 --- a/src/RunOpenCode/Component/Dataset/src/Collector/ArrayCollector.php +++ b/src/RunOpenCode/Component/Dataset/src/Collector/ArrayCollector.php @@ -12,7 +12,7 @@ use function RunOpenCode\Component\Dataset\iterable_to_array; /** - * Collect iterable into array. + * Collect stream into array. * * @template TKey of array-key * @template TValue @@ -32,7 +32,7 @@ final class ArrayCollector implements \IteratorAggregate, \Countable, \ArrayAcce * {@inheritdoc} */ public array $aggregated { - get => $this->collection instanceof StreamInterface ? $this->collection->aggregated : []; + get => $this->source instanceof StreamInterface ? $this->source->aggregated : []; } /** @@ -43,12 +43,12 @@ final class ArrayCollector implements \IteratorAggregate, \Countable, \ArrayAcce } /** - * @param iterable $collection Collection to collect. + * @param iterable $source Stream source to collect. */ public function __construct( - public readonly iterable $collection, + public readonly iterable $source, ) { - $this->value = iterable_to_array($this->collection); + $this->value = iterable_to_array($this->source); } /** diff --git a/src/RunOpenCode/Component/Dataset/src/Collector/CursoredCollector.php b/src/RunOpenCode/Component/Dataset/src/Collector/CursoredCollector.php index 4b894da..9439e53 100644 --- a/src/RunOpenCode/Component/Dataset/src/Collector/CursoredCollector.php +++ b/src/RunOpenCode/Component/Dataset/src/Collector/CursoredCollector.php @@ -105,10 +105,10 @@ final class CursoredCollector implements \IteratorAggregate, CollectorInterface private bool $exhausted = false; /** - * @param iterable $collection Collection to collect. + * @param iterable $source Stream source to collect. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, public readonly int $offset = 0, public readonly ?int $limit = null, ) { @@ -124,12 +124,12 @@ public function getIterator(): \Traversable $this->closed = true; $this->aggregated = []; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { $iteration++; if (null !== $this->limit && $iteration === $this->limit) { yield $key => $value; - $this->aggregated = $this->collection instanceof StreamInterface ? $this->collection->aggregated : []; + $this->aggregated = $this->source instanceof StreamInterface ? $this->source->aggregated : []; continue; } @@ -139,7 +139,7 @@ public function getIterator(): \Traversable return; } - $this->aggregated = $this->collection instanceof StreamInterface ? $this->collection->aggregated : []; + $this->aggregated = $this->source instanceof StreamInterface ? $this->source->aggregated : []; yield $key => $value; } diff --git a/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php b/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php index 07a1454..68eed55 100644 --- a/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php +++ b/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php @@ -9,10 +9,10 @@ use RunOpenCode\Component\Dataset\Exception\LogicException; /** - * Collect as original iterable. + * Collect as original stream. * - * Allows you to iterate through whole dataset providing you the access to - * aggregators when collection is iterated. + * Allows you to iterate through whole stream providing you the access to + * aggregators when stream is iterated. * * @template TKey * @template TValue @@ -40,7 +40,7 @@ final class IterableCollector implements \IteratorAggregate, CollectorInterface throw new LogicException('Collector must be iterated first.'); } - return $this->collection instanceof StreamInterface ? $this->collection->aggregated : []; + return $this->source instanceof StreamInterface ? $this->source->aggregated : []; } } @@ -57,10 +57,10 @@ final class IterableCollector implements \IteratorAggregate, CollectorInterface public private(set) int $count = 0; /** - * @param iterable $collection Collection to collect. + * @param iterable $source Stream source to collect. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, public readonly int $offset = 0, public readonly ?int $limit = null, ) { @@ -74,7 +74,7 @@ public function getIterator(): \Traversable { $this->closed = true; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { yield $key => $value; $this->count++; } diff --git a/src/RunOpenCode/Component/Dataset/src/Collector/ListCollector.php b/src/RunOpenCode/Component/Dataset/src/Collector/ListCollector.php index 3269a60..9c09c98 100644 --- a/src/RunOpenCode/Component/Dataset/src/Collector/ListCollector.php +++ b/src/RunOpenCode/Component/Dataset/src/Collector/ListCollector.php @@ -12,7 +12,7 @@ use function RunOpenCode\Component\Dataset\iterable_to_list; /** - * Collect iterable into list. + * Collect stream into list. * * @template TKey * @template TValue @@ -32,7 +32,7 @@ final class ListCollector implements \IteratorAggregate, \Countable, \ArrayAcces * {@inheritdoc} */ public array $aggregated { - get => $this->collection instanceof StreamInterface ? $this->collection->aggregated : []; + get => $this->source instanceof StreamInterface ? $this->source->aggregated : []; } /** @@ -43,12 +43,12 @@ final class ListCollector implements \IteratorAggregate, \Countable, \ArrayAcces } /** - * @param iterable $collection Collection to collect. + * @param iterable $source Stream source to collect. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, ) { - $this->value = iterable_to_list($this->collection); + $this->value = iterable_to_list($this->source); } /** diff --git a/src/RunOpenCode/Component/Dataset/src/Contract/StreamInterface.php b/src/RunOpenCode/Component/Dataset/src/Contract/StreamInterface.php index d76aa3d..731728e 100644 --- a/src/RunOpenCode/Component/Dataset/src/Contract/StreamInterface.php +++ b/src/RunOpenCode/Component/Dataset/src/Contract/StreamInterface.php @@ -15,43 +15,47 @@ interface StreamInterface extends \IteratorAggregate { /** - * Get list of stream data origins. + * Get aggregated values collected during iteration process. * - * @var list> + * @var array */ - public array $upstreams { + public array $aggregated { get; } /** - * Get list of aggregators attached to this stream. + * Check if stream has been iterated. * - * @var array> + * Do note that this denotes only if iteration started, not + * if stream has been fully iterated. + * + * This information may be used to determine if stream can + * be iterated or not as implementation assumes that all + * streams can not be rewound. */ - public array $aggregators { + public bool $closed { get; } /** - * Get aggregated values collected during iteration process. + * Get list of stream data origins. * - * @var array + * @var list> + * + * @internal */ - public array $aggregated { + public array $upstreams { get; } /** - * Check if stream has been iterated. + * Get list of aggregators attached to this stream. * - * Do note that this denotes only if iteration started, not - * if stream has been fully iterated. + * @var array> * - * This information may be used to determine if stream can - * be iterated or not as implementation assumes that all - * streams can not be rewound. + * @internal */ - public bool $closed { + public array $aggregators { get; } } diff --git a/src/RunOpenCode/Component/Dataset/src/Exception/ExpectationFailedException.php b/src/RunOpenCode/Component/Dataset/src/Exception/ExpectationFailedException.php new file mode 100644 index 0000000..615d50e --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Exception/ExpectationFailedException.php @@ -0,0 +1,9 @@ + $collection Collection to iterate over. - * @param positive-int $count How many items to buffer. + * @param iterable $source Stream source to iterate over. + * @param positive-int $count Number of items to store into buffer. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, private readonly int $count = 1000, ) { - parent::__construct($collection); + parent::__construct($source); } /** @@ -43,7 +44,7 @@ protected function iterate(): \Traversable /** @var \ArrayObject $items */ $items = new \ArrayObject(); - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { $items[] = [$key, $value]; if (\count($items) === $this->count) { diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/BufferWhile.php b/src/RunOpenCode/Component/Dataset/src/Operator/BufferWhile.php index 48d7fd3..4f90f85 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/BufferWhile.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/BufferWhile.php @@ -11,7 +11,7 @@ /** * Buffer while operator. * - * Iterates over given collection and creates a buffer of items as long as given + * Iterates over given data stream and creates a buffer of items as long as given * predicate function is satisfied. * * Yields created instances of {@see Buffer} for batch processing. @@ -20,8 +20,8 @@ * @template TValue * * @phpstan-type TStorage = \ArrayObject - * @phpstan-type PredicateCallable = callable(TBuffer, TValue=, TKey=): bool * @phpstan-type TBuffer = Buffer + * @phpstan-type PredicateCallable = callable(TBuffer, TValue=, TKey=): bool * * @extends AbstractStream * @implements OperatorInterface @@ -31,14 +31,15 @@ final class BufferWhile extends AbstractStream implements OperatorInterface private readonly \Closure $predicate; /** - * @param iterable $collection Collection to iterate over. - * @param PredicateCallable $predicate Callable predicate function to evaluate. + * @param iterable $source Stream source to iterate over. + * @param PredicateCallable $predicate Predicate function to evaluate if current item should be placed into existing buffer, or + * existing buffer should be yielded and new one should be created with current item. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, callable $predicate, ) { - parent::__construct($collection); + parent::__construct($source); $this->predicate = $predicate(...); } @@ -51,7 +52,7 @@ protected function iterate(): \Traversable $items = new \ArrayObject(); $buffer = new Buffer($items); - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { if (0 === \count($items)) { $items[] = [$key, $value]; continue; diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Distinct.php b/src/RunOpenCode/Component/Dataset/src/Operator/Distinct.php index be46d7c..8bb0723 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Distinct.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Distinct.php @@ -10,20 +10,17 @@ /** * Distinct operator. * - * Distinct operator can be used in two modes, with identity callable or without it. + * The distinct operator emits only items that are unique, preserving FIFO order. + * By default, items are compared using the strict equality operator (===). * - * When identity callable is provided, it is used to determine the identity of each - * item in the collection. If two items have the same identity, only the first one - * is yielded. + * Optionally, you may provide a callable that computes an identity for each item + * based on its value and key. When provided, distinctness is determined by + * performing strict equality comparisons on the computed identities instead of the + * original items. * - * When identity callable is not provided, strict comparison (===) is used to determine - * if two items are the same. - * - * WARNING: Memory consumption of this operator depends on the number of distinct items - * and type of identity callable used. If the collection has a lot of distinct items, - * or if the identity callable produces a lot of unique identities, memory consumption - * can grow significantly. Do note that if value based comparison is used, memory consumption - * can grow even more, as all distinct values need to be stored in memory. + * WARNING: The memory consumption of this operator depends on the number of distinct + * items emitted by the upstream. As a result, it is considered memory-unsafe, since + * memory usage can grow without bound for unbounded streams. * * Example usage: * @@ -31,11 +28,13 @@ * use RunOpenCode\Component\Dataset\Operator\Distinct; * * $distinct = new Distinct( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 1]), + * source: ['a' => 1, 'b' => 2, 'c' => 1], * identity: static fn($value, $key): string => (string) $value, * ); * ``` * + * TODO: This operator may be refactored for comparison without identity callable. + * * @template TKey * @template TValue * @@ -49,14 +48,14 @@ final class Distinct extends AbstractStream implements OperatorInterface private readonly ?\Closure $identity; /** - * @param iterable $collection Collection to iterate over. - * @param IdentityCallable|null $identity User defined callable to determine item identity. If null, strict comparison (===) is used. + * @param iterable $source Stream source to iterate over. + * @param IdentityCallable|null $identity User defined callable to determine item identity. If null, strict comparison (===) of values is used. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, ?callable $identity = null, ) { - parent::__construct($this->collection); + parent::__construct($this->source); $this->identity = $identity ? $identity(...) : null; } @@ -77,7 +76,7 @@ private function identifiable(): iterable \assert(null !== $this->identity); - foreach ($this->collection as $key => $item) { + foreach ($this->source as $key => $item) { $identity = ($this->identity)($item, $key); if (isset($identities[$identity])) { @@ -97,7 +96,7 @@ private function generic(): iterable { $emitted = []; - foreach ($this->collection as $key => $item) { + foreach ($this->source as $key => $item) { if (\in_array($item, $emitted, true)) { continue; } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Filter.php b/src/RunOpenCode/Component/Dataset/src/Operator/Filter.php index 961dab2..777f769 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Filter.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Filter.php @@ -10,21 +10,16 @@ /** * Filter operator. * - * Filter operator iterates over given collection and yields only those items + * Filter operator iterates over given stream source and yields only those items * for which user defined callable returns true. * - * User defined callable receives three arguments: - * - current item value - * - current item key - * - original collection being iterated - * * Example usage: * * ```php * use RunOpenCode\Component\Dataset\Operator\Filter; * * $filter = new Filter( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * source: ['a' => 1, 'b' => 2, 'c' => 3], * filter: static fn(int $value, string $key): bool => $value > 1, * ); * ``` @@ -42,14 +37,14 @@ final class Filter extends AbstractStream implements OperatorInterface private readonly \Closure $filter; /** - * @param iterable $collection Collection to iterate over. - * @param FilterCallable $filter User defined callable to filter items. + * @param iterable $source Stream source to iterate over. + * @param FilterCallable $filter User defined callable to filter items. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, callable $filter, ) { - parent::__construct($this->collection); + parent::__construct($this->source); $this->filter = $filter(...); } @@ -58,7 +53,7 @@ public function __construct( */ protected function iterate(): \Traversable { - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { if (!($this->filter)($value, $key)) { continue; } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Finalize.php b/src/RunOpenCode/Component/Dataset/src/Operator/Finalize.php index e016aaa..57918d8 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Finalize.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Finalize.php @@ -10,11 +10,14 @@ /** * Finalize operator. * - * Finalize operator iterates over given collection and yields all - * items. When iteration is completed or exception is thrown, - * it invokes finalization function. + * Iterates over the given stream source and yields its items. When iteration + * completes or an exception occurs, the finalization function is invoked. * - * Behavior is same as if iteration is within try/finally block. + * This is equivalent to executing the iteration inside a try/finally block. + * + * If iteration of the stream source ends prematurely (for example, via a `break` + * statement), the finalization function is invoked when the operator instance + * is garbage-collected. * * Example usage: * @@ -22,7 +25,7 @@ * use RunOpenCode\Component\Dataset\Operator\Finalize; * * $finalize = new Finalize( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * source: ['a' => 1, 'b' => 2, 'c' => 3], * finalizer: static fn(): void => // finalization logic, * ); * ``` @@ -39,15 +42,18 @@ final class Finalize extends AbstractStream implements OperatorInterface { private \Closure $finalizer; + private bool $finalized = false; + /** - * @param iterable $collection Collection to iterate over. - * @param FinalizerCallable $finalizer User defined callable to invoke when iterator is depleted or exception is thrown. + * @param iterable $source Stream source to iterate over. + * @param FinalizerCallable $finalizer User defined callable to invoke when iterator is depleted, or exception + * is thrown, or operator instance is garbage collected. */ public function __construct( - iterable $collection, - callable $finalizer, + private readonly iterable $source, + callable $finalizer, ) { - parent::__construct($collection); + parent::__construct($source); $this->finalizer = $finalizer(...); } @@ -56,10 +62,25 @@ public function __construct( */ protected function iterate(): \Traversable { + $this->finalized = true; + try { - yield from $this; + yield from $this->source; } finally { ($this->finalizer)(); } } + + /** + * Ensure finalization logic is executed. + */ + public function __destruct() + { + if ($this->finalized) { + return; + } + + $this->finalized = true; + ($this->finalizer)(); + } } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php b/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php index d3758da..7fd3615 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php @@ -10,7 +10,7 @@ /** * Flatten operator. * - * Flatten operator iterates over given collection of iterables and yields + * Flatten operator iterates over given stream of iterables and yields * each item from each iterable in a single flat sequence. * * By default, keys from inner iterables are not preserved, which can be @@ -22,7 +22,7 @@ * use RunOpenCode\Component\Dataset\Operator\Flatten; * * $flatten = new Flatten( - * collection: new Dataset(['first' => ['a' => 1, 'b' => 3], 'second' => ['c' => 5]]), + * source: ['first' => ['a' => 1, 'b' => 3], 'second' => ['c' => 5]], * ); * // The resulting sequence will be: 0 => 1, 1 => 3, 2 => 5 * // With `preserveKeys` set to true, resulting sequence would be: 'a' => 1, 'b' => 3, 'c' => 5 @@ -37,14 +37,14 @@ final class Flatten extends AbstractStream implements OperatorInterface { /** - * @param iterable> $collection Collection to iterate over. - * @param bool $preserveKeys Should keys be preserved from the flattened collections, false by default. + * @param iterable> $source Stream of streams to iterate over. + * @param bool $preserveKeys Should keys be preserved from the flattened stream, false by default. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, private readonly bool $preserveKeys = false, ) { - parent::__construct($this->collection); + parent::__construct($this->source); } /** @@ -52,7 +52,7 @@ public function __construct( */ protected function iterate(): \Traversable { - foreach ($this->collection as $items) { + foreach ($this->source as $items) { foreach ($items as $key => $value) { if ($this->preserveKeys) { yield $key => $value; diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/IfEmpty.php b/src/RunOpenCode/Component/Dataset/src/Operator/IfEmpty.php index 81d97c8..1b8d28e 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/IfEmpty.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/IfEmpty.php @@ -6,52 +6,52 @@ use RunOpenCode\Component\Dataset\AbstractStream; use RunOpenCode\Component\Dataset\Contract\OperatorInterface; +use RunOpenCode\Component\Dataset\Exception\StreamEmptyException; /** * IfEmpty operator. * - * IfEmpty operator tracks number of yielded items. If stream was empty, it - * will yield provided callable and yield from it as alternative source of + * IfEmpty operator tracks the number of yielded items. If the stream is empty, it + * will invoke the provided callable and yield from it as an alternative source of * data. * - * If exception is provided instead of callable, that exception will be thrown - * instead. + * If exception is provided instead of alternative source of data, that exception + * will be thrown. * * Example usage: * * ```php * use RunOpenCode\Component\Dataset\Operator\IfEmpty; * - * $at = new IfEmpty( - * collection: new Dataset([]), - * action: static fn(): iterable => new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * $stream = new IfEmpty( + * source: [], + * action: static fn(): iterable => ['a' => 1, 'b' => 2, 'c' => 3], * ); * ``` * * @template TKey * @template TValue - * @template TAlternativeKey - * @template TAlternativeValue * - * @phpstan-type ActionCallable = callable(): (iterable|never) + * @phpstan-type FallbackSourceCallable = callable(): iterable * - * @extends AbstractStream - * @implements OperatorInterface + * @extends AbstractStream + * @implements OperatorInterface */ final class IfEmpty extends AbstractStream implements OperatorInterface { - private \Closure $action; + private \Closure $fallback; /** - * @param iterable $collection Collection to iterate over. - * @param \Exception|ActionCallable $action Action to execute if original stream is empty (or exception to throw). + * @param iterable $source Stream source to iterate over. + * @param FallbackSourceCallable|\Throwable|null $fallback Fallback stream source, or exception to throw. */ public function __construct( - private readonly iterable $collection, - \Exception|callable $action, + private readonly iterable $source, + callable|\Throwable|null $fallback, ) { - parent::__construct($collection); - $this->action = $action instanceof \Exception ? static fn(): never => throw $action : $action(...); + parent::__construct($source); + $fallback = $fallback ?? new StreamEmptyException(); + $this->fallback = $fallback instanceof \Throwable ? static fn(): never => throw $fallback : $fallback(...); } /** @@ -61,13 +61,13 @@ protected function iterate(): \Traversable { $counter = 0; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { yield $key => $value; $counter++; } if (0 === $counter) { - yield from ($this->action)(); + yield from ($this->fallback)(); } } } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Map.php b/src/RunOpenCode/Component/Dataset/src/Operator/Map.php index 10508f3..97f6e0e 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Map.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Map.php @@ -6,15 +6,15 @@ use RunOpenCode\Component\Dataset\AbstractStream; use RunOpenCode\Component\Dataset\Contract\OperatorInterface; +use RunOpenCode\Component\Dataset\Exception\LogicException; /** * Map operator. * - * Map operator iterates over given collection and yields transformed items. + * Map operator iterates over given stream source and applies transformation + * functions on keys/values before yielding. * - * User must provide a callable to transform each item value. Additionally, - * user may provide a callable to transform each item key. If key transform - * callable is not provided, original keys are preserved. + * Operator may be used to transform only keys, or only values, or both. * * Example usage: * @@ -22,7 +22,7 @@ * use RunOpenCode\Component\Dataset\Operator\Map; * * $map = new Map( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * source: ['a' => 1, 'b' => 2, 'c' => 3], * valueTransform: static fn(int $value, string $key): int => $value * 2, * keyTransform: static fn(string $key, int $value): string => \strtoupper($key), * ); @@ -47,18 +47,22 @@ final class Map extends AbstractStream implements OperatorInterface private readonly \Closure $keyTransform; /** - * @param iterable $collection Collection to iterate over. - * @param ValueTransformCallable $valueTransform User defined callable to transform item values. - * @param KeyTransformCallable|null $keyTransform User defined callable to transform item keys. If null, original keys are preserved. + * @param iterable $source Stream source to iterate over. + * @param ValueTransformCallable|null $valueTransform Optional transformation function for transforming values. + * @param KeyTransformCallable|null $keyTransform Optional transformation function for transforming keys. */ public function __construct( - private readonly iterable $collection, - callable $valueTransform, + private readonly iterable $source, + ?callable $valueTransform = null, ?callable $keyTransform = null ) { - parent::__construct($this->collection); - $this->valueTransform = $valueTransform(...); - $this->keyTransform = ($keyTransform ?? static fn($key, $value): mixed => $key)(...); + if (null === $valueTransform && null === $keyTransform) { + throw new LogicException('At least one transformation function must be provided, either for keys or for values.'); + } + + parent::__construct($this->source); + $this->valueTransform = ($valueTransform ?? static fn(mixed $value, mixed $key): mixed => $value)(...); + $this->keyTransform = ($keyTransform ?? static fn(mixed $key, mixed $value): mixed => $key)(...); } /** @@ -66,7 +70,7 @@ public function __construct( */ public function iterate(): \Traversable { - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { yield ($this->keyTransform)($key, $value) => ($this->valueTransform)($value, $key); } } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Merge.php b/src/RunOpenCode/Component/Dataset/src/Operator/Merge.php index fc05387..c2db72d 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Merge.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Merge.php @@ -10,7 +10,7 @@ /** * Merge operator. * - * Merge operator iterates over two given collections and yields all items from both collections. + * Merges two streaming sources into a single stream, yielding items from both sources. * * Example usage: * @@ -18,8 +18,8 @@ * use RunOpenCode\Component\Dataset\Operator\Merge; * * $merged = new Merge( - * first: new Dataset(['a' => 1, 'b' => 2]), - * second: new Dataset(['c' => 3, 'd' => 4]), + * first: ['a' => 1, 'b' => 2], + * second: ['c' => 3, 'd' => 4], * ); * * // The resulting sequence will be: 'a' => 1, 'b' => 2, 'c' => 3, 'd' => 4 @@ -36,8 +36,8 @@ final class Merge extends AbstractStream implements OperatorInterface { /** - * @param iterable $first First collection to iterate over. - * @param iterable $second Second collection to iterate over. + * @param iterable $first First stream source to iterate over. + * @param iterable $second Second stream source to iterate over. */ public function __construct( private readonly iterable $first, diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Overflow.php b/src/RunOpenCode/Component/Dataset/src/Operator/Overflow.php index 0e778ac..0608f51 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Overflow.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Overflow.php @@ -6,44 +6,54 @@ use RunOpenCode\Component\Dataset\AbstractStream; use RunOpenCode\Component\Dataset\Contract\OperatorInterface; +use RunOpenCode\Component\Dataset\Exception\StreamOverflowException; /** * Overflow operator. * - * Overflow operator tracks number of yielded items and throws exception if - * stream produced more than defined number of allowed items. + * Monitors the number of items yielded by the stream and raises an exception when + * the allowed limit is exceeded. * * Example usage: * * ```php * use RunOpenCode\Component\Dataset\Operator\Overflow; * - * $at = new Overflow( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * $stream = new Overflow( + * source: ['a' => 1, 'b' => 2, 'c' => 3], * capacity: 2, - * exception: new \Exception('Max number of items exceeded.'), + * throw: new \Exception('Max number of items exceeded.'), * ); * ``` * * @template TKey * @template TValue * + * @phpstan-type ThrowableFactoryCallable = callable(StreamOverflowException=): \Throwable + * * @extends AbstractStream * @implements OperatorInterface */ final class Overflow extends AbstractStream implements OperatorInterface { + private \Closure $throw; + /** - * @param iterable $collection Collection to iterate over. - * @param positive-int $capacity Max number of items to iterate over. - * @param \Exception|null $exception Which exception to throw if collection has more then allowed items ({@see \OverflowException} by default). + * @param iterable $source Collection to iterate over. + * @param positive-int $capacity Maximum number of items to iterate over. + * @param \Throwable|ThrowableFactoryCallable|null $throw Exception to throw if stream yielded more items then capacity allows. */ public function __construct( - private readonly iterable $collection, - private readonly int $capacity, - private readonly ?\Exception $exception = null, + private readonly iterable $source, + private readonly int $capacity, + \Throwable|callable|null $throw = null, ) { - parent::__construct($collection); + parent::__construct($source); + $this->throw = match (true) { + null === $throw => fn(): never => throw new StreamOverflowException($this->capacity), + $throw instanceof \Throwable => static fn(): never => throw $throw, + default => fn(): never => throw $throw(new StreamOverflowException($this->capacity)), + }; } /** @@ -53,12 +63,9 @@ protected function iterate(): \Traversable { $counter = 0; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { if ($counter >= $this->capacity) { - throw $this->exception ?? new \OverflowException(\sprintf( - 'Defined capacity of %d items exceeded.', - $this->capacity - )); + ($this->throw)(); } yield $key => $value; diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Reverse.php b/src/RunOpenCode/Component/Dataset/src/Operator/Reverse.php index 3f3cc77..2dd60b4 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Reverse.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Reverse.php @@ -10,7 +10,7 @@ /** * Reverse operator. * - * Reverse operator iterates over given collection and yields items in reverse order. + * Reverse operator iterates over given stream source and yields items in reverse order. * * WARNING: this is not memory efficient operator. * @@ -20,7 +20,7 @@ * use RunOpenCode\Component\Dataset\Operator\Reverse; * * $reverse = new Reverse( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * source: ['a' => 1, 'b' => 2, 'c' => 3], * ); * // The resulting sequence will be: 'c' => 3, 'b' => 2, 'a' => 1 * ``` @@ -34,12 +34,12 @@ final class Reverse extends AbstractStream implements OperatorInterface { /** - * @param iterable $collection Collection to iterate over in reverse order. + * @param iterable $source Stream source to iterate over in reverse order. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, ) { - parent::__construct($this->collection); + parent::__construct($this->source); } /** @@ -49,7 +49,7 @@ protected function iterate(): \Traversable { $buffer = []; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { $buffer[] = [$key, $value]; } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Skip.php b/src/RunOpenCode/Component/Dataset/src/Operator/Skip.php index fc88b01..76308ba 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Skip.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Skip.php @@ -10,8 +10,8 @@ /** * Skip operator. * - * Skip operator iterates over given collection and skips the first N items, - * where N is defined by user, yielding the rest of the items. + * The skip operator processes a stream source by discarding the first N items + * and yielding all subsequent items. * * Example usage: * @@ -19,7 +19,7 @@ * use RunOpenCode\Component\Dataset\Operator\Skip; * * $skip = new Skip( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * stream: ['a' => 1, 'b' => 2, 'c' => 3], * count: 2, * ); * @@ -35,14 +35,14 @@ final class Skip extends AbstractStream implements OperatorInterface { /** - * @param iterable $collection Collection to iterate over. - * @param positive-int $count Number of items to skip. + * @param iterable $source Stream source to iterate over. + * @param positive-int $count Number of items to skip. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, private readonly int $count, ) { - parent::__construct($this->collection); + parent::__construct($this->source); } /** @@ -52,7 +52,7 @@ protected function iterate(): \Traversable { $count = 0; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { $count++; if ($count <= $this->count) { diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Sort.php b/src/RunOpenCode/Component/Dataset/src/Operator/Sort.php index 25b8e88..bb2fcdf 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Sort.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Sort.php @@ -12,7 +12,7 @@ * * Sort operator iterates over given collection and yields items sorted by keys or values. * User may provide custom comparator function or use default one, which uses spaceship - * operator (<=>) on values or keys (defined by user). + * operator (<=>) on values or keys. * * Keys are preserved. * @@ -24,13 +24,13 @@ * use RunOpenCode\Component\Dataset\Operator\Sort; * * $sortByValues = new Sort( - * collection: new Dataset(['a' => 3, 'b' => 1, 'c' => 2]), + * source: ['a' => 3, 'b' => 1, 'c' => 2], * comparator: static fn(int $first, int $second): int => $first <=> $second, * byKeys: false, * ); * * $sortByKeys = new Sort( - * collection: new Dataset(['a' => 3, 'b' => 1, 'c' => 2]), + * source: ['a' => 3, 'b' => 1, 'c' => 2], * comparator: static fn(string $first, string $second): int => \strcmp($first, $second), * byKeys: true, * ); @@ -51,16 +51,16 @@ final class Sort extends AbstractStream implements OperatorInterface private readonly \Closure $sorter; /** - * @param iterable $collection Collection to iterate over. + * @param iterable $source Stream source to iterate over. * @param ($byKeys is true ? ValueComparatorCallable : KeyComparatorCallable)|null $comparator User defined callable to compare two items. If null, spaceship operator (<=>) is used. - * @param bool $byKeys If `byKeys` is true, keys will be compared instead of values. + * @param bool $byKeys If `$byKeys` is `true`, keys will be compared instead of values. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, ?callable $comparator = null, private readonly bool $byKeys = false, ) { - parent::__construct($this->collection); + parent::__construct($this->source); $this->sorter = $comparator ? $comparator(...) : static fn(mixed $first, mixed $second): int => $first <=> $second; } @@ -72,7 +72,7 @@ protected function iterate(): \Traversable $items = []; $getValue = fn(array $item): mixed => $this->byKeys ? $item[0] : $item[1]; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { $items[] = [$key, $value]; } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Take.php b/src/RunOpenCode/Component/Dataset/src/Operator/Take.php index ae1cacf..7ef89e4 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Take.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Take.php @@ -10,8 +10,7 @@ /** * Take operator. * - * Take operator iterates over given collection and yields only the first N items, - * where N is defined by user. + * Take operator iterates over given stream source and yields only the first N items. * * Example usage: * @@ -19,7 +18,7 @@ * use RunOpenCode\Component\Dataset\Operator\Take; * * $take = new Take( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * source: ['a' => 1, 'b' => 2, 'c' => 3], * count: 2, * ); * // $take will yield ['a' => 1, 'b' => 2] @@ -34,14 +33,14 @@ final class Take extends AbstractStream implements OperatorInterface { /** - * @param iterable $collection Collection to iterate over. - * @param positive-int $count Number of items to yield. + * @param iterable $source Stream source to iterate over. + * @param positive-int $count Number of items to yield. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, private readonly int $count, ) { - parent::__construct($this->collection); + parent::__construct($this->source); } /** @@ -51,7 +50,7 @@ protected function iterate(): \Traversable { $count = 0; - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { $count++; if ($count > $this->count) { diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/TakeUntil.php b/src/RunOpenCode/Component/Dataset/src/Operator/TakeUntil.php index 56946a7..684c3d2 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/TakeUntil.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/TakeUntil.php @@ -10,8 +10,8 @@ /** * Take until operator. * - * Take operator iterates over given collection and yields only the first N items - * until condition is met. + * The take operator processes a stream source and yields items until the predicate + * callable indicates that iteration should stop. * * Example usage: * @@ -19,7 +19,7 @@ * use RunOpenCode\Component\Dataset\Operator\Take; * * $takeUntil = new TakeUntil( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * collection: ['a' => 1, 'b' => 2, 'c' => 3], * predicate: static fn(int $value, string $key): bool => $value > 2, * ); * // $takeUntil will yield ['a' => 1, 'b' => 2] @@ -28,7 +28,7 @@ * @template TKey * @template TValue * - * @phpstan-type PredicateCallable = callable(TValue, TKey): bool + * @phpstan-type PredicateCallable = callable(TValue, TKey=): bool * * @extends AbstractStream * @implements OperatorInterface @@ -38,14 +38,14 @@ final class TakeUntil extends AbstractStream implements OperatorInterface private readonly \Closure $predicate; /** - * @param iterable $collection Collection to iterate over. - * @param callable $predicate Callable predicate function to evaluate. + * @param iterable $source Stream source to iterate over. + * @param callable $predicate Predicate callable to evaluate stop condition. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, callable $predicate, ) { - parent::__construct($collection); + parent::__construct($source); $this->predicate = $predicate(...); } @@ -54,8 +54,8 @@ public function __construct( */ protected function iterate(): \Traversable { - foreach ($this->collection as $key => $value) { - if (($this->predicate)($value, $key, $this->collection)) { + foreach ($this->source as $key => $value) { + if (($this->predicate)($value, $key, $this->source)) { break; } diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Tap.php b/src/RunOpenCode/Component/Dataset/src/Operator/Tap.php index 80eab3c..c323127 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Tap.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Tap.php @@ -10,17 +10,16 @@ /** * Tap operator. * - * Allows to "tap into" the iteration loop and execute a callback for each item in the collection without modifying - * the items themselves. + * Provides a way to observe the stream by executing a callback for each item. * * Example usage: * * ```php + * * use RunOpenCode\Component\Dataset\Operator\Tap; - * use RunOpenCode\Component\Dataset\Dataset; * * $tap = new Tap( - * collection: new Dataset(['a' => 1, 'b' => 2, 'c' => 3]), + * collection: ['a' => 1, 'b' => 2, 'c' => 3], * spy: static fn(int $value, string $key): void => print("Key: $key, Value: $value\n"), * ); * ``` @@ -28,7 +27,7 @@ * @template TKey * @template TValue * - * @phpstan-type TapCallable = callable(TValue, TKey): void + * @phpstan-type TapCallable = callable(TValue, TKey=): void * * @extends AbstractStream * @implements OperatorInterface @@ -38,14 +37,14 @@ final class Tap extends AbstractStream implements OperatorInterface private \Closure $callback; /** - * @param iterable $collection Collection to iterate over. - * @param TapCallable $callback User defined callable to execute for each item. + * @param iterable $source Stream source to iterate over. + * @param TapCallable $callback Callable to execute for each item. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, callable $callback, ) { - parent::__construct($this->collection); + parent::__construct($this->source); $this->callback = $callback(...); } @@ -54,7 +53,7 @@ public function __construct( */ protected function iterate(): \Traversable { - foreach ($this->collection as $key => $value) { + foreach ($this->source as $key => $value) { ($this->callback)($value, $key); yield $key => $value; diff --git a/src/RunOpenCode/Component/Dataset/src/Reducer/Average.php b/src/RunOpenCode/Component/Dataset/src/Reducer/Average.php index e568df1..4daf4cb 100644 --- a/src/RunOpenCode/Component/Dataset/src/Reducer/Average.php +++ b/src/RunOpenCode/Component/Dataset/src/Reducer/Average.php @@ -7,7 +7,7 @@ use RunOpenCode\Component\Dataset\Contract\ReducerInterface; /** - * Reducer which calculates average of values from a collection of values. + * Reducer which calculates average of values from a stream of values. * * Null values are ignored. You may define if null values are included in * the count of items when calculating average. diff --git a/src/RunOpenCode/Component/Dataset/src/Reducer/Max.php b/src/RunOpenCode/Component/Dataset/src/Reducer/Max.php index 79fe9ca..9745951 100644 --- a/src/RunOpenCode/Component/Dataset/src/Reducer/Max.php +++ b/src/RunOpenCode/Component/Dataset/src/Reducer/Max.php @@ -7,7 +7,7 @@ use RunOpenCode\Component\Dataset\Contract\ReducerInterface; /** - * Reducer which calculates maximum value from a collection of values. + * Reducer which calculates maximum value from a stream of values. * * Null values are ignored. * diff --git a/src/RunOpenCode/Component/Dataset/src/Reducer/Min.php b/src/RunOpenCode/Component/Dataset/src/Reducer/Min.php index 65f248e..b3eaac8 100644 --- a/src/RunOpenCode/Component/Dataset/src/Reducer/Min.php +++ b/src/RunOpenCode/Component/Dataset/src/Reducer/Min.php @@ -7,7 +7,7 @@ use RunOpenCode\Component\Dataset\Contract\ReducerInterface; /** - * Reducer which calculates minimum value from a collection of values. + * Reducer which calculates minimum value from a stream of values. * * Null values are ignored. * diff --git a/src/RunOpenCode/Component/Dataset/src/Reducer/Sum.php b/src/RunOpenCode/Component/Dataset/src/Reducer/Sum.php index 6b5e303..337a797 100644 --- a/src/RunOpenCode/Component/Dataset/src/Reducer/Sum.php +++ b/src/RunOpenCode/Component/Dataset/src/Reducer/Sum.php @@ -7,7 +7,7 @@ use RunOpenCode\Component\Dataset\Contract\ReducerInterface; /** - * Reducer which calculates sum of values from a collection of values. + * Reducer which calculates sum of values from a stream of values. * * Null values are ignored. * diff --git a/src/RunOpenCode/Component/Dataset/src/Stream.php b/src/RunOpenCode/Component/Dataset/src/Stream.php index f90b229..65f5359 100644 --- a/src/RunOpenCode/Component/Dataset/src/Stream.php +++ b/src/RunOpenCode/Component/Dataset/src/Stream.php @@ -5,6 +5,7 @@ namespace RunOpenCode\Component\Dataset; use RunOpenCode\Component\Dataset\Contract\CollectorInterface; +use RunOpenCode\Component\Dataset\Contract\OperatorInterface; use RunOpenCode\Component\Dataset\Contract\ReducerInterface; use RunOpenCode\Component\Dataset\Model\Buffer; @@ -28,9 +29,10 @@ use function RunOpenCode\Component\Dataset\finalize as dataset_finalize; use function RunOpenCode\Component\Dataset\if_empty as dataset_if_empty; use function RunOpenCode\Component\Dataset\overflow as dataset_overflow; +use function RunOpenCode\Component\Dataset\operator as dataset_operator; /** - * Dataset iterable stream. + * Iterable data stream. * * @template TKey * @template TValue @@ -40,12 +42,12 @@ final class Stream extends AbstractStream { /** - * @param iterable $collection + * @param iterable $source Stream source to wrap. */ public function __construct( - private readonly iterable $collection, + private readonly iterable $source, ) { - parent::__construct($collection); + parent::__construct($source); } /** @@ -54,13 +56,13 @@ public function __construct( * @template Key * @template Value * - * @param iterable $collection Collection to wrap. + * @param iterable $source Stream source. * * @return self */ - public static function create(iterable $collection): self + public static function create(iterable $source): self { - return new self($collection); + return new self($source); } /** @@ -142,7 +144,7 @@ public function finalize(callable $finalizer): self */ public function flatten(bool $preserveKeys = false): self { - return dataset_flatten($this, $preserveKeys); + return dataset_flatten($this, $preserveKeys); // @phpstan-ignore-line } /** @@ -156,20 +158,17 @@ public function flush(): self } /** - * Applies if empty operator. + * Applies "if empty" operator. * - * @template TAlternativeKey - * @template TAlternativeValue + * @param \Throwable|(callable(): iterable)|null $fallback Fallback stream source, or exception to throw. * - * @param \Exception|(callable(): iterable|never) $action Action to undertake if collection is empty, or exception to throw. - * - * @return Stream + * @return Stream * * @see Operator\IfEmpty */ - public function ifEmpty(\Exception|callable $action): self + public function ifEmpty(\Throwable|callable|null $fallback = null): self { - return dataset_if_empty($this, $action); + return dataset_if_empty($this, $fallback); } /** @@ -284,7 +283,7 @@ public function take(int $count): self /** * Applies takeUntil operator on current stream. * - * @param callable(TValue, TKey): bool $predicate User defined callable to evaluate. + * @param callable(TValue, TKey=): bool $predicate User defined callable to evaluate. * * @return self * @@ -298,7 +297,7 @@ public function takeUntil(callable $predicate): self /** * Applies tap operator on current stream. * - * @param callable(TValue, TKey): void $callback User defined callable to be called on each item. + * @param callable(TValue, TKey=): void $callback User defined callable to be called on each item. * * @return self * @@ -309,6 +308,22 @@ public function tap(callable $callback): self return dataset_tap($this, $callback); } + /** + * Applies custom operator on current stream. + * + * @template TOutputKey + * @template TOutputValue + * + * @param class-string> $operator Class name of the custom operator. + * @param mixed ...$arguments Arguments passed to the operator. + * + * @return self + */ + public function operator(string $operator, mixed ...$arguments): self + { + return dataset_operator($this, $operator, ...$arguments); + } + /** * @template TReducedValue * @template TReducer of ReducerInterface @@ -365,6 +380,6 @@ public function reduce(callable|string $reducer, mixed ...$args): mixed */ protected function iterate(): \Traversable { - yield from $this->collection; + yield from $this->source; } } diff --git a/src/RunOpenCode/Component/Dataset/src/functions.php b/src/RunOpenCode/Component/Dataset/src/functions.php index 8a1f90c..4657cec 100644 --- a/src/RunOpenCode/Component/Dataset/src/functions.php +++ b/src/RunOpenCode/Component/Dataset/src/functions.php @@ -6,8 +6,10 @@ use RunOpenCode\Component\Dataset\Aggregator\Aggregator; use RunOpenCode\Component\Dataset\Contract\CollectorInterface; +use RunOpenCode\Component\Dataset\Contract\OperatorInterface; use RunOpenCode\Component\Dataset\Contract\ReducerInterface; use RunOpenCode\Component\Dataset\Contract\StreamInterface; +use RunOpenCode\Component\Dataset\Exception\StreamOverflowException; use RunOpenCode\Component\Dataset\Model\Buffer; /** @@ -49,18 +51,18 @@ function iterable_to_list(iterable $iterable): array } /** - * Create batch operator. + * Create new stream. * * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. + * @param iterable $source Stream source to iterate over. * * @return Stream */ -function stream(iterable $collection): Stream +function stream(iterable $source): Stream { - return new Stream($collection); + return new Stream($source); } /** @@ -69,17 +71,17 @@ function stream(iterable $collection): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param positive-int $count How many items to buffer. + * @param iterable $source Stream source to iterate over. + * @param positive-int $count Number of items to store into buffer. * * @return Stream> * * @see Operator\BufferCount */ -function buffer_count(iterable $collection, int $count): Stream +function buffer_count(iterable $source, int $count): Stream { return new Stream( - new Operator\BufferCount($collection, $count) + new Operator\BufferCount($source, $count) ); } @@ -89,17 +91,18 @@ function buffer_count(iterable $collection, int $count): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param callable(Buffer, TValue=, TKey=): bool $predicate Callable predicate function to evaluate. + * @param iterable $source Stream source to iterate over. + * @param callable(Buffer, TValue=, TKey=): bool $predicate Predicate function to evaluate if current item should be placed into existing buffer, or + * existing buffer should be yielded and new one should be created with current item. * * @return Stream> * * @see Operator\BufferWhile */ -function buffer_while(iterable $collection, callable $predicate): Stream +function buffer_while(iterable $source, callable $predicate): Stream { return new Stream( - new Operator\BufferWhile($collection, $predicate) + new Operator\BufferWhile($source, $predicate) ); } @@ -109,17 +112,17 @@ function buffer_while(iterable $collection, callable $predicate): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param (callable(TValue, TKey=): string)|null $identity User defined callable to determine item identity. If null, strict comparison (===) is used. + * @param iterable $source Stream source to iterate over. + * @param (callable(TValue, TKey=): string)|null $identity User defined callable to determine item identity. If null, strict comparison (===) is used. * * @return Stream * * @see Operator\Distinct */ -function distinct(iterable $collection, ?callable $identity = null): Stream +function distinct(iterable $source, ?callable $identity = null): Stream { return new Stream( - new Operator\Distinct($collection, $identity) + new Operator\Distinct($source, $identity) ); } @@ -129,17 +132,17 @@ function distinct(iterable $collection, ?callable $identity = null): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param callable(TValue, TKey=): bool $filter User defined callable to filter items. + * @param iterable $source Stream source to iterate over. + * @param callable(TValue, TKey=): bool $filter User defined callable to filter items. * * @return Stream * * @see Operator\Filter */ -function filter(iterable $collection, callable $filter): Stream +function filter(iterable $source, callable $filter): Stream { return new Stream( - new Operator\Filter($collection, $filter) + new Operator\Filter($source, $filter) ); } @@ -149,17 +152,18 @@ function filter(iterable $collection, callable $filter): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param callable(): void $finalizer User defined callable to invoke when iterator is depleted or exception is thrown. + * @param iterable $source Stream source to iterate over. + * @param callable(): void $finalizer User defined callable to invoke when iterator is depleted, or exception is + * thrown, or operator instance is garbage collected. * * @return Stream * * @see Operator\Finalize */ -function finalize(iterable $collection, callable $finalizer): Stream +function finalize(iterable $source, callable $finalizer): Stream { return new Stream( - new Operator\Finalize($collection, $finalizer) + new Operator\Finalize($source, $finalizer) ); } @@ -168,22 +172,21 @@ function finalize(iterable $collection, callable $finalizer): Stream * * @template TKey * @template TValue - * @template TValues of iterable * - * @param iterable $collection Collection to iterate over. - * @param bool $preserveKeys Should keys be preserved from the flattened collections, false by default. + * @param iterable> $source Stream of streams to iterate over. + * @param bool $preserveKeys Should keys be preserved from the flattened stream, false by default. * * @return ($preserveKeys is true ? Stream : Stream) * - * @see Operator\Flatten + * @see Operator\Flatten * - * @phpstan-ignore-next-line return.unusedType + * @phpstan-ignore-next-line */ -function flatten(iterable $collection, bool $preserveKeys = false): Stream +function flatten(iterable $source, bool $preserveKeys = false): Stream { // @phpstan-ignore-next-line return.type return new Stream( - new Operator\Flatten($collection, $preserveKeys) + new Operator\Flatten($source, $preserveKeys) ); } @@ -193,13 +196,13 @@ function flatten(iterable $collection, bool $preserveKeys = false): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate. + * @param iterable $source Stream source to iterate. * - * @return Stream Flushed stream. + * @return Stream Flushed and closed stream. */ -function flush(iterable $collection): Stream +function flush(iterable $source): Stream { - $stream = new Stream($collection); + $stream = new Stream($source); \iterator_to_array($stream, false); @@ -207,24 +210,22 @@ function flush(iterable $collection): Stream } /** - * Create if empty operator. + * Create "if empty" operator. * * @template TKey * @template TValue - * @template TAlternativeKey - * @template TAlternativeValue * - * @param iterable $collection Collection to iterate over. - * @param \Exception|(callable(): iterable|never) $action Action to undertake if collection is empty, or exception to throw. + * @param iterable $source Stream source to iterate over. + * @param \Throwable|(callable(): iterable)|null $fallback Fallback stream source, or exception to throw. * - * @return Stream + * @return Stream * * @see Operator\IfEmpty */ -function if_empty(iterable $collection, \Exception|callable $action): Stream +function if_empty(iterable $source, \Throwable|callable|null $fallback): Stream { return new Stream( - new Operator\IfEmpty($collection, $action) + new Operator\IfEmpty($source, $fallback) ); } @@ -236,20 +237,20 @@ function if_empty(iterable $collection, \Exception|callable $action): Stream * @template TModifiedKey * @template TModifiedValue * - * @param iterable $collection Collection to iterate over. - * @param callable(TValue, TKey=): TModifiedValue $valueTransform User defined callable to be called on each item. - * @param callable(TKey, TValue=): TModifiedKey|null $keyTransform User defined callable to be called on each item key. If null, original keys are preserved. + * @param iterable $source Stream source to iterate over. + * @param callable(TValue, TKey=): TModifiedValue|null $valueTransform Optional transformation function for transforming values. + * @param callable(TKey, TValue=): TModifiedKey|null $keyTransform Optional transformation function for transforming keys. * - * @return Stream<($keyTransform is null ? TModifiedKey : TKey), TModifiedValue> + * @return Stream<($keyTransform is null ? TKey : TModifiedKey), ($valueTransform is null ? TValue : TModifiedValue)> * * @see Operator\Map */ -function map(iterable $collection, callable $valueTransform, ?callable $keyTransform = null): Stream +function map(iterable $source, ?callable $valueTransform = null, ?callable $keyTransform = null): Stream { /** - * @var StreamInterface<($keyTransform is null ? TKey : TModifiedKey), TModifiedValue> $map + * @var StreamInterface<($keyTransform is null ? TKey : TModifiedKey), ($valueTransform is null ? TValue : TModifiedValue)> $map */ - $map = new Operator\Map($collection, $valueTransform, $keyTransform); + $map = new Operator\Map($source, $valueTransform, $keyTransform); return new Stream($map); } @@ -262,8 +263,8 @@ function map(iterable $collection, callable $valueTransform, ?callable $keyTrans * @template TKey2 * @template TValue2 * - * @param iterable $first First collection to iterate over. - * @param iterable $second Second collection to iterate over. + * @param iterable $first First stream source to iterate over. + * @param iterable $second Second stream source to iterate over. * * @return Stream * @@ -282,18 +283,18 @@ function merge(iterable $first, iterable $second): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param positive-int $capacity Max number of items to iterate over. - * @param \Exception|null $exception Which exception to throw if collection has more then allowed items ({@see \OverflowException} by default). + * @param iterable $source Collection to iterate over. + * @param positive-int $capacity Maximum number of items to iterate over. + * @param \Throwable|(callable(StreamOverflowException=): \Throwable)|null $throw Exception to throw if stream yielded more items then capacity allows. * * @return Stream * * @see Operator\Overflow */ -function overflow(iterable $collection, int $capacity, ?\Exception $exception = null): Stream +function overflow(iterable $source, int $capacity, \Throwable|callable|null $throw = null): Stream { return new Stream( - new Operator\Overflow($collection, $capacity, $exception) + new Operator\Overflow($source, $capacity, $throw) ); } @@ -303,16 +304,16 @@ function overflow(iterable $collection, int $capacity, ?\Exception $exception = * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over in reverse order. + * @param iterable $source Stream source to iterate over in reverse order. * * @return Stream * * @see Operator\Reverse */ -function reverse(iterable $collection): Stream +function reverse(iterable $source): Stream { return new Stream( - new Operator\Reverse($collection) + new Operator\Reverse($source) ); } @@ -322,17 +323,17 @@ function reverse(iterable $collection): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param positive-int $count Number of items to skip. + * @param iterable $source Stream source to iterate over. + * @param positive-int $count Number of items to skip. * * @return Stream * * @see Operator\Skip */ -function skip(iterable $collection, int $count): Stream +function skip(iterable $source, int $count): Stream { return new Stream( - new Operator\Skip($collection, $count) + new Operator\Skip($source, $count) ); } @@ -342,7 +343,7 @@ function skip(iterable $collection, int $count): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. + * @param iterable $source Stream source to iterate over. * @param ($byKeys is false ? (callable(TValue, TValue): int) : (callable(TKey, TKey): int))|null $comparator User defined callable to compare two items. If null, spaceship operator (<=>) is used. * @param bool $byKeys If `byKeys` is true, keys will be compared instead of values. * @@ -350,10 +351,10 @@ function skip(iterable $collection, int $count): Stream * * @see Operator\Sort */ -function sort(iterable $collection, ?callable $comparator = null, bool $byKeys = false): Stream +function sort(iterable $source, ?callable $comparator = null, bool $byKeys = false): Stream { return new Stream( - new Operator\Sort($collection, $comparator, $byKeys) + new Operator\Sort($source, $comparator, $byKeys) ); } @@ -363,17 +364,17 @@ function sort(iterable $collection, ?callable $comparator = null, bool $byKeys = * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param positive-int $count Number of items to yield. + * @param iterable $source Stream source to iterate over. + * @param positive-int $count Number of items to yield. * * @return Stream * * @see Operator\Take */ -function take(iterable $collection, int $count): Stream +function take(iterable $source, int $count): Stream { return new Stream( - new Operator\Take($collection, $count) + new Operator\Take($source, $count) ); } @@ -383,17 +384,17 @@ function take(iterable $collection, int $count): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param callable(TValue, TKey): bool $predicate User defined callable to evaluate. + * @param iterable $source Stream source to iterate over. + * @param callable(TValue, TKey=): bool $predicate Predicate callable to evaluate stop condition. * * @return Stream * * @see Operator\TakeUntil */ -function takeUntil(iterable $collection, callable $predicate): Stream +function takeUntil(iterable $source, callable $predicate): Stream { return new Stream( - new Operator\TakeUntil($collection, $predicate) + new Operator\TakeUntil($source, $predicate) ); } @@ -403,17 +404,38 @@ function takeUntil(iterable $collection, callable $predicate): Stream * @template TKey * @template TValue * - * @param iterable $collection Collection to iterate over. - * @param callable(TValue, TKey): void $callback User defined callable to execute for each item. + * @param iterable $source Stream source to iterate over. + * @param callable(TValue, TKey=): void $callback Callable to execute for each item. * * @return Stream * * @see Operator\Tap */ -function tap(iterable $collection, callable $callback): Stream +function tap(iterable $source, callable $callback): Stream +{ + return new Stream( + new Operator\Tap($source, $callback) + ); +} + +/** + * Create custom operator. + * + * @template TInputKey + * @template TInputValue + * @template TOutputKey + * @template TOutputValue + * + * @param iterable $source Stream source to iterate over. + * @param class-string> $operator Operator to apply. + * @param mixed ...$arguments Arguments for operator. + * + * @return Stream + */ +function operator(iterable $source, string $operator, mixed ...$arguments): Stream { return new Stream( - new Operator\Tap($collection, $callback) + new $operator($source, ...$arguments) ); } @@ -425,14 +447,14 @@ function tap(iterable $collection, callable $callback): Stream * @template TReducedValue * @template TReducer of ReducerInterface * - * @param non-empty-string $name Name of the aggregator. - * @param iterable $collection Collection to collect from. - * @param class-string|callable(TReducedValue, TValue, TKey=): TReducedValue $reducer Reducer to attach. - * @param mixed ...$args Arguments passed to reducer. + * @param non-empty-string $name Name of the aggregator. + * @param iterable $source Stream source from which to aggregate values. + * @param class-string|callable(TReducedValue, TValue, TKey=): TReducedValue $reducer Reducer to attach. + * @param mixed ...$args Arguments passed to reducer. * * @return Stream */ -function aggregate(string $name, iterable $collection, callable|string $reducer, mixed ...$args): Stream +function aggregate(string $name, iterable $source, callable|string $reducer, mixed ...$args): Stream { /** @var TReducer $instance */ $instance = \is_string($reducer) && \is_a($reducer, ReducerInterface::class, true) @@ -440,7 +462,7 @@ function aggregate(string $name, iterable $collection, callable|string $reducer, : new Reducer\Callback($reducer, ...$args); return new Stream( - new Aggregator($name, new Operator\Reduce($collection, $instance)), + new Aggregator($name, new Operator\Reduce($source, $instance)), ); } @@ -452,46 +474,46 @@ function aggregate(string $name, iterable $collection, callable|string $reducer, * @template TCollectedValue * @template TCollector of CollectorInterface * - * @param iterable $collection Collection to collect from. - * @param class-string $collector Collector class name. - * @param mixed ...$args Arguments passed to collector. + * @param iterable $source Stream source to collect from. + * @param class-string $collector Collector class name. + * @param mixed ...$args Arguments passed to collector. * * @return TCollector * * @see Contract\CollectorInterface */ -function collect(iterable $collection, string $collector, mixed ...$args): CollectorInterface +function collect(iterable $source, string $collector, mixed ...$args): CollectorInterface { return new \ReflectionClass($collector)->newInstanceArgs(\array_merge( - [$collection], + [$source], $args )); } /** - * Reduce values from dataset using specified reducer. + * Reduce values from stream source using specified reducer. * * @template TKey * @template TValue * @template TReducedValue * @template TReducer of ReducerInterface * - * @param iterable $collection Collection to collect from. - * @param class-string|callable(TReducedValue, TValue, TKey=): TReducedValue $reducer Reducer to use. - * @param mixed ...$args Arguments passed to reducer. + * @param iterable $source Stream source to reduce. + * @param class-string|callable(TReducedValue, TValue, TKey=): TReducedValue $reducer Reducer to use. + * @param mixed ...$args Arguments passed to reducer. * * @return TReducedValue * * @see Contract\ReducerInterface */ -function reduce(iterable $collection, callable|string $reducer, mixed ...$args): mixed +function reduce(iterable $source, callable|string $reducer, mixed ...$args): mixed { /** @var TReducer $instance */ $instance = \is_string($reducer) && \is_a($reducer, ReducerInterface::class, true) ? new \ReflectionClass($reducer)->newInstanceArgs($args) : new Reducer\Callback($reducer, ...$args); - $operator = new Operator\Reduce($collection, $instance); + $operator = new Operator\Reduce($source, $instance); flush($operator); diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/MapTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/MapTest.php index b7396e9..f9600fd 100644 --- a/src/RunOpenCode/Component/Dataset/tests/Operator/MapTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/Operator/MapTest.php @@ -6,6 +6,7 @@ use PHPUnit\Framework\Attributes\Test; use PHPUnit\Framework\TestCase; +use RunOpenCode\Component\Dataset\Exception\LogicException; use function RunOpenCode\Component\Dataset\map; @@ -16,10 +17,10 @@ public function maps(): void { $operator = map( [ - 'a' => 1, - 'b' => 2, - 'c' => 3, - ], + 'a' => 1, + 'b' => 2, + 'c' => 3, + ], static fn(int $value): int => $value * 2, static fn(string $key): string => \sprintf('mapped_%s', $key), ); @@ -30,4 +31,54 @@ public function maps(): void 'mapped_c' => 6, ], \iterator_to_array($operator)); } + + #[Test] + public function map_keys(): void + { + $operator = map( + [ + 'a' => 1, + 'b' => 2, + 'c' => 3, + ], + keyTransform: static fn(string $key): string => \sprintf('mapped_%s', $key), + ); + + $this->assertSame([ + 'mapped_a' => 1, + 'mapped_b' => 2, + 'mapped_c' => 3, + ], \iterator_to_array($operator)); + } + + #[Test] + public function map_values(): void + { + $operator = map( + [ + 'a' => 1, + 'b' => 2, + 'c' => 3, + ], + valueTransform: static fn(int $value): int => $value * 2, + ); + + $this->assertSame([ + 'a' => 2, + 'b' => 4, + 'c' => 6, + ], \iterator_to_array($operator)); + } + + #[Test] + public function map_throws_exception_when_transform_function_missing(): void + { + $this->expectException(LogicException::class); + + map([ + 'a' => 1, + 'b' => 2, + 'c' => 3, + ]); + } } diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/TapTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/TapTest.php index 8be9a34..61345bf 100644 --- a/src/RunOpenCode/Component/Dataset/tests/Operator/TapTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/Operator/TapTest.php @@ -20,7 +20,7 @@ public function taps(): void 'a' => 1, 'b' => 2, 'c' => 3, - ], static function(int $value, string $key) use (&$log): void { + ], static function(int $value, string $key) use (&$log): void { // @phpstan-ignore-line $log[] = \sprintf('Key: %s, Value: %d', $key, $value); }); diff --git a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php index 1a08ea8..ef8f725 100644 --- a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php @@ -331,7 +331,7 @@ public function tap(): void $tapped = []; $data = new Stream($dataset) - ->tap(static function(int $value, string $key) use (&$tapped): void { + ->tap(static function(int $value, string $key) use (&$tapped): void { // @phpstan-ignore-line $tapped[\sprintf('tapped_%s', $key)] = $value * 2; }) ->collect(ArrayCollector::class) diff --git a/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Middleware/Converted.php b/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Middleware/Converted.php index 138de45..cdcb652 100644 --- a/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Middleware/Converted.php +++ b/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Middleware/Converted.php @@ -63,13 +63,17 @@ public function scalar(...$default): mixed { assert_default_value(...$default); - return Stream::create($this->result) - ->take(2) - ->overflow(1, new NonUniqueResultException('Expected only one record in result set, multiple retrieved.')) - ->map(static fn(array $row): array => [\array_key_first($row) => array_first($row)]) - ->map($this->convert(...)) // @phpstan-ignore-line - ->ifEmpty(static fn(): iterable => \array_key_exists(0, $default) ? [[$default[0]]] : throw new NoResultException('Expected one record in result set, none found.')) - ->reduce(Callback::class, static fn(mixed $carry, array $value): mixed => \array_values($value)[0], null); + try { + return Stream::create($this->result) + ->take(2) + ->overflow(1, new NonUniqueResultException('Expected only one record in result set, multiple retrieved.')) + ->map(static fn(array $row): array => [\array_key_first($row) => array_first($row)]) + ->map($this->convert(...)) // @phpstan-ignore-line + ->ifEmpty(new NoResultException('Expected one record in result set, none found.')) + ->reduce(Callback::class, static fn(mixed $carry, array $value): mixed => \array_values($value)[0], null); + } catch (NoResultException $exception) { + return \array_key_exists(0, $default) ? $default[0] : throw $exception; + } } /** @@ -95,12 +99,16 @@ public function record(...$default): mixed { assert_default_value(...$default); - return Stream::create($this->result) - ->take(2) - ->overflow(1, new NonUniqueResultException('Expected only one record in result set, multiple retrieved.')) - ->map($this->convert(...)) - ->ifEmpty(static fn(): iterable => \array_key_exists(0, $default) ? [$default[0]] : throw new NoResultException('Expected one record in result set, none found.')) - ->collect(ListCollector::class)[0]; + try { + return Stream::create($this->result) + ->take(2) + ->overflow(1, new NonUniqueResultException('Expected only one record in result set, multiple retrieved.')) + ->map($this->convert(...)) + ->ifEmpty(new NoResultException('Expected one record in result set, none found.')) + ->collect(ListCollector::class)[0]; + } catch (NoResultException $exception) { + return \array_key_exists(0, $default) ? $default[0] : throw $exception; + } } /** diff --git a/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Result.php b/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Result.php index 2514c99..8b7c534 100644 --- a/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Result.php +++ b/src/RunOpenCode/Component/Query/src/Doctrine/Dbal/Result.php @@ -58,9 +58,11 @@ public function scalar(mixed ...$default): mixed try { return Stream::create($this->dataset->vector()) ->take(2) - ->ifEmpty(static fn(): iterable => \array_key_exists(0, $default) ? [$default[0]] : throw new NoResultException('Expected one record in result set, none found.')) ->overflow(1, new NonUniqueResultException('Expected only one record in result set, multiple retrieved.')) + ->ifEmpty(new NoResultException('Expected one record in result set, none found.')) ->reduce(Callback::class, static fn(mixed $carry, mixed $value): mixed => $value, null); + } catch (NoResultException $exception) { + return \array_key_exists(0, $default) ? $default[0] : throw $exception; } finally { $this->free(); } @@ -94,9 +96,11 @@ public function record(mixed ...$default): mixed try { return Stream::create($this->dataset) ->take(2) - ->ifEmpty(static fn(): iterable => \array_key_exists(0, $default) ? [$default[0]] : throw new NoResultException('Expected one record in result set, none found.')) ->overflow(1, new NonUniqueResultException('Expected only one record in result set, multiple retrieved.')) + ->ifEmpty(new NoResultException('Expected one record in result set, none found.')) ->collect(ListCollector::class)[0]; + } catch (NoResultException $exception) { + return \array_key_exists(0, $default) ? $default[0] : throw $exception; } finally { $this->free(); }