Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fix_issue_71.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```json
{
"solution_code": "### File: app/workers/__init__.py\n```python\n# app/workers/__init__.py\n```\n\n### File: app/workers/celery_app.py\n```python\nimport os\nfrom celery import Celery\nfrom celery.utils.log import get_task_logger\nfrom kombu import Queue, Exchange\n\nlogger = get_task_logger(__name__)\n\nREDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')\nDEAD_LETTER_QUEUE = 'dead_letter'\n\ndef make_celery(app=None):\n celery = Celery(\n 'finmind',\n broker=REDIS_URL,\n backend=REDIS_URL,\n include=[\n 'app.workers.tasks.import_tasks',\n 'app.workers.tasks.ai_tasks',\n 'app.workers.tasks.reminder_tasks',\n ]\n )\n\n celery.conf.update(\n task_serializer='json',\n accept_content=['json'],\n result_serializer='json',\n timezone='UTC',\n enable_utc=True,\n task_track_started=True,\n task_acks_late=True,\n worker_prefetch_multiplier=1,\n task_reject_on_worker_lost=True,\n\n # Retry / backoff defaults\n task_max_retries=3,\n task_default_retry_delay=60, # seconds\n\n # Dead-letter queue routing\n task_queues=(\n Queue('default', Exchange('default'), routing_key='default'),\n Queue('imports', Exchange('imports'), routing_key='imports'),\n Queue('ai', Exchange('ai'), routing_key='ai'),\n Queue('reminders', Exchange('reminders'), routing_key='reminders'),\n Queue(DEAD_LETTER_QUEUE, Exchange(DEAD_LETTER_QUEUE), routing_key=DEAD_LETTER_QUEUE),\n ),\n task_default_queue='default',\n task_routes={\n 'app.workers.tasks.import_tasks.*': {'queue': 'imports'},\n 'app.workers.tasks.ai_tasks.*': {'queue': 'ai'},\n 'app.workers.tasks.reminder_tasks.*': {'queue': 'reminders'},\n },\n\n # Result expiry\n result_expires=3600,\n\n # Beat schedule (replaces APScheduler for recurring jobs)\n beat_schedule={\n 'run-due-reminders-every-minute': {\n 'task': 'app.workers.tasks.reminder_tasks.dispatch_due_reminders',\n 'schedule': 60.0,\n },\n },\n )\n\n if app is not None:\n celery.conf.update(app.config)\n\n class ContextTask(celery.Task):\n def __call__(self, *args, **kwargs):\n with app.app_context():\n return self.run(*args, **kwargs)\n\n celery.Task = ContextTask\n\n return celery\n\n\ncelery_app = make_celery()\n```\n\n### File: app/workers/tasks/__init__.py\n```python\n# app/workers/tasks/__init__.py\n```\n\n### File: app/workers/tasks/base.py\n```python\nimport traceback\nfrom celery import Task\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app, DEAD_LETTER_QUEUE\nfrom app.db.job_status import update_job_status, JobStatus\n\nlogger = get_task_logger(__name__)\n\n\nclass BaseTaskWithRetry(Task):\n \"\"\"\n Base Celery task that:\n - Updates job_status rows in PostgreSQL\n - Retries with exponential back-off on failure\n - Sends permanently-failed tasks to the dead-letter queue\n \"\"\"\n abstract = True\n max_retries = 3\n default_retry_delay = 30 # base seconds; multiplied below\n\n def on_success(self, retval, task_id, args, kwargs):\n update_job_status(task_id, JobStatus.SUCCESS, result=str(retval))\n\n def on_failure(self, exc, task_id, args, kwargs, einfo):\n logger.error(\n 'Task %s[%s] failed permanently: %s',\n self.name, task_id, exc,\n exc_info=True,\n )\n update_job_status(\n task_id,\n JobStatus.DEAD_LETTER,\n error=traceback.format_exc(),\n )\n # Re-publish to dead-letter queue so ops can inspect / replay\n celery_app.send_task(\n 'app.workers.tasks.dead_letter.handle_dead_letter',\n args=[self.name, task_id, args, kwargs, str(exc)],\n queue=DEAD_LETTER_QUEUE,\n )\n\n def on_retry(self, exc, task_id, args, kwargs, einfo):\n update_job_status(\n task_id,\n JobStatus.RETRYING,\n error=str(exc),\n )\n\n def apply_async(self, args=None, kwargs=None, **options):\n result = super().apply_async(args=args, kwargs=kwargs, **options)\n update_job_status(result.id, JobStatus.QUEUED)\n return result\n\n def retry_with_backoff(self, exc, countdown_base=30):\n \"\"\"Exponential back-off helper: 30s, 60s, 120s …\"\"\"\n retry_number = self.request.retries + 1\n countdown = countdown_base * (2 ** (retry_number - 1))\n raise self.retry(exc=exc, countdown=countdown)\n```\n\n### File: app/workers/tasks/dead_letter.py\n```python\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app\n\nlogger = get_task_logger(__name__)\n\n\n@celery_app.task(name='app.workers.tasks.dead_letter.handle_dead_letter', queue='dead_letter')\ndef handle_dead_letter(original_task_name, task_id, args, kwargs, error_msg):\n \"\"\"Persist dead-letter info and alert ops.\"\"\"\n logger.critical(\n 'DEAD LETTER | task=%s id=%s error=%s args=%s kwargs=%s',\n original_task_name, task_id, error_msg, args, kwargs,\n )\n # TODO: integrate PagerDuty / Slack webhook here\n return {\n 'task': original_task_name,\n 'task_id': task_id,\n 'error': error_msg,\n }\n```\n\n### File: app/workers/tasks/import_tasks.py\n```python\nimport csv\nimport io\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app\nfrom app.workers.tasks.base import BaseTaskWithRetry\nfrom app.db.job_status import update_job_status, JobStatus\n\nlogger = get_task_logger(__name__)\n\n\n@celery_app.task(\n bind=True,\n base=BaseTaskWithRetry,\n name='app.workers.tasks.import_tasks.import_expenses_csv',\n max_retries=3,\n)\ndef import_expenses_csv(self, user_id: int, csv_content: str):\n \"\"\"\n Parse and persist CSV expense rows for a user.\n csv_content — raw CSV string (decoded before enqueuing).\n \"\"\"\n update_job_status(self.request.id, JobStatus.RUNNING)\n try:\n from app.db.models import db, Expense # lazy import to avoid circular deps\n reader = csv.DictReader(io.StringIO(csv_content))\n rows_added = 0\n for row in reader:\n expense = Expense(\n user_id=user_id,\n amount=float(row['amount']),\n description=row.get('description', ''),\n category_id=int(row['category_id']) if row.get('category_id') else None,\n date=row.get('date'),\n )\n db.session.add(expense)\n rows_added += 1\n db.session.commit()\n logger.info('Imported %d expenses for user %d', rows_added, user_id)\n return {'rows_added': rows_added}\n except Exception as exc:\n logger.warning('import_expenses_csv failed (attempt %d): %s', self.request.retries + 1, exc)\n self.retry_with_backoff(exc)\n```\n\n### File: app/workers/tasks/ai_tasks.py\n```python\nfrom celery.utils.log import get_task_logger\nfrom app.workers.celery_app import celery_app\nfrom app.workers.tasks.base import BaseTaskWithRetry\nfrom app.db.job_status import update_job_status, JobStatus\n\nlogger = get_task_logger(__name__)\n\n\n@celery_app.task(\n bind=True,\n base=BaseTaskWithRetry,\n name='app.workers.tasks.ai_tasks.generate_insights',\n max_retries=3,\n)\ndef generate_insights(self, user_id: int, year_month: str):\n \"\"\"\n Generate AI budget insights for *user_id* for *year_month* (YYYY-MM).\n Result is cached in Redis under insights:{user_id}.\n \"\"\"\n update_job_status(self.request.id, JobStatus.RUNNING)\n try:\n from app.services.insights_service import build_insights # lazy\n import redis, os, json\n result = build_insights(user_id, year_month)\n r = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379/0'))\n r.setex(f'insights:{user_id}', 86400, json.dumps(result))\n logger.info('Insights generated for user %d (%s)', user_id, year_month)\n return result\n except Exception as exc:\n logger.warning('generate_insights failed (attempt %d): %s', self.request.retries + 1, exc)\n self.retry_with_backoff(exc)\n\n\n@celery_app.task(\n bind=True,\n base=BaseTaskWithRetry,\n name='app.workers.tasks.ai_tasks.generate_budget_suggestion',\n max_retries=3,\n)\ndef generate_budget_suggestion(self, user_id: int):\n \"\"\"Generate budget suggestion and cache it.\"\"\"\n update_job_status(self.request.id, JobStatus.RUNNING)\n try:\n from app.services.