Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/intelstream/services/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,20 @@ async def _fetch_source(self, source: Source) -> int:
logger.debug("Content item already exists", external_id=item.external_id)
continue

if is_first_poll:
if is_first_poll and new_count > 0:
most_recent = await self._repository.get_most_recent_item_for_source(source.id)
if most_recent:
backfilled = await self._repository.mark_items_as_backfilled(
source_id=source.id,
exclude_item_id=most_recent.id,
)
if backfilled > 0:
logger.info(
"First poll for source, limiting to most recent item",
"First poll: backfilled pre-existing items",
source_name=source.name,
backfilled_count=backfilled,
most_recent_title=most_recent.title,
)
break

await self._repository.update_source_last_polled(source.id)

Expand Down
66 changes: 64 additions & 2 deletions tests/test_services/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ async def test_fetch_all_sources_success(

mock_repository.get_all_sources.return_value = [sample_source]
mock_repository.content_item_exists.return_value = False
mock_repository.get_most_recent_item_for_source.return_value = MagicMock(
spec=ContentItem, id="item-1", title="Test"
)
mock_repository.mark_items_as_backfilled.return_value = 0

with patch.object(
pipeline._adapters[SourceType.SUBSTACK],
Expand Down Expand Up @@ -253,17 +257,21 @@ async def test_fetch_all_sources_no_adapter_for_type(

await pipeline.close()

async def test_first_poll_limits_to_one_item(
async def test_first_poll_stores_all_items_and_backfills(
self,
pipeline: ContentPipeline,
mock_repository: AsyncMock,
sample_source,
):
"""On first poll (last_polled_at is None), only store the most recent item."""
"""On first poll, store all items but backfill all except the most recent."""
await pipeline.initialize()

sample_source.last_polled_at = None

most_recent = MagicMock(spec=ContentItem)
most_recent.id = "most-recent-id"
most_recent.title = "Article 4"

items = [
ContentData(
external_id=f"article-{i}",
Expand All @@ -279,6 +287,8 @@ async def test_first_poll_limits_to_one_item(

mock_repository.get_all_sources.return_value = [sample_source]
mock_repository.content_item_exists.return_value = False
mock_repository.get_most_recent_item_for_source.return_value = most_recent
mock_repository.mark_items_as_backfilled.return_value = 4

with patch.object(
pipeline._adapters[SourceType.SUBSTACK],
Expand All @@ -288,8 +298,48 @@ async def test_first_poll_limits_to_one_item(
):
result = await pipeline.fetch_all_sources()

assert result == 5
assert mock_repository.add_content_item.call_count == 5
mock_repository.get_most_recent_item_for_source.assert_called_once_with(sample_source.id)
mock_repository.mark_items_as_backfilled.assert_called_once_with(
source_id=sample_source.id,
exclude_item_id=most_recent.id,
)

await pipeline.close()

async def test_first_poll_no_backfill_when_single_item(
self,
pipeline: ContentPipeline,
mock_repository: AsyncMock,
sample_source,
sample_content_data,
):
"""On first poll with a single item, backfill marks 0 items."""
await pipeline.initialize()

sample_source.last_polled_at = None

most_recent = MagicMock(spec=ContentItem)
most_recent.id = "item-id"
most_recent.title = "Test Article"

mock_repository.get_all_sources.return_value = [sample_source]
mock_repository.content_item_exists.return_value = False
mock_repository.get_most_recent_item_for_source.return_value = most_recent
mock_repository.mark_items_as_backfilled.return_value = 0

with patch.object(
pipeline._adapters[SourceType.SUBSTACK],
"fetch_latest",
new_callable=AsyncMock,
return_value=[sample_content_data],
):
result = await pipeline.fetch_all_sources()

assert result == 1
assert mock_repository.add_content_item.call_count == 1
mock_repository.mark_items_as_backfilled.assert_called_once()

await pipeline.close()

Expand Down Expand Up @@ -518,6 +568,10 @@ async def test_fetch_success_resets_failure_count(

mock_repository.get_all_sources.return_value = [sample_source]
mock_repository.content_item_exists.return_value = False
mock_repository.get_most_recent_item_for_source.return_value = MagicMock(
spec=ContentItem, id="item-1", title="Test"
)
mock_repository.mark_items_as_backfilled.return_value = 0

with patch.object(
pipeline._adapters[SourceType.SUBSTACK],
Expand Down Expand Up @@ -545,6 +599,10 @@ async def test_fetch_passes_skip_content_to_adapter(

mock_repository.get_all_sources.return_value = [sample_source]
mock_repository.content_item_exists.return_value = False
mock_repository.get_most_recent_item_for_source.return_value = MagicMock(
spec=ContentItem, id="item-1", title="Test"
)
mock_repository.mark_items_as_backfilled.return_value = 0

with patch.object(
pipeline._adapters[SourceType.SUBSTACK],
Expand Down Expand Up @@ -627,6 +685,10 @@ async def test_never_skips_first_poll(
sample_source.last_polled_at = None
mock_repository.get_all_sources.return_value = [sample_source]
mock_repository.content_item_exists.return_value = False
mock_repository.get_most_recent_item_for_source.return_value = MagicMock(
spec=ContentItem, id="item-1", title="Test"
)
mock_repository.mark_items_as_backfilled.return_value = 0

with patch.object(
pipeline._adapters[SourceType.SUBSTACK],
Expand Down
Loading