Feature: Add cleanup strategy for old database triggers #433 #460
@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timedelta
 from uuid import uuid4
 from app.models.db.trigger import DatabaseTriggers
 from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum

@@ -14,6 +14,9 @@
 logger = LogsManager().get_logger()


 async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None:
+    """
+    Fetch a trigger that is due and mark it as TRIGGERING.
+    """
     data = await DatabaseTriggers.get_pymongo_collection().find_one_and_update(
         {
             "trigger_time": {"$lte": cron_time},

@@ -26,23 +29,38 @@ async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None:
     )
     return DatabaseTriggers(**data) if data else None


 async def call_trigger_graph(trigger: DatabaseTriggers):
+    """
+    Call the associated graph for a trigger.
+    """
     await trigger_graph(
         namespace_name=trigger.namespace,
         graph_name=trigger.graph_name,
         body=TriggerGraphRequestModel(),
         x_exosphere_request_id=str(uuid4())
     )


 async def mark_as_failed(trigger: DatabaseTriggers):
+    """
+    Mark a trigger as FAILED.
+    """
     await DatabaseTriggers.get_pymongo_collection().update_one(
         {"_id": trigger.id},
         {"$set": {"trigger_status": TriggerStatusEnum.FAILED}}
     )


 async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
-    assert trigger.expression is not None
+    """
+    Create the next scheduled triggers based on the cron expression.
+    """
+    if not trigger.expression:
+        return

     iter = croniter.croniter(trigger.expression, trigger.trigger_time)
+    ttl_days = getattr(get_settings(), "trigger_ttl_days", 30)  # default 30 days

     while True:
         next_trigger_time = iter.get_next(datetime)

@@ -54,8 +72,10 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
                 graph_name=trigger.graph_name,
                 namespace=trigger.namespace,
                 trigger_time=next_trigger_time,
-                trigger_status=TriggerStatusEnum.PENDING
+                trigger_status=TriggerStatusEnum.PENDING,
+                expires_at=datetime.utcnow() + timedelta(days=ttl_days)
             ).insert()
Comment on lines +75 to 77 (Contributor):

Prevent future cron triggers from expiring before they run. Basing expires_at on datetime.utcnow() lets a trigger scheduled further out than ttl_days expire before it ever fires; compute the TTL from the scheduled run time instead.

Suggested change:
-                expires_at=datetime.utcnow() + timedelta(days=ttl_days)
+                expires_at=next_trigger_time + timedelta(days=ttl_days)
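A quick worked sketch of why the utcnow()-based expiry can land before the scheduled run (values here are hypothetical; ttl_days is assumed to be 30):

from datetime import datetime, timedelta

ttl_days = 30
now = datetime.utcnow()
next_trigger_time = now + timedelta(days=45)   # cron slot 45 days out

expires_now_based = now + timedelta(days=ttl_days)                 # now + 30d
expires_run_based = next_trigger_time + timedelta(days=ttl_days)   # run time + 30d

assert expires_now_based < next_trigger_time   # trigger would be cleaned up before it fires
assert expires_run_based > next_trigger_time   # expiry always trails the scheduled run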
             except DuplicateKeyError:
                 logger.error(f"Duplicate trigger found for expression {trigger.expression}")
             except Exception as e:

@@ -65,14 +85,22 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
         if next_trigger_time > cron_time:
             break


 async def mark_as_triggered(trigger: DatabaseTriggers):
+    """
+    Mark a trigger as TRIGGERED.
+    """
     await DatabaseTriggers.get_pymongo_collection().update_one(
         {"_id": trigger.id},
         {"$set": {"trigger_status": TriggerStatusEnum.TRIGGERED}}
     )


 async def handle_trigger(cron_time: datetime):
-    while(trigger:= await get_due_triggers(cron_time)):
+    """
+    Handle due triggers one by one.
+    """
+    while (trigger := await get_due_triggers(cron_time)):
         try:
             await call_trigger_graph(trigger)
             await mark_as_triggered(trigger)

@@ -82,7 +110,12 @@ async def handle_trigger(cron_time: datetime):
         finally:
             await create_next_triggers(trigger, cron_time)


 async def trigger_cron():
-    cron_time = datetime.now()
-    logger.info(f"starting trigger_cron: {cron_time}")
-    await asyncio.gather(*[handle_trigger(cron_time) for _ in range(get_settings().trigger_workers)])
+    """
+    Main loop for cron trigger handling.
+    """
+    cron_time = datetime.utcnow()
+    logger.info(f"Starting trigger_cron: {cron_time}")
+    workers = getattr(get_settings(), "trigger_workers", 1)
+    await asyncio.gather(*[handle_trigger(cron_time) for _ in range(workers)])
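For context on how create_next_triggers walks the schedule forward, here is a minimal standalone croniter sketch; the cron expression and start time are illustrative, not taken from the PR:

# croniter yields successive fire times strictly after the supplied start time.
from datetime import datetime
import croniter

start = datetime(2024, 1, 1, 0, 0)
it = croniter.croniter("*/15 * * * *", start)   # every 15 minutes

print(it.get_next(datetime))   # 2024-01-01 00:15:00
print(it.get_next(datetime))   # 2024-01-01 00:30:00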
New file: state_manager/.dockerignore

@@ -0,0 +1,32 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.so

# C extensions
*.egg
*.egg-info
.eggs

# Virtual environments
.venv/

# Git, CI and editor config
.git/
.github/
.vscode/

# Logs and temp files
*.log

# Other
.env
Dockerfile
Contributor (🧹 Nitpick | 🔵 Trivial):

Trailing space in pattern; confirm intent to ignore Dockerfile. "Dockerfile " has a trailing space, so it likely does not match the Dockerfile, and ignoring the Dockerfile is usually not desired anyway.

Option A (keep ignoring, fix whitespace):
-Dockerfile 
+Dockerfile

Option B (recommended): do not ignore the Dockerfile; remove the line.
-Dockerfile 

Verification script executed:
#!/bin/bash
sed -n '20,30p' state_manager/.dockerignore
find state_manager -maxdepth 1 -type f -name 'Dockerfile'

Resolved suggestion: remove the unnecessary whitespace and redundant patterns in .dockerignore.
The .dockerignore listing continues:

tests/
pytest.ini
.pytest_cache/
.coverage
.coverage.*
coverage.xml
New file: .gitignore

@@ -0,0 +1,66 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual Environment
venv/
env/
ENV/
.env
.venv/

# IDE
.vscode/
*.swp
*.swo
.idea/
*.iws
*.iml
*.ipr
Contributor (🧹 Nitpick | 🔵 Trivial):

Add common Python tool/cache/coverage ignores. These are standard and will reduce accidental noise in diffs.

Suggested change:
+
+# Test & coverage caches
+.pytest_cache/
+.mypy_cache/
+.ruff_cache/
+.coverage
+.coverage.*
+htmlcov/
+.tox/
+.nox/
+
+# Python tooling
+.python-version
+pip-wheel-metadata/
||||||||||||||||||||||||||||||
| # Local development | ||||||||||||||||||||||||||||||
| .env.local | ||||||||||||||||||||||||||||||
| .env.development.local | ||||||||||||||||||||||||||||||
| .env.test.local | ||||||||||||||||||||||||||||||
| .env.production.local | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Database | ||||||||||||||||||||||||||||||
| *.db | ||||||||||||||||||||||||||||||
| *.sqlite3 | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # OS generated files | ||||||||||||||||||||||||||||||
| .DS_Store | ||||||||||||||||||||||||||||||
| .DS_Store? | ||||||||||||||||||||||||||||||
| ._* | ||||||||||||||||||||||||||||||
| .Spotlight-V100 | ||||||||||||||||||||||||||||||
| .Trashes | ||||||||||||||||||||||||||||||
| ehthumbs.db | ||||||||||||||||||||||||||||||
| Thumbs.db | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| #logs | ||||||||||||||||||||||||||||||
| *.log | ||||||||||||||||||||||||||||||
| logs/*.* | ||||||||||||||||||||||||||||||
| !logs/.gitkeep | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
Comment on lines +59 to +63 (Contributor, 🛠️ Refactor suggestion | 🟠 Major):

Make the logs ignore recursive and robust. logs/*.* only matches top-level files in logs/ that contain a dot; logs/** ignores everything under the directory recursively.

Suggested change:
-#logs
-*.log
-logs/*.*
-!logs/.gitkeep
+# Logs
+*.log
+logs/**
+!logs/.gitkeep
||||||||||||||||||||||||||||||
| # local files | ||||||||||||||||||||||||||||||
| files/ | ||||||||||||||||||||||||||||||
| !files/.gitkeep | ||||||||||||||||||||||||||||||
|
Contributor (🧹 Nitpick | 🔵 Trivial):

Add a trailing newline at EOF. POSIX-style text files should end with a newline; it improves diffs and tooling compatibility.
@@ -0,0 +1 @@
3.12
New file: Dockerfile

@@ -0,0 +1,14 @@
FROM python:3.12-slim-bookworm
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /api-server

COPY pyproject.toml uv.lock ./

RUN uv sync --locked

COPY . .

EXPOSE 8000

CMD ["uv", "run", "run.py", "--mode", "production", "--workers", "4"]
Comment on lines +1 to +14 (Contributor, 🧹 Nitpick | 🔵 Trivial):

Add a HEALTHCHECK and create a non-root user for production security. The Dockerfile is missing these two production-ready features. Apply this diff to add both:

 FROM python:3.12-slim-bookworm
 COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
 WORKDIR /api-server
 COPY pyproject.toml uv.lock ./
 RUN uv sync --locked
 COPY . .
+# Create a non-root user and switch to it
+RUN useradd -m -u 1000 apiserver && chown -R apiserver:apiserver /api-server
+USER apiserver
+
 EXPOSE 8000
+# Add health check (adjust path/port as needed)
+HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
+    CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=2)" || exit 1
+
 CMD ["uv", "run", "run.py", "--mode", "production", "--workers", "4"]

Note: the health check command assumes the requests package is installed in the image and that the API exposes a /health endpoint on port 8000.

Alternatively, use a simpler health check:

HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

(This requires installing curl in the image.)

🧰 Tools — 🪛 Checkov (3.2.334)
[low] 1-14: Ensure that HEALTHCHECK instructions have been added to container images (CKV_DOCKER_2)
[low] 1-14: Ensure that a user for the container has been created (CKV_DOCKER_3)
@@ -0,0 +1 @@
# Config module for State Manager
@@ -0,0 +1,47 @@
"""
CORS configuration for the State Manager API
"""
import os
from typing import List


def get_cors_origins() -> List[str]:
    """
    Get CORS origins from environment variables or use defaults
    """
    # Get origins from environment variable
    cors_origins = os.getenv("CORS_ORIGINS", "")

    if cors_origins:
        # Split by comma and strip whitespace
        return [origin.strip() for origin in cors_origins.split(",") if origin.strip()]

    # Default origins for development
    return [
        "http://localhost:3000",   # Next.js frontend
        "http://localhost:3001",   # Alternative frontend port
        "http://127.0.0.1:3000",   # Alternative localhost
        "http://127.0.0.1:3001",   # Alternative localhost port
    ]


def get_cors_config():
    """
    Get CORS configuration
    """
    return {
        "allow_origins": get_cors_origins(),
        "allow_credentials": True,
        "allow_methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
        "allow_headers": [
            "Accept",
            "Accept-Language",
            "Content-Language",
            "Content-Type",
            "X-API-Key",
            "Authorization",
            "X-Requested-With",
            "X-Exosphere-Request-ID",
        ],
        "expose_headers": [
            "X-Exosphere-Request-ID",
        ],
    }
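A minimal usage sketch for the config above, assuming it is wired into a FastAPI app (the import path app.config.cors is an assumption for illustration):

# Hypothetical wiring: CORSMiddleware accepts exactly the keys returned by get_cors_config().
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.config.cors import get_cors_config  # assumed module path

app = FastAPI()
app.add_middleware(CORSMiddleware, **get_cors_config())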
New file: app/config/settings.py

@@ -0,0 +1,48 @@
import os
from pydantic import BaseModel, Field
from dotenv import load_dotenv

load_dotenv()


class Settings(BaseModel):
    """Application settings loaded from environment variables."""

    # MongoDB Configuration
    mongo_uri: str = Field(..., description="MongoDB connection URI")
    mongo_database_name: str = Field(default="exosphere-state-manager", description="MongoDB database name")
    state_manager_secret: str = Field(..., description="Secret key for API authentication")
    secrets_encryption_key: str = Field(..., description="Key for encrypting secrets")
    trigger_workers: int = Field(default=1, description="Number of workers to run the trigger cron")

    # Cleanup / Retention Configs
    trigger_retention_days: int = Field(default=30, description="How many days to retain old triggers")
    cleanup_interval_minutes: int = Field(default=60, description="Interval (minutes) between cleanup runs")

    @classmethod
    def from_env(cls) -> "Settings":
        return cls(
            mongo_uri=os.getenv("MONGO_URI"),  # type: ignore
            mongo_database_name=os.getenv("MONGO_DATABASE_NAME", "exosphere-state-manager"),  # type: ignore
            state_manager_secret=os.getenv("STATE_MANAGER_SECRET"),  # type: ignore
            secrets_encryption_key=os.getenv("SECRETS_ENCRYPTION_KEY"),  # type: ignore
            trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)),  # type: ignore

            # NEW CONFIGS
            trigger_retention_days=int(os.getenv("TRIGGER_RETENTION_DAYS", 30)),  # type: ignore
            cleanup_interval_minutes=int(os.getenv("CLEANUP_INTERVAL_MINUTES", 60))  # type: ignore
        )


# Global settings instance - will be updated when get_settings() is called
_settings = None


def get_settings() -> Settings:
    """Get the global settings instance, reloading from environment if needed."""
    global _settings
    _settings = Settings.from_env()
    return _settings


# Initialize settings
settings = get_settings()
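The retention settings above are what the PR's cleanup strategy is built around. The cleanup task itself is not part of this excerpt, so the following is only a sketch of how such a job could use them; the deletion criteria and loop structure are assumptions, not the PR's implementation:

# Hypothetical periodic cleanup built on trigger_retention_days / cleanup_interval_minutes.
# Assumes old TRIGGERED/FAILED triggers are matched on trigger_time; adjust to the real schema.
import asyncio
from datetime import datetime, timedelta

from app.config.settings import get_settings
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum


async def cleanup_old_triggers() -> int:
    settings = get_settings()
    cutoff = datetime.utcnow() - timedelta(days=settings.trigger_retention_days)
    result = await DatabaseTriggers.get_pymongo_collection().delete_many(
        {
            "trigger_time": {"$lt": cutoff},
            "trigger_status": {"$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED]},
        }
    )
    return result.deleted_count


async def cleanup_loop():
    while True:
        deleted = await cleanup_old_triggers()
        print(f"cleanup removed {deleted} old triggers")
        # Sleep for the configured interval before the next sweep.
        await asyncio.sleep(get_settings().cleanup_interval_minutes * 60)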
@@ -0,0 +1,13 @@
from app.config.settings import get_settings

def test_settings():
    settings = get_settings()
    print("✅ Loaded Settings:")
    print("Mongo URI:", settings.mongo_uri)
    print("Database:", settings.mongo_database_name)
    print("Trigger Workers:", settings.trigger_workers)
    print("Trigger Retention Days:", settings.trigger_retention_days)
    print("Cleanup Interval (minutes):", settings.cleanup_interval_minutes)

if __name__ == "__main__":
    test_settings()
Comment on lines +1 to +13 (Contributor, 🧹 Nitpick | 🔵 Trivial):

Clarify the purpose of this file and consider relocation or renaming. The file carries a test-style name but only prints the loaded settings, which can cause confusion about whether it is a real test or a debug script. Consider one of the following: rename or move it so it is not picked up as a test, or turn it into an assertion-based test, for example:

from app.config.settings import get_settings


def test_settings_loaded():
    """Verify settings can be loaded from environment."""
    settings = get_settings()
    assert settings.mongo_uri is not None
    assert settings.mongo_database_name is not None
    assert settings.trigger_workers > 0
    assert settings.trigger_retention_days > 0
    assert settings.cleanup_interval_minutes > 0
Review comment:

The DatabaseTriggers model does not have an expires_at field. Adding it here will cause a pydantic.ValidationError at runtime when DatabaseTriggers is instantiated. The cleanup logic implemented in this PR uses the trigger_time field, so this expires_at field seems unnecessary and will break the creation of new triggers. If a TTL index is intended, the model and database index configuration need to be updated accordingly.
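If a TTL-based cleanup is the intended direction instead, a minimal sketch of the index side could look like the following; this is an assumption about the design, and the DatabaseTriggers model would also need an expires_at field declared:

# Hypothetical TTL setup: MongoDB's TTL monitor (runs roughly once a minute) deletes
# documents once the datetime stored in `expires_at` has passed.
from pymongo import ASCENDING

from app.models.db.trigger import DatabaseTriggers


async def ensure_trigger_ttl_index():
    collection = DatabaseTriggers.get_pymongo_collection()
    await collection.create_index(
        [("expires_at", ASCENDING)],
        expireAfterSeconds=0,   # expire at the stored timestamp itself
        name="trigger_expires_at_ttl",
    )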