Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions state-manager/app/controller/cancel_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Controller for cancelling pending triggers for a graph
"""
import asyncio
from app.models.cancel_trigger_models import CancelTriggerResponse
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum
from app.singletons.logs_manager import LogsManager
from app.config.settings import get_settings
from app.tasks.trigger_cron import mark_as_cancelled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will not recommend importing from tasks in controller. This might lead to confusing dependencies going forward.

from beanie.operators import In

logger = LogsManager().get_logger()

async def cancel_triggers(namespace_name: str, graph_name: str, x_exosphere_request_id: str) -> CancelTriggerResponse:
"""
Cancel all pending or triggering triggers for a specific graph
Args:
namespace_name: The namespace of the graph
graph_name: The name of the graph
x_exosphere_request_id: Request ID for logging
Returns:
CancelTriggerResponse with cancellation details
"""
try:
logger.info(f"Request to cancel triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

# Find all PENDING or TRIGGERING triggers for this graph
triggers = await DatabaseTriggers.find(
DatabaseTriggers.namespace == namespace_name,
DatabaseTriggers.graph_name == graph_name,
In(DatabaseTriggers.trigger_status, [TriggerStatusEnum.PENDING, TriggerStatusEnum.TRIGGERING])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a Database trigger is in TRIGGERING state, I don't think we should cancel in that state. Why? Because it might be immediately picked by the process to move to TRIGGERED. This will create an inconsistent state where actually the job is Triggered but it shows TRIGGERING.

).to_list()

if not triggers:
logger.info(f"No pending triggers found for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
return CancelTriggerResponse(
namespace=namespace_name,
graph_name=graph_name,
cancelled_count=0,
message="No pending triggers found to cancel"
)

# Get retention hours from settings
settings = get_settings()
retention_hours = settings.trigger_retention_hours
Comment on lines +46 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this here?


# Cancel each trigger concurrently
cancelled_count = len(triggers)
cancellation_tasks = [mark_as_cancelled(trigger, retention_hours) for trigger in triggers]
await asyncio.gather(*cancellation_tasks)
Comment on lines +50 to +53
Copy link
Contributor

@coderabbitai coderabbitai bot Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Optional: Consider partial-failure visibility.

The current asyncio.gather uses fail-fast behavior: if any mark_as_cancelled call fails, the entire batch fails and the user receives no indication of how many triggers were successfully cancelled before the failure.

For better user experience, consider using return_exceptions=True to handle partial failures:

# Cancel each trigger concurrently
cancellation_tasks = [mark_as_cancelled(trigger, retention_hours) for trigger in triggers]
results = await asyncio.gather(*cancellation_tasks, return_exceptions=True)

# Count successful cancellations
cancelled_count = sum(1 for r in results if not isinstance(r, Exception))
failures = [r for r in results if isinstance(r, Exception)]

if failures:
    logger.warning(f"Failed to cancel {len(failures)} trigger(s) for graph {graph_name}", 
                   x_exosphere_request_id=x_exosphere_request_id)
    # Optionally: raise if no triggers were cancelled successfully

The current all-or-nothing approach is acceptable for ensuring consistency, but partial success information may be valuable for troubleshooting.

🤖 Prompt for AI Agents
In state-manager/app/controller/cancel_triggers.py around lines 50 to 53, the
code uses asyncio.gather without return_exceptions so a single failure aborts
the whole batch and hides partial successes; change to await
asyncio.gather(*tasks, return_exceptions=True), collect results, count
non-Exception results as cancelled_count, and log/collect Exception instances as
failures (warning with graph_name and request id), optionally raising only if
zero succeeded.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kalra-V this suggestion seems valid. Lets incorporate this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this sends a number of queries, can we use some bulk update operation here? Also can we merge and do something like bulk get and update.


logger.info(f"Cancelled {cancelled_count} triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

return CancelTriggerResponse(
namespace=namespace_name,
graph_name=graph_name,
cancelled_count=cancelled_count,
message=f"Successfully cancelled {cancelled_count} trigger(s)"
)

except Exception as e:
logger.error(f"Error cancelling triggers for graph {graph_name} in namespace {namespace_name}: {str(e)}", x_exosphere_request_id=x_exosphere_request_id)
raise

9 changes: 9 additions & 0 deletions state-manager/app/models/cancel_trigger_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pydantic import BaseModel, Field


class CancelTriggerResponse(BaseModel):
namespace: str = Field(..., description="Namespace of the cancelled triggers")
graph_name: str = Field(..., description="Name of the graph")
cancelled_count: int = Field(..., description="Number of triggers that were cancelled")
message: str = Field(..., description="Human-readable message describing the result")

3 changes: 2 additions & 1 deletion state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class Settings:
"trigger_status": {
"$in": [
TriggerStatusEnum.TRIGGERED,
TriggerStatusEnum.FAILED
TriggerStatusEnum.FAILED,
TriggerStatusEnum.CANCELLED
]
}
}
Expand Down
23 changes: 23 additions & 0 deletions state-manager/app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
from .models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from .controller.manual_retry_state import manual_retry_state

# cancel_triggers
from .models.cancel_trigger_models import CancelTriggerResponse
from .controller.cancel_triggers import cancel_triggers


logger = LogsManager().get_logger()

Expand Down Expand Up @@ -237,6 +241,25 @@ async def get_graph_template(namespace_name: str, graph_name: str, request: Requ
return await get_graph_template_controller(namespace_name, graph_name, x_exosphere_request_id)


@router.delete(
"/graph/{graph_name}/triggers",
response_model=CancelTriggerResponse,
status_code=status.HTTP_200_OK,
response_description="Triggers cancelled successfully",
tags=["graph"]
)
async def cancel_triggers_route(namespace_name: str, graph_name: str, request: Request, api_key: str = Depends(check_api_key)):
x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

if api_key:
logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
else:
logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")

return await cancel_triggers(namespace_name, graph_name, x_exosphere_request_id)


@router.put(
"/nodes/",
response_model=RegisterNodesResponseModel,
Expand Down
13 changes: 12 additions & 1 deletion state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,15 @@ async def trigger_cron():
cron_time = datetime.now()
settings = get_settings()
logger.info(f"starting trigger_cron: {cron_time}")
await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)])
await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)])

async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int):
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED,
"expires_at": expires_at
}}
)
Comment on lines +103 to +112
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this here?

Loading