From cd257ea085362bd1ee6f09e222df203354a6a6f3 Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Mon, 29 Sep 2025 09:45:22 +0530 Subject: [PATCH 1/9] Enhance documentation and add triggers functionality - Updated README.md to include new "Triggers" feature for automatic scheduling of workflows using cron expressions. - Added "Triggers" section in mkdocs.yml for navigation. - Expanded concepts documentation to explain triggers and their lifecycle. - Introduced examples for implementing triggers in graph templates and Python SDK. - Created a new triggers.md file detailing trigger usage and common cron expressions. - Updated StateManager and related models to support triggers for automatic graph execution. --- README.md | 25 ++- docs/docs/exosphere/concepts.md | 9 +- docs/docs/exosphere/create-graph.md | 1 + docs/docs/exosphere/graph-components.md | 36 +++- docs/docs/exosphere/python-sdk-graph.md | 10 +- docs/docs/exosphere/triggers.md | 207 +++++++++++++++++++++++ docs/mkdocs.yml | 3 +- python-sdk/README.md | 10 +- python-sdk/exospherehost/models.py | 2 +- python-sdk/exospherehost/statemanager.py | 3 + 10 files changed, 298 insertions(+), 8 deletions(-) create mode 100644 docs/docs/exosphere/triggers.md diff --git a/README.md b/README.md index f16e6eed..1dcd5da0 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Exosphere provides a powerful foundation for building and orchestrating AI appli - **Infinite Parallel Agents**: Run multiple AI agents simultaneously across distributed infrastructure - **Dynamic State Management**: Create and manage state at runtime with persistent storage - **Fault Tolerance**: Built-in failure handling and recovery mechanisms for production reliability -- **Core Concepts**: Fanout, Unite, Signals, Retry Policy, Store +- **Core Concepts**: Fanout, Unite, Signals, Retry Policy, Store, Triggers ### **Smooth Developer Experience** - **Plug-and-Play Nodes**: Create reusable, atomic workflow components that can be mixed and matched @@ -79,6 
+79,29 @@ Exosphere is built on a flexible, node-based architecture that makes it easy to - **Signals**: Inter-node communication and event handling - **Retry Policy**: Configurable failure handling and recovery - **Store**: Persistent storage for workflow state and data +- **Triggers**: Automatic scheduling with cron expressions + +## ⏰ Automatic Scheduling Example + +Schedule your workflows to run automatically using cron expressions: + +```python +from exospherehost import StateManager, GraphNodeModel, CronTrigger + +# Define triggers for automatic execution +triggers = [ + CronTrigger(expression="0 9 * * 1-5"), # Every weekday at 9 AM + CronTrigger(expression="0 */6 * * *") # Every 6 hours +] + +# Create graph with automatic scheduling +result = await state_manager.upsert_graph( + graph_name="data-pipeline", + graph_nodes=graph_nodes, + secrets={"api_key": "your-key"}, + triggers=triggers # Enable automatic execution +) +``` ### **Deployment Options** diff --git a/docs/docs/exosphere/concepts.md b/docs/docs/exosphere/concepts.md index 6507e5b9..0a8da308 100644 --- a/docs/docs/exosphere/concepts.md +++ b/docs/docs/exosphere/concepts.md @@ -53,6 +53,11 @@ graph TB - **Runtime Access**: All nodes can read and write to the shared store - **Automatic Cleanup**: Store data is automatically cleaned up when workflows complete +### 7. **Automatic Triggers** +- **Cron Scheduling**: Schedule automatic graph execution using standard cron expressions +- **Unattended Operation**: Workflows run automatically without manual intervention +- **Multiple Schedules**: Each graph can have multiple triggers with different schedules + ## How They Work Together These concepts combine to create a powerful workflow system: @@ -63,6 +68,7 @@ These concepts combine to create a powerful workflow system: 4. **Signals** give nodes control over execution flow and error handling 5. **Retry policies** ensure resilience against transient failures 6. 
**Store** provides persistent state across workflow executions +7. **Triggers** enable automatic, scheduled execution for unattended operations ## Benefits @@ -80,4 +86,5 @@ Explore each concept in detail: - **[Unite](./unite.md)** - Understand synchronization of parallel paths - **[Signals](./signals.md)** - Control workflow execution flow - **[Retry Policy](./retry-policy.md)** - Build resilient workflows -- **[Store](./store.md)** - Persist data across workflow execution +- **[Store](./store.md)** - Persist data across workflow execution +- **[Triggers](./triggers.md)** - Schedule automatic graph execution diff --git a/docs/docs/exosphere/create-graph.md b/docs/docs/exosphere/create-graph.md index da497d12..11f5d9b9 100644 --- a/docs/docs/exosphere/create-graph.md +++ b/docs/docs/exosphere/create-graph.md @@ -11,6 +11,7 @@ A graph template consists of: - **Input Mapping**: How data flows between nodes using `${{ ... }}` syntax - **Retry Policy**: `Optional` failure handling configuration - **Store Configuration**: `Optional` graph-level key-value store +- **Triggers**: `Optional` cron-based automatic execution scheduling ## Basic Example of a Graph Template diff --git a/docs/docs/exosphere/graph-components.md b/docs/docs/exosphere/graph-components.md index 0431671a..2779eae5 100644 --- a/docs/docs/exosphere/graph-components.md +++ b/docs/docs/exosphere/graph-components.md @@ -97,10 +97,44 @@ Graph-level key-value storage for shared state: - `required_keys`: Keys that must be present in the store - `default_values`: Default values for store keys +## 6. 
Triggers + +Schedule automatic graph execution using cron expressions: + +```json +{ + "triggers": [ + { + "type": "CRON", + "value": { + "expression": "0 9 * * 1-5" + } + }, + { + "type": "CRON", + "value": { + "expression": "0 */6 * * *" + } + } + ] +} +``` + +**Fields:** +- `type`: Currently only "CRON" is supported +- `value.expression`: Standard cron expression (5-field format) + +**Common Cron Expressions:** +- `"0 9 * * 1-5"` - Every weekday at 9:00 AM +- `"0 */6 * * *"` - Every 6 hours +- `"0 0 * * 0"` - Every Sunday at midnight +- `"*/15 * * * *"` - Every 15 minutes + ## Next Steps - **[Create Graph](./create-graph.md)** - Return to main guide -- **[Graph Models](./python-sdk-graph.md)** - Use Python SDK for type-safe graph creation. +- **[Graph Models](./python-sdk-graph.md)** - Use Python SDK for type-safe graph creation +- **[Triggers](./triggers.md)** - Schedule automatic graph execution ## Related Concepts diff --git a/docs/docs/exosphere/python-sdk-graph.md b/docs/docs/exosphere/python-sdk-graph.md index 51bc649c..5932cd2f 100644 --- a/docs/docs/exosphere/python-sdk-graph.md +++ b/docs/docs/exosphere/python-sdk-graph.md @@ -5,7 +5,7 @@ Use the Exosphere Python SDK with Pydantic models for type-safe graph creation. 
## Basic Usage ```python hl_lines="42-48" -from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, RetryStrategyEnum +from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, CronTrigger, RetryStrategyEnum async def create_graph(): state_manager = StateManager( @@ -46,12 +46,18 @@ async def create_graph(): } ) + triggers = [ + CronTrigger(expression="0 9 * * 1-5"), # Every weekday at 9 AM + CronTrigger(expression="0 */6 * * *") # Every 6 hours + ] + result = await state_manager.upsert_graph( graph_name="my-workflow", graph_nodes=graph_nodes, secrets={"api_key": "your-key"}, retry_policy=retry_policy, - store_config=store_config + store_config=store_config, + triggers=triggers ) return result ``` diff --git a/docs/docs/exosphere/triggers.md b/docs/docs/exosphere/triggers.md new file mode 100644 index 00000000..e97bf5c9 --- /dev/null +++ b/docs/docs/exosphere/triggers.md @@ -0,0 +1,207 @@ +# Triggers + +Triggers allow you to schedule automatic execution of your graphs using cron expressions. When a trigger is defined, Exosphere will automatically execute your graph at the specified times without requiring manual intervention. + +## Overview + +Triggers provide **scheduled execution** for your workflows, enabling automation for: + +- **Regular data processing** (daily reports, hourly syncs) +- **Maintenance tasks** (cleanup jobs, backups) +- **Monitoring workflows** (health checks, alerts) +- **Business processes** (invoice generation, notifications) + +```mermaid +graph TB + A[Cron Trigger] --> B[Scheduled Time Reached] + B --> C[Graph Execution Triggered] + C --> D[Workflow Runs Automatically] + + E[Multiple Triggers] -.->|Different schedules| A + + style A fill:#e8f5e8 + style E fill:#e8f5e8 +``` + +## How Triggers Work + +### Trigger Lifecycle + +1. **Definition**: Triggers are defined when creating/updating a graph template +2. 
**Scheduling**: Exosphere schedules the next execution based on cron expression +3. **Execution**: At the scheduled time, the graph is triggered automatically +4. **Rescheduling**: After execution, the next occurrence is automatically scheduled + +### Trigger Types + +Currently, **CRON** is the only supported trigger type, using standard 5-field cron expressions: + +``` +* * * * * +│ │ │ │ │ +│ │ │ │ └── Day of Week (0-7, Sunday = 0 or 7) +│ │ │ └──── Month (1-12) +│ │ └────── Day of Month (1-31) +│ └──────── Hour (0-23) +└────────── Minute (0-59) +``` + +## Implementation + +### JSON Configuration + +Define triggers in your graph template: + +```json +{ + "triggers": [ + { + "type": "CRON", + "value": { + "expression": "0 9 * * 1-5" + } + }, + { + "type": "CRON", + "value": { + "expression": "0 0 * * 0" + } + } + ], + "nodes": [ + // ... your graph nodes + ] +} +``` + +### Python SDK Example + +```python +from exospherehost import StateManager, GraphNodeModel, CronTrigger + +async def create_scheduled_graph(): + state_manager = StateManager( + namespace="DataPipeline", + state_manager_uri=EXOSPHERE_STATE_MANAGER_URI, + key=EXOSPHERE_API_KEY + ) + + # Define your graph nodes + graph_nodes = [ + GraphNodeModel( + node_name="DataExtractorNode", + namespace="DataPipeline", + identifier="extractor", + inputs={"source": "initial"}, + next_nodes=["processor"] + ), + GraphNodeModel( + node_name="DataProcessorNode", + namespace="DataPipeline", + identifier="processor", + inputs={"raw_data": "${{ extractor.outputs.data }}"}, + next_nodes=[] + ) + ] + + # Define triggers for automatic execution + triggers = [ + CronTrigger(expression="0 2 * * *"), # Daily at 2:00 AM + CronTrigger(expression="0 */4 * * *") # Every 4 hours + ] + + # Create the graph with triggers + result = await state_manager.upsert_graph( + graph_name="data-pipeline", + graph_nodes=graph_nodes, + secrets={"api_key": "your-api-key"}, + triggers=triggers + ) + + print(f"Graph created with {len(triggers)} 
triggers") + return result + +# Run the function +import asyncio +asyncio.run(create_scheduled_graph()) +``` + +## Common Cron Expressions + +### Basic Patterns + +| Expression | Description | Example Use Case | +|------------|-------------|------------------| +| `"0 9 * * 1-5"` | Every weekday at 9:00 AM | Business reports | +| `"0 */6 * * *"` | Every 6 hours | Data synchronization | +| `"0 0 * * 0"` | Every Sunday at midnight | Weekly cleanup | +| `"*/15 * * * *"` | Every 15 minutes | Health checks | +| `"0 2 * * *"` | Daily at 2:00 AM | Nightly batch jobs | +| `"0 0 1 * *"` | First day of every month | Monthly reports | + +### Advanced Patterns + +| Expression | Description | +|------------|-------------| +| `"0 9-17 * * 1-5"` | Every hour from 9 AM to 5 PM, weekdays only | +| `"0 0 * * 1,3,5"` | Monday, Wednesday, Friday at midnight | +| `"30 8 * * 1-5"` | Weekdays at 8:30 AM | +| `"0 12 1,15 * *"` | 1st and 15th of each month at noon | +| `"0 0 1 1,7 *"` | January 1st and July 1st at midnight | + +## Best Practices + +### Scheduling Considerations + +1. **Avoid Peak Times**: Schedule resource-intensive workflows during off-peak hours +2. **Stagger Executions**: If you have multiple graphs, stagger their execution times +3. **Consider Time Zones**: Cron expressions use server time (UTC by default) +4. 
**Resource Planning**: Ensure your infrastructure can handle scheduled workloads + +### Error Handling + +- **Retry Policies**: Combine triggers with retry policies for resilient automation +- **Monitoring**: Set up alerts for failed scheduled executions +- **Logging**: Ensure adequate logging for troubleshooting scheduled runs + +### Example with Retry Policy + +```python +from exospherehost import RetryPolicyModel, RetryStrategyEnum + +# Define retry policy for scheduled executions +retry_policy = RetryPolicyModel( + max_retries=3, + strategy=RetryStrategyEnum.EXPONENTIAL, + backoff_factor=2000, + exponent=2 +) + +# Create graph with both triggers and retry policy +result = await state_manager.upsert_graph( + graph_name="robust-pipeline", + graph_nodes=graph_nodes, + secrets=secrets, + triggers=triggers, + retry_policy=retry_policy # Handles failures in scheduled runs +) +``` + +## Limitations + +- **CRON Only**: Currently only cron-based scheduling is supported +- **No Manual Override**: Scheduled executions cannot be manually cancelled once triggered +- **Time Zone**: All cron expressions are evaluated in server time (UTC) +- **Minimum Interval**: Avoid scheduling more frequently than every minute + +## Next Steps + +- **[Create Graph](./create-graph.md)** - Learn about graph creation +- **[Retry Policy](./retry-policy.md)** - Add resilience to scheduled executions +- **[Store](./store.md)** - Persist data across scheduled runs +- **[Dashboard](./dashboard.md)** - Monitor scheduled executions + +## Related Concepts + +- **[Graph Components](./graph-components.md)** - Complete overview of graph features +- **[Python SDK](./python-sdk-graph.md)** - Type-safe graph creation with triggers \ No newline at end of file diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 9ffebd59..81b4f2a4 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -150,7 +150,8 @@ nav: - Unite: exosphere/unite.md - Signals: exosphere/signals.md - Retry Policy: exosphere/retry-policy.md - - 
Store: exosphere/store.md + - Store: exosphere/store.md + - Triggers: exosphere/triggers.md - Local Setup: - Overview: exosphere/local-setup.md - Docker Compose: docker-compose-setup.md diff --git a/python-sdk/README.md b/python-sdk/README.md index 43719f3b..894994ea 100644 --- a/python-sdk/README.md +++ b/python-sdk/README.md @@ -274,7 +274,7 @@ state_manager = StateManager(namespace="MyProject") #### Creating/Updating Graph Definitions (Beta) ```python -from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, RetryStrategyEnum +from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, RetryStrategyEnum, CronTrigger async def create_graph(): state_manager = StateManager(namespace="MyProject") @@ -320,6 +320,12 @@ async def create_graph(): } ) + # Define triggers for automatic execution + triggers = [ + CronTrigger(expression="0 9 * * 1-5"), # Every weekday at 9 AM + CronTrigger(expression="0 */6 * * *") # Every 6 hours + ] + # Create or update the graph (beta) result = await state_manager.upsert_graph( graph_name="my-workflow", @@ -330,6 +336,7 @@ async def create_graph(): }, retry_policy=retry_policy, # beta store_config=store_config, # beta + triggers=triggers, # automatic scheduling validation_timeout=60, polling_interval=1 ) @@ -345,6 +352,7 @@ async def create_graph(): - `secrets` (dict[str, str]): Key/value secrets available to all nodes - `retry_policy` (RetryPolicyModel | None): Optional retry policy configuration (beta) - `store_config` (StoreConfigModel | None): Graph-level store configuration (beta) +- `triggers` (list[CronTrigger] | None): Optional list of cron triggers for automatic graph execution - `validation_timeout` (int): Seconds to wait for validation (default: 60) - `polling_interval` (int): Polling interval in seconds (default: 1) diff --git a/python-sdk/exospherehost/models.py b/python-sdk/exospherehost/models.py index 78fabeba..92ed3381 100644 --- 
a/python-sdk/exospherehost/models.py +++ b/python-sdk/exospherehost/models.py @@ -160,4 +160,4 @@ def validate_default_values(cls, v: dict[str, str]) -> dict[str, str]: return normalized_dict class CronTrigger(BaseModel): - expression: str = Field(..., description="Cron expression") \ No newline at end of file + expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.") \ No newline at end of file diff --git a/python-sdk/exospherehost/statemanager.py b/python-sdk/exospherehost/statemanager.py index 85e94130..13cf6890 100644 --- a/python-sdk/exospherehost/statemanager.py +++ b/python-sdk/exospherehost/statemanager.py @@ -143,6 +143,9 @@ async def upsert_graph(self, graph_name: str, graph_nodes: list[GraphNodeModel], retry_policy (RetryPolicyModel | None): Optional per-node retry policy configuration. store_config (StoreConfigModel | None): Beta configuration for the graph-level store (schema is subject to change). + triggers (list[CronTrigger] | None): Optional list of cron triggers for automatic + graph execution. Each trigger contains a cron expression that schedules when + the graph should be executed automatically. validation_timeout (int): Seconds to wait for validation (default 60). polling_interval (int): Polling interval in seconds (default 1). From 706aa0c880002318291e125e25008b29d55c12b7 Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Mon, 29 Sep 2025 09:49:33 +0530 Subject: [PATCH 2/9] Update README.md to clarify triggers feature versioning - Revised the description of the `triggers` parameter in the README to indicate its beta status and specify the SDK version (0.0.3b1) associated with this feature. This enhances clarity for users regarding the current state of the triggers functionality. 
--- python-sdk/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-sdk/README.md b/python-sdk/README.md index 894994ea..4d969cf1 100644 --- a/python-sdk/README.md +++ b/python-sdk/README.md @@ -352,7 +352,7 @@ async def create_graph(): - `secrets` (dict[str, str]): Key/value secrets available to all nodes - `retry_policy` (RetryPolicyModel | None): Optional retry policy configuration (beta) - `store_config` (StoreConfigModel | None): Graph-level store configuration (beta) -- `triggers` (list[CronTrigger] | None): Optional list of cron triggers for automatic graph execution +- `triggers` (list[CronTrigger] | None): Optional list of cron triggers for automatic graph execution (beta: SDK version 0.0.3b1) - `validation_timeout` (int): Seconds to wait for validation (default: 60) - `polling_interval` (int): Polling interval in seconds (default: 1) From 4f31163eec98af88e42864afd4f3123731fcd2fd Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Mon, 29 Sep 2025 09:49:36 +0530 Subject: [PATCH 3/9] adding beta status --- README.md | 5 ++++- docs/docs/exosphere/concepts.md | 1 + docs/docs/exosphere/graph-components.md | 3 +++ docs/docs/exosphere/python-sdk-graph.md | 2 +- docs/docs/exosphere/triggers.md | 5 ++++- 5 files changed, 13 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1dcd5da0..58620a0e 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,9 @@ Exosphere is built on a flexible, node-based architecture that makes it easy to Schedule your workflows to run automatically using cron expressions: +> [!NOTE]
+> **Beta Feature:** Available in the `beta-latest` Docker tag and SDK version `0.0.3b1`. + ```python from exospherehost import StateManager, GraphNodeModel, CronTrigger @@ -99,7 +102,7 @@ result = await state_manager.upsert_graph( graph_name="data-pipeline", graph_nodes=graph_nodes, secrets={"api_key": "your-key"}, - triggers=triggers # Enable automatic execution + triggers=triggers # Enable automatic execution (Beta) ) ``` diff --git a/docs/docs/exosphere/concepts.md b/docs/docs/exosphere/concepts.md index 0a8da308..61f441e5 100644 --- a/docs/docs/exosphere/concepts.md +++ b/docs/docs/exosphere/concepts.md @@ -57,6 +57,7 @@ graph TB - **Cron Scheduling**: Schedule automatic graph execution using standard cron expressions - **Unattended Operation**: Workflows run automatically without manual intervention - **Multiple Schedules**: Each graph can have multiple triggers with different schedules +- **Beta**: Available in `beta-latest` Docker tag and SDK version `0.0.3b1` ## How They Work Together diff --git a/docs/docs/exosphere/graph-components.md b/docs/docs/exosphere/graph-components.md index 2779eae5..347d4525 100644 --- a/docs/docs/exosphere/graph-components.md +++ b/docs/docs/exosphere/graph-components.md @@ -101,6 +101,9 @@ Graph-level key-value storage for shared state: Schedule automatic graph execution using cron expressions: +!!! 
info "Beta Feature" + Available in `beta-latest` Docker tag and SDK version `0.0.3b1` + ```json { "triggers": [ diff --git a/docs/docs/exosphere/python-sdk-graph.md b/docs/docs/exosphere/python-sdk-graph.md index 5932cd2f..f97a360c 100644 --- a/docs/docs/exosphere/python-sdk-graph.md +++ b/docs/docs/exosphere/python-sdk-graph.md @@ -57,7 +57,7 @@ async def create_graph(): secrets={"api_key": "your-key"}, retry_policy=retry_policy, store_config=store_config, - triggers=triggers + triggers=triggers # Beta: SDK version 0.0.3b1 ) return result ``` diff --git a/docs/docs/exosphere/triggers.md b/docs/docs/exosphere/triggers.md index e97bf5c9..b5b2c697 100644 --- a/docs/docs/exosphere/triggers.md +++ b/docs/docs/exosphere/triggers.md @@ -1,5 +1,8 @@ # Triggers +!!! warning "Beta Feature" + Triggers functionality is currently in beta and available under the `beta-latest` Docker tag and SDK version `0.0.3b1`. The API may change in future versions. + Triggers allow you to schedule automatic execution of your graphs using cron expressions. When a trigger is defined, Exosphere will automatically execute your graph at the specified times without requiring manual intervention. 
## Overview @@ -204,4 +207,4 @@ result = await state_manager.upsert_graph( ## Related Concepts - **[Graph Components](./graph-components.md)** - Complete overview of graph features -- **[Python SDK](./python-sdk-graph.md)** - Type-safe graph creation with triggers \ No newline at end of file +- **[Python SDK](./python-sdk-graph.md)** - Type-safe graph creation with triggers \ No newline at end of file From ca7ac50757291033c9306a813781c26dc6357782 Mon Sep 17 00:00:00 2001 From: Nivedit Jain Date: Mon, 29 Sep 2025 09:53:40 +0530 Subject: [PATCH 4/9] Update docs/docs/exosphere/graph-components.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/docs/exosphere/graph-components.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/exosphere/graph-components.md b/docs/docs/exosphere/graph-components.md index 347d4525..de596d2d 100644 --- a/docs/docs/exosphere/graph-components.md +++ b/docs/docs/exosphere/graph-components.md @@ -114,7 +114,7 @@ Schedule automatic graph execution using cron expressions: } }, { - "type": "CRON", + "type": "CRON", "value": { "expression": "0 */6 * * *" } From 0d0700ee166cfdeaa4cbdaeec0623edbc13efe2e Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Mon, 29 Sep 2025 10:28:11 +0530 Subject: [PATCH 5/9] Refactor trigger handling and improve error management in graph template processing - Updated the `upsert_graph_template` function to delete pending cron triggers when old triggers are present, enhancing trigger management. - Modified the `create_next_triggers` function in `trigger_cron.py` to handle duplicate key errors gracefully, ensuring robust trigger creation. - Streamlined the `verify_graph` function by removing the old triggers parameter and directly creating new cron triggers, improving clarity and performance. - Enhanced error handling in the trigger processing logic to log errors and prevent failures from interrupting the workflow. 
--- .../app/controller/upsert_graph_template.py | 13 ++++++- state-manager/app/tasks/trigger_cron.py | 36 +++++++++++++------ state-manager/app/tasks/verify_graph.py | 32 +++-------------- 3 files changed, 41 insertions(+), 40 deletions(-) diff --git a/state-manager/app/controller/upsert_graph_template.py b/state-manager/app/controller/upsert_graph_template.py index 574988dc..72e70e29 100644 --- a/state-manager/app/controller/upsert_graph_template.py +++ b/state-manager/app/controller/upsert_graph_template.py @@ -3,6 +3,9 @@ from app.models.db.graph_template_model import GraphTemplate from app.models.graph_template_validation_status import GraphTemplateValidationStatus from app.tasks.verify_graph import verify_graph +from app.models.db.trigger import DatabaseTriggers +from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum +from beanie.operators import In from fastapi import BackgroundTasks, HTTPException @@ -58,7 +61,15 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse logger.error("Error validating graph template", error=e, x_exosphere_request_id=x_exosphere_request_id) raise HTTPException(status_code=400, detail=f"Error validating graph template: {str(e)}") - background_tasks.add_task(verify_graph, graph_template, old_triggers) + if len(old_triggers) > 0: + await DatabaseTriggers.find( + DatabaseTriggers.graph_name == graph_name, + DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING, + DatabaseTriggers.type == TriggerTypeEnum.CRON, + In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) + ).delete_many() + + background_tasks.add_task(verify_graph, graph_template) return UpsertGraphTemplateResponse( nodes=graph_template.nodes, diff --git a/state-manager/app/tasks/trigger_cron.py b/state-manager/app/tasks/trigger_cron.py index b53d3aa7..1505dd60 100644 --- a/state-manager/app/tasks/trigger_cron.py +++ 
b/state-manager/app/tasks/trigger_cron.py @@ -6,6 +6,7 @@ from app.controller.trigger_graph import trigger_graph from app.models.trigger_graph_model import TriggerGraphRequestModel from pymongo import ReturnDocument +from pymongo.errors import DuplicateKeyError from app.config.settings import get_settings import croniter import asyncio @@ -41,17 +42,29 @@ async def mark_as_failed(trigger: DatabaseTriggers): async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime): assert trigger.expression is not None - iter = croniter.croniter(trigger.expression, cron_time) - next_trigger_time = iter.get_next(datetime) + iter = croniter.croniter(trigger.expression) - await DatabaseTriggers( - type=TriggerTypeEnum.CRON, - expression=trigger.expression, - graph_name=trigger.graph_name, - namespace=trigger.namespace, - trigger_time=next_trigger_time, - trigger_status=TriggerStatusEnum.PENDING - ).insert() + while True: + next_trigger_time = iter.get_next(datetime) + + try: + await DatabaseTriggers( + type=TriggerTypeEnum.CRON, + expression=trigger.expression, + graph_name=trigger.graph_name, + namespace=trigger.namespace, + trigger_time=next_trigger_time, + trigger_status=TriggerStatusEnum.PENDING + ).insert() + except DuplicateKeyError: + logger.error(f"Duplicate trigger found for expression {trigger.expression}") + continue + except Exception as e: + logger.error(f"Error creating next trigger: {e}") + continue + + if next_trigger_time > cron_time: + break async def mark_as_triggered(trigger: DatabaseTriggers): await DatabaseTriggers.get_pymongo_collection().update_one( @@ -63,11 +76,12 @@ async def handle_trigger(cron_time: datetime): while(trigger:= await get_due_triggers(cron_time)): try: await call_trigger_graph(trigger) - await create_next_triggers(trigger, cron_time) await mark_as_triggered(trigger) except Exception as e: await mark_as_failed(trigger) logger.error(f"Error calling trigger graph: {e}") + finally: + await create_next_triggers(trigger, 
cron_time) async def trigger_cron(): cron_time = datetime.now() diff --git a/state-manager/app/tasks/verify_graph.py b/state-manager/app/tasks/verify_graph.py index 172e6055..e54f405c 100644 --- a/state-manager/app/tasks/verify_graph.py +++ b/state-manager/app/tasks/verify_graph.py @@ -101,30 +101,8 @@ async def verify_inputs(graph_template: GraphTemplate, registered_nodes: list[Re return errors -async def cancel_crons(graph_template: GraphTemplate, old_triggers: list[Trigger]): - old_cron_expressions = set([trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) - new_cron_expressions = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) - - removed_expressions = old_cron_expressions - new_cron_expressions - - await DatabaseTriggers.find( - DatabaseTriggers.graph_name == graph_template.name, - DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING, - DatabaseTriggers.type == TriggerTypeEnum.CRON, - In(DatabaseTriggers.expression, list(removed_expressions)) - ).update( - { - "$set": { - "trigger_status": TriggerStatusEnum.CANCELLED - } - } - ) # type: ignore - -async def create_crons(graph_template: GraphTemplate, old_triggers: list[Trigger]): - old_cron_expressions = set([trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) - new_cron_expressions = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) - - expressions_to_create = new_cron_expressions - old_cron_expressions +async def create_crons(graph_template: GraphTemplate): + expressions_to_create = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) current_time = datetime.now() @@ -133,7 +111,6 @@ async def create_crons(graph_template: GraphTemplate, old_triggers: list[Trigger iter = croniter.croniter(expression, current_time) 
next_trigger_time = iter.get_next(datetime) - print(next_trigger_time) new_db_triggers.append( DatabaseTriggers( @@ -149,7 +126,7 @@ async def create_crons(graph_template: GraphTemplate, old_triggers: list[Trigger if len(new_db_triggers) > 0: await DatabaseTriggers.insert_many(new_db_triggers) -async def verify_graph(graph_template: GraphTemplate, old_triggers: list[Trigger]): +async def verify_graph(graph_template: GraphTemplate): try: errors = [] registered_nodes = await RegisteredNode.list_nodes_by_templates(graph_template.nodes) @@ -173,9 +150,8 @@ async def verify_graph(graph_template: GraphTemplate, old_triggers: list[Trigger graph_template.validation_status = GraphTemplateValidationStatus.VALID graph_template.validation_errors = [] - await asyncio.gather(*[cancel_crons(graph_template, old_triggers), create_crons(graph_template, old_triggers)]) - await graph_template.save() + await create_crons(graph_template) except Exception as e: logger.error(f"Exception during graph validation for graph template {graph_template.id}: {str(e)}", exc_info=True) From edbbfcf708b43e181bbb79db21778008e3d9e6a2 Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Mon, 29 Sep 2025 10:49:12 +0530 Subject: [PATCH 6/9] improving triggers --- state-manager/app/tasks/trigger_cron.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/state-manager/app/tasks/trigger_cron.py b/state-manager/app/tasks/trigger_cron.py index 1505dd60..1df1a12e 100644 --- a/state-manager/app/tasks/trigger_cron.py +++ b/state-manager/app/tasks/trigger_cron.py @@ -42,7 +42,7 @@ async def mark_as_failed(trigger: DatabaseTriggers): async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime): assert trigger.expression is not None - iter = croniter.croniter(trigger.expression) + iter = croniter.croniter(trigger.expression, trigger.trigger_time) while True: next_trigger_time = iter.get_next(datetime) @@ -58,10 +58,8 @@ async def create_next_triggers(trigger: DatabaseTriggers, 
cron_time: datetime): ).insert() except DuplicateKeyError: logger.error(f"Duplicate trigger found for expression {trigger.expression}") - continue except Exception as e: logger.error(f"Error creating next trigger: {e}") - continue if next_trigger_time > cron_time: break From 26ed57a1948358883376d8059cd6eb49cf8f0599 Mon Sep 17 00:00:00 2001 From: Nivedit Jain Date: Mon, 29 Sep 2025 11:02:09 +0530 Subject: [PATCH 7/9] Update docs/docs/exosphere/triggers.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/docs/exosphere/triggers.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/docs/exosphere/triggers.md b/docs/docs/exosphere/triggers.md index b5b2c697..67de4cfd 100644 --- a/docs/docs/exosphere/triggers.md +++ b/docs/docs/exosphere/triggers.md @@ -183,6 +183,7 @@ retry_policy = RetryPolicyModel( # Create graph with both triggers and retry policy result = await state_manager.upsert_graph( graph_name="robust-pipeline", + # Assuming `graph_nodes`, `secrets`, and `triggers` are defined as in previous examples graph_nodes=graph_nodes, secrets=secrets, triggers=triggers, From f6994e175bdced73087703d0beeea1c079fb721f Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Mon, 29 Sep 2025 11:06:59 +0530 Subject: [PATCH 8/9] Refactor trigger handling and improve error management in tests - Updated the `create_next_triggers` function in `trigger_cron.py` to raise exceptions on errors, enhancing error visibility. - Modified test cases in `test_upsert_graph_template.py` and `test_verify_graph.py` to remove unnecessary parameters from `verify_graph`, streamlining the function calls and improving clarity. - Adjusted mock setups in tests to reflect the changes in function signatures, ensuring accurate testing of trigger functionality. 
--- state-manager/app/tasks/trigger_cron.py | 1 + .../controller/test_upsert_graph_template.py | 2 +- .../tests/unit/tasks/test_verify_graph.py | 17 +++++++---------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/state-manager/app/tasks/trigger_cron.py b/state-manager/app/tasks/trigger_cron.py index 1df1a12e..ab0771bb 100644 --- a/state-manager/app/tasks/trigger_cron.py +++ b/state-manager/app/tasks/trigger_cron.py @@ -60,6 +60,7 @@ async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime): logger.error(f"Duplicate trigger found for expression {trigger.expression}") except Exception as e: logger.error(f"Error creating next trigger: {e}") + raise if next_trigger_time > cron_time: break diff --git a/state-manager/tests/unit/controller/test_upsert_graph_template.py b/state-manager/tests/unit/controller/test_upsert_graph_template.py index db3e2640..ae826296 100644 --- a/state-manager/tests/unit/controller/test_upsert_graph_template.py +++ b/state-manager/tests/unit/controller/test_upsert_graph_template.py @@ -194,7 +194,7 @@ async def test_upsert_graph_template_create_new( mock_graph_template_class.insert.assert_called_once() # Verify background task was added - mock_background_tasks.add_task.assert_called_once_with(mock_verify_graph, mock_new_template, []) + mock_background_tasks.add_task.assert_called_once_with(mock_verify_graph, mock_new_template) @patch('app.controller.upsert_graph_template.GraphTemplate') async def test_upsert_graph_template_database_error( diff --git a/state-manager/tests/unit/tasks/test_verify_graph.py b/state-manager/tests/unit/tasks/test_verify_graph.py index d3124885..e16d583a 100644 --- a/state-manager/tests/unit/tasks/test_verify_graph.py +++ b/state-manager/tests/unit/tasks/test_verify_graph.py @@ -385,13 +385,12 @@ async def test_verify_graph_success(self): with patch('app.tasks.verify_graph.verify_node_exists', new_callable=AsyncMock) as mock_verify_nodes: with 
patch('app.tasks.verify_graph.verify_secrets', new_callable=AsyncMock) as mock_verify_secrets: with patch('app.tasks.verify_graph.verify_inputs', new_callable=AsyncMock) as mock_verify_inputs: - with patch('app.tasks.verify_graph.cancel_crons', new_callable=AsyncMock) as _: - with patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: + with patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: mock_verify_nodes.return_value = [] mock_verify_secrets.return_value = [] mock_verify_inputs.return_value = [] - await verify_graph(graph_template, []) + await verify_graph(graph_template) assert graph_template.validation_status == GraphTemplateValidationStatus.VALID assert graph_template.validation_errors == [] @@ -427,7 +426,7 @@ async def test_verify_graph_with_errors(self): mock_verify_secrets.return_value = ["Secret error"] mock_verify_inputs.return_value = ["Input error"] - await verify_graph(graph_template, []) + await verify_graph(graph_template) assert graph_template.validation_status == GraphTemplateValidationStatus.INVALID assert graph_template.validation_errors == ["Node error", "Secret error", "Input error"] @@ -450,7 +449,7 @@ async def test_verify_graph_exception(self): # The verify_graph function should catch the exception, log it, set status, and re-raise it with pytest.raises(Exception, match="Database error"): - await verify_graph(graph_template, []) + await verify_graph(graph_template) assert graph_template.validation_status == GraphTemplateValidationStatus.INVALID assert graph_template.validation_errors == ["Validation failed due to unexpected error: Database error"] @@ -475,7 +474,7 @@ async def test_verify_graph_with_exception(): # This should handle the exception and mark the graph as invalid, then re-raise with pytest.raises(Exception, match="Database connection error"): - await verify_graph(graph_template, []) + await verify_graph(graph_template) # Verify that the graph was marked as invalid with error assert 
graph_template.validation_status == GraphTemplateValidationStatus.INVALID @@ -499,7 +498,6 @@ async def test_verify_graph_with_validation_errors(): patch('app.tasks.verify_graph.verify_node_exists') as mock_verify_nodes, \ patch('app.tasks.verify_graph.verify_secrets') as mock_verify_secrets, \ patch('app.tasks.verify_graph.verify_inputs') as mock_verify_inputs, \ - patch('app.tasks.verify_graph.cancel_crons', new_callable=AsyncMock) as _, \ patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: # Mock registered nodes to return empty list @@ -515,7 +513,7 @@ async def test_verify_graph_with_validation_errors(): graph_template.name = "test_graph" # This should mark the graph as invalid due to validation errors - await verify_graph(graph_template, []) + await verify_graph(graph_template) # Verify that the graph was marked as invalid assert graph_template.validation_status == GraphTemplateValidationStatus.INVALID @@ -539,7 +537,6 @@ async def test_verify_graph_with_valid_graph(): patch('app.tasks.verify_graph.verify_node_exists') as mock_verify_nodes, \ patch('app.tasks.verify_graph.verify_secrets') as mock_verify_secrets, \ patch('app.tasks.verify_graph.verify_inputs') as mock_verify_inputs, \ - patch('app.tasks.verify_graph.cancel_crons', new_callable=AsyncMock) as _, \ patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: # Mock registered nodes to return a valid node @@ -563,7 +560,7 @@ async def test_verify_graph_with_valid_graph(): graph_template.name = "test_graph" # This should mark the graph as valid - await verify_graph(graph_template, []) + await verify_graph(graph_template) # Verify that the graph was processed (status may vary based on actual validation) # The specific status depends on the actual validation logic From 2d77783925492eb3c2d4616263ea097dae4fb256 Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Mon, 29 Sep 2025 11:08:03 +0530 Subject: [PATCH 9/9] Refactor import statements in verify_graph.py - Removed 
unused import of `In` from beanie.operators to clean up the code. - Streamlined the import section by retaining only necessary components, enhancing readability. --- state-manager/app/tasks/verify_graph.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/state-manager/app/tasks/verify_graph.py b/state-manager/app/tasks/verify_graph.py index e54f405c..7fbb021d 100644 --- a/state-manager/app/tasks/verify_graph.py +++ b/state-manager/app/tasks/verify_graph.py @@ -2,14 +2,13 @@ import croniter from datetime import datetime -from beanie.operators import In from json_schema_to_pydantic import create_model from app.models.db.graph_template_model import GraphTemplate from app.models.graph_template_validation_status import GraphTemplateValidationStatus from app.models.db.registered_node import RegisteredNode from app.singletons.logs_manager import LogsManager -from app.models.trigger_models import Trigger, TriggerStatusEnum, TriggerTypeEnum +from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum from app.models.db.trigger import DatabaseTriggers logger = LogsManager().get_logger()