Skip to content
Merged

Dev #12

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/components/dataset/operators/list/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Available operators
finalize
flatten
if_empty
left_join
map
merge
overflow
Expand Down
97 changes: 97 additions & 0 deletions docs/source/components/dataset/operators/list/left_join.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
==========
leftJoin()
==========

This operator mimics the SQL LEFT JOIN found in relational databases.

It iterates over the source stream and joins each item with values from the
joining stream based on strict key equality.

The operator yields the key from the source stream and a tuple containing the
source value as the first element and an iterable of joined values from the
joining stream as the second element.


.. warning:: This operator is not memory-efficient. Memory consumption depends
on the size of the joining stream.


.. php:namespace:: RunOpenCode\Component\Dataset\Operator


.. php:class:: LeftJoin

.. php:method:: __construct(iterable<TKey, TValue> $source, iterable<TKey, TJoinValue> $join)

:param $source: ``iterable<TKey, TValue>`` Stream source to iterate over on the left side of the left join operation.
:param $join: ``iterable<TKey, TJoinValue>`` Stream source to iterate over on the right side of the left join operation.


.. php:method:: getIterator()

:returns: ``\Traversable<TKey, TValue>`` Stream source joined with values from joining stream source.

Use cases
---------

* Join operation is required which can not be executed on database level.

Example
-------

Migrate users and their addresses from legacy system in batches.



.. code-block:: php
:linenos:

<?php

use App\Model\User;
use App\Model\Address;
use RunOpenCode\Component\Dataset\Stream;
use RunOpenCode\Component\Dataset\Model\Buffer;

$users = $database->executeQuery('SELECT * FROM users...');

new Stream($users)
->bufferCount(100)
->map(function(Buffer $buffer) use ($database): array {
$users = $buffer
->stream()
->map(keyTransform: static fn(array $row): int => $row['id'])
->collect(ArrayCollector::class);

$addresses = new Stream($database->executeQuery('SELECT * FROM addresses ... WHERE user_id IN (:ids)', [
'ids' => \array_keys($users),
]))->map(keyTransform: static fn(array $row): int => $row['user_id']);

return [ $users, $addresses ];
})
->map(function(array $batch): Stream {
[$users, $addresses] = $batch;

return new Stream($users)
->leftJoin($addresses);
})
->flatten()
->map(valueTransform: function(array $joined): User {
[$user, $addresses] = $joined;

return User::fromArray($user)
->setAddresses(\array_map(static fn(array $address): Address => Address::fromArray($address), $addresses));
})
->bufferCount(100)
->tap(function(Buffer $buffer) use ($orm) {
$buffer
->stream()
->tap(function(User $user) use ($orm) {
$orm->persist($user);
})
->flush();

$orm->flush();
$orm->clear();
})
->flush();
175 changes: 175 additions & 0 deletions src/RunOpenCode/Component/Dataset/src/Collector/IndexedCollector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
<?php

declare(strict_types=1);

namespace RunOpenCode\Component\Dataset\Collector;

use RunOpenCode\Component\Dataset\Contract\CollectorInterface;
use RunOpenCode\Component\Dataset\Contract\StreamInterface;
use RunOpenCode\Component\Dataset\Exception\LogicException;
use RunOpenCode\Component\Dataset\Exception\OutOfBoundsException;
use RunOpenCode\Component\Dataset\Exception\UnsupportedException;

