2 changes: 2 additions & 0 deletions state-manager/app/config/settings.py
@@ -13,6 +13,7 @@ class Settings(BaseModel):
state_manager_secret: str = Field(..., description="Secret key for API authentication")
secrets_encryption_key: str = Field(..., description="Key for encrypting secrets")
trigger_workers: int = Field(default=1, description="Number of workers to run the trigger cron")
node_timeout_minutes: int = Field(default=60, gt=0, description="Timeout in minutes for nodes stuck in QUEUED status")
trigger_retention_hours: int = Field(default=720, description="Number of hours to retain completed/failed triggers before cleanup")

@classmethod
@@ -22,6 +23,7 @@ def from_env(cls) -> "Settings":
mongo_database_name=os.getenv("MONGO_DATABASE_NAME", "exosphere-state-manager"), # type: ignore
state_manager_secret=os.getenv("STATE_MANAGER_SECRET"), # type: ignore
secrets_encryption_key=os.getenv("SECRETS_ENCRYPTION_KEY"), # type: ignore
node_timeout_minutes=int(os.getenv("NODE_TIMEOUT_MINUTES", 60)), # type: ignore
trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore
trigger_retention_hours=int(os.getenv("TRIGGER_RETENTION_HOURS", 720)) # type: ignore
)
37 changes: 30 additions & 7 deletions state-manager/app/controller/enqueue_states.py
@@ -7,24 +7,47 @@
from ..models.state_status_enum import StateStatusEnum

from app.singletons.logs_manager import LogsManager
from app.config.settings import get_settings
from pymongo import ReturnDocument

logger = LogsManager().get_logger()


async def find_state(namespace_name: str, nodes: list[str]) -> State | None:
current_time_ms = int(time.time() * 1000)
settings = get_settings()

data = await State.get_pymongo_collection().find_one_and_update(
{
"namespace_name": namespace_name,
"status": StateStatusEnum.CREATED,
"node_name": {
"$in": nodes
},
"enqueue_after": {"$lte": int(time.time() * 1000)}
},
{
"$set": {"status": StateStatusEnum.QUEUED}
"node_name": {"$in": nodes},
"enqueue_after": {"$lte": current_time_ms}
},
[
{
"$set": {
"status": StateStatusEnum.QUEUED,
"queued_at": current_time_ms,
"timeout_at": {
"$add": [
current_time_ms,
{
"$multiply": [
{
"$ifNull": [
"$timeout_minutes",
settings.node_timeout_minutes
]
},
60000 # Convert minutes to milliseconds
]
}
]
}
}
}
],
return_document=ReturnDocument.AFTER
)
return State(**data) if data else None
3 changes: 2 additions & 1 deletion state-manager/app/controller/errored_state.py
@@ -53,7 +53,8 @@ async def errored_state(namespace_name: str, state_id: PydanticObjectId, body: E
does_unites=state.does_unites,
enqueue_after= int(time.time() * 1000) + graph_template.retry_policy.compute_delay(state.retry_count + 1),
retry_count=state.retry_count + 1,
fanout_id=state.fanout_id
fanout_id=state.fanout_id,
timeout_at=state.timeout_at
Comment on lines +56 to +57
⚠️ Potential issue | 🟠 Major

Retry states should get a fresh timeout window, not inherit the original deadline.

Preserving timeout_at=state.timeout_at means the retry state inherits the original timeout deadline. If the errored state was close to timing out, the retry could time out immediately without adequate execution time.

Compare with manual_retry_state.py (line 35), which only preserves timeout_minutes and lets the enqueue logic recalculate a fresh timeout_at. This creates an inconsistency: manual retries get fresh timeouts, but automatic retries inherit stale deadlines.

Apply this diff to give retry states a fresh timeout window:

                retry_state = State(
                    node_name=state.node_name,
                    namespace_name=state.namespace_name,
                    identifier=state.identifier,
                    graph_name=state.graph_name,
                    run_id=state.run_id,
                    status=StateStatusEnum.CREATED,
                    inputs=state.inputs,
                    outputs={},
                    error=None,
                    parents=state.parents,
                    does_unites=state.does_unites,
                    enqueue_after= int(time.time() * 1000) + graph_template.retry_policy.compute_delay(state.retry_count + 1),
                    retry_count=state.retry_count + 1,
                    fanout_id=state.fanout_id,
-                    timeout_at=state.timeout_at
+                    timeout_minutes=state.timeout_minutes
                )
📝 Committable suggestion


Suggested change
-                    fanout_id=state.fanout_id,
-                    timeout_at=state.timeout_at
+                    fanout_id=state.fanout_id,
+                    timeout_minutes=state.timeout_minutes
🤖 Prompt for AI Agents
In state-manager/app/controller/errored_state.py around lines 56-57, the retry
state is incorrectly inheriting the original deadline via
timeout_at=state.timeout_at; change this to preserve only timeout_minutes (e.g.,
pass timeout_minutes=state.timeout_minutes) and remove timeout_at so the enqueue
logic recalculates a fresh timeout_at (keep fanout_id as-is), making automatic
retries receive a new timeout window consistent with manual_retry_state.py line
35.

)
retry_state = await retry_state.insert()
logger.info(f"Retry state {retry_state.id} created for state {state_id}", x_exosphere_request_id=x_exosphere_request_id)
3 changes: 2 additions & 1 deletion state-manager/app/controller/manual_retry_state.py
@@ -31,7 +31,8 @@ async def manual_retry_state(namespace_name: str, state_id: PydanticObjectId, bo
parents=state.parents,
does_unites=state.does_unites,
fanout_id=body.fanout_id, # this will ensure that multiple unwanted retries are not formed because of index in database
manual_retry_fanout_id=body.fanout_id # This is included in the state fingerprint to allow unique manual retries of unite nodes.
manual_retry_fanout_id=body.fanout_id, # This is included in the state fingerprint to allow unique manual retries of unite nodes.
timeout_minutes=state.timeout_minutes
)
retry_state = await retry_state.insert()
logger.info(f"Retry state {retry_state.id} created for state {state_id}", x_exosphere_request_id=x_exosphere_request_id)
2 changes: 2 additions & 0 deletions state-manager/app/controller/re_queue_after_signal.py
@@ -21,6 +21,8 @@ async def re_queue_after_signal(namespace_name: str, state_id: PydanticObjectId,

state.status = StateStatusEnum.CREATED
state.enqueue_after = int(time.time() * 1000) + body.enqueue_after
state.timeout_at = None
state.queued_at = None
await state.save()

return SignalResponseModel(status=StateStatusEnum.CREATED, enqueue_after=state.enqueue_after)
9 changes: 6 additions & 3 deletions state-manager/app/controller/register_nodes.py
@@ -31,7 +31,8 @@ async def register_nodes(namespace_name: str, body: RegisterNodesRequestModel, x
RegisteredNode.runtime_namespace: namespace_name,
RegisteredNode.inputs_schema: node_data.inputs_schema, # type: ignore
RegisteredNode.outputs_schema: node_data.outputs_schema, # type: ignore
RegisteredNode.secrets: node_data.secrets # type: ignore
RegisteredNode.secrets: node_data.secrets, # type: ignore
RegisteredNode.timeout_minutes: node_data.timeout_minutes # type: ignore
}))
logger.info(f"Updated existing node {node_data.name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

@@ -44,7 +45,8 @@ async def register_nodes(namespace_name: str, body: RegisterNodesRequestModel, x
runtime_namespace=namespace_name,
inputs_schema=node_data.inputs_schema,
outputs_schema=node_data.outputs_schema,
secrets=node_data.secrets
secrets=node_data.secrets,
timeout_minutes=node_data.timeout_minutes
)
await new_node.insert()
logger.info(f"Created new node {node_data.name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
@@ -54,7 +56,8 @@ async def register_nodes(namespace_name: str, body: RegisterNodesRequestModel, x
name=node_data.name,
inputs_schema=node_data.inputs_schema,
outputs_schema=node_data.outputs_schema,
secrets=node_data.secrets
secrets=node_data.secrets,
timeout_minutes=node_data.timeout_minutes
)
)

15 changes: 14 additions & 1 deletion state-manager/app/controller/trigger_graph.py
@@ -7,8 +7,10 @@
from app.models.db.store import Store
from app.models.db.run import Run
from app.models.db.graph_template_model import GraphTemplate
from app.models.db.registered_node import RegisteredNode
from app.models.node_template_model import NodeTemplate
from app.models.dependent_string import DependentString
from app.config.settings import get_settings

import uuid
import time
@@ -91,6 +93,16 @@ async def trigger_graph(namespace_name: str, graph_name: str, body: TriggerGraph
if len(new_stores) > 0:
await Store.insert_many(new_stores)

# Get node timeout setting
registered_node = await RegisteredNode.get_by_name_and_namespace(root.node_name, root.namespace)
timeout_minutes = None
if registered_node and registered_node.timeout_minutes:
timeout_minutes = registered_node.timeout_minutes
else:
# Fall back to global setting
settings = get_settings()
timeout_minutes = settings.node_timeout_minutes

new_state = State(
node_name=root.node_name,
namespace_name=namespace_name,
@@ -101,7 +113,8 @@
enqueue_after=int(time.time() * 1000) + body.start_delay,
inputs=inputs,
outputs={},
error=None
error=None,
timeout_minutes=timeout_minutes
)
await new_state.insert()

10 changes: 10 additions & 0 deletions state-manager/app/main.py
@@ -38,6 +38,7 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from .tasks.trigger_cron import trigger_cron
from .tasks.check_node_timeout import check_node_timeout

# init tasks
from .tasks.init_tasks import init_tasks
@@ -83,6 +84,15 @@ async def lifespan(app: FastAPI):
max_instances=1,
id="every_minute_task"
)
scheduler.add_job(
check_node_timeout,
CronTrigger.from_crontab("* * * * *"),
replace_existing=True,
misfire_grace_time=60,
coalesce=True,
max_instances=1,
id="check_node_timeout_task"
)
Comment on lines +87 to +95
not needed with db queries

scheduler.start()

# main logic of the server
3 changes: 2 additions & 1 deletion state-manager/app/models/db/registered_node.py
@@ -1,6 +1,6 @@
from .base import BaseDatabaseModel
from pydantic import Field
from typing import Any
from typing import Any, Optional
from pymongo import IndexModel
from ..node_template_model import NodeTemplate

@@ -13,6 +13,7 @@ class RegisteredNode(BaseDatabaseModel):
inputs_schema: dict[str, Any] = Field(..., description="JSON schema for node inputs")
outputs_schema: dict[str, Any] = Field(..., description="JSON schema for node outputs")
secrets: list[str] = Field(default_factory=list, description="List of secrets that the node uses")
timeout_minutes: Optional[int] = Field(None, gt=0, description="Timeout in minutes for this node. Falls back to global setting if not provided")

class Settings:
indexes = [
10 changes: 10 additions & 0 deletions state-manager/app/models/db/state.py
@@ -28,6 +28,9 @@ class State(BaseDatabaseModel):
retry_count: int = Field(default=0, description="Number of times the state has been retried")
fanout_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Fanout ID of the state")
manual_retry_fanout_id: str = Field(default="", description="Fanout ID from a manual retry request, ensuring unique retries for unite nodes.")
queued_at: Optional[int] = Field(default=None, description="Unix time in milliseconds when state was queued")
timeout_at: Optional[int] = Field(default=None, description="Unix time in milliseconds when state times out")
timeout_minutes: Optional[int] = Field(default=None, gt=0, description="Timeout in minutes for this specific state, taken from node registration")

@before_event([Insert, Replace, Save])
def _generate_fingerprint(self):
@@ -102,5 +105,12 @@ class Settings:
("status", 1),
],
name="run_id_status_index"
),
IndexModel(
[
("status", 1),
("timeout_at", 1),
],
name="timeout_query_index"
)
]
3 changes: 2 additions & 1 deletion state-manager/app/models/register_nodes_request.py
@@ -1,12 +1,13 @@
from pydantic import BaseModel, Field
from typing import Any, List
from typing import Any, List, Optional


class NodeRegistrationModel(BaseModel):
name: str = Field(..., description="Unique name of the node")
inputs_schema: dict[str, Any] = Field(..., description="JSON schema for node inputs")
outputs_schema: dict[str, Any] = Field(..., description="JSON schema for node outputs")
secrets: List[str] = Field(..., description="List of secrets that the node uses")
timeout_minutes: Optional[int] = Field(None, gt=0, description="Timeout in minutes for this node. Falls back to global setting if not provided")


class RegisterNodesRequestModel(BaseModel):
3 changes: 2 additions & 1 deletion state-manager/app/models/register_nodes_response.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from pydantic import BaseModel, Field
from typing import Any, List
from typing import Any, List, Optional


class RegisteredNodeModel(BaseModel):
name: str = Field(..., description="Name of the registered node")
inputs_schema: dict[str, Any] = Field(..., description="Inputs for the registered node")
outputs_schema: dict[str, Any] = Field(..., description="Outputs for the registered node")
secrets: List[str] = Field(..., description="List of secrets that the node uses")
timeout_minutes: Optional[int] = Field(None, gt=0, description="Timeout in minutes for this node. Falls back to global setting if not provided")


class RegisterNodesResponseModel(BaseModel):
1 change: 1 addition & 0 deletions state-manager/app/models/state_status_enum.py
@@ -11,6 +11,7 @@ class StateStatusEnum(str, Enum):
# Errored
ERRORED = 'ERRORED'
NEXT_CREATED_ERROR = 'NEXT_CREATED_ERROR'
TIMEDOUT = 'TIMEDOUT'

# Success
SUCCESS = 'SUCCESS'
33 changes: 33 additions & 0 deletions state-manager/app/tasks/check_node_timeout.py
While this model of periodic jobs will work, it's unnecessary: we can write a database query to figure out timed-out nodes. We probably don't need to set a TIMEDOUT status at all; if the status is QUEUED and current_time > timeout_at, we can figure it out from that.
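
A minimal sketch of the query-only alternative suggested above, assuming the existing State.get_pymongo_collection() helper and the queued_at/timeout_at fields added in this PR; find_timed_out_states and its namespace filter are hypothetical, not part of the change:

import time

from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum


async def find_timed_out_states(namespace_name: str) -> list[State]:
    # A state counts as timed out when it is still QUEUED and its
    # timeout_at deadline has passed; no status flip is required.
    current_time_ms = int(time.time() * 1000)
    cursor = State.get_pymongo_collection().find(
        {
            "namespace_name": namespace_name,
            "status": StateStatusEnum.QUEUED,
            "timeout_at": {"$ne": None, "$lte": current_time_ms},
        }
    )
    return [State(**doc) async for doc in cursor]

The (status, timeout_at) index added in state.py would cover this query, so callers could treat "timed out" as a derived condition instead of a stored status.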

@@ -0,0 +1,33 @@
import time
from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum
from app.singletons.logs_manager import LogsManager

logger = LogsManager().get_logger()


async def check_node_timeout():
try:
current_time_ms = int(time.time() * 1000)

logger.info(f"Checking for timed out nodes at {current_time_ms}")

# Use database query to find and update timed out states in one operation
result = await State.get_pymongo_collection().update_many(
{
"status": StateStatusEnum.QUEUED,
"timeout_at": {"$ne": None, "$lte": current_time_ms}
},
{
"$set": {
"status": StateStatusEnum.TIMEDOUT,
"error": "Node execution timed out"
}
}
)
Comment on lines +16 to +27
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify retry/recovery mechanism for TIMEDOUT states.

The implementation correctly marks states as TIMEDOUT, but there's no visibility into how these states are subsequently handled. As noted in previous review comments, it's important to understand how timed-out states are re-picked or retried.

Run the following script to check if there's a retry mechanism for TIMEDOUT states:


🏁 Script executed:

#!/bin/bash
# Description: Search for retry/recovery logic for TIMEDOUT states

# Search for TIMEDOUT status handling
rg -n "TIMEDOUT" --type=py -C 5 | rg -i "retry|recover|manual"

# Search for state status transitions from TIMEDOUT
ast-grep --pattern $'StateStatusEnum.TIMEDOUT'

# Check if TIMEDOUT states can be manually retried or recovered
rg -n "manual_retry|retry_state" --type=py -A 10 | rg -i "timedout"

Length of output: 484


Implement retry or recovery for TIMEDOUT states. I didn’t find any logic that re-queues or retries tasks once their status is set to TIMEDOUT. Add a mechanism (e.g., re-pick policy, manual recovery endpoint, or scheduled retry) in state-manager/app/tasks/check_node_timeout.py to handle these states.

🤖 Prompt for AI Agents
In state-manager/app/tasks/check_node_timeout.py around lines 16 to 27, the code
marks timed-out states as TIMEDOUT but provides no recovery; implement an
automatic retry path: change the logic to (1) for timed-out docs with
retry_count < MAX_RETRIES, set status back to QUEUED, increment retry_count,
clear error and set a next_attempt_at or timeout_at to now + backoff_ms; (2) for
timed-out docs with retry_count >= MAX_RETRIES, set status to TIMEDOUT and leave
the error as final; (3) define MAX_RETRIES and backoff policy as config
constants and ensure updates are done atomically (use update_many with $inc and
$set filters or two distinct update_many calls filtered by retry_count) so tasks
are re-queued automatically while preventing infinite retries.
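
A rough sketch of what that prompt describes, adapted to the existing CREATED/enqueue_after retry flow rather than pushing states straight back to QUEUED; MAX_RETRIES and BACKOFF_MS are assumed policy constants and handle_timed_out_states is a hypothetical replacement for the body of check_node_timeout:

import time

from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum

MAX_RETRIES = 3              # assumed policy constant, not in this PR
BACKOFF_MS = 5 * 60 * 1000   # assumed 5-minute delay before re-enqueueing


async def handle_timed_out_states() -> None:
    current_time_ms = int(time.time() * 1000)
    timed_out = {
        "status": StateStatusEnum.QUEUED,
        "timeout_at": {"$ne": None, "$lte": current_time_ms},
    }
    collection = State.get_pymongo_collection()

    # 1) States with retries left go back to CREATED with a backoff and a
    #    bumped retry_count; the enqueue path recomputes a fresh timeout_at.
    await collection.update_many(
        {**timed_out, "retry_count": {"$lt": MAX_RETRIES}},
        {
            "$set": {
                "status": StateStatusEnum.CREATED,
                "error": None,
                "queued_at": None,
                "timeout_at": None,
                "enqueue_after": current_time_ms + BACKOFF_MS,
            },
            "$inc": {"retry_count": 1},
        },
    )

    # 2) States that exhausted their retries are marked TIMEDOUT for good.
    await collection.update_many(
        {**timed_out, "retry_count": {"$gte": MAX_RETRIES}},
        {
            "$set": {
                "status": StateStatusEnum.TIMEDOUT,
                "error": "Node execution timed out",
            }
        },
    )

Splitting the work into two update_many calls filtered on retry_count keeps each transition atomic per document and avoids re-queueing states that have already exhausted their retries.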


if result.modified_count > 0:
logger.info(f"Marked {result.modified_count} states as TIMEDOUT")

except Exception:
logger.error("Error checking node timeout", exc_info=True)
14 changes: 13 additions & 1 deletion state-manager/app/tasks/create_next_states.py
@@ -10,6 +10,7 @@
from app.models.db.store import Store
from app.models.dependent_string import DependentString
from app.models.node_template_model import UnitesStrategyEnum
from app.config.settings import get_settings
from json_schema_to_pydantic import create_model
from pydantic import BaseModel
from typing import Type
@@ -162,6 +163,16 @@ async def generate_next_state(next_state_input_model: Type[BaseModel], next_stat
current_state.identifier: current_state.id
}

# Get timeout for this node
registered_node = await get_registered_node(next_state_node_template)
timeout_minutes = None
if registered_node.timeout_minutes:
timeout_minutes = registered_node.timeout_minutes
else:
# Fall back to global setting
settings = get_settings()
timeout_minutes = settings.node_timeout_minutes

return State(
node_name=next_state_node_template.node_name,
identifier=next_state_node_template.identifier,
@@ -173,7 +184,8 @@
outputs={},
does_unites=next_state_node_template.unites is not None,
run_id=current_state.run_id,
error=None
error=None,
timeout_minutes=timeout_minutes
)

current_states = await State.find(