diff --git a/blog-series/01-architecture-and-threat-model.md b/blog-series/01-architecture-and-threat-model.md index c07237c..d2767ed 100644 --- a/blog-series/01-architecture-and-threat-model.md +++ b/blog-series/01-architecture-and-threat-model.md @@ -12,7 +12,7 @@ CDDBS — the Cyber Disinformation Detection Briefing System — is an open-sour The result is a professional intelligence briefing — the kind an analyst at a think tank or government agency would write — produced in under a minute. -This is the first post in a series where I'll walk through the technical architecture, the pipeline internals, the quality assurance system, and the operational infrastructure behind it. This isn't a weekend project write-up. CDDBS has been through five development sprints, 169 tests, and a production deployment on Render. The goal of this series is to show how the pieces fit together — and why we made the decisions we did. +This is the first post in a series where I'll walk through the technical architecture, the pipeline internals, the quality assurance system, and the operational infrastructure behind it. This isn't a weekend project write-up. CDDBS has been through six development sprints, 142 tests, and a production deployment on Render. The goal of this series is to show how the pieces fit together — and why we made the decisions we did. ## The Problem We're Solving @@ -61,7 +61,7 @@ CDDBS is a three-tier application: └──────────────┬──────────────┘ │ ┌──────────────▼──────────────┐ -│ PostgreSQL (7 tables) │ +│ PostgreSQL (12 tables) │ │ Neon managed (production) │ └─────────────────────────────┘ │ @@ -71,7 +71,7 @@ CDDBS is a three-tier application: (article fetch) (LLM analysis) ``` -The backend is a FastAPI application with 17 endpoints. The frontend is a React SPA with Redux Toolkit for state and TanStack React Query for data fetching. PostgreSQL stores everything — reports, articles, quality scores, narrative matches, and tester feedback. 
+The backend is a FastAPI application with 34 endpoints. The frontend is a React SPA with Redux Toolkit for state and TanStack React Query for data fetching. PostgreSQL stores everything — reports, articles, quality scores, narrative matches, and tester feedback. ### The BYOK Model @@ -104,7 +104,7 @@ Stages 4 and 5 — quality scoring and narrative matching — are wrapped in `tr ## Database Schema -Seven tables store the full lifecycle of an analysis: +Twelve tables store the full lifecycle of an analysis — from article ingestion to briefing delivery and webhook alerting: ``` outlets ──< articles >── reports ──< narrative_matches @@ -113,6 +113,13 @@ outlets ──< articles >── reports ──< narrative_matches │ batches ─────────────────── ┘ (via report_ids JSON) +topic_runs ──< topic_outlet_results + +raw_articles (multi-source ingestion) +event_clusters (Sprint 6+) +narrative_bursts (Sprint 6+) +webhook_configs + feedback (standalone) ``` @@ -122,6 +129,8 @@ Narrative matches are stored as individual rows — one per detected narrative p The `Batch` model (added in Sprint 5) groups multiple reports under a single analysis request. It tracks progress with `completed_count` and `failed_count` fields, and stores linked report IDs in a JSON column. +Sprint 6 added four more tables: `raw_articles` (multi-source feed ingestion from RSS and GDELT), `event_clusters` and `narrative_bursts` (for event intelligence, populated in Sprint 7+), and `webhook_configs` (for outbound alerting via HMAC-signed webhooks). + ## What's Coming in This Series This post covered the *what* and *why*. The next posts go deep on the *how*: @@ -130,6 +139,7 @@ This post covered the *what* and *why*. The next posts go deep on the *how*: - **Part 3**: The 7-dimension quality rubric — how we score LLM output without using another LLM. - **Part 4**: Multi-platform analysis — how Twitter API v2 and Telegram adapters normalize heterogeneous data into a common format. 
- **Part 5**: Operational maturity — batch analysis, export formats, metrics, and what it takes to go from "it works on my machine" to "it works in production." +- **Part 6**: Event intelligence at scale — the Sprint 6 multi-source ingestion pipeline (RSS + GDELT), TF-IDF deduplication, and webhook alerting. Each post will include real code, real data flows, and real architectural trade-offs. If you're building LLM-powered analysis tools — or any system where LLM output quality matters — the patterns here apply well beyond disinformation detection. diff --git a/blog-series/02-the-analysis-pipeline.md b/blog-series/02-the-analysis-pipeline.md index fb2e98c..c9a4858 100644 --- a/blog-series/02-the-analysis-pipeline.md +++ b/blog-series/02-the-analysis-pipeline.md @@ -18,22 +18,40 @@ When a user requests an analysis of a media outlet, the first thing we need is c ```python # src/cddbs/pipeline/fetch.py (simplified) + +# Map short date_filter codes to Google News 'when:' query values +_WHEN_MAP = { + "h": "1h", + "d": "1d", + "w": "7d", + "m": "30d", + "y": "1y", +} + def fetch_articles(outlet, country, num_articles=3, url=None, api_key=None, time_period=None): if not api_key: return generate_mock_articles(outlet) + query = f'"{outlet}"' + if url: + clean_url = url.replace("https://", "").replace("http://", "").split("/")[0] + query = f'"{outlet}" site:{clean_url}' + + # google_news engine does NOT support the tbs parameter. + # Date filtering must be done via 'when:' operator in the query string. 
+ if time_period: + when_value = _WHEN_MAP.get(time_period, time_period) + query = f"{query} when:{when_value}" + params = { "engine": "google_news", - "q": f'"{outlet}"' + (f" site:{url}" if url else ""), + "q": query, "gl": normalize_country(country), - "num": num_articles + "api_key": api_key, } - if time_period: - params["tbs"] = f"qdr:{time_period}" - - search = GoogleSearch({**params, "api_key": api_key}) + search = GoogleSearch(params) results = search.get_dict() return results.get("news_results", []) ``` @@ -42,9 +60,9 @@ A few things to note: **Country normalization.** SerpAPI expects ISO country codes (`us`, `ru`, `gb`), but users type "Russia" or "United States." The `normalize_country()` function maps natural language country names to their codes. Small detail, large UX impact. -**Date filtering.** The `tbs=qdr:{period}` parameter filters by recency — `h` for last hour, `d` for day, `w` for week. This matters because disinformation campaigns often intensify around specific events. An analyst tracking a narrative spike needs yesterday's articles, not last month's. +**Date filtering.** The SerpAPI `google_news` engine does **not** support the `tbs` parameter. Passing `tbs=qdr:d` silently fails — articles come back unfiltered. The correct approach is the `when:` query operator embedded directly in the search string: `when:1d` for last 24 hours, `when:7d` for last week. We discovered this through silent failures in early testing and patched it in production. This matters because disinformation campaigns often intensify around specific events — an analyst tracking a narrative spike needs yesterday's articles, not last month's. -**Mock fallback.** When no API key is configured, the system generates mock articles rather than crashing. This is critical for local development and testing — you don't want your 169 tests to require a live SerpAPI key. +**Mock fallback.** When no API key is configured, the system generates mock articles rather than crashing. 
This is critical for local development and testing — you don't want your 142 tests to require a live SerpAPI key. ## Stage 2: Prompt Construction @@ -258,24 +276,29 @@ The narrative matcher reads the full report text, scans for keyword hits across ## The Async Execution Model -The entire pipeline runs in a background thread: +The entire pipeline runs as a FastAPI background task: ```python @app.post("/analysis-runs") -def create_analysis_run(request: RunCreateRequest, db=Depends(get_db)): +def create_analysis_run( + request: RunCreateRequest, + background_tasks: BackgroundTasks, + db=Depends(get_db), +): # Create placeholder report report = Report(outlet=request.outlet, country=request.country) report.data = {"status": "queued"} db.add(report) db.commit() - # Launch pipeline in background - thread = threading.Thread( - target=_run_analysis_job, - args=(report.id, request.outlet, request.country, ...), - daemon=True + # Schedule pipeline as a background task + background_tasks.add_task( + _run_analysis_job, + report_id=report.id, + outlet=request.outlet, + country=request.country, + # ... other params ) - thread.start() return {"id": report.id, "status": "queued"} ``` @@ -296,7 +319,7 @@ const { data: run } = useQuery({ }); ``` -Why threads instead of Celery? Cost discipline. Celery requires a message broker (Redis or RabbitMQ), which means another service to deploy and pay for. For our throughput (single-digit concurrent analyses on Render free tier), Python threads are fine. If CDDBS needed to handle hundreds of concurrent analyses, we'd switch to a proper task queue. Until then, the simplest solution that works is the right one. +Why FastAPI `BackgroundTasks` instead of Celery? Cost discipline. Celery requires a message broker (Redis or RabbitMQ), which means another service to deploy and pay for. FastAPI's built-in `BackgroundTasks` runs the job in the same process after the HTTP response is sent — zero extra infrastructure. 
For our throughput (single-digit concurrent analyses on Render free tier), this is entirely adequate. If CDDBS needed to handle hundreds of concurrent analyses, we'd switch to a proper task queue. Until then, the simplest solution that works is the right one. ## Platform Routing diff --git a/blog-series/04-multi-platform-analysis.md b/blog-series/04-multi-platform-analysis.md index bdc2cc6..c1ad9c5 100644 --- a/blog-series/04-multi-platform-analysis.md +++ b/blog-series/04-multi-platform-analysis.md @@ -314,11 +314,11 @@ With multi-platform support, CDDBS can address several analysis patterns: Transparency about limitations: -**Telegram Bot API integration is interface-only.** The adapter exists and normalizes data correctly (22 tests prove it), but there's no live Telegram client yet. The MTProto protocol is complex, and the Bot API has its own rate limiting quirks. This is planned for Sprint 6. +**Telegram live integration shipped in Sprint 6.** What started as an interface-only adapter is now wired into the live pipeline via `POST /analysis-runs/telegram`. The endpoint accepts a Telegram channel handle and routes it through `TelegramAdapter` in the orchestrator using the Telegram Bot API. The adapter tests (22 tests) cover normalization and forwarding chain attribution; the live endpoint handles channel lookups and message retrieval. **Cross-platform identity linking is manual.** The research framework defines 8 signals for linking accounts across platforms (shared URLs, similar bios, posting timing, content overlap, etc.), but automated correlation isn't implemented. An analyst has to manually run analyses on suspected linked accounts and compare the results. -**No real-time streaming.** Both Twitter and Telegram offer streaming APIs for real-time data. CDDBS currently operates in batch mode — you request an analysis, it fetches recent data, and gives you a report. Continuous monitoring is a future capability. 
+**No real-time streaming.** Both Twitter and Telegram offer streaming APIs for real-time data. CDDBS currently operates in batch mode — you request an analysis, it fetches recent data, and gives you a report. The Sprint 6 RSS/GDELT ingestion pipeline runs on a schedule (every 3–5 minutes), which is the closest thing to near-real-time monitoring available today. Full streaming is a future capability. ## The Adapter Test Suite diff --git a/blog-series/05-operational-maturity.md b/blog-series/05-operational-maturity.md index 427f917..a894f09 100644 --- a/blog-series/05-operational-maturity.md +++ b/blog-series/05-operational-maturity.md @@ -52,11 +52,15 @@ The trade-off is that querying "all reports in batch X" requires a JSON contains ### Execution Model -Each target in a batch gets its own background thread: +Each target in a batch gets its own FastAPI `BackgroundTask`: ```python @app.post("/analysis-runs/batch") -def create_batch(request: BatchCreateRequest, db=Depends(get_db)): +def create_batch( + request: BatchCreateRequest, + background_tasks: BackgroundTasks, + db=Depends(get_db), +): batch = Batch( name=request.name, status="running", @@ -74,13 +78,13 @@ def create_batch(request: BatchCreateRequest, db=Depends(get_db)): batch.report_ids = batch.report_ids + [report.id] db.commit() - thread = threading.Thread( - target=_run_analysis_job, - args=(report.id, target.outlet, target.country, ...), - kwargs={"batch_id": batch.id}, - daemon=True + background_tasks.add_task( + _run_analysis_job, + report_id=report.id, + outlet=target.outlet, + country=target.country, + batch_id=batch.id, ) - thread.start() return {"batch_id": batch.id, "target_count": len(request.targets)} ``` @@ -119,9 +123,9 @@ GET /analysis-runs/batch/7 } ``` -### Why Threads, Not a Task Queue? +### Why BackgroundTasks, Not a Task Queue? -Same reasoning as the single-analysis pipeline: cost discipline. A proper task queue (Celery + Redis) requires two additional services. 
For batches capped at 5 targets on Render's free tier, Python threads with daemon mode are adequate. The `BATCH_MAX_SIZE` config (default 5) prevents resource exhaustion. +Same reasoning as the single-analysis pipeline: cost discipline. A proper task queue (Celery + Redis) requires two additional services. FastAPI's `BackgroundTasks` runs jobs in-process after the response is returned — zero extra infrastructure. For batches capped at 5 targets on Render's free tier, this is entirely adequate. The `BATCH_MAX_SIZE` config (default 5) prevents resource exhaustion. ## Export Pipeline @@ -478,38 +482,39 @@ def test_export_json_without_quality(db, report): ## The Full Picture -After five sprints, here's where CDDBS stands: +After six development sprints, here's where CDDBS stands: | Metric | Value | |--------|-------| -| Database tables | 7 | -| API endpoints | 17 | -| Tests passing | 169 | -| External dependencies added (Sprints 4-5) | 0 required, 1 optional | -| Lines of backend code | ~2,500 | +| Database tables | 12 | +| API endpoints | 34 | +| Tests passing | 142 | +| External dependencies added (Sprints 4-6) | feedparser, httpx, scikit-learn, scipy (+ optional: reportlab) | +| Lines of backend code | ~4,000 | | Frontend components | 15 | -The system handles the full lifecycle: ingest data from news or social media, analyze it with a constrained LLM, score the output for structural quality, match against known disinformation narratives, export results in three formats, and track operational health over time. +The system handles the full lifecycle: ingest data from news, social media, RSS, and GDELT; analyze with a constrained LLM; score output for structural quality; match against known disinformation narratives; export results in three formats; track operational health over time; and fire webhook alerts to external subscribers. 
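+The HMAC-signed webhook delivery mentioned above can be sketched roughly as follows. This is a minimal illustration using only the standard library, assuming HMAC-SHA256 over the raw request body; the header name (`X-CDDBS-Signature`) and the payload shape are hypothetical, not CDDBS's actual wire format:
+
+```python
+import hashlib
+import hmac
+import json
+
+def sign_payload(secret: str, body: bytes) -> str:
+    """HMAC-SHA256 hex digest of the exact bytes we will send."""
+    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
+
+def build_webhook_request(secret: str, event: dict) -> tuple[bytes, dict]:
+    """Serialize an event and attach the signature header (names illustrative)."""
+    body = json.dumps(event, sort_keys=True).encode()
+    headers = {
+        "Content-Type": "application/json",
+        "X-CDDBS-Signature": f"sha256={sign_payload(secret, body)}",
+    }
+    return body, headers
+
+def verify_signature(secret: str, body: bytes, header: str) -> bool:
+    """Subscriber side: recompute and compare in constant time."""
+    expected = f"sha256={sign_payload(secret, body)}"
+    return hmac.compare_digest(expected, header)
+```
+
+One detail worth noting: both sides work from the same `body` bytes, because any whitespace or key-ordering difference between serialization on the sender and re-serialization on the verifier changes the digest. The comparison uses `hmac.compare_digest` rather than `==` to avoid leaking information through timing.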
## What's Next for CDDBS -The immediate roadmap: +Sprint 6 delivered the event intelligence pipeline — multi-source ingestion (RSS + GDELT), TF-IDF deduplication, and webhook alerting (covered in Part 6 of this series). The immediate roadmap ahead: -- **Sprint 6**: Telegram Bot API live integration, frontend metrics dashboard, webhook alerting for failure spikes. -- **Sprint 7-8**: User authentication, shared analysis workspaces, trend detection over time. -- **Sprint 9+**: ML-based narrative matching (to complement keyword matching), automated monitoring schedules, multi-language support. +- **Sprint 7**: Event clustering (TF-IDF agglomerative), Z-score burst detection for narrative spikes, `EventClusterPanel` and `BurstTimeline` frontend components. +- **Sprint 8**: User authentication, shared analysis workspaces, automated monitoring schedules. +- **Sprint 9+**: ML-based narrative matching (to complement keyword matching), multi-language support, sentence-transformer upgrade for semantic deduplication. The long-term vision is a system where an analyst can set up continuous monitoring of 20+ outlets and social media accounts, get alerted when narrative patterns shift, and produce briefings that meet professional intelligence community standards — all powered by LLMs constrained to be honest about what they know and don't know. ## Series Recap -This five-part series covered: +This series has covered: 1. **Architecture & Threat Model** — What CDDBS is, the 18 narratives it tracks, and the three-tier architecture. 2. **The Analysis Pipeline** — Article fetch, prompt construction, LLM call, response parsing, and the async execution model. 3. **Quality Scoring & Narrative Detection** — The 7-dimension rubric, keyword-based narrative matching, and why we evaluate structure instead of truth. 4. **Multi-Platform Analysis** — Twitter and Telegram adapters, platform routing, and the common `BriefingInput` format. -5. 
**Operational Maturity** — Batch analysis, export formats, metrics, and production engineering. +5. **Operational Maturity** — Batch analysis, export formats, metrics, and production engineering (this post). +6. **Event Intelligence at Scale** — Sprint 6: RSS + GDELT ingestion, TF-IDF deduplication, webhook alerting. The common thread: **constrain the LLM, verify the output, degrade gracefully.** LLMs are powerful synthesis engines, but they need guardrails — structured prompts, typed evidence, quality rubrics, and narrative databases — to produce output that analysts can trust. Building those guardrails is the actual engineering challenge. The LLM call itself is one line of code.
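+To make the TF-IDF deduplication from Part 6 concrete, here is a minimal sketch assuming scikit-learn (one of the dependencies listed in the table above). The similarity threshold and the greedy keep-first policy are illustrative assumptions, not the production values:
+
+```python
+from sklearn.feature_extraction.text import TfidfVectorizer
+from sklearn.metrics.pairwise import cosine_similarity
+
+def deduplicate(texts: list[str], threshold: float = 0.85) -> list[int]:
+    """Return indices of articles to keep, dropping near-duplicates.
+
+    Greedy pass: an article survives only if its cosine similarity to
+    every already-kept article stays below the threshold.
+    """
+    tfidf = TfidfVectorizer().fit_transform(texts)
+    sims = cosine_similarity(tfidf)
+    keep: list[int] = []
+    for i in range(len(texts)):
+        if all(sims[i, j] < threshold for j in keep):
+            keep.append(i)
+    return keep
+```
+
+Because TF-IDF is purely lexical, paraphrased duplicates with little word overlap slip through, which is exactly why the Sprint 9+ roadmap mentions a sentence-transformer upgrade for semantic deduplication.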