Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/components/dataset/operators/list/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Available operators
finalize
flatten
if_empty
left_join
map
merge
overflow
Expand Down
97 changes: 97 additions & 0 deletions docs/source/components/dataset/operators/list/left_join.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
==========
leftJoin()
==========

This operator mimics the SQL LEFT JOIN found in relational databases.

It iterates over the source stream and joins each item with values from the
joining stream based on strict key equality.

The operator yields the key from the source stream and a tuple containing the
source value as the first element and an iterable of joined values from the
joining stream as the second element.


.. warning:: This operator is not memory-efficient. Memory consumption depends
on the size of the joining stream.


.. php:namespace:: RunOpenCode\Component\Dataset\Operator


.. php:class:: LeftJoin

.. php:method:: __construct(iterable<TKey, TValue> $source, iterable<TKey, TJoinValue> $join)

:param $source: ``iterable<TKey, TValue>`` Stream source to iterate over on the left side of the left join operation.
:param $join: ``iterable<TKey, TJoinValue>`` Stream source to iterate over on the right side of the left join operation.


.. php:method:: getIterator()

:returns: ``\Traversable<TKey, TValue>`` Stream source joined with values from joining stream source.

Use cases
---------

* Join operation is required which can not be executed on database level.

Example
-------

Migrate users and their addresses from legacy system in batches.



.. code-block:: php
:linenos:

<?php

use App\Model\User;
use App\Model\Address;
use RunOpenCode\Component\Dataset\Stream;
use RunOpenCode\Component\Dataset\Model\Buffer;

$users = $database->executeQuery('SELECT * FROM users...');

new Stream($users)
->bufferCount(100)
->map(function(Buffer $buffer) use ($database): array {
$users = $buffer
->stream()
->map(keyTransform: static fn(array $row): int => $row['id'])
->collect(ArrayCollector::class);

$addresses = new Stream($database->executeQuery('SELECT * FROM addresses ... WHERE user_id IN (:ids)', [
'ids' => \array_keys($users),
]))->map(keyTransform: static fn(array $row): int => $row['user_id']);

return [ $users, $addresses ];
})
->map(function(array $batch): Stream {
[$users, $addresses] = $batch;

return new Stream($users)
->leftJoin($addresses);
})
->flatten()
->map(valueTransform: function(array $joined): User {
[$user, $addresses] = $joined;

return User::fromArray($user)
->setAddresses(\array_map(static fn(array $address): Address => Address::fromArray($address), $addresses));
})
->bufferCount(100)
->tap(function(Buffer $buffer) use ($orm) {
$buffer
->stream()
->tap(function(User $user) use ($orm) {
$orm->persist($user);
})
->flush();

$orm->flush();
$orm->clear();
})
->flush();
175 changes: 175 additions & 0 deletions src/RunOpenCode/Component/Dataset/src/Collector/IndexedCollector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
<?php

declare(strict_types=1);

namespace RunOpenCode\Component\Dataset\Collector;

use RunOpenCode\Component\Dataset\Contract\CollectorInterface;
use RunOpenCode\Component\Dataset\Contract\StreamInterface;
use RunOpenCode\Component\Dataset\Exception\LogicException;
use RunOpenCode\Component\Dataset\Exception\OutOfBoundsException;
use RunOpenCode\Component\Dataset\Exception\UnsupportedException;

