From 508da6a58625b7a5d311c3f3c1181b36f5f50d6d Mon Sep 17 00:00:00 2001 From: Stefan Veljancic Date: Tue, 23 Dec 2025 13:57:06 +0100 Subject: [PATCH] Implemented left join operator --- .../dataset/operators/list/index.rst | 1 + .../dataset/operators/list/left_join.rst | 97 ++++++++++ .../src/Collector/IndexedCollector.php | 175 ++++++++++++++++++ .../src/Exception/UnsupportedException.php | 18 ++ .../Component/Dataset/src/Model/Buffer.php | 13 ++ .../Dataset/src/Operator/LeftJoin.php | 77 ++++++++ .../Component/Dataset/src/Stream.php | 25 ++- .../Component/Dataset/src/functions.php | 21 +++ .../tests/Collector/IndexCollectorTest.php | 133 +++++++++++++ .../Dataset/tests/Model/BufferTest.php | 14 ++ .../Dataset/tests/Operator/LeftJoinTest.php | 44 +++++ .../Component/Dataset/tests/StreamTest.php | 17 ++ 12 files changed, 631 insertions(+), 4 deletions(-) create mode 100644 docs/source/components/dataset/operators/list/left_join.rst create mode 100644 src/RunOpenCode/Component/Dataset/src/Collector/IndexedCollector.php create mode 100644 src/RunOpenCode/Component/Dataset/src/Exception/UnsupportedException.php create mode 100644 src/RunOpenCode/Component/Dataset/src/Operator/LeftJoin.php create mode 100644 src/RunOpenCode/Component/Dataset/tests/Collector/IndexCollectorTest.php create mode 100644 src/RunOpenCode/Component/Dataset/tests/Operator/LeftJoinTest.php diff --git a/docs/source/components/dataset/operators/list/index.rst b/docs/source/components/dataset/operators/list/index.rst index af3455a..1e20cdf 100644 --- a/docs/source/components/dataset/operators/list/index.rst +++ b/docs/source/components/dataset/operators/list/index.rst @@ -13,6 +13,7 @@ Available operators finalize flatten if_empty + left_join map merge overflow diff --git a/docs/source/components/dataset/operators/list/left_join.rst b/docs/source/components/dataset/operators/list/left_join.rst new file mode 100644 index 0000000..78c624d --- /dev/null +++ 
b/docs/source/components/dataset/operators/list/left_join.rst @@ -0,0 +1,97 @@ +========== +leftJoin() +========== + +This operator mimics the SQL LEFT JOIN found in relational databases. + +It iterates over the source stream and joins each item with values from the +joining stream based on strict key equality. + +The operator yields the key from the source stream and a tuple containing the +source value as the first element and an iterable of joined values from the +joining stream as the second element. + + +.. warning:: This operator is not memory-efficient. Memory consumption depends + on the size of the joining stream. + + +.. php:namespace:: RunOpenCode\Component\Dataset\Operator + + +.. php:class:: LeftJoin + + .. php:method:: __construct(iterable $source, iterable $join) + + :param $source: ``iterable`` Stream source to iterate over on the left side of the left join operation. + :param $join: ``iterable`` Stream source to iterate over on the right side of the left join operation. + + + .. php:method:: getIterator() + + :returns: ``\Traversable`` Stream source joined with values from the joining stream source. + +Use cases +--------- + +* A join operation is required that cannot be executed at the database level. + +Example +------- + +Migrate users and their addresses from a legacy system in batches. + + + +.. code-block:: php + :linenos: + + executeQuery('SELECT * FROM users...'); + + new Stream($users) + ->bufferCount(100) + ->map(function(Buffer $buffer) use ($database): array { + $users = $buffer + ->stream() + ->map(keyTransform: static fn(array $row): int => $row['id']) + ->collect(ArrayCollector::class); + + $addresses = new Stream($database->executeQuery('SELECT * FROM addresses ...
WHERE user_id IN (:ids)', [ + 'ids' => \array_keys($users), + ]))->map(keyTransform: static fn(array $row): int => $row['user_id']); + + return [ $users, $addresses ]; + }) + ->map(function(array $batch): Stream { + [$users, $addresses] = $batch; + + return new Stream($users) + ->leftJoin($addresses); + }) + ->flatten() + ->map(valueTransform: function(array $joined): User { + [$user, $addresses] = $joined; + + return User::fromArray($user) + ->setAddresses(\array_map(static fn(array $address): Address => Address::fromArray($address), $addresses)); + }) + ->bufferCount(100) + ->tap(function(Buffer $buffer) use ($orm) { + $buffer + ->stream() + ->tap(function(User $user) use ($orm) { + $orm->persist($user); + }) + ->flush(); + + $orm->flush(); + $orm->clear(); + }) + ->flush(); diff --git a/src/RunOpenCode/Component/Dataset/src/Collector/IndexedCollector.php b/src/RunOpenCode/Component/Dataset/src/Collector/IndexedCollector.php new file mode 100644 index 0000000..e1605cc --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Collector/IndexedCollector.php @@ -0,0 +1,175 @@ +> + * @implements \IteratorAggregate + * @implements \ArrayAccess> + */ +final class IndexedCollector implements CollectorInterface, \ArrayAccess, \IteratorAggregate, \Countable +{ + /** + * {@inheritdoc} + */ + public mixed $value { + get => $this->getIterator(); + } + + /** + * {@inheritdoc} + */ + public array $aggregated { + get => $this->source instanceof StreamInterface ? $this->source->aggregated : []; + } + + /** + * {@inheritdoc} + */ + public bool $closed { + get => false; + } + + /** + * Index of values with keys of scalar type. + * + * @var array> + */ + private array $scalarIndex = []; + + /** + * Index of values with keys of object type. + * + * @var \SplObjectStorage> + */ + private \SplObjectStorage $objectIndex; + + /** + * Collected values from stream. + * + * @var array + */ + private array $collected = []; + + /** + * @param iterable $source Stream source to collect. 
+ */ + public function __construct( + private readonly iterable $source, + ) { + $this->objectIndex = new \SplObjectStorage(); + + foreach ($this->source as $key => $value) { + $this->collected[] = [$key, $value]; + + if (\is_string($key) || \is_int($key)) { + $this->scalarIndex[$key] = $this->scalarIndex[$key] ?? []; + $this->scalarIndex[$key][] = $value; + continue; + } + + if (\is_object($key)) { + $current = $this->objectIndex->contains($key) ? $this->objectIndex[$key] : []; + + $current[] = $value; + $this->objectIndex[$key] = $current; + continue; + } + + throw new UnsupportedException('Only object, string and integer keys are supported.'); + } + } + + /** + * {@inheritdoc} + */ + public function getIterator(): \Traversable + { + foreach ($this->collected as [$key, $value]) { + yield $key => $value; + } + } + + /** + * {@inheritdoc} + */ + public function offsetExists(mixed $offset): bool + { + return match (true) { + \is_string($offset) || \is_int($offset) => \array_key_exists($offset, $this->scalarIndex), + \is_object($offset) => $this->objectIndex->contains($offset), + default => throw new UnsupportedException('Only object, string and integer keys are supported.'), + }; + } + + /** + * Get values for given key. + * + * @param TKey $offset + * + * @return list + */ + public function offsetGet(mixed $offset): mixed + { + if (!$this->offsetExists($offset)) { + throw new OutOfBoundsException($offset, $this->value); + } + + return match (true) { + \is_string($offset) || \is_int($offset) => $this->scalarIndex[$offset], + \is_object($offset) => $this->objectIndex[$offset], + default => throw new UnsupportedException('Only object, string and integer keys are supported.'), + }; + } + + /** + * {@inheritdoc} + */ + public function offsetSet(mixed $offset, mixed $value): void + { + throw new LogicException(\sprintf( + 'Cannot set value for key "%s". 
Collector "%s" is read-only.', + \var_export($offset, true), + self::class, + )); + } + + /** + * {@inheritdoc} + */ + public function offsetUnset(mixed $offset): void + { + throw new LogicException(\sprintf( + 'Cannot unset value for key "%s". Collector "%s" is read-only.', + \var_export($offset, true), + self::class, + )); + } + + /** + * {@inheritdoc} + */ + public function count(): int + { + return \count($this->collected); + } +} diff --git a/src/RunOpenCode/Component/Dataset/src/Exception/UnsupportedException.php b/src/RunOpenCode/Component/Dataset/src/Exception/UnsupportedException.php new file mode 100644 index 0000000..6bc2127 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Exception/UnsupportedException.php @@ -0,0 +1,18 @@ + + * + * @phpstan-ignore-next-line generics.variance + */ + public function stream(): Stream + { + return new Stream($this); + } + /** * Get first item in buffer. * diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/LeftJoin.php b/src/RunOpenCode/Component/Dataset/src/Operator/LeftJoin.php new file mode 100644 index 0000000..d6fa7a6 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Operator/LeftJoin.php @@ -0,0 +1,77 @@ + 'a', 2 => 'b', 3 => 'c'], + * right: (static function (): iterable { yield 1 => 'x'; yield 1 => 'y'; yield 2 => 'z'; })(), + * ); + * + * // The resulting sequence will be: + * // 1 => ['a', ['x', 'y']] + * // 2 => ['b', ['z']] + * // 3 => ['c', []] + * ``` + * + * @template TKey + * @template TLeftValue + * @template TRightValue + * + * @extends AbstractStream}> + * @implements OperatorInterface}> + */ +final class LeftJoin extends AbstractStream implements OperatorInterface +{ + /** + * @param iterable $source Stream source to iterate over on the left side of the left join operation. + * @param iterable $join Stream source to iterate over on the right side of the left join operation.
+ */ + public function __construct( + private readonly iterable $source, + private readonly iterable $join + ) { + parent::__construct($this->source); + } + + /** + * {@inheritdoc} + */ + protected function iterate(): \Traversable + { + $join = new Stream($this->join)->collect(IndexedCollector::class); + + foreach ($this->source as $key => $value) { + yield $key => [ + $value, + $join->offsetExists($key) ? $join->offsetGet($key) : [], + ]; + } + } +} diff --git a/src/RunOpenCode/Component/Dataset/src/Stream.php b/src/RunOpenCode/Component/Dataset/src/Stream.php index 65f5359..153ef43 100644 --- a/src/RunOpenCode/Component/Dataset/src/Stream.php +++ b/src/RunOpenCode/Component/Dataset/src/Stream.php @@ -30,6 +30,7 @@ use function RunOpenCode\Component\Dataset\if_empty as dataset_if_empty; use function RunOpenCode\Component\Dataset\overflow as dataset_overflow; use function RunOpenCode\Component\Dataset\operator as dataset_operator; +use function RunOpenCode\Component\Dataset\left_join as dataset_left_join; /** * Iterable data stream. @@ -177,14 +178,14 @@ public function ifEmpty(\Throwable|callable|null $fallback = null): self * @template TModifiedKey * @template TModifiedValue * - * @param callable(TValue, TKey=): TModifiedValue $valueTransform User defined callable to be called on each item. - * @param callable(TKey, TValue=): TModifiedKey|null $keyTransform User defined callable to be called on each item key. If null, original keys are preserved. + * @param callable(TValue, TKey=): TModifiedValue|null $valueTransform User defined callable to be called on each item. + * @param callable(TKey, TValue=): TModifiedKey|null $keyTransform User defined callable to be called on each item key. If null, original keys are preserved. * - * @return self<($keyTransform is null ? TKey : TModifiedKey), TModifiedValue> + * @return self<($keyTransform is null ? TKey : TModifiedKey), ($valueTransform is null ? 
TValue : TModifiedValue)> * * @see Operator\Map */ - public function map(callable $valueTransform, ?callable $keyTransform = null): self + public function map(?callable $valueTransform = null, ?callable $keyTransform = null): self { return dataset_map($this, $valueTransform, $keyTransform); } @@ -206,6 +207,22 @@ public function merge(iterable $collection): self return dataset_merge($this, $collection); } + /** + * Applies left join operator on current stream. + * + * @template TJoinValue + * + * @param iterable $join Stream source to iterate over on the right side of the left join operation. + * + * @return self}> + * + * @see Operator\LeftJoin + */ + public function leftJoin(iterable $join): Stream + { + return dataset_left_join($this, $join); + } + /** * Applies overflow operator. * diff --git a/src/RunOpenCode/Component/Dataset/src/functions.php b/src/RunOpenCode/Component/Dataset/src/functions.php index 4657cec..60f5452 100644 --- a/src/RunOpenCode/Component/Dataset/src/functions.php +++ b/src/RunOpenCode/Component/Dataset/src/functions.php @@ -277,6 +277,27 @@ function merge(iterable $first, iterable $second): Stream ); } +/** + * Create left join operator. + * + * @template TKey + * @template TLeftValue + * @template TRightValue + * + * @param iterable $source Stream source to iterate over on the left side of the left join operation. + * @param iterable $join Stream source to iterate over on the right side of the left join operation. + * + * @return Stream}> + * + * @see Operator\LeftJoin + */ +function left_join(iterable $source, iterable $join): Stream +{ + return new Stream( + new Operator\LeftJoin($source, $join) + ); +} + /** * Create overflow operator. 
* diff --git a/src/RunOpenCode/Component/Dataset/tests/Collector/IndexCollectorTest.php b/src/RunOpenCode/Component/Dataset/tests/Collector/IndexCollectorTest.php new file mode 100644 index 0000000..4733ccf --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/tests/Collector/IndexCollectorTest.php @@ -0,0 +1,133 @@ + 1; + yield 'a' => 2; + yield 'b' => 3; + }; + + $collector = collect($generator(), IndexedCollector::class); + $collected = []; + + foreach ($collector as $key => $value) { + $collected[] = [$key, $value]; + } + $this->assertSame([ + ['a', 1], + ['a', 2], + ['b', 3], + ], $collected); + } + + #[Test] + public function rewindable(): void + { + $generator = static function(): iterable { + yield 'a' => 1; + yield 'a' => 2; + yield 'b' => 3; + }; + + $collector = collect($generator(), IndexedCollector::class); + + \iterator_to_array($collector); + \iterator_to_array($collector); + + $this->expectNotToPerformAssertions(); + } + + #[Test] + public function array_access_scalar_keys(): void + { + $generator = static function(): iterable { + yield 0 => 'a'; + yield 0 => 'b'; + yield 1 => 'c'; + }; + + $collector = collect($generator(), IndexedCollector::class); + + $this->assertSame(['a', 'b'], $collector[0]); + $this->assertSame(['c'], $collector[1]); + } + + #[Test] + public function array_access_object_keys(): void + { + $object1 = new \stdClass(); + $object2 = new \stdClass(); + + $generator = static function() use ($object1, $object2) { + yield $object1 => 'a'; + yield $object1 => 'b'; + yield $object2 => 'c'; + }; + + $collector = collect($generator(), IndexedCollector::class); + + $this->assertSame(['a', 'b'], $collector[$object1]); + $this->assertSame(['c'], $collector[$object2]); + } + + #[Test] + public function counts(): void + { + $dataset = [2, 10, 5, 1]; + + $collector = collect($dataset, IndexedCollector::class); + + $this->assertCount(4, $collector); + } + + #[Test] + public function aggregates(): void + { + $dataset = [2, 10]; + + $collector = 
stream($dataset) + ->aggregate('count', Count::class) + ->aggregate('sum', Sum::class) + ->aggregate('average', Average::class) + ->collect(IndexedCollector::class); + + $this->assertSame(2, $collector->aggregated['count']); + $this->assertSame(12, $collector->aggregated['sum']); + $this->assertEqualsWithDelta(6, $collector->aggregated['average'], 0.0001); + } + + #[Test] + public function array_access_set_throws_exception(): void + { + $this->expectException(LogicException::class); + + collect([], IndexedCollector::class)[10] = ['bar']; + } + + #[Test] + public function array_access_unset_throws_exception(): void + { + $this->expectException(LogicException::class); + + unset(collect([], IndexedCollector::class)[20]); + } +} diff --git a/src/RunOpenCode/Component/Dataset/tests/Model/BufferTest.php b/src/RunOpenCode/Component/Dataset/tests/Model/BufferTest.php index bd34088..0c0d79d 100644 --- a/src/RunOpenCode/Component/Dataset/tests/Model/BufferTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/Model/BufferTest.php @@ -97,4 +97,18 @@ public function iterates(): void 'b' => 2, ], \iterator_to_array($buffer)); } + + #[Test] + public function streams(): void + { + $buffer = new Buffer(new \ArrayObject([ + ['a', 1], + ['b', 2], + ])); + + $this->assertSame([ + 'a' => 1, + 'b' => 2, + ], \iterator_to_array($buffer->stream())); + } } diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/LeftJoinTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/LeftJoinTest.php new file mode 100644 index 0000000..76bdffb --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/tests/Operator/LeftJoinTest.php @@ -0,0 +1,44 @@ + 'a', 2 => 'b', 3 => 'c']; + $right = [1 => 'x', 2 => 'y']; + $joined = left_join($left, $right); + + $this->assertSame([ + 1 => ['a', ['x']], + 2 => ['b', ['y']], + 3 => ['c', []], + ], \iterator_to_array($joined)); + } + + #[Test] + public function left_joins_with_duplicate_right_keys(): void + { + $left = [1 => 'a', 2 => 'b']; + $right = 
(static function(): iterable { + yield 1 => 'x'; + yield 1 => 'y'; + yield 1 => 'z'; + })(); + $joined = left_join($left, $right); + + $this->assertSame([ + 1 => ['a', ['x', 'y', 'z']], + 2 => ['b', []], + ], \iterator_to_array($joined)); + } +} diff --git a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php index ef8f725..44f8027 100644 --- a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php @@ -160,6 +160,24 @@ public function flatten(): void ], $data); } + #[Test] + public function left_join(): void + { + $left = [1 => 'a', 2 => 'b', 3 => 'c']; + $right = [1 => 'x', 2 => 'y']; + + $joined = new Stream($left) + ->leftJoin($right) + ->collect(ArrayCollector::class) + ->value; + + $this->assertSame([ + 1 => ['a', ['x']], + 2 => ['b', ['y']], + 3 => ['c', []], + ], $joined); + } + #[Test] public function map(): void {