Conversation
Pull request overview
This PR implements Phase 2 Flow 1 of the agentic librarian project, establishing a complete ETL pipeline for ingesting and enriching reading history data. The implementation introduces a modular Dagster-based orchestration system with three core assets (raw_history, enriched_metadata, vectorized_tropes) that process CSV files, enrich metadata from multiple sources (Google Books, Hardcover, Audible), and standardize tropes using semantic similarity.
Changes:
- Implemented complete ETL pipeline with Dagster orchestration (sensors, jobs, assets)
- Added multi-source metadata enrichment with dual-pathway audiobook scouting and MLFlow benchmarking
- Implemented semantic trope deduplication using Gemini embeddings and pgvector
- Created comprehensive test suite with unit, integration, and efficacy tests
- Established database infrastructure with PostgreSQL, pgvector extension, and SQLAlchemy models
- Documented verification procedures and architectural decisions
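The semantic trope deduplication bullet above pairs Gemini embeddings with pgvector in the real pipeline. As a rough offline sketch of just the matching logic (stub embeddings passed in as a callable, a hypothetical similarity threshold, no database or Gemini calls), the core idea is: map each incoming trope to an existing canonical trope when its embedding is close enough, otherwise register it as a new canonical entry.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

def dedupe_tropes(tropes, embed, threshold=0.9):
    """Return a mapping from each raw trope name to a canonical name.

    `embed` is any callable returning an embedding vector for a string;
    `threshold` is an illustrative cutoff, not a value from the PR.
    """
    canonical = {}  # canonical name -> embedding
    mapping = {}
    for name in tropes:
        vec = embed(name)
        best_name, best_sim = None, threshold
        for cname, cvec in canonical.items():
            sim = cosine(vec, cvec)
            if sim >= best_sim:
                best_name, best_sim = cname, sim
        if best_name is None:
            canonical[name] = vec  # first of its kind becomes canonical
            mapping[name] = name
        else:
            mapping[name] = best_name  # near-duplicate collapses to canonical
    return mapping
```

In the actual assets the nearest-neighbor search would be a pgvector query rather than this in-memory loop; the sketch only illustrates the threshold-based collapse.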
Reviewed changes
Copilot reviewed 42 out of 46 changed files in this pull request and generated 22 comments.
| File | Description |
|---|---|
| src/agentic_librarian/scouts/trope_manager.py | New: Manages trope standardization and semantic deduplication using Gemini embeddings |
| src/agentic_librarian/scouts/metadata_scout.py | Enhanced: Added MultiSourceScout, DirectKnowledgeScout, and dual-pathway audiobook scouting |
| src/agentic_librarian/orchestration/assets.py | New: Dagster assets for raw history loading, metadata enrichment, and trope vectorization |
| src/agentic_librarian/orchestration/sensors.py | New: File sensor to monitor data/raw for new CSV files and trigger enhancement job |
| src/agentic_librarian/orchestration/jobs.py | Updated: Job definition selecting all three pipeline assets |
| src/agentic_librarian/orchestration/definitions.py | New: Dagster definitions with assets, jobs, sensors, and database manager resource |
| src/agentic_librarian/etl/ingest.py | New: HistoryIngestor class for CSV cleaning and model mapping |
| src/agentic_librarian/etl/cleaning.py | Enhanced: Added date parsing, author/narrator splitting functions |
| src/agentic_librarian/etl/enhance.py | Deleted: Replaced by modular asset-based architecture |
| src/agentic_librarian/db/session.py | New: DatabaseManager with session lifecycle management and credential prompting |
| src/agentic_librarian/db/models.py | New: Complete SQLAlchemy models for authors, works, editions, tropes, and reading history |
| test/unit/test_trope_manager.py | New: Comprehensive unit tests for TropeManager with 100% coverage |
| test/unit/test_orchestration_sensors.py | New: Unit tests for file sensor cursor management and filtering |
| test/unit/test_ingest.py | New: Tests for CSV ingestion, year inference, and model generation |
| test/unit/test_cleaning.py | New: Tests for date parsing and author/format splitting |
| test/integration/test_etl_pipeline.py | New: Integration tests for full Dagster pipeline with mocked scouts |
| test/conftest.py | New: Pytest configuration with database reachability checks |
| docker-compose.yml | New: PostgreSQL with pgvector and MLFlow services |
| pyproject.toml | Updated: Added database dependencies (SQLAlchemy, pgvector, psycopg2-binary) |
| docs/verification_tutorial.md | New: Step-by-step verification guide for Phases 1 & 2 |
| docs/project_notes/decisions.md | Updated: ADRs for hybrid data access, coarse-grained MCP tools, and implementation patterns |
````python
response = self._client.models.generate_content(
    model="gemini-2.0-flash",
    contents=prompt,
    config={"tools": [{"google_search": {}}] if os.environ.get("USE_SEARCH_GROUNDING") == "1" else []},
)

text = response.text.strip()
if text.startswith("```json"):
    text = text.replace("```json", "").replace("```", "")
return json.loads(text)
````
Missing error handling for JSON parsing: Same issue as AudiobookScout - if the LLM response is not valid JSON, json.loads(text) at line 304 will raise a JSONDecodeError. Consider adding try-except handling with a descriptive error message that includes the title and author being processed for easier debugging.
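A hedged sketch of the suggested fix (the function name and message format are illustrative, not code from the PR): wrap the parse in a try/except and surface the title and author in the raised error so failures are traceable to a specific book.

````python
import json

def parse_llm_json(text: str, title: str, author: str) -> dict:
    """Parse an LLM response expected to be JSON, stripping markdown fences."""
    cleaned = text.strip()
    if cleaned.startswith("```json"):
        cleaned = cleaned.replace("```json", "").replace("```", "")
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError as exc:
        # Include the book being processed and a preview of the bad payload.
        raise ValueError(
            f"LLM returned non-JSON response for '{title}' by {author}: {cleaned[:200]!r}"
        ) from exc
````

Re-raising as ValueError with context keeps the caller's error handling simple while preserving the original JSONDecodeError via `from exc`.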
test/test_metadata_scout.py
Outdated
```python
    scout = md_scout.MultiSourceScout()
    result = scout.scout_metadata("Test Book", "Test Author")

    assert result["title"] == "Google Title"
    assert result["authors"] == ["Google Author"]
    assert result["isbn_13"] == "1111111111111"
    assert result["page_count"] == 100
    assert result["genres"] == ["Fiction"]
    assert result["original_publication_year"] == 2020


def test_multi_source_scout_audiobook_dual_pathway(monkeypatch):
    """Test that MultiSourceScout uses both pathways for audiobooks and logs to MLFlow."""

    # Mock MLFlow
    mock_mlflow = MagicMock()
    monkeypatch.setattr(md_scout, "mlflow", mock_mlflow)

    def mock_google(title, author, api_key=None):
        return {}

    def mock_hardcover(title, author, format, api_key=None):
        return {"audio_minutes": 100}

    def mock_audible(self, title):
        return {"length_minutes": 120}

    def mock_direct(self, title, author):
        return {"audio_minutes": 115}

    monkeypatch.setattr(md_scout, "fetch_google_books_metadata", mock_google)
    monkeypatch.setattr(md_scout, "fetch_hardcover_metadata", mock_hardcover)
    monkeypatch.setattr(md_scout.AudiobookScout, "extract_metadata_with_gemini", mock_audible)
    monkeypatch.setattr(md_scout.DirectKnowledgeScout, "scout_audiobook", mock_direct)

    scout = md_scout.MultiSourceScout()
```
Tests will fail if GOOGLE_SEARCH_API_KEY is not set in the environment. The tests at lines 411 and 446 create MultiSourceScout() without arguments, which internally instantiates AudiobookScout() and DirectKnowledgeScout() (metadata_scout.py lines 320-321). Both classes check for GOOGLE_SEARCH_API_KEY in their __init__ and raise ValueError if it is missing; the tests mock the methods but not the constructors. Consider either:
- mocking the AudiobookScout and DirectKnowledgeScout classes entirely,
- setting a fake GOOGLE_SEARCH_API_KEY in the test setup, or
- implementing lazy initialization as suggested in comment 016.
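A sketch of option (2) using the standard library's unittest.mock.patch.dict; `ScoutNeedingKey` is a hypothetical stand-in for the scouts' `__init__` behavior described above, not code from the PR.

```python
import os
from unittest.mock import patch

class ScoutNeedingKey:
    """Stand-in (hypothetical) for AudiobookScout / DirectKnowledgeScout:
    raises ValueError in __init__ when the key is absent, as described above."""
    def __init__(self):
        if not os.environ.get("GOOGLE_SEARCH_API_KEY"):
            raise ValueError("GOOGLE_SEARCH_API_KEY is not set")

# Option (2): inject a fake key only for the duration of the block,
# so __init__ validation passes without a real credential.
with patch.dict(os.environ, {"GOOGLE_SEARCH_API_KEY": "fake-key-for-tests"}):
    scout = ScoutNeedingKey()  # constructs without raising
```

Inside a pytest test the same effect is usually achieved with `monkeypatch.setenv("GOOGLE_SEARCH_API_KEY", "fake-key")`, which is automatically undone at test teardown.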
```python
"isbn_13": hardcover_data.get("isbn_13") or google_data.get("ISBN_13"),
"page_count": hardcover_data.get("page_count") or google_data.get("page_count"),
"description": hardcover_data.get("description") or google_data.get("description"),
"genres": list(set(list(hardcover_data.get("genres", [])) + google_data.get("genres", []))),
```
Potential TypeError when merging genres: the code merges genres via list(set(list(hardcover_data.get("genres", [])) + google_data.get("genres", []))). Note that hardcover_data.get("genres") returns a set (see line 185 of metadata_scout.py: "genres": set(genres)), not a list; converting it with list() works, but if google_data.get("genres") returns None instead of a list, the + concatenation raises a TypeError (the default [] only applies when the key is absent, not when its value is None). Consider list(set(hardcover_data.get("genres", [])) | set(google_data.get("genres", []))) to merge both collections as sets.
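A sketch of the suggested set-union merge, extended with an `or ()` guard so an explicit None from either source is also handled (the helper name is illustrative, not from the PR):

```python
def merge_genres(hardcover_data: dict, google_data: dict) -> list:
    """Union genres from both sources, tolerating missing keys and None values."""
    # `or ()` covers the case where the key exists but maps to None,
    # which .get("genres", []) alone does not protect against.
    hc = set(hardcover_data.get("genres") or ())
    gb = set(google_data.get("genres") or ())
    return sorted(hc | gb)  # sorted for a deterministic result
```

Because both operands are coerced to sets before the union, it no longer matters whether each source returns a set, a list, or nothing at all.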
Pull request overview
Copilot reviewed 45 out of 49 changed files in this pull request and generated 10 comments.
Implements all of Flow 1 along with some changes to enable a smoother Flow 2 implementation