/**
* Collects items into an iterable, indexing values by their keys.
*
* The collector assumes that keys are not unique; therefore, accessing
* a value by key returns a list of values.
*
* Currently, the allowed key types are scalar values
* (int, float, string, bool, null) and objects.
*
* @template TKey
* @template TValue
*
* @implements CollectorInterface<iterable<TKey,TValue>>
* @implements \IteratorAggregate<TKey, TValue>
* @implements \ArrayAccess<TKey, list<TValue>>
*/
final class IndexedCollector implements CollectorInterface, \ArrayAccess, \IteratorAggregate, \Countable
{
/**
* {@inheritdoc}
*/
public mixed $value {
get => $this->getIterator();
}

/**
* {@inheritdoc}
*/
public array $aggregated {
get => $this->source instanceof StreamInterface ? $this->source->aggregated : [];
}

/**
* {@inheritdoc}
*/
public bool $closed {
get => false;
}

/**
* Index of values with keys of scalar type.
*
* @var array<TKey, list<TValue>>
*/
private array $scalarIndex = [];

/**
* Index of values with keys of object type.
*
* @var \SplObjectStorage<TKey&object, list<TValue>>
*/
private \SplObjectStorage $objectIndex;

/**
* Collected values from stream.
*
* @var array<array{TKey, TValue}>
*/
private array $collected = [];

/**
* @param iterable<TKey, TValue> $source Stream source to collect.
*/
public function __construct(
private readonly iterable $source,
) {
$this->objectIndex = new \SplObjectStorage();

foreach ($this->source as $key => $value) {
$this->collected[] = [$key, $value];

if (\is_string($key) || \is_int($key)) {
$this->scalarIndex[$key] = $this->scalarIndex[$key] ?? [];
$this->scalarIndex[$key][] = $value;
continue;
}

if (\is_object($key)) {
$current = $this->objectIndex->contains($key) ? $this->objectIndex[$key] : [];

$current[] = $value;
$this->objectIndex[$key] = $current;
continue;
}

throw new UnsupportedException('Only object, string and integer keys are supported.');
}
}

/**
* {@inheritdoc}
*/
public function getIterator(): \Traversable
{
foreach ($this->collected as [$key, $value]) {
yield $key => $value;
}
}

/**
* {@inheritdoc}
*/
public function offsetExists(mixed $offset): bool
{
return match (true) {
\is_string($offset) || \is_int($offset) => \array_key_exists($offset, $this->scalarIndex),
\is_object($offset) => $this->objectIndex->contains($offset),
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
};
}

/**
* Get values for given key.
*
* @param TKey $offset
*
* @return list<TValue>
*/
public function offsetGet(mixed $offset): mixed
{
if (!$this->offsetExists($offset)) {
throw new OutOfBoundsException($offset, $this->value);
}

return match (true) {
\is_string($offset) || \is_int($offset) => $this->scalarIndex[$offset],
\is_object($offset) => $this->objectIndex[$offset],
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
};
}

/**
* {@inheritdoc}
*/
public function offsetSet(mixed $offset, mixed $value): void
{
throw new LogicException(\sprintf(
'Cannot set value for key "%s". Collector "%s" is read-only.',
\var_export($offset, true),
self::class,
));
}

/**
* {@inheritdoc}
*/
public function offsetUnset(mixed $offset): void
{
throw new LogicException(\sprintf(
'Cannot unset value for key "%s". Collector "%s" is read-only.',
\var_export($offset, true),
self::class,
));
}

/**
* {@inheritdoc}
*/
public function count(): int
{
return \count($this->collected);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace RunOpenCode\Component\Dataset\Exception;

/**
* Thrown when an unsupported operation is attempted.
*
* Lack of support may be due to various reasons, such as:
*
* - The feature is not implemented in the current version.
* - The underlying system or library does not provide support for the requested operation.
* - The operation is not applicable in the current context or configuration.
*/
class UnsupportedException extends RuntimeException
{
}
13 changes: 13 additions & 0 deletions src/RunOpenCode/Component/Dataset/src/Model/Buffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace RunOpenCode\Component\Dataset\Model;

use RunOpenCode\Component\Dataset\Exception\LogicException;
use RunOpenCode\Component\Dataset\Stream;

/**
* Buffer of iterated items from collection.
Expand All @@ -30,6 +31,18 @@ public function __construct(private \ArrayObject $items)
// noop.
}

/**
* Create stream from buffer.
*
* @return Stream<TKey, TValue>
*
* @phpstan-ignore-next-line generics.variance
*/
public function stream(): Stream
{
return new Stream($this);
}

/**
* Get first item in buffer.
*
Expand Down
Loading