Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ def _build_localhost_factory(cfg):
feature_flag_sync_task,
segment_sync_task,
None, None, None,
internal_events_task
internal_events_task=internal_events_task
)

sdk_metadata = util.get_metadata(cfg)
Expand Down Expand Up @@ -1262,7 +1262,7 @@ async def _build_localhost_factory_async(cfg):
feature_flag_sync_task,
segment_sync_task,
None, None, None,
internal_events_task
internal_events_task=internal_events_task
)

sdk_metadata = util.get_metadata(cfg)
Expand Down
12 changes: 8 additions & 4 deletions splitio/sync/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,12 +955,13 @@ def sync_all(self, till=None):

def stop_periodic_fetching(self):
"""Stop fetchers for feature flags and segments."""
_LOGGER.debug('Stopping periodic fetching')
if self._split_tasks.split_task is not None:
_LOGGER.debug('Stopping periodic fetching')
self._split_tasks.split_task.stop()
if self._split_tasks.segment_task is not None:
self._split_tasks.segment_task.stop()
if self._split_tasks.internal_events_task:
_LOGGER.debug('Stopping internal events notification')
self._split_tasks.internal_events_task.stop()

def synchronize_splits(self):
Expand Down Expand Up @@ -1031,12 +1032,15 @@ async def sync_all(self, till=None):

async def stop_periodic_fetching(self):
"""Stop fetchers for feature flags and segments."""
_LOGGER.debug('Stopping periodic fetching')
if self._split_tasks.split_task is not None:
_LOGGER.debug('Stopping periodic fetching')
await self._split_tasks.split_task.stop()
if self._split_tasks.segment_task is not None:
await self._split_tasks.segment_task.stop()

await self._split_tasks.segment_task.stop()
if self._split_tasks.internal_events_task is not None:
_LOGGER.debug('Stopping internal events notification')
await self._split_tasks.internal_events_task.stop()

async def synchronize_splits(self):
"""Synchronize all feature flags."""
try:
Expand Down
15 changes: 15 additions & 0 deletions tests/integration/test_streaming_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,9 @@ def test_change_number(mocker):
class StreamingIntegrationAsyncTests(object):
"""Test streaming operation and failover."""

update_flag = False
metadata = []

@pytest.mark.asyncio
async def test_happiness(self):
"""Test initialization & splits/segment updates."""
Expand Down Expand Up @@ -1421,6 +1424,7 @@ async def test_happiness(self):

factory = await get_factory_async('some_apikey', **kwargs)
await factory.block_until_ready(1)
await factory.client().on(SdkEvent.SDK_UPDATE, self._update_callcack)
assert factory.ready
assert await factory.client().get_treatment('maldo', 'split1') == 'on'

Expand All @@ -1437,6 +1441,13 @@ async def test_happiness(self):
'rbs': {'t': -1, 's': -1, 'd': []}}
sse_server.publish(make_split_change_event(2))
await asyncio.sleep(1)
flag = False
for meta in self.metadata:
if 'split1' in meta.get_names():
assert meta.get_type() == SdkEventType.FLAG_UPDATE
flag = True
assert flag

assert await factory.client().get_treatment('maldo', 'split1') == 'off'

split_changes[2] = {'ff': {
Expand Down Expand Up @@ -1556,6 +1567,10 @@ async def test_happiness(self):
sse_server.stop()
split_backend.stop()

async def _update_callcack(self, metadata):
self.update_flag = True
self.metadata.append(metadata)

@pytest.mark.asyncio
async def test_occupancy_flicker(self):
"""Test that changes in occupancy switch between polling & streaming properly."""
Expand Down
1 change: 0 additions & 1 deletion tests/sync/test_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def intersect(sets):
mocker.Mock(), mocker.Mock())

synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
# pytest.set_trace()
self.clear = False
def clear():
self.clear = True
Expand Down