From 4f3beb0f42c4e80727cfa25615791d57e0c877a0 Mon Sep 17 00:00:00 2001 From: moon <152454724+pabloDarkmoon24@users.noreply.github.com> Date: Mon, 23 Mar 2026 17:08:23 -0500 Subject: [PATCH] fix: solution for issue #71 --- fix_issue_71.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 fix_issue_71.py diff --git a/fix_issue_71.py b/fix_issue_71.py new file mode 100644 index 00000000..bc897fbc --- /dev/null +++ b/fix_issue_71.py @@ -0,0 +1,3 @@ +```json +{ + "solution_code": "### File: app/workers/__init__.py\n```python\n# app/workers/__init__.py\n```\n\n### File: app/workers/celery_app.py\n```python\nimport os\nfrom celery import Celery\nfrom celery.utils.log import get_task_logger\nfrom kombu import Queue, Exchange\n\nlogger = get_task_logger(__name__)\n\nREDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')\nDEAD_LETTER_QUEUE = 'dead_letter'\n\ndef make_celery(app=None):\n celery = Celery(\n 'finmind',\n broker=REDIS_URL,\n backend=REDIS_URL,\n include=[\n 'app.workers.tasks.import_tasks',\n 'app.workers.tasks.ai_tasks',\n 'app.workers.tasks.reminder_tasks',\n ]\n )\n\n celery.conf.update(\n task_serializer='json',\n accept_content=['json'],\n result_serializer='json',\n timezone='UTC',\n enable_utc=True,\n task_track_started=True,\n task_acks_late=True,\n worker_prefetch_multiplier=1,\n task_reject_on_worker_lost=True,\n\n # Retry / backoff defaults\n task_max_retries=3,\n task_default_retry_delay=60, # seconds\n\n # Dead-letter queue routing\n task_queues=(\n Queue('default', Exchange('default'), routing_key='default'),\n Queue('imports', Exchange('imports'), routing_key='imports'),\n Queue('ai', Exchange('ai'), routing_key='ai'),\n Queue('reminders', Exchange('reminders'), routing_key='reminders'),\n Queue(DEAD_LETTER_QUEUE, Exchange(DEAD_LETTER_QUEUE), routing_key=DEAD_LETTER_QUEUE),\n ),\n task_default_queue='default',\n task_routes={\n 'app.workers.tasks.import_tasks.*': {'queue': 'imports'},\n 'app.workers.tasks.ai_tasks.*': {'queue': 'ai'},\n 'app.workers.tasks.reminder_tasks.*': {'queue': 'reminders'},\n },\n\n # Result expiry\n result_expires=3600,\n\n # Beat schedule (replaces APScheduler for recurring jobs)\n beat_schedule={\n 'run-due-reminders-every-minute': {\n 'task': 'app.workers.tasks.reminder_tasks.dispatch_due_reminders',\n 'schedule': 60.0,\n },\n },\n )\n\n if app is not None:\n celery.conf.update(app.config)\n\n class ContextTask(celery.Task):\n def __call__(self, *args, **kwargs):\n with app.app_context():\n return self.run(*args, **kwargs)\n\n celery.Task = ContextTask\n\n return celery\n\n\ncelery_app = make_celery()\n```\n\n### File: app/workers/tasks/__init__.py\n```python\n# app/workers/tasks/__init__.py\n```\n\n### File: app/workers/tasks/base.py\n```python\nimport traceback\nfrom celery import Task\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app, DEAD_LETTER_QUEUE\nfrom app.db.job_status import update_job_status, JobStatus\n\nlogger = get_task_logger(__name__)\n\n\nclass BaseTaskWithRetry(Task):\n \"\"\"\n Base Celery task that:\n - Updates job_status rows in PostgreSQL\n - Retries with exponential back-off on failure\n - Sends permanently-failed tasks to the dead-letter queue\n \"\"\"\n abstract = True\n max_retries = 3\n default_retry_delay = 30 # base seconds; multiplied below\n\n def on_success(self, retval, task_id, args, kwargs):\n update_job_status(task_id, JobStatus.SUCCESS, result=str(retval))\n\n def on_failure(self, exc, task_id, args, kwargs, einfo):\n logger.error(\n 'Task %s[%s] failed permanently: %s',\n self.name, task_id, exc,\n exc_info=True,\n )\n update_job_status(\n task_id,\n JobStatus.DEAD_LETTER,\n error=traceback.format_exc(),\n )\n # Re-publish to dead-letter queue so ops can inspect / replay\n celery_app.send_task(\n 'app.workers.tasks.dead_letter.handle_dead_letter',\n args=[self.name, task_id, args, kwargs, str(exc)],\n queue=DEAD_LETTER_QUEUE,\n )\n\n def on_retry(self, exc, task_id, args, kwargs, einfo):\n update_job_status(\n task_id,\n JobStatus.RETRYING,\n error=str(exc),\n )\n\n def apply_async(self, args=None, kwargs=None, **options):\n result = super().apply_async(args=args, kwargs=kwargs, **options)\n update_job_status(result.id, JobStatus.QUEUED)\n return result\n\n def retry_with_backoff(self, exc, countdown_base=30):\n \"\"\"Exponential back-off helper: 30s, 60s, 120s …\"\"\"\n retry_number = self.request.retries + 1\n countdown = countdown_base * (2 ** (retry_number - 1))\n raise self.retry(exc=exc, countdown=countdown)\n```\n\n### File: app/workers/tasks/dead_letter.py\n```python\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app\n\nlogger = get_task_logger(__name__)\n\n\n@celery_app.task(name='app.workers.tasks.dead_letter.handle_dead_letter', queue='dead_letter')\ndef handle_dead_letter(original_task_name, task_id, args, kwargs, error_msg):\n \"\"\"Persist dead-letter info and alert ops.\"\"\"\n logger.critical(\n 'DEAD LETTER | task=%s id=%s error=%s args=%s kwargs=%s',\n original_task_name, task_id, error_msg, args, kwargs,\n )\n # TODO: integrate PagerDuty / Slack webhook here\n return {\n 'task': original_task_name,\n 'task_id': task_id,\n 'error': error_msg,\n }\n```\n\n### File: app/workers/tasks/import_tasks.py\n```python\nimport csv\nimport io\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app\nfrom app.workers.tasks.base import BaseTaskWithRetry\nfrom app.db.job_status import update_job_status, JobStatus\n\nlogger = get_task_logger(__name__)\n\n\n@celery_app.task(\n bind=True,\n base=BaseTaskWithRetry,\n name='app.workers.tasks.import_tasks.import_expenses_csv',\n max_retries=3,\n)\ndef import_expenses_csv(self, user_id: int, csv_content: str):\n \"\"\"\n Parse and persist CSV expense rows for a user.\n csv_content — raw CSV string (decoded before enqueuing).\n \"\"\"\n update_job_status(self.request.id, JobStatus.RUNNING)\n try:\n from app.db.models import db, Expense # lazy import to avoid circular deps\n reader = csv.DictReader(io.StringIO(csv_content))\n rows_added = 0\n for row in reader:\n expense = Expense(\n user_id=user_id,\n amount=float(row['amount']),\n description=row.get('description', ''),\n category_id=int(row['category_id']) if row.get('category_id') else None,\n date=row.get('date'),\n )\n db.session.add(expense)\n rows_added += 1\n db.session.commit()\n logger.info('Imported %d expenses for user %d', rows_added, user_id)\n return {'rows_added': rows_added}\n except Exception as exc:\n logger.warning('import_expenses_csv failed (attempt %d): %s', self.request.retries + 1, exc)\n self.retry_with_backoff(exc)\n```\n\n### File: app/workers/tasks/ai_tasks.py\n```python\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app\nfrom app.workers.tasks.base import BaseTaskWithRetry\nfrom app.db.job_status import update_job_status, JobStatus\n\nlogger = get_task_logger(__name__)\n\n\n@celery_app.task(\n bind=True,\n base=BaseTaskWithRetry,\n name='app.workers.tasks.ai_tasks.generate_insights',\n max_retries=3,\n)\ndef generate_insights(self, user_id: int, year_month: str):\n \"\"\"\n Generate AI budget insights for *user_id* for *year_month* (YYYY-MM).\n Result is cached in Redis under insights:{user_id}.\n \"\"\"\n update_job_status(self.request.id, JobStatus.RUNNING)\n try:\n from app.services.insights_service import build_insights # lazy\n import redis, os, json\n result = build_insights(user_id, year_month)\n r = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379/0'))\n r.setex(f'insights:{user_id}', 86400, json.dumps(result))\n logger.info('Insights generated for user %d (%s)', user_id, year_month)\n return result\n except Exception as exc:\n logger.warning('generate_insights failed (attempt %d): %s', self.request.retries + 1, exc)\n self.retry_with_backoff(exc)\n\n\n@celery_app.task(\n bind=True,\n base=BaseTaskWithRetry,\n name='app.workers.tasks.ai_tasks.generate_budget_suggestion',\n max_retries=3,\n)\ndef generate_budget_suggestion(self, user_id: int):\n \"\"\"Generate budget suggestion and cache it.\"\"\"\n update_job_status(self.request.id, JobStatus.RUNNING)\n try:\n from app.services. \ No newline at end of file