Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions backend/app/api/routes/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

from app.api.permissions import Permission, require_permission
from fastapi import APIRouter, Depends
from sqlmodel import Session

from app.api.deps import SessionDep, AuthContextDep
from app.api.deps import SessionDep
from app.crud.evaluations import process_all_pending_evaluations_sync
from app.models import User

logger = logging.getLogger(__name__)

Expand All @@ -25,9 +23,9 @@ def evaluation_cron_job(
Cron job endpoint for periodic evaluation tasks.

This endpoint:
1. Gets all organizations
2. For each org, polls their pending evaluations
3. Processes completed batches automatically
1. Fetches all evaluation runs with status='processing'
2. Groups them by project_id
3. Processes each project with its OpenAI/Langfuse clients
4. Returns aggregated results

Hidden from Swagger documentation.
Expand All @@ -41,7 +39,6 @@ def evaluation_cron_job(

logger.info(
f"[evaluation_cron_job] Completed: "
f"orgs={result.get('organizations_processed', 0)}, "
f"processed={result.get('total_processed', 0)}, "
f"failed={result.get('total_failed', 0)}, "
f"still_processing={result.get('total_still_processing', 0)}"
Expand All @@ -57,7 +54,6 @@ def evaluation_cron_job(
return {
"status": "error",
"error": str(e),
"organizations_processed": 0,
"total_processed": 0,
"total_failed": 0,
"total_still_processing": 0,
Expand Down
102 changes: 11 additions & 91 deletions backend/app/crud/evaluations/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import logging
from typing import Any

from sqlmodel import Session, select
from sqlmodel import Session

from app.crud.evaluations.processing import poll_all_pending_evaluations
from app.models import Organization

logger = logging.getLogger(__name__)

Expand All @@ -21,110 +20,32 @@ async def process_all_pending_evaluations(session: Session) -> dict[str, Any]:
"""
Process all pending evaluations across all organizations.

This function:
1. Gets all organizations
2. For each org, polls their pending evaluations
3. Processes completed batches automatically
4. Returns aggregated results

This is the main function that should be called by the cron endpoint.
Delegates to poll_all_pending_evaluations which fetches all processing
evaluation runs in a single query, groups by project, and processes them.

Args:
session: Database session

Returns:
Dict with aggregated results:
{
"status": "success",
"organizations_processed": 3,
"total_processed": 5,
"total_failed": 1,
"total_still_processing": 2,
"results": [
{
"org_id": 1,
"org_name": "Org 1",
"summary": {...}
},
...
]
}
Dict with aggregated results.
"""
logger.info("[process_all_pending_evaluations] Starting evaluation processing")

try:
# Get all organizations
orgs = session.exec(select(Organization)).all()

if not orgs:
logger.info("[process_all_pending_evaluations] No organizations found")
return {
"status": "success",
"organizations_processed": 0,
"total_processed": 0,
"total_failed": 0,
"total_still_processing": 0,
"message": "No organizations to process",
"results": [],
}

logger.info(
f"[process_all_pending_evaluations] Found {len(orgs)} organizations to process"
)

results = []
total_processed = 0
total_failed = 0
total_still_processing = 0

# Process each organization
for org in orgs:
try:
logger.info(
f"[process_all_pending_evaluations] Processing org_id={org.id} ({org.name})"
)

# Poll all pending evaluations for this org
summary = await poll_all_pending_evaluations(
session=session, org_id=org.id
)

results.append(
{
"org_id": org.id,
"org_name": org.name,
"summary": summary,
}
)

total_processed += summary.get("processed", 0)
total_failed += summary.get("failed", 0)
total_still_processing += summary.get("still_processing", 0)

except Exception as e:
logger.error(
f"[process_all_pending_evaluations] Error processing org_id={org.id}: {e}",
exc_info=True,
)
session.rollback()
results.append(
{"org_id": org.id, "org_name": org.name, "error": str(e)}
)
total_failed += 1
summary = await poll_all_pending_evaluations(session=session)

logger.info(
f"[process_all_pending_evaluations] Completed: "
f"{total_processed} processed, {total_failed} failed, "
f"{total_still_processing} still processing"
f"{summary['processed']} processed, {summary['failed']} failed, "
f"{summary['still_processing']} still processing"
)

return {
"status": "success",
"organizations_processed": len(orgs),
"total_processed": total_processed,
"total_failed": total_failed,
"total_still_processing": total_still_processing,
"results": results,
"total_processed": summary["processed"],
"total_failed": summary["failed"],
"total_still_processing": summary["still_processing"],
"results": summary["details"],
}

except Exception as e:
Expand All @@ -134,7 +55,6 @@ async def process_all_pending_evaluations(session: Session) -> dict[str, Any]:
)
return {
"status": "error",
"organizations_processed": 0,
"total_processed": 0,
"total_failed": 0,
"total_still_processing": 0,
Expand Down
27 changes: 16 additions & 11 deletions backend/app/crud/evaluations/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,13 +645,16 @@ async def check_and_process_evaluation(
}


async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[str, Any]:
async def poll_all_pending_evaluations(session: Session) -> dict[str, Any]:
"""
Poll all pending evaluations for an organization.
Poll all pending evaluations across all organizations.

Fetches all evaluation runs with status='processing' in a single query,
groups them by project_id, and processes each project with its own
OpenAI/Langfuse clients.

Args:
session: Database session
org_id: Organization ID

Returns:
Summary dict:
Expand All @@ -663,10 +666,9 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st
"details": [...]
}
"""
# Get pending evaluations (status = "processing")
# Single query to fetch all processing evaluation runs
statement = select(EvaluationRun).where(
EvaluationRun.status == "processing",
EvaluationRun.organization_id == org_id,
)
pending_runs = session.exec(statement).all()

Expand All @@ -678,8 +680,13 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st
"still_processing": 0,
"details": [],
}

logger.info(
f"[poll_all_pending_evaluations] Found {len(pending_runs)} pending evaluation runs"
)

# Group evaluations by project_id since credentials are per project
evaluations_by_project = defaultdict(list)
evaluations_by_project: dict[int, list[EvaluationRun]] = defaultdict(list)
for run in pending_runs:
evaluations_by_project[run.project_id].append(run)

Expand All @@ -690,6 +697,8 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st
total_still_processing_count = 0

for project_id, project_runs in evaluations_by_project.items():
# All runs in a project share the same org_id
org_id = project_runs[0].organization_id
try:
# Get API clients for this project
try:
Expand All @@ -709,7 +718,6 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st
)
# Mark all runs in this project as failed due to client configuration error
for eval_run in project_runs:
# Persist failure status to database
update_evaluation_run(
session=session,
eval_run=eval_run,
Expand Down Expand Up @@ -751,7 +759,6 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st
f"[poll_all_pending_evaluations] Failed to check evaluation run | run_id={eval_run.id} | {e}",
exc_info=True,
)
# Persist failure status to database
update_evaluation_run(
session=session,
eval_run=eval_run,
Expand All @@ -774,9 +781,7 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st
f"[poll_all_pending_evaluations] Failed to process project | project_id={project_id} | {e}",
exc_info=True,
)
# Mark all runs in this project as failed
for eval_run in project_runs:
# Persist failure status to database
update_evaluation_run(
session=session,
eval_run=eval_run,
Expand All @@ -803,7 +808,7 @@ async def poll_all_pending_evaluations(session: Session, org_id: int) -> dict[st
}

logger.info(
f"[poll_all_pending_evaluations] Polling summary | org_id={org_id} | processed={total_processed_count} | failed={total_failed_count} | still_processing={total_still_processing_count}"
f"[poll_all_pending_evaluations] Polling summary | processed={total_processed_count} | failed={total_failed_count} | still_processing={total_still_processing_count}"
)

return summary
25 changes: 10 additions & 15 deletions backend/app/tests/api/routes/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ def test_evaluation_cron_job_success(
"""Test successful cron job execution."""
mock_result = {
"status": "success",
"organizations_processed": 2,
"total_processed": 5,
"total_failed": 0,
"total_still_processing": 1,
"results": [
{
"org_id": superuser_api_key.organization_id,
"org_name": "Test Org",
"summary": {"processed": 3, "failed": 0},
"run_id": 1,
"run_name": "test_run",
"action": "processed",
}
],
}
Expand All @@ -38,24 +37,21 @@ def test_evaluation_cron_job_success(
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert data["organizations_processed"] == 2
assert data["total_processed"] == 5
assert data["total_failed"] == 0
assert data["total_still_processing"] == 1


def test_evaluation_cron_job_no_organizations(
def test_evaluation_cron_job_no_pending(
client: TestClient,
superuser_api_key: TestAuthContext,
) -> None:
"""Test cron job when no organizations exist."""
"""Test cron job when no pending evaluations exist."""
mock_result = {
"status": "success",
"organizations_processed": 0,
"total_processed": 0,
"total_failed": 0,
"total_still_processing": 0,
"message": "No organizations to process",
"results": [],
}

Expand All @@ -71,8 +67,7 @@ def test_evaluation_cron_job_no_organizations(
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert data["organizations_processed"] == 0
assert data["message"] == "No organizations to process"
assert data["total_processed"] == 0


def test_evaluation_cron_job_with_failures(
Expand All @@ -82,15 +77,15 @@ def test_evaluation_cron_job_with_failures(
"""Test cron job execution with some failed evaluations."""
mock_result = {
"status": "success",
"organizations_processed": 1,
"total_processed": 3,
"total_failed": 2,
"total_still_processing": 0,
"results": [
{
"org_id": superuser_api_key.organization_id,
"org_name": "Test Org",
"summary": {"processed": 3, "failed": 2},
"run_id": 1,
"run_name": "test_run",
"action": "failed",
"error": "Check failed",
}
],
}
Expand Down
8 changes: 2 additions & 6 deletions backend/app/tests/crud/evaluations/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,9 +798,7 @@ async def test_poll_all_pending_evaluations_no_pending(
self, db: Session, test_dataset
):
"""Test polling with no pending evaluations."""
result = await poll_all_pending_evaluations(
session=db, org_id=test_dataset.organization_id
)
result = await poll_all_pending_evaluations(session=db)

assert result["total"] == 0
assert result["processed"] == 0
Expand Down Expand Up @@ -867,9 +865,7 @@ async def test_poll_all_pending_evaluations_with_runs(
"action": "no_change",
}

result = await poll_all_pending_evaluations(
session=db, org_id=test_dataset.organization_id
)
result = await poll_all_pending_evaluations(session=db)

assert result["total"] == 1
assert result["still_processing"] == 1
Expand Down