feat: add node-level timeouts to prevent stuck queued states #462
base: main
Changes from all commits: 735cd6d, 4303a3d, 8634281, 4ee52fd, 4be13c9, 9839bc6, de475b8, 2c7f3c1
```diff
@@ -38,6 +38,7 @@
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from apscheduler.triggers.cron import CronTrigger
 from .tasks.trigger_cron import trigger_cron
+from .tasks.check_node_timeout import check_node_timeout

 # init tasks
 from .tasks.init_tasks import init_tasks

@@ -83,6 +84,15 @@ async def lifespan(app: FastAPI):
     max_instances=1,
     id="every_minute_task"
 )
+scheduler.add_job(
+    check_node_timeout,
+    CronTrigger.from_crontab("* * * * *"),
+    replace_existing=True,
+    misfire_grace_time=60,
+    coalesce=True,
+    max_instances=1,
+    id="check_node_timeout_task"
+)
```
|
Comment on lines
+87
to
+95
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not needed with db queries |
```diff
 scheduler.start()

 # main logic of the server
```
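For context, a minimal self-contained sketch of the scheduling pattern this hunk wires up: an APScheduler cron job registered during FastAPI's lifespan. The task body below is a stand-in, not the PR's real implementation (which follows in the new file).

```python
from contextlib import asynccontextmanager

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI

scheduler = AsyncIOScheduler()


async def check_node_timeout():
    # Stand-in body; the PR's real task runs a MongoDB update (see below).
    print("checking for timed-out nodes")


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.add_job(
        check_node_timeout,
        CronTrigger.from_crontab("* * * * *"),  # fire once per minute
        replace_existing=True,
        misfire_grace_time=60,   # run late jobs up to 60s after their slot
        coalesce=True,           # collapse a backlog of missed runs into one
        max_instances=1,         # never overlap two runs of this job
        id="check_node_timeout_task",
    )
    scheduler.start()
    yield
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)
```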
|
Member:

> While this model of periodic jobs will work, it's unnecessary: we can write a database query to identify timed-out nodes. We probably don't need to set a TIMEDOUT status at all; if the status is Queued and current_time > timeout_at, we can infer it.
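A minimal sketch of the alternative the reviewer describes, assuming the `State` model and status enum from the new task file below: derive timed-out nodes at read time with a filter, instead of writing a TIMEDOUT status from a periodic job.

```python
import time

from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum


def timed_out_filter() -> dict:
    """MongoDB filter matching Queued states whose deadline has passed."""
    now_ms = int(time.time() * 1000)
    return {
        "status": StateStatusEnum.QUEUED,
        "timeout_at": {"$ne": None, "$lte": now_ms},
    }


async def count_timed_out_states() -> int:
    # Read-time check: no status write is needed to know a node timed out.
    return await State.get_pymongo_collection().count_documents(timed_out_filter())
```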
New file, `check_node_timeout.py` (as imported above):

```python
import time
from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum
from app.singletons.logs_manager import LogsManager

logger = LogsManager().get_logger()


async def check_node_timeout():
    try:
        current_time_ms = int(time.time() * 1000)

        logger.info(f"Checking for timed out nodes at {current_time_ms}")

        # Use database query to find and update timed out states in one operation
        result = await State.get_pymongo_collection().update_many(
            {
                "status": StateStatusEnum.QUEUED,
                "timeout_at": {"$ne": None, "$lte": current_time_ms}
            },
            {
                "$set": {
                    "status": StateStatusEnum.TIMEDOUT,
                    "error": "Node execution timed out"
                }
            }
        )
```
|
Comment on lines +16 to +27 (Contributor):

> **Verify retry/recovery mechanism for TIMEDOUT states.** The implementation correctly marks states as `TIMEDOUT`. Run the following script to check whether there is a retry mechanism for `TIMEDOUT` states:
>
> ```shell
> #!/bin/bash
> # Description: Search for retry/recovery logic for TIMEDOUT states
>
> # Search for TIMEDOUT status handling
> rg -n "TIMEDOUT" --type=py -C 5 | rg -i "retry|recover|manual"
>
> # Search for state status transitions from TIMEDOUT
> ast-grep --pattern $'StateStatusEnum.TIMEDOUT'
>
> # Check if TIMEDOUT states can be manually retried or recovered
> rg -n "manual_retry|retry_state" --type=py -A 10 | rg -i "timedout"
> ```
>
> **Implement retry or recovery for TIMEDOUT states.** I didn't find any logic that re-queues or retries tasks once their status is set to `TIMEDOUT`.
```python
        if result.modified_count > 0:
            logger.info(f"Marked {result.modified_count} states as TIMEDOUT")

    except Exception:
        logger.error("Error checking node timeout", exc_info=True)
```
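A hypothetical sketch of the recovery pass the review above asks for: re-queue TIMEDOUT states that still have retry budget. The `max_retries` bound and the reset to CREATED are assumptions for illustration, not existing project code.

```python
from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum


async def requeue_timed_out_states(max_retries: int = 3) -> int:
    """Re-queue TIMEDOUT states that have not exhausted their retries (hypothetical)."""
    result = await State.get_pymongo_collection().update_many(
        {
            "status": StateStatusEnum.TIMEDOUT,
            "retry_count": {"$lt": max_retries},  # assumed retry budget
        },
        {
            "$set": {"status": StateStatusEnum.CREATED, "error": None},
            "$inc": {"retry_count": 1},
        },
    )
    return result.modified_count
```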
Comment:

> **Retry states should get a fresh timeout window, not inherit the original deadline.**
>
> Preserving `timeout_at=state.timeout_at` means the retry state inherits the original timeout deadline. If the errored state was close to timing out, the retry could immediately time out without adequate execution time.
>
> Compare with `manual_retry_state.py` (line 35), which only preserves `timeout_minutes` and lets the enqueue logic recalculate a fresh `timeout_at`. This creates an inconsistency: manual retries get fresh timeouts, but automatic retries inherit stale deadlines.
>
> Apply this diff to give retry states a fresh timeout window:
>
> ```diff
>  retry_state = State(
>      node_name=state.node_name,
>      namespace_name=state.namespace_name,
>      identifier=state.identifier,
>      graph_name=state.graph_name,
>      run_id=state.run_id,
>      status=StateStatusEnum.CREATED,
>      inputs=state.inputs,
>      outputs={},
>      error=None,
>      parents=state.parents,
>      does_unites=state.does_unites,
>      enqueue_after=int(time.time() * 1000) + graph_template.retry_policy.compute_delay(state.retry_count + 1),
>      retry_count=state.retry_count + 1,
>      fanout_id=state.fanout_id,
> -    timeout_at=state.timeout_at
> +    timeout_minutes=state.timeout_minutes
>  )
> ```
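A small sketch of how the fresh deadline could then be derived at enqueue time from the preserved `timeout_minutes`; the helper name and placement are assumptions, since the actual enqueue logic is not shown in this diff.

```python
import time
from typing import Optional


def compute_timeout_at(timeout_minutes: Optional[int]) -> Optional[int]:
    """Derive an absolute deadline in ms since epoch from a relative timeout."""
    if timeout_minutes is None:
        return None  # no timeout configured for this node
    return int(time.time() * 1000) + timeout_minutes * 60 * 1000
```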