
Conversation

@agam1092005

Description

Implements node-level timeouts to prevent nodes from getting stuck in QUEUED status indefinitely.

Fixes #444

Changes

Core Implementation

  • New Status: Added TIMEDOUT status to StateStatusEnum for timed-out nodes
  • Database: Added queued_at timestamp field to State model with optimized index for timeout queries
  • Configuration: Added NODE_TIMEOUT_MINUTES environment variable (default: 30 minutes)
  • Scheduled Task: Created check_node_timeout task that runs every minute to detect and time out stuck nodes
  • Enqueue Logic: Updated state enqueuing to record queued_at timestamp

Testing

  • Added comprehensive unit tests covering timeout detection, threshold calculation, and error handling
  • All existing tests pass without regression

How It Works

  1. When a state transitions to QUEUED, the system records the current timestamp in queued_at
  2. Every minute, a background task scans for QUEUED states older than the configured timeout
  3. Timed-out states are automatically marked as TIMEDOUT with a descriptive error message
  4. The timeout duration is configurable via the NODE_TIMEOUT_MINUTES environment variable (a sketch of the scan step follows)
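
For illustration, the scan step might look roughly like this — a minimal sketch assuming the `State`, `StateStatusEnum`, and `get_settings` interfaces described in this PR, not the exact merged code:

```python
import logging
import time

from app.config.settings import get_settings            # assumed module paths
from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum

logger = logging.getLogger(__name__)


async def check_node_timeout() -> None:
    """Sketch: mark QUEUED states older than the configured timeout as TIMEDOUT."""
    settings = get_settings()
    now_ms = int(time.time() * 1000)
    # Anything queued at or before this instant has exceeded NODE_TIMEOUT_MINUTES.
    threshold_ms = now_ms - settings.node_timeout_minutes * 60_000

    result = await State.get_pymongo_collection().update_many(
        {"status": StateStatusEnum.QUEUED, "queued_at": {"$ne": None, "$lte": threshold_ms}},
        {"$set": {
            "status": StateStatusEnum.TIMEDOUT,
            "error": f"Node timed out after {settings.node_timeout_minutes} minutes in QUEUED",
        }},
    )
    if result.modified_count:
        logger.info("Marked %s state(s) as TIMEDOUT", result.modified_count)
```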

Running the Tests

```bash
cd state-manager
uv run pytest tests/unit/tasks/test_check_node_timeout.py -v
```

@safedep

safedep bot commented Oct 3, 2025

SafeDep Report Summary

All badges green: no malicious packages, no vulnerable packages, no risky licenses.


This report is generated by SafeDep Github App

@coderabbitai
Contributor

coderabbitai bot commented Oct 3, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Summary by CodeRabbit

Release Notes

  • New Features

    • Added configurable timeout mechanism for queued nodes with global default (60 minutes) and per-node override support.
    • Introduced automatic detection and marking of timed-out nodes with new TIMEDOUT status.
    • Added timeout tracking fields for improved visibility into node state lifecycle.
  • Tests

    • Added comprehensive unit tests for timeout detection functionality with custom and global timeout scenarios.

Walkthrough

Adds per-node and global node timeout settings, records queued_at/timeout_at when enqueuing, stores timeout_minutes on states and registered nodes, introduces TIMEDOUT status, schedules a per-minute job to mark overdue QUEUED states TIMEDOUT, and updates controllers, models, and tests accordingly.

Changes

  • Configuration (state-manager/app/config/settings.py): Added node_timeout_minutes: int = Field(default=60, gt=0, description="Timeout in minutes for nodes stuck in QUEUED status") and loads it from NODE_TIMEOUT_MINUTES in Settings.from_env.
  • State model & index (state-manager/app/models/db/state.py): Added queued_at: Optional[int], timeout_at: Optional[int], and timeout_minutes: Optional[int] (gt=0); added the compound index timeout_query_index on (status, timeout_at).
  • Status enum (state-manager/app/models/state_status_enum.py): Added the new enum member TIMEDOUT.
  • Enqueue controller (state-manager/app/controller/enqueue_states.py): Uses get_settings(), computes current_time_ms, and switches to an aggregation update pipeline that sets status=QUEUED and queued_at=current_time_ms and computes timeout_at from timeout_minutes or the global node_timeout_minutes.
  • Registered node models & controller (state-manager/app/models/db/registered_node.py, state-manager/app/models/register_nodes_request.py, state-manager/app/models/register_nodes_response.py, state-manager/app/controller/register_nodes.py): Added optional timeout_minutes to RegisteredNode and the request/response models; includes timeout_minutes in create/update flows and responses.
  • Trigger & next-state creation (state-manager/app/controller/trigger_graph.py, state-manager/app/tasks/create_next_states.py): Resolves per-node timeout_minutes (falling back to get_settings().node_timeout_minutes) and includes timeout_minutes when constructing new State instances.
  • Retry & requeue flows (state-manager/app/controller/errored_state.py, state-manager/app/controller/manual_retry_state.py, state-manager/app/controller/re_queue_after_signal.py): Propagates timeout_at/timeout_minutes into retry-created State objects; requeue clears timeout_at and queued_at when updating enqueue_after.
  • Timeout task & scheduler (state-manager/app/tasks/check_node_timeout.py, state-manager/app/main.py): Added async check_node_timeout() that sets QUEUED states with timeout_at <= now to TIMEDOUT and logs results; scheduled to run every minute in main.
  • Tests (state-manager/tests/unit/tasks/test_check_node_timeout.py): Added unit tests covering timed-out vs non-timed-out states, custom vs global timeout precedence, and exception handling (mocks for State, settings, time, logger).
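
For orientation, the settings change summarized above might look roughly like this (a sketch based on the summary, not the exact file contents):

```python
import os

from pydantic import BaseModel, Field


class Settings(BaseModel):
    node_timeout_minutes: int = Field(
        default=60, gt=0,
        description="Timeout in minutes for nodes stuck in QUEUED status",
    )

    @classmethod
    def from_env(cls) -> "Settings":
        # Passing the raw string lets Pydantic handle type coercion and validation.
        return cls(node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "60"))
```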

Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  actor Scheduler
  participant App as Application
  participant Cron as check_node_timeout
  participant Config as Settings
  participant DB as State Collection
  Note over Cron,Config: runs every 1 minute
  Scheduler->>App: start & register job (check_node_timeout)
  Scheduler-->>Cron: trigger
  Cron->>Config: get_settings().node_timeout_minutes
  Cron->>Cron: now_ms = time.time()*1000
  Cron->>DB: update_many({status: QUEUED, timeout_at: {$lte: now_ms}} -> set status=TIMEDOUT, set error_message)
  DB-->>Cron: {modified_count}
  Cron-->>App: log results
```
```mermaid
sequenceDiagram
  autonumber
  participant Controller as enqueue_states.find_state
  participant Config as Settings
  participant DB as State Collection
  Controller->>Controller: current_time_ms = time.time()*1000
  Controller->>Config: get_settings().node_timeout_minutes
  Controller->>DB: find_one_and_update(match CREATED & enqueue_after <= now, aggregation -> set status=QUEUED, queued_at=current_time_ms, timeout_at = ( $ifNull(state.timeout_minutes, global) * 60000 ) + current_time_ms)
  DB-->>Controller: updated document or null
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes


Poem

I hop the queue and watch the clock,
I nudge the stuck with gentle knock.
Minutes set, deadlines found,
TIMEDOUT hops when time's run 'round.
A rabbit's grin — cron keeps time! 🐇⏰

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
  • Title Check — ✅ Passed: The pull request title "feat: add node-level timeouts to prevent stuck queued states" directly and clearly summarizes the primary objective of the changeset. The title is concise, specific, and accurately reflects the main feature being implemented—adding timeout mechanisms to prevent states from remaining in QUEUED status indefinitely. The title is neither vague nor overly broad, and it provides clear context about the core functionality being introduced.
  • Linked Issues Check — ✅ Passed: The changes meet all primary coding requirements from linked issue #444. The PR successfully prevents nodes from getting stuck in QUEUED status by adding a check_node_timeout task that runs every minute to detect and mark timed-out states [#444]. Configurability is achieved through both per-node timeout_minutes (optional, stored on RegisteredNode) and global NODE_TIMEOUT_MINUTES environment variable (default 30 minutes) [#444]. Performance considerations are addressed by resolving and storing timeout values at state creation time in trigger_graph.py and create_next_states.py, avoiding database joins on the enqueue path [#444]. Backward compatibility is ensured through optional fields with proper fallbacks to the global setting [#444]. A new TIMEDOUT status has been added to StateStatusEnum to represent the timed-out state [#444].
  • Out of Scope Changes Check — ✅ Passed: All changes in the pull request are directly related to the timeout feature implementation objectives. The modifications to errored_state.py and manual_retry_state.py that preserve timeout information during retry state creation are in-scope, as they ensure timeout configurations are maintained through state retries. Similarly, the changes to re_queue_after_signal.py that reset timeout_at and queued_at on re-queue are in-scope, as they properly clear stale timeout information when re-queuing states. No out-of-scope changes have been identified.
  • Description Check — ✅ Passed: The pull request description is clearly related to the changeset and provides comprehensive information about the changes being made. It includes a well-structured breakdown of the implementation (new status, database changes, configuration, scheduled task, and enqueue logic), explains how the feature works, provides testing instructions, and references the related issue #444. The description is neither vague nor generic, and it meaningfully conveys the scope and objectives of the changes.


@gemini-code-assist
Contributor

Summary of Changes

Hello @agam1092005, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses the issue of nodes getting indefinitely stuck in a QUEUED state by introducing a robust timeout mechanism. It establishes a new TIMEDOUT status, tracks the queuing time of states, and periodically scans for and updates any states that have exceeded a predefined, configurable timeout period. This ensures that the system can automatically resolve stuck states, improving overall reliability and preventing resource wastage.

Highlights

  • New State Status: Introduced a new TIMEDOUT status to StateStatusEnum to explicitly mark nodes that have exceeded their processing time.
  • Timestamp Tracking: Added a queued_at timestamp field to the State database model, which is recorded when a state transitions to QUEUED.
  • Configurable Timeout: Implemented a NODE_TIMEOUT_MINUTES environment variable (defaulting to 30 minutes) to allow configuration of the timeout duration for queued nodes.
  • Scheduled Timeout Check: Created a new check_node_timeout asynchronous task that runs every minute to identify and update states that have been in the QUEUED status beyond the configured timeout.
  • Database Indexing: Added an optimized MongoDB index on status and queued_at fields to efficiently query for timed-out states (a sketch follows this list).
  • Comprehensive Testing: Included new unit tests for the check_node_timeout task, covering various scenarios like marking timed-out states, handling no timed-out states, exception handling, and correct threshold calculation.
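
For reference, a compound index like the one described could be declared with plain PyMongo roughly as follows — a sketch only; the connection string and database/collection names are placeholders, and the project itself defines the index on its ODM model:

```python
from pymongo import ASCENDING, MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
states = client["state_manager"]["states"]         # hypothetical db/collection names

# Equality on status first, range on queued_at second — matches the shape of the
# timeout query {status: QUEUED, queued_at: {$lte: threshold}}.
states.create_index(
    [("status", ASCENDING), ("queued_at", ASCENDING)],
    name="timeout_query_index",
)
```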

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

gemini-code-assist bot left a comment


Code Review

This pull request introduces a valuable feature to prevent nodes from being stuck in a 'QUEUED' state by implementing a timeout mechanism. The changes are well-structured, including a new configuration, a scheduled task for checking timeouts, updates to the database model with a necessary index, and a new 'TIMEDOUT' status. The accompanying unit tests are comprehensive and cover the new logic well.

My review includes a few suggestions to improve robustness and correctness:

  • Handling potential configuration errors more gracefully.
  • Ensuring consistency in timestamping.
  • Enhancing error logging for better debuggability.

Overall, this is a solid implementation of an important feature.

@codecov

codecov bot commented Oct 3, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.


Contributor

coderabbitai bot left a comment


Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 964d0f2 and 735cd6d.

📒 Files selected for processing (7)
  • state-manager/app/config/settings.py (2 hunks)
  • state-manager/app/controller/enqueue_states.py (1 hunks)
  • state-manager/app/main.py (2 hunks)
  • state-manager/app/models/db/state.py (2 hunks)
  • state-manager/app/models/state_status_enum.py (1 hunks)
  • state-manager/app/tasks/check_node_timeout.py (1 hunks)
  • state-manager/tests/unit/tasks/test_check_node_timeout.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
state-manager/app/controller/enqueue_states.py (1)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/tasks/check_node_timeout.py (4)
state-manager/app/models/db/state.py (1)
  • State (13-114)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/singletons/logs_manager.py (2)
  • LogsManager (9-66)
  • get_logger (65-66)
state-manager/app/config/settings.py (1)
  • get_settings (34-38)
state-manager/tests/unit/tasks/test_check_node_timeout.py (2)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/tasks/check_node_timeout.py (1)
  • check_node_timeout (10-36)
state-manager/app/main.py (1)
state-manager/app/tasks/check_node_timeout.py (1)
  • check_node_timeout (10-36)
🔇 Additional comments (9)
state-manager/app/models/state_status_enum.py (1)

14-14: LGTM!

The new TIMEDOUT status is correctly added to the StateStatusEnum and appropriately placed in the "Errored" section.

state-manager/app/controller/enqueue_states.py (1)

26-29: LGTM!

The addition of queued_at timestamp when transitioning to QUEUED status is correctly implemented and consistent with the timestamp format used elsewhere in the codebase (milliseconds since epoch).

state-manager/app/main.py (1)

80-88: LGTM!

The scheduled job for timeout checking is correctly configured with appropriate parameters. Running every minute with max_instances=1 and coalesce=True ensures efficient and non-overlapping execution.

state-manager/app/models/db/state.py (2)

31-31: LGTM!

The queued_at field is correctly defined as Optional[int] with an appropriate description, consistent with other timestamp fields in the model.


107-113: LGTM!

The timeout_query_index on (status, queued_at) is well-designed to efficiently support the timeout detection query in check_node_timeout.

state-manager/app/config/settings.py (1)

25-26: LGTM!

The environment variable loading is correctly implemented with appropriate type conversion and default value.

state-manager/tests/unit/tasks/test_check_node_timeout.py (3)

9-40: LGTM!

Comprehensive test covering the happy path with appropriate assertions on the query structure, update operation, and modified count.


42-62: LGTM!

Good coverage of the edge case where no states are timed out.


64-85: LGTM!

Exception handling is properly tested with verification of error logging.

Contributor

coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 735cd6d and 4303a3d.

📒 Files selected for processing (4)
  • state-manager/app/config/settings.py (2 hunks)
  • state-manager/app/controller/enqueue_states.py (1 hunks)
  • state-manager/app/tasks/check_node_timeout.py (1 hunks)
  • state-manager/tests/unit/tasks/test_check_node_timeout.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
state-manager/app/tasks/check_node_timeout.py (4)
state-manager/app/models/db/state.py (1)
  • State (13-114)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/singletons/logs_manager.py (2)
  • LogsManager (9-66)
  • get_logger (65-66)
state-manager/app/config/settings.py (1)
  • get_settings (34-38)
state-manager/tests/unit/tasks/test_check_node_timeout.py (2)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/tasks/check_node_timeout.py (1)
  • check_node_timeout (10-36)
state-manager/app/controller/enqueue_states.py (2)
state-manager/app/models/db/state.py (1)
  • State (13-114)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
🔇 Additional comments (3)
state-manager/app/controller/enqueue_states.py (1)

16-30: LGTM! Timestamp consistency addressed.

The change correctly captures the current timestamp once and reuses it for both the query filter and the update operation, eliminating the inconsistency flagged in the previous review. The logic for setting queued_at when transitioning to QUEUED is sound.

state-manager/app/tasks/check_node_timeout.py (1)

10-36: LGTM! Well-implemented timeout detection.

The implementation correctly:

  • Computes the timeout threshold using the configured node_timeout_minutes
  • Queries for QUEUED states with non-null queued_at that exceed the threshold
  • Updates timed-out states to TIMEDOUT status with a descriptive error message
  • Logs the modified count for observability
  • Handles exceptions with full stack traces (addresses previous review feedback)
state-manager/tests/unit/tasks/test_check_node_timeout.py (1)

7-114: LGTM! Comprehensive test coverage.

The test suite thoroughly covers the timeout detection functionality:

  • Verifies states are marked as TIMEDOUT with correct error messages
  • Tests the zero-modification case
  • Validates exception handling with proper logging
  • Confirms threshold calculation accuracy (with realistic mock timestamp addressing previous feedback)

All tests use appropriate mocking and assertions.
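
As a rough illustration of the mock-based style described here (assuming pytest-asyncio and the import path from the file list; the real tests differ in detail):

```python
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from app.tasks.check_node_timeout import check_node_timeout  # path from the file list


@pytest.mark.asyncio
async def test_marks_overdue_states_timedout():
    collection = MagicMock()
    collection.update_many = AsyncMock(return_value=MagicMock(modified_count=2))

    # Patch the State model so no real database is touched.
    with patch("app.tasks.check_node_timeout.State") as mock_state:
        mock_state.get_pymongo_collection.return_value = collection
        await check_node_timeout()

    # Exactly one bulk update should be issued for overdue QUEUED states.
    collection.update_many.assert_awaited_once()
```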

@agam1092005
Author

Hi @NiveditJain ,

Thanks for the feedback! I have a few questions about implementing node-level timeout:

  1. Scope: Should I implement node-level timeout in this PR, or would you prefer to address it as a separate enhancement after merging the current global timeout implementation?

  2. Approach: If implementing now, I'm thinking:

    • Add optional timeout_minutes field to NodeRegistration model
    • Store it in RegisteredNode database model
    • Fall back to global NODE_TIMEOUT_MINUTES if not specified per-node
    • This maintains backward compatibility
  3. Bot feedback: coderabbitai suggests node-level timeout as a "future enhancement" and notes line 25 has inconsistent type casting (though that's unrelated to this PR). Should I:

    • Fix only the timeout-related code now, and address node-level timeout separately?
    • Or implement the full node-level architecture in this PR?

Let me know your preference and I'll proceed accordingly!

@NiveditJain
Member

@agam1092005,
1 - Yes, there is a RegisterNode process in the system that registers node details with the StateManager; we should add the timeout to Nodes.
2 - I like the approach you mentioned here.
3 - We can create an issue for these future enhancements.

Please ensure that we do not do any merges/joins in the enqueue request API, as it's the most frequently used API; we should add timeouts to states at the point of creation.
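
The "no joins at enqueue, stamp the timeout at creation" approach might look roughly like this — a sketch based on the interfaces summarized elsewhere in this PR, where `build_state` is a hypothetical helper:

```python
from app.config.settings import get_settings               # assumed module paths
from app.models.db.registered_node import RegisteredNode
from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum


async def build_state(node_name: str, namespace: str, **fields) -> State:
    """Hypothetical helper: resolve the timeout once, when the state is created."""
    node = await RegisteredNode.get_by_name_and_namespace(node_name, namespace)
    # Per-node override wins; otherwise fall back to the global setting, so the
    # hot enqueue path never has to join against RegisteredNode.
    timeout_minutes = (
        node.timeout_minutes
        if node is not None and node.timeout_minutes is not None
        else get_settings().node_timeout_minutes
    )
    return State(
        node_name=node_name,
        namespace_name=namespace,
        status=StateStatusEnum.CREATED,
        timeout_minutes=timeout_minutes,
        **fields,
    )
```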

@agam1092005
Author

@NiveditJain ,
I've implemented the node-level timeout as requested:

Added timeout_minutes to:

  • NodeRegistrationModel (optional, validates gt=0)
  • RegisteredNode database model
  • State model (populated at creation time)

Performance optimized per your requirement:

  • No database joins in enqueue API
  • Timeout looked up and stored during state creation in trigger_graph.py and create_next_states.py
  • check_node_timeout uses per-state timeout values

Backward compatible:

  • All timeout_minutes fields are optional
  • Falls back to global NODE_TIMEOUT_MINUTES when not specified
  • Existing nodes/states continue working

Also fixed:

  • Type casting consistency issue flagged by coderabbit
  • Added comprehensive test coverage

Contributor

coderabbitai bot left a comment


Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4303a3d and 8634281.

📒 Files selected for processing (10)
  • state-manager/app/config/settings.py (2 hunks)
  • state-manager/app/controller/register_nodes.py (3 hunks)
  • state-manager/app/controller/trigger_graph.py (3 hunks)
  • state-manager/app/models/db/registered_node.py (2 hunks)
  • state-manager/app/models/db/state.py (2 hunks)
  • state-manager/app/models/register_nodes_request.py (1 hunks)
  • state-manager/app/models/register_nodes_response.py (1 hunks)
  • state-manager/app/tasks/check_node_timeout.py (1 hunks)
  • state-manager/app/tasks/create_next_states.py (3 hunks)
  • state-manager/tests/unit/tasks/test_check_node_timeout.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
state-manager/app/tasks/check_node_timeout.py (4)
state-manager/app/models/db/state.py (1)
  • State (13-116)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/singletons/logs_manager.py (2)
  • LogsManager (9-66)
  • get_logger (65-66)
state-manager/app/config/settings.py (1)
  • get_settings (34-38)
state-manager/app/controller/trigger_graph.py (2)
state-manager/app/models/db/registered_node.py (2)
  • RegisteredNode (8-45)
  • get_by_name_and_namespace (28-32)
state-manager/app/config/settings.py (1)
  • get_settings (34-38)
state-manager/tests/unit/tasks/test_check_node_timeout.py (2)
state-manager/app/tasks/check_node_timeout.py (1)
  • check_node_timeout (10-42)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/tasks/create_next_states.py (1)
state-manager/app/config/settings.py (1)
  • get_settings (34-38)
state-manager/app/controller/register_nodes.py (1)
state-manager/app/models/db/registered_node.py (1)
  • RegisteredNode (8-45)
🔇 Additional comments (27)
state-manager/app/models/register_nodes_response.py (2)

2-2: LGTM: Import addition is correct.

The addition of Optional to the imports properly supports the new timeout_minutes field.


10-10: LGTM: Field definition follows best practices.

The timeout_minutes field is correctly defined with:

  • Proper type annotation (Optional[int])
  • Validation constraint (gt=0) preventing zero/negative values
  • Clear description explaining fallback behavior
  • Consistent with the DB model and request model definitions
state-manager/app/models/db/registered_node.py (2)

3-3: LGTM: Import correctly supports the new field.


16-16: LGTM: Field definition is consistent and validated.

The timeout_minutes field on the DB model matches the response model definition with proper validation (gt=0) and clear description. No indexing is needed since this field is only read for timeout resolution, not queried.

state-manager/app/models/register_nodes_request.py (2)

2-2: LGTM: Import supports the new optional field.


10-10: LGTM: Request model field is consistent with response and DB models.

The field definition is identical across request, response, and database models, ensuring consistency throughout the API layer.

state-manager/app/controller/trigger_graph.py (3)

10-10: LGTM: Imports correctly support timeout resolution.

The addition of RegisteredNode and get_settings imports properly enables per-node timeout lookup and global fallback.

Also applies to: 13-13


96-104: LGTM: Timeout resolution correctly implements per-node override with global fallback.

The logic properly:

  • Queries the RegisteredNode for the root node
  • Uses per-node timeout_minutes when available
  • Falls back to global settings.node_timeout_minutes when not set
  • Follows the precedence specified in PR objectives

106-118: LGTM: State constructor correctly includes timeout_minutes.

The resolved timeout_minutes value is properly passed to the State constructor, ensuring timeout configuration is propagated from node registration through state creation.

state-manager/app/tasks/create_next_states.py (3)

13-13: LGTM: Import enables global timeout fallback.


166-174: LGTM: Timeout resolution efficiently reuses cached RegisteredNode.

The logic correctly:

  • Leverages the existing get_registered_node cache to avoid redundant DB queries
  • Implements the same per-node > global precedence as trigger_graph.py
  • Maintains consistency across state creation paths

176-189: LGTM: State construction includes resolved timeout.

The timeout_minutes is correctly passed to the State constructor, ensuring timeout configuration is propagated to all next states in the graph execution.

state-manager/app/controller/register_nodes.py (3)

28-36: LGTM: Update flow correctly includes timeout_minutes.

The timeout_minutes field is properly added to the update payload alongside other node attributes, ensuring existing nodes can have their timeout configuration updated.


40-52: LGTM: Create flow correctly includes timeout_minutes.

The timeout_minutes is properly set on the RegisteredNode constructor when creating new nodes, consistent with other fields.


54-62: LGTM: Response model construction includes timeout_minutes.

The API response correctly includes timeout_minutes in the RegisteredNodeModel, ensuring clients receive the timeout configuration for registered nodes.
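
For illustration, carrying the new optional field through the request model might be sketched as follows (hypothetical model shape; only the timeout_minutes definition is taken from this review):

```python
from typing import Optional

from pydantic import BaseModel, Field


class NodeRegistrationModel(BaseModel):
    """Sketch of a registration request carrying the optional per-node timeout."""
    name: str
    namespace: str
    timeout_minutes: Optional[int] = Field(
        None, gt=0,
        description="Per-node timeout; falls back to NODE_TIMEOUT_MINUTES when unset",
    )
```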

state-manager/app/config/settings.py (2)

16-16: LGTM: Field includes validation as requested in past reviews.

The node_timeout_minutes field correctly includes:

  • gt=0 validation preventing zero/negative timeouts (addresses past review feedback)
  • Reasonable default of 30 minutes
  • Clear description

Based on learnings.


25-26: LGTM: Environment loading uses consistent string defaults.

Both trigger_workers and node_timeout_minutes now use string defaults, allowing Pydantic to handle type coercion and validation consistently. This addresses the inconsistency flagged in past reviews.

Based on learnings.

state-manager/app/tasks/check_node_timeout.py (5)

1-7: LGTM: Imports and logger setup are correct.

All necessary dependencies are imported and the logger is properly initialized.


10-15: LGTM: Task initialization and timestamp calculation are correct.

The function correctly loads settings and calculates the current timestamp in milliseconds for timeout comparisons.


23-34: LGTM: Timeout resolution and threshold logic are correct.

The implementation correctly:

  • Uses per-state timeout_minutes when available, falling back to global setting
  • Calculates timeout threshold in milliseconds
  • Compares queued_at against threshold using <= (includes boundary)
  • Sets descriptive error message with actual timeout duration

36-39: LGTM: Bulk update efficiently handles timed-out states.

The use of State.save_all() for bulk updates is efficient, and logging the count provides useful observability.


41-42: LGTM: Exception handling includes stack trace.

The error handling correctly uses exc_info=True to log the full stack trace for debugging, as requested in past reviews. The task does not crash on error, allowing the scheduler to continue.

Based on learnings.

state-manager/tests/unit/tasks/test_check_node_timeout.py (5)

7-60: LGTM! Comprehensive test coverage for different timeout scenarios.

The test correctly validates:

  • Per-node timeout handling (state1 with custom 30 min)
  • States that don't exceed timeout remain QUEUED (state2)
  • Global timeout fallback (state3 with None timeout_minutes)
  • Bulk save operation with only timed-out states

The realistic timestamp and clear test structure make this easy to understand and maintain.


63-92: LGTM! Good negative test case.

The test correctly verifies that:

  • States not exceeding timeout remain in QUEUED status
  • No database updates occur when no states timeout

95-108: LGTM! Exception handling properly tested.

The test correctly verifies that exceptions are caught and logged with stack trace information (exc_info=True), ensuring errors don't crash the scheduled task.


111-152: LGTM! Critical test for custom timeout precedence.

The test correctly validates that:

  • Custom per-node timeout_minutes takes precedence over global setting
  • Global timeout is used as fallback when timeout_minutes is None
  • Only states exceeding their respective timeouts are saved

This test is essential for the per-node timeout feature mentioned in the PR objectives.


1-152: Excellent test coverage and structure.

The test suite comprehensively covers:

  1. ✅ Multiple timeout scenarios (per-node and global)
  2. ✅ Negative case (no timeouts)
  3. ✅ Exception handling
  4. ✅ Custom timeout precedence

The tests use realistic timestamps (addressed from previous review) and clear assertions. Well done!

@NiveditJain
Member

@agam1092005 let's also fix the failing tests. Reviewing the PR in greater detail meanwhile.

Comment on lines 17 to 33
```diff
 data = await State.get_pymongo_collection().find_one_and_update(
     {
         "namespace_name": namespace_name,
         "status": StateStatusEnum.CREATED,
         "node_name": {
             "$in": nodes
         },
-        "enqueue_after": {"$lte": int(time.time() * 1000)}
+        "enqueue_after": {"$lte": current_time_ms}
     },
     {
-        "$set": {"status": StateStatusEnum.QUEUED}
+        "$set": {
+            "status": StateStatusEnum.QUEUED,
+            "queued_at": current_time_ms
+        }
     },
     return_document=ReturnDocument.AFTER
 )
```
Member


The query seems correct; however, how will we re-pick timed-out states?

Comment on lines 31 to 32
```python
queued_at: Optional[int] = Field(None, description="Unix time in milliseconds when state was queued")
timeout_minutes: Optional[int] = Field(None, gt=0, description="Timeout in minutes for this specific state, taken from node registration")
```
Member


A better model here is to store timeout_at: pre-compute when the timeout would occur and store that.

Member


While this model of periodic jobs will work, it's unnecessary, since we can write a database query to figure out timed-out nodes. We probably do not need to set a TIMEDOUT status at all: if the status is QUEUED and current_time > timeout_at, we can infer it.
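
The alternative being suggested — deriving "timed out" at read time rather than writing a status — could be sketched as follows (illustrative only; overdue_states_query is a hypothetical helper):

```python
import time

from app.models.db.state import State                      # assumed module paths
from app.models.state_status_enum import StateStatusEnum


def overdue_states_query() -> dict:
    """Hypothetical helper: a state is 'timed out' iff it is QUEUED past its deadline."""
    now_ms = int(time.time() * 1000)
    return {
        "status": StateStatusEnum.QUEUED,
        "timeout_at": {"$ne": None, "$lte": now_ms},
    }

# Any reader could then treat matches as timed out without a periodic job, e.g.:
# cursor = State.get_pymongo_collection().find(overdue_states_query())
```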

Comment on lines +80 to +88
```python
scheduler.add_job(
    check_node_timeout,
    CronTrigger.from_crontab("* * * * *"),
    replace_existing=True,
    misfire_grace_time=60,
    coalesce=True,
    max_instances=1,
    id="check_node_timeout_task"
)
```
Member


Not needed with DB queries.

Contributor

coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4ee52fd and 4be13c9.

📒 Files selected for processing (3)
  • state-manager/app/controller/enqueue_states.py (1 hunks)
  • state-manager/app/models/db/state.py (2 hunks)
  • state-manager/app/tasks/check_node_timeout.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
state-manager/app/tasks/check_node_timeout.py (3)
state-manager/app/models/db/state.py (1)
  • State (13-116)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
state-manager/app/singletons/logs_manager.py (2)
  • LogsManager (9-66)
  • get_logger (65-66)
state-manager/app/controller/enqueue_states.py (3)
state-manager/app/config/settings.py (1)
  • get_settings (34-38)
state-manager/app/models/db/state.py (1)
  • State (13-116)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-21)
🔇 Additional comments (5)
state-manager/app/models/db/state.py (2)

31-33: LGTM! Timeout fields correctly implement pre-computed timeout strategy.

The three fields work together effectively:

  • timeout_minutes stores per-state timeout configuration with proper validation (gt=0)
  • queued_at records when state enters QUEUED status
  • timeout_at stores pre-computed expiration timestamp for efficient queries

This design follows the reviewer's suggestion to pre-compute timeout_at rather than calculating it at query time, which improves query performance and simplifies the timeout detection logic.


109-115: LGTM! Index ordering is optimal for timeout queries.

The compound index (status, timeout_at) correctly supports the timeout detection query with:

  • status as the first field (equality filter on QUEUED)
  • timeout_at as the second field (range filter <= current_time_ms)

This ordering allows MongoDB to efficiently scan only QUEUED states and then apply the timeout threshold, which is the optimal access pattern for the check_node_timeout task.

state-manager/app/tasks/check_node_timeout.py (1)

9-31: LGTM! Efficient bulk update implementation.

The function correctly uses a single update_many operation to mark timed-out states in bulk, which is much more efficient than fetching states individually and updating them one by one. The query properly leverages the timeout_query_index on (status, timeout_at) for optimal performance.

The implementation follows the reviewer's guidance to use a database query to identify timeout nodes rather than fetching and filtering in application code.

state-manager/app/controller/enqueue_states.py (2)

17-18: Good practice: Timestamp captured once for consistency.

Pre-computing current_time_ms once and reusing it throughout the function ensures consistency between the query filter and the values being set, addressing the concern raised in previous reviews about potential timing inconsistencies.


61-84: LGTM! Aggregation pipeline correctly implements timeout calculation.

The aggregation pipeline properly:

  1. Sets queued_at to the current timestamp
  2. Calculates timeout_at using $add and $multiply with the correct conversion factor (60000 ms/minute)
  3. Falls back to the global node_timeout_minutes setting when per-state timeout_minutes is null using $ifNull

This approach avoids database joins (as requested by the reviewer) by computing timeout_at inline during the state transition to QUEUED, using either the state's own timeout_minutes or the global default.
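
Concretely, the pipeline-style update described here could be sketched as follows — an approximation reconstructed from this review, not the verbatim controller code:

```python
import time

from pymongo import ReturnDocument

from app.config.settings import get_settings               # assumed module paths
from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum


async def find_state(namespace_name: str, nodes: list[str]):
    settings = get_settings()
    current_time_ms = int(time.time() * 1000)
    return await State.get_pymongo_collection().find_one_and_update(
        {
            "namespace_name": namespace_name,
            "status": StateStatusEnum.CREATED,
            "node_name": {"$in": nodes},
            "enqueue_after": {"$lte": current_time_ms},
        },
        [  # aggregation-pipeline update: timeout_at is computed atomically in the DB
            {
                "$set": {
                    "status": StateStatusEnum.QUEUED,
                    "queued_at": current_time_ms,
                    "timeout_at": {
                        "$add": [
                            current_time_ms,
                            {
                                "$multiply": [
                                    {"$ifNull": ["$timeout_minutes", settings.node_timeout_minutes]},
                                    60000,  # minutes -> milliseconds
                                ]
                            },
                        ]
                    },
                }
            }
        ],
        return_document=ReturnDocument.AFTER,
    )
```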

Comment on lines +16 to +27
```python
result = await State.get_pymongo_collection().update_many(
    {
        "status": StateStatusEnum.QUEUED,
        "timeout_at": {"$ne": None, "$lte": current_time_ms}
    },
    {
        "$set": {
            "status": StateStatusEnum.TIMEDOUT,
            "error": "Node execution timed out"
        }
    }
)
```
Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify retry/recovery mechanism for TIMEDOUT states.

The implementation correctly marks states as TIMEDOUT, but there's no visibility into how these states are subsequently handled. As noted in previous review comments, it's important to understand how timed-out states are re-picked or retried.

Run the following script to check if there's a retry mechanism for TIMEDOUT states:


🏁 Script executed:

```bash
#!/bin/bash
# Description: Search for retry/recovery logic for TIMEDOUT states

# Search for TIMEDOUT status handling
rg -n "TIMEDOUT" --type=py -C 5 | rg -i "retry|recover|manual"

# Search for state status transitions from TIMEDOUT
ast-grep --pattern $'StateStatusEnum.TIMEDOUT'

# Check if TIMEDOUT states can be manually retried or recovered
rg -n "manual_retry|retry_state" --type=py -A 10 | rg -i "timedout"
```

Length of output: 484


Implement retry or recovery for TIMEDOUT states. I didn’t find any logic that re-queues or retries tasks once their status is set to TIMEDOUT. Add a mechanism (e.g., re-pick policy, manual recovery endpoint, or scheduled retry) in state-manager/app/tasks/check_node_timeout.py to handle these states.

🤖 Prompt for AI Agents
In state-manager/app/tasks/check_node_timeout.py around lines 16 to 27, the code
marks timed-out states as TIMEDOUT but provides no recovery; implement an
automatic retry path: change the logic to (1) for timed-out docs with
retry_count < MAX_RETRIES, set status back to QUEUED, increment retry_count,
clear error and set a next_attempt_at or timeout_at to now + backoff_ms; (2) for
timed-out docs with retry_count >= MAX_RETRIES, set status to TIMEDOUT and leave
the error as final; (3) define MAX_RETRIES and backoff policy as config
constants and ensure updates are done atomically (use update_many with $inc and
$set filters or two distinct update_many calls filtered by retry_count) so tasks
are re-queued automatically while preventing infinite retries.
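
For illustration, the two-phase update this prompt outlines might be sketched like so — with hypothetical MAX_RETRIES and backoff constants; this retry path is not part of the PR:

```python
import time

from app.models.db.state import State                      # assumed module paths
from app.models.state_status_enum import StateStatusEnum

MAX_RETRIES = 3        # hypothetical policy constants
BACKOFF_MS = 60_000


async def handle_timeouts_with_retry() -> None:
    now_ms = int(time.time() * 1000)
    overdue = {"status": StateStatusEnum.QUEUED, "timeout_at": {"$ne": None, "$lte": now_ms}}
    collection = State.get_pymongo_collection()

    # Phase 1: overdue states with retries left stay QUEUED with a fresh deadline.
    await collection.update_many(
        {**overdue, "retry_count": {"$lt": MAX_RETRIES}},
        {
            "$set": {
                "error": None,
                "queued_at": now_ms,
                "timeout_at": now_ms + BACKOFF_MS,
            },
            "$inc": {"retry_count": 1},
        },
    )

    # Phase 2: states that exhausted their retries are terminally timed out.
    await collection.update_many(
        {**overdue, "retry_count": {"$gte": MAX_RETRIES}},
        {"$set": {"status": StateStatusEnum.TIMEDOUT, "error": "Node execution timed out"}},
    )
```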

Contributor

coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
state-manager/app/main.py (1)

87-95: Verify necessity given past review feedback.

A previous review comment on these lines stated "not needed with db queries." However, this scheduled job registration appears necessary to trigger the timeout checks periodically. Without it, the check_node_timeout task would never execute.

Please clarify:

  • Is there alternative/duplicate timeout checking logic elsewhere?
  • Was the previous comment resolved or does it still apply?
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4be13c9 and 9839bc6.

📒 Files selected for processing (2)
  • state-manager/app/config/settings.py (2 hunks)
  • state-manager/app/main.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
state-manager/app/main.py (1)
state-manager/app/tasks/check_node_timeout.py (1)
  • check_node_timeout (9-33)
🔇 Additional comments (2)
state-manager/app/main.py (1)

41-41: LGTM: Import is correctly placed.

The import is appropriately positioned alongside other task imports.

state-manager/app/config/settings.py (1)

16-16: LGTM: Validation correctly implemented.

The field definition properly includes gt=0 validation as requested in previous feedback, ensuring only positive timeout values are accepted.

Comment on lines 26 to 27
```python
node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30") # type: ignore
trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore
```
Contributor


⚠️ Potential issue | 🔴 Critical

Critical syntax error: Missing comma after line 26.

Line 26 is missing a trailing comma, which will cause a Python syntax error when parsing this constructor call.

Additionally, line 27 uses explicit int() casting while line 26 correctly passes a string to let Pydantic handle type conversion and validation. This inconsistency was flagged in previous reviews.

Apply this diff to fix both issues:

```diff
-            node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30") # type: ignore
-            trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore
+            node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30"), # type: ignore
+            trigger_workers=os.getenv("TRIGGER_WORKERS", "1"), # type: ignore
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
-node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30") # type: ignore
-trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore
+node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30"), # type: ignore
+trigger_workers=os.getenv("TRIGGER_WORKERS", "1"), # type: ignore
```
🤖 Prompt for AI Agents
In state-manager/app/config/settings.py around lines 26 to 27, add the missing
trailing comma at the end of the node_timeout_minutes=... line and make
trigger_workers consistent by removing the explicit int() cast and passing the
environment value as a string (e.g. os.getenv("TRIGGER_WORKERS", "1")) so
Pydantic handles type conversion; ensure both lines end with commas and keep any
existing type-ignore comments as needed.

Updated the default value of node_timeout_minutes to enhance the timeout duration for nodes stuck in QUEUED status, improving system resilience and performance.
… enhancing state attributes

- Removed the unused pipeline for calculating timeout_at in find_state function.
- Added timeout_at and timeout_minutes attributes to the state model for better state management.
- Updated errored_state and manual_retry_state functions to include timeout_at in state creation.
- Set timeout_at and queued_at to None in re_queue_after_signal function to reset state attributes upon re-queuing.
Contributor

coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between de475b8 and 2c7f3c1.

📒 Files selected for processing (5)
  • state-manager/app/controller/enqueue_states.py (1 hunks)
  • state-manager/app/controller/errored_state.py (1 hunks)
  • state-manager/app/controller/manual_retry_state.py (1 hunks)
  • state-manager/app/controller/re_queue_after_signal.py (1 hunks)
  • state-manager/app/models/db/state.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
state-manager/app/controller/enqueue_states.py (2)
state-manager/app/config/settings.py (1)
  • get_settings (36-40)
state-manager/app/models/db/state.py (1)
  • State (13-116)
🔇 Additional comments (7)
state-manager/app/controller/re_queue_after_signal.py (1)

24-25: LGTM: Timeout tracking correctly reset for re-queue.

Clearing timeout_at and queued_at ensures fresh timeout calculation when the state is re-enqueued, while preserving timeout_minutes (the static configuration). This aligns with the enqueue logic that recalculates timeout_at from timeout_minutes or the global fallback.

state-manager/app/models/db/state.py (2)

31-33: LGTM: Timeout fields are well-defined.

The three new fields are correctly specified:

  • queued_at and timeout_at track runtime timestamps (optional, default None)
  • timeout_minutes is validated with gt=0 to ensure positive values
  • All fields are optional for backward compatibility

109-115: The index is already being used efficiently; the query already pushes the threshold to MongoDB.

The implementation in check_node_timeout.py (lines 16-19) already correctly includes both required conditions in the MongoDB query:

  • "status": StateStatusEnum.QUEUED (equality filter)
  • "timeout_at": {"$ne": None, "$lte": current_time_ms} (threshold comparison)

The compound index (status, 1), (timeout_at, 1) is correctly ordered to support this query pattern. The threshold comparison is pushed to the database (not filtered in Python), so the index is already being leveraged efficiently. No changes are needed.

Regarding "repicking timed-out states": TIMEDOUT is a terminal status with no retry or requeue logic, similar to ERRORED and CANCELLED. If automatic retry for timeouts is desired, that would be a separate feature request requiring explicit state transition logic.

Likely an incorrect or invalid review comment.

state-manager/app/controller/manual_retry_state.py (1)

34-35: LGTM: Manual retry correctly preserves timeout configuration.

Preserving only timeout_minutes (not timeout_at) is the correct approach. This allows the enqueue logic to calculate a fresh timeout_at deadline for the retry state, ensuring adequate execution time.

This contrasts with the issue in errored_state.py (line 57) where timeout_at is incorrectly preserved, causing automatic retries to inherit stale deadlines.

state-manager/app/controller/enqueue_states.py (3)

17-18: LGTM: Timestamp consistency fixed.

Fetching current_time_ms once and reusing it for both the query filter and the update pipeline ensures consistency and addresses the previous review concern about timestamp being fetched twice.


24-25: LGTM: Query filter is correct.

The filter correctly identifies states ready for enqueuing by checking enqueue_after against the current timestamp.


27-50: Aggregation pipeline correctly implements atomic timeout calculation.

The pipeline atomically updates three fields:

  1. Sets status to QUEUED
  2. Records queued_at timestamp
  3. Calculates timeout_at using per-state timeout_minutes or falling back to global node_timeout_minutes

The timeout calculation is correct: current_time_ms + (timeout_minutes * 60000).

However, regarding the reviewer's question "how will we repick timed-out states?": the current query filters for status == CREATED, which means states marked TIMEDOUT will not be automatically re-enqueued. Please clarify:

  1. Is this intentional? Should TIMEDOUT states require explicit manual retry or re-queue?
  2. Or should they be automatically retried? If so, you'll need to either:
    • Update the query to include StateStatusEnum.TIMEDOUT in the status filter, OR
    • Have check_node_timeout transition timed-out states to ERRORED instead of TIMEDOUT to leverage existing retry logic, OR
    • Add explicit retry logic for TIMEDOUT states similar to ERRORED states

The current design treats TIMEDOUT as a terminal state requiring manual intervention, which may or may not be the intended behavior.

Comment on lines +56 to +57
```python
fanout_id=state.fanout_id,
timeout_at=state.timeout_at
```
Contributor


⚠️ Potential issue | 🟠 Major

Retry states should get a fresh timeout window, not inherit the original deadline.

Preserving timeout_at=state.timeout_at means the retry state inherits the original timeout deadline. If the errored state was close to timing out, the retry could immediately timeout without adequate execution time.

Compare with manual_retry_state.py (line 35), which only preserves timeout_minutes and lets the enqueue logic recalculate a fresh timeout_at. This creates an inconsistency: manual retries get fresh timeouts, but automatic retries inherit stale deadlines.

Apply this diff to give retry states a fresh timeout window:

```diff
                 retry_state = State(
                     node_name=state.node_name,
                     namespace_name=state.namespace_name,
                     identifier=state.identifier,
                     graph_name=state.graph_name,
                     run_id=state.run_id,
                     status=StateStatusEnum.CREATED,
                     inputs=state.inputs,
                     outputs={},
                     error=None,
                     parents=state.parents,
                     does_unites=state.does_unites,
                     enqueue_after= int(time.time() * 1000) + graph_template.retry_policy.compute_delay(state.retry_count + 1),
                     retry_count=state.retry_count + 1,
                     fanout_id=state.fanout_id,
-                    timeout_at=state.timeout_at
+                    timeout_minutes=state.timeout_minutes
                 )
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
```diff
-fanout_id=state.fanout_id,
-timeout_at=state.timeout_at
+fanout_id=state.fanout_id,
+timeout_minutes=state.timeout_minutes
```
🤖 Prompt for AI Agents
In state-manager/app/controller/errored_state.py around lines 56-57, the retry
state is incorrectly inheriting the original deadline via
timeout_at=state.timeout_at; change this to preserve only timeout_minutes (e.g.,
pass timeout_minutes=state.timeout_minutes) and remove timeout_at so the enqueue
logic recalculates a fresh timeout_at (keep fanout_id as-is), making automatic
retries receive a new timeout window consistent with manual_retry_state.py line
35.



Development

Successfully merging this pull request may close these issues.

Add node level timeouts
