From 2eab20ef4f46977b9def5347830e19f22abc436a Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Fri, 6 Feb 2026 22:55:03 +0530 Subject: [PATCH] refactor cron to single query --- backend/app/api/routes/cron.py | 12 +-- backend/app/crud/evaluations/cron.py | 102 ++---------------- backend/app/crud/evaluations/processing.py | 27 +++-- backend/app/tests/api/routes/test_cron.py | 25 ++--- .../tests/crud/evaluations/test_processing.py | 8 +- 5 files changed, 43 insertions(+), 131 deletions(-) diff --git a/backend/app/api/routes/cron.py b/backend/app/api/routes/cron.py index f04e11885..ab072970b 100644 --- a/backend/app/api/routes/cron.py +++ b/backend/app/api/routes/cron.py @@ -2,11 +2,9 @@ from app.api.permissions import Permission, require_permission from fastapi import APIRouter, Depends -from sqlmodel import Session -from app.api.deps import SessionDep, AuthContextDep +from app.api.deps import SessionDep from app.crud.evaluations import process_all_pending_evaluations_sync -from app.models import User logger = logging.getLogger(__name__) @@ -25,9 +23,9 @@ def evaluation_cron_job( Cron job endpoint for periodic evaluation tasks. This endpoint: - 1. Gets all organizations - 2. For each org, polls their pending evaluations - 3. Processes completed batches automatically + 1. Fetches all evaluation runs with status='processing' + 2. Groups them by project_id + 3. Processes each project with its OpenAI/Langfuse clients 4. Returns aggregated results Hidden from Swagger documentation. @@ -41,7 +39,6 @@ def evaluation_cron_job( logger.info( f"[evaluation_cron_job] Completed: " - f"orgs={result.get('organizations_processed', 0)}, " f"processed={result.get('total_processed', 0)}, " f"failed={result.get('total_failed', 0)}, " f"still_processing={result.get('total_still_processing', 0)}" @@ -57,7 +54,6 @@ def evaluation_cron_job( return { "status": "error", "error": str(e), - "organizations_processed": 0, "total_processed": 0, "total_failed": 0, "total_still_processing": 0, diff --git a/backend/app/crud/evaluations/cron.py b/backend/app/crud/evaluations/cron.py index ca6bd2af2..ab89f50fc 100644 --- a/backend/app/crud/evaluations/cron.py +++ b/backend/app/crud/evaluations/cron.py @@ -9,10 +9,9 @@ import logging from typing import Any -from sqlmodel import Session, select +from sqlmodel import Session from app.crud.evaluations.processing import poll_all_pending_evaluations -from app.models import Organization logger = logging.getLogger(__name__) @@ -21,110 +20,32 @@ async def process_all_pending_evaluations(session: Session) -> dict[str, Any]: """ Process all pending evaluations across all organizations. - This function: - 1. Gets all organizations - 2. For each org, polls their pending evaluations - 3. Processes completed batches automatically - 4. Returns aggregated results - - This is the main function that should be called by the cron endpoint. + Delegates to poll_all_pending_evaluations which fetches all processing + evaluation runs in a single query, groups by project, and processes them. Args: session: Database session Returns: - Dict with aggregated results: - { - "status": "success", - "organizations_processed": 3, - "total_processed": 5, - "total_failed": 1, - "total_still_processing": 2, - "results": [ - { - "org_id": 1, - "org_name": "Org 1", - "summary": {...} - }, - ... - ] - } + Dict with aggregated results. """ logger.info("[process_all_pending_evaluations] Starting evaluation processing") try: - # Get all organizations - orgs = session.exec(select(Organization)).all() - - if not orgs: - logger.info("[process_all_pending_evaluations] No organizations found") - return { - "status": "success", - "organizations_processed": 0, - "total_processed": 0, - "total_failed": 0, - "total_still_processing": 0, - "message": "No organizations to process", - "results": [], - } - - logger.info( - f"[process_all_pending_evaluations] Found {len(orgs)} organizations to process" - ) - - results = [] - total_processed = 0 - total_failed = 0 - total_still_processing = 0 - - # Process each organization - for org in orgs: - try: - logger.info( - f"[process_all_pending_evaluations] Processing org_id={org.id} ({org.name})" - ) - - # Poll all pending evaluations for this org - summary = await poll_all_pending_evaluations( - session=session, org_id=org.id - ) - - results.append( - { - "org_id": org.id, - "org_name": org.name, - "summary": summary, - } - ) - - total_processed += summary.get("processed", 0) - total_failed += summary.get("failed", 0) - total_still_processing += summary.get("still_processing", 0) - - except Exception as e: - logger.error( - f"[process_all_pending_evaluations] Error processing org_id={org.id}: {e}", - exc_info=True, - ) - session.rollback() - results.append( - {"org_id": org.id, "org_name": org.name, "error": str(e)} - ) - total_failed += 1 + summary = await poll_all_pending_evaluations(session=session) logger.info( f"[process_all_pending_evaluations] Completed: " - f"{total_processed} processed, {total_failed} failed, " - f"{total_still_processing} still processing" + f"{summary['processed']} processed, {summary['failed']} failed, " + f"{summary['still_processing']} still processing" ) return { "status": "success", - "organizations_processed": len(orgs), - "total_processed": total_processed, - "total_failed": total_failed, - "total_still_processing": total_still_processing, - "results": results, + "total_processed": summary["processed"], + "total_failed": summary["failed"], + "total_still_processing": summary["still_processing"], + "results": summary["details"], } except Exception as e: @@ -134,7 +55,6 @@ async def process_all_pending_evaluations(session: Session) -> dict[str, Any]: ) return { "status": "error", - "organizations_processed": 0, "total_processed": 0, "total_failed": 0, "total_still_processing": 0, diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py index 99ce39d56..a9ce7bf50 100644 --- a/backend/app/crud/evaluations/processing.py +++ b/backend/app/crud/evaluations/processing.py @@ -645,13 +645,16 @@ async def check_and_process_evaluation( } -async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[str, Any]: +async def poll_all_pending_evaluations(session: Session) -> dict[str, Any]: """ - Poll all pending evaluations for an organization. + Poll all pending evaluations across all organizations. + + Fetches all evaluation runs with status='processing' in a single query, + groups them by project_id, and processes each project with its own + OpenAI/Langfuse clients. Args: session: Database session - org_id: Organization ID Returns: Summary dict: @@ -663,10 +666,9 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st "details": [...] } """ - # Get pending evaluations (status = "processing") + # Single query to fetch all processing evaluation runs statement = select(EvaluationRun).where( EvaluationRun.status == "processing", - EvaluationRun.organization_id == org_id, ) pending_runs = session.exec(statement).all() @@ -678,8 +680,13 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st "still_processing": 0, "details": [], } + + logger.info( + f"[poll_all_pending_evaluations] Found {len(pending_runs)} pending evaluation runs" + ) + # Group evaluations by project_id since credentials are per project - evaluations_by_project = defaultdict(list) + evaluations_by_project: dict[int, list[EvaluationRun]] = defaultdict(list) for run in pending_runs: evaluations_by_project[run.project_id].append(run) @@ -690,6 +697,8 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st total_still_processing_count = 0 for project_id, project_runs in evaluations_by_project.items(): + # All runs in a project share the same org_id + org_id = project_runs[0].organization_id try: # Get API clients for this project try: @@ -709,7 +718,6 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st ) # Mark all runs in this project as failed due to client configuration error for eval_run in project_runs: - # Persist failure status to database update_evaluation_run( session=session, eval_run=eval_run, @@ -751,7 +759,6 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st f"[poll_all_pending_evaluations] Failed to check evaluation run | run_id={eval_run.id} | {e}", exc_info=True, ) - # Persist failure status to database update_evaluation_run( session=session, eval_run=eval_run, @@ -774,9 +781,7 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st f"[poll_all_pending_evaluations] Failed to process project | project_id={project_id} | {e}", exc_info=True, ) - # Mark all runs in this project as failed for eval_run in project_runs: - # Persist failure status to database update_evaluation_run( session=session, eval_run=eval_run, @@ -803,7 +808,7 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st } logger.info( - f"[poll_all_pending_evaluations] Polling summary | org_id={org_id} | processed={total_processed_count} | failed={total_failed_count} | still_processing={total_still_processing_count}" + f"[poll_all_pending_evaluations] Polling summary | processed={total_processed_count} | failed={total_failed_count} | still_processing={total_still_processing_count}" ) return summary diff --git a/backend/app/tests/api/routes/test_cron.py b/backend/app/tests/api/routes/test_cron.py index a73e2d6f8..858834dc9 100644 --- a/backend/app/tests/api/routes/test_cron.py +++ b/backend/app/tests/api/routes/test_cron.py @@ -13,15 +13,14 @@ def test_evaluation_cron_job_success( """Test successful cron job execution.""" mock_result = { "status": "success", - "organizations_processed": 2, "total_processed": 5, "total_failed": 0, "total_still_processing": 1, "results": [ { - "org_id": superuser_api_key.organization_id, - "org_name": "Test Org", - "summary": {"processed": 3, "failed": 0}, + "run_id": 1, + "run_name": "test_run", + "action": "processed", } ], } @@ -38,24 +37,21 @@ def test_evaluation_cron_job_success( assert response.status_code == 200 data = response.json() assert data["status"] == "success" - assert data["organizations_processed"] == 2 assert data["total_processed"] == 5 assert data["total_failed"] == 0 assert data["total_still_processing"] == 1 -def test_evaluation_cron_job_no_organizations( +def test_evaluation_cron_job_no_pending( client: TestClient, superuser_api_key: TestAuthContext, ) -> None: - """Test cron job when no organizations exist.""" + """Test cron job when no pending evaluations exist.""" mock_result = { "status": "success", - "organizations_processed": 0, "total_processed": 0, "total_failed": 0, "total_still_processing": 0, - "message": "No organizations to process", "results": [], } @@ -71,8 +67,7 @@ def test_evaluation_cron_job_no_organizations( assert response.status_code == 200 data = response.json() assert data["status"] == "success" - assert data["organizations_processed"] == 0 - assert data["message"] == "No organizations to process" + assert data["total_processed"] == 0 def test_evaluation_cron_job_with_failures( @@ -82,15 +77,15 @@ def test_evaluation_cron_job_with_failures( """Test cron job execution with some failed evaluations.""" mock_result = { "status": "success", - "organizations_processed": 1, "total_processed": 3, "total_failed": 2, "total_still_processing": 0, "results": [ { - "org_id": superuser_api_key.organization_id, - "org_name": "Test Org", - "summary": {"processed": 3, "failed": 2}, + "run_id": 1, + "run_name": "test_run", + "action": "failed", + "error": "Check failed", } ], } diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py index 042a1a444..ab9fc59b0 100644 --- a/backend/app/tests/crud/evaluations/test_processing.py +++ b/backend/app/tests/crud/evaluations/test_processing.py @@ -798,9 +798,7 @@ async def test_poll_all_pending_evaluations_no_pending( self, db: Session, test_dataset ): """Test polling with no pending evaluations.""" - result = await poll_all_pending_evaluations( - session=db, org_id=test_dataset.organization_id - ) + result = await poll_all_pending_evaluations(session=db) assert result["total"] == 0 assert result["processed"] == 0 @@ -867,9 +865,7 @@ async def test_poll_all_pending_evaluations_with_runs( "action": "no_change", } - result = await poll_all_pending_evaluations( - session=db, org_id=test_dataset.organization_id - ) + result = await poll_all_pending_evaluations(session=db) assert result["total"] == 1 assert result["still_processing"] == 1