From eafdec37c22d80a7458780a47d7d5541d50617bb Mon Sep 17 00:00:00 2001 From: fagemx Date: Fri, 27 Mar 2026 09:26:16 +0800 Subject: [PATCH 1/2] feat(serve): add recurring pattern detection endpoint (GH-318) Add GET /api/patterns endpoint to detect recurring governance patterns within a village's decisions. Three pattern types are detected: - recurring_decision: same key changed >= N times in window - chief_repeated_action: same authority+key >= N times - rollback_trend: supersession chains with trend direction Query params: village_id (required), lookback_days (default 7, max 90), min_occurrences (default 3, min 2). Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/edda-ledger/src/ledger.rs | 11 + crates/edda-ledger/src/sqlite_store.rs | 437 +++++++++++++++++++++++++ crates/edda-serve/src/lib.rs | 129 ++++++++ 3 files changed, 577 insertions(+) diff --git a/crates/edda-ledger/src/ledger.rs b/crates/edda-ledger/src/ledger.rs index 31c2f4e..20cb169 100644 --- a/crates/edda-ledger/src/ledger.rs +++ b/crates/edda-ledger/src/ledger.rs @@ -239,6 +239,17 @@ impl Ledger { self.sqlite.village_stats(village_id, after, before) } + /// Detect recurring patterns in a village's decision history. + pub fn detect_village_patterns( + &self, + village_id: &str, + after: &str, + min_occurrences: usize, + ) -> anyhow::Result> { + self.sqlite + .detect_village_patterns(village_id, after, min_occurrences) + } + /// Find the active decision for a specific key on a branch. pub fn find_active_decision( &self, diff --git a/crates/edda-ledger/src/sqlite_store.rs b/crates/edda-ledger/src/sqlite_store.rs index 5737f9e..8b3186c 100644 --- a/crates/edda-ledger/src/sqlite_store.rs +++ b/crates/edda-ledger/src/sqlite_store.rs @@ -303,6 +303,42 @@ pub struct VillageStatsPeriod { pub before: Option, } +/// The type of recurring pattern detected. 
+#[derive(Debug, Clone, serde::Serialize)] +#[serde(rename_all = "snake_case")] +pub enum PatternType { + RecurringDecision, + ChiefRepeatedAction, + RollbackTrend, +} + +/// A single detected pattern in a village's decision history. +#[derive(Debug, Clone, serde::Serialize)] +pub struct DetectedPattern { + pub pattern_type: PatternType, + pub key: String, + pub domain: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub authority: Option, + pub occurrences: usize, + pub first_seen: String, + pub last_seen: String, + pub dates: Vec, + pub description: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub trending_up: Option, +} + +/// Result of pattern detection for a village. +#[derive(Debug, Clone, serde::Serialize)] +pub struct PatternDetectionResult { + pub village_id: String, + pub lookback_days: u32, + pub after: String, + pub total_patterns: usize, + pub patterns: Vec, +} + /// Domain with decision count. #[derive(Debug, Clone, serde::Serialize)] pub struct DomainCount { @@ -2167,6 +2203,169 @@ impl SqliteStore { }) } + // ── Pattern Detection ───────────────────────────────────────────── + + /// Detect recurring patterns in a village's decision history. + /// + /// Runs three SQL queries to find: + /// 1. Recurring decisions (same key changed >= min_occurrences times) + /// 2. Chief repeated actions (same authority+key >= min_occurrences times) + /// 3. 
Rollback trends (supersession chains >= 2 within the window) + pub fn detect_village_patterns( + &self, + village_id: &str, + after: &str, + min_occurrences: usize, + ) -> anyhow::Result> { + let mut patterns = Vec::new(); + + // Query 1: Recurring decisions — same key changed N+ times + { + let sql = " + SELECT d.key, d.domain, COUNT(*) as cnt, + MIN(e.ts) as first_seen, MAX(e.ts) as last_seen, + GROUP_CONCAT(DATE(e.ts), ',') as dates + FROM decisions d + JOIN events e ON d.event_id = e.event_id + WHERE d.village_id = ?1 AND e.ts >= ?2 + GROUP BY d.key, d.domain + HAVING cnt >= ?3 + ORDER BY cnt DESC + "; + let mut stmt = self.conn.prepare(sql)?; + let rows = + stmt.query_map(params![village_id, after, min_occurrences as i64], |row| { + let key: String = row.get(0)?; + let domain: String = row.get(1)?; + let cnt: usize = row.get(2)?; + let first: String = row.get(3)?; + let last: String = row.get(4)?; + let dates_str: String = row.get::<_, Option>(5)?.unwrap_or_default(); + Ok((key, domain, cnt, first, last, dates_str)) + })?; + for row in rows { + let (key, domain, cnt, first, last, dates_str) = row?; + let dates: Vec = dates_str + .split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + .collect(); + patterns.push(DetectedPattern { + pattern_type: PatternType::RecurringDecision, + description: format!("{key} changed {cnt} times in window"), + key, + domain, + authority: None, + occurrences: cnt, + first_seen: first, + last_seen: last, + dates, + trending_up: None, + }); + } + } + + // Query 2: Chief repeated actions — same authority+key N+ times + { + let sql = " + SELECT d.authority, d.key, d.domain, COUNT(*) as cnt, + MIN(e.ts) as first_seen, MAX(e.ts) as last_seen, + GROUP_CONCAT(DATE(e.ts), ',') as dates + FROM decisions d + JOIN events e ON d.event_id = e.event_id + WHERE d.village_id = ?1 AND e.ts >= ?2 + AND d.authority != 'system' + GROUP BY d.authority, d.key, d.domain + HAVING cnt >= ?3 + ORDER BY cnt DESC + "; + let mut stmt = 
self.conn.prepare(sql)?; + let rows = + stmt.query_map(params![village_id, after, min_occurrences as i64], |row| { + let authority: String = row.get(0)?; + let key: String = row.get(1)?; + let domain: String = row.get(2)?; + let cnt: usize = row.get(3)?; + let first: String = row.get(4)?; + let last: String = row.get(5)?; + let dates_str: String = row.get::<_, Option>(6)?.unwrap_or_default(); + Ok((authority, key, domain, cnt, first, last, dates_str)) + })?; + for row in rows { + let (authority, key, domain, cnt, first, last, dates_str) = row?; + let dates: Vec = dates_str + .split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + .collect(); + patterns.push(DetectedPattern { + pattern_type: PatternType::ChiefRepeatedAction, + description: format!("{authority} changed {key} {cnt} times in window"), + key, + domain, + authority: Some(authority), + occurrences: cnt, + first_seen: first, + last_seen: last, + dates, + trending_up: None, + }); + } + } + + // Query 3: Rollback trends — keys with supersession chains + { + let sql = " + SELECT d.key, d.domain, COUNT(*) as cnt, + MIN(e.ts) as first_seen, MAX(e.ts) as last_seen, + GROUP_CONCAT(DATE(e.ts), ',') as dates + FROM decisions d + JOIN events e ON d.event_id = e.event_id + WHERE d.village_id = ?1 AND e.ts >= ?2 + AND d.supersedes_id IS NOT NULL + GROUP BY d.key, d.domain + HAVING cnt >= 2 + ORDER BY cnt DESC + "; + let mut stmt = self.conn.prepare(sql)?; + let rows = stmt.query_map(params![village_id, after], |row| { + let key: String = row.get(0)?; + let domain: String = row.get(1)?; + let cnt: usize = row.get(2)?; + let first: String = row.get(3)?; + let last: String = row.get(4)?; + let dates_str: String = row.get::<_, Option>(5)?.unwrap_or_default(); + Ok((key, domain, cnt, first, last, dates_str)) + })?; + for row in rows { + let (key, domain, cnt, first, last, dates_str) = row?; + let dates: Vec = dates_str + .split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + .collect(); + + // 
Trend detection: compare rollback count in first half vs second half + let trending_up = detect_trend_direction(&dates, after); + + patterns.push(DetectedPattern { + pattern_type: PatternType::RollbackTrend, + description: format!("{key} rolled back {cnt} times in window"), + key, + domain, + authority: None, + occurrences: cnt, + first_seen: first, + last_seen: last, + dates, + trending_up: Some(trending_up), + }); + } + } + + Ok(patterns) + } + // ── Cross-Project Sync ───────────────────────────────────────────── /// Query active decisions with shared or global scope. @@ -3375,6 +3574,34 @@ struct EventRow { event_level: Option, } +/// Detect if rollback frequency is trending upward by comparing first half vs second half. +/// +/// Splits the dates at the window midpoint. Returns `true` if the second half +/// has more occurrences than the first half. +fn detect_trend_direction(dates: &[String], after: &str) -> bool { + if dates.len() < 2 { + return false; + } + // Use the `after` date and the last date to compute midpoint + let after_date = after.split('T').next().unwrap_or(after); + let last_date = dates.last().map(|s| s.as_str()).unwrap_or(after_date); + + // Simple midpoint: sort dates, split in half + let mut sorted: Vec<&str> = dates.iter().map(|s| s.as_str()).collect(); + sorted.sort(); + let mid = sorted.len() / 2; + + // If we have a midpoint date, use it; otherwise just compare halves + if let Some(&mid_date) = sorted.get(mid) { + let first_half = sorted.iter().filter(|&&d| d < mid_date).count(); + let second_half = sorted.iter().filter(|&&d| d >= mid_date).count(); + second_half > first_half + } else { + let _ = (after_date, last_date); + false + } +} + fn row_to_event(row: EventRow) -> anyhow::Result { let payload: serde_json::Value = serde_json::from_str(&row.payload_str)?; let blobs: Vec = serde_json::from_str(&row.refs_blobs_str)?; @@ -6566,4 +6793,214 @@ mod tests { drop(store); let _ = std::fs::remove_dir_all(&dir); } + + // ── Pattern Detection 
Tests ── + + /// Helper to create a decision payload with specific authority and village. + fn make_dp( + key: &str, + value: &str, + authority: &str, + village: &str, + ) -> edda_core::types::DecisionPayload { + edda_core::types::DecisionPayload { + key: key.to_string(), + value: value.to_string(), + reason: Some("test".to_string()), + scope: None, + authority: Some(authority.to_string()), + affected_paths: None, + tags: None, + review_after: None, + reversibility: None, + village_id: Some(village.to_string()), + } + } + + #[test] + fn test_detect_village_patterns_recurring() { + let (dir, store) = tmp_db(); + + // Insert 5 decisions with the same key in village "v1" + let mut prev_hash: Option = None; + for i in 0..5 { + let dp = make_dp( + "rewards.daily_limit", + &format!("{}", 100 + i), + "event_chief", + "v1", + ); + let event = + edda_core::event::new_decision_event("main", prev_hash.as_deref(), "system", &dp) + .unwrap(); + prev_hash = Some(event.hash.clone()); + store.append_event(&event).unwrap(); + } + + let patterns = store + .detect_village_patterns("v1", "2020-01-01", 3) + .unwrap(); + + // Should detect at least one recurring_decision pattern + let recurring: Vec<_> = patterns + .iter() + .filter(|p| { + matches!(p.pattern_type, PatternType::RecurringDecision) + && p.key == "rewards.daily_limit" + }) + .collect(); + assert!( + !recurring.is_empty(), + "should detect recurring decision pattern" + ); + assert_eq!(recurring[0].occurrences, 5); + assert!(recurring[0].description.contains("rewards.daily_limit")); + + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn test_detect_village_patterns_chief_repeated() { + let (dir, store) = tmp_db(); + + // Insert 3 decisions by "safety_chief" on the same key + let mut prev_hash: Option = None; + for i in 0..3 { + let dp = make_dp( + "economy.reward_cap", + &format!("{}", 50 + i), + "safety_chief", + "v2", + ); + let event = + edda_core::event::new_decision_event("main", 
prev_hash.as_deref(), "system", &dp) + .unwrap(); + prev_hash = Some(event.hash.clone()); + store.append_event(&event).unwrap(); + } + + let patterns = store + .detect_village_patterns("v2", "2020-01-01", 3) + .unwrap(); + + let chief: Vec<_> = patterns + .iter() + .filter(|p| { + matches!(p.pattern_type, PatternType::ChiefRepeatedAction) + && p.authority.as_deref() == Some("safety_chief") + }) + .collect(); + assert!( + !chief.is_empty(), + "should detect chief repeated action pattern" + ); + assert_eq!(chief[0].occurrences, 3); + assert!(chief[0].description.contains("safety_chief")); + + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn test_detect_village_patterns_rollback() { + let (dir, store) = tmp_db(); + + // Create a supersession chain: d1 -> d2 supersedes d1 -> d3 supersedes d2 + let dp1 = make_dp("activity.bonus", "100", "event_chief", "v3"); + let e1 = edda_core::event::new_decision_event("main", None, "system", &dp1).unwrap(); + store.append_event(&e1).unwrap(); + + let dp2 = make_dp("activity.bonus", "50", "safety_chief", "v3"); + let e2 = + edda_core::event::new_decision_event("main", Some(&e1.hash), "system", &dp2).unwrap(); + // Manually set supersedes_id via direct SQL update + store.append_event(&e2).unwrap(); + store + .conn + .execute( + "UPDATE decisions SET supersedes_id = ?1 WHERE event_id = ?2", + params![e1.event_id, e2.event_id], + ) + .unwrap(); + + let dp3 = make_dp("activity.bonus", "30", "safety_chief", "v3"); + let e3 = + edda_core::event::new_decision_event("main", Some(&e2.hash), "system", &dp3).unwrap(); + store.append_event(&e3).unwrap(); + store + .conn + .execute( + "UPDATE decisions SET supersedes_id = ?1 WHERE event_id = ?2", + params![e2.event_id, e3.event_id], + ) + .unwrap(); + + let patterns = store + .detect_village_patterns("v3", "2020-01-01", 3) + .unwrap(); + + let rollback: Vec<_> = patterns + .iter() + .filter(|p| { + matches!(p.pattern_type, PatternType::RollbackTrend) && p.key == 
"activity.bonus" + }) + .collect(); + assert!(!rollback.is_empty(), "should detect rollback trend pattern"); + assert_eq!(rollback[0].occurrences, 2); + assert!(rollback[0].trending_up.is_some()); + + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn test_detect_village_patterns_below_threshold() { + let (dir, store) = tmp_db(); + + // Only 2 decisions — threshold is 3, should not be detected + let mut prev_hash: Option = None; + for i in 0..2 { + let dp = make_dp("db.pool_size", &format!("{}", 10 + i), "human", "v4"); + let event = + edda_core::event::new_decision_event("main", prev_hash.as_deref(), "system", &dp) + .unwrap(); + prev_hash = Some(event.hash.clone()); + store.append_event(&event).unwrap(); + } + + let patterns = store + .detect_village_patterns("v4", "2020-01-01", 3) + .unwrap(); + + let recurring: Vec<_> = patterns + .iter() + .filter(|p| { + matches!(p.pattern_type, PatternType::RecurringDecision) && p.key == "db.pool_size" + }) + .collect(); + assert!( + recurring.is_empty(), + "2 decisions should not reach threshold of 3" + ); + + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn test_detect_village_patterns_empty_village() { + let (dir, store) = tmp_db(); + + let patterns = store + .detect_village_patterns("nonexistent", "2020-01-01", 3) + .unwrap(); + assert!( + patterns.is_empty(), + "non-existent village should return empty patterns" + ); + + drop(store); + let _ = std::fs::remove_dir_all(&dir); + } } diff --git a/crates/edda-serve/src/lib.rs b/crates/edda-serve/src/lib.rs index 4635de6..5ff0fb2 100644 --- a/crates/edda-serve/src/lib.rs +++ b/crates/edda-serve/src/lib.rs @@ -205,6 +205,7 @@ pub async fn serve(repo_root: &Path, config: ServeConfig) -> anyhow::Result<()> .route("/api/snapshots", get(get_snapshots)) .route("/api/snapshots/{context_hash}", get(get_snapshots_by_hash)) .route("/api/villages/{village_id}/stats", get(get_village_stats)) + .route("/api/patterns", get(get_patterns)) 
.route("/api/pair/new", post(create_pairing)) .route("/api/pair/list", get(list_paired_devices)) .route("/api/pair/revoke", post(revoke_device)) @@ -317,6 +318,7 @@ fn router(repo_root: &Path) -> Router { .route("/api/snapshots", get(get_snapshots)) .route("/api/snapshots/{context_hash}", get(get_snapshots_by_hash)) .route("/api/villages/{village_id}/stats", get(get_village_stats)) + .route("/api/patterns", get(get_patterns)) .route("/pair", get(complete_pairing)) .route("/api/pair/new", post(create_pairing)) .route("/api/pair/list", get(list_paired_devices)) @@ -2375,6 +2377,51 @@ async fn get_village_stats( Ok(Json(stats)) } +// ── GET /api/patterns ── + +#[derive(Deserialize)] +struct PatternsQuery { + village_id: Option, + /// Number of days to look back (default 7, max 90). + #[serde(default)] + lookback_days: Option, + /// Minimum occurrences to qualify as a pattern (default 3). + #[serde(default)] + min_occurrences: Option, +} + +async fn get_patterns( + State(state): State>, + Query(params): Query, +) -> Result, AppError> { + let village_id = params + .village_id + .as_deref() + .filter(|s| !s.is_empty()) + .ok_or_else(|| AppError::Validation("village_id query parameter is required".into()))?; + + let lookback_days = params.lookback_days.unwrap_or(7).min(90); + let min_occurrences = params.min_occurrences.unwrap_or(3).max(2); + + let now = time::OffsetDateTime::now_utc(); + let after_date = now - time::Duration::days(i64::from(lookback_days)); + let after_str = after_date + .format(&time::format_description::well_known::Rfc3339) + .unwrap_or_default(); + + let ledger = state.open_ledger()?; + let patterns = ledger.detect_village_patterns(village_id, &after_str, min_occurrences)?; + let total = patterns.len(); + + Ok(Json(edda_ledger::sqlite_store::PatternDetectionResult { + village_id: village_id.to_string(), + lookback_days, + after: after_str, + total_patterns: total, + patterns, + })) +} + /// Reconstruct a full snapshot JSON from a materialized view row 
+ event payload. fn reconstruct_snapshot( ledger: &Ledger, @@ -7851,4 +7898,86 @@ actors: let ops = json["slowest_operations"].as_array().unwrap(); assert_eq!(ops.len(), 2); } + + // ── Pattern Detection Endpoint Tests ── + + #[tokio::test] + async fn get_patterns_returns_recurring() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + // Seed 4 decisions with same key in village "v-test" + let ledger = Ledger::open(tmp.path()).unwrap(); + let mut prev_hash = ledger.last_event_hash().unwrap(); + for i in 0..4 { + let dp = edda_core::types::DecisionPayload { + key: "rewards.cap".to_string(), + value: format!("{}", 100 + i), + reason: Some("adjusting".to_string()), + scope: None, + authority: Some("event_chief".to_string()), + affected_paths: None, + tags: None, + review_after: None, + reversibility: None, + village_id: Some("v-test".to_string()), + }; + let event = + edda_core::event::new_decision_event("main", prev_hash.as_deref(), "system", &dp) + .unwrap(); + prev_hash = Some(event.hash.clone()); + ledger.append_event(&event).unwrap(); + } + drop(ledger); + + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .uri("/api/patterns?village_id=v-test&lookback_days=30&min_occurrences=3") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!(json["village_id"].as_str().unwrap(), "v-test"); + assert_eq!(json["lookback_days"].as_u64().unwrap(), 30); + assert!(json["after"].as_str().is_some()); + assert!(json["total_patterns"].as_u64().unwrap() >= 1); + + let patterns = json["patterns"].as_array().unwrap(); + let recurring: Vec<_> = patterns + .iter() + .filter(|p| p["pattern_type"].as_str() == Some("recurring_decision")) + .collect(); + assert!(!recurring.is_empty()); + 
assert_eq!(recurring[0]["key"].as_str().unwrap(), "rewards.cap"); + assert_eq!(recurring[0]["occurrences"].as_u64().unwrap(), 4); + } + + #[tokio::test] + async fn get_patterns_missing_village_id_returns_400() { + let tmp = tempfile::tempdir().unwrap(); + setup_workspace(tmp.path()); + + let app = router(tmp.path()); + let resp = app + .oneshot( + Request::builder() + .uri("/api/patterns") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } } From 768df4bd20cd966f017d4abacb2db0c4e098d1b2 Mon Sep 17 00:00:00 2001 From: fagemx Date: Fri, 27 Mar 2026 12:19:48 +0800 Subject: [PATCH 2/2] fix(edda-ledger): clean up detect_trend_direction and re-export pattern types - Remove dead code (unused after_date/last_date and unreachable else branch) - Return false for duplicate dates (burst != trend) - Re-export DetectedPattern, PatternType, PatternDetectionResult from lib.rs Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/edda-ledger/src/lib.rs | 4 ++-- crates/edda-ledger/src/sqlite_store.rs | 29 ++++++++++++-------------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/crates/edda-ledger/src/lib.rs b/crates/edda-ledger/src/lib.rs index 65d3661..088299a 100644 --- a/crates/edda-ledger/src/lib.rs +++ b/crates/edda-ledger/src/lib.rs @@ -19,8 +19,8 @@ pub use ledger::Ledger; pub use lock::WorkspaceLock; pub use paths::EddaPaths; pub use sqlite_store::{ - BundleRow, ChainEntry, DecideSnapshotRow, DecisionRow, DepRow, DeviceTokenRow, ImportParams, - SuggestionRow, TaskBriefRow, + BundleRow, ChainEntry, DecideSnapshotRow, DecisionRow, DepRow, DetectedPattern, DeviceTokenRow, + ImportParams, PatternDetectionResult, PatternType, SuggestionRow, TaskBriefRow, }; pub use tombstone::{append_tombstone, list_tombstones, make_tombstone, DeleteReason, Tombstone}; pub use view::DecisionView; diff --git a/crates/edda-ledger/src/sqlite_store.rs b/crates/edda-ledger/src/sqlite_store.rs index 
8b3186c..c139c60 100644
--- a/crates/edda-ledger/src/sqlite_store.rs
+++ b/crates/edda-ledger/src/sqlite_store.rs
@@ -3576,30 +3576,30 @@ struct EventRow {
 
-/// Detect if rollback frequency is trending upward by comparing first half vs second half.
-///
-/// Splits the dates at the window midpoint. Returns `true` if the second half
-/// has more occurrences than the first half.
-fn detect_trend_direction(dates: &[String], after: &str) -> bool {
-    if dates.len() < 2 {
-        return false;
-    }
-    // Use the `after` date and the last date to compute midpoint
-    let after_date = after.split('T').next().unwrap_or(after);
-    let last_date = dates.last().map(|s| s.as_str()).unwrap_or(after_date);
-
-    // Simple midpoint: sort dates, split in half
-    let mut sorted: Vec<&str> = dates.iter().map(|s| s.as_str()).collect();
-    sorted.sort();
-    let mid = sorted.len() / 2;
-
-    // If we have a midpoint date, use it; otherwise just compare halves
-    if let Some(&mid_date) = sorted.get(mid) {
-        let first_half = sorted.iter().filter(|&&d| d < mid_date).count();
-        let second_half = sorted.iter().filter(|&&d| d >= mid_date).count();
-        second_half > first_half
-    } else {
-        let _ = (after_date, last_date);
-        false
-    }
+/// Detect whether rollback frequency is trending upward over the window.
+///
+/// Sorts the dates, splits the *distinct* dates at the midpoint, and returns
+/// `true` only when strictly more events fall on the later distinct dates
+/// than on the earlier ones. All-identical dates (a one-day burst) never count.
+fn detect_trend_direction(dates: &[String], _after: &str) -> bool {
+    if dates.len() < 2 {
+        return false;
+    }
+    let mut sorted: Vec<&str> = dates.iter().map(|s| s.as_str()).collect();
+    sorted.sort_unstable();
+
+    // Distinct dates, in order; a burst on a single day is not a trend.
+    let mut distinct = sorted.clone();
+    distinct.dedup();
+    if distinct.len() < 2 {
+        return false;
+    }
+
+    // Pivot on the later half of the distinct dates so duplicate days cannot
+    // inflate one side (splitting the raw sorted list reports "trending up"
+    // whenever duplicates cluster at or below the midpoint).
+    let pivot = distinct[(distinct.len() + 1) / 2];
+    let early = sorted.iter().filter(|&&d| d < pivot).count();
+    let late = sorted.len() - early;
+    late > early
 }
 
 fn row_to_event(row: EventRow) -> anyhow::Result {