Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ venv/*

# Eclipse
.settings

# LLMs
CLAUDE.md
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ repos:
language_version: python3.9

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.15.0
rev: v1.17.0
hooks:
- id: mypy
files: flupy/
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "flupy"
version = "1.2.2"
version = "1.2.3"
description = "Fluent data processing in Python - a chainable stream processing library for expressive data manipulation using method chaining"
authors = ["Oliver Rice <oliver@oliverrice.com>"]
license = "MIT"
Expand Down
51 changes: 51 additions & 0 deletions src/flupy/fluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Callable,
Collection,
Deque,
Dict,
Generator,
Generic,
Hashable,
Expand Down Expand Up @@ -314,6 +315,56 @@ def _impl() -> Generator[Tuple[T, _T1], None, None]:

return Fluent(_impl())

def join_full(
    self,
    other: Iterable[_T1],
    key: Callable[[T], Hashable] = identity,
    other_key: Callable[[_T1], Hashable] = identity,
) -> "Fluent[Tuple[Union[T, None], Union[_T1, None]]]":
    """Join the iterable with another iterable using equality between *key* applied to self and *other_key* applied to *other* to identify matching entries

    Returns all entries from both iterables. When no matching entry is found, entries are paired with None

    Entries from *self* are yielded first, in order, each paired with every
    matching entry from *other* (or with None when nothing matches). Entries
    from *other* that were never matched follow, grouped by key in the order
    each key first appeared in *other*

    Note: join_full loads all of *other* into memory before yielding; *self* is consumed lazily

    >>> flu(range(4)).join_full(range(2, 6)).to_list()
    [(0, None), (1, None), (2, 2), (3, 3), (None, 4), (None, 5)]
    """

    def _impl() -> Generator[Tuple[Union[T, None], Union[_T1, None]], None, None]:

        # Group the entries of other by key. A dict keeps its keys in
        # insertion order, which is what makes the trailing "unmatched"
        # section deterministic (a set of keys would not be).
        other_lookup: Dict[Hashable, List[_T1]] = {}
        for entry_other in other:
            other_lookup.setdefault(other_key(entry_other), []).append(entry_other)

        # Track which keys from other have been matched
        matched_other_keys: Set[Hashable] = set()

        # Process all entries from self
        for entry in self:
            entry_key = key(entry)
            matches: Optional[List[_T1]] = other_lookup.get(entry_key)

            if matches:
                matched_other_keys.add(entry_key)
                for match in matches:
                    yield (entry, match)
            else:
                yield (entry, None)

        # Yield unmatched entries from other, in first-seen key order
        for other_key_val, unmatched_entries in other_lookup.items():
            if other_key_val in matched_other_keys:
                continue
            for unmatched_entry in unmatched_entries:
                yield (None, unmatched_entry)

    return Fluent(_impl())

def shuffle(self) -> "Fluent[T]":
"""Randomize the order of elements in the iterable

Expand Down
40 changes: 40 additions & 0 deletions src/tests/test_flu.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,43 @@ def test_join_inner():
# Default unpacking
res = flu(range(6)).join_inner(range(0, 6, 2)).collect()
assert res == [(0, 0), (2, 2), (4, 4)]


def test_join_full():
    """Check join_full on default keys, custom keys, empty inputs and duplicate keys."""

    def by_id(record):
        return record["id"]

    def none_aware_order(pair):
        # None is not orderable against ints, so flag it first and
        # substitute -1 as a placeholder value for the comparison.
        left_item, right_item = pair
        return (
            left_item is None,
            -1 if left_item is None else left_item,
            right_item is None,
            -1 if right_item is None else right_item,
        )

    # Default (identity) keys on overlapping ranges
    joined = flu(range(4)).join_full(range(2, 6)).collect()
    assert joined == [(0, None), (1, None), (2, 2), (3, 3), (None, 4), (None, 5)]

    # Key functions supplied for both sides
    people = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
    values = [{"id": 2, "value": 100}, {"id": 3, "value": 200}]
    joined = flu(people).join_full(values, key=by_id, other_key=by_id).collect()
    assert joined == [
        ({"id": 1, "name": "Alice"}, None),
        ({"id": 2, "name": "Bob"}, {"id": 2, "value": 100}),
        (None, {"id": 3, "value": 200}),
    ]

    # Nothing on the left: every right entry is paired with None
    joined = flu([]).join_full(range(3)).collect()
    assert joined == [(None, 0), (None, 1), (None, 2)]

    # Nothing on the right: every left entry is paired with None
    joined = flu(range(3)).join_full([]).collect()
    assert joined == [(0, None), (1, None), (2, None)]

    # Nothing on either side
    joined = flu([]).join_full([]).collect()
    assert joined == []

    # Duplicate keys on both sides give a 2x2 cartesian product for key 2
    joined = flu([1, 2, 2, 3]).join_full([2, 2, 4]).collect()
    wanted = [(1, None), (2, 2), (2, 2), (2, 2), (2, 2), (3, None), (None, 4)]
    # Compare order-insensitively
    assert sorted(joined, key=none_aware_order) == sorted(wanted, key=none_aware_order)