From fb1268b19e5a537a6ff8fb273df36af595d8ce96 Mon Sep 17 00:00:00 2001 From: moon <152454724+pabloDarkmoon24@users.noreply.github.com> Date: Mon, 23 Mar 2026 17:06:42 -0500 Subject: [PATCH] fix: solution for issue #72 --- fix_issue_72.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 fix_issue_72.py diff --git a/fix_issue_72.py b/fix_issue_72.py new file mode 100644 index 00000000..47a79aaf --- /dev/null +++ b/fix_issue_72.py @@ -0,0 +1,3 @@ +```json +{ + "solution_code": "### FILE: app/services/anomaly_detection.py\n\n```python\n\"\"\"\nAnomaly Detection Service for FinMind\nDetects unusual spending spikes by category or merchant using statistical baselines.\n\"\"\"\n\nimport logging\nfrom datetime import datetime, timedelta\nfrom typing import Any\n\nimport numpy as np\nfrom sqlalchemy import text\n\nlogger = logging.getLogger(__name__)\n\n\nclass AnomalyDetectionService:\n \"\"\"\n Detects spending anomalies by comparing recent transactions\n against a statistical baseline (mean ± k*std).\n\n Baseline window : configurable (default 90 days)\n Detection window : configurable (default 30 days)\n Sensitivity : z-score threshold (default 2.0 → ~95th percentile)\n \"\"\"\n\n DEFAULT_BASELINE_DAYS = 90\n DEFAULT_DETECTION_DAYS = 30\n DEFAULT_Z_THRESHOLD = 2.0\n MIN_DATA_POINTS = 3 # need at least 3 periods to compute std\n\n def __init__(self, db, redis_client=None):\n self.db = db\n self.redis = redis_client\n self.cache_ttl = 3600 # 1 hour\n\n # ------------------------------------------------------------------\n # Public API\n # ------------------------------------------------------------------\n\n def detect_anomalies(\n self,\n user_id: int,\n baseline_days: int = DEFAULT_BASELINE_DAYS,\n detection_days: int = DEFAULT_DETECTION_DAYS,\n z_threshold: float = DEFAULT_Z_THRESHOLD,\n group_by: str = \"category\", # \"category\" | \"merchant\"\n ) -> dict[str, Any]:\n \"\"\"\n Main entry point.\n Returns a dict with flagged anomalies and their explanation metadata.\n \"\"\"\n cache_key = (\n f\"anomalies:{user_id}:{baseline_days}:{detection_days}\"\n f\":{z_threshold}:{group_by}\"\n )\n cached = self._cache_get(cache_key)\n if cached is not None:\n return cached\n\n now = datetime.utcnow()\n detection_start = now - timedelta(days=detection_days)\n baseline_start = now - timedelta(days=baseline_days + detection_days)\n baseline_end = detection_start\n\n if group_by == \"merchant\":\n baseline_data = self._fetch_merchant_spending(\n user_id, baseline_start, baseline_end\n )\n detection_data = self._fetch_merchant_spending(\n user_id, detection_start, now\n )\n else:\n baseline_data = self._fetch_category_spending(\n user_id, baseline_start, baseline_end\n )\n detection_data = self._fetch_category_spending(\n user_id, detection_start, now\n )\n\n baselines = self._compute_baselines(baseline_data, group_by)\n anomalies = self._flag_anomalies(\n detection_data, baselines, z_threshold, group_by,\n detection_start, now\n )\n\n result = {\n \"user_id\": user_id,\n \"generated_at\": now.isoformat(),\n \"parameters\": {\n \"baseline_days\": baseline_days,\n \"detection_days\": detection_days,\n \"z_threshold\": z_threshold,\n \"group_by\": group_by,\n },\n \"anomalies\": anomalies,\n \"summary\": {\n \"total_flagged\": len(anomalies),\n \"groups_analyzed\": len(baselines),\n },\n }\n\n self._cache_set(cache_key, result)\n return result\n\n # ------------------------------------------------------------------\n # Data fetching\n # ------------------------------------------------------------------\n\n def _fetch_category_spending(\n self, user_id: int, start: datetime, end: datetime\n ) -> list[dict]:\n \"\"\"\n Aggregate spending per category per *week* inside the window.\n Weekly bucketing gives a stable baseline without being too noisy.\n \"\"\"\n sql = text(\n \"\"\"\n SELECT\n c.id AS group_id,\n c.name AS group_name,\n DATE_TRUNC('week', e.date)::date AS period,\n SUM(e.amount) AS total\n FROM expenses e\n JOIN categories c ON c.id = e.category_id\n WHERE e.user_id = :user_id\n AND e.date >= :start\n AND e.date < :end\n GROUP BY c.id, c.name, DATE_TRUNC('week', e.date)\n ORDER BY period\n \"\"\"\n )\n rows = self.db.session.execute(\n sql, {\"user_id\": user_id, \"start\": start, \"end\": end}\n ).mappings().all()\n return [dict(r) for r in rows]\n\n def _fetch_merchant_spending(\n self, user_id: int, start: datetime, end: datetime\n ) -> list[dict]:\n \"\"\"\n Aggregate spending per merchant (description) per week.\n \"\"\"\n sql = text(\n \"\"\"\n SELECT\n LOWER(TRIM(e.description)) AS group_id,\n e.description AS group_name,\n DATE_TRUNC('week', e.date)::date AS period,\n SUM(e.amount) AS total\n FROM expenses e\n WHERE e.user_id = :user_id\n AND e.date >= :start\n AND e.date < :end\n AND e.description IS NOT NULL\n AND e.description <> ''\n GROUP BY LOWER(TRIM(e.description)), e.description,\n DATE_TRUNC('week', e.date)\n ORDER BY period\n \"\"\"\n )\n rows = self.db.session.execute(\n sql, {\"user_id\": user_id, \"start\": start, \"end\": end}\n ).mappings().all()\n return [dict(r) for r in rows]\n\n # ------------------------------------------------------------------\n # Statistical helpers\n # ------------------------------------------------------------------\n\n def _compute_baselines(self, rows: list[dict], group_by: str) -> dict:\n \"\"\"\n Build a baseline dict keyed by group_id.\n Each entry contains: mean, std, data_points, periods.\n \"\"\"\n grouped: dict[Any, list[float]] = {}\n meta: dict[Any, str] = {}\n\n for row in rows:\n gid = row[\"group_id\"]\n if gid not in grouped:\n grouped[gid] = []\n meta[gid] = row[\"group_name\"]\n grouped[gid].append(float(row[\"total\"]))\n\n baselines = {}\n for gid, amounts in grouped.items():\n arr = np.array(amounts)\n mean = float(np.mean(arr))\n std = float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0\n baselines[gid] = {\n \"group_name\": meta[gid],\n \"mean\": round(mean, 2),\n \"std\": round(std, 2),\n \"data_points\": len(arr),\n \"sufficient_data\": len(arr) >= self.MIN_DATA_POINTS,\n }\n return baselines\n\n def _flag_anomalies(\n self,\n detection_rows: list[dict],\n baselines: dict,\n z_threshold: float,\n group_by: str,\n detection_start: datetime,\n detection_end: datetime,\n ) -> list[dict]:\n \"\"\"\n Compare each detection-period bucket against its baseline.\n Returns list of anomaly dicts with full explanation metadata.\n \"\"\"\n # Collapse detection rows into {group_id: total}\n detection_totals: dict[Any, dict] = {}\n for row in detection_rows:\n gid = row[\"group_id\"]\n if gid not in detection_totals:\n detection_totals[gid] = {\n \"group_name\": row[\"group_name\"],\n \"total\": 0.0,\n }\n detection_totals[gid][\"total\"] += float(row[\"total\"])\n\n # Normalize detection total to weekly equivalent for fair comparison\n detection_weeks = max(\n (detection_end - detection_start).days / 7.0, 1.0\n )\n\n anomalies = []\n for gid, det in detection_totals.items():\n weekly_det = det[\"total\"] / detection_weeks\n\n if gid not in baselines:\n # New group — no baseline to compare\n anomalies.append(\n self._build_anomaly(\n gid=gid,\n group_name=det[\"group_name\"],\n group_by=group_by,\n detected_weekly=round(weekly_det, 2),\n detected_total=round(det[\"total\"], 2),\n baseline=None,\n z_score=None,\n reason=\"no_baseline\",\n severity=\"info\",\n detection_start=detection_start,\n detection_end=detection_end,\n )\n )\n continue\n\n bl = baselines[gid]\n if not bl[\"sufficient_data\"]:\n continue # skip groups with too few data points\n\n mean = bl[\"mean\"]\n std = bl[\"std\"]\n\n if std == 0:\n # No variance in baseline — any deviation is unusual\n if weekly_det > mean * 1.5:\n z_score = float(\"inf\")\n severity = \"high\"\n else:\n continue\n else:\n z_score = (weekly_det - mean) / std\n if z_score < z_threshold:\n continue # not anomalous\n severity = self._severity(z_score, z_threshold)\n\n pct_change = (\n ((weekly_det - mean) / mean * 100) if mean > 0 else None\n )\n\n anomalies.append(\n self._build_anomaly(\n gid=gid,\n group_name=det[\"group_name\"],\n group_by=group_by,\n detected_weekly=round(weekly_det, 2),\n detected_total=round(det[\"total\"], 2),\n baseline=bl,\n z_score=round(z_score, 3) if z_score != float(\"inf\") else None,\n reason=\"spending_spike\",\n severity=severity,\n detection_start=detection_start,\n detection_end=detection \ No newline at end of file