-
Notifications
You must be signed in to change notification settings - Fork 41
Enhancement/cancel triggers #505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ba9f69e
4c478eb
14097f6
569e31f
a94133d
03c460f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| """ | ||
| Controller for cancelling pending triggers for a graph | ||
| """ | ||
| import asyncio | ||
| from app.models.cancel_trigger_models import CancelTriggerResponse | ||
| from app.models.db.trigger import DatabaseTriggers | ||
| from app.models.trigger_models import TriggerStatusEnum | ||
| from app.singletons.logs_manager import LogsManager | ||
| from app.config.settings import get_settings | ||
| from app.tasks.trigger_cron import mark_as_cancelled | ||
| from beanie.operators import In | ||
|
|
||
| logger = LogsManager().get_logger() | ||
|
|
||
| async def cancel_triggers(namespace_name: str, graph_name: str, x_exosphere_request_id: str) -> CancelTriggerResponse: | ||
| """ | ||
| Cancel all pending or triggering triggers for a specific graph | ||
| Args: | ||
| namespace_name: The namespace of the graph | ||
| graph_name: The name of the graph | ||
| x_exosphere_request_id: Request ID for logging | ||
| Returns: | ||
| CancelTriggerResponse with cancellation details | ||
| """ | ||
| try: | ||
| logger.info(f"Request to cancel triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) | ||
|
|
||
| # Find all PENDING or TRIGGERING triggers for this graph | ||
| triggers = await DatabaseTriggers.find( | ||
| DatabaseTriggers.namespace == namespace_name, | ||
| DatabaseTriggers.graph_name == graph_name, | ||
| In(DatabaseTriggers.trigger_status, [TriggerStatusEnum.PENDING, TriggerStatusEnum.TRIGGERING]) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a Database trigger is in TRIGGERING state, I don't think we should cancel in that state. Why? Because it might be immediately picked by the process to move to TRIGGERED. This will create an inconsistent state where actually the job is Triggered but it shows TRIGGERING. |
||
| ).to_list() | ||
|
|
||
| if not triggers: | ||
| logger.info(f"No pending triggers found for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) | ||
| return CancelTriggerResponse( | ||
| namespace=namespace_name, | ||
| graph_name=graph_name, | ||
| cancelled_count=0, | ||
| message="No pending triggers found to cancel" | ||
| ) | ||
|
|
||
| # Get retention hours from settings | ||
| settings = get_settings() | ||
| retention_hours = settings.trigger_retention_hours | ||
|
Comment on lines
+46
to
+48
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this here? |
||
|
|
||
| # Cancel each trigger concurrently | ||
| cancelled_count = len(triggers) | ||
| cancellation_tasks = [mark_as_cancelled(trigger, retention_hours) for trigger in triggers] | ||
| await asyncio.gather(*cancellation_tasks) | ||
|
Comment on lines
+50
to
+53
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Optional: Consider partial-failure visibility. The current For better user experience, consider using # Cancel each trigger concurrently
cancellation_tasks = [mark_as_cancelled(trigger, retention_hours) for trigger in triggers]
results = await asyncio.gather(*cancellation_tasks, return_exceptions=True)
# Count successful cancellations
cancelled_count = sum(1 for r in results if not isinstance(r, Exception))
failures = [r for r in results if isinstance(r, Exception)]
if failures:
logger.warning(f"Failed to cancel {len(failures)} trigger(s) for graph {graph_name}",
x_exosphere_request_id=x_exosphere_request_id)
# Optionally: raise if no triggers were cancelled successfullyThe current all-or-nothing approach is acceptable for ensuring consistency, but partial success information may be valuable for troubleshooting. 🤖 Prompt for AI Agents
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Kalra-V this suggestion seems valid. Lets incorporate this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this sends a number of queries, can we use some bulk update operation here? Also can we merge and do something like bulk get and update. |
||
|
|
||
| logger.info(f"Cancelled {cancelled_count} triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) | ||
|
|
||
| return CancelTriggerResponse( | ||
| namespace=namespace_name, | ||
| graph_name=graph_name, | ||
| cancelled_count=cancelled_count, | ||
| message=f"Successfully cancelled {cancelled_count} trigger(s)" | ||
| ) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error cancelling triggers for graph {graph_name} in namespace {namespace_name}: {str(e)}", x_exosphere_request_id=x_exosphere_request_id) | ||
| raise | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| from pydantic import BaseModel, Field | ||
|
|
||
|
|
||
| class CancelTriggerResponse(BaseModel): | ||
| namespace: str = Field(..., description="Namespace of the cancelled triggers") | ||
| graph_name: str = Field(..., description="Name of the graph") | ||
| cancelled_count: int = Field(..., description="Number of triggers that were cancelled") | ||
| message: str = Field(..., description="Human-readable message describing the result") | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -98,4 +98,15 @@ async def trigger_cron(): | |
| cron_time = datetime.now() | ||
| settings = get_settings() | ||
| logger.info(f"starting trigger_cron: {cron_time}") | ||
| await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)]) | ||
| await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)]) | ||
|
|
||
| async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int): | ||
| expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours) | ||
|
|
||
| await DatabaseTriggers.get_pymongo_collection().update_one( | ||
| {"_id": trigger.id}, | ||
| {"$set": { | ||
| "trigger_status": TriggerStatusEnum.CANCELLED, | ||
| "expires_at": expires_at | ||
| }} | ||
| ) | ||
|
Comment on lines
+103
to
+112
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this here? |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will not recommend importing from tasks in controller. This might lead to confusing dependencies going forward.