Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/.codespellignorewords
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ YML
SDK
S3
Kusto
NotIn
NotIn
nin
12 changes: 11 additions & 1 deletion docs/docs/exosphere/state-manager-setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ The Exosphere state manager is the core backend service that handles workflow ex
-e MONGO_DATABASE_NAME="your-database-name" \
-e STATE_MANAGER_SECRET="your-secret-key" \
-e SECRETS_ENCRYPTION_KEY="your-base64-encoded-encryption-key" \
-e TRIGGER_WORKERS="1" \
-e TRIGGER_RETENTION_HOURS="720" \
ghcr.io/exospherehost/exosphere-state-manager:latest
```

Expand Down Expand Up @@ -84,6 +86,8 @@ The Exosphere state manager is the core backend service that handles workflow ex
export MONGO_DATABASE_NAME="your-database-name"
export STATE_MANAGER_SECRET="your-secret-key"
export SECRETS_ENCRYPTION_KEY="your-base64-encoded-encryption-key"
export TRIGGER_WORKERS="1"
export TRIGGER_RETENTION_HOURS="720"
```

4. **Run the state manager**:
Expand Down Expand Up @@ -151,6 +155,8 @@ The state manager uri and key would be configured accordingly while setting up n
- MONGO_DATABASE_NAME=${MONGO_DATABASE_NAME}
- STATE_MANAGER_SECRET=${STATE_MANAGER_SECRET}
- SECRETS_ENCRYPTION_KEY=${SECRETS_ENCRYPTION_KEY}
- TRIGGER_WORKERS=${TRIGGER_WORKERS:-1}
- TRIGGER_RETENTION_HOURS=${TRIGGER_RETENTION_HOURS:-720}
deploy:
replicas: 3
update_config:
Expand Down Expand Up @@ -182,6 +188,8 @@ The state manager uri and key would be configured accordingly while setting up n
data:
MONGO_URI: "your-mongodb-connection-string"
MONGO_DATABASE_NAME: "your-database-name"
TRIGGER_WORKERS: "1"
TRIGGER_RETENTION_HOURS: "720"
LOG_LEVEL: "INFO"
```

Expand Down Expand Up @@ -212,9 +220,11 @@ The state manager uri and key would be configured accordingly while setting up n
| Variable | Description | Required | Default |
|----------|-------------|----------|---------|
| `MONGO_URI` | MongoDB connection string | Yes | - |
| `MONGO_DATABASE_NAME` | Database name | Yes | `exosphere` |
| `MONGO_DATABASE_NAME` | Database name | Yes | `exosphere-state-manager` |
| `STATE_MANAGER_SECRET` | Secret API key for authentication | Yes | - |
| `SECRETS_ENCRYPTION_KEY` | Base64-encoded key for data encryption | Yes | - |
| `TRIGGER_WORKERS` | Number of workers to run the trigger cron | No | `1` |
| `TRIGGER_RETENTION_HOURS` | Number of hours to retain completed/failed triggers before cleanup | No | `720` (30 days) |
| `LOG_LEVEL` | Logging level (DEBUG, INFO, WARNING, ERROR) | No | `INFO` |

## Monitoring and Health Checks
Expand Down
6 changes: 4 additions & 2 deletions state-manager/app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

class Settings(BaseModel):
"""Application settings loaded from environment variables."""

# MongoDB Configuration
mongo_uri: str = Field(..., description="MongoDB connection URI" )
mongo_database_name: str = Field(default="exosphere-state-manager", description="MongoDB database name")
state_manager_secret: str = Field(..., description="Secret key for API authentication")
secrets_encryption_key: str = Field(..., description="Key for encrypting secrets")
trigger_workers: int = Field(default=1, description="Number of workers to run the trigger cron")
trigger_retention_hours: int = Field(default=720, description="Number of hours to retain completed/failed triggers before cleanup")

@classmethod
def from_env(cls) -> "Settings":
Expand All @@ -21,7 +22,8 @@ def from_env(cls) -> "Settings":
mongo_database_name=os.getenv("MONGO_DATABASE_NAME", "exosphere-state-manager"), # type: ignore
state_manager_secret=os.getenv("STATE_MANAGER_SECRET"), # type: ignore
secrets_encryption_key=os.getenv("SECRETS_ENCRYPTION_KEY"), # type: ignore
trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)) # type: ignore
trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore
trigger_retention_hours=int(os.getenv("TRIGGER_RETENTION_HOURS", 720)) # type: ignore
)


Expand Down
7 changes: 7 additions & 0 deletions state-manager/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from .tasks.trigger_cron import trigger_cron

# init tasks
from .tasks.init_tasks import init_tasks

# Define models list
DOCUMENT_MODELS = [State, GraphTemplate, RegisteredNode, Store, Run, DatabaseTriggers]
Expand All @@ -59,6 +62,10 @@ async def lifespan(app: FastAPI):
await init_beanie(db, document_models=DOCUMENT_MODELS)
logger.info("beanie dbs initialized")

# performing init tasks
await init_tasks()
logger.info("init tasks completed")

# initialize secret
if not settings.state_manager_secret:
raise ValueError("STATE_MANAGER_SECRET is not set")
Expand Down
18 changes: 17 additions & 1 deletion state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ class DatabaseTriggers(Document):
namespace: str = Field(..., description="Namespace of the graph")
trigger_time: datetime = Field(..., description="Trigger time of the trigger")
trigger_status: TriggerStatusEnum = Field(..., description="Status of the trigger")
expires_at: Optional[datetime] = Field(default=None, description="Expiration time for automatic cleanup of completed triggers")

class Settings:
class Settings:
indexes = [
IndexModel(
[
Expand All @@ -32,5 +33,20 @@ class Settings:
],
name="uniq_graph_type_expr_time",
unique=True
),
IndexModel(
[
("expires_at", 1),
],
name="ttl_expires_at",
expireAfterSeconds=0, # Delete immediately when expires_at is reached
partialFilterExpression={
"trigger_status": {
"$in": [
TriggerStatusEnum.TRIGGERED,
TriggerStatusEnum.FAILED
]
}
}
)
]
20 changes: 20 additions & 0 deletions state-manager/app/tasks/init_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# tasks to run when the server starts
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum
import asyncio

async def delete_old_triggers():
await DatabaseTriggers.get_pymongo_collection().delete_many(
{
"trigger_status": {
"$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED]
},
"expires_at": None
}
)

async def init_tasks():
await asyncio.gather(
*[
delete_old_triggers()
])
37 changes: 25 additions & 12 deletions state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta, timezone
from uuid import uuid4
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum
Expand Down Expand Up @@ -34,18 +34,24 @@ async def call_trigger_graph(trigger: DatabaseTriggers):
x_exosphere_request_id=str(uuid4())
)

async def mark_as_failed(trigger: DatabaseTriggers):
async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int):
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {"trigger_status": TriggerStatusEnum.FAILED}}
{"$set": {
"trigger_status": TriggerStatusEnum.FAILED,
"expires_at": expires_at
}}
)

async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int):
assert trigger.expression is not None
iter = croniter.croniter(trigger.expression, trigger.trigger_time)

while True:
next_trigger_time = iter.get_next(datetime)
expires_at = next_trigger_time + timedelta(hours=retention_hours)

try:
await DatabaseTriggers(
Expand All @@ -54,7 +60,8 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
graph_name=trigger.graph_name,
namespace=trigger.namespace,
trigger_time=next_trigger_time,
trigger_status=TriggerStatusEnum.PENDING
trigger_status=TriggerStatusEnum.PENDING,
expires_at=expires_at
).insert()
except DuplicateKeyError:
logger.error(f"Duplicate trigger found for expression {trigger.expression}")
Expand All @@ -65,24 +72,30 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
if next_trigger_time > cron_time:
break

async def mark_as_triggered(trigger: DatabaseTriggers):
async def mark_as_triggered(trigger: DatabaseTriggers, retention_hours: int):
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {"trigger_status": TriggerStatusEnum.TRIGGERED}}
{"$set": {
"trigger_status": TriggerStatusEnum.TRIGGERED,
"expires_at": expires_at
}}
)

async def handle_trigger(cron_time: datetime):
async def handle_trigger(cron_time: datetime, retention_hours: int):
while(trigger:= await get_due_triggers(cron_time)):
try:
await call_trigger_graph(trigger)
await mark_as_triggered(trigger)
await mark_as_triggered(trigger, retention_hours)
except Exception as e:
await mark_as_failed(trigger)
await mark_as_failed(trigger, retention_hours)
logger.error(f"Error calling trigger graph: {e}")
finally:
await create_next_triggers(trigger, cron_time)
await create_next_triggers(trigger, cron_time, retention_hours)

async def trigger_cron():
cron_time = datetime.now()
settings = get_settings()
logger.info(f"starting trigger_cron: {cron_time}")
await asyncio.gather(*[handle_trigger(cron_time) for _ in range(get_settings().trigger_workers)])
await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)])
8 changes: 7 additions & 1 deletion state-manager/app/tasks/verify_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
from app.singletons.logs_manager import LogsManager
from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum
from app.models.db.trigger import DatabaseTriggers
from app.config.settings import get_settings
from datetime import timedelta

logger = LogsManager().get_logger()

settings = get_settings()

async def verify_node_exists(graph_template: GraphTemplate, registered_nodes: list[RegisteredNode]) -> list[str]:
errors = []
template_nodes_set = set([(node.node_name, node.namespace) for node in graph_template.nodes])
Expand Down Expand Up @@ -110,6 +114,7 @@ async def create_crons(graph_template: GraphTemplate):
iter = croniter.croniter(expression, current_time)

next_trigger_time = iter.get_next(datetime)
expires_at = next_trigger_time + timedelta(hours=settings.trigger_retention_hours)

new_db_triggers.append(
DatabaseTriggers(
Expand All @@ -118,7 +123,8 @@ async def create_crons(graph_template: GraphTemplate):
graph_name=graph_template.name,
namespace=graph_template.namespace,
trigger_status=TriggerStatusEnum.PENDING,
trigger_time=next_trigger_time
trigger_time=next_trigger_time,
expires_at=expires_at
)
)

Expand Down
Loading