/**
* Collects items into an iterable, indexing values by their keys.
*
* The collector assumes that keys are not unique; therefore, accessing
* a value by key returns a list of values.
*
* Currently, the allowed key types are scalar values
* (int, float, string, bool, null) and objects.
Comment on lines +19 to +20
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistency between documentation and implementation. The class-level documentation (line 19) states that "allowed key types are scalar values (int, float, string, bool, null) and objects", but the actual implementation only supports int, string, and object keys (as seen in lines 84-98, 117-121, and 137-141). Either update the documentation to accurately reflect the implementation, or extend the implementation to support all mentioned scalar types.

Suggested change
* Currently, the allowed key types are scalar values
* (int, float, string, bool, null) and objects.
* Currently, the allowed key types are integers, strings, and objects.

Copilot uses AI. Check for mistakes.
*
* @template TKey
* @template TValue
*
* @implements CollectorInterface<iterable<TKey,TValue>>
* @implements \IteratorAggregate<TKey, TValue>
* @implements \ArrayAccess<TKey, list<TValue>>
*/
final class IndexedCollector implements CollectorInterface, \ArrayAccess, \IteratorAggregate, \Countable
{
/**
* {@inheritdoc}
*/
public mixed $value {
get => $this->getIterator();
}

/**
* {@inheritdoc}
*/
public array $aggregated {
get => $this->source instanceof StreamInterface ? $this->source->aggregated : [];
}

/**
* {@inheritdoc}
*/
public bool $closed {
get => false;
}

/**
* Index of values with keys of scalar type.
*
* @var array<TKey, list<TValue>>
*/
private array $scalarIndex = [];

/**
* Index of values with keys of object type.
*
* @var \SplObjectStorage<TKey&object, list<TValue>>
*/
private \SplObjectStorage $objectIndex;

/**
* Collected values from stream.
*
* @var array<array{TKey, TValue}>
*/
private array $collected = [];

/**
* @param iterable<TKey, TValue> $source Stream source to collect.
*/
public function __construct(
private readonly iterable $source,
) {
$this->objectIndex = new \SplObjectStorage();

foreach ($this->source as $key => $value) {
$this->collected[] = [$key, $value];

if (\is_string($key) || \is_int($key)) {
$this->scalarIndex[$key] = $this->scalarIndex[$key] ?? [];
$this->scalarIndex[$key][] = $value;
continue;
}

if (\is_object($key)) {
$current = $this->objectIndex->contains($key) ? $this->objectIndex[$key] : [];

$current[] = $value;
$this->objectIndex[$key] = $current;
continue;
}

throw new UnsupportedException('Only object, string and integer keys are supported.');
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect PHPDoc comment. The doc comment states "Only object, string and integer keys are supported" but the code also handles float, bool, and null types as scalar values (line 84 checks \is_string($key) || \is_int($key) which would fail for other scalar types, but the condition structure suggests broader scalar support was intended). Consider either updating the error message to be accurate or adjusting the code to support all scalar types as documented.

Copilot uses AI. Check for mistakes.
}
}

/**
* {@inheritdoc}
*/
public function getIterator(): \Traversable
{
foreach ($this->collected as [$key, $value]) {
yield $key => $value;
}
}

/**
* {@inheritdoc}
*/
public function offsetExists(mixed $offset): bool
{
return match (true) {
\is_string($offset) || \is_int($offset) => \array_key_exists($offset, $this->scalarIndex),
\is_object($offset) => $this->objectIndex->contains($offset),
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect PHPDoc comment. The doc comment states "Only object, string and integer keys are supported" but line 19 in the class documentation claims "Currently, the allowed key types are scalar values (int, float, string, bool, null) and objects." The implementation only supports int, string, and object keys (lines 84, 90, 118, 138). Either update the error message to match the implementation or extend the implementation to support all scalar types as documented.

Copilot uses AI. Check for mistakes.
};
}

/**
* Get values for given key.
*
* @param TKey $offset
*
* @return list<TValue>
*/
public function offsetGet(mixed $offset): mixed
{
if (!$this->offsetExists($offset)) {
throw new OutOfBoundsException($offset, $this->value);
}

return match (true) {
\is_string($offset) || \is_int($offset) => $this->scalarIndex[$offset],
\is_object($offset) => $this->objectIndex[$offset],
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect PHPDoc comment. The doc comment states "Only object, string and integer keys are supported" which is inconsistent with line 19 in the class documentation that claims support for all scalar types (int, float, string, bool, null) and objects. The implementation only supports int, string, and object keys. This error message should be consistent with the actual implementation and class-level documentation.

Copilot uses AI. Check for mistakes.
};
}

/**
* {@inheritdoc}
*/
public function offsetSet(mixed $offset, mixed $value): void
{
throw new LogicException(\sprintf(
'Cannot set value for key "%s". Collector "%s" is read-only.',
\var_export($offset, true),
self::class,
));
}

/**
* {@inheritdoc}
*/
public function offsetUnset(mixed $offset): void
{
throw new LogicException(\sprintf(
'Cannot unset value for key "%s". Collector "%s" is read-only.',
\var_export($offset, true),
self::class,
));
}

/**
* {@inheritdoc}
*/
public function count(): int
{
return \count($this->collected);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace RunOpenCode\Component\Dataset\Exception;

/**
* Thrown when an unsupported operation is attempted.
*
* Lack of support may be due to various reasons, such as:
*
* - The feature is not implemented in the current version.
* - The underlying system or library does not provide support for the requested operation.
* - The operation is not applicable in the current context or configuration.
*/
class UnsupportedException extends RuntimeException
{
}
13 changes: 13 additions & 0 deletions src/RunOpenCode/Component/Dataset/src/Model/Buffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace RunOpenCode\Component\Dataset\Model;

use RunOpenCode\Component\Dataset\Exception\LogicException;
use RunOpenCode\Component\Dataset\Stream;

/**
* Buffer of iterated items from collection.
Expand All @@ -30,6 +31,18 @@ public function __construct(private \ArrayObject $items)
// noop.
}

/**
* Create stream from buffer.
*
* @return Stream<TKey, TValue>
*
* @phpstan-ignore-next-line generics.variance
*/
public function stream(): Stream
{
return new Stream($this);
}

/**
* Get first item in buffer.
*
Expand Down
Loading