Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fix_issue_72.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```json
{
"solution_code": "### FILE: app/services/anomaly_detection.py\n\n```python\n\"\"\"\nAnomaly Detection Service for FinMind\nDetects unusual spending spikes by category or merchant using statistical baselines.\n\"\"\"\n\nimport logging\nfrom datetime import datetime, timedelta\nfrom typing import Any\n\nimport numpy as np\nfrom sqlalchemy import text\n\nlogger = logging.getLogger(__name__)\n\n\nclass AnomalyDetectionService:\n \"\"\"\n Detects spending anomalies by comparing recent transactions\n against a statistical baseline (mean ± k*std).\n\n Baseline window : configurable (default 90 days)\n Detection window : configurable (default 30 days)\n Sensitivity : z-score threshold (default 2.0 → ~95th percentile)\n \"\"\"\n\n DEFAULT_BASELINE_DAYS = 90\n DEFAULT_DETECTION_DAYS = 30\n DEFAULT_Z_THRESHOLD = 2.0\n MIN_DATA_POINTS = 3 # need at least 3 periods to compute std\n\n def __init__(self, db, redis_client=None):\n self.db = db\n self.redis = redis_client\n self.cache_ttl = 3600 # 1 hour\n\n # ------------------------------------------------------------------\n # Public API\n # ------------------------------------------------------------------\n\n def detect_anomalies(\n self,\n user_id: int,\n baseline_days: int = DEFAULT_BASELINE_DAYS,\n detection_days: int = DEFAULT_DETECTION_DAYS,\n z_threshold: float = DEFAULT_Z_THRESHOLD,\n group_by: str = \"category\", # \"category\" | \"merchant\"\n ) -> dict[str, Any]:\n \"\"\"\n Main entry point.\n Returns a dict with flagged anomalies and their explanation metadata.\n \"\"\"\n cache_key = (\n f\"anomalies:{user_id}:{baseline_days}:{detection_days}\"\n f\":{z_threshold}:{group_by}\"\n )\n cached = self._cache_get(cache_key)\n if cached is not None:\n return cached\n\n now = datetime.utcnow()\n detection_start = now - timedelta(days=detection_days)\n baseline_start = now - timedelta(days=baseline_days + detection_days)\n baseline_end = detection_start\n\n if group_by == \"merchant\":\n baseline_data = self._fetch_merchant_spending(\n user_id, baseline_start, baseline_end\n )\n detection_data = self._fetch_merchant_spending(\n user_id, detection_start, now\n )\n else:\n baseline_data = self._fetch_category_spending(\n user_id, baseline_start, baseline_end\n )\n detection_data = self._fetch_category_spending(\n user_id, detection_start, now\n )\n\n baselines = self._compute_baselines(baseline_data, group_by)\n anomalies = self._flag_anomalies(\n detection_data, baselines, z_threshold, group_by,\n detection_start, now\n )\n\n result = {\n \"user_id\": user_id,\n \"generated_at\": now.isoformat(),\n \"parameters\": {\n \"baseline_days\": baseline_days,\n \"detection_days\": detection_days,\n \"z_threshold\": z_threshold,\n \"group_by\": group_by,\n },\n \"anomalies\": anomalies,\n \"summary\": {\n \"total_flagged\": len(anomalies),\n \"groups_analyzed\": len(baselines),\n },\n }\n\n self._cache_set(cache_key, result)\n return result\n\n # ------------------------------------------------------------------\n # Data fetching\n # ------------------------------------------------------------------\n\n def _fetch_category_spending(\n self, user_id: int, start: datetime, end: datetime\n ) -> list[dict]:\n \"\"\"\n Aggregate spending per category per *week* inside the window.\n Weekly bucketing gives a stable baseline without being too noisy.\n \"\"\"\n sql = text(\n \"\"\"\n SELECT\n c.id AS group_id,\n c.name AS group_name,\n DATE_TRUNC('week', e.date)::date AS period,\n SUM(e.amount) AS total\n FROM expenses e\n JOIN categories c ON c.id = e.category_id\n WHERE e.user_id = :user_id\n AND e.date >= :start\n AND e.date < :end\n GROUP BY c.id, c.name, DATE_TRUNC('week', e.date)\n ORDER BY period\n \"\"\"\n )\n rows = self.db.session.execute(\n sql, {\"user_id\": user_id, \"start\": start, \"end\": end}\n ).mappings().all()\n return [dict(r) for r in rows]\n\n def _fetch_merchant_spending(\n self, user_id: int, start: datetime, end: datetime\n ) -> list[dict]:\n \"\"\"\n Aggregate spending per merchant (description) per week.\n \"\"\"\n sql = text(\n \"\"\"\n SELECT\n LOWER(TRIM(e.description)) AS group_id,\n e.description AS group_name,\n DATE_TRUNC('week', e.date)::date AS period,\n SUM(e.amount) AS total\n FROM expenses e\n WHERE e.user_id = :user_id\n AND e.date >= :start\n AND e.date < :end\n AND e.description IS NOT NULL\n AND e.description <> ''\n GROUP BY LOWER(TRIM(e.description)), e.description,\n DATE_TRUNC('week', e.date)\n ORDER BY period\n \"\"\"\n )\n rows = self.db.session.execute(\n sql, {\"user_id\": user_id, \"start\": start, \"end\": end}\n ).mappings().all()\n return [dict(r) for r in rows]\n\n # ------------------------------------------------------------------\n # Statistical helpers\n # ------------------------------------------------------------------\n\n def _compute_baselines(self, rows: list[dict], group_by: str) -> dict:\n \"\"\"\n Build a baseline dict keyed by group_id.\n Each entry contains: mean, std, data_points, periods.\n \"\"\"\n grouped: dict[Any, list[float]] = {}\n meta: dict[Any, str] = {}\n\n for row in rows:\n gid = row[\"group_id\"]\n if gid not in grouped:\n grouped[gid] = []\n meta[gid] = row[\"group_name\"]\n grouped[gid].append(float(row[\"total\"]))\n\n baselines = {}\n for gid, amounts in grouped.items():\n arr = np.array(amounts)\n mean = float(np.mean(arr))\n std = float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0\n baselines[gid] = {\n \"group_name\": meta[gid],\n \"mean\": round(mean, 2),\n \"std\": round(std, 2),\n \"data_points\": len(arr),\n \"sufficient_data\": len(arr) >= self.MIN_DATA_POINTS,\n }\n return baselines\n\n def _flag_anomalies(\n self,\n detection_rows: list[dict],\n baselines: dict,\n z_threshold: float,\n group_by: str,\n detection_start: datetime,\n detection_end: datetime,\n ) -> list[dict]:\n \"\"\"\n Compare each detection-period bucket against its baseline.\n Returns list of anomaly dicts with full explanation metadata.\n \"\"\"\n # Collapse detection rows into {group_id: total}\n detection_totals: dict[Any, dict] = {}\n for row in detection_rows:\n gid = row[\"group_id\"]\n if gid not in detection_totals:\n detection_totals[gid] = {\n \"group_name\": row[\"group_name\"],\n \"total\": 0.0,\n }\n detection_totals[gid][\"total\"] += float(row[\"total\"])\n\n # Normalize detection total to weekly equivalent for fair comparison\n detection_weeks = max(\n (detection_end - detection_start).days / 7.0, 1.0\n )\n\n anomalies = []\n for gid, det in detection_totals.items():\n weekly_det = det[\"total\"] / detection_weeks\n\n if gid not in baselines:\n # New group — no baseline to compare\n anomalies.append(\n self._build_anomaly(\n gid=gid,\n group_name=det[\"group_name\"],\n group_by=group_by,\n detected_weekly=round(weekly_det, 2),\n detected_total=round(det[\"total\"], 2),\n baseline=None,\n z_score=None,\n reason=\"no_baseline\",\n severity=\"info\",\n detection_start=detection_start,\n detection_end=detection_end,\n )\n )\n continue\n\n bl = baselines[gid]\n if not bl[\"sufficient_data\"]:\n continue # skip groups with too few data points\n\n mean = bl[\"mean\"]\n std = bl[\"std\"]\n\n if std == 0:\n # No variance in baseline — any deviation is unusual\n if weekly_det > mean * 1.5:\n z_score = float(\"inf\")\n severity = \"high\"\n else:\n continue\n else:\n z_score = (weekly_det - mean) / std\n if z_score < z_threshold:\n continue # not anomalous\n severity = self._severity(z_score, z_threshold)\n\n pct_change = (\n ((weekly_det - mean) / mean * 100) if mean > 0 else None\n )\n\n anomalies.append(\n self._build_anomaly(\n gid=gid,\n group_name=det[\"group_name\"],\n group_by=group_by,\n detected_weekly=round(weekly_det, 2),\n detected_total=round(det[\"total\"], 2),\n baseline=bl,\n z_score=round(z_score, 3) if z_score != float(\"inf\") else None,\n reason=\"spending_spike\",\n severity=severity,\n detection_start=detection_start,\n detection_end=detection