diff --git a/src/intelstream/services/pipeline.py b/src/intelstream/services/pipeline.py index 32abfee..4180208 100644 --- a/src/intelstream/services/pipeline.py +++ b/src/intelstream/services/pipeline.py @@ -242,12 +242,20 @@ async def _fetch_source(self, source: Source) -> int: logger.debug("Content item already exists", external_id=item.external_id) continue - if is_first_poll: + if is_first_poll and new_count > 0: + most_recent = await self._repository.get_most_recent_item_for_source(source.id) + if most_recent: + backfilled = await self._repository.mark_items_as_backfilled( + source_id=source.id, + exclude_item_id=most_recent.id, + ) + if backfilled > 0: logger.info( - "First poll for source, limiting to most recent item", + "First poll: backfilled pre-existing items", source_name=source.name, + backfilled_count=backfilled, + most_recent_title=most_recent.title, ) - break await self._repository.update_source_last_polled(source.id) diff --git a/tests/test_services/test_pipeline.py b/tests/test_services/test_pipeline.py index 4e90f32..c4dee40 100644 --- a/tests/test_services/test_pipeline.py +++ b/tests/test_services/test_pipeline.py @@ -176,6 +176,10 @@ async def test_fetch_all_sources_success( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -253,17 +257,21 @@ async def test_fetch_all_sources_no_adapter_for_type( await pipeline.close() - async def test_first_poll_limits_to_one_item( + async def test_first_poll_stores_all_items_and_backfills( self, pipeline: ContentPipeline, mock_repository: AsyncMock, sample_source, ): - """On first poll (last_polled_at is None), only store the most recent item.""" + """On first poll, store all items but backfill all except the most recent.""" await pipeline.initialize() sample_source.last_polled_at = None + most_recent = MagicMock(spec=ContentItem) + most_recent.id = "most-recent-id" + most_recent.title = "Article 4" + items = [ ContentData( external_id=f"article-{i}", @@ -279,6 +287,8 @@ async def test_first_poll_limits_to_one_item( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = most_recent + mock_repository.mark_items_as_backfilled.return_value = 4 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -288,8 +298,48 @@ async def test_first_poll_limits_to_one_item( ): result = await pipeline.fetch_all_sources() + assert result == 5 + assert mock_repository.add_content_item.call_count == 5 + mock_repository.get_most_recent_item_for_source.assert_called_once_with(sample_source.id) + mock_repository.mark_items_as_backfilled.assert_called_once_with( + source_id=sample_source.id, + exclude_item_id=most_recent.id, + ) + + await pipeline.close() + + async def test_first_poll_no_backfill_when_single_item( + self, + pipeline: ContentPipeline, + mock_repository: AsyncMock, + sample_source, + sample_content_data, + ): + """On first poll with a single item, backfill marks 0 items.""" + await pipeline.initialize() + + sample_source.last_polled_at = None + + most_recent = MagicMock(spec=ContentItem) + most_recent.id = "item-id" + most_recent.title = "Test Article" + + mock_repository.get_all_sources.return_value = [sample_source] + mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = most_recent + mock_repository.mark_items_as_backfilled.return_value = 0 + + with patch.object( + pipeline._adapters[SourceType.SUBSTACK], + "fetch_latest", + new_callable=AsyncMock, + return_value=[sample_content_data], + ): + result = await pipeline.fetch_all_sources() + assert result == 1 assert mock_repository.add_content_item.call_count == 1 + mock_repository.mark_items_as_backfilled.assert_called_once() await pipeline.close() @@ -518,6 +568,10 @@ async def test_fetch_success_resets_failure_count( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -545,6 +599,10 @@ async def test_fetch_passes_skip_content_to_adapter( mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK], @@ -627,6 +685,10 @@ async def test_never_skips_first_poll( sample_source.last_polled_at = None mock_repository.get_all_sources.return_value = [sample_source] mock_repository.content_item_exists.return_value = False + mock_repository.get_most_recent_item_for_source.return_value = MagicMock( + spec=ContentItem, id="item-1", title="Test" + ) + mock_repository.mark_items_as_backfilled.return_value = 0 with patch.object( pipeline._adapters[SourceType.SUBSTACK],