From 40b1be294779e1116d2ee1f736671d07009f29cc Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Wed, 10 Dec 2025 19:54:29 +0530 Subject: [PATCH 1/2] fix: wire mark_as_cancelled into trigger cancellation flow and replace delete_many with TTL-based cleanup --- state-manager/app/config/settings.py | 2 +- .../app/controller/upsert_graph_template.py | 102 +++++++++++++----- state-manager/app/models/db/trigger.py | 3 +- state-manager/app/tasks/init_tasks.py | 45 ++++++-- state-manager/app/tasks/trigger_cron.py | 16 +++ 5 files changed, 131 insertions(+), 37 deletions(-) diff --git a/state-manager/app/config/settings.py b/state-manager/app/config/settings.py index 75cea36d..6bbd8cbe 100644 --- a/state-manager/app/config/settings.py +++ b/state-manager/app/config/settings.py @@ -6,7 +6,7 @@ class Settings(BaseModel): """Application settings loaded from environment variables.""" - + # MongoDB Configuration mongo_uri: str = Field(..., description="MongoDB connection URI" ) mongo_database_name: str = Field(default="exosphere-state-manager", description="MongoDB database name") diff --git a/state-manager/app/controller/upsert_graph_template.py b/state-manager/app/controller/upsert_graph_template.py index 72e70e29..770e286e 100644 --- a/state-manager/app/controller/upsert_graph_template.py +++ b/state-manager/app/controller/upsert_graph_template.py @@ -1,3 +1,5 @@ +from datetime import datetime, timedelta, timezone + from app.singletons.logs_manager import LogsManager from app.models.graph_models import UpsertGraphTemplateRequest, UpsertGraphTemplateResponse from app.models.db.graph_template_model import GraphTemplate @@ -5,30 +7,39 @@ from app.tasks.verify_graph import verify_graph from app.models.db.trigger import DatabaseTriggers from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum -from beanie.operators import In +from app.config.settings import get_settings from fastapi import BackgroundTasks, HTTPException logger = LogsManager().get_logger() +settings = get_settings() -async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> UpsertGraphTemplateResponse: - try: +async def upsert_graph_template( + namespace_name: str, + graph_name: str, + body: UpsertGraphTemplateRequest, + x_exosphere_request_id: str, + background_tasks: BackgroundTasks, +) -> UpsertGraphTemplateResponse: + try: old_triggers = [] graph_template = await GraphTemplate.find_one( GraphTemplate.name == graph_name, - GraphTemplate.namespace == namespace_name + GraphTemplate.namespace == namespace_name, ) - + try: if graph_template: logger.info( - "Graph template already exists in namespace", graph_template=graph_template, + "Graph template already exists in namespace", + graph_template=graph_template, namespace_name=namespace_name, - x_exosphere_request_id=x_exosphere_request_id) + x_exosphere_request_id=x_exosphere_request_id, + ) old_triggers = graph_template.triggers - + graph_template.set_secrets(body.secrets) graph_template.validation_status = GraphTemplateValidationStatus.PENDING graph_template.validation_errors = [] @@ -37,14 +48,15 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse graph_template.nodes = body.nodes graph_template.triggers = body.triggers await graph_template.save() - + else: logger.info( "Graph template does not exist in namespace", namespace_name=namespace_name, graph_name=graph_name, - x_exosphere_request_id=x_exosphere_request_id) - + x_exosphere_request_id=x_exosphere_request_id, + ) + graph_template = await GraphTemplate.insert( GraphTemplate( name=graph_name, @@ -54,20 +66,51 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse validation_errors=[], retry_policy=body.retry_policy, store_config=body.store_config, - triggers=body.triggers + triggers=body.triggers, ).set_secrets(body.secrets) ) except ValueError as e: - logger.error("Error validating graph template", error=e, x_exosphere_request_id=x_exosphere_request_id) - raise HTTPException(status_code=400, detail=f"Error validating graph template: {str(e)}") - + logger.error( + "Error validating graph template", + error=e, + x_exosphere_request_id=x_exosphere_request_id, + ) + raise HTTPException( + status_code=400, + detail=f"Error validating graph template: {str(e)}", + ) + + # Previously: + # await DatabaseTriggers.find(...).delete_many() + # + # Now: bulk update to mark matching CRON triggers as CANCELLED + # and set expires_at so TTL can clean them up later. if len(old_triggers) > 0: - await DatabaseTriggers.find( - DatabaseTriggers.graph_name == graph_name, - DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING, - DatabaseTriggers.type == TriggerTypeEnum.CRON, - In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) - ).delete_many() + cron_expressions = [ + trigger.value["expression"] + for trigger in old_triggers + if trigger.type == TriggerTypeEnum.CRON + ] + + if cron_expressions: + expires_at = datetime.now(timezone.utc) + timedelta( + hours=settings.trigger_retention_hours + ) + + await DatabaseTriggers.get_pymongo_collection().update_many( + { + "graph_name": graph_name, + "trigger_status": TriggerStatusEnum.PENDING.value, + "type": TriggerTypeEnum.CRON.value, + "expression": {"$in": cron_expressions}, + }, + { + "$set": { + "trigger_status": TriggerStatusEnum.CANCELLED.value, + "expires_at": expires_at, + } + }, + ) background_tasks.add_task(verify_graph, graph_template) @@ -75,14 +118,21 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse nodes=graph_template.nodes, validation_status=graph_template.validation_status, validation_errors=graph_template.validation_errors, - secrets={secret_name: True for secret_name in graph_template.get_secrets().keys()}, + secrets={ + secret_name: True + for secret_name in graph_template.get_secrets().keys() + }, retry_policy=graph_template.retry_policy, store_config=graph_template.store_config, triggers=graph_template.triggers, created_at=graph_template.created_at, - updated_at=graph_template.updated_at + updated_at=graph_template.updated_at, ) - + except Exception as e: - logger.error("Error upserting graph template", error=e, x_exosphere_request_id=x_exosphere_request_id) - raise e \ No newline at end of file + logger.error( + "Error upserting graph template", + error=e, + x_exosphere_request_id=x_exosphere_request_id, + ) + raise e diff --git a/state-manager/app/models/db/trigger.py b/state-manager/app/models/db/trigger.py index a416193e..9a4998ce 100644 --- a/state-manager/app/models/db/trigger.py +++ b/state-manager/app/models/db/trigger.py @@ -44,7 +44,8 @@ class Settings: "trigger_status": { "$in": [ TriggerStatusEnum.TRIGGERED, - TriggerStatusEnum.FAILED + TriggerStatusEnum.FAILED, + TriggerStatusEnum.CANCELLED ] } } diff --git a/state-manager/app/tasks/init_tasks.py b/state-manager/app/tasks/init_tasks.py index 690bb5a8..adc6d4dd 100644 --- a/state-manager/app/tasks/init_tasks.py +++ b/state-manager/app/tasks/init_tasks.py @@ -1,20 +1,47 @@ # tasks to run when the server starts +from datetime import datetime, timedelta, timezone +import asyncio + +from app.config.settings import get_settings from app.models.db.trigger import DatabaseTriggers from app.models.trigger_models import TriggerStatusEnum -import asyncio +from app.singletons.logs_manager import LogsManager + +logger = LogsManager().get_logger() + async def delete_old_triggers(): - await DatabaseTriggers.get_pymongo_collection().delete_many( + settings = get_settings() + retention_hours = settings.trigger_retention_hours + expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours) + + # Use the same filter used before by delete_many() + filter_query = { + "trigger_status": { + "$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED] + }, + "expires_at": None + } + + logger.info( + f"Init task marking triggers CANCELLED for filter={filter_query}, " + f"expires_at={expires_at.isoformat()}" + ) + + await DatabaseTriggers.get_pymongo_collection().update_many( + filter_query, { - "trigger_status": { - "$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED] - }, - "expires_at": None - } + "$set": { + "trigger_status": TriggerStatusEnum.CANCELLED, + "expires_at": expires_at, + } + }, ) + async def init_tasks(): await asyncio.gather( *[ - delete_old_triggers() - ]) \ No newline at end of file + delete_old_triggers(), + ] + ) diff --git a/state-manager/app/tasks/trigger_cron.py b/state-manager/app/tasks/trigger_cron.py index 3fca36ea..f072883e 100644 --- a/state-manager/app/tasks/trigger_cron.py +++ b/state-manager/app/tasks/trigger_cron.py @@ -45,6 +45,22 @@ async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int): }} ) +async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int): + """ + Mark a trigger as CANCELLED and set expires_at so MongoDB TTL will remove it + after `retention_hours`. + """ + expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours) + + await DatabaseTriggers.get_pymongo_collection().update_one( + {"_id": trigger.id}, + {"$set": { + "trigger_status": TriggerStatusEnum.CANCELLED, + "expires_at": expires_at + }} + ) + + async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int): assert trigger.expression is not None iter = croniter.croniter(trigger.expression, trigger.trigger_time) From a445bec7eeb5cb8ba82b1f43668f0b1393acd8f3 Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Wed, 10 Dec 2025 20:45:45 +0530 Subject: [PATCH 2/2] Add TTL-based CANCELLED trigger cleanup and integrate mark_as_cancelled across init, cron, and upsert_graph_template --- .../app/controller/upsert_graph_template.py | 9 +++-- state-manager/app/tasks/init_tasks.py | 27 ++++++++------ state-manager/app/tasks/trigger_cron.py | 36 ++++++++++++++----- 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/state-manager/app/controller/upsert_graph_template.py b/state-manager/app/controller/upsert_graph_template.py index 770e286e..cf973177 100644 --- a/state-manager/app/controller/upsert_graph_template.py +++ b/state-manager/app/controller/upsert_graph_template.py @@ -12,7 +12,6 @@ from fastapi import BackgroundTasks, HTTPException logger = LogsManager().get_logger() -settings = get_settings() async def upsert_graph_template( @@ -23,6 +22,8 @@ async def upsert_graph_template( background_tasks: BackgroundTasks, ) -> UpsertGraphTemplateResponse: try: + # Load settings at request time so runtime config changes are picked up + settings = get_settings() old_triggers = [] graph_template = await GraphTemplate.find_one( @@ -79,7 +80,7 @@ async def upsert_graph_template( status_code=400, detail=f"Error validating graph template: {str(e)}", ) - + # Previously: # await DatabaseTriggers.find(...).delete_many() # @@ -100,6 +101,7 @@ async def upsert_graph_template( await DatabaseTriggers.get_pymongo_collection().update_many( { "graph_name": graph_name, + "namespace": namespace_name, # ensure we only affect this namespace "trigger_status": TriggerStatusEnum.PENDING.value, "type": TriggerTypeEnum.CRON.value, "expression": {"$in": cron_expressions}, @@ -135,4 +137,5 @@ async def upsert_graph_template( error=e, x_exosphere_request_id=x_exosphere_request_id, ) - raise e + # re-raise with original traceback preserved + raise diff --git a/state-manager/app/tasks/init_tasks.py b/state-manager/app/tasks/init_tasks.py index adc6d4dd..439a3870 100644 --- a/state-manager/app/tasks/init_tasks.py +++ b/state-manager/app/tasks/init_tasks.py @@ -10,7 +10,13 @@ logger = LogsManager().get_logger() -async def delete_old_triggers(): +async def mark_old_triggers_cancelled() -> None: + """ + Migrate legacy TRIGGERED/FAILED triggers that predate TTL. + + These documents have expires_at = None, so we mark them as CANCELLED and + set expires_at so the TTL index can eventually clean them up. + """ settings = get_settings() retention_hours = settings.trigger_retention_hours expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours) @@ -18,30 +24,31 @@ async def delete_old_triggers(): # Use the same filter used before by delete_many() filter_query = { "trigger_status": { - "$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED] + "$in": [ + TriggerStatusEnum.TRIGGERED.value, + TriggerStatusEnum.FAILED.value, + ] }, - "expires_at": None + "expires_at": None, } logger.info( - f"Init task marking triggers CANCELLED for filter={filter_query}, " - f"expires_at={expires_at.isoformat()}" + "Init task marking legacy TRIGGERED/FAILED triggers as CANCELLED " + f"for filter={filter_query}, expires_at={expires_at.isoformat()}" ) await DatabaseTriggers.get_pymongo_collection().update_many( filter_query, { "$set": { - "trigger_status": TriggerStatusEnum.CANCELLED, + "trigger_status": TriggerStatusEnum.CANCELLED.value, "expires_at": expires_at, } }, ) -async def init_tasks(): +async def init_tasks() -> None: await asyncio.gather( - *[ - delete_old_triggers(), - ] + mark_old_triggers_cancelled(), ) diff --git a/state-manager/app/tasks/trigger_cron.py b/state-manager/app/tasks/trigger_cron.py index f072883e..eebd9e4a 100644 --- a/state-manager/app/tasks/trigger_cron.py +++ b/state-manager/app/tasks/trigger_cron.py @@ -13,19 +13,21 @@ logger = LogsManager().get_logger() + async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None: data = await DatabaseTriggers.get_pymongo_collection().find_one_and_update( { "trigger_time": {"$lte": cron_time}, - "trigger_status": TriggerStatusEnum.PENDING + "trigger_status": TriggerStatusEnum.PENDING.value }, { - "$set": {"trigger_status": TriggerStatusEnum.TRIGGERING} + "$set": {"trigger_status": TriggerStatusEnum.TRIGGERING.value} }, return_document=ReturnDocument.AFTER ) return DatabaseTriggers(**data) if data else None + async def call_trigger_graph(trigger: DatabaseTriggers): await trigger_graph( namespace_name=trigger.namespace, @@ -34,17 +36,19 @@ async def call_trigger_graph(trigger: DatabaseTriggers): x_exosphere_request_id=str(uuid4()) ) + async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int): expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours) await DatabaseTriggers.get_pymongo_collection().update_one( {"_id": trigger.id}, {"$set": { - "trigger_status": TriggerStatusEnum.FAILED, + "trigger_status": TriggerStatusEnum.FAILED.value, "expires_at": expires_at }} ) + async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int): """ Mark a trigger as CANCELLED and set expires_at so MongoDB TTL will remove it @@ -55,12 +59,22 @@ async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int): await DatabaseTriggers.get_pymongo_collection().update_one( {"_id": trigger.id}, {"$set": { - "trigger_status": TriggerStatusEnum.CANCELLED, + "trigger_status": TriggerStatusEnum.CANCELLED.value, # keep .value ✅ "expires_at": expires_at }} ) +async def cancel_trigger(trigger: DatabaseTriggers, retention_hours: int): + """ + Cancel a trigger using the mark_as_cancelled helper. + + This is intended to be used by other modules instead of duplicating + the cancellation logic inline. + """ + await mark_as_cancelled(trigger, retention_hours) + + async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int): assert trigger.expression is not None iter = croniter.croniter(trigger.expression, trigger.trigger_time) @@ -76,7 +90,7 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, r graph_name=trigger.graph_name, namespace=trigger.namespace, trigger_time=next_trigger_time, - trigger_status=TriggerStatusEnum.PENDING, + trigger_status=TriggerStatusEnum.PENDING, # OK because insert() converts expires_at=expires_at ).insert() except DuplicateKeyError: @@ -88,19 +102,21 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, r if next_trigger_time > cron_time: break + async def mark_as_triggered(trigger: DatabaseTriggers, retention_hours: int): expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours) await DatabaseTriggers.get_pymongo_collection().update_one( {"_id": trigger.id}, {"$set": { - "trigger_status": TriggerStatusEnum.TRIGGERED, + "trigger_status": TriggerStatusEnum.TRIGGERED.value, "expires_at": expires_at }} ) + async def handle_trigger(cron_time: datetime, retention_hours: int): - while(trigger:= await get_due_triggers(cron_time)): + while (trigger := await get_due_triggers(cron_time)): try: await call_trigger_graph(trigger) await mark_as_triggered(trigger, retention_hours) @@ -110,8 +126,12 @@ async def handle_trigger(cron_time: datetime, retention_hours: int): finally: await create_next_triggers(trigger, cron_time, retention_hours) + async def trigger_cron(): cron_time = datetime.now() settings = get_settings() logger.info(f"starting trigger_cron: {cron_time}") - await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)]) \ No newline at end of file + await asyncio.gather(*[ + handle_trigger(cron_time, settings.trigger_retention_hours) + for _ in range(settings.trigger_workers) + ])