diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b45..005cf1e1 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -110,6 +110,78 @@ def _ensure_schema_compatibility(app: Flask) -> None: NOT NULL DEFAULT 'INR' """ ) + # Background jobs table (idempotent migration) + cur.execute( + """ + DO $$ BEGIN + CREATE TYPE job_status AS ENUM ('PENDING','RUNNING','SUCCESS','FAILED','DEAD_LETTER'); + EXCEPTION WHEN duplicate_object THEN NULL; + END $$; + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS background_jobs ( + id SERIAL PRIMARY KEY, + job_type VARCHAR(100) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + status job_status NOT NULL DEFAULT 'PENDING', + attempt INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 3, + next_run_at TIMESTAMP NOT NULL DEFAULT NOW(), + last_error TEXT, + result JSONB, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + completed_at TIMESTAMP + ); + """ + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_bg_jobs_status_next_run ON background_jobs(status, next_run_at);" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_bg_jobs_type ON background_jobs(job_type);" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_bg_jobs_created ON background_jobs(created_at DESC);" + ) + # Savings goals tables (Issue #133) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS savings_goals ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + name VARCHAR(200) NOT NULL, + target_amount NUMERIC(12,2) NOT NULL, + current_amount NUMERIC(12,2) NOT NULL DEFAULT 0, + currency VARCHAR(10) NOT NULL DEFAULT 'INR', + deadline DATE, + category VARCHAR(100), + status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE', + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + """ + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_savings_goals_user ON savings_goals(user_id, status);" + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS savings_contributions ( + id SERIAL PRIMARY KEY, + goal_id INT NOT NULL REFERENCES savings_goals(id) ON DELETE CASCADE, + amount NUMERIC(12,2) NOT NULL, + note VARCHAR(500), + contributed_at DATE NOT NULL DEFAULT CURRENT_DATE, + created_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + """ + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_savings_contributions_goal ON savings_contributions(goal_id, contributed_at DESC);" + ) conn.commit() except Exception: app.logger.exception( diff --git a/packages/backend/app/db/migrations/001_background_jobs.sql b/packages/backend/app/db/migrations/001_background_jobs.sql new file mode 100644 index 00000000..62c2e68a --- /dev/null +++ b/packages/backend/app/db/migrations/001_background_jobs.sql @@ -0,0 +1,27 @@ +-- Background job tracking for resilient retry & monitoring +-- Part of: Resilient background job retry & monitoring (Issue #130) + +DO $$ BEGIN + CREATE TYPE job_status AS ENUM ('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'DEAD_LETTER'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +CREATE TABLE IF NOT EXISTS background_jobs ( + id SERIAL PRIMARY KEY, + job_type VARCHAR(100) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + status job_status NOT NULL DEFAULT 'PENDING', + attempt INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 3, + next_run_at TIMESTAMP NOT NULL DEFAULT NOW(), + last_error TEXT, + result JSONB, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + completed_at TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_bg_jobs_status_next_run ON background_jobs(status, next_run_at); +CREATE INDEX IF NOT EXISTS idx_bg_jobs_type ON background_jobs(job_type); +CREATE INDEX IF NOT EXISTS idx_bg_jobs_created ON background_jobs(created_at DESC); diff --git a/packages/backend/app/db/migrations/002_savings_goals.sql b/packages/backend/app/db/migrations/002_savings_goals.sql new file mode 100644 index 00000000..868f9387 --- /dev/null +++ b/packages/backend/app/db/migrations/002_savings_goals.sql @@ -0,0 +1,26 @@ +-- Goal-based savings tracking & milestones (Issue #133) + +CREATE TABLE IF NOT EXISTS savings_goals ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + name VARCHAR(200) NOT NULL, + target_amount NUMERIC(12,2) NOT NULL, + current_amount NUMERIC(12,2) NOT NULL DEFAULT 0, + currency VARCHAR(10) NOT NULL DEFAULT 'INR', + deadline DATE, + category VARCHAR(100), + status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE', + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_savings_goals_user ON savings_goals(user_id, status); + +CREATE TABLE IF NOT EXISTS savings_contributions ( + id SERIAL PRIMARY KEY, + goal_id INT NOT NULL REFERENCES savings_goals(id) ON DELETE CASCADE, + amount NUMERIC(12,2) NOT NULL, + note VARCHAR(500), + contributed_at DATE NOT NULL DEFAULT CURRENT_DATE, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_savings_contributions_goal ON savings_contributions(goal_id, contributed_at DESC); diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189de..87328890 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,54 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +-- Background job tracking for resilient retry & dead-letter support +DO $$ BEGIN + CREATE TYPE job_status AS ENUM ('PENDING','RUNNING','SUCCESS','FAILED','DEAD_LETTER'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +CREATE TABLE IF NOT EXISTS background_jobs ( + id SERIAL PRIMARY KEY, + job_type VARCHAR(100) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + status job_status NOT NULL DEFAULT 'PENDING', + attempt INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 3, + next_run_at TIMESTAMP NOT NULL DEFAULT NOW(), + last_error TEXT, + result JSONB, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + completed_at TIMESTAMP +); +CREATE INDEX IF NOT EXISTS idx_bg_jobs_status_next_run ON background_jobs(status, next_run_at); +CREATE INDEX IF NOT EXISTS idx_bg_jobs_type ON background_jobs(job_type); +CREATE INDEX IF NOT EXISTS idx_bg_jobs_created ON background_jobs(created_at DESC); + +-- Goal-based savings tracking & milestones (Issue #133) +CREATE TABLE IF NOT EXISTS savings_goals ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + name VARCHAR(200) NOT NULL, + target_amount NUMERIC(12,2) NOT NULL, + current_amount NUMERIC(12,2) NOT NULL DEFAULT 0, + currency VARCHAR(10) NOT NULL DEFAULT 'INR', + deadline DATE, + category VARCHAR(100), + status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE', + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_savings_goals_user ON savings_goals(user_id, status); + +CREATE TABLE IF NOT EXISTS savings_contributions ( + id SERIAL PRIMARY KEY, + goal_id INT NOT NULL REFERENCES savings_goals(id) ON DELETE CASCADE, + amount NUMERIC(12,2) NOT NULL, + note VARCHAR(500), + contributed_at DATE NOT NULL DEFAULT CURRENT_DATE, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_savings_contributions_goal ON savings_contributions(goal_id, contributed_at DESC); diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d44810..7866ae60 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -133,3 +133,77 @@ class AuditLog(db.Model): user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class JobStatus(str, Enum): + PENDING = "PENDING" + RUNNING = "RUNNING" + SUCCESS = "SUCCESS" + FAILED = "FAILED" + DEAD_LETTER = "DEAD_LETTER" + + +class BackgroundJob(db.Model): + """Tracks background job execution with retry support.""" + + __tablename__ = "background_jobs" + + id = db.Column(db.Integer, primary_key=True) + job_type = db.Column(db.String(100), nullable=False, index=True) + payload = db.Column(db.JSON, nullable=False, default=dict) + status = db.Column( + SAEnum(JobStatus), nullable=False, default=JobStatus.PENDING, index=True + ) + attempt = db.Column(db.Integer, nullable=False, default=0) + max_retries = db.Column(db.Integer, nullable=False, default=3) + next_run_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + last_error = db.Column(db.Text, nullable=True) + result = db.Column(db.JSON, nullable=True) + created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + completed_at = db.Column(db.DateTime, nullable=True) + + +class SavingsGoalStatus(str, Enum): + ACTIVE = "ACTIVE" + COMPLETED = "COMPLETED" + PAUSED = "PAUSED" + + +class SavingsGoal(db.Model): + """A savings goal with a target amount and optional deadline.""" + + __tablename__ = "savings_goals" + + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True) + name = db.Column(db.String(200), nullable=False) + target_amount = db.Column(db.Numeric(12, 2), nullable=False) + current_amount = db.Column(db.Numeric(12, 2), nullable=False, default=0) + currency = db.Column(db.String(10), nullable=False, default="INR") + deadline = db.Column(db.Date, nullable=True) + category = db.Column(db.String(100), nullable=True) + status = db.Column( + db.String(20), nullable=False, default=SavingsGoalStatus.ACTIVE.value + ) + created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + + contributions = db.relationship( + "SavingsContribution", backref="goal", lazy="dynamic", cascade="all, delete-orphan" + ) + + +class SavingsContribution(db.Model): + """A single contribution toward a savings goal.""" + + __tablename__ = "savings_contributions" + + id = db.Column(db.Integer, primary_key=True) + goal_id = db.Column( + db.Integer, db.ForeignKey("savings_goals.id", ondelete="CASCADE"), nullable=False, index=True + ) + amount = db.Column(db.Numeric(12, 2), nullable=False) + note = db.Column(db.String(500), nullable=True) + contributed_at = db.Column(db.Date, nullable=False, default=date.today) + created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) diff --git a/packages/backend/app/observability.py b/packages/backend/app/observability.py index 0c2a6bf7..d65c272c 100644 --- a/packages/backend/app/observability.py +++ b/packages/backend/app/observability.py @@ -60,6 +60,12 @@ def __init__(self) -> None: ["event", "channel", "status"], registry=self.registry, ) + self.job_events_total = Counter( + "finmind_job_events_total", + "Background job lifecycle events.", + ["event", "job_type", "status"], + registry=self.registry, + ) def observe_http_request( self, method: str, endpoint: str, status_code: int, duration_seconds: float @@ -79,6 +85,13 @@ def record_reminder_event( event=event, channel=channel, status=status ).inc() + def record_job_event( + self, event: str, job_type: str, status: str = "ok" + ) -> None: + self.job_events_total.labels( + event=event, job_type=job_type, status=status + ).inc() + def metrics_response(self) -> Response: if self.multiprocess_enabled: registry = CollectorRegistry() @@ -137,3 +150,9 @@ def track_reminder_event(event: str, channel: str, status: str = "ok") -> None: obs = current_app.extensions.get("observability") if obs: obs.record_reminder_event(event=event, channel=channel, status=status) + + +def track_job_event(event: str, job_type: str, status: str = "ok") -> None: + obs = current_app.extensions.get("observability") + if obs: + obs.record_job_event(event=event, job_type=job_type, status=status) diff --git a/packages/backend/app/openapi.yaml b/packages/backend/app/openapi.yaml index 3f8ec3f0..dff5e727 100644 --- a/packages/backend/app/openapi.yaml +++ b/packages/backend/app/openapi.yaml @@ -12,6 +12,8 @@ tags: - name: Bills - name: Reminders - name: Insights + - name: Jobs + - name: Savings paths: /auth/register: post: @@ -481,6 +483,255 @@ paths: application/json: schema: { $ref: '#/components/schemas/Error' } + # ── Background Jobs (Admin) ────────────────────────────────────────── + + /admin/jobs/stats: + get: + summary: Aggregated job statistics + tags: [Jobs] + security: [bearerAuth: []] + responses: + '200': + description: Job stats by status and type + content: + application/json: + schema: + type: object + properties: + by_status: { type: object, additionalProperties: { type: integer } } + by_type: { type: object, additionalProperties: { type: integer } } + total: { type: integer } + '403': { description: Admin required } + + /admin/jobs: + get: + summary: List background jobs + tags: [Jobs] + security: [bearerAuth: []] + parameters: + - name: status + in: query + schema: { type: string, enum: [PENDING, RUNNING, SUCCESS, FAILED, DEAD_LETTER] } + - name: job_type + in: query + schema: { type: string } + - name: limit + in: query + schema: { type: integer, default: 50 } + - name: offset + in: query + schema: { type: integer, default: 0 } + responses: + '200': + description: Paginated job list + content: + application/json: + schema: + type: object + properties: + total: { type: integer } + offset: { type: integer } + limit: { type: integer } + jobs: + type: array + items: { $ref: '#/components/schemas/BackgroundJob' } + '403': { description: Admin required } + + /admin/jobs/{job_id}: + get: + summary: Get job details + tags: [Jobs] + security: [bearerAuth: []] + parameters: + - name: job_id + in: path + required: true + schema: { type: integer } + responses: + '200': + description: Job details + content: + application/json: + schema: { $ref: '#/components/schemas/BackgroundJob' } + '404': { description: Not found } + delete: + summary: Delete a job record + tags: [Jobs] + security: [bearerAuth: []] + parameters: + - name: job_id + in: path + required: true + schema: { type: integer } + responses: + '200': { description: Deleted } + '404': { description: Not found } + + /admin/jobs/{job_id}/retry: + post: + summary: Retry a dead-lettered job + tags: [Jobs] + security: [bearerAuth: []] + parameters: + - name: job_id + in: path + required: true + schema: { type: integer } + responses: + '200': { description: Job reset to PENDING } + '400': { description: Job not in DEAD_LETTER status } + '404': { description: Not found } + + /admin/jobs/process: + post: + summary: Manually trigger job processing + tags: [Jobs] + security: [bearerAuth: []] + parameters: + - name: limit + in: query + schema: { type: integer, default: 50 } + responses: + '200': + description: Processing results + content: + application/json: + schema: + type: object + properties: + processed: { type: integer } + succeeded: { type: integer } + failed: { type: integer } + dead_lettered: { type: integer } + + # ── Savings Goals ───────────────────────────────────────────────────── + + /savings/goals: + get: + summary: List savings goals + tags: [Savings] + security: [bearerAuth: []] + parameters: + - name: status + in: query + schema: { type: string, enum: [ACTIVE, COMPLETED, PAUSED] } + responses: + '200': + description: List of goals with progress + content: + application/json: + schema: + type: array + items: { $ref: '#/components/schemas/SavingsGoal' } + post: + summary: Create a savings goal + tags: [Savings] + security: [bearerAuth: []] + requestBody: + required: true + content: + application/json: + schema: { $ref: '#/components/schemas/NewSavingsGoal' } + responses: + '201': { description: Created } + '400': { description: Validation error } + + /savings/goals/{goal_id}: + get: + summary: Get goal with contributions + tags: [Savings] + security: [bearerAuth: []] + parameters: + - name: goal_id + in: path + required: true + schema: { type: integer } + responses: + '200': + description: Goal detail + content: + application/json: + schema: { $ref: '#/components/schemas/SavingsGoalDetail' } + '404': { description: Not found } + patch: + summary: Update a savings goal + tags: [Savings] + security: [bearerAuth: []] + parameters: + - name: goal_id + in: path + required: true + schema: { type: integer } + requestBody: + content: + application/json: + schema: { $ref: '#/components/schemas/UpdateSavingsGoal' } + responses: + '200': { description: Updated } + '404': { description: Not found } + delete: + summary: Delete a savings goal + tags: [Savings] + security: [bearerAuth: []] + parameters: + - name: goal_id + in: path + required: true + schema: { type: integer } + responses: + '200': { description: Deleted } + + /savings/goals/{goal_id}/contribute: + post: + summary: Add a contribution + tags: [Savings] + security: [bearerAuth: []] + parameters: + - name: goal_id + in: path + required: true + schema: { type: integer } + requestBody: + required: true + content: + application/json: + schema: { $ref: '#/components/schemas/NewContribution' } + responses: + '201': { description: Contribution added } + '400': { description: Validation error } + '404': { description: Goal not found } + + /savings/goals/{goal_id}/milestones: + get: + summary: Get milestone status + tags: [Savings] + security: [bearerAuth: []] + parameters: + - name: goal_id + in: path + required: true + schema: { type: integer } + responses: + '200': + description: Milestone details + content: + application/json: + schema: + type: object + properties: + goal_id: { type: integer } + progress_pct: { type: number } + milestones: + type: array + items: + type: object + properties: + threshold: { type: integer } + reached: { type: boolean } + amount_at_threshold: { type: number } + next_milestone: { type: integer, nullable: true } + amount_to_next: { type: number } + components: securitySchemes: bearerAuth: @@ -587,3 +838,74 @@ components: message: { type: string } send_at: { type: string, format: date-time } channel: { type: string, enum: [email, whatsapp], default: email } + BackgroundJob: + type: object + properties: + id: { type: integer } + job_type: { type: string } + payload: { type: object } + status: { type: string, enum: [PENDING, RUNNING, SUCCESS, FAILED, DEAD_LETTER] } + attempt: { type: integer } + max_retries: { type: integer } + next_run_at: { type: string, format: date-time, nullable: true } + last_error: { type: string, nullable: true } + result: { type: object, nullable: true } + created_at: { type: string, format: date-time } + updated_at: { type: string, format: date-time } + completed_at: { type: string, format: date-time, nullable: true } + SavingsGoal: + type: object + properties: + id: { type: integer } + name: { type: string } + target_amount: { type: number, format: float } + current_amount: { type: number, format: float } + currency: { type: string } + progress_pct: { type: number } + remaining: { type: number } + deadline: { type: string, format: date, nullable: true } + category: { type: string, nullable: true } + status: { type: string, enum: [ACTIVE, COMPLETED, PAUSED] } + milestones_reached: + type: array + items: { type: integer } + created_at: { type: string, format: date-time } + updated_at: { type: string, format: date-time } + SavingsGoalDetail: + allOf: + - $ref: '#/components/schemas/SavingsGoal' + - type: object + properties: + contributions: + type: array + items: + type: object + properties: + id: { type: integer } + amount: { type: number, format: float } + note: { type: string, nullable: true } + contributed_at: { type: string, format: date } + NewSavingsGoal: + type: object + required: [name, target_amount] + properties: + name: { type: string } + target_amount: { type: number, format: float } + currency: { type: string, default: INR } + deadline: { type: string, format: date, nullable: true } + category: { type: string, nullable: true } + UpdateSavingsGoal: + type: object + properties: + name: { type: string } + target_amount: { type: number, format: float } + deadline: { type: string, format: date, nullable: true } + category: { type: string, nullable: true } + status: { type: string, enum: [ACTIVE, COMPLETED, PAUSED] } + NewContribution: + type: object + required: [amount] + properties: + amount: { type: number, format: float } + note: { type: string, nullable: true } + contributed_at: { type: string, format: date, nullable: true } diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..8e62a26b 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,8 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .jobs import bp as jobs_bp +from .savings import bp as savings_bp def register_routes(app: Flask): @@ -18,3 +20,5 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(jobs_bp, url_prefix="/admin") + app.register_blueprint(savings_bp, url_prefix="/savings") diff --git a/packages/backend/app/routes/jobs.py b/packages/backend/app/routes/jobs.py new file mode 100644 index 00000000..7e543a11 --- /dev/null +++ b/packages/backend/app/routes/jobs.py @@ -0,0 +1,173 @@ +"""Admin endpoints for background job monitoring and dead-letter management. + +Provides visibility into job execution status, retry counts, and the ability +to manually retry dead-lettered jobs. Requires admin role. +""" + +from flask import Blueprint, jsonify, request +from flask_jwt_extended import jwt_required, get_jwt_identity +from ..extensions import db +from ..models import BackgroundJob, JobStatus, User, Role +from ..services.jobs import ( + get_job_stats, + process_due_jobs, + retry_dead_letter_job, +) +import logging + +bp = Blueprint("jobs", __name__) +logger = logging.getLogger("finmind.jobs.routes") + + +def _require_admin(): + """Check that the current user has admin role. Returns user_id or raises.""" + uid = int(get_jwt_identity()) + user = db.session.get(User, uid) + if not user or user.role != Role.ADMIN.value: + from flask import abort + + abort(403, description="Admin access required") + return uid + + +@bp.get("/jobs/stats") +@jwt_required() +def job_stats(): + """Get aggregated job statistics. + + Returns counts by status (PENDING, RUNNING, SUCCESS, FAILED, DEAD_LETTER) + and by job type. + """ + _require_admin() + stats = get_job_stats() + return jsonify(stats) + + +@bp.get("/jobs") +@jwt_required() +def list_jobs(): + """List background jobs with optional filters. + + Query params: + status: Filter by status (PENDING, RUNNING, SUCCESS, FAILED, DEAD_LETTER) + job_type: Filter by job type + limit: Max results (default 50, max 200) + offset: Pagination offset + """ + _require_admin() + + query = BackgroundJob.query + + status_filter = request.args.get("status") + if status_filter: + try: + query = query.filter(BackgroundJob.status == JobStatus(status_filter)) + except ValueError: + return jsonify(error=f"Invalid status: {status_filter}"), 400 + + job_type_filter = request.args.get("job_type") + if job_type_filter: + query = query.filter(BackgroundJob.job_type == job_type_filter) + + limit = min(int(request.args.get("limit", 50)), 200) + offset = int(request.args.get("offset", 0)) + + total = query.count() + jobs = ( + query.order_by(BackgroundJob.created_at.desc()) + .offset(offset) + .limit(limit) + .all() + ) + + return jsonify( + total=total, + offset=offset, + limit=limit, + jobs=[ + { + "id": j.id, + "job_type": j.job_type, + "status": str(j.status), + "attempt": j.attempt, + "max_retries": j.max_retries, + "next_run_at": j.next_run_at.isoformat() if j.next_run_at else None, + "last_error": j.last_error, + "created_at": j.created_at.isoformat(), + "completed_at": j.completed_at.isoformat() if j.completed_at else None, + } + for j in jobs + ], + ) + + +@bp.get("/jobs/") +@jwt_required() +def get_job(job_id: int): + """Get full details for a specific job.""" + _require_admin() + job = db.session.get(BackgroundJob, job_id) + if not job: + return jsonify(error="Job not found"), 404 + + return jsonify( + id=job.id, + job_type=job.job_type, + payload=job.payload, + status=str(job.status), + attempt=job.attempt, + max_retries=job.max_retries, + next_run_at=job.next_run_at.isoformat() if job.next_run_at else None, + last_error=job.last_error, + result=job.result, + created_at=job.created_at.isoformat(), + updated_at=job.updated_at.isoformat(), + completed_at=job.completed_at.isoformat() if job.completed_at else None, + ) + + +@bp.post("/jobs//retry") +@jwt_required() +def retry_job(job_id: int): + """Manually retry a dead-lettered job. + + Resets the job to PENDING status with attempt count reset to 0. + The job will be picked up on the next processing cycle. + """ + _require_admin() + success = retry_dead_letter_job(job_id) + if not success: + job = db.session.get(BackgroundJob, job_id) + if not job: + return jsonify(error="Job not found"), 404 + return jsonify(error=f"Job is not in DEAD_LETTER status (current: {job.status})"), 400 + + return jsonify(status="retried", job_id=job_id) + + +@bp.post("/jobs/process") +@jwt_required() +def trigger_process(): + """Manually trigger processing of due jobs. + + Useful for testing or when the automatic scheduler is not running. + """ + _require_admin() + limit = int(request.args.get("limit", 50)) + stats = process_due_jobs(limit=limit) + return jsonify(**stats) + + +@bp.delete("/jobs/") +@jwt_required() +def delete_job(job_id: int): + """Delete a job record (admin only).""" + _require_admin() + job = db.session.get(BackgroundJob, job_id) + if not job: + return jsonify(error="Job not found"), 404 + + db.session.delete(job) + db.session.commit() + logger.info("Deleted job id=%s", job_id) + return jsonify(deleted=True, job_id=job_id) diff --git a/packages/backend/app/routes/reminders.py b/packages/backend/app/routes/reminders.py index 9ed7ea50..23d6c7bd 100644 --- a/packages/backend/app/routes/reminders.py +++ b/packages/backend/app/routes/reminders.py @@ -4,7 +4,7 @@ from ..extensions import db from ..models import Bill, Reminder from ..observability import track_reminder_event -from ..services.reminders import send_reminder +from ..services.jobs import enqueue, process_due_jobs import logging bp = Blueprint("reminders", __name__) @@ -159,6 +159,17 @@ def autopay_result_followup(bill_id: int): @bp.post("/run") @jwt_required() def run_due(): + """Process due reminders via the resilient job system. + + Instead of sending reminders inline (fire-and-forget), this now: + 1. Finds due reminders for the user + 2. Enqueues a background job for each reminder + 3. Immediately processes due jobs (with retry support) + 4. Returns job statistics + + This ensures failed sends are retried with exponential backoff + and permanently failed jobs go to the dead-letter queue. + """ uid = int(get_jwt_identity()) now = datetime.utcnow() + timedelta(minutes=1) items = ( @@ -170,13 +181,27 @@ def run_due(): ) .all() ) + + enqueued = 0 for r in items: - send_reminder(r) - r.sent = True - track_reminder_event(event="sent", channel=r.channel) - db.session.commit() - logger.info("Processed due reminders user=%s count=%s", uid, len(items)) - return jsonify(processed=len(items)) + enqueue( + job_type="send_reminder", + payload={"reminder_id": r.id}, + max_retries=3, + ) + enqueued += 1 + track_reminder_event(event="enqueued", channel=r.channel) + + # Process the just-enqueued jobs immediately + stats = process_due_jobs(limit=enqueued or 50) + + logger.info( + "Processed due reminders user=%s enqueued=%s stats=%s", + uid, + enqueued, + stats, + ) + return jsonify(enqueued=enqueued, **stats) def _bill_channels(bill: Bill) -> list[str]: diff --git a/packages/backend/app/routes/savings.py b/packages/backend/app/routes/savings.py new file mode 100644 index 00000000..372df735 --- /dev/null +++ b/packages/backend/app/routes/savings.py @@ -0,0 +1,300 @@ +"""Goal-based savings tracking & milestones (Issue #133). + +Provides CRUD for savings goals, contribution tracking, milestone detection, +and progress analytics. +""" + +from datetime import date, datetime +from decimal import Decimal, InvalidOperation + +from flask import Blueprint, jsonify, request +from flask_jwt_extended import jwt_required, get_jwt_identity +from ..extensions import db +from ..models import SavingsGoal, SavingsGoalStatus, SavingsContribution, User +import logging + +bp = Blueprint("savings", __name__) +logger = logging.getLogger("finmind.savings") + +# Milestone thresholds (percentage of target) +MILESTONES = [25, 50, 75, 100] + + +def _parse_amount(raw) -> Decimal | None: + try: + return Decimal(str(raw)).quantize(Decimal("0.01")) + except (InvalidOperation, ValueError, TypeError): + return None + + +def _goal_to_dict(goal: SavingsGoal, include_contributions: bool = False) -> dict: + target = float(goal.target_amount) + current = float(goal.current_amount) + pct = round((current / target) * 100, 1) if target > 0 else 0.0 + remaining = max(0.0, round(target - current, 2)) + + data = { + "id": goal.id, + "name": goal.name, + "target_amount": target, + "current_amount": current, + "currency": goal.currency, + "progress_pct": pct, + "remaining": remaining, + "deadline": goal.deadline.isoformat() if goal.deadline else None, + "category": goal.category, + "status": goal.status, + "milestones_reached": _milestones_reached(pct), + "created_at": goal.created_at.isoformat(), + "updated_at": goal.updated_at.isoformat(), + } + + if include_contributions: + contribs = ( + goal.contributions + .order_by(SavingsContribution.contributed_at.desc()) + .all() + ) + data["contributions"] = [ + { + "id": c.id, + "amount": float(c.amount), + "note": c.note, + "contributed_at": c.contributed_at.isoformat(), + } + for c in contribs + ] + + return data + + +def _milestones_reached(pct: float) -> list[int]: + """Return list of milestone thresholds that have been reached.""" + return [m for m in MILESTONES if pct >= m] + + +def _check_and_update_status(goal: SavingsGoal): + """Auto-complete goal if 100% reached.""" + if goal.target_amount > 0 and goal.current_amount >= goal.target_amount: + if goal.status == SavingsGoalStatus.ACTIVE.value: + goal.status = SavingsGoalStatus.COMPLETED.value + + +# ─── Routes ──────────────────────────────────────────────────────────────── + + +@bp.get("/goals") +@jwt_required() +def list_goals(): + """List all savings goals for the current user. + + Query params: + status: Filter by status (ACTIVE, COMPLETED, PAUSED) + """ + uid = int(get_jwt_identity()) + q = SavingsGoal.query.filter_by(user_id=uid) + + status_filter = request.args.get("status") + if status_filter and status_filter.upper() in {s.value for s in SavingsGoalStatus}: + q = q.filter_by(status=status_filter.upper()) + + goals = q.order_by(SavingsGoal.created_at.desc()).all() + logger.info("List savings goals user=%s count=%s", uid, len(goals)) + return jsonify([_goal_to_dict(g) for g in goals]) + + +@bp.post("/goals") +@jwt_required() +def create_goal(): + """Create a new savings goal.""" + uid = int(get_jwt_identity()) + user = db.session.get(User, uid) + data = request.get_json() or {} + + name = (data.get("name") or "").strip() + if not name: + return jsonify(error="name is required"), 400 + + target = _parse_amount(data.get("target_amount")) + if target is None or target <= 0: + return jsonify(error="target_amount must be a positive number"), 400 + + deadline = None + if data.get("deadline"): + try: + deadline = date.fromisoformat(data["deadline"]) + except ValueError: + return jsonify(error="invalid deadline date"), 400 + + goal = SavingsGoal( + user_id=uid, + name=name, + target_amount=target, + currency=data.get("currency") or (user.preferred_currency if user else "INR"), + deadline=deadline, + category=(data.get("category") or "").strip() or None, + ) + db.session.add(goal) + db.session.commit() + logger.info("Created savings goal id=%s user=%s target=%s", goal.id, uid, target) + return jsonify(_goal_to_dict(goal)), 201 + + +@bp.get("/goals/") +@jwt_required() +def get_goal(goal_id: int): + """Get a single savings goal with its contributions.""" + uid = int(get_jwt_identity()) + goal = SavingsGoal.query.filter_by(id=goal_id, user_id=uid).first() + if not goal: + return jsonify(error="not found"), 404 + return jsonify(_goal_to_dict(goal, include_contributions=True)) + + +@bp.patch("/goals/") +@jwt_required() +def update_goal(goal_id: int): + """Update a savings goal.""" + uid = int(get_jwt_identity()) + goal = SavingsGoal.query.filter_by(id=goal_id, user_id=uid).first() + if not goal: + return jsonify(error="not found"), 404 + + data = request.get_json() or {} + + if "name" in data: + name = (data["name"] or "").strip() + if not name: + return jsonify(error="name cannot be empty"), 400 + goal.name = name + + if "target_amount" in data: + target = _parse_amount(data["target_amount"]) + if target is None or target <= 0: + return jsonify(error="target_amount must be positive"), 400 + goal.target_amount = target + + if "deadline" in data: + if data["deadline"] is None: + goal.deadline = None + else: + try: + goal.deadline = date.fromisoformat(data["deadline"]) + except ValueError: + return jsonify(error="invalid deadline"), 400 + + if "category" in data: + goal.category = (data["category"] or "").strip() or None + + if "status" in data: + status = (data["status"] or "").upper().strip() + if status not in {s.value for s in SavingsGoalStatus}: + return jsonify(error="invalid status"), 400 + goal.status = status + + _check_and_update_status(goal) + goal.updated_at = datetime.utcnow() + db.session.commit() + return jsonify(_goal_to_dict(goal)) + + +@bp.delete("/goals/") +@jwt_required() +def delete_goal(goal_id: int): + """Delete a savings goal and all its contributions.""" + uid = int(get_jwt_identity()) + goal = SavingsGoal.query.filter_by(id=goal_id, user_id=uid).first() + if not goal: + return jsonify(error="not found"), 404 + db.session.delete(goal) + db.session.commit() + return jsonify(message="deleted") + + +@bp.post("/goals//contribute") +@jwt_required() +def contribute(goal_id: int): + """Add a contribution to a savings goal. + + Body: {"amount": 100.00, "note": "monthly savings"} + """ + uid = int(get_jwt_identity()) + goal = SavingsGoal.query.filter_by(id=goal_id, user_id=uid).first() + if not goal: + return jsonify(error="not found"), 404 + + data = request.get_json() or {} + amount = _parse_amount(data.get("amount")) + if amount is None or amount <= 0: + return jsonify(error="amount must be a positive number"), 400 + + contributed_at = date.today() + if data.get("contributed_at"): + try: + contributed_at = date.fromisoformat(data["contributed_at"]) + except ValueError: + return jsonify(error="invalid contributed_at date"), 400 + + contribution = SavingsContribution( + goal_id=goal.id, + amount=amount, + note=(data.get("note") or "").strip() or None, + contributed_at=contributed_at, + ) + db.session.add(contribution) + + goal.current_amount = Decimal(str(goal.current_amount)) + amount + _check_and_update_status(goal) + goal.updated_at = datetime.utcnow() + db.session.commit() + + logger.info( + "Contribution id=%s goal=%s amount=%s new_total=%s", + contribution.id, + goal.id, + amount, + goal.current_amount, + ) + return jsonify(_goal_to_dict(goal, include_contributions=True)), 201 + + +@bp.get("/goals//milestones") +@jwt_required() +def milestones(goal_id: int): + """Get milestone status for a savings goal. + + Returns which milestones (25%, 50%, 75%, 100%) have been reached + and which is the next upcoming one. + """ + uid = int(get_jwt_identity()) + goal = SavingsGoal.query.filter_by(id=goal_id, user_id=uid).first() + if not goal: + return jsonify(error="not found"), 404 + + target = float(goal.target_amount) + current = float(goal.current_amount) + pct = round((current / target) * 100, 1) if target > 0 else 0.0 + + reached = _milestones_reached(pct) + upcoming = [m for m in MILESTONES if m not in reached] + next_milestone = upcoming[0] if upcoming else None + amount_to_next = ( + round((next_milestone / 100) * target - current, 2) + if next_milestone + else 0.0 + ) + + return jsonify( + goal_id=goal.id, + progress_pct=pct, + milestones=[ + { + "threshold": m, + "reached": m in reached, + "amount_at_threshold": round((m / 100) * target, 2), + } + for m in MILESTONES + ], + next_milestone=next_milestone, + amount_to_next=amount_to_next, + ) diff --git a/packages/backend/app/services/jobs.py b/packages/backend/app/services/jobs.py new file mode 100644 index 00000000..7b7db647 --- /dev/null +++ b/packages/backend/app/services/jobs.py @@ -0,0 +1,290 @@ +"""Resilient background job execution engine with retry and dead-letter support. + +Implements exponential backoff retries, job lifecycle tracking, and dead-letter +queue for permanently failed jobs. Integrates with Prometheus metrics. +""" + +import logging +from datetime import datetime, timedelta +from typing import Any, Callable + +from sqlalchemy.exc import OperationalError + +from ..extensions import db +from ..models import BackgroundJob, JobStatus + +logger = logging.getLogger("finmind.jobs") + +# Exponential backoff: 30s, 2min, 8min +BACKOFF_BASE_SECONDS = 30 +BACKOFF_MULTIPLIER = 4 + +# Registry of job type handlers +_job_handlers: dict[str, Callable] = {} + + +def register_handler(job_type: str): + """Decorator to register a handler function for a job type. + + Usage: + @register_handler("send_reminder") + def handle_send_reminder(payload: dict) -> dict: + ... + return {"result": "sent"} + """ + + def decorator(fn: Callable) -> Callable: + _job_handlers[job_type] = fn + return fn + + return decorator + + +def enqueue( + job_type: str, + payload: dict[str, Any] | None = None, + max_retries: int = 3, + delay_seconds: int = 0, +) -> BackgroundJob: + """Create a new background job and enqueue it for processing. + + Args: + job_type: Identifier for the job handler to use. + payload: JSON-serializable data passed to the handler. + max_retries: Maximum retry attempts before dead-letter. + delay_seconds: Seconds to wait before first execution. + + Returns: + The created BackgroundJob instance. + """ + now = datetime.utcnow() + job = BackgroundJob( + job_type=job_type, + payload=payload or {}, + status=JobStatus.PENDING, + attempt=0, + max_retries=max_retries, + next_run_at=now + timedelta(seconds=delay_seconds), + ) + db.session.add(job) + db.session.commit() + logger.info( + "Enqueued job id=%s type=%s max_retries=%s delay=%ss", + job.id, + job_type, + max_retries, + delay_seconds, + ) + _track_job_event("enqueued", job_type) + return job + + +def calculate_backoff(attempt: int) -> timedelta: + """Calculate exponential backoff delay for a given attempt number. + + Attempt 1 -> 30s, Attempt 2 -> 2min, Attempt 3 -> 8min, etc. + """ + seconds = BACKOFF_BASE_SECONDS * (BACKOFF_MULTIPLIER ** (attempt - 1)) + return timedelta(seconds=seconds) + + +def execute_job(job: BackgroundJob) -> bool: + """Execute a single background job. + + Runs the registered handler for the job type. On success, marks the job as + SUCCESS. On failure, either schedules a retry (with backoff) or moves the + job to DEAD_LETTER if max retries exhausted. + + Args: + job: The BackgroundJob to execute. + + Returns: + True if the job succeeded, False otherwise. + """ + handler = _job_handlers.get(job.job_type) + if handler is None: + logger.error("No handler registered for job_type=%s", job.job_type) + job.status = JobStatus.FAILED + job.last_error = f"No handler registered for job type: {job.job_type}" + job.completed_at = datetime.utcnow() + job.updated_at = datetime.utcnow() + db.session.commit() + _track_job_event("failed", job.job_type, reason="no_handler") + return False + + job.status = JobStatus.RUNNING + job.attempt += 1 + job.updated_at = datetime.utcnow() + db.session.commit() + + logger.info( + "Executing job id=%s type=%s attempt=%s/%s", + job.id, + job.job_type, + job.attempt, + job.max_retries, + ) + + try: + result = handler(job.payload) + job.status = JobStatus.SUCCESS + job.result = result if isinstance(result, dict) else {"value": str(result)} + job.completed_at = datetime.utcnow() + job.updated_at = datetime.utcnow() + db.session.commit() + logger.info("Job id=%s succeeded on attempt=%s", job.id, job.attempt) + _track_job_event("succeeded", job.job_type) + return True + + except Exception as exc: + error_msg = f"{type(exc).__name__}: {exc}" + logger.warning( + "Job id=%s failed on attempt=%s/%s: %s", + job.id, + job.attempt, + job.max_retries, + error_msg, + ) + job.last_error = error_msg + job.updated_at = datetime.utcnow() + + if job.attempt >= job.max_retries: + job.status = JobStatus.DEAD_LETTER + job.completed_at = datetime.utcnow() + logger.error( + "Job id=%s moved to DEAD_LETTER after %s attempts", + job.id, + job.attempt, + ) + _track_job_event("dead_lettered", job.job_type) + else: + backoff = calculate_backoff(job.attempt) + job.status = JobStatus.PENDING + job.next_run_at = datetime.utcnow() + backoff + logger.info( + "Job id=%s scheduled for retry in %s (attempt %s/%s)", + job.id, + backoff, + job.attempt + 1, + job.max_retries, + ) + _track_job_event("retried", job.job_type) + + db.session.commit() + return False + + +def process_due_jobs(limit: int = 50) -> dict[str, int]: + """Process all jobs that are due for execution. + + Finds PENDING jobs where next_run_at <= now, executes them, and returns + a summary of results. + + Args: + limit: Maximum number of jobs to process in one batch. + + Returns: + Dict with counts: {"processed": N, "succeeded": N, "failed": N, "dead_lettered": N} + """ + now = datetime.utcnow() + due_jobs = ( + BackgroundJob.query.filter( + BackgroundJob.status == JobStatus.PENDING, + BackgroundJob.next_run_at <= now, + ) + .order_by(BackgroundJob.next_run_at) + .limit(limit) + .all() + ) + + stats = {"processed": 0, "succeeded": 0, "failed": 0, "dead_lettered": 0} + + for job in due_jobs: + stats["processed"] += 1 + # Re-fetch to avoid stale data in batch processing + db.session.refresh(job) + if job.status != JobStatus.PENDING: + continue + + success = execute_job(job) + if success: + stats["succeeded"] += 1 + elif job.status == JobStatus.DEAD_LETTER: + stats["dead_lettered"] += 1 + else: + stats["failed"] += 1 + + if stats["processed"] > 0: + logger.info("Job batch complete: %s", stats) + + return stats + + +def retry_dead_letter_job(job_id: int) -> bool: + """Manually retry a dead-lettered job. + + Resets the job to PENDING with attempt=0 and immediate next_run_at. + + Args: + job_id: ID of the dead-lettered job. + + Returns: + True if the job was reset, False if not found or not in DEAD_LETTER. + """ + job = db.session.get(BackgroundJob, job_id) + if not job or job.status != JobStatus.DEAD_LETTER: + return False + + job.status = JobStatus.PENDING + job.attempt = 0 + job.last_error = None + job.next_run_at = datetime.utcnow() + job.updated_at = datetime.utcnow() + job.completed_at = None + db.session.commit() + logger.info("Dead-letter job id=%s reset to PENDING for manual retry", job_id) + _track_job_event("manual_retry", job.job_type) + return True + + +def get_job_stats() -> dict[str, Any]: + """Get aggregated job statistics for monitoring. + + Returns: + Dict with counts by status and per-type breakdown. + """ + from sqlalchemy import func + + status_counts = dict( + db.session.query(BackgroundJob.status, func.count(BackgroundJob.id)) + .group_by(BackgroundJob.status) + .all() + ) + + type_counts = dict( + db.session.query(BackgroundJob.job_type, func.count(BackgroundJob.id)) + .group_by(BackgroundJob.job_type) + .all() + ) + + # Convert enum keys to strings + status_counts_str = {str(k): v for k, v in status_counts.items()} + + return { + "by_status": status_counts_str, + "by_type": type_counts, + "total": sum(status_counts.values()), + } + + +def _track_job_event(event: str, job_type: str, reason: str = "ok") -> None: + """Record a job event to Prometheus metrics if observability is available.""" + try: + from flask import current_app, has_request_context + + if has_request_context(): + obs = current_app.extensions.get("observability") + if obs: + obs.record_job_event(event=event, job_type=job_type, status=reason) + except Exception: + pass # Observability is best-effort diff --git a/packages/backend/app/services/reminders.py b/packages/backend/app/services/reminders.py index 42e9c252..c25e4f6f 100644 --- a/packages/backend/app/services/reminders.py +++ b/packages/backend/app/services/reminders.py @@ -1,7 +1,9 @@ import smtplib +import logging from email.message import EmailMessage from ..config import Settings from ..models import Reminder +from .jobs import register_handler try: from twilio.rest import Client as TwilioClient @@ -9,6 +11,7 @@ TwilioClient = None +logger = logging.getLogger("finmind.reminders.service") _settings = Settings() @@ -57,15 +60,60 @@ def send_whatsapp(to_number: str, body: str): def send_reminder(r: Reminder): + """Send a reminder via the configured channel. + + Returns True on success, False on failure (legacy interface). + Raises RuntimeError on failure when called from the job system. + """ # Channel holds 'email' or 'whatsapp:' if r.channel == "whatsapp": return False if r.channel.startswith("whatsapp:"): to = r.channel.split(":", 1)[1] - return send_whatsapp(to, r.message) + success = send_whatsapp(to, r.message) + if not success: + raise RuntimeError(f"WhatsApp send failed to {to}") + return True else: # Fallback: assume email stored in channel as email # or pull from user profile later to = r.channel if "@" in r.channel else (_settings.email_from or "") subject = "Bill Reminder" - return send_email(to, subject, r.message) + success = send_email(to, subject, r.message) + if not success: + raise RuntimeError(f"Email send failed to {to}") + return True + + +# --------------------------------------------------------------------------- +# Job handler: integrates send_reminder with the resilient job system +# --------------------------------------------------------------------------- + + +@register_handler("send_reminder") +def handle_send_reminder_job(payload: dict) -> dict: + """Job handler for sending a reminder. + + Expected payload: {"reminder_id": int} + Fetches the Reminder from DB, sends it, and marks it as sent. + Raises on failure so the job system can retry. + """ + from ..extensions import db + + reminder_id = payload.get("reminder_id") + if not reminder_id: + raise ValueError("Missing reminder_id in job payload") + + reminder = db.session.get(Reminder, reminder_id) + if not reminder: + raise ValueError(f"Reminder {reminder_id} not found") + + if reminder.sent: + logger.info("Reminder %s already sent, skipping", reminder_id) + return {"status": "already_sent", "reminder_id": reminder_id} + + send_reminder(reminder) + reminder.sent = True + db.session.commit() + logger.info("Reminder %s sent successfully via %s", reminder_id, reminder.channel) + return {"status": "sent", "reminder_id": reminder_id, "channel": reminder.channel} diff --git a/packages/backend/tests/test_jobs.py b/packages/backend/tests/test_jobs.py new file mode 100644 index 00000000..8e67273b --- /dev/null +++ b/packages/backend/tests/test_jobs.py @@ -0,0 +1,391 @@ +"""Tests for the resilient background job system. + +Covers: enqueue, execute, retry with backoff, dead-letter, manual retry, +job stats, and admin endpoints. +""" + +import os +import pytest +from datetime import datetime, timedelta +from unittest.mock import patch + +os.environ.setdefault("FLASK_ENV", "testing") + +from app import create_app # noqa: E402 +from app.config import Settings # noqa: E402 +from app.extensions import db, redis_client # noqa: E402 +from app.models import BackgroundJob, JobStatus, User, Role # noqa: E402 +from app.services.jobs import ( # noqa: E402 + enqueue, + execute_job, + process_due_jobs, + retry_dead_letter_job, + get_job_stats, + calculate_backoff, + register_handler, + _job_handlers, +) + + +class TestSettings(Settings): + database_url: str = "sqlite+pysqlite:///:memory:" + redis_url: str = "redis://localhost:6379/15" + jwt_secret: str = "test-secret-jobs-32plus-chars-1234567890" + + +@pytest.fixture() +def app(): + settings = TestSettings() + app = create_app(settings) + app.config.update(TESTING=True) + with app.app_context(): + db.create_all() + try: + redis_client.flushdb() + except Exception: + pass + yield app + with app.app_context(): + db.session.remove() + db.drop_all() + try: + redis_client.flushdb() + except Exception: + pass + + +@pytest.fixture() +def app_ctx(app): + """Push app context for service-level tests.""" + ctx = app.app_context() + ctx.push() + yield + ctx.pop() + + +@pytest.fixture() +def client(app): + return app.test_client() + + +@pytest.fixture() +def admin_auth(client): + """Register an admin user and return auth header.""" + # Register + r = client.post( + "/auth/register", + json={"email": "admin@test.com", "password": "adminpass123"}, + ) + assert r.status_code in (200, 201, 409) + # Promote to admin directly via DB + with client.application.app_context(): + user = User.query.filter_by(email="admin@test.com").first() + user.role = Role.ADMIN.value + db.session.commit() + # Login + r = client.post( + "/auth/login", + json={"email": "admin@test.com", "password": "adminpass123"}, + ) + assert r.status_code == 200 + return {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + +@pytest.fixture() +def regular_auth(client): + """Register a regular user and return auth header.""" + r = client.post( + "/auth/register", + json={"email": "user@test.com", "password": "userpass123"}, + ) + assert r.status_code in (200, 201, 409) + r = client.post( + "/auth/login", + json={"email": "user@test.com", "password": "userpass123"}, + ) + assert r.status_code == 200 + return {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + +# ─── Unit tests: service layer ──────────────────────────────────────────── + + +def test_calculate_backoff(): + assert calculate_backoff(1).total_seconds() == 30 + assert calculate_backoff(2).total_seconds() == 120 # 2 min + assert calculate_backoff(3).total_seconds() == 480 # 8 min + assert calculate_backoff(4).total_seconds() == 1920 # 32 min + + +def test_enqueue_creates_pending_job(app_ctx): + # Register a dummy handler + @register_handler("test_job") + def handle_test(payload): + return {"ok": True} + + job = enqueue("test_job", payload={"key": "value"}, max_retries=2) + assert job.id is not None + assert job.status == JobStatus.PENDING + assert job.job_type == "test_job" + assert job.payload == {"key": "value"} + assert job.max_retries == 2 + assert job.attempt == 0 + + # Cleanup handler + _job_handlers.pop("test_job", None) + + +def test_execute_success(app_ctx): + @register_handler("success_job") + def handle_success(payload): + return {"result": "done"} + + job = enqueue("success_job", payload={"x": 1}) + success = execute_job(job) + assert success is True + assert job.status == JobStatus.SUCCESS + assert job.attempt == 1 + assert job.result == {"result": "done"} + assert job.completed_at is not None + + _job_handlers.pop("success_job", None) + + +def test_execute_retry_on_failure(app_ctx): + call_count = 0 + + @register_handler("flaky_job") + def handle_flaky(payload): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ConnectionError("Temporary failure") + return {"ok": True} + + job = enqueue("flaky_job", max_retries=3) + + # First attempt: fails, schedules retry + success = execute_job(job) + assert success is False + assert job.status == JobStatus.PENDING + assert job.attempt == 1 + assert job.next_run_at > datetime.utcnow() + assert "ConnectionError" in job.last_error + + # Second attempt: fails again + job.next_run_at = datetime.utcnow() # Force due + success = execute_job(job) + assert success is False + assert job.attempt == 2 + + # Third attempt: succeeds + job.next_run_at = datetime.utcnow() + success = execute_job(job) + assert success is True + assert job.status == JobStatus.SUCCESS + + _job_handlers.pop("flaky_job", None) + + +def test_execute_dead_letter_after_max_retries(app_ctx): + @register_handler("always_fail") + def handle_fail(payload): + raise ValueError("Permanent failure") + + job = enqueue("always_fail", max_retries=2) + + # Attempt 1: retry + execute_job(job) + assert job.status == JobStatus.PENDING + assert job.attempt == 1 + + # Attempt 2: dead letter + job.next_run_at = datetime.utcnow() + execute_job(job) + assert job.status == JobStatus.DEAD_LETTER + assert job.attempt == 2 + assert job.completed_at is not None + + _job_handlers.pop("always_fail", None) + + +def test_execute_no_handler(app_ctx): + job = enqueue("nonexistent_type") + success = execute_job(job) + assert success is False + assert job.status == JobStatus.FAILED + assert "No handler" in job.last_error + + +def test_process_due_jobs(app_ctx): + @register_handler("batch_job") + def handle_batch(payload): + if payload.get("fail"): + raise RuntimeError("batch fail") + return {"ok": True} + + # Create 3 jobs: 2 succeed, 1 fails permanently + now = datetime.utcnow() + for i in range(2): + enqueue("batch_job", payload={"i": i}) + enqueue("batch_job", payload={"fail": True}, max_retries=1) + + stats = process_due_jobs() + assert stats["processed"] == 3 + assert stats["succeeded"] == 2 + assert stats["dead_lettered"] == 1 + + _job_handlers.pop("batch_job", None) + + +def test_process_due_jobs_skips_not_due(app_ctx): + @register_handler("future_job") + def handle_future(payload): + return {"ok": True} + + job = enqueue("future_job", delay_seconds=3600) + stats = process_due_jobs() + assert stats["processed"] == 0 + + # Clean up + job.delete() + db.session.commit() + _job_handlers.pop("future_job", None) + + +def test_retry_dead_letter(app_ctx): + @register_handler("retry_dlq") + def handle_retry(payload): + raise ValueError("fail") + + job = enqueue("retry_dlq", max_retries=1) + execute_job(job) + assert job.status == JobStatus.DEAD_LETTER + + success = retry_dead_letter_job(job.id) + assert success is True + assert job.status == JobStatus.PENDING + assert job.attempt == 0 + assert job.last_error is None + assert job.completed_at is None + + _job_handlers.pop("retry_dlq", None) + + +def test_retry_non_dead_letter_returns_false(app_ctx): + @register_handler("active_job") + def handle_active(payload): + return {"ok": True} + + job = enqueue("active_job") + assert retry_dead_letter_job(job.id) is False # PENDING, not DEAD_LETTER + + _job_handlers.pop("active_job", None) + + +def test_get_job_stats(app_ctx): + @register_handler("stats_job") + def handle_stats(payload): + return {"ok": True} + + enqueue("stats_job") + enqueue("stats_job") + enqueue("stats_job") + + # Execute all + process_due_jobs() + + stats = get_job_stats() + assert stats["total"] == 3 + assert stats["by_status"].get("JobStatus.SUCCESS", 0) == 3 + assert stats["by_type"]["stats_job"] == 3 + + _job_handlers.pop("stats_job", None) + + +# ─── Integration tests: admin API ───────────────────────────────────────── + + +def test_admin_job_stats(client, admin_auth): + r = client.get("/admin/jobs/stats", headers=admin_auth) + assert r.status_code == 200 + data = r.get_json() + assert "by_status" in data + assert "by_type" in data + assert "total" in data + + +def test_admin_list_jobs(client, admin_auth): + r = client.get("/admin/jobs", headers=admin_auth) + assert r.status_code == 200 + data = r.get_json() + assert "total" in data + assert "jobs" in data + + +def test_admin_list_jobs_filter(client, admin_auth): + r = client.get("/admin/jobs?status=DEAD_LETTER&limit=10", headers=admin_auth) + assert r.status_code == 200 + + +def test_admin_list_jobs_invalid_status(client, admin_auth): + r = client.get("/admin/jobs?status=INVALID", headers=admin_auth) + assert r.status_code == 400 + + +def test_admin_get_job_not_found(client, admin_auth): + r = client.get("/admin/jobs/99999", headers=admin_auth) + assert r.status_code == 404 + + +def test_admin_trigger_process(client, admin_auth): + r = client.post("/admin/jobs/process", headers=admin_auth) + assert r.status_code == 200 + data = r.get_json() + assert "processed" in data + + +def test_admin_delete_job_not_found(client, admin_auth): + r = client.delete("/admin/jobs/99999", headers=admin_auth) + assert r.status_code == 404 + + +def test_admin_endpoints_require_admin(client, regular_auth): + """Regular users should get 403 on admin endpoints.""" + assert client.get("/admin/jobs/stats", headers=regular_auth).status_code == 403 + assert client.get("/admin/jobs", headers=regular_auth).status_code == 403 + assert client.post("/admin/jobs/process", headers=regular_auth).status_code == 403 + + +def test_admin_endpoints_require_auth(client): + """Unauthenticated requests should get 401.""" + assert client.get("/admin/jobs/stats").status_code == 401 + + +def test_admin_retry_dead_letter_via_api(client, admin_auth, app_ctx): + """Full flow: create dead-letter job, retry via API.""" + @register_handler("api_retry_test") + def handle_api_retry(payload): + raise ValueError("fail for test") + + job = enqueue("api_retry_test", max_retries=1) + execute_job(job) + assert job.status == JobStatus.DEAD_LETTER + + r = client.post(f"/admin/jobs/{job.id}/retry", headers=admin_auth) + assert r.status_code == 200 + assert r.get_json()["status"] == "retried" + + _job_handlers.pop("api_retry_test", None) + + +def test_admin_retry_non_dead_letter_fails(client, admin_auth, app_ctx): + @register_handler("pending_retry") + def handle_pending(payload): + return {"ok": True} + + job = enqueue("pending_retry") + r = client.post(f"/admin/jobs/{job.id}/retry", headers=admin_auth) + assert r.status_code == 400 + + _job_handlers.pop("pending_retry", None) diff --git a/packages/backend/tests/test_savings.py b/packages/backend/tests/test_savings.py new file mode 100644 index 00000000..85aa4738 --- /dev/null +++ b/packages/backend/tests/test_savings.py @@ -0,0 +1,347 @@ +"""Tests for goal-based savings tracking & milestones (Issue #133).""" + +import os +import pytest +from datetime import date, timedelta + +os.environ.setdefault("FLASK_ENV", "testing") + +from app import create_app # noqa: E402 +from app.config import Settings # noqa: E402 +from app.extensions import db, redis_client # noqa: E402 +from app.models import SavingsGoal, SavingsContribution # noqa: E402 + + +class TestSettings(Settings): + database_url: str = "sqlite+pysqlite:///:memory:" + redis_url: str = "redis://localhost:6379/15" + jwt_secret: str = "test-savings-32plus-chars-secret-12345" + + +@pytest.fixture() +def app(): + settings = TestSettings() + app = create_app(settings) + app.config.update(TESTING=True) + with app.app_context(): + db.create_all() + try: + redis_client.flushdb() + except Exception: + pass + yield app + with app.app_context(): + db.session.remove() + db.drop_all() + try: + redis_client.flushdb() + except Exception: + pass + + +@pytest.fixture() +def client(app): + return app.test_client() + + +@pytest.fixture() +def auth(client): + """Register and login, return auth header.""" + client.post( + "/auth/register", + json={"email": "saver@test.com", "password": "pass123456"}, + ) + r = client.post( + "/auth/login", + json={"email": "saver@test.com", "password": "pass123456"}, + ) + return {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + +@pytest.fixture() +def goal(client, auth): + """Create a default test goal and return its data.""" + r = client.post( + "/savings/goals", + json={ + "name": "Vacation Fund", + "target_amount": 1000.00, + "currency": "USD", + "category": "travel", + "deadline": (date.today() + timedelta(days=180)).isoformat(), + }, + headers=auth, + ) + assert r.status_code == 201 + return r.get_json() + + +# ─── Goal CRUD ───────────────────────────────────────────────────────────── + + +def test_create_goal(client, auth): + r = client.post( + "/savings/goals", + json={"name": "Emergency Fund", "target_amount": 5000}, + headers=auth, + ) + assert r.status_code == 201 + data = r.get_json() + assert data["name"] == "Emergency Fund" + assert data["target_amount"] == 5000.0 + assert data["current_amount"] == 0.0 + assert data["status"] == "ACTIVE" + assert data["progress_pct"] == 0.0 + assert data["remaining"] == 5000.0 + assert data["milestones_reached"] == [] + + +def test_create_goal_validation(client, auth): + # Missing name + r = client.post("/savings/goals", json={"target_amount": 100}, headers=auth) + assert r.status_code == 400 + + # Missing target + r = client.post("/savings/goals", json={"name": "Test"}, headers=auth) + assert r.status_code == 400 + + # Zero target + r = client.post( + "/savings/goals", json={"name": "Test", "target_amount": 0}, headers=auth + ) + assert r.status_code == 400 + + # Negative target + r = client.post( + "/savings/goals", json={"name": "Test", "target_amount": -100}, headers=auth + ) + assert r.status_code == 400 + + +def test_list_goals(client, auth, goal): + r = client.get("/savings/goals", headers=auth) + assert r.status_code == 200 + data = r.get_json() + assert len(data) == 1 + assert data[0]["name"] == "Vacation Fund" + + +def test_list_goals_filter_status(client, auth, goal): + r = client.get("/savings/goals?status=COMPLETED", headers=auth) + assert r.status_code == 200 + assert len(r.get_json()) == 0 + + r = client.get("/savings/goals?status=ACTIVE", headers=auth) + assert len(r.get_json()) == 1 + + +def test_get_goal(client, auth, goal): + gid = goal["id"] + r = client.get(f"/savings/goals/{gid}", headers=auth) + assert r.status_code == 200 + data = r.get_json() + assert data["id"] == gid + assert "contributions" in data + assert len(data["contributions"]) == 0 + + +def test_get_goal_not_found(client, auth): + r = client.get("/savings/goals/99999", headers=auth) + assert r.status_code == 404 + + +def test_update_goal(client, auth, goal): + gid = goal["id"] + r = client.patch( + f"/savings/goals/{gid}", + json={"name": "Beach Vacation", "target_amount": 1500}, + headers=auth, + ) + assert r.status_code == 200 + data = r.get_json() + assert data["name"] == "Beach Vacation" + assert data["target_amount"] == 1500.0 + + +def test_update_goal_status(client, auth, goal): + gid = goal["id"] + r = client.patch( + f"/savings/goals/{gid}", + json={"status": "PAUSED"}, + headers=auth, + ) + assert r.status_code == 200 + assert r.get_json()["status"] == "PAUSED" + + +def test_delete_goal(client, auth, goal): + gid = goal["id"] + r = client.delete(f"/savings/goals/{gid}", headers=auth) + assert r.status_code == 200 + assert r.get_json()["message"] == "deleted" + + r = client.get(f"/savings/goals/{gid}", headers=auth) + assert r.status_code == 404 + + +# ─── Contributions ───────────────────────────────────────────────────────── + + +def test_contribute(client, auth, goal): + gid = goal["id"] + r = client.post( + f"/savings/goals/{gid}/contribute", + json={"amount": 250, "note": "first deposit"}, + headers=auth, + ) + assert r.status_code == 201 + data = r.get_json() + assert data["current_amount"] == 250.0 + assert data["progress_pct"] == 25.0 + assert data["remaining"] == 750.0 + assert 25 in data["milestones_reached"] + assert len(data["contributions"]) == 1 + assert data["contributions"][0]["note"] == "first deposit" + + +def test_contribute_validation(client, auth, goal): + gid = goal["id"] + # Zero amount + r = client.post( + f"/savings/goals/{gid}/contribute", + json={"amount": 0}, + headers=auth, + ) + assert r.status_code == 400 + + # Negative amount + r = client.post( + f"/savings/goals/{gid}/contribute", + json={"amount": -50}, + headers=auth, + ) + assert r.status_code == 400 + + # Missing amount + r = client.post(f"/savings/goals/{gid}/contribute", json={}, headers=auth) + assert r.status_code == 400 + + +def test_multiple_contributions_accumulate(client, auth, goal): + gid = goal["id"] + client.post(f"/savings/goals/{gid}/contribute", json={"amount": 100}, headers=auth) + client.post(f"/savings/goals/{gid}/contribute", json={"amount": 150}, headers=auth) + client.post(f"/savings/goals/{gid}/contribute", json={"amount": 500}, headers=auth) + + r = client.get(f"/savings/goals/{gid}", headers=auth) + data = r.get_json() + assert data["current_amount"] == 750.0 + assert data["progress_pct"] == 75.0 + assert sorted(data["milestones_reached"]) == [25, 50, 75] + + +def test_goal_auto_completes_at_100(client, auth, goal): + gid = goal["id"] + r = client.post( + f"/savings/goals/{gid}/contribute", + json={"amount": 1000}, + headers=auth, + ) + data = r.get_json() + assert data["status"] == "COMPLETED" + assert data["progress_pct"] == 100.0 + assert data["milestones_reached"] == [25, 50, 75, 100] + + +def test_contribute_exceeds_target(client, auth, goal): + """Contributions can exceed the target (over-saving).""" + gid = goal["id"] + r = client.post( + f"/savings/goals/{gid}/contribute", + json={"amount": 1200}, + headers=auth, + ) + data = r.get_json() + assert data["current_amount"] == 1200.0 + assert data["status"] == "COMPLETED" + assert data["progress_pct"] == 120.0 + + +# ─── Milestones ──────────────────────────────────────────────────────────── + + +def test_milestones_endpoint(client, auth, goal): + gid = goal["id"] + client.post(f"/savings/goals/{gid}/contribute", json={"amount": 500}, headers=auth) + + r = client.get(f"/savings/goals/{gid}/milestones", headers=auth) + assert r.status_code == 200 + data = r.get_json() + assert data["progress_pct"] == 50.0 + assert data["next_milestone"] == 75 + assert data["amount_to_next"] == 250.0 + + milestones = {m["threshold"]: m for m in data["milestones"]} + assert milestones[25]["reached"] is True + assert milestones[50]["reached"] is True + assert milestones[75]["reached"] is False + assert milestones[100]["reached"] is False + + +def test_milestones_all_reached(client, auth, goal): + gid = goal["id"] + client.post(f"/savings/goals/{gid}/contribute", json={"amount": 1000}, headers=auth) + + r = client.get(f"/savings/goals/{gid}/milestones", headers=auth) + data = r.get_json() + assert data["next_milestone"] is None + assert data["amount_to_next"] == 0.0 + milestones = {m["threshold"]: m for m in data["milestones"]} + assert all(m["reached"] for m in milestones.values()) + + +def test_milestones_not_found(client, auth): + r = client.get("/savings/goals/99999/milestones", headers=auth) + assert r.status_code == 404 + + +# ─── Auth & Isolation ────────────────────────────────────────────────────── + + +def test_goals_require_auth(client): + assert client.get("/savings/goals").status_code == 401 + assert client.post("/savings/goals", json={"name": "X", "target_amount": 1}).status_code == 401 + + +def test_users_cannot_see_others_goals(client, auth, goal): + """Create another user and verify isolation.""" + client.post( + "/auth/register", + json={"email": "other@test.com", "password": "pass123456"}, + ) + r = client.post( + "/auth/login", + json={"email": "other@test.com", "password": "pass123456"}, + ) + other_auth = {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + # Other user sees empty list + r = client.get("/savings/goals", headers=other_auth) + assert len(r.get_json()) == 0 + + # Other user cannot access the goal + r = client.get(f"/savings/goals/{goal['id']}", headers=other_auth) + assert r.status_code == 404 + + +def test_contribute_to_others_goal_fails(client, auth, goal): + client.post("/auth/register", json={"email": "hacker@test.com", "password": "pass123"}) + r = client.post("/auth/login", json={"email": "hacker@test.com", "password": "pass123"}) + hacker_auth = {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + r = client.post( + f"/savings/goals/{goal['id']}/contribute", + json={"amount": 100}, + headers=hacker_auth, + ) + assert r.status_code == 404