Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions packages/backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,42 @@ def _ensure_schema_compatibility(app: Flask) -> None:
NOT NULL DEFAULT 'INR'
"""
)
# Background jobs table (idempotent migration)
cur.execute(
"""
DO $$ BEGIN
CREATE TYPE job_status AS ENUM ('PENDING','RUNNING','SUCCESS','FAILED','DEAD_LETTER');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS background_jobs (
id SERIAL PRIMARY KEY,
job_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL DEFAULT '{}',
status job_status NOT NULL DEFAULT 'PENDING',
attempt INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
next_run_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_error TEXT,
result JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP
);
"""
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_bg_jobs_status_next_run ON background_jobs(status, next_run_at);"
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_bg_jobs_type ON background_jobs(job_type);"
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_bg_jobs_created ON background_jobs(created_at DESC);"
)
conn.commit()
except Exception:
app.logger.exception(
Expand Down
27 changes: 27 additions & 0 deletions packages/backend/app/db/migrations/001_background_jobs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Background job tracking for resilient retry & monitoring
-- Part of: Resilient background job retry & monitoring (Issue #130)

DO $$ BEGIN
CREATE TYPE job_status AS ENUM ('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'DEAD_LETTER');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;

CREATE TABLE IF NOT EXISTS background_jobs (
id SERIAL PRIMARY KEY,
job_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL DEFAULT '{}',
status job_status NOT NULL DEFAULT 'PENDING',
attempt INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
next_run_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_error TEXT,
result JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_bg_jobs_status_next_run ON background_jobs(status, next_run_at);
CREATE INDEX IF NOT EXISTS idx_bg_jobs_type ON background_jobs(job_type);
CREATE INDEX IF NOT EXISTS idx_bg_jobs_created ON background_jobs(created_at DESC);
25 changes: 25 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,28 @@ CREATE TABLE IF NOT EXISTS audit_logs (
action VARCHAR(100) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Background job tracking for resilient retry & dead-letter support
DO $$ BEGIN
CREATE TYPE job_status AS ENUM ('PENDING','RUNNING','SUCCESS','FAILED','DEAD_LETTER');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;

CREATE TABLE IF NOT EXISTS background_jobs (
id SERIAL PRIMARY KEY,
job_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL DEFAULT '{}',
status job_status NOT NULL DEFAULT 'PENDING',
attempt INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
next_run_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_error TEXT,
result JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_bg_jobs_status_next_run ON background_jobs(status, next_run_at);
CREATE INDEX IF NOT EXISTS idx_bg_jobs_type ON background_jobs(job_type);
CREATE INDEX IF NOT EXISTS idx_bg_jobs_created ON background_jobs(created_at DESC);
29 changes: 29 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,32 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class JobStatus(str, Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
DEAD_LETTER = "DEAD_LETTER"


class BackgroundJob(db.Model):
"""Tracks background job execution with retry support."""

__tablename__ = "background_jobs"

id = db.Column(db.Integer, primary_key=True)
job_type = db.Column(db.String(100), nullable=False, index=True)
payload = db.Column(db.JSON, nullable=False, default=dict)
status = db.Column(
SAEnum(JobStatus), nullable=False, default=JobStatus.PENDING, index=True
)
attempt = db.Column(db.Integer, nullable=False, default=0)
max_retries = db.Column(db.Integer, nullable=False, default=3)
next_run_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
last_error = db.Column(db.Text, nullable=True)
result = db.Column(db.JSON, nullable=True)
created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
completed_at = db.Column(db.DateTime, nullable=True)
19 changes: 19 additions & 0 deletions packages/backend/app/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def __init__(self) -> None:
["event", "channel", "status"],
registry=self.registry,
)
self.job_events_total = Counter(
"finmind_job_events_total",
"Background job lifecycle events.",
["event", "job_type", "status"],
registry=self.registry,
)

def observe_http_request(
self, method: str, endpoint: str, status_code: int, duration_seconds: float
Expand All @@ -79,6 +85,13 @@ def record_reminder_event(
event=event, channel=channel, status=status
).inc()

def record_job_event(
self, event: str, job_type: str, status: str = "ok"
) -> None:
self.job_events_total.labels(
event=event, job_type=job_type, status=status
).inc()

def metrics_response(self) -> Response:
if self.multiprocess_enabled:
registry = CollectorRegistry()
Expand Down Expand Up @@ -137,3 +150,9 @@ def track_reminder_event(event: str, channel: str, status: str = "ok") -> None:
obs = current_app.extensions.get("observability")
if obs:
obs.record_reminder_event(event=event, channel=channel, status=status)


def track_job_event(event: str, job_type: str, status: str = "ok") -> None:
obs = current_app.extensions.get("observability")
if obs:
obs.record_job_event(event=event, job_type=job_type, status=status)
137 changes: 137 additions & 0 deletions packages/backend/app/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ tags:
- name: Bills
- name: Reminders
- name: Insights
- name: Jobs
paths:
/auth/register:
post:
Expand Down Expand Up @@ -481,6 +482,127 @@ paths:
application/json:
schema: { $ref: '#/components/schemas/Error' }

# ── Background Jobs (Admin) ──────────────────────────────────────────

/admin/jobs/stats:
get:
summary: Aggregated job statistics
tags: [Jobs]
security: [bearerAuth: []]
responses:
'200':
description: Job stats by status and type
content:
application/json:
schema:
type: object
properties:
by_status: { type: object, additionalProperties: { type: integer } }
by_type: { type: object, additionalProperties: { type: integer } }
total: { type: integer }
'403': { description: Admin required }

/admin/jobs:
get:
summary: List background jobs
tags: [Jobs]
security: [bearerAuth: []]
parameters:
- name: status
in: query
schema: { type: string, enum: [PENDING, RUNNING, SUCCESS, FAILED, DEAD_LETTER] }
- name: job_type
in: query
schema: { type: string }
- name: limit
in: query
schema: { type: integer, default: 50 }
- name: offset
in: query
schema: { type: integer, default: 0 }
responses:
'200':
description: Paginated job list
content:
application/json:
schema:
type: object
properties:
total: { type: integer }
offset: { type: integer }
limit: { type: integer }
jobs:
type: array
items: { $ref: '#/components/schemas/BackgroundJob' }
'403': { description: Admin required }

/admin/jobs/{job_id}:
get:
summary: Get job details
tags: [Jobs]
security: [bearerAuth: []]
parameters:
- name: job_id
in: path
required: true
schema: { type: integer }
responses:
'200':
description: Job details
content:
application/json:
schema: { $ref: '#/components/schemas/BackgroundJob' }
'404': { description: Not found }
delete:
summary: Delete a job record
tags: [Jobs]
security: [bearerAuth: []]
parameters:
- name: job_id
in: path
required: true
schema: { type: integer }
responses:
'200': { description: Deleted }
'404': { description: Not found }

/admin/jobs/{job_id}/retry:
post:
summary: Retry a dead-lettered job
tags: [Jobs]
security: [bearerAuth: []]
parameters:
- name: job_id
in: path
required: true
schema: { type: integer }
responses:
'200': { description: Job reset to PENDING }
'400': { description: Job not in DEAD_LETTER status }
'404': { description: Not found }

/admin/jobs/process:
post:
summary: Manually trigger job processing
tags: [Jobs]
security: [bearerAuth: []]
parameters:
- name: limit
in: query
schema: { type: integer, default: 50 }
responses:
'200':
description: Processing results
content:
application/json:
schema:
type: object
properties:
processed: { type: integer }
succeeded: { type: integer }
failed: { type: integer }
dead_lettered: { type: integer }

components:
securitySchemes:
bearerAuth:
Expand Down Expand Up @@ -587,3 +709,18 @@ components:
message: { type: string }
send_at: { type: string, format: date-time }
channel: { type: string, enum: [email, whatsapp], default: email }
BackgroundJob:
type: object
properties:
id: { type: integer }
job_type: { type: string }
payload: { type: object }
status: { type: string, enum: [PENDING, RUNNING, SUCCESS, FAILED, DEAD_LETTER] }
attempt: { type: integer }
max_retries: { type: integer }
next_run_at: { type: string, format: date-time, nullable: true }
last_error: { type: string, nullable: true }
result: { type: object, nullable: true }
created_at: { type: string, format: date-time }
updated_at: { type: string, format: date-time }
completed_at: { type: string, format: date-time, nullable: true }
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .jobs import bp as jobs_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(jobs_bp, url_prefix="/admin")
Loading