Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion state-manager/app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class Settings(BaseModel):
"""Application settings loaded from environment variables."""

# MongoDB Configuration
mongo_uri: str = Field(..., description="MongoDB connection URI" )
mongo_database_name: str = Field(default="exosphere-state-manager", description="MongoDB database name")
Expand Down
105 changes: 79 additions & 26 deletions state-manager/app/controller/upsert_graph_template.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,46 @@
from datetime import datetime, timedelta, timezone

from app.singletons.logs_manager import LogsManager
from app.models.graph_models import UpsertGraphTemplateRequest, UpsertGraphTemplateResponse
from app.models.db.graph_template_model import GraphTemplate
from app.models.graph_template_validation_status import GraphTemplateValidationStatus
from app.tasks.verify_graph import verify_graph
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum
from beanie.operators import In
from app.config.settings import get_settings

from fastapi import BackgroundTasks, HTTPException

logger = LogsManager().get_logger()

async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> UpsertGraphTemplateResponse:
try:

async def upsert_graph_template(
namespace_name: str,
graph_name: str,
body: UpsertGraphTemplateRequest,
x_exosphere_request_id: str,
background_tasks: BackgroundTasks,
) -> UpsertGraphTemplateResponse:
try:
# Load settings at request time so runtime config changes are picked up
settings = get_settings()
old_triggers = []

graph_template = await GraphTemplate.find_one(
GraphTemplate.name == graph_name,
GraphTemplate.namespace == namespace_name
GraphTemplate.namespace == namespace_name,
)

try:
if graph_template:
logger.info(
"Graph template already exists in namespace", graph_template=graph_template,
"Graph template already exists in namespace",
graph_template=graph_template,
namespace_name=namespace_name,
x_exosphere_request_id=x_exosphere_request_id)
x_exosphere_request_id=x_exosphere_request_id,
)
old_triggers = graph_template.triggers

graph_template.set_secrets(body.secrets)
graph_template.validation_status = GraphTemplateValidationStatus.PENDING
graph_template.validation_errors = []
Expand All @@ -37,14 +49,15 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
graph_template.nodes = body.nodes
graph_template.triggers = body.triggers
await graph_template.save()

else:
logger.info(
"Graph template does not exist in namespace",
namespace_name=namespace_name,
graph_name=graph_name,
x_exosphere_request_id=x_exosphere_request_id)

x_exosphere_request_id=x_exosphere_request_id,
)

graph_template = await GraphTemplate.insert(
GraphTemplate(
name=graph_name,
Expand All @@ -54,35 +67,75 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
validation_errors=[],
retry_policy=body.retry_policy,
store_config=body.store_config,
triggers=body.triggers
triggers=body.triggers,
).set_secrets(body.secrets)
)
except ValueError as e:
logger.error("Error validating graph template", error=e, x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=400, detail=f"Error validating graph template: {str(e)}")

logger.error(
"Error validating graph template",
error=e,
x_exosphere_request_id=x_exosphere_request_id,
)
raise HTTPException(
status_code=400,
detail=f"Error validating graph template: {str(e)}",
)

# Previously:
# await DatabaseTriggers.find(...).delete_many()
#
# Now: bulk update to mark matching CRON triggers as CANCELLED
# and set expires_at so TTL can clean them up later.
if len(old_triggers) > 0:
await DatabaseTriggers.find(
DatabaseTriggers.graph_name == graph_name,
DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING,
DatabaseTriggers.type == TriggerTypeEnum.CRON,
In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON])
).delete_many()
cron_expressions = [
trigger.value["expression"]
for trigger in old_triggers
if trigger.type == TriggerTypeEnum.CRON
]

if cron_expressions:
expires_at = datetime.now(timezone.utc) + timedelta(
hours=settings.trigger_retention_hours
)

await DatabaseTriggers.get_pymongo_collection().update_many(
{
"graph_name": graph_name,
"namespace": namespace_name, # ensure we only affect this namespace
"trigger_status": TriggerStatusEnum.PENDING.value,
"type": TriggerTypeEnum.CRON.value,
"expression": {"$in": cron_expressions},
},
{
"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED.value,
"expires_at": expires_at,
}
},
)

background_tasks.add_task(verify_graph, graph_template)

return UpsertGraphTemplateResponse(
nodes=graph_template.nodes,
validation_status=graph_template.validation_status,
validation_errors=graph_template.validation_errors,
secrets={secret_name: True for secret_name in graph_template.get_secrets().keys()},
secrets={
secret_name: True
for secret_name in graph_template.get_secrets().keys()
},
retry_policy=graph_template.retry_policy,
store_config=graph_template.store_config,
triggers=graph_template.triggers,
created_at=graph_template.created_at,
updated_at=graph_template.updated_at
updated_at=graph_template.updated_at,
)

