diff --git a/src/RunOpenCode/Component/Dataset/composer.json b/src/RunOpenCode/Component/Dataset/composer.json index 9c22cb4..549600b 100644 --- a/src/RunOpenCode/Component/Dataset/composer.json +++ b/src/RunOpenCode/Component/Dataset/composer.json @@ -6,6 +6,10 @@ { "name": "Nikola Svitlica a.k.a TheCelavi", "email": "thecelavi@runopencode.com" + }, + { + "name": "Stefan Veljancic", + "email": "veljancicstefan@gmail.com" } ], "require": { diff --git a/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php b/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php index 546f4ef..eeabda5 100644 --- a/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php +++ b/src/RunOpenCode/Component/Dataset/src/Collector/IterableCollector.php @@ -28,7 +28,7 @@ final class IterableCollector implements \IteratorAggregate, CollectorInterface return $this->getIterator(); } } - + /** * {@inheritdoc} */ @@ -45,7 +45,7 @@ final class IterableCollector implements \IteratorAggregate, CollectorInterface /** * Provides you with total number of iterated elements. - * + * * @var non-negative-int */ public private(set) int $count = 0; @@ -73,4 +73,4 @@ public function getIterator(): \Traversable $this->count++; } } -} \ No newline at end of file +} diff --git a/src/RunOpenCode/Component/Dataset/src/Model/Buffer.php b/src/RunOpenCode/Component/Dataset/src/Model/Buffer.php new file mode 100644 index 0000000..2f1484e --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Model/Buffer.php @@ -0,0 +1,110 @@ + + */ +final readonly class Buffer implements \IteratorAggregate, \Countable +{ + /** + * Create new buffer. + * + * @param \ArrayObject $items Items within buffer. + * + * @internal + */ + public function __construct(private \ArrayObject $items) + { + // noop. + } + + /** + * Get first item in buffer. + * + * @return Item + * + * @phpstan-ignore-next-line generics.variance + */ + public function first(): Item + { + $first = $this->items[0] ?? throw new LogicException('Buffer is empty.'); + + return new Item($first[0], $first[1]); + } + + /** + * Get last item in buffer. + * + * @return Item + * + * @phpstan-ignore-next-line generics.variance + */ + public function last(): Item + { + $last = $this->items[\count($this->items) - 1] ?? throw new LogicException('Buffer is empty.'); + + return new Item($last[0], $last[1]); + } + + /** + * Get all keys. + * + * @return list + */ + public function keys(): array + { + $keys = []; + + foreach ($this->items as [$key]) { + $keys[] = $key; + } + + return $keys; + } + + /** + * Get all values. + * + * @return list + */ + public function values(): array + { + $values = []; + + foreach ($this->items as [, $value]) { + $values[] = $value; + } + + return $values; + } + + /** + * {@inheritdoc} + */ + public function getIterator(): \Traversable + { + foreach ($this->items as [$key, $value]) { + yield $key => $value; + } + } + + /** + * {@inheritdoc} + */ + public function count(): int + { + return $this->items->count(); + } +} diff --git a/src/RunOpenCode/Component/Dataset/src/Model/Item.php b/src/RunOpenCode/Component/Dataset/src/Model/Item.php new file mode 100644 index 0000000..4987384 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Model/Item.php @@ -0,0 +1,88 @@ + + */ +final readonly class Item implements \ArrayAccess +{ + /** + * @param TKey $key + * @param TValue $value + */ + public function __construct( + private mixed $key, + private mixed $value + ) { + // noop. + } + + /** + * Get key. + * + * @return TKey + */ + public function key(): mixed + { + return $this->key; + } + + /** + * Get value. + * + * @return TValue + */ + public function value(): mixed + { + return $this->value; + } + + /** + * {@inheritdoc} + */ + public function offsetExists(mixed $offset): bool + { + return $offset === 0 || $offset === 1; + } + + /** + * {@inheritdoc} + * + * @return ($offset is 0 ? TKey : TValue) + */ + public function offsetGet(mixed $offset): mixed + { + return match ($offset) { + 0 => $this->key, + 1 => $this->value, + default => throw new OutOfBoundsException($offset, $this, \sprintf( + 'Item tuple does not have offset "%s".', + $offset + )), + }; + } + + /** + * {@inheritdoc} + */ + public function offsetSet(mixed $offset, mixed $value): void + { + throw new \BadMethodCallException('Item is immutable.'); + } + + /** + * {@inheritdoc} + */ + public function offsetUnset(mixed $offset): void + { + throw new \BadMethodCallException('Item is immutable.'); + } +} diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Batch.php b/src/RunOpenCode/Component/Dataset/src/Operator/Batch.php deleted file mode 100644 index 21bac1e..0000000 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Batch.php +++ /dev/null @@ -1,94 +0,0 @@ - 1, 'b' => 2, 'c' => 3, 'd' => 4, 'e' => 5]), - * size: 2, - * onBatch: function(array $batch, int $batchNumber): iterable { - * foreach ($batch as [$key, $value]) { - * // Do some operations on $key and $value, - * // e.g. load additional data from database - * // for each record. - * yield $key => $value; - * } - * } - * ); - * ``` - * - * @template TKey - * @template TValue - * @template TModifiedKey - * @template TModifiedValue - * - * @phpstan-type Record = array{TKey, TValue} - * @phpstan-type Records = Record[] - * - * @phpstan-type OnBatchCallable = callable(Records $batch, int $batchNumber): iterable - * - * @extends AbstractStream - * @implements OperatorInterface - */ -final class Batch extends AbstractStream implements OperatorInterface -{ - private readonly \Closure $onBatch; - - /** - * @param iterable $collection Collection to iterate over. - * @param OnBatchCallable $onBatch User defined callable to be called on each batch. - * @param positive-int $size Size of the batch buffer. - */ - public function __construct( - private readonly iterable $collection, - callable $onBatch, - private readonly int $size = 1000, - ) { - parent::__construct($collection); - $this->onBatch = $onBatch(...); - } - - /** - * {@inheritdoc} - */ - protected function iterate(): \Traversable - { - $batch = []; - $iteration = 1; - - foreach ($this->collection as $key => $value) { - $batch[] = [$key, $value]; - - if (\count($batch) === $this->size) { - yield from ($this->onBatch)($batch, $iteration); - $batch = []; - $iteration++; - } - } - - if (\count($batch) !== 0) { - yield from ($this->onBatch)($batch, $iteration); - } - } -} diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/BufferCount.php b/src/RunOpenCode/Component/Dataset/src/Operator/BufferCount.php new file mode 100644 index 0000000..3eae493 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Operator/BufferCount.php @@ -0,0 +1,59 @@ +> + * @implements OperatorInterface> + */ +final class BufferCount extends AbstractStream implements OperatorInterface +{ + /** + * @param iterable $collection Collection to iterate over. + * @param positive-int $count How many items to buffer. + */ + public function __construct( + private readonly iterable $collection, + private readonly int $count = 1000, + ) { + parent::__construct($collection); + } + + /** + * {@inheritdoc} + */ + protected function iterate(): \Traversable + { + /** @var \ArrayObject $items */ + $items = new \ArrayObject(); + + foreach ($this->collection as $key => $value) { + $items[] = [$key, $value]; + + if (\count($items) === $this->count) { + yield new Buffer($items); + $items = new \ArrayObject(); + } + } + + if (\count($items) !== 0) { + yield new Buffer($items); + } + } +} diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/BufferWhile.php b/src/RunOpenCode/Component/Dataset/src/Operator/BufferWhile.php new file mode 100644 index 0000000..48d7fd3 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Operator/BufferWhile.php @@ -0,0 +1,77 @@ + + * @phpstan-type PredicateCallable = callable(TBuffer, TValue=, TKey=): bool + * @phpstan-type TBuffer = Buffer + * + * @extends AbstractStream + * @implements OperatorInterface + */ +final class BufferWhile extends AbstractStream implements OperatorInterface +{ + private readonly \Closure $predicate; + + /** + * @param iterable $collection Collection to iterate over. + * @param PredicateCallable $predicate Callable predicate function to evaluate. + */ + public function __construct( + private readonly iterable $collection, + callable $predicate, + ) { + parent::__construct($collection); + $this->predicate = $predicate(...); + } + + /** + * {@inheritdoc} + */ + protected function iterate(): \Traversable + { + /** @var TStorage $items */ + $items = new \ArrayObject(); + $buffer = new Buffer($items); + + foreach ($this->collection as $key => $value) { + if (0 === \count($items)) { + $items[] = [$key, $value]; + continue; + } + + if (($this->predicate)($buffer, $value, $key)) { + $items[] = [$key, $value]; + continue; + } + + yield $buffer; + + /** @var TStorage $items */ + $items = new \ArrayObject(); + $buffer = new Buffer($items); + $items[] = [$key, $value]; + } + + if (0 !== \count($items)) { + yield $buffer; + } + } +} diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php b/src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php deleted file mode 100644 index 02ceef2..0000000 --- a/src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php +++ /dev/null @@ -1,100 +0,0 @@ - [1, 2], 2 => [1, 3], 3 => [1, 4], 4 => [2, 1], 5 => [2, 2]]), - * predicate: static fn(array $values): bool => $values[0][0] === $values[1][0], - * join: static fn(array $buffer): iterable => [ - * $buffer[0][1][0] => array_map(static fn(array $record): int => $record[1][1], $buffer) - * ], - * ); - * // $compressJoin will yield [1 => [2, 3, 4], 2 => [1, 2]] - * ``` - * - * @template TKey - * @template TValue - * @template TModifiedKey - * @template TModifiedValue - * - * @phpstan-type PredicateValues = array{TValue, TValue} - * @phpstan-type PredicateKeys = array{TKey, TKey} - * @phpstan-type Record = array{TKey, TValue} - * @phpstan-type Buffer = list - * @phpstan-type PredicateCallable = callable(PredicateValues, PredicateKeys=, Buffer=): bool - * @phpstan-type JoinCallable = callable(Buffer): iterable - * - * @extends AbstractStream - * @implements OperatorInterface - */ -final class CompressJoin extends AbstractStream implements OperatorInterface -{ - private readonly \Closure $predicate; - - private readonly \Closure $join; - - /** - * @param iterable $collection Collection to iterate over. - * @param PredicateCallable $predicate Callable predicate function to evaluate. - * @param JoinCallable $join Callable join function to produce joined records. - */ - public function __construct( - private readonly iterable $collection, - callable $predicate, - callable $join, - ) { - parent::__construct($collection); - $this->predicate = $predicate(...); - $this->join = $join(...); - } - - /** - * {@inheritdoc} - */ - protected function iterate(): \Traversable - { - /** @var Buffer $buffer */ - $buffer = []; - /** @var Record|null $previous */ - $previous = null; - - foreach ($this->collection as $key => $value) { - if (0 === \count($buffer)) { - $previous = [$key, $value]; - $buffer[] = $previous; - continue; - } - - \assert(null !== $previous); - - if (($this->predicate)([$previous[1], $value], [$previous[0], $key], $buffer)) { - $previous = [$key, $value]; - $buffer[] = $previous; - continue; - } - - yield from ($this->join)($buffer); - $previous = [$key, $value]; - $buffer = [$previous]; - } - - if (0 !== \count($buffer)) { - yield from ($this->join)($buffer); - } - } -} diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php b/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php index c25cafe..d3758da 100644 --- a/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php +++ b/src/RunOpenCode/Component/Dataset/src/Operator/Flatten.php @@ -13,8 +13,8 @@ * Flatten operator iterates over given collection of iterables and yields * each item from each iterable in a single flat sequence. * - * Keys from inner iterables are not preserved, new keys are generated - * in a continuous manner starting from 0. + * By default, keys from inner iterables are not preserved, which can be + * overridden in constructor. * * Example usage: * @@ -22,23 +22,27 @@ * use RunOpenCode\Component\Dataset\Operator\Flatten; * * $flatten = new Flatten( - * collection: new Dataset(['a' => [1, 2], 'b' => [3, 4], 'c' => [5]]), + * collection: new Dataset(['first' => ['a' => 1, 'b' => 3], 'second' => ['c' => 5]]), * ); - * // The resulting sequence will be: 0 => 1, 1 => 2, 2 => 3, 3 => 4, 4 => 5 + * // The resulting sequence will be: 0 => 1, 1 => 3, 2 => 5 + * // With `preserveKeys` set to true, resulting sequence would be: 'a' => 1, 'b' => 3, 'c' => 5 * ``` * + * @template TKey * @template TValue * - * @extends AbstractStream - * @implements OperatorInterface + * @extends AbstractStream + * @implements OperatorInterface */ final class Flatten extends AbstractStream implements OperatorInterface { /** - * @param iterable> $collection Collection to iterate over. + * @param iterable> $collection Collection to iterate over. + * @param bool $preserveKeys Should keys be preserved from the flattened collections, false by default. */ public function __construct( private readonly iterable $collection, + private readonly bool $preserveKeys = false, ) { parent::__construct($this->collection); } @@ -49,7 +53,12 @@ public function __construct( protected function iterate(): \Traversable { foreach ($this->collection as $items) { - foreach ($items as $value) { + foreach ($items as $key => $value) { + if ($this->preserveKeys) { + yield $key => $value; + continue; + } + yield $value; } } diff --git a/src/RunOpenCode/Component/Dataset/src/Stream.php b/src/RunOpenCode/Component/Dataset/src/Stream.php index aa726a2..9a300f2 100644 --- a/src/RunOpenCode/Component/Dataset/src/Stream.php +++ b/src/RunOpenCode/Component/Dataset/src/Stream.php @@ -6,14 +6,16 @@ use RunOpenCode\Component\Dataset\Contract\CollectorInterface; use RunOpenCode\Component\Dataset\Contract\ReducerInterface; +use RunOpenCode\Component\Dataset\Model\Buffer; use function RunOpenCode\Component\Dataset\aggregate as dataset_aggregate; -use function RunOpenCode\Component\Dataset\batch as dataset_batch; +use function RunOpenCode\Component\Dataset\buffer_count as dataset_buffer_count; +use function RunOpenCode\Component\Dataset\buffer_while as dataset_buffer_while; use function RunOpenCode\Component\Dataset\collect as dataset_collect; -use function RunOpenCode\Component\Dataset\compress_join as dataset_compress_join; use function RunOpenCode\Component\Dataset\distinct as dataset_distinct; use function RunOpenCode\Component\Dataset\filter as dataset_filter; use function RunOpenCode\Component\Dataset\flatten as dataset_flatten; +use function RunOpenCode\Component\Dataset\flush as dataset_flush; use function RunOpenCode\Component\Dataset\map as dataset_map; use function RunOpenCode\Component\Dataset\merge as dataset_merge; use function RunOpenCode\Component\Dataset\reduce as dataset_reduce; @@ -64,39 +66,31 @@ public static function create(iterable $collection): self } /** - * Applies batch operator on current stream. + * Applies buffer count operator on current stream. * - * @template TModifiedKey - * @template TModifiedValue - * - * @param callable(iterable $batch, int $batchNumber): iterable $onBatch User defined callable to be called on each batch. - * @param positive-int $size Size of the batch buffer. + * @param positive-int $count How many items to buffer. * - * @return self + * @return self> * - * @see Operator\Batch + * @see Operator\BufferCount */ - public function batch(callable $onBatch, int $size = 1000): self + public function bufferCount(int $count): self { - return dataset_batch($this, $onBatch, $size); + return dataset_buffer_count($this, $count); } /** - * Applies compress join operator on current stream. - * - * @template TModifiedKey - * @template TModifiedValue + * Applies buffer while operator on current stream. * - * @param callable(array{TValue, TValue}, array{TKey, TKey}=, list=): bool $predicate Callable predicate function to evaluate. - * @param callable(list): iterable $join Callable join function to produce joined records. + * @param callable(Buffer, TValue=, TKey=): bool $predicate Callable predicate function to evaluate. * - * @return self + * @return Stream> * - * @see Operator\CompressJoin + * @see Operator\BufferWhile */ - public function compressJoin(callable $predicate, callable $join): self + public function bufferWhile(callable $predicate): self { - return dataset_compress_join($this, $predicate, $join); + return dataset_buffer_while($this, $predicate); } /** @@ -127,16 +121,40 @@ public function filter(callable $filter): self return dataset_filter($this, $filter); } + /** + * Applies finalize operator on current stream. + * + * @param callable(): void $finalizer User defined callable to invoke when iterator is depleted or exception is thrown. + * + * @return self + * + * @see Operator\Finalize + */ + public function finalize(callable $finalizer): self + { + return dataset_finalize($this, $finalizer); + } + /** * Applies flatten operator on current stream. * - * @return self + * @return self * * @see Operator\Flatten */ - public function flatten(): self + public function flatten(bool $preserveKeys = false): self { - return dataset_flatten($this); + return dataset_flatten($this, $preserveKeys); + } + + /** + * Iterate through stream without yielding items. + * + * @return self + */ + public function flush(): self + { + return dataset_flush($this); } /** @@ -293,20 +311,6 @@ public function tap(callable $callback): self return dataset_tap($this, $callback); } - /** - * Applies finalize operator on current stream. - * - * @param callable(): void $finalizer User defined callable to invoke when iterator is depleted or exception is thrown. - * - * @return self - * - * @see Operator\Finalize - */ - public function finalize(callable $finalizer): self - { - return dataset_finalize($this, $finalizer); - } - /** * @template TReducedValue * @template TReducer of ReducerInterface @@ -365,4 +369,4 @@ protected function iterate(): \Traversable { yield from $this->collection; } -} \ No newline at end of file +} diff --git a/src/RunOpenCode/Component/Dataset/src/functions.php b/src/RunOpenCode/Component/Dataset/src/functions.php index 8a94b2f..ccf05e3 100644 --- a/src/RunOpenCode/Component/Dataset/src/functions.php +++ b/src/RunOpenCode/Component/Dataset/src/functions.php @@ -8,6 +8,7 @@ use RunOpenCode\Component\Dataset\Contract\CollectorInterface; use RunOpenCode\Component\Dataset\Contract\ReducerInterface; use RunOpenCode\Component\Dataset\Contract\StreamInterface; +use RunOpenCode\Component\Dataset\Model\Buffer; use RunOpenCode\Component\Dataset\Reducer\Callback; /** @@ -60,48 +61,42 @@ function stream(iterable $collection): Stream } /** - * Create batch operator. + * Create buffer count operator. * * @template TKey * @template TValue - * @template TModifiedKey - * @template TModifiedValue * - * @param iterable $collection Collection to iterate over. - * @param callable(iterable $batch, int $batchNumber): iterable $onBatch User defined callable to be called on each batch. - * @param positive-int $size Size of the batch buffer. + * @param iterable $collection Collection to iterate over. + * @param positive-int $count How many items to buffer. * - * @return Stream + * @return Stream> * - * @see Operator\Batch + * @see Operator\BufferCount */ -function batch(iterable $collection, callable $onBatch, int $size = 1000): Stream +function buffer_count(iterable $collection, int $count): Stream { return new Stream( - new Operator\Batch($collection, $onBatch, $size) + new Operator\BufferCount($collection, $count) ); } /** - * Create compress join operator. + * Create buffer while operator. * * @template TKey * @template TValue - * @template TModifiedKey - * @template TModifiedValue * - * @param iterable $collection Collection to iterate over. - * @param callable(array{TValue, TValue}, array{TKey, TKey}=, list=): bool $predicate Callable predicate function to evaluate. - * @param callable(list): iterable $join Callable join function to produce joined records. + * @param iterable $collection Collection to iterate over. + * @param callable(Buffer, TValue=, TKey=): bool $predicate Callable predicate function to evaluate. * - * @return Stream + * @return Stream> * - * @see Operator\CompressJoin + * @see Operator\BufferWhile */ -function compress_join(iterable $collection, callable $predicate, callable $join): Stream +function buffer_while(iterable $collection, callable $predicate): Stream { return new Stream( - new Operator\CompressJoin($collection, $predicate, $join) + new Operator\BufferWhile($collection, $predicate) ); } @@ -145,25 +140,69 @@ function filter(iterable $collection, callable $filter): Stream ); } +/** + * Create finalize operator. + * + * @template TKey + * @template TValue + * + * @param iterable $collection Collection to iterate over. + * @param callable(): void $finalizer User defined callable to invoke when iterator is depleted or exception is thrown. + * + * @return Stream + * + * @see Operator\Finalize + */ +function finalize(iterable $collection, callable $finalizer): Stream +{ + return new Stream( + new Operator\Finalize($collection, $finalizer) + ); +} + /** * Create flatten operator. * + * @template TKey * @template TValue - * @template TValues of iterable + * @template TValues of iterable * - * @param iterable $collection Collection to iterate over. + * @param iterable $collection Collection to iterate over. + * @param bool $preserveKeys Should keys be preserved from the flattened collections, false by default. * - * @return Stream + * @return ($preserveKeys is true ? Stream : Stream) * - * @see Operator\Flatten + * @see Operator\Flatten + * + * @phpstan-ignore-next-line return.unusedType */ -function flatten(iterable $collection): Stream +function flatten(iterable $collection, bool $preserveKeys = false): Stream { + // @phpstan-ignore-next-line return.type return new Stream( - new Operator\Flatten($collection) + new Operator\Flatten($collection, $preserveKeys) ); } +/** + * Iterate through stream without yielding items. + * + * @template TKey + * @template TValue + * + * @param iterable $collection Collection to iterate. + * + * @return Stream Flushed stream. + */ +function flush(iterable $collection): Stream +{ + $stream = new Stream($collection); + + \iterator_to_array($stream, false); + + return $stream; +} + /** * Create if empty operator. * @@ -375,25 +414,6 @@ function tap(iterable $collection, callable $callback): Stream ); } -/** - * Create finalize operator. - * - * @template TKey - * @template TValue - * - * @param iterable $collection Collection to iterate over. - * @param callable(): void $finalizer User defined callable to invoke when iterator is depleted or exception is thrown. - * - * @return Stream - * - * @see Operator\Finalize - */ -function finalize(iterable $collection, callable $finalizer): Stream -{ - return new Stream( - new Operator\Finalize($collection, $finalizer) - ); -} /** * Attach reducer as an aggregator. @@ -476,4 +496,4 @@ function reduce(iterable $collection, callable|string $reducer, mixed ...$args): } return $reducer->value; -} \ No newline at end of file +} diff --git a/src/RunOpenCode/Component/Dataset/tests/Model/BufferTest.php b/src/RunOpenCode/Component/Dataset/tests/Model/BufferTest.php new file mode 100644 index 0000000..bd34088 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/tests/Model/BufferTest.php @@ -0,0 +1,100 @@ +assertSame('a', $buffer->first()->key()); + $this->assertSame(1, $buffer->first()->value()); + } + + #[Test] + public function first_throws_exception_on_empty_buffer(): void + { + $this->expectException(LogicException::class); + + new Buffer(new \ArrayObject())->first(); + } + + #[Test] + public function last(): void + { + $buffer = new Buffer(new \ArrayObject([ + ['a', 1], + ['b', 2], + ])); + + $this->assertSame('b', $buffer->last()->key()); + $this->assertSame(2, $buffer->last()->value()); + } + + #[Test] + public function last_throws_exception_on_empty_buffer(): void + { + $this->expectException(LogicException::class); + + new Buffer(new \ArrayObject())->last(); + } + + #[Test] + public function keys(): void + { + $buffer = new Buffer(new \ArrayObject([ + ['a', 1], + ['b', 2], + ])); + + $this->assertSame(['a', 'b'], $buffer->keys()); + } + + #[Test] + public function values(): void + { + $buffer = new Buffer(new \ArrayObject([ + ['a', 1], + ['b', 2], + ])); + + $this->assertSame([1, 2], $buffer->values()); + } + + #[Test] + public function counts(): void + { + $buffer = new Buffer(new \ArrayObject([ + ['a', 1], + ['b', 2], + ])); + + $this->assertCount(2, $buffer); + } + + #[Test] + public function iterates(): void + { + $buffer = new Buffer(new \ArrayObject([ + ['a', 1], + ['b', 2], + ])); + + $this->assertSame([ + 'a' => 1, + 'b' => 2, + ], \iterator_to_array($buffer)); + } +} diff --git a/src/RunOpenCode/Component/Dataset/tests/Model/ItemTest.php b/src/RunOpenCode/Component/Dataset/tests/Model/ItemTest.php new file mode 100644 index 0000000..ede9b80 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/tests/Model/ItemTest.php @@ -0,0 +1,54 @@ +assertSame('1', new Item('1', 1)->key()); + } + + #[Test] + public function value(): void + { + $this->assertSame(1, new Item('1', 1)->value()); + } + + #[Test] + public function array_access(): void + { + $item = new Item('1', 1); + + $this->assertTrue(isset($item[0], $item[1])); + $this->assertSame('1', $item[0]); + $this->assertSame(1, $item[1]); + } + + #[Test] + public function offset_set_throws_exception(): void + { + $this->expectException(\BadMethodCallException::class); + + $item = new Item('1', 1); + + $item[0] = 'foo'; + } + + #[Test] + public function offset_unset_throws_exception(): void + { + $this->expectException(\BadMethodCallException::class); + + $item = new Item('1', 1); + + unset($item[0]); + } +} diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/BatchTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/BatchTest.php deleted file mode 100644 index 3ff497b..0000000 --- a/src/RunOpenCode/Component/Dataset/tests/Operator/BatchTest.php +++ /dev/null @@ -1,35 +0,0 @@ - 2, - 'b' => 10, - 'c' => 5, - 'd' => 1, - ], static function(iterable $batch, int $batchNumber): iterable { - foreach ($batch as [$key, $value]) { - yield \sprintf('processed_%s', $key) => $batchNumber * $value; - } - }, 2); - - $this->assertSame([ - 'processed_a' => 2, - 'processed_b' => 10, - 'processed_c' => 10, - 'processed_d' => 2, - ], \iterator_to_array($operator)); - } -} diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/BufferCountTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/BufferCountTest.php new file mode 100644 index 0000000..bcc9965 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/tests/Operator/BufferCountTest.php @@ -0,0 +1,80 @@ + 2, + 'b' => 10, + 'c' => 5, + 'd' => 1, + 'e' => 7, + ], 2) + ->map(static function(Buffer $buffer): iterable { + return stream($buffer) + ->map( + static fn(int $value): int => $value * 2, + static fn(string $key): string => \sprintf('processed_%s', $key) + ); + }) + ->aggregate('count', Count::class) + ->flatten(true); + + $this->assertSame([ + 'processed_a' => 4, + 'processed_b' => 20, + 'processed_c' => 10, + 'processed_d' => 2, + 'processed_e' => 14, + ], \iterator_to_array($stream)); + + $this->assertSame(3, $stream->aggregators['count']->value); + } + + #[Test] + public function buffers_one_element(): void + { + $stream = buffer_count([ + 'a' => 2, + ], 2) + ->map(static function(Buffer $buffer): iterable { + return stream($buffer) + ->map( + static fn(int $value): int => $value * 2, + static fn(string $key): string => \sprintf('processed_%s', $key) + ); + }) + ->aggregate('count', Count::class) + ->flatten(true); + + $this->assertSame([ + 'processed_a' => 4, + ], \iterator_to_array($stream)); + + $this->assertSame(1, $stream->aggregators['count']->value); + } + + #[Test] + public function buffers_empty_stream(): void + { + $stream = buffer_count([], 10)->aggregate('count', Count::class); + + $this->assertSame([], \iterator_to_array($stream)); + + $this->assertSame(0, $stream->aggregators['count']->value); + } +} diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/BufferWhileTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/BufferWhileTest.php new file mode 100644 index 0000000..589e5eb --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/tests/Operator/BufferWhileTest.php @@ -0,0 +1,89 @@ + 2, + 'b' => 2, + 'c' => 2, + 'd' => 3, + 'e' => 3, + ], + static fn(Buffer $buffer, int $value): bool => $value === $buffer->last()->value(), // @phpstan-ignore-line + ) + ->aggregate('count', Count::class) + ->map(static function(Buffer $buffer): iterable { + return stream($buffer) + ->map( + static fn(int $value): int => $value * 2, + static fn(string $key): string => \sprintf('processed_%s', $key) + ); + }) + ->flatten(true); + + $this->assertSame([ + 'processed_a' => 4, + 'processed_b' => 4, + 'processed_c' => 4, + 'processed_d' => 6, + 'processed_e' => 6, + ], \iterator_to_array($stream)); + + $this->assertSame(2, $stream->aggregators['count']->value); + } + + #[Test] + public function buffers_one_element(): void + { + $stream = buffer_while( + [ + 'a' => 2, + ], + static fn(Buffer $buffer, int $value): bool => $value === $buffer->last()->key(), // @phpstan-ignore-line + ) + ->map(static function(Buffer $buffer): iterable { + return stream($buffer) + ->map( + static fn(int $value): int => $value * 2, + static fn(string $key): string => \sprintf('processed_%s', $key) + ); + }) + ->aggregate('count', Count::class) + ->flatten(true); + + $this->assertSame([ + 'processed_a' => 4, + ], \iterator_to_array($stream)); + + $this->assertSame(1, $stream->aggregators['count']->value); + } + + #[Test] + public function buffers_empty_stream(): void + { + $stream = buffer_while( + [], + static fn(): never => throw new \Exception('Never to be called'), + )->aggregate('count', Count::class); + + $this->assertSame([], \iterator_to_array($stream)); + + $this->assertSame(0, $stream->aggregators['count']->value); + } +} diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/CompressJoinTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/CompressJoinTest.php deleted file mode 100644 index 3f013fd..0000000 --- a/src/RunOpenCode/Component/Dataset/tests/Operator/CompressJoinTest.php +++ /dev/null @@ -1,69 +0,0 @@ - [10, 2], - 2 => [10, 3], - 3 => [10, 4], - 4 => [20, 1], - 5 => [20, 2], - 6 => [30, 5], - ], - static fn(array $values): bool => $values[0][0] === $values[1][0], - static fn(array $buffer): iterable => [ - $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), - ], - ); - - $this->assertSame([ - 10 => [2, 3, 4], - 20 => [1, 2], - 30 => [5], - ], \iterator_to_array($operator)); - } - - #[Test] - public function compress_join_with_single_element(): void - { - $operator = new CompressJoin( - [ - 1 => [10, 2], - ], - static fn(array $values): bool => $values[0][0] === $values[1][0], // @phpstan-ignore-line - static fn(array $buffer): iterable => [ - $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), - ], - ); - - $this->assertSame([ - 10 => [2], - ], \iterator_to_array($operator)); - } - - #[Test] - public function compress_join_with_empty(): void - { - $operator = new CompressJoin( - [], - static fn(array $values): bool => $values[0][0] === $values[1][0], - static fn(array $buffer): iterable => [ - $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), //@phpstan-ignore-line - ], - ); - - $this->assertSame([], \iterator_to_array($operator)); - } -} \ No newline at end of file diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/FlattenTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/FlattenTest.php index 39b9fd1..a349166 100644 --- a/src/RunOpenCode/Component/Dataset/tests/Operator/FlattenTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/Operator/FlattenTest.php @@ -32,4 +32,20 @@ public function flattens(): void 6, ], \iterator_to_array($operator)); } + + #[Test] + public function flattens_preserving_keys(): void + { + $operator = flatten([ + 'foo' => ['a' => 2, 'b' => 3], + 'bar' => ['c' => 10, 'd' => 20], + ], true); + + $this->assertSame([ + 'a' => 2, + 'b' => 3, + 'c' => 10, + 'd' => 20, + ], \iterator_to_array($operator)); + } } diff --git a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php index 8c9afb6..05a8f0c 100644 --- a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php @@ -8,6 +8,7 @@ use PHPUnit\Framework\TestCase; use RunOpenCode\Component\Dataset\Collector\ArrayCollector; use RunOpenCode\Component\Dataset\Exception\LogicException; +use RunOpenCode\Component\Dataset\Model\Buffer; use RunOpenCode\Component\Dataset\Reducer\Average; use RunOpenCode\Component\Dataset\Reducer\Count; use RunOpenCode\Component\Dataset\Reducer\Max; @@ -27,59 +28,71 @@ public function creates(): void } #[Test] - public function batch(): void + public function buffer_count(): void { $dataset = [ 'a' => 2, 'b' => 10, 'c' => 5, 'd' => 1, + 'e' => 7, ]; - $data = new Stream($dataset) - ->batch(function(iterable $batch): iterable { - foreach ($batch as [$key, $value]) { - yield \sprintf('processed_%s', $key) => $value * 2; - } - }, 2) - ->collect(ArrayCollector::class) - ->value; + $stream = new Stream($dataset) + ->bufferCount(2) + ->map(static function(Buffer $buffer): iterable { + return new Stream($buffer) + ->map( + static fn(int $value): int => $value * 2, + static fn(string $key): string => \sprintf('processed_%s', $key) + ); + }) + ->aggregate('count', Count::class) + ->flatten(true); $this->assertSame([ 'processed_a' => 4, 'processed_b' => 20, 'processed_c' => 10, 'processed_d' => 2, - ], $data); + 'processed_e' => 14, + ], \iterator_to_array($stream)); + + $this->assertSame(3, $stream->aggregators['count']->value); } #[Test] - public function compressJoin(): void + public function buffer_while(): void { $dataset = [ - 1 => [10, 2], - 2 => [10, 3], - 3 => [10, 4], - 4 => [20, 1], - 5 => [20, 2], - 6 => [30, 5], + 'a' => 2, + 'b' => 2, + 'c' => 2, + 'd' => 3, + 'e' => 3, ]; - $data = new Stream($dataset) - ->compressJoin( - static fn(array $values): bool => $values[0][0] === $values[1][0], - static fn(array $buffer): iterable => [ - $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), - ], - ) - ->collect(ArrayCollector::class) - ->value; + $stream = new Stream($dataset) + ->bufferWhile(static fn(Buffer $buffer, int $value): bool => $value === $buffer->last()->value()) // @phpstan-ignore-line + ->aggregate('count', Count::class) + ->map(static function(Buffer $buffer): iterable { + return new Stream($buffer) + ->map( + static fn(int $value): int => $value * 2, + static fn(string $key): string => \sprintf('processed_%s', $key) + ); + }) + ->flatten(true); $this->assertSame([ - 10 => [2, 3, 4], - 20 => [1, 2], - 30 => [5], - ], $data); + 'processed_a' => 4, + 'processed_b' => 4, + 'processed_c' => 4, + 'processed_d' => 6, + 'processed_e' => 6, + ], \iterator_to_array($stream)); + + $this->assertSame(2, $stream->aggregators['count']->value); } #[Test] @@ -390,6 +403,24 @@ public function reduce(): void $this->assertSame(36, new Stream($dataset)->reduce(static fn(?int $carry, int $value, string $key): int => $value * 2 + ($carry ?? 0))); } + #[Test] + public function flush(): void + { + $dataset = [ + 'a' => 2, + 'b' => 10, + 'c' => 5, + 'd' => 1, + ]; + + $stream = Stream::create($dataset) + ->aggregate('count', Count::class) + ->flush(); + + $this->assertSame(4, $stream->aggregators['count']->value); + $this->assertTrue($stream->closed); + } + #[Test] public function throws_exception_when_iterating_closed_stream(): void { @@ -400,4 +431,4 @@ public function throws_exception_when_iterating_closed_stream(): void iterable_to_array($stream); \iterator_to_array($stream); } -} \ No newline at end of file +} diff --git a/src/RunOpenCode/Component/Query/tests/Fixtures/Dbal/MySqlFactory.php b/src/RunOpenCode/Component/Query/tests/Fixtures/Dbal/MySqlFactory.php index 19ab4ab..1778fd3 100644 --- a/src/RunOpenCode/Component/Query/tests/Fixtures/Dbal/MySqlFactory.php +++ b/src/RunOpenCode/Component/Query/tests/Fixtures/Dbal/MySqlFactory.php @@ -15,7 +15,7 @@ */ final class MySqlFactory { - private const int MAX_ATTEMPTS = 15; + private const int MAX_ATTEMPTS = 60; private static self $instance;