diff --git a/src/spellbind/actions.py b/src/spellbind/actions.py index 9ba7113..6a558fb 100644 --- a/src/spellbind/actions.py +++ b/src/spellbind/actions.py @@ -19,6 +19,9 @@ def is_permutation_only(self) -> bool: ... @abstractmethod def map(self, transformer: Callable[[_S_co], _T]) -> CollectionAction[_T]: ... + @abstractmethod + def filter(self, predicate: Callable[[_S_co], bool]) -> CollectionAction[_S_co] | None: ... + @override def __repr__(self) -> str: return f"{self.__class__.__name__}()" @@ -34,6 +37,10 @@ def is_permutation_only(self) -> bool: def map(self, transformer: Callable[[_S_co], _T]) -> ClearAction[_T]: return clear_action() + @override + def filter(self, predicate: Callable[[_S_co], bool]) -> ClearAction[_S_co]: + return self + class SingleValueAction(CollectionAction[_S_co], Generic[_S_co]): @property @@ -56,6 +63,13 @@ def map(self, transformer: Callable[[_S_co], _T]) -> DeltasAction[_T]: mapped = tuple(action.map(transformer) for action in self.delta_actions) return SimpleDeltasAction(mapped) + @override + def filter(self, predicate: Callable[[_S_co], bool]) -> DeltasAction[_S_co] | None: + filtered_actions = tuple(action for action in self.delta_actions if predicate(action.value)) + if not filtered_actions: + return None + return SimpleDeltasAction(filtered_actions) + class SimpleDeltasAction(DeltasAction[_S_co], Generic[_S_co]): def __init__(self, delta_actions: tuple[DeltaAction[_S_co], ...]): @@ -86,6 +100,10 @@ def delta_actions(self) -> tuple[DeltaAction[_S_co], ...]: @override def map(self, transformer: Callable[[_S_co], _T]) -> DeltaAction[_T]: ... + @abstractmethod + @override + def filter(self, predicate: Callable[[_S_co], bool]) -> DeltaAction[_S_co] | None: ... + class AddOneAction(DeltaAction[_S_co], Generic[_S_co], ABC): @property @@ -97,6 +115,12 @@ def is_add(self) -> bool: def map(self, transformer: Callable[[_S_co], _T]) -> AddOneAction[_T]: return SimpleAddOneAction(transformer(self.value)) + @override + def filter(self, predicate: Callable[[_S_co], bool]) -> AddOneAction[_S_co] | None: + if predicate(self.value): + return SimpleAddOneAction(self.value) + return None + @override def __repr__(self) -> str: return f"{self.__class__.__name__}(value={self.value})" @@ -111,6 +135,12 @@ def __init__(self, item: _S_co) -> None: def value(self) -> _S_co: return self._item + @override + def __eq__(self, other: object) -> bool: + if not isinstance(other, AddOneAction): + return NotImplemented + return bool(self.value == other.value) + class RemoveOneAction(DeltaAction[_S_co], Generic[_S_co], ABC): @property @@ -122,6 +152,12 @@ def is_add(self) -> bool: def map(self, transformer: Callable[[_S_co], _T]) -> RemoveOneAction[_T]: return SimpleRemoveOneAction(transformer(self.value)) + @override + def filter(self, predicate: Callable[[_S_co], bool]) -> RemoveOneAction[_S_co] | None: + if predicate(self.value): + return SimpleRemoveOneAction(self.value) + return None + @override def __repr__(self) -> str: return f"{self.__class__.__name__}(value={self.value})" @@ -136,6 +172,12 @@ def __init__(self, item: _S_co) -> None: def value(self) -> _S_co: return self._item + @override + def __eq__(self, other: object) -> bool: + if not isinstance(other, RemoveOneAction): + return NotImplemented + return bool(self.value == other.value) + class ElementsChangedAction(DeltasAction[_S_co], Generic[_S_co], ABC): @property @@ -195,6 +237,19 @@ def delta_actions(self) -> tuple[DeltaAction[_S_co], ...]: def map(self, transformer: Callable[[_S_co], _T]) -> OneElementChangedAction[_T]: return SimpleOneElementChangedAction(new_item=transformer(self.new_item), old_item=transformer(self.old_item)) + @override + def filter(self, predicate: Callable[[_S_co], bool]) -> DeltasAction[_S_co] | None: + old_matches = predicate(self.old_item) + new_matches = predicate(self.new_item) + if old_matches and new_matches: + return SimpleOneElementChangedAction(new_item=self.new_item, old_item=self.old_item) + elif old_matches: + return SimpleRemoveOneAction(self.old_item) + elif new_matches: + return SimpleAddOneAction(self.new_item) + else: + return None + class SimpleOneElementChangedAction(OneElementChangedAction[_S_co], Generic[_S_co]): def __init__(self, *, new_item: _S_co, old_item: _S_co): @@ -614,6 +669,10 @@ def is_permutation_only(self) -> bool: def map(self, transformer: Callable[[_S_co], _T]) -> ReverseAction[_T]: return reverse_action() + @override + def filter(self, predicate: Callable[[_S_co], bool]) -> ReverseAction[_S_co]: + return self + class ExtendAction(AtIndicesDeltasAction[_S_co], SequenceAction[_S_co], Generic[_S_co], ABC): @property diff --git a/src/spellbind/observable_collections.py b/src/spellbind/observable_collections.py index 6a15977..4b7d590 100644 --- a/src/spellbind/observable_collections.py +++ b/src/spellbind/observable_collections.py @@ -2,15 +2,13 @@ import functools from abc import ABC, abstractmethod -from typing import TypeVar, Generic, Collection, Callable, Iterable, Iterator, Any +from typing import TypeVar, Generic, Collection, Callable, Iterable, Iterator, Any, override -from typing_extensions import override - -from spellbind.actions import CollectionAction, DeltaAction, DeltasAction, ClearAction +from spellbind.actions import CollectionAction, DeltaAction, DeltasAction, ClearAction, ReverseAction, clear_action from spellbind.bool_values import BoolValue from spellbind.deriveds import Derived -from spellbind.event import BiEvent -from spellbind.int_values import IntValue +from spellbind.event import BiEvent, ValueEvent +from spellbind.int_values import IntValue, IntVariable from spellbind.observables import ValuesObservable, ValueObservable, Observer, ValueObserver, BiObserver, \ Subscription from spellbind.str_values import StrValue @@ -86,6 +84,9 @@ def reduce_to_int(self, remove_reducer=remove_reducer, initial=initial) + def filter_to_bag(self, predicate: Callable[[_S_co], bool]) -> FilteredObservableBag[_S_co]: + return FilteredObservableBag(self, predicate) + class ReducedValue(Value[_S], Generic[_S]): def __init__(self, @@ -213,3 +214,101 @@ def unobserve(self, observer: Observer | ValueObserver[_S] | BiObserver[_S, _S]) @override def is_observed(self, by: Callable[..., Any] | None = None) -> bool: return self._on_change.is_observed(by=by) + + +class FilteredObservableBag(ObservableCollection[_S], Generic[_S]): + def __init__(self, source: ObservableCollection[_S], predicate: Callable[[_S], bool]) -> None: + self._source = source + self._predicate = predicate + self._item_counts: dict[_S, int] = {} + + total_count = 0 + for item in source: + if predicate(item): + self._item_counts[item] = self._item_counts.get(item, 0) + 1 + total_count += 1 + + self._on_change = ValueEvent[CollectionAction[_S]]() + self._delta_event = ValueEvent[DeltasAction[_S]]() + self._delta_observable = self._delta_event.map_to_values_observable( + transformer=lambda deltas_action: deltas_action.delta_actions + ) + self._len_value = IntVariable(total_count) + + source.on_change.observe(self._on_source_action) + + def _on_source_action(self, action: CollectionAction[_S]) -> None: + if isinstance(action, ClearAction): + self._clear() + elif isinstance(action, ReverseAction): + pass + elif isinstance(action, DeltasAction): + filtered_action = action.filter(self._predicate) + if filtered_action is not None: + total_count = self._len_value.value + for delta in filtered_action.delta_actions: + if delta.is_add: + self._item_counts[delta.value] = self._item_counts.get(delta.value, 0) + 1 + total_count += 1 + else: + count = self._item_counts.get(delta.value, 0) + if count > 0: + if count == 1: + del self._item_counts[delta.value] + else: + self._item_counts[delta.value] = count - 1 + total_count -= 1 + + if self._is_observed(): + with self._len_value.set_delay_notify(total_count): + self._on_change(filtered_action) + self._delta_event(filtered_action) + else: + self._len_value.value = total_count + + def _clear(self) -> None: + if self._len_value.value == 0: + return + self._item_counts.clear() + if self._is_observed(): + with self._len_value.set_delay_notify(0): + action: ClearAction[_S] = clear_action() + self._on_change(action) + else: + self._len_value.value = 0 + + def _is_observed(self) -> bool: + return self._on_change.is_observed() or self._delta_event.is_observed() + + @property + @override + def on_change(self) -> ValueObservable[CollectionAction[_S]]: + return self._on_change + + @property + @override + def delta_observable(self) -> ValuesObservable[DeltaAction[_S]]: + return self._delta_observable + + @property + @override + def length_value(self) -> IntValue: + return self._len_value + + @override + def __len__(self) -> int: + return self._len_value.value + + @override + def __contains__(self, item: object) -> bool: + return item in self._item_counts + + @override + def __iter__(self) -> Iterator[_S]: + for item, count in self._item_counts.items(): + for _ in range(count): + yield item + + @override + def __repr__(self) -> str: + return f"{self.__class__.__name__}({list(self)!r})" diff --git a/src/spellbind/observable_sequences.py b/src/spellbind/observable_sequences.py index 9c04deb..f594911 100644 --- a/src/spellbind/observable_sequences.py +++ b/src/spellbind/observable_sequences.py @@ -860,7 +860,7 @@ def on_action(other_action: AtIndicesDeltasAction[_T] | ClearAction[_T] | Revers else: for delta in other_action.delta_actions: if delta.is_add: - value = self._map_func(delta.value) + value: _S = self._map_func(delta.value) self._values.insert(delta.index, value) else: del self._values[delta.index] diff --git a/tests/conftest.py b/tests/conftest.py index 25e3529..c15c681 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -241,6 +241,41 @@ def assert_single_action(self, action: CollectionAction): self.on_change_observer.assert_called_once_with(action) +class ValueCollectionObservers(Observers): + def __init__(self, collection: ObservableCollection): + self.on_change_observer = OneParameterObserver() + self.delta_observer = OneParameterObserver() + collection.on_change.observe(self.on_change_observer) + collection.delta_observable.observe_single(self.delta_observer) + super().__init__(self.on_change_observer, self.delta_observer) + + def assert_added_calls(self, *expected_adds: Any): + self.assert_calls(*(append_bool(add, True) for add in expected_adds)) + + def assert_removed_calls(self, *expected_removes: Any): + self.assert_calls(*(append_bool(remove, False) for remove in expected_removes)) + + def assert_calls(self, *expected_calls: tuple[Any, bool]): + delta_calls = self.delta_observer.calls + if not len(delta_calls) == len(expected_calls): + pytest.fail(f"Expected {len(expected_calls)} calls, got {len(delta_calls)}") + for i, (call, expected_call) in enumerate(zip(delta_calls, expected_calls)): + action = call.get_arg() + assert isinstance(action, DeltaAction) + expected_value, expected_added = expected_call + if not action.is_add == expected_added: + pytest.fail(f"Error call {i}. Expected {'add' if expected_added else 'remove'}, got {'add' if action.is_add else 'remove'}") + if not action.value == expected_value: + pytest.fail(f"Error call {i}. Expected value {expected_value}, got {action.value}") + + def assert_actions(self, *actions: CollectionAction): + assert self.on_change_observer.calls == [*actions] + + def assert_single_action(self, action: CollectionAction): + assert len(self.on_change_observer.calls) == 1 + assert self.on_change_observer.calls[0] == action + + @contextmanager def assert_length_changed_during_action_events_but_notifies_after(collection: ObservableCollection, expected_length: int): events = [] diff --git a/tests/test_collections/test_filtered_observable_bag.py b/tests/test_collections/test_filtered_observable_bag.py new file mode 100644 index 0000000..07998a0 --- /dev/null +++ b/tests/test_collections/test_filtered_observable_bag.py @@ -0,0 +1,549 @@ +from conftest import ValueCollectionObservers, OneParameterObserver +from spellbind.actions import clear_action, SimpleRemoveOneAction, SimpleAddOneAction, SimpleOneElementChangedAction +from spellbind.observable_sequences import ObservableList +from spellbind.observable_collections import FilteredObservableBag + + +def test_initialize_empty(): + source = ObservableList() + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + assert len(filtered) == 0 + assert list(filtered) == [] + + +def test_initialize_with_matching_items(): + source = ObservableList([1, 2, 3, 4, 5]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + + +def test_initialize_with_no_matching_items(): + source = ObservableList([1, 3, 5, 7]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + assert len(filtered) == 0 + assert list(filtered) == [] + + +def test_initialize_with_duplicates(): + source = ObservableList([2, 2, 4, 2]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + assert len(filtered) == 4 + assert sorted(filtered) == [2, 2, 2, 4] + + +def test_contains_matching_item(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + assert 2 in filtered + assert 4 in filtered + + +def test_does_not_contain_non_matching_item(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + assert 1 not in filtered + assert 3 not in filtered + + +def test_append_matching_item(): + source = ObservableList([1, 2, 3]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.append(4) + + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_added_calls(4) + observers.assert_single_action(SimpleAddOneAction(4)) + + +def test_append_non_matching_item(): + source = ObservableList([2, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.append(5) + + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_not_called() + + +def test_append_duplicate_matching_item(): + source = ObservableList([2, 4, 6]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.append(4) + + assert len(filtered) == 4 + assert sorted(filtered) == [2, 4, 4, 6] + observers.assert_added_calls(4) + observers.assert_single_action(SimpleAddOneAction(4)) + + +def test_remove_matching_item(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.remove(2) + + assert len(filtered) == 1 + assert list(filtered) == [4] + observers.assert_removed_calls(2) + observers.assert_single_action(SimpleRemoveOneAction(2)) + + +def test_remove_non_matching_item(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.remove(1) + + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_not_called() + + +def test_remove_duplicate_matching_item(): + source = ObservableList([2, 2, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + assert len(filtered) == 3 + + source.remove(2) + assert len(filtered) == 2 + assert 2 in filtered + + source.remove(2) + assert len(filtered) == 1 + assert 2 not in filtered + + observers.assert_removed_calls(2, 2) + observers.assert_actions(SimpleRemoveOneAction(2), SimpleRemoveOneAction(2)) + + +def test_clear_source(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.clear() + + assert len(filtered) == 0 + assert list(filtered) == [] + observers.assert_single_action(clear_action()) + + +def test_clear_empty_source(): + source = ObservableList([1, 3, 5]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.clear() + + assert len(filtered) == 0 + observers.assert_not_called() + + +def test_clear_filtered_empty(): + source = ObservableList() + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.clear() + + assert len(filtered) == 0 + observers.assert_not_called() + + +def test_length_value_updates(): + source = ObservableList([1, 2, 3]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + + length_observer = OneParameterObserver() + filtered.length_value.observe(length_observer) + + assert filtered.length_value.value == 1 + + source.append(4) + assert filtered.length_value.value == 2 + + source.append(5) + assert filtered.length_value.value == 2 + + source.remove(2) + assert filtered.length_value.value == 1 + + assert length_observer.calls == [2, 1] + + +def test_multiple_operations(): + source = ObservableList([1, 2, 3, 4, 5]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + assert sorted(filtered) == [2, 4] + + source.append(6) + assert sorted(filtered) == [2, 4, 6] + + source.append(7) + assert sorted(filtered) == [2, 4, 6] + + source.remove(2) + assert sorted(filtered) == [4, 6] + + source.clear() + assert list(filtered) == [] + + observers.assert_calls((6, True), (2, False)) + observers.assert_actions( + SimpleAddOneAction(6), + SimpleRemoveOneAction(2), + clear_action() + ) + + +def test_is_unobserved_initially(): + source = ObservableList([1, 2, 3]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + + assert not filtered.on_change.is_observed() + assert not filtered.delta_observable.is_observed() + + +def test_multiple_adds_and_removes_with_duplicates(): + source = ObservableList([2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + assert sorted(filtered) == [2, 4] + assert len(filtered) == 2 + + source.append(2) + assert len(filtered) == 3 + assert sorted(filtered) == [2, 2, 4] + + source.append(6) + assert len(filtered) == 4 + assert sorted(filtered) == [2, 2, 4, 6] + + source.remove(2) + assert len(filtered) == 3 + assert 2 in filtered + + source.remove(2) + assert len(filtered) == 2 + assert 2 not in filtered + assert sorted(filtered) == [4, 6] + + observers.assert_calls((2, True), (6, True), (2, False), (2, False)) + observers.assert_actions( + SimpleAddOneAction(2), + SimpleAddOneAction(6), + SimpleRemoveOneAction(2), + SimpleRemoveOneAction(2) + ) + + +def test_setitem_matching_to_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source[1] = 6 + + assert len(filtered) == 2 + assert sorted(filtered) == [4, 6] + observers.assert_calls((2, False), (6, True)) + observers.assert_single_action(SimpleOneElementChangedAction(old_item=2, new_item=6)) + + +def test_setitem_matching_to_non_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source[1] = 5 + + assert len(filtered) == 1 + assert sorted(filtered) == [4] + observers.assert_removed_calls(2) + observers.assert_actions(SimpleRemoveOneAction(2)) + + +def test_setitem_non_matching_to_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source[0] = 6 + + assert len(filtered) == 3 + assert sorted(filtered) == [2, 4, 6] + observers.assert_added_calls(6) + observers.assert_actions(SimpleAddOneAction(6)) + + +def test_setitem_non_matching_to_non_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source[0] = 5 + + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_not_called() + + +def test_setitem_to_duplicate(): + source = ObservableList([1, 2, 3, 4, 6]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source[1] = 4 + + assert len(filtered) == 3 + assert sorted(filtered) == [4, 4, 6] + observers.assert_calls((2, False), (4, True)) + observers.assert_single_action(SimpleOneElementChangedAction(old_item=2, new_item=4)) + + +def test_extend_with_mixed_items(): + source = ObservableList([2, 3]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.extend([4, 5, 6, 7]) + + assert len(filtered) == 3 + assert sorted(filtered) == [2, 4, 6] + observers.assert_added_calls(4, 6) + assert len(observers.on_change_observer.calls) == 1 + + +def test_extend_with_only_non_matching(): + source = ObservableList([2, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.extend([1, 3, 5]) + + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_not_called() + + +def test_extend_with_duplicates(): + source = ObservableList([2, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.extend([2, 6, 2, 8]) + + assert len(filtered) == 6 + assert sorted(filtered) == [2, 2, 2, 4, 6, 8] + observers.assert_added_calls(2, 6, 2, 8) + assert len(observers.on_change_observer.calls) == 1 + + +def test_insert_matching_item(): + source = ObservableList([1, 3, 5]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.insert(1, 4) + + assert len(filtered) == 1 + assert list(filtered) == [4] + observers.assert_added_calls(4) + observers.assert_single_action(SimpleAddOneAction(4)) + + +def test_insert_non_matching_item(): + source = ObservableList([2, 4, 6]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.insert(1, 3) + + assert len(filtered) == 3 + assert sorted(filtered) == [2, 4, 6] + observers.assert_not_called() + + +def test_insert_duplicate_matching_item(): + source = ObservableList([2, 4, 6]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.insert(1, 2) + + assert len(filtered) == 4 + assert sorted(filtered) == [2, 2, 4, 6] + observers.assert_added_calls(2) + observers.assert_single_action(SimpleAddOneAction(2)) + + +def test_del_by_index_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + del source[1] + + assert len(filtered) == 1 + assert list(filtered) == [4] + observers.assert_removed_calls(2) + observers.assert_single_action(SimpleRemoveOneAction(2)) + + +def test_del_by_index_non_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + del source[0] + + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_not_called() + + +def test_del_by_index_with_duplicates(): + source = ObservableList([2, 4, 2, 6]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + del source[0] + + assert len(filtered) == 3 + assert sorted(filtered) == [2, 4, 6] + observers.assert_removed_calls(2) + observers.assert_single_action(SimpleRemoveOneAction(2)) + + +def test_del_by_slice(): + source = ObservableList([1, 2, 3, 4, 5, 6]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + del source[1:4] + + assert len(filtered) == 1 + assert list(filtered) == [6] + observers.assert_removed_calls(2, 4) + assert len(observers.on_change_observer.calls) == 1 + + +def test_del_by_slice_with_duplicates(): + source = ObservableList([2, 4, 2, 6, 2, 8]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + del source[1:4] + + assert len(filtered) == 3 + assert sorted(filtered) == [2, 2, 8] + observers.assert_removed_calls(4, 2, 6) + assert len(observers.on_change_observer.calls) == 1 + + +def test_pop_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + popped = source.pop(3) + + assert popped == 4 + assert len(filtered) == 1 + assert sorted(filtered) == [2] + observers.assert_removed_calls(4) + observers.assert_single_action(SimpleRemoveOneAction(4)) + + +def test_pop_non_matching(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + popped = source.pop(0) + + assert popped == 1 + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_not_called() + + +def test_pop_default(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + popped = source.pop() + + assert popped == 4 + assert len(filtered) == 1 + assert sorted(filtered) == [2] + observers.assert_removed_calls(4) + observers.assert_single_action(SimpleRemoveOneAction(4)) + + +def test_pop_with_duplicates(): + source = ObservableList([2, 4, 2, 6]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + popped = source.pop(2) + + assert popped == 2 + assert len(filtered) == 3 + assert sorted(filtered) == [2, 4, 6] + observers.assert_removed_calls(2) + observers.assert_single_action(SimpleRemoveOneAction(2)) + + +def test_slice_assignment_mixed(): + source = ObservableList([1, 2, 3, 4, 5]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source[1:4] = [6, 7, 8] + + assert len(filtered) == 2 + assert sorted(filtered) == [6, 8] + observers.assert_calls((2, False), (6, True), (4, False), (8, True)) + assert len(observers.on_change_observer.calls) == 1 + + +def test_slice_assignment_with_duplicates(): + source = ObservableList([1, 2, 3, 4, 5]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source[1:4] = [6, 6, 8] + + assert len(filtered) == 3 + assert sorted(filtered) == [6, 6, 8] + observers.assert_calls((2, False), (6, True), (6, True), (4, False), (8, True)) + assert len(observers.on_change_observer.calls) == 1 + + +def test_reverse(): + source = ObservableList([1, 2, 3, 4]) + filtered = FilteredObservableBag(source, lambda x: x % 2 == 0) + observers = ValueCollectionObservers(filtered) + + source.reverse() + + assert len(filtered) == 2 + assert sorted(filtered) == [2, 4] + observers.assert_not_called() diff --git a/tests/test_collections/test_observable_lists/test_observable_list_set_item.py b/tests/test_collections/test_observable_lists/test_observable_list_set_item.py index b292593..c5c9f94 100644 --- a/tests/test_collections/test_observable_lists/test_observable_list_set_item.py +++ b/tests/test_collections/test_observable_lists/test_observable_list_set_item.py @@ -2,8 +2,8 @@ from conftest import ValueSequenceObservers, values_factories from spellbind.actions import SimpleSliceSetAction, SimpleSetAtIndicesAction, SimpleSetAtIndexAction -from spellbind.observable_sequences import ObservableList from spellbind.int_collections import ObservableIntList, IntValueList +from spellbind.observable_sequences import ObservableList @pytest.mark.parametrize("constructor", [ObservableList, ObservableIntList, IntValueList]) @@ -31,6 +31,7 @@ def test_observable_list_set_item_notifies(constructor): assert observable_list == [1, 4, 3] assert observable_list.length_value.value == 3 observers.assert_calls((1, 2, False), (1, 4, True)) + observers.assert_single_action(SimpleSetAtIndexAction(index=1, old_item=2, new_item=4)) @pytest.mark.parametrize("constructor", [ObservableList, ObservableIntList, IntValueList])