except Exception as e:
logger.error("Error upserting graph template", error=e, x_exosphere_request_id=x_exosphere_request_id)
raise e
logger.error(
"Error upserting graph template",
error=e,
x_exosphere_request_id=x_exosphere_request_id,
)
# re-raise with original traceback preserved
raise
3 changes: 2 additions & 1 deletion state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class Settings:
"trigger_status": {
"$in": [
TriggerStatusEnum.TRIGGERED,
TriggerStatusEnum.FAILED
TriggerStatusEnum.FAILED,
TriggerStatusEnum.CANCELLED
]
}
}
Expand Down
58 changes: 46 additions & 12 deletions state-manager/app/tasks/init_tasks.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,54 @@
# tasks to run when the server starts
from datetime import datetime, timedelta, timezone
import asyncio

from app.config.settings import get_settings
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum
import asyncio
from app.singletons.logs_manager import LogsManager

logger = LogsManager().get_logger()


async def mark_old_triggers_cancelled() -> None:
"""
Migrate legacy TRIGGERED/FAILED triggers that predate TTL.

async def delete_old_triggers():
await DatabaseTriggers.get_pymongo_collection().delete_many(
These documents have expires_at = None, so we mark them as CANCELLED and
set expires_at so the TTL index can eventually clean them up.
"""
settings = get_settings()
retention_hours = settings.trigger_retention_hours
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

# Use the same filter used before by delete_many()
filter_query = {
"trigger_status": {
"$in": [
TriggerStatusEnum.TRIGGERED.value,
TriggerStatusEnum.FAILED.value,
]
},
"expires_at": None,
}

logger.info(
"Init task marking legacy TRIGGERED/FAILED triggers as CANCELLED "
f"for filter={filter_query}, expires_at={expires_at.isoformat()}"
)

await DatabaseTriggers.get_pymongo_collection().update_many(
filter_query,
{
"trigger_status": {
"$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED]
},
"expires_at": None
}
"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED.value,
"expires_at": expires_at,
}
},
)

async def init_tasks():

async def init_tasks() -> None:
await asyncio.gather(
*[
delete_old_triggers()
])
mark_old_triggers_cancelled(),
)
50 changes: 43 additions & 7 deletions state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@

logger = LogsManager().get_logger()


async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None:
data = await DatabaseTriggers.get_pymongo_collection().find_one_and_update(
{
"trigger_time": {"$lte": cron_time},
"trigger_status": TriggerStatusEnum.PENDING
"trigger_status": TriggerStatusEnum.PENDING.value
},
{
"$set": {"trigger_status": TriggerStatusEnum.TRIGGERING}
"$set": {"trigger_status": TriggerStatusEnum.TRIGGERING.value}
},
return_document=ReturnDocument.AFTER
)
return DatabaseTriggers(**data) if data else None


async def call_trigger_graph(trigger: DatabaseTriggers):
await trigger_graph(
namespace_name=trigger.namespace,
Expand All @@ -34,17 +36,45 @@ async def call_trigger_graph(trigger: DatabaseTriggers):
x_exosphere_request_id=str(uuid4())
)


async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int):
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {
"trigger_status": TriggerStatusEnum.FAILED,
"trigger_status": TriggerStatusEnum.FAILED.value,
"expires_at": expires_at
}}
)


async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int):
"""
Mark a trigger as CANCELLED and set expires_at so MongoDB TTL will remove it
after `retention_hours`.
"""
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED.value, # keep .value ✅
"expires_at": expires_at
}}
)


async def cancel_trigger(trigger: DatabaseTriggers, retention_hours: int):
"""
Cancel a trigger using the mark_as_cancelled helper.

This is intended to be used by other modules instead of duplicating
the cancellation logic inline.
"""
await mark_as_cancelled(trigger, retention_hours)


async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int):
assert trigger.expression is not None
iter = croniter.croniter(trigger.expression, trigger.trigger_time)
Expand All @@ -60,7 +90,7 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, r
graph_name=trigger.graph_name,
namespace=trigger.namespace,
trigger_time=next_trigger_time,
trigger_status=TriggerStatusEnum.PENDING,
trigger_status=TriggerStatusEnum.PENDING, # OK because insert() converts
expires_at=expires_at
).insert()
except DuplicateKeyError:
Expand All @@ -72,19 +102,21 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, r
if next_trigger_time > cron_time:
break


async def mark_as_triggered(trigger: DatabaseTriggers, retention_hours: int):
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {
"trigger_status": TriggerStatusEnum.TRIGGERED,
"trigger_status": TriggerStatusEnum.TRIGGERED.value,
"expires_at": expires_at
}}
)


async def handle_trigger(cron_time: datetime, retention_hours: int):
while(trigger:= await get_due_triggers(cron_time)):
while (trigger := await get_due_triggers(cron_time)):
try:
await call_trigger_graph(trigger)
await mark_as_triggered(trigger, retention_hours)
Expand All @@ -94,8 +126,12 @@ async def handle_trigger(cron_time: datetime, retention_hours: int):
finally:
await create_next_triggers(trigger, cron_time, retention_hours)


async def trigger_cron():
cron_time = datetime.now()
settings = get_settings()
logger.info(f"starting trigger_cron: {cron_time}")
await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)])
await asyncio.gather(*[
handle_trigger(cron_time, settings.trigger_retention_hours)
for _ in range(settings.trigger_workers)
])