From 1dabc8bf2c712ecf07fba78eabdb00435d541ded Mon Sep 17 00:00:00 2001 From: user1303836 Date: Sat, 7 Feb 2026 13:41:09 -0500 Subject: [PATCH] Fix initial content flood on new source subscription On first poll, store all fetched items in the database but immediately mark all except the most recent as backfilled. This prevents old items from being re-discovered and posted on subsequent poll cycles. Previously, first poll stored only 1 item and broke out of the loop. On the second poll, the remaining items were treated as new content and flooded the Discord channel. --- src/intelstream/services/pipeline.py | 14 ++++-- tests/test_services/test_pipeline.py | 66 +++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/src/intelstream/services/pipeline.py b/src/intelstream/services/pipeline.py index 32abfee..4180208 100644 --- a/src/intelstream/services/pipeline.py +++ b/src/intelstream/services/pipeline.py @@ -242,12 +242,20 @@ async def _fetch_source(self, source: Source) -> int: logger.debug("Content item already exists", external_id=item.external_id) continue - if is_first_poll: + if is_first_poll and new_count > 0: + most_recent = await self._repository.get_most_recent_item_for_source(source.id) + if most_recent: + backfilled = await self._repository.mark_items_as_backfilled( + source_id=source.id, + exclude_item_id=most_recent.id, + ) + if backfilled > 0: logger.info( - "First poll for source, limiting to most recent item", + "First poll: backfilled pre-existing items", source_name=source.name, + backfilled_count=backfilled, + most_recent_title=most_recent.title, ) - break await self._repository.update_source_last_polled(source.id) diff --git a/tests/test_services/test_pipeline.py b/tests/test_services/test_pipeline.py index 4e90f32..c4dee40 100644 --- a/tests/test_services/test_pipeline.py +++ b/tests/test_services/test_pipeline.py @@ -176,6 +176,10 @@ async def test_fetch_all_sources_success( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -253,17 +257,21 @@ async def test_fetch_all_sources_no_adapter_for_type( await pipeline.close() - async def test_first_poll_limits_to_one_item( + async def test_first_poll_stores_all_items_and_backfills( self, pipeline: ContentPipeline, mock_repository: AsyncMock, sample_source, ): - """On first poll (last_polled_at is None), only store the most recent item.""" + """On first poll, store all items but backfill all except the most recent.""" await pipeline.initialize() sample_source.last_polled_at = None + most_recent = MagicMock(spec=ContentItem) + most_recent.id = "most-recent-id" + most_recent.title = "Article 4" + items = [ ContentData( external_id=f"article-{i}", @@ -279,6 +287,8 @@ async def test_first_poll_limits_to_one_item( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = most_recent + mock_repository.mark_items_as_backfilled.return_value = 4 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -288,8 +298,48 @@ async def test_first_poll_limits_to_one_item( ): result = await pipeline.fetch_all_sources() + assert result == 5 + assert mock_repository.add_content_item.call_count == 5 + mock_repository.get_most_recent_item_for_source.assert_called_once_with(sample_source.id) + mock_repository.mark_items_as_backfilled.assert_called_once_with( + source_id=sample_source.id, + exclude_item_id=most_recent.id, + ) + + await pipeline.close() + + async def test_first_poll_no_backfill_when_single_item( + self, + pipeline: ContentPipeline, + mock_repository: AsyncMock, + sample_source, + sample_content_data, + ): + """On first poll with a single item, backfill marks 0 items.""" + await pipeline.initialize() + + sample_source.last_polled_at = None + + most_recent = MagicMock(spec=ContentItem) + most_recent.id = "item-id" + most_recent.title = "Test Article" + + mock_repository.get_all_sources.return_value = [sample_source] + mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = most_recent + mock_repository.mark_items_as_backfilled.return_value = 0 + + with patch.object( + pipeline._adapters[SourceType.SUBSTACK], + "fetch_latest", + new_callable=AsyncMock, + return_value=[sample_content_data], + ): + result = await pipeline.fetch_all_sources() + assert result == 1 assert mock_repository.add_content_item.call_count == 1 + mock_repository.mark_items_as_backfilled.assert_called_once() await pipeline.close() @@ -518,6 +568,10 @@ async def test_fetch_success_resets_failure_count( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -545,6 +599,10 @@ async def test_fetch_passes_skip_content_to_adapter( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -627,6 +685,10 @@ async def test_never_skips_first_poll( sample_source.last_polled_at = None mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK],