
Conversation

@carlosgjs (Collaborator) commented Oct 8, 2025

Summary

Initial version of the Processing service V2.

Closes #971
Closes #968
Closes #969

Current state

The V2 path is working but disabled by default in this PR to allow for extended testing. When enabled, starting a job creates a queue for that job and populates it with one task per image. The tasks can be pulled and ACKed via the APIs introduced in PR #1046. The new path can be enabled for a project via the async_pipeline_workers feature flag.
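For illustration only, a worker loop against those APIs might look like the sketch below. The endpoint paths, query parameters, and payload fields (batch, reply_subject, result) are inferred from this PR and the PR #1046 review discussion, not taken from final documentation.

# Hypothetical worker loop against the new Job task/result APIs.
# Paths, params, and payload shapes are assumptions based on the review notes:
#   GET  /jobs/{id}/tasks?batch=N  -> reserves up to N tasks from the job queue
#   POST /jobs/{id}/result         -> submits results and triggers the NATS ACK
import requests

BASE_URL = "http://localhost:8000/api/v2"  # assumed base URL
JOB_ID = 123  # assumed job id


def run_pipeline(task: dict) -> dict:
    """Placeholder for the actual ML pipeline call made by the worker."""
    return {"source_image_id": task.get("image_id"), "detections": []}


while True:
    resp = requests.get(f"{BASE_URL}/jobs/{JOB_ID}/tasks", params={"batch": 4}, timeout=30)
    tasks = resp.json()
    if not tasks:
        break  # queue drained for this job
    results = [{"reply_subject": t["reply_subject"], "result": run_pipeline(t)} for t in tasks]
    requests.post(f"{BASE_URL}/jobs/{JOB_ID}/result", json=results, timeout=30)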

List of Changes

  • Added implementations for the /tasks and /results Job APIs.
  • Added NATS JetStream to the Docker Compose setup. I also tried RabbitMQ and Beanstalkd, but they don't support the visibility timeout semantics we want or a disconnected mode of pulling and ACKing tasks.
  • Added TaskStateManager and TaskQueueManager
  • Added the queuing and async results processing logic

TODOs:

  • Cleanup of queues after jobs complete
  • Decoupling the Celery task state from the job state (a small downside: the progress bar looks green before completion)
  • Integrate with the incomplete job monitoring to re-queue images if needed

Related Issues

See issues #970 and #971.

How to Test the Changes

This path can be enabled by turning on the job.project.feature_flags.async_pipeline_workers feature flag; see ami/jobs/models.py:400:

        if job.project.feature_flags.async_pipeline_workers:
            cls.queue_images_to_nats(job, images)
        else:
            cls.process_images(job, images)

Then run the ami worker from RolnickLab/ami-data-companion#94.
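For reference, the flag can also be flipped from a Django shell. This is only a sketch: it assumes Project.feature_flags exposes the pydantic-style ProjectFeatureFlags object added in this PR (with async_pipeline_workers: bool = False); the Django admin route mentioned below achieves the same thing.

# Sketch: enable the async pipeline path for one project via `./manage.py shell`.
# Assumes Project.feature_flags exposes the ProjectFeatureFlags object added in
# this PR; adjust to the actual storage/serialization if it differs.
from ami.main.models import Project

project = Project.objects.get(pk=1)  # assumed project id
flags = project.feature_flags
flags.async_pipeline_workers = True
project.feature_flags = flags
project.save(update_fields=["feature_flags"])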

Test


Test both modes by tweaking the flag in the Django admin console.

Deployment Notes

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Optional async pipeline processing with message-queue-backed task distribution and per-stage job progress tracking
    • Endpoints to submit and retrieve pipeline tasks/results backed by the queue
    • Local/CI message queue manager for publish/reserve/ack workflows
  • Chores

    • Added message-queue service and client dependency; updated compose and env defaults for local/CI/staging
  • Documentation

    • README entry documenting the message queue dashboard URL
  • Tests

    • New unit tests covering queue manager and task-state tracking


@netlify bot commented Oct 8, 2025

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: 3a73329
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/6945e06e5f1c8b00084af4c5

@mihow (Collaborator) commented Oct 8, 2025

Exciting!

@carlosgjs marked this pull request as ready for review October 24, 2025 18:49
Copilot AI review requested due to automatic review settings October 24, 2025 18:49
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.
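As a rough, standalone illustration of the JetStream semantics involved (a pull consumer whose ack_wait acts as the visibility timeout), a nats-py snippet might look like the following. The stream, subject, and durable names are invented for the example and are not the ones used by TaskQueueManager.

# Minimal nats-py sketch of publish / pull / ack with a visibility timeout.
# Stream, subject, and durable names are illustrative only.
import asyncio

import nats
from nats.js.api import ConsumerConfig


async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    await js.add_stream(name="example-job", subjects=["example.job.tasks"])
    await js.publish("example.job.tasks", b'{"image_id": 1}')

    # ack_wait plays the role of a visibility timeout: an unacked message
    # is redelivered after 60 seconds.
    sub = await js.pull_subscribe(
        "example.job.tasks",
        durable="example-worker",
        config=ConsumerConfig(ack_wait=60),
    )
    for msg in await sub.fetch(batch=1, timeout=5):
        print(msg.data)
        await msg.ack()
    await nc.close()


asyncio.run(main())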

Key changes:

  • Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
  • Introduced new REST API endpoints for task pulling (/jobs/{id}/tasks) and result submission (/jobs/{id}/result)
  • Implemented Redis-based progress tracking to handle asynchronous worker updates

Reviewed Changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
requirements/base.txt Added nats-py dependency for NATS client support
object_model_diagram.md Added comprehensive Mermaid diagram documenting ML pipeline system architecture
docker-compose.yml Added NATS JetStream service with health checks and monitoring
config/settings/base.py Added NATS_URL configuration setting
ami/utils/nats_queue.py New TaskQueueManager class for NATS JetStream operations
ami/jobs/views.py Added task pulling and result submission endpoints with pipeline filtering
ami/jobs/utils.py Helper function for running async code in sync Django context
ami/jobs/tasks.py New Celery task for processing pipeline results asynchronously
ami/jobs/task_state.py TaskStateManager for Redis-based job progress tracking
ami/jobs/models.py Added queue_images_to_nats method and NATS cleanup logic
ami/base/views.py Fixed request.data handling when not a dict
README.md Added NATS dashboard documentation link
.vscode/launch.json Added debug configurations for Django and Celery containers
.envs/.local/.django Added NATS_URL environment variable
.dockerignore Expanded with comprehensive ignore patterns
Comments suppressed due to low confidence (1)

object_model_diagram.md:1

  • The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System


@carlosgjs requested a review from mihow October 24, 2025 18:59
@coderabbitai bot (Contributor) commented Oct 31, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds NATS JetStream-backed task queuing and management, Redis task-state tracking, Celery result-processing, an async ML job queuing path behind a feature flag, API wiring for task reservation and result submission, schema/tests/docs updates, and local/CI Docker, config, and dependency additions for NATS.

Changes

Cohort / File(s) Summary
Configuration & Requirements
.envs/.local/.django, .envs/.ci/.django, .envs/.production/.django-example, config/settings/base.py, requirements/base.txt
Add NATS_URL env entries and expose NATS_URL setting; add nats-py==2.10.0.
Docker & Docs
docker-compose.yml, docker-compose.ci.yml, docker-compose.staging.yml, README.md
Add nats JetStream service with healthcheck; include nats in django depends_on where applicable; adjust ui-dev entrypoint; document NATS dashboard URL.
Feature Flags
ami/main/models.py
Add async_pipeline_workers: bool = False to ProjectFeatureFlags.
NATS Queue Manager
ami/ml/orchestration/nats_queue.py
New async TaskQueueManager and get_connection: manage JetStream connection, per-job stream/consumer, publish/reserve/ack, and cleanup.
Orchestration Helpers
ami/ml/orchestration/jobs.py
Add queue_images_to_nats(job, images) and cleanup_nats_resources(job) to prepare messages, initialize Redis task state, and publish via TaskQueueManager.
Task State Management
ami/ml/orchestration/task_state.py
New TaskStateManager and TaskProgress for Redis-backed per-job per-stage pending lists, progress calculation, TTL, locking, and cleanup.
ML Job Model
ami/jobs/models.py
MLJob.run now branches on project.feature_flags.async_pipeline_workers; added MLJob.process_images classmethod and typed image collections.
Celery Tasks & Helpers
ami/jobs/tasks.py
Add Celery task process_pipeline_result plus helpers log_time, _update_job_progress, and _ack_task_via_nats; integrates TaskStateManager, DB save, NATS ACK, timing/logging, and concurrency handling.
API Endpoints & Schema Params
ami/jobs/views.py, ami/utils/requests.py
Implement tasks (GET) and result (POST) endpoints to reserve NATS tasks and queue result processing; add OpenAPI params ids_only, incomplete_only, batch.
Schemas / Models
ami/ml/schemas.py
Removed queue_timestamp field from PipelineProcessingTask.
Tests
ami/ml/test_nats_queue.py, ami/ml/tests.py, ami/jobs/tests.py
Add unit tests for TaskQueueManager and TaskStateManager; update job tests to call queue_images_to_nats and expect accepted / results_queued.
Module Init / Minor Refactor
ami/ml/orchestration/__init__.py
Removed unconditional re-export from .processing; added circular-import comment.
sequenceDiagram
    participant Client
    participant MLJob
    participant Flags as FeatureFlags
    participant QueueMgr as TaskQueueManager
    participant State as TaskStateManager
    participant Worker
    participant Celery as process_pipeline_result
    participant DB as Database

    Client->>MLJob: run(job, images)
    MLJob->>Flags: check async_pipeline_workers
    alt async enabled
        MLJob->>State: initialize_job(image_ids)
        MLJob->>QueueMgr: queue_images_to_nats(job, images)
        loop per batch
            QueueMgr->>QueueMgr: publish_task(job_id, message)
        end
    else sync path
        MLJob->>MLJob: process_images(job, images)
        MLJob->>DB: persist results & progress
    end

    Note over Worker,QueueMgr: Worker reserves tasks from JetStream
    Worker->>QueueMgr: reserve_task(job_id, batch)
    Worker->>Celery: run pipeline, produce result + reply_subject
    Celery->>State: update_state(processed_ids, "process", request_id)
    Celery->>DB: save pipeline results
    Celery->>QueueMgr: acknowledge_task(reply_subject)
    Celery->>State: update_state(processed_ids, "results", request_id)
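As a rough sketch of the Redis bookkeeping implied by the State participant above (a pending set per job that shrinks as images are processed, with a TTL so abandoned jobs expire), using plain redis-py; the key layout is invented for illustration and is not the actual TaskStateManager schema.

# Illustrative redis-py sketch of per-job pending-image tracking.
# Key names and TTL are assumptions, not the real TaskStateManager layout.
import redis

r = redis.Redis()


def initialize_job(job_id: int, image_ids: list[int], ttl: int = 86400) -> None:
    pending_key = f"job:{job_id}:pending"
    total_key = f"job:{job_id}:total"
    r.sadd(pending_key, *image_ids)
    r.set(total_key, len(image_ids))
    r.expire(pending_key, ttl)
    r.expire(total_key, ttl)


def mark_images_processed(job_id: int, image_ids: list[int]) -> float:
    """Remove processed images from the pending set and return progress in [0, 1]."""
    pending_key = f"job:{job_id}:pending"
    total = int(r.get(f"job:{job_id}:total") or 0)
    r.srem(pending_key, *image_ids)
    remaining = r.scard(pending_key)
    return 1.0 if total == 0 else (total - remaining) / total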

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45–75 minutes

  • Pay extra attention to:
    • async↔sync boundaries (async_to_sync) and exception handling in ami/ml/orchestration/jobs.py and ami/jobs/views.py.
    • JetStream stream/consumer naming, consumer config, and publish/reserve/ACK semantics in ami/ml/orchestration/nats_queue.py.
    • Redis locking, TTL, and atomic progress updates in ami/ml/orchestration/task_state.py.
    • Celery retry/backoff, ACK handling, and concurrent progress updates in ami/jobs/tasks.py.
    • Tests that mock NATS/JetStream and validate lifecycle behaviors in ami/ml/test_nats_queue.py.

Possibly related PRs

Suggested labels

backend, ml

Suggested reviewers

  • mihow

Poem

🐇 I hopped through JetStream at break of day,
I queued small images and watched them stray,
Redis kept tally as workers ran,
Celery clapped when results met the plan,
A carrot, a hop — queues lead the way. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 73.77% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Out of Scope Changes check ❓ Inconclusive The PR introduces several changes that appear to extend beyond the core queue creation objective: modifications to docker-compose configuration, environment variable setup, schema changes (removing queue_timestamp from PipelineProcessingTask), and comprehensive result processing logic via process_pipeline_result task. Clarify whether docker-compose infrastructure changes, schema modifications, and result processing implementations are within scope for issues #971, #968, #969, or represent supporting infrastructure necessary for the feature.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The PR title '[integration] Processing service V2' directly summarizes the main change: introducing the V2 version of the processing service with NATS-backed async queuing.
Description check ✅ Passed The PR description follows the template structure with Summary, List of Changes, Related Issues, How to Test, and Checklist sections. However, a Detailed Description section addressing side effects/risks and Deployment Notes are missing or minimal.
Linked Issues check ✅ Passed The PR successfully implements the primary objectives from issue #971: queue creation/population on job start, enqueuing images as work items, and integrating with the async processing path. The implementation includes TaskQueueManager for NATS JetStream queuing and TaskStateManager for progress tracking.


@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef8f16c and d254867.

📒 Files selected for processing (19)
  • .dockerignore (1 hunks)
  • .envs/.local/.django (1 hunks)
  • .gitignore (1 hunks)
  • .vscode/launch.json (1 hunks)
  • README.md (1 hunks)
  • ami/base/views.py (1 hunks)
  • ami/jobs/models.py (8 hunks)
  • ami/jobs/tasks.py (2 hunks)
  • ami/jobs/views.py (3 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
  • ami/ml/orchestration/utils.py (1 hunks)
  • ami/utils/requests.py (2 hunks)
  • config/settings/base.py (2 hunks)
  • docker-compose.yml (4 hunks)
  • object_model_diagram.md (1 hunks)
  • requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
ami/ml/orchestration/nats_queue.py (1)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (45-138)
ami/jobs/models.py (4)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • final_states (58-59)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • reserve_task (152-208)
ami/jobs/tasks.py (5)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • mark_images_processed (48-61)
  • get_progress (63-90)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/jobs/models.py (5)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • update_stage (168-188)
  • save (947-958)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • save_results (809-917)
  • save_results (1107-1108)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (2)
  • Job (727-1012)
  • logger (997-1006)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (28-294)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • cleanup (92-97)
  • initialize_job (38-46)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/ml/orchestration/utils.py (1)
ami/jobs/models.py (1)
  • logger (997-1006)
ami/base/views.py (1)
ami/main/api/views.py (1)
  • get (1595-1651)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (1)
  • SourceImage (1622-1870)
ami/ml/models/pipeline.py (2)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 LanguageTool
object_model_diagram.md

[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


31-31: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


38-38: Bare URL used

(MD034, no-bare-urls)


39-39: Bare URL used

(MD034, no-bare-urls)


40-40: Bare URL used

(MD034, no-bare-urls)


41-41: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


43-43: Bare URL used

(MD034, no-bare-urls)


61-61: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


77-77: Bare URL used

(MD034, no-bare-urls)


97-97: Bare URL used

(MD034, no-bare-urls)


118-118: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


122-122: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


126-126: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


130-130: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)

🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py

70-70: Unused method argument: ttr

(ARG002)


73-73: Avoid specifying long messages outside the exception class

(TRY003)


81-81: Do not catch blind exception: Exception

(BLE001)


94-94: Avoid specifying long messages outside the exception class

(TRY003)


103-103: Do not catch blind exception: Exception

(BLE001)


132-132: Avoid specifying long messages outside the exception class

(TRY003)


146-146: Consider moving this statement to an else block

(TRY300)


148-148: Do not catch blind exception: Exception

(BLE001)


149-149: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


164-164: Avoid specifying long messages outside the exception class

(TRY003)


206-206: Do not catch blind exception: Exception

(BLE001)


207-207: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


221-221: Avoid specifying long messages outside the exception class

(TRY003)


226-226: Consider moving this statement to an else block

(TRY300)


227-227: Do not catch blind exception: Exception

(BLE001)


228-228: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


242-242: Avoid specifying long messages outside the exception class

(TRY003)


250-250: Consider moving this statement to an else block

(TRY300)


251-251: Do not catch blind exception: Exception

(BLE001)


252-252: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


266-266: Avoid specifying long messages outside the exception class

(TRY003)


273-273: Consider moving this statement to an else block

(TRY300)


274-274: Do not catch blind exception: Exception

(BLE001)


275-275: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/task_state.py

35-35: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


36-36: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/jobs/views.py

33-43: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


73-73: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


218-218: Unused method argument: pk

(ARG002)


236-236: Avoid specifying long messages outside the exception class

(TRY003)


244-244: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


283-283: Avoid specifying long messages outside the exception class

(TRY003)


288-288: Avoid specifying long messages outside the exception class

(TRY003)


298-298: Avoid specifying long messages outside the exception class

(TRY003)


301-301: Avoid specifying long messages outside the exception class

(TRY003)


322-322: Do not catch blind exception: Exception

(BLE001)


323-323: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

45-45: Unused function argument: self

(ARG001)


120-120: Do not catch blind exception: Exception

(BLE001)


121-121: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


133-133: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


136-136: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

config/settings/base.py

28-28: Unused noqa directive (non-enabled: E231, E501)

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/jobs.py

69-69: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

ami/ml/orchestration/utils.py

14-14: Do not catch blind exception: Exception

(BLE001)


15-15: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


482-482: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

carlosgjs and others added 2 commits October 31, 2025 14:33
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

160-166: Use timezone-aware datetime for finished_at.

Line 163 uses datetime.now() which returns a naive (non-timezone-aware) datetime. Django models typically expect timezone-aware datetimes, especially when USE_TZ = True. This can cause subtle bugs or database inconsistencies.

Apply this diff to use Django's timezone-aware datetime:

-from datetime import datetime
+from django.utils import timezone

And then:

         if stage == "results" and progress_percentage >= 1.0:
             job.status = JobState.SUCCESS
             job.progress.summary.status = JobState.SUCCESS
-            job.finished_at = datetime.now()
+            job.finished_at = timezone.now()
🧹 Nitpick comments (8)
ami/ml/orchestration/jobs.py (4)

11-26: Previous issue addressed; update docstring to document return value.

The function now correctly returns the cleanup result (addressing the previous review comment), but the docstring is missing the Returns: section.

Apply this diff to complete the docstring:

 def cleanup_nats_resources(job: "Job") -> bool:
     """
     Clean up NATS JetStream resources (stream and consumer) for a completed job.
 
     Args:
         job: The Job instance
+
+    Returns:
+        bool: True if cleanup successful, False otherwise
     """

72-72: Consider using job.logger for consistency.

This line uses the module-level logger, while other logging in this function uses job.logger. Using job.logger consistently ensures all logs are captured by the JobLogHandler.

-                    logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")
+                    job.logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")

Note: You'll need to pass job into the async function or capture it from the outer scope (which should work since it's already in scope).


95-97: Consider NATS resource lifecycle for empty job.

When there are no images, the job progress is updated to complete, but should NATS stream/consumer resources be created at all, or immediately cleaned up if already created?

This could lead to orphaned NATS resources. Consider:

  1. Skipping NATS resource creation entirely when len(images) == 0, or
  2. Calling cleanup_nats_resources(job) immediately after marking the job complete.

103-106: Document partial failure behavior.

When some images fail to queue, the function returns False, but successfully queued images remain in the NATS stream. This could leave the job in an inconsistent state.

Consider documenting this behavior in the docstring, or implement one of these strategies:

  1. Return success if at least some images were queued, and let the job track partial completion
  2. Implement a rollback mechanism to remove successfully queued messages when any failure occurs
  3. Add retry logic for failed queue operations

The current behavior (returning False on any failure) might trigger job-level error handling that doesn't account for partially queued work.

ami/jobs/tasks.py (4)

126-128: Use logging.exception to capture traceback automatically.

When logging an exception in an except block, logging.exception is preferred over logging.error because it automatically includes the stack trace, making debugging easier.

Apply this diff:

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

141-147: Use logging.exception in exception handlers.

Similar to the NATS ACK handler, these exception handlers should use logging.exception instead of logging.error to automatically capture stack traces for easier debugging.

Apply this diff:

     except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
+        logger.exception(f"Job {job_id} not found")
         raise
     except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+        logger.exception(f"Failed to process pipeline result for job {job_id}: {e}")
         # Celery will automatically retry based on autoretry_for
         raise

160-166: Cleanup of NATS queues and Redis state when job completes.

When the job reaches 100% completion (lines 160-163), there's no call to clean up the NATS JetStream queue or Redis state managed by TaskStateManager. This was mentioned in previous reviews and is listed as a TODO in the PR objectives.

Without cleanup:

  • NATS queues will accumulate over time
  • Redis keys persist beyond their usefulness
  • Resource leaks could degrade system performance

Would you like me to generate a cleanup implementation that could be called here? It would need to:

  1. Delete the NATS stream for this job
  2. Call TaskStateManager(job_id).cleanup() to remove Redis keys
  3. Include error handling to avoid failing job finalization if cleanup fails
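A minimal sketch of such a cleanup step, built on the helpers this PR already introduces (cleanup_nats_resources and TaskStateManager.cleanup); where exactly it gets called from, and how failures should affect job finalization, remains an open question:

# Sketch: best-effort cleanup once a job's "results" stage reaches 100%.
# Uses cleanup_nats_resources and TaskStateManager from this PR; call site is assumed.
import logging

from ami.ml.orchestration.jobs import cleanup_nats_resources
from ami.ml.orchestration.task_state import TaskStateManager

logger = logging.getLogger(__name__)


def cleanup_job_queue_state(job) -> None:
    try:
        if not cleanup_nats_resources(job):
            logger.warning("NATS cleanup reported failure for job %s", job.pk)
    except Exception:
        logger.exception("NATS cleanup failed for job %s", job.pk)
    try:
        TaskStateManager(job.pk).cleanup()
    except Exception:
        logger.exception("Redis state cleanup failed for job %s", job.pk)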

80-86: Consider extracting duplicate retry logic (optional).

The pattern of calling update_state, checking for None, logging a warning, and retrying with the same parameters appears at both lines 80-86 and lines 131-138. You could extract this to a small helper function to reduce duplication.

Example helper:

def _update_state_with_retry(state_manager, processed_image_ids, stage, request_id, task):
    progress_info = state_manager.update_state(processed_image_ids, stage=stage, request_id=request_id)
    if not progress_info:
        logger.warning(
            f"Another task is already processing results for job {state_manager.job_id}. "
            f"Retrying task {request_id} in 5 seconds..."
        )
        raise task.retry(countdown=5, max_retries=10)
    return progress_info

Then use:

progress_info = _update_state_with_retry(state_manager, processed_image_ids, "process", self.request.id, self)

However, since this is only used twice and the logic is straightforward, the current approach is also acceptable.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df7eaa3 and 0391642.

📒 Files selected for processing (2)
  • ami/jobs/tasks.py (3 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tasks.py (5)
ami/jobs/views.py (2)
  • result (256-339)
  • retry (114-125)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (31-296)
  • acknowledge_task (212-231)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • retry (873-885)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
🪛 Ruff (0.14.5)
ami/jobs/tasks.py

126-126: Do not catch blind exception: Exception

(BLE001)


127-127: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


138-138: Abstract raise to an inner function

(TRY301)


142-142: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


145-145: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

70-70: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tasks.py (1)

203-225: LGTM! Clean timing utility.

The log_time helper is well-designed with clear semantics. Using time.perf_counter() is the right choice for measuring elapsed time, and the partial function pattern makes it convenient to chain timing measurements. The docstring with usage examples is helpful.
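For readers who have not opened the diff, one plausible shape for such a chainable timer is sketched below; this is a reconstruction from the description above, not the exact code in ami/jobs/tasks.py.

# Reconstruction of a chainable timing helper built on perf_counter + partial.
# The real log_time in this PR may differ in signature and log formatting.
import functools
import logging
import time

logger = logging.getLogger(__name__)


def log_time(label: str, start: float | None = None):
    """Log the elapsed time since `start` (if given) and return a timer for the next segment."""
    now = time.perf_counter()
    if start is not None:
        logger.info("%s took %.3fs", label, now - start)
    return functools.partial(log_time, start=now)


# Usage: each call logs the previous segment and starts timing the next one.
timer = log_time("start")      # starts the clock, logs nothing yet
# ... load images ...
timer = timer("load images")   # logs "load images took X.XXXs"
# ... save results ...
timer("save results")          # logs "save results took X.XXXs"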

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
ami/ml/orchestration/jobs.py (1)

46-58: Use timezone-aware datetime for queue_timestamp.

Line 56 uses datetime.datetime.now() which returns a naive (timezone-unaware) datetime. Django applications should use timezone-aware timestamps to avoid comparison and storage issues.

Apply this diff to fix:

+from django.utils import timezone
+
 def queue_images_to_nats(job: "Job", images: list[SourceImage]):
     ...
         message = {
             "job_id": job.pk,
             "image_id": image_id,
             "image_url": image.url() if hasattr(image, "url") else None,
             "timestamp": (image.timestamp.isoformat() if hasattr(image, "timestamp") and image.timestamp else None),
             "batch_index": i,
             "total_images": len(images),
-            "queue_timestamp": datetime.datetime.now().isoformat(),
+            "queue_timestamp": timezone.now().isoformat(),
         }

You may also be able to remove the datetime import from line 1 if it's not used elsewhere.

🧹 Nitpick comments (2)
ami/ml/orchestration/jobs.py (2)

11-26: LGTM - cleanup function properly returns result.

The function correctly returns the boolean result from the async cleanup operation (line 26), which allows callers to handle success/failure. Good fix from the previous review.

Regarding the TODO comment at line 11: I can help implement the cleanup mechanism if needed. This could be triggered from job completion handlers or as part of a Celery task. Would you like me to open an issue to track this, or would you prefer a code suggestion?


88-102: LGTM - clean result handling.

The direct unpacking at line 88 is correct (previous None check issue was properly resolved). The logic clearly returns False for any failures and True for complete success.

Lines 90-92 handle the edge case of an empty images list by marking the job complete. While this seems intentional and reasonable, consider adding a brief comment explaining this behavior for future maintainers.

Optional: Add explanatory comment:

     successful_queues, failed_queues = async_to_sync(queue_all_images)()
 
     if not images:
+        # No images to process - mark job as complete immediately
         job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
         job.save()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0391642 and 4ae27b0.

📒 Files selected for processing (2)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • docker-compose.yml (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-compose.yml
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (5)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py

77-77: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (2)
ami/ml/orchestration/jobs.py (2)

60-63: LGTM - proper Redis state initialization.

Good pattern to initialize the task state manager with all image IDs before queuing begins. This ensures progress tracking is ready before any tasks are processed.


65-86: LGTM - batch processing with appropriate error handling.

The broad Exception catch at line 77 is flagged by static analysis, but it's actually appropriate in this context. The batch-processing pattern here requires continuing through all images even if individual ones fail, and the exception is logged (line 78) while tracking success/failure counts. This provides resilience for partial failures.
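The per-image resilience pattern being praised here boils down to something like the following schematic, using this PR's TaskQueueManager.publish_task; the message shape and counters are simplified for illustration.

# Schematic of the publish loop: keep queuing on per-image failures and report counts,
# leaving it to the caller to decide what a partial failure means for the job.
# Assumes TaskQueueManager.publish_task(job_id, message) from this PR.
import logging

from ami.ml.orchestration.nats_queue import TaskQueueManager

logger = logging.getLogger(__name__)


async def queue_all_images(job_id: int, messages: list[tuple[int, dict]]) -> tuple[int, int]:
    successful, failed = 0, 0
    async with TaskQueueManager() as manager:
        for image_pk, message in messages:
            try:
                await manager.publish_task(job_id, message)
                successful += 1
            except Exception:  # keep going so one bad image doesn't stop the batch
                logger.exception("Failed to queue image %s for job %s", image_pk, job_id)
                failed += 1
    return successful, failed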

carlosgjs and others added 4 commits November 21, 2025 15:19
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
ami/jobs/tasks.py (1)

124-125: Consider using logging.exception for better debugging context.

At lines 125, 140, and 143, the code uses logging.error() within exception handlers. Using logging.exception() would automatically include the stack trace, making debugging easier without changing the log level.

Example for line 125:

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

Apply similar changes to lines 140 and 143.

Also applies to: 140-140, 143-143

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4ae27b0 and f221a1a.

📒 Files selected for processing (3)
  • ami/jobs/tasks.py (3 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • docker-compose.yml (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-compose.yml
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
ami/jobs/tasks.py (5)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (31-296)
  • acknowledge_task (212-231)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py

76-76: Do not catch blind exception: Exception

(BLE001)

ami/jobs/tasks.py

124-124: Do not catch blind exception: Exception

(BLE001)


125-125: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


136-136: Abstract raise to an inner function

(TRY301)


140-140: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


143-143: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (4)
ami/ml/orchestration/jobs.py (1)

11-25: LGTM: Cleanup function correctly returns success status.

The async cleanup flow properly wraps the TaskQueueManager call and returns the boolean result, allowing callers to handle success/failure appropriately.

ami/jobs/tasks.py (3)

67-77: LGTM: Error handling correctly marks failed images as processed.

The code properly extracts the failed image ID and marks it as processed (lines 74-75), preventing the job from hanging indefinitely when images fail. This addresses the earlier review feedback.


111-127: LGTM: NATS acknowledgment flow is robust.

The acknowledgment logic correctly:

  • Acknowledges tasks even if processing errored (preventing infinite redelivery)
  • Handles ACK failures gracefully without failing the task (line 126 comment)
  • Uses proper async/sync bridging

The broad exception catch on line 124 is intentional and well-documented.


201-223: LGTM: Useful timing utility.

The log_time helper provides a clean pattern for measuring and logging execution times with partial function application. Well-documented with clear usage examples.

messages.append((image.pk, message))

# Store all image IDs in Redis for progress tracking
state_manager = TaskStateManager(job.pk)
A collaborator commented:

@vanessavmac you may be interested in looking into the details of the TaskStateManager

mihow added a commit that referenced this pull request Dec 9, 2025
…#1046)

Defines an initial set of API endpoints for an external processing service to read & pull ML tasks from the job queue and post back results (detections & predictions).

- Add tasks() endpoint (stubbed) for NATS JetStream integration
- Add result() endpoint (stubbed) for pipeline result processing
- Add ids_only and incomplete_only query parameters to job list endpoint
- Add JobFilterSet with pipeline__slug filtering and search fields

- Add utility OpenApiParameter definitions for new query params

The tasks() and result() endpoints are documented and stubbed out,
awaiting NATS and Celery task dependencies from PR #987.

* test: Add comprehensive unit tests for new JobViewSet features

- test_list_jobs_with_ids_only: Verifies ids_only parameter returns job IDs
- test_list_jobs_with_incomplete_only: Tests incomplete_only filtering by results stage
- test_filter_by_pipeline_slug: Tests JobFilterSet pipeline__slug filtering
- test_search_jobs: Tests search functionality by name and pipeline name
- test_tasks_endpoint_stub: Tests stubbed tasks() endpoint
- test_tasks_endpoint_with_batch: Tests batch parameter handling
- test_tasks_endpoint_without_pipeline: Tests validation for missing pipeline
- test_result_endpoint_stub: Tests stubbed result() endpoint
- test_result_endpoint_validation: Tests request validation for result()

All tests pass successfully, verifying the new API features work correctly.

* refactor: Use helper methods in JobViewSet tests for cleaner code

* Update ami/jobs/views.py

CR feedback

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update ami/jobs/views.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update ami/jobs/views.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Use self.get_object()

* Lean out implementation

* Use pydantic types for tasks/results

* Update tests

* coderabbit feedback

* Better batch validation

* better tests, fix formatting settings

* Fix typo

* Move filtering to filter_queryset

* Undo settings change

* Make jobs/tasks?batch required

* CR nits

* Refactor

* Update tests

* chore: move openAPI param definitions to job/schemas

* docs: add inline todos

* chore: move project openAPI param to main/schemas

* test: fix periodically failing test due to pagination. update asserts.

* fix: add missing new schema files

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/jobs/tasks.py (1)

165-190: Fix Celery signal wiring: pre_update_job_status should not subscribe to task_postrun

pre_update_job_status is currently decorated with both @task_postrun.connect and @task_prerun.connect. This creates a runtime error:

When task_postrun fires, Celery passes state in kwargs. The function then calls update_job_status(sender, task_id, task, "PENDING", **kwargs), which results in state being passed both positionally (as "PENDING") and via kwargs (as the actual task state), causing TypeError: multiple values for argument 'state'.

task_prerun does not include state in kwargs, so it works correctly. Remove the @task_postrun.connect decorator to fix this:

-@task_postrun.connect(sender=run_job)
-@task_prerun.connect(sender=run_job)
-def pre_update_job_status(sender, task_id, task, **kwargs):
-    # in the prerun signal, set the job status to PENDING
-    update_job_status(sender, task_id, task, "PENDING", **kwargs)
+@task_prerun.connect(sender=run_job)
+def pre_update_job_status(sender, task_id, task, **kwargs):
+    # in the prerun signal, set the job status to PENDING
+    update_job_status(sender, task_id, task, "PENDING", **kwargs)
🧹 Nitpick comments (4)
README.md (1)

72-76: Wrap NATS dashboard URL to satisfy markdownlint (MD034)

The bare URL triggers MD034. You can fix by wrapping it as a markdown link:

-- NATS dashboard: https://natsdashboard.com/ (Add localhost)
+- NATS dashboard: [https://natsdashboard.com/](https://natsdashboard.com/) (Add localhost)
ami/jobs/views.py (2)

201-240: DRY up ids_only / incomplete_only handling and reuse existing helpers

The list() implementation works, but a couple of cleanups would reduce duplication:

  • You're manually parsing booleans for ids_only / incomplete_only, while url_boolean_param already encapsulates this.
  • The incomplete-only filtering logic duplicates what IncompleteJobFilter does.

Consider:

-        # Check if ids_only parameter is set
-        ids_only = request.query_params.get("ids_only", "false").lower() in ["true", "1", "yes"]
-
-        # Check if incomplete_only parameter is set
-        incomplete_only = request.query_params.get("incomplete_only", "false").lower() in ["true", "1", "yes"]
+        ids_only = url_boolean_param(request, "ids_only", default=False)
+        incomplete_only = url_boolean_param(request, "incomplete_only", default=False)

and either:

  • Drop IncompleteJobFilter and keep the inline incomplete_only branch here, or
  • Move the logic back into IncompleteJobFilter and rely solely on the filter backend, to avoid two sources of truth.

246-283: Rename unused loop index in tasks() for clarity

Inside get_tasks, the loop index i is unused:

for i in range(batch):
    task = await manager.reserve_task(job_id, timeout=0.1)

Rename it to _ to make intent clear and satisfy Ruff B007:

-            async with TaskQueueManager() as manager:
-                for i in range(batch):
+            async with TaskQueueManager() as manager:
+                for _ in range(batch):
                     task = await manager.reserve_task(job_id, timeout=0.1)
ami/jobs/tasks.py (1)

41-145: Improve diagnostics in process_pipeline_result without changing behaviour

The overall flow in process_pipeline_result looks good (state manager → DB save → NATS ACK → results-stage progress), but the exception logging loses stack traces and uses broad Exception catches.

To keep behaviour but improve observability and align with Ruff hints, you could:

-        except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+        except Exception:
+            job.logger.exception(
+                "Error acknowledging task via NATS for job %s (reply_subject=%s)",
+                job_id,
+                reply_subject,
+            )
@@
-    except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
-        raise
-    except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
-        # Celery will automatically retry based on autoretry_for
-        raise
+    except Job.DoesNotExist:
+        logger.exception("Job %s not found while processing pipeline result", job_id)
+        raise
+    except Exception:
+        logger.exception("Failed to process pipeline result for job %s", job_id)
+        # Celery will automatically retry based on autoretry_for
+        raise

This keeps retry semantics the same but makes debugging failures and ACK issues much easier.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f221a1a and 1a9b80a.

📒 Files selected for processing (5)
  • README.md (1 hunks)
  • ami/jobs/tasks.py (3 hunks)
  • ami/jobs/views.py (6 hunks)
  • ami/main/models.py (1 hunks)
  • config/settings/base.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • ami/main/models.py
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
README.md

74-74: Bare URL used

(MD034, no-bare-urls)

🪛 Ruff (0.14.8)
ami/jobs/tasks.py

123-123: Do not catch blind exception: Exception

(BLE001)


124-124: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


135-135: Abstract raise to an inner function

(TRY301)


139-139: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


142-142: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/views.py

273-273: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


285-285: Unused method argument: pk

(ARG002)


330-330: Do not catch blind exception: Exception

(BLE001)


331-331: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (3)
config/settings/base.py (1)

266-268: NATS_URL setting aligns with existing env patterns

Using env("NATS_URL", default="nats://localhost:4222") is consistent with other settings and provides a sensible local-dev default. No changes needed.

ami/jobs/tasks.py (1)

204-226: log_time helper is a handy, lightweight timing utility

The log_time helper is small, side-effect-free, and its API is clear. Using time.perf_counter() and returning a partially applied function makes it easy to chain timing calls around different segments.

Looks good as-is.

ami/jobs/views.py (1)

285-347: Guard against invalid result payloads and improve error reporting

If any item in results doesn't match PipelineTaskResult's shape, PipelineTaskResult(**item) will raise and bubble up as a 500 instead of a 4xx. Only the .delay() call is wrapped in try/except.

Wrap the construction in a try/except to fail fast with a clear 400:

-        for item in results:
-            task_result = PipelineTaskResult(**item)
+        for idx, item in enumerate(results):
+            try:
+                task_result = PipelineTaskResult(**item)
+            except Exception as exc:
+                # Treat bad client payloads as a 400 rather than 500
+                raise ValidationError({"results": f"Item {idx} is invalid: {exc}"})
             reply_subject = task_result.reply_subject
             result_data = task_result.result
@@
-            except Exception as e:
-                logger.error(f"Failed to queue result with reply_subject='{reply_subject}' for job {job_id}: {e}")
+            except Exception:
+                # Log full stack; per-item error is still surfaced in the response
+                logger.exception(
+                    "Failed to queue result with reply_subject='%s' for job %s",
+                    reply_subject,
+                    job_id,
+                )

This keeps the API responsive to malformed input and provides better diagnostics when queuing fails.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
ami/jobs/tasks.py (1)

157-160: Use timezone-aware datetime for finished_at.

datetime.now() returns a naive datetime. Django applications should use timezone-aware timestamps.

+from django.utils import timezone
+
 def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
     from ami.jobs.models import Job, JobState  # avoid circular import

     with transaction.atomic():
         job = Job.objects.select_for_update().get(pk=job_id)
         job.progress.update_stage(
             stage,
             status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
             progress=progress_percentage,
         )
         if stage == "results" and progress_percentage >= 1.0:
             job.status = JobState.SUCCESS
             job.progress.summary.status = JobState.SUCCESS
-            job.finished_at = datetime.now()
+            job.finished_at = timezone.now()
         job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
         job.save()
ami/ml/orchestration/jobs.py (1)

89-92: Update both stages when the image list is empty.

When images is empty, only the "results" stage is marked as SUCCESS. The "process" stage should also be updated to maintain consistent job state.

Apply this diff:

     if not images:
+        job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
         job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
         job.save()
🧹 Nitpick comments (5)
ami/jobs/views.py (2)

241-248: Fix unused loop variable and consider a longer timeout.

  1. The loop variable i is unused; rename to _.
  2. The timeout=0.1 (100ms) is quite short for network operations. Consider a slightly longer timeout or making it configurable.
         async def get_tasks():
             tasks = []
             async with TaskQueueManager() as manager:
-                for i in range(batch):
-                    task = await manager.reserve_task(job_id, timeout=0.1)
+                for _ in range(batch):
+                    task = await manager.reserve_task(job_id, timeout=0.5)
                     if task:
                         tasks.append(task)
             return tasks

312-319: Use logger.exception to capture the stack trace.

When catching a broad exception, use logger.exception() instead of logger.error() to include the full traceback for debugging.

         except Exception as e:
-            logger.error(f"Failed to queue pipeline results for job {job_id}: {e}")
+            logger.exception(f"Failed to queue pipeline results for job {job_id}: {e}")
             return HttpResponseServerError(
                 {
                     "status": "error",
                     "job_id": job_id,
                 },
             )
ami/jobs/tasks.py (3)

100-103: Avoid using assert for runtime validation in production code.

Assertions can be disabled with the -O flag, which would skip this check entirely. For runtime validation, use an explicit conditional with an appropriate exception.

         if pipeline_result:
-            # should never happen since otherwise we could not be processing results here
-            assert job.pipeline is not None, "Job pipeline is None"
+            if job.pipeline is None:
+                raise ValueError(f"Job {job_id} has no pipeline configured")
             job.pipeline.save_results(results=pipeline_result, job_id=job.pk)

123-125: Use logger.exception to capture the full stack trace.

When logging exceptions, logger.exception() automatically includes the traceback, which aids debugging.

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

138-144: Use logger.exception for error logging with stack traces.

Both exception handlers should use logger.exception() to capture full tracebacks.

     except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
+        logger.exception(f"Job {job_id} not found")
         raise
     except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+        logger.exception(f"Failed to process pipeline result for job {job_id}: {e}")
         # Celery will automatically retry based on autoretry_for
         raise
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a9b80a and 3657fd2.

📒 Files selected for processing (3)
  • ami/jobs/tasks.py (3 hunks)
  • ami/jobs/views.py (5 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ami/jobs/tasks.py (4)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (31-296)
  • acknowledge_task (212-231)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/jobs/models.py (3)
  • Job (734-1019)
  • JobState (27-63)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (49-144)
ami/ml/schemas.py (1)
  • PipelineTaskResult (232-238)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (31-296)
  • reserve_task (154-210)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (3)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
🪛 Ruff (0.14.8)
ami/jobs/tasks.py

123-123: Do not catch blind exception: Exception

(BLE001)


124-124: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


135-135: Abstract raise to an inner function

(TRY301)


139-139: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


142-142: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/views.py

244-244: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


256-256: Unused method argument: pk

(ARG002)


310-310: Avoid specifying long messages outside the exception class

(TRY003)


312-312: Do not catch blind exception: Exception

(BLE001)


313-313: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

76-76: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (8)
ami/ml/orchestration/jobs.py (2)

76-78: Catching broad Exception is acceptable here for fault tolerance.

In an async context interacting with external NATS infrastructure, catching a broad exception to log and continue processing remaining images is reasonable. The success/failure is tracked per-image via failed_queues.


28-101: Implementation looks solid overall.

The function correctly:

  • Prepares messages outside async context to avoid Django ORM issues
  • Initializes Redis state tracking before queueing
  • Tracks successes/failures per image
  • Uses timezone-aware timestamps
ami/jobs/views.py (3)

34-49: JobFilterSet implementation looks good.

Clean filter implementation with proper field configuration.


274-308: Result endpoint implementation is well-structured.

The endpoint correctly:

  • Handles both list and single-item input
  • Validates with Pydantic
  • Queues tasks asynchronously via Celery
  • Returns proper response with task tracking info

228-228: Verify job_id type consistency with TaskQueueManager.

job.pk is an integer, but it's passed directly to manager.reserve_task(job_id, ...). Looking at TaskQueueManager, it expects job_id: str for methods like reserve_task. This works due to Python's dynamic typing, but the inconsistency could cause issues.

Consider converting explicitly:

-        job_id = job.pk
+        job_id = str(job.pk)
#!/bin/bash
# Verify TaskQueueManager method signatures expect string job_id
ast-grep --pattern 'async def reserve_task(self, job_id: str$_)'
rg -n "def reserve_task" --type=py -A2
ami/jobs/tasks.py (3)

41-48: Task configuration looks appropriate.

Good choices for retry and timeout configuration:

  • max_retries=3 with 60s delay provides reasonable retry behavior
  • 5-minute soft limit with 6-minute hard limit gives buffer for cleanup
  • autoretry_for=(Exception,) ensures transient failures are retried

203-225: Clever timing helper implementation.

The log_time helper using functools.partial for chained timing is elegant and reusable.


147-163: Job completion logic is well-structured.

The atomic transaction with select_for_update properly handles concurrent updates. The completion detection based on the "results" stage reaching 100% is appropriate.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
ami/ml/orchestration/nats_queue.py (1)

104-121: TTR mismatch risk when consumer already exists.

If a consumer was previously created with a different ack_wait value, this code silently reuses it without verifying the configuration matches TASK_TTR. JetStream doesn't allow mutating ack_wait on an existing durable consumer, which could lead to unexpected redelivery behavior if the TTR expectation changes.

Consider detecting mismatched ack_wait and recreating the consumer, or at minimum logging a warning when the existing consumer's configuration differs.
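A minimal sketch of that check, assuming the nats-py JetStream management calls (consumer_info, delete_consumer, add_consumer), that ack_wait and TASK_TTR are both expressed in seconds, and that stream_name/consumer_name come from the naming helpers this class already has; the method name _ensure_consumer_ttr is hypothetical:

from nats.js.api import ConsumerConfig

async def _ensure_consumer_ttr(self, stream_name: str, consumer_name: str) -> None:
    # Recreate the durable consumer when its ack_wait no longer matches
    # TASK_TTR, since JetStream cannot mutate ack_wait on an existing consumer.
    info = await self.js.consumer_info(stream_name, consumer_name)
    if info.config.ack_wait != TASK_TTR:
        logger.warning(
            f"Consumer {consumer_name} has ack_wait={info.config.ack_wait}s, "
            f"expected {TASK_TTR}s; recreating it"
        )
        await self.js.delete_consumer(stream_name, consumer_name)
        await self.js.add_consumer(
            stream_name,
            config=ConsumerConfig(durable_name=consumer_name, ack_wait=TASK_TTR),
        )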

🧹 Nitpick comments (9)
ami/jobs/tasks.py (2)

111-125: Use logger.exception for better error diagnostics.

When catching exceptions, logger.exception automatically includes the stack trace, which aids debugging. This aligns with the Ruff TRY400 hint.

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

138-144: Use logger.exception for better error diagnostics.

Similar to the NATS ack error handling, using logger.exception here will include the full stack trace for debugging.

     except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
+        logger.exception(f"Job {job_id} not found")
         raise
     except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+        logger.exception(f"Failed to process pipeline result for job {job_id}: {e}")
         # Celery will automatically retry based on autoretry_for
         raise
ami/ml/orchestration/jobs.py (3)

17-18: Consider adding return type to docstring.

The function signature shows -> bool but the docstring doesn't document the return value.

     """
     Clean up NATS JetStream resources (stream and consumer) for a completed job.

     Args:
         job: The Job instance
+
+    Returns:
+        bool: True if cleanup was successful, False otherwise
     """

76-78: Use logger.exception for better error diagnostics.

When catching exceptions in the queue loop, using logger.exception will include the stack trace.

                 except Exception as e:
-                    logger.error(f"Failed to queue image {image_pk} to stream for job '{job_id}': {e}")
+                    logger.exception(f"Failed to queue image {image_pk} to stream for job '{job_id}': {e}")
                     success = False

87-92: Empty images check occurs after queueing attempt.

The if not images check on line 89 happens after async_to_sync(queue_all_images)() has already executed. If images was empty, tasks would also be empty, and the async function would return (0, 0). This works but the check should ideally be earlier to avoid unnecessary async overhead.

+    if not images:
+        job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
+        job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
+        job.save()
+        job.logger.info(f"No images to queue for job '{job_id}'")
+        return True
+
     # Prepare all messages outside of async context to avoid Django ORM issues
     tasks: list[tuple[int, PipelineProcessingTask]] = []
     ...
     successful_queues, failed_queues = async_to_sync(queue_all_images)()

-    if not images:
-        job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
-        job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
-        job.save()
ami/jobs/views.py (3)

228-228: Consider passing job_id as string consistently.

The job_id is assigned as job.pk (likely an integer), but TaskQueueManager.reserve_task expects a string job_id. While Python will handle the conversion in f-strings internally, it's cleaner to be explicit.

-        job_id = job.pk
+        job_id = str(job.pk)

241-251: Rename unused loop variable and consider batch fetching optimization.

The loop variable i is unused (Ruff B007). Also, calling reserve_task sequentially in a loop with short timeouts may be inefficient. Consider fetching multiple messages in a single pull if the NATS client supports it.

         async def get_tasks():
             tasks = []
             async with TaskQueueManager() as manager:
-                for i in range(batch):
+                for _ in range(batch):
                     task = await manager.reserve_task(job_id, timeout=0.1)
                     if task:
                         tasks.append(task.dict())
             return tasks
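If batching turns out to matter, a rough sketch of a single pull with nats-py could look like the following (assuming the manager exposes its JetStream context as manager.js, that json and nats are imported as in nats_queue.py, and reusing the job.<id>.tasks subject and job-<id>-consumer durable names shown elsewhere in this review):

        async def get_tasks_batched():
            async with TaskQueueManager() as manager:
                psub = await manager.js.pull_subscribe(
                    f"job.{job_id}.tasks", durable=f"job-{job_id}-consumer"
                )
                try:
                    # One pull for up to `batch` messages instead of N
                    # sequential reserve_task calls.
                    msgs = await psub.fetch(batch=batch, timeout=0.5)
                except nats.errors.TimeoutError:
                    msgs = []
                return [json.loads(msg.data) for msg in msgs]

Each fetched message would still need its reply subject propagated back to the worker for later acknowledgment, the same way reserve_task attaches it today.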

312-319: Use logger.exception and avoid returning 500 directly.

Using logger.exception includes the stack trace. Also, consider using DRF's proper exception handling rather than returning status=500 directly.

         except Exception as e:
-            logger.error(f"Failed to queue pipeline results for job {job_id}: {e}")
+            logger.exception(f"Failed to queue pipeline results for job {job_id}: {e}")
             return Response(
                 {
                     "status": "error",
                     "job_id": job_id,
+                    "message": str(e),
                 },
                 status=500,
             )
ami/ml/orchestration/nats_queue.py (1)

144-144: Replace dict() with model_dump() for Pydantic v2 compatibility.

In Pydantic v2, dict() is deprecated in favor of model_dump(). Update line 144 from json.dumps(data.dict()) to json.dumps(data.model_dump()).

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3657fd2 and 2483592.

📒 Files selected for processing (5)
  • ami/jobs/tasks.py (3 hunks)
  • ami/jobs/views.py (5 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
  • ami/ml/schemas.py (0 hunks)
💤 Files with no reviewable changes (1)
  • ami/ml/schemas.py
🧰 Additional context used
🧬 Code graph analysis (3)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (49-144)
ami/ml/schemas.py (1)
  • PipelineTaskResult (231-237)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • reserve_task (156-211)
ami/ml/orchestration/nats_queue.py (1)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
ami/jobs/tasks.py (3)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • acknowledge_task (213-232)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/ml/schemas.py (1)
  • PipelineResultsResponse (199-214)
🪛 Ruff (0.14.8)
ami/jobs/views.py

244-244: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


256-256: Unused method argument: pk

(ARG002)


310-310: Avoid specifying long messages outside the exception class

(TRY003)


312-312: Do not catch blind exception: Exception

(BLE001)


313-313: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/nats_queue.py

77-77: Avoid specifying long messages outside the exception class

(TRY003)


85-85: Do not catch blind exception: Exception

(BLE001)


98-98: Avoid specifying long messages outside the exception class

(TRY003)


107-107: Do not catch blind exception: Exception

(BLE001)


135-135: Avoid specifying long messages outside the exception class

(TRY003)


150-150: Consider moving this statement to an else block

(TRY300)


152-152: Do not catch blind exception: Exception

(BLE001)


153-153: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


168-168: Avoid specifying long messages outside the exception class

(TRY003)


209-209: Do not catch blind exception: Exception

(BLE001)


210-210: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


224-224: Avoid specifying long messages outside the exception class

(TRY003)


229-229: Consider moving this statement to an else block

(TRY300)


230-230: Do not catch blind exception: Exception

(BLE001)


231-231: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


245-245: Avoid specifying long messages outside the exception class

(TRY003)


253-253: Consider moving this statement to an else block

(TRY300)


254-254: Do not catch blind exception: Exception

(BLE001)


255-255: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


269-269: Avoid specifying long messages outside the exception class

(TRY003)


276-276: Consider moving this statement to an else block

(TRY300)


277-277: Do not catch blind exception: Exception

(BLE001)


278-278: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

123-123: Do not catch blind exception: Exception

(BLE001)


124-124: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


135-135: Abstract raise to an inner function

(TRY301)


139-139: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


142-142: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

76-76: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (17)
ami/jobs/tasks.py (5)

1-17: LGTM!

Import organization is clean. Good use of timezone from Django utils (addressing prior feedback) and proper separation of concerns with the new orchestration imports.


41-63: LGTM!

Task configuration is well-structured with appropriate retry settings. The 5/6 minute soft/hard time limits align with the lock timeout in TaskStateManager (360 seconds). Docstring is now accurate (addresses prior feedback about return type).


72-75: LGTM!

Good fix for marking failed images as processed to prevent job hanging (addresses prior feedback). The conditional {str(image_id)} if image_id else set() properly handles cases where image_id might be missing.


147-162: LGTM!

Good use of transaction.atomic() with select_for_update() for safe concurrent updates. The job completion logic (setting finished_at with timezone.now()) properly addresses prior feedback. The conditional status transition when stage == "results" and progress_percentage >= 1.0 correctly handles job lifecycle termination.


203-225: LGTM!

Clever timing helper using functools.partial for chained measurements. The pattern of returning both the duration and a callable for the next measurement is elegant and readable.

ami/ml/orchestration/nats_queue.py (6)

1-27: LGTM!

Clean module structure with good documentation. The get_connection helper properly establishes both NATS and JetStream contexts.


43-60: LGTM!

Proper async context manager implementation. The __aexit__ correctly handles cleanup by nullifying references and closing the connection only if not already closed.


123-154: LGTM!

The publish_task method correctly ensures stream and consumer exist before publishing. Using data.dict() for Pydantic serialization is appropriate.


156-211: LGTM!

The reserve_task implementation is well-structured:

  • Properly creates ephemeral subscription for pull
  • Handles timeout gracefully with nats.errors.TimeoutError
  • Always unsubscribes in finally block
  • Returns parsed PipelineProcessingTask with reply_subject attached

213-232: LGTM!

Simple and effective acknowledgment via reply subject. The +ACK protocol message is correct for JetStream acknowledgment.


281-297: LGTM!

The cleanup_job_resources method correctly deletes consumer before stream (proper order to avoid orphaned resources).

ami/ml/orchestration/jobs.py (3)

10-25: LGTM!

Good implementation. The function now correctly returns the cleanup result (addressing prior feedback), allowing callers to handle success/failure.


42-57: LGTM!

Good defensive coding:

  • Validates image URL before queuing
  • Logs warning for skipped images
  • Properly builds PipelineProcessingTask objects outside async context to avoid Django ORM issues

89-92: LGTM!

Good fix addressing prior feedback - both "process" and "results" stages are now updated when the images list is empty, ensuring consistent job state.

ami/jobs/views.py (3)

33-48: LGTM!

Clean filterset implementation enabling pipeline filtering by slug.


255-272: LGTM!

Good handling of both single result and list of results. The endpoint correctly validates and normalizes input.


274-299: LGTM!

Good implementation of result queuing:

  • Each result is validated with Pydantic
  • Background task is properly dispatched via Celery .delay()
  • Response includes task IDs for tracking
  • Logging provides good observability

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
ami/ml/orchestration/nats_queue.py (1)

104-121: TTR mismatch when consumer already exists remains unaddressed.

Per the past review comment, if reserve_task() creates the consumer before publish_task(), the consumer uses the default TASK_TTR. If different TTR values are needed later, the existing consumer configuration won't be updated since JetStream doesn't allow mutating ack_wait on an existing durable consumer. This can cause premature redeliveries.

The suggested fix from the past review was to recreate the consumer when the configured ack_wait differs from the requested TTR. Consider implementing that logic or documenting that a single TTR value is always used.

🧹 Nitpick comments (5)
ami/ml/orchestration/jobs.py (2)

44-50: Avoid duplicate url() call.

image.url() is called twice: once in the condition check and again for assignment. If this method performs any computation or I/O, this is wasteful.

     for image in images:
         image_id = str(image.pk)
-        image_url = image.url() if hasattr(image, "url") and image.url() else ""
+        image_url = image.url() if hasattr(image, "url") else ""
         if not image_url:
             job.logger.warning(f"Image {image.pk} has no URL, skipping queuing to NATS for job '{job.pk}'")
             continue

68-77: Inconsistent logger usage inside async context.

Lines 70 and 76 use the module-level logger, while the rest of the function uses job.logger. This inconsistency can make log correlation difficult when debugging job-specific issues.

             for image_pk, task in tasks:
                 try:
-                    logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
+                    job.logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
                     success = await manager.publish_task(
                         job_id=job.pk,
                         data=task,
                     )
                 except Exception as e:
-                    logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
+                    job.logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
                     success = False
ami/jobs/views.py (2)

299-306: Simplify redundant list comprehension.

All items in queued_tasks already have status: "queued" (set on line 289), making the filter unnecessary.

             return Response(
                 {
                     "status": "accepted",
                     "job_id": job.pk,
-                    "results_queued": len([t for t in queued_tasks if t["status"] == "queued"]),
+                    "results_queued": len(queued_tasks),
                     "tasks": queued_tasks,
                 }
             )

310-317: Use logger.exception() to capture traceback.

logger.exception() automatically includes the traceback, which aids debugging when errors occur in production.

         except Exception as e:
-            logger.error(f"Failed to queue pipeline results for job {job.pk}: {e}")
+            logger.exception(f"Failed to queue pipeline results for job {job.pk}: {e}")
             return Response(
                 {
                     "status": "error",
                     "job_id": job.pk,
                 },
                 status=500,
             )
ami/ml/orchestration/nats_queue.py (1)

143-144: Pydantic v1 syntax used.

.dict() is Pydantic v1 syntax. For Pydantic v2 compatibility, use .model_dump(). This applies throughout the codebase where Pydantic models are serialized.

             # Convert Pydantic model to JSON
-            task_data = json.dumps(data.dict())
+            task_data = json.dumps(data.model_dump())
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2483592 and 3c034a9.

📒 Files selected for processing (4)
  • ami/jobs/tests.py (3 hunks)
  • ami/jobs/views.py (3 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/orchestration/nats_queue.py (1)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (49-144)
ami/ml/schemas.py (1)
  • PipelineTaskResult (231-237)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • reserve_task (156-211)
🪛 Ruff (0.14.8)
ami/ml/orchestration/nats_queue.py

77-77: Avoid specifying long messages outside the exception class

(TRY003)


85-85: Do not catch blind exception: Exception

(BLE001)


98-98: Avoid specifying long messages outside the exception class

(TRY003)


107-107: Do not catch blind exception: Exception

(BLE001)


135-135: Avoid specifying long messages outside the exception class

(TRY003)


150-150: Consider moving this statement to an else block

(TRY300)


152-152: Do not catch blind exception: Exception

(BLE001)


153-153: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


168-168: Avoid specifying long messages outside the exception class

(TRY003)


209-209: Do not catch blind exception: Exception

(BLE001)


210-210: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


224-224: Avoid specifying long messages outside the exception class

(TRY003)


229-229: Consider moving this statement to an else block

(TRY300)


230-230: Do not catch blind exception: Exception

(BLE001)


231-231: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


245-245: Avoid specifying long messages outside the exception class

(TRY003)


253-253: Consider moving this statement to an else block

(TRY300)


254-254: Do not catch blind exception: Exception

(BLE001)


255-255: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


269-269: Avoid specifying long messages outside the exception class

(TRY003)


276-276: Consider moving this statement to an else block

(TRY300)


277-277: Do not catch blind exception: Exception

(BLE001)


278-278: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

75-75: Do not catch blind exception: Exception

(BLE001)

ami/jobs/views.py

255-255: Unused method argument: pk

(ARG002)


308-308: Avoid specifying long messages outside the exception class

(TRY003)


310-310: Do not catch blind exception: Exception

(BLE001)


311-311: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (7)
ami/jobs/tests.py (1)

397-399: LGTM!

The updated assertions correctly reflect the new response structure from the result endpoint, matching the implementation in views.py.

ami/ml/orchestration/jobs.py (2)

11-25: LGTM!

The cleanup function properly returns the boolean result from the async cleanup operation, addressing the earlier feedback.


86-102: LGTM!

The post-queueing logic correctly handles both the empty tasks case (updating both stages) and properly reports success/failure based on queue outcomes.

ami/ml/orchestration/nats_queue.py (3)

181-207: LGTM!

The task reservation logic correctly creates a pull subscription, handles timeouts gracefully, and ensures cleanup via finally block.


213-232: LGTM!

The acknowledgment implementation correctly publishes the ACK message to the reply subject.


281-297: LGTM!

The cleanup logic correctly deletes the consumer before the stream and returns a combined success indicator.

ami/jobs/views.py (1)

237-252: LGTM with note on Pydantic version compatibility.

The NATS task retrieval implementation looks correct. If this project uses or will upgrade to Pydantic v2, the .dict() call (line 246) should be replaced with .model_dump() as part of migration—.dict() is deprecated in Pydantic v2. For this simple no-argument usage, the replacement is straightforward: task.model_dump().

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

160-160: Use timezone-aware datetime for consistency with Django settings.

The code uses datetime.now() which returns a naive datetime. The inline comment suggests this is intentional ("Use naive datetime in local time"), but Django applications with USE_TZ=True should use timezone-aware timestamps to avoid timezone-related bugs.

Apply this diff:

 def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
     from ami.jobs.models import Job, JobState  # avoid circular import
+    from django.utils import timezone

     with transaction.atomic():
         job = Job.objects.select_for_update().get(pk=job_id)
         job.progress.update_stage(
             stage,
             status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
             progress=progress_percentage,
         )
         if stage == "results" and progress_percentage >= 1.0:
             job.status = JobState.SUCCESS
             job.progress.summary.status = JobState.SUCCESS
-            job.finished_at = datetime.now()  # Use naive datetime in local time
+            job.finished_at = timezone.now()
         job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
         job.save()
🧹 Nitpick comments (4)
ami/jobs/tasks.py (3)

45-45: Consider narrowing the autoretry exception types.

The task uses autoretry_for=(Exception,), which will retry on any exception. This may mask distinct failure modes (e.g., validation errors vs. transient network issues) that should be handled differently. Consider passing a narrower tuple of exception types to autoretry_for, or handling non-retryable errors explicitly so they fail fast.
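A hedged sketch of a narrower configuration; the celery_app decorator name, the exception classes, and the task signature below are illustrative rather than the actual code:

@celery_app.task(
    bind=True,
    # Retry only failures that are plausibly transient; let validation and
    # programming errors surface immediately instead of being retried.
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 3, "countdown": 60},
    soft_time_limit=300,
    time_limit=360,
)
def process_pipeline_result(self, job_id: int, result_data: dict) -> None:
    ...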


123-124: Improve exception handling for NATS acknowledgment.

The code catches a bare Exception and logs via logging.error without the traceback. This makes debugging NATS acknowledgment failures difficult.

Apply this diff to use logging.exception for automatic traceback logging:

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

Based on static analysis hints.


138-144: Use logging.exception for better error diagnostics.

The exception handlers at lines 139 and 142 use logging.error which doesn't include tracebacks. This makes troubleshooting production issues harder.

Apply this diff:

     except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
+        logger.exception(f"Job {job_id} not found")
         raise
     except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+        logger.exception(f"Failed to process pipeline result for job {job_id}: {e}")
         # Celery will automatically retry based on autoretry_for
         raise

Based on static analysis hints.

docker-compose.ci.yml (1)

43-55: Consider renaming container for CI consistency.

The NATS container is named ami_local_nats in a CI compose file. This could cause conflicts if both local and CI stacks run simultaneously, and doesn't follow the naming pattern of other CI resources (e.g., ami_ci_postgres_data).

Consider changing to ami_ci_nats:

   nats:
     image: nats:2.10-alpine
-    container_name: ami_local_nats
+    container_name: ami_ci_nats
     hostname: nats

Alternatively, remove the container_name directive entirely and let docker-compose auto-generate it with the stack name prefix.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3c034a9 and e9d2a1c.

📒 Files selected for processing (5)
  • .envs/.ci/.django (1 hunks)
  • .envs/.production/.django-example (1 hunks)
  • ami/jobs/tasks.py (3 hunks)
  • ami/jobs/tests.py (3 hunks)
  • docker-compose.ci.yml (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • .envs/.ci/.django
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tasks.py (4)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • acknowledge_task (213-232)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/ml/schemas.py (2)
  • PipelineResultsResponse (199-214)
  • summary (179-196)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • retry (873-885)
  • save (954-965)
ami/jobs/tests.py (2)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-102)
ami/main/models.py (1)
  • SourceImage (1666-1914)
🪛 Ruff (0.14.8)
ami/jobs/tasks.py

123-123: Do not catch blind exception: Exception

(BLE001)


124-124: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


135-135: Abstract raise to an inner function

(TRY301)


139-139: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


142-142: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (7)
ami/jobs/tasks.py (2)

203-225: LGTM!

The log_time helper provides a clean, functional approach to timing code blocks. The use of time.perf_counter() is appropriate for performance measurements, and the docstring clearly explains the usage pattern.


1-18: LGTM!

The imports are well-organized and appropriate for the new functionality. The use of local imports inside functions (e.g., from ami.jobs.models import Job) to avoid circular dependencies is a good practice.

.envs/.production/.django-example (1)

69-71: LGTM!

The NATS URL configuration is consistent with the NATS service added to docker-compose and matches the pattern used in local and CI environments.

ami/jobs/tests.py (2)

330-338: LGTM!

The test now correctly creates and saves SourceImage instances before passing them to queue_images_to_nats. This ensures each image has a valid primary key, addressing the previous concern about unsaved objects with pk=None.


403-405: LGTM!

The updated assertions correctly reflect the new asynchronous result processing workflow where results are queued to Celery tasks rather than processed synchronously.

docker-compose.ci.yml (2)

25-25: LGTM!

Adding NATS to Django's dependencies is correct, as the new task queue and job processing functionality requires NATS JetStream to be available.


50-50: JetStream and monitoring flags are correctly configured.

The NATS service configuration uses -js to enable JetStream and -m 8222 to start the HTTP monitoring server on the conventional port. Both flags are supported in NATS 2.10 and this combination is the standard way to enable these features.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (9)
docker-compose.ci.yml (1)

19-25: Consider adding explicit health condition to depends_on for NATS.

The django service now depends on nats, but the dependency lacks condition: service_healthy, unlike the minio-init service (line 84). This means Django may start before NATS is fully initialized and healthy, potentially causing early connection failures.

If the application already has robust retry/backoff logic for NATS connections, this is acceptable. Otherwise, consider updating to ensure NATS is healthy:

     depends_on:
       - postgres
       - redis
       - minio-init
       - ml_backend
       - rabbitmq
-      - nats
+      nats:
+        condition: service_healthy
ami/ml/tests.py (3)

860-884: Solid coverage; consider relying more on the public API than _get_progress

This suite gives good coverage of initialization, per-stage tracking, empty jobs, and cleanup. The one downside is the heavy reliance on the private _get_progress helper, which couples tests to implementation details and may make refactors noisier. Where feasible, prefer going through update_state (including for test_progress_tracking and test_stages_independent) and reserve _get_progress calls for very targeted cases where you intentionally want to test that internal behavior.


895-918: Float equality assertions could use assertAlmostEqual for robustness

test_progress_tracking asserts exact equality on floating-point percentages (0.4, 0.8, 1.0). This works today but is a bit brittle to minor implementation changes (e.g., if total is computed differently). Using assertAlmostEqual(progress.percentage, 0.4, places=6) (and similarly for other percentages) would make these tests more resilient while preserving intent.


920-944: Locking behavior is covered; optional extra check for lock lifecycle

test_update_state_with_locking nicely validates that a held lock causes update_state to no-op and that progress resumes after the lock is cleared. If you want to harden this further, you could also assert that update_state does not inadvertently modify the lock when it’s held by another task (e.g., confirm the lock key/value is unchanged after the failed update) to guard against future changes to the lock-release logic.

ami/ml/orchestration/test_nats_queue.py (5)

10-38: Good structure and mocking helpers; small clarity tweaks possible

The use of IsolatedAsyncioTestCase plus _create_sample_task and _create_mock_nats_connection keeps the async tests readable and DRY. One minor improvement would be to document in _create_mock_nats_connection which subset of NATS/JetStream APIs you rely on (e.g., in a comment) so future changes to TaskQueueManager know what needs to be mocked or extended here.


39-49: Context manager lifecycle test is solid; optionally assert __aenter__/__aexit__ effects more strictly

The lifecycle test correctly verifies that a connection is established and closed. If you want this to catch more regressions, you could also assert that manager.nc/manager.js are None after the context exits, and optionally use nc.close.assert_awaited_once() to emphasize the async nature of the close call.
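For example, as a hedged sketch (assuming the mocked connection from _create_mock_nats_connection is an AsyncMock bound to mock_nc, and that __aexit__ nulls the nc/js references as described earlier in this review):

    async def test_context_manager_resets_state(self):
        async with TaskQueueManager() as manager:
            self.assertIsNotNone(manager.nc)
            self.assertIsNotNone(manager.js)
        # After exit, references should be cleared and close() awaited once.
        self.assertIsNone(manager.nc)
        self.assertIsNone(manager.js)
        mock_nc.close.assert_awaited_once()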


50-64: Publish-path test works but could assert arguments more precisely

Right now the test checks add_stream indirectly via a substring search on call_args. To make this more robust, consider asserting on the actual kwargs used (e.g., stream name and subjects) instead of relying on str(js.add_stream.call_args), which is more brittle to representation changes of mocks.
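A hedged sketch of the stricter assertion; the kwarg names depend on how TaskQueueManager actually calls js.add_stream, so adjust them to the real call shape:

        mock_js.add_stream.assert_awaited_once()
        _, kwargs = mock_js.add_stream.call_args
        # Check the per-job stream name and subject directly instead of
        # string-matching the repr of the mock call.
        self.assertEqual(kwargs.get("name"), "job_123")
        self.assertIn("job.123.tasks", kwargs.get("subjects", []))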


106-117: ACK behavior test is clear; consider also covering failure/no-connection case

The ACK test correctly ensures the manager publishes b"+ACK" to the supplied reply subject. As a follow-up, you might also add a negative test mirroring test_operations_without_connection_raise_error to ensure acknowledge_task also raises when no connection is open, keeping behavior consistent across APIs.
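A possible addition, assuming acknowledge_task raises RuntimeError when the manager has not been entered (mirroring the other operations):

    async def test_acknowledge_task_without_connection_raises(self):
        manager = TaskQueueManager()
        with self.assertRaises(RuntimeError):
            # No __aenter__ call, so there is no open NATS connection to
            # publish the +ACK message on.
            await manager.acknowledge_task("reply.subject.1")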


118-129: Cleanup test validates both consumer and stream deletion

test_cleanup_job_resources confirms that both delete_consumer and delete_stream are invoked and that the method returns True, which is the key behavior here. If you later add error handling (e.g., ignore “not found”), consider extending this test to cover that branch as well.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e9d2a1c and 3d198d0.

📒 Files selected for processing (3)
  • ami/ml/orchestration/test_nats_queue.py (1 hunks)
  • ami/ml/tests.py (1 hunks)
  • docker-compose.ci.yml (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/tests.py (1)
ami/ml/orchestration/task_state.py (4)
  • TaskStateManager (17-125)
  • initialize_job (39-49)
  • _get_progress (85-117)
  • update_state (54-83)
ami/ml/orchestration/test_nats_queue.py (1)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (7)
docker-compose.ci.yml (2)

43-52: NATS service configuration is well-suited for CI testing.

The NATS service is properly configured with:

  • Pinned image version (nats:2.10-alpine) for reproducibility
  • Correct hostname (nats) matching the NATS_URL in environment config
  • JetStream enabled (-js flag) for the task queue feature
  • Monitoring port exposed (-m 8222) with appropriate healthcheck
  • Reasonable healthcheck parameters (10s interval, 5s timeout, 3 retries)

73-73: Minio healthcheck formatting improvement.

The reformatting from single-quoted string to explicit array syntax improves consistency with other healthchecks in the file (NATS, etc.) and aligns with docker-compose best practices for the healthcheck test directive.

ami/ml/tests.py (1)

960-980: Empty-job and cleanup behavior look correct

The expectations for empty jobs (immediately 100% complete) and for cleanup returning None progress afterwards are well aligned with TaskStateManager’s semantics. No changes needed here.

ami/ml/orchestration/test_nats_queue.py (4)

65-89: Reservation success path is well covered

The reserve_task happy path is nicely exercised: deserialization, reply_subject propagation, and unsubscribe are all asserted. This aligns well with the intended PipelineProcessingTask contract; no changes needed here.


90-105: No-message reservation behavior looks correct

test_reserve_task_no_messages appropriately verifies the None return and that the pull subscription is still cleaned up via unsubscribe(). That matches the desired contract for idle queues.


130-137: Naming convention tests nicely lock down per-job resource names

The expectations for stream (job_123), subject (job.123.tasks), and consumer (job-123-consumer) names are explicit and make future refactors safer. This is a good use of tests against private helpers.


138-150: Error-on-missing-connection behavior is correctly asserted

test_operations_without_connection_raise_error ensures that key operations fail fast with a clear RuntimeError when the manager isn’t entered. That’s important guardrail behavior for callers and looks correct as-is.

@carlosgjs
Copy link
Collaborator Author

@mihow I'm not addressing two of CodeRabbit's comments:

  • Use of logger.exception() instead of logger.error(), since I don't see it in the rest of the code base and it's probably better done as a global change.
  • Use of timezone.now() instead of datetime.now() to update the Job's finished_at, because the rest of the codebase uses datetime.now(); I believe that's because we have Django set to USE_TZ=False.

@carlosgjs carlosgjs requested a review from mihow December 10, 2025 20:35
@netlify
Copy link

netlify bot commented Dec 19, 2025

👷 Deploy request for antenna-ssec pending review.

Visit the deploys page to approve it

Name Link
🔨 Latest commit 3a73329

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
docker-compose.staging.yml (2)

20-23: Fix indentation for the commented NATS dependency.

The commented # - nats has extra indentation compared to the other list items. While YAML allows comments at any indentation, this is inconsistent and could be confusing.

🔎 Proposed fix
     depends_on:
       - postgres
       - redis
-        # - nats  
+      # - nats

69-82: Consider JetStream persistence and container naming.

Two observations:

  1. No JetStream persistence: Without a volume mount for JetStream storage, streams/consumers and any pending messages will be lost on container restart. This may be acceptable for staging/testing, but consider adding persistence if jobs shouldn't be lost across restarts:

    volumes:
      - nats_data:/data
    command: ["-js", "-m", "8222", "-sd", "/data"]
  2. Container name inconsistency: ami_local_nats uses "local" prefix but this is the staging compose file. Consider ami_staging_nats for consistency.

Please confirm whether JetStream persistence is intentionally omitted for staging, or if it should be added to preserve task queues across container restarts.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d198d0 and 3a73329.

📒 Files selected for processing (2)
  • ami/jobs/tasks.py (3 hunks)
  • docker-compose.staging.yml (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/tasks.py (5)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • acknowledge_task (213-232)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/ml/schemas.py (2)
  • PipelineResultsResponse (199-214)
  • summary (179-196)
ami/jobs/models.py (5)
  • Job (734-1019)
  • logger (1004-1013)
  • retry (873-885)
  • JobState (27-63)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
🪛 Ruff (0.14.8)
ami/jobs/tasks.py

98-98: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


124-124: Abstract raise to an inner function

(TRY301)


127-127: Do not catch blind exception: Exception

(BLE001)


128-128: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


144-144: Do not catch blind exception: Exception

(BLE001)


145-145: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (4)
docker-compose.staging.yml (1)

32-50: LGTM!

Adding restart: always to the core services (django, postgres, redis) improves reliability for the staging environment.

ami/jobs/tasks.py (3)

131-147: LGTM - Error handling strategy is appropriate.

The helper correctly wraps the async NATS acknowledgment and handles failures gracefully without failing the parent task. The comment on line 146 clearly explains the design decision: if data is already saved, it's better to log ACK failures and let NATS redeliver than to fail the entire task.


205-227: LGTM - Clean timing utility.

The implementation is straightforward and correct. Using time.perf_counter() is appropriate for measuring elapsed time, and the partial function pattern allows convenient chaining of timing measurements throughout the task execution.


149-165: Verified: USE_TZ=False is configured—no changes needed.

The project's Django settings explicitly set USE_TZ = False with a comment noting "All time are in local time." With this configuration, datetime.datetime.now() on line 162 is the correct approach and is used consistently throughout the codebase (20+ instances). The explicit comment in the code confirming "Use naive datetime in local time" aligns with the project's timezone strategy.

Comment on lines +102 to +129
try:
    # Save to database (this is the slow operation)
    if pipeline_result:
        # should never happen since otherwise we could not be processing results here
        assert job.pipeline is not None, "Job pipeline is None"
        job.pipeline.save_results(results=pipeline_result, job_id=job.pk)
        job.logger.info(f"Successfully saved results for job {job_id}")

        _, t = t(
            f"Saved pipeline results to database with {len(pipeline_result.detections)} detections"
            f", percentage: {progress_info.percentage*100}%"
        )

    _ack_task_via_nats(reply_subject, job.logger)
    # Update job stage with calculated progress
    progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)

    if not progress_info:
        logger.warning(
            f"Another task is already processing results for job {job_id}. "
            f"Retrying task {self.request.id} in 5 seconds..."
        )
        raise self.retry(countdown=5, max_retries=10)
    _update_job_progress(job_id, "results", progress_info.percentage)

except Exception as e:
    job.logger.error(f"Failed to process pipeline result for job {job_id}: {e}. Retrying ...")

Copy link
Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Search for save_results method definition to check for idempotency handling
rg -n "def save_results" --type=py -A 20

Repository: RolnickLab/antenna

Length of output: 2730


🏁 Script executed:

# Also search for _ack_task_via_nats to understand what happens on ACK failure
rg -n "_ack_task_via_nats|def _ack_task_via_nats" --type=py -A 15

Repository: RolnickLab/antenna

Length of output: 3185


🏁 Script executed:

# Check the exception handler and retry logic in the task
rg -n "except Exception" ami/jobs/tasks.py -B 5 -A 10

Repository: RolnickLab/antenna

Length of output: 1999


🏁 Script executed:

# Get the full save_results function implementation
sed -n '814,900p' ami/ml/models/pipeline.py

Repository: RolnickLab/antenna

Length of output: 3415


🏁 Script executed:

# Look for Detection.objects.create calls in save_results
rg -n "Detection.*create|bulk_create" ami/ml/models/pipeline.py -B 2 -A 2 | head -50

Repository: RolnickLab/antenna

Length of output: 1139


Confirm: save_results lacks idempotency safeguards and will create duplicate Detection records on NATS redelivery.

The implementation at ami/ml/models/pipeline.py:506 uses Detection.objects.bulk_create() without checking for existing records. If ACK fails after save_results completes (line 107 in tasks.py), the NATS message remains unacknowledged. When JetStream redelivers the message after the ack-wait timeout, the entire task re-executes and calls save_results again with identical data, creating duplicate detections in the database.

Implement idempotency by one of the following; a rough sketch follows this list:

  • Adding a check to skip detections already in the database before bulk_create
  • Using get_or_create logic per detection
  • Adding unique constraints on Detection fields and using upsert operations
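A rough sketch of the skip-existing approach, assuming a hypothetical list new_detections of unsaved Detection instances and illustrative field names (source_image_id, bbox_key) plus a matching unique constraint, none of which are taken from the actual model:

existing = set(
    Detection.objects.filter(
        source_image_id__in={d.source_image_id for d in new_detections}
    ).values_list("source_image_id", "bbox_key")
)
# Skip anything already saved by a previous delivery of the same message;
# ignore_conflicts=True (backed by the unique constraint) also absorbs races
# between concurrent workers.
to_create = [
    d for d in new_detections
    if (d.source_image_id, d.bbox_key) not in existing
]
Detection.objects.bulk_create(to_create, ignore_conflicts=True)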

Minor: Clarify the exception handler log message.

Line 128 logs "Retrying..." but the task doesn't call self.retry(). Clarify to "Failed to process pipeline result for job {job_id}: {e}. NATS will redeliver the message." to accurately reflect the retry mechanism.

🧰 Tools
🪛 Ruff (0.14.8)

124-124: Abstract raise to an inner function

(TRY301)


127-127: Do not catch blind exception: Exception

(BLE001)


128-128: Use logging.exception instead of logging.error

Replace with exception

(TRY400)
