From 4327a301e2fc46cddf9b7440394405c801b3e10e Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 21 Jan 2026 19:32:33 -0800 Subject: [PATCH 1/2] updated localhost classes and tests --- splitio/client/factory.py | 22 ++++++++++++++-------- splitio/sync/synchronizer.py | 12 ++++++++---- tests/integration/test_streaming_e2e.py | 15 +++++++++++++++ tests/sync/test_synchronizer.py | 1 - 4 files changed, 37 insertions(+), 13 deletions(-) diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 670cf6c3..6157d0bd 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -1145,11 +1145,11 @@ def _build_localhost_factory(cfg): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() - events_queue = queue.Queue() + internal_events_queue = queue.Queue() storages = { - 'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), - 'segments': InMemorySegmentStorage(events_queue), # not used, just to avoid possible future errors. - 'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue), + 'splits': InMemorySplitStorage(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), + 'segments': InMemorySegmentStorage(internal_events_queue), # not used, just to avoid possible future errors. + 'rule_based_segments': InMemoryRuleBasedSegmentStorage(internal_events_queue), 'impressions': LocalhostImpressionsStorage(), 'events': LocalhostEventsStorage(), } @@ -1162,6 +1162,8 @@ def _build_localhost_factory(cfg): LocalSegmentSynchronizer(cfg['segmentDirectory'], storages['splits'], storages['segments']), None, None, None, ) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTask(events_manager.notify_internal_event, internal_events_queue) feature_flag_sync_task = None segment_sync_task = None @@ -1178,6 +1180,7 @@ def _build_localhost_factory(cfg): feature_flag_sync_task, segment_sync_task, None, None, None, + internal_events_task=internal_events_task ) sdk_metadata = util.get_metadata(cfg) @@ -1199,8 +1202,7 @@ def _build_localhost_factory(cfg): telemetry_evaluation_producer, telemetry_runtime_producer ) - internal_events_queue = queue.Queue() - events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task.start() return SplitFactory( 'localhost', @@ -1226,6 +1228,8 @@ async def _build_localhost_factory_async(cfg): internal_events_queue = asyncio.Queue() events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, internal_events_queue) + storages = { 'splits': InMemorySplitStorageAsync(internal_events_queue), 'segments': InMemorySegmentStorageAsync(internal_events_queue), # not used, just to avoid possible future errors. @@ -1258,6 +1262,7 @@ async def _build_localhost_factory_async(cfg): feature_flag_sync_task, segment_sync_task, None, None, None, + internal_events_task=internal_events_task ) sdk_metadata = util.get_metadata(cfg) @@ -1277,8 +1282,9 @@ async def _build_localhost_factory_async(cfg): storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer - ) - + ) + internal_events_task.start() + return SplitFactoryAsync( 'localhost', storages, diff --git a/splitio/sync/synchronizer.py b/splitio/sync/synchronizer.py index 6bbb7fa6..a6ca6214 100644 --- a/splitio/sync/synchronizer.py +++ b/splitio/sync/synchronizer.py @@ -955,12 +955,13 @@ def sync_all(self, till=None): def stop_periodic_fetching(self): """Stop fetchers for feature flags and segments.""" + _LOGGER.debug('Stopping periodic fetching') if self._split_tasks.split_task is not None: - _LOGGER.debug('Stopping periodic fetching') self._split_tasks.split_task.stop() if self._split_tasks.segment_task is not None: self._split_tasks.segment_task.stop() if self._split_tasks.internal_events_task: + _LOGGER.debug('Stopping internal events notification') self._split_tasks.internal_events_task.stop() def synchronize_splits(self): @@ -1031,12 +1032,15 @@ async def sync_all(self, till=None): async def stop_periodic_fetching(self): """Stop fetchers for feature flags and segments.""" + _LOGGER.debug('Stopping periodic fetching') if self._split_tasks.split_task is not None: - _LOGGER.debug('Stopping periodic fetching') await self._split_tasks.split_task.stop() if self._split_tasks.segment_task is not None: - await self._split_tasks.segment_task.stop() - + await self._split_tasks.segment_task.stop() + if self._split_tasks.internal_events_task is not None: + _LOGGER.debug('Stopping internal events notification') + await self._split_tasks.internal_events_task.stop() + async def synchronize_splits(self): """Synchronize all feature flags.""" try: diff --git a/tests/integration/test_streaming_e2e.py b/tests/integration/test_streaming_e2e.py index a673c65c..48dc2093 100644 --- a/tests/integration/test_streaming_e2e.py +++ b/tests/integration/test_streaming_e2e.py @@ -1367,6 +1367,9 @@ def test_change_number(mocker): class StreamingIntegrationAsyncTests(object): """Test streaming operation and failover.""" + update_flag = False + metadata = [] + @pytest.mark.asyncio async def test_happiness(self): """Test initialization & splits/segment updates.""" @@ -1421,6 +1424,7 @@ async def test_happiness(self): factory = await get_factory_async('some_apikey', **kwargs) await factory.block_until_ready(1) + await factory.client().on(SdkEvent.SDK_UPDATE, self._update_callcack) assert factory.ready assert await factory.client().get_treatment('maldo', 'split1') == 'on' @@ -1437,6 +1441,13 @@ async def test_happiness(self): 'rbs': {'t': -1, 's': -1, 'd': []}} sse_server.publish(make_split_change_event(2)) await asyncio.sleep(1) + flag = False + for meta in self.metadata: + if 'split1' in meta.get_names(): + assert meta.get_type() == SdkEventType.FLAG_UPDATE + flag = True + assert flag + assert await factory.client().get_treatment('maldo', 'split1') == 'off' split_changes[2] = {'ff': { @@ -1556,6 +1567,10 @@ async def test_happiness(self): sse_server.stop() split_backend.stop() + async def _update_callcack(self, metadata): + self.update_flag = True + self.metadata.append(metadata) + @pytest.mark.asyncio async def test_occupancy_flicker(self): """Test that changes in occupancy switch between polling & streaming properly.""" diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index 179d7978..1244429b 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -210,7 +210,6 @@ def intersect(sets): mocker.Mock(), mocker.Mock()) synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks)) -# pytest.set_trace() self.clear = False def clear(): self.clear = True From 11d56fddf3333599a2a00bd8655f3c47943c7d9c Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Thu, 22 Jan 2026 10:10:19 -0800 Subject: [PATCH 2/2] fixed typo for segment update type --- splitio/events/events_metadata.py | 2 +- splitio/storage/inmemmory.py | 12 ++++++------ tests/integration/test_streaming_e2e.py | 2 +- tests/storage/test_inmemory_storage.py | 16 ++++++++-------- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/splitio/events/events_metadata.py b/splitio/events/events_metadata.py index 5d6f4961..0707a8f5 100644 --- a/splitio/events/events_metadata.py +++ b/splitio/events/events_metadata.py @@ -5,7 +5,7 @@ class SdkEventType(Enum): """Public event types""" FLAG_UPDATE = 'FLAG_UPDATE' - SEGMENT_UPDATE = 'SEGMENT_UPDATE' + SEGMENTS_UPDATE = 'SEGMENTS_UPDATE' class EventsMetadata(object): """Events Metadata class.""" diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index bbde8816..db71f7fd 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -158,7 +158,7 @@ def update(self, to_add, to_delete, new_change_number): self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.RB_SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) def _put(self, rule_based_segment): """ @@ -290,7 +290,7 @@ async def update(self, to_add, to_delete, new_change_number): await self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.RB_SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) async def _put(self, rule_based_segment): """ @@ -999,7 +999,7 @@ def put(self, segment): self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) def update(self, segment_name, to_add, to_remove, change_number=None): """ @@ -1025,7 +1025,7 @@ def update(self, segment_name, to_add, to_remove, change_number=None): self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) def get_change_number(self, segment_name): """ @@ -1140,7 +1140,7 @@ async def put(self, segment): await self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) async def update(self, segment_name, to_add, to_remove, change_number=None): @@ -1166,7 +1166,7 @@ async def update(self, segment_name, to_add, to_remove, change_number=None): await self._internal_event_queue.put( SdkInternalEventNotification( SdkInternalEvent.SEGMENTS_UPDATED, - EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + EventsMetadata(SdkEventType.SEGMENTS_UPDATE, {}))) async def get_change_number(self, segment_name): diff --git a/tests/integration/test_streaming_e2e.py b/tests/integration/test_streaming_e2e.py index 48dc2093..d7b3103a 100644 --- a/tests/integration/test_streaming_e2e.py +++ b/tests/integration/test_streaming_e2e.py @@ -128,7 +128,7 @@ def test_happiness(self): sse_server.publish(make_segment_change_event('segment1', 1)) time.sleep(1) assert self.update_flag - assert self.metadata[len(self.metadata)-1].get_type() == SdkEventType.SEGMENT_UPDATE + assert self.metadata[len(self.metadata)-1].get_type() == SdkEventType.SEGMENTS_UPDATE flag = False for meta in self.metadata: if 'split2' in meta.get_names(): diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index 0f830239..d46980aa 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -811,13 +811,13 @@ def test_internal_event_notification(self): storage.put(segment) event = events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456) event = events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 class InMemorySegmentStorageAsyncTests(object): @@ -893,13 +893,13 @@ async def test_internal_event_notification(self): await storage.put(segment) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 await storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 class InMemoryImpressionsStorageTests(object): @@ -2006,12 +2006,12 @@ def test_internal_event_notification(self, mocker): rbs_storage.update([segment1, segment2], [], -1) event = events_queue.get() assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 rbs_storage.update([], ['some_segment'], -1) assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 class InMemoryRuleBasedSegmentStorageAsyncTests(object): @@ -2093,12 +2093,12 @@ async def test_internal_event_notification(self, mocker): await rbs_storage.update([segment1, segment2], [], -1) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0 await rbs_storage.update([], ['some_segment'], -1) event = await events_queue.get() assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED - assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert event.metadata.get_type() == SdkEventType.SEGMENTS_UPDATE assert len(event.metadata.get_names()) == 0