diff --git a/README.md b/README.md index f16e6eed..58620a0e 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Exosphere provides a powerful foundation for building and orchestrating AI appli - **Infinite Parallel Agents**: Run multiple AI agents simultaneously across distributed infrastructure - **Dynamic State Management**: Create and manage state at runtime with persistent storage - **Fault Tolerance**: Built-in failure handling and recovery mechanisms for production reliability -- **Core Concepts**: Fanout, Unite, Signals, Retry Policy, Store +- **Core Concepts**: Fanout, Unite, Signals, Retry Policy, Store, Triggers ### **Smooth Developer Experience** - **Plug-and-Play Nodes**: Create reusable, atomic workflow components that can be mixed and matched @@ -79,6 +79,32 @@ Exosphere is built on a flexible, node-based architecture that makes it easy to - **Signals**: Inter-node communication and event handling - **Retry Policy**: Configurable failure handling and recovery - **Store**: Persistent storage for workflow state and data +- **Triggers**: Automatic scheduling with cron expressions + +## ⏰ Automatic Scheduling Example + +Schedule your workflows to run automatically using cron expressions: + +!!! info "Beta Feature" + Available in `beta-latest` Docker tag and SDK version `0.0.3b1` + +```python +from exospherehost import StateManager, GraphNodeModel, CronTrigger + +# Define triggers for automatic execution +triggers = [ + CronTrigger(expression="0 9 * * 1-5"), # Every weekday at 9 AM + CronTrigger(expression="0 */6 * * *") # Every 6 hours +] + +# Create graph with automatic scheduling +result = await state_manager.upsert_graph( + graph_name="data-pipeline", + graph_nodes=graph_nodes, + secrets={"api_key": "your-key"}, + triggers=triggers # Enable automatic execution (Beta) +) +``` ### **Deployment Options** diff --git a/docs/docs/exosphere/concepts.md b/docs/docs/exosphere/concepts.md index 6507e5b9..61f441e5 100644 --- a/docs/docs/exosphere/concepts.md +++ b/docs/docs/exosphere/concepts.md @@ -53,6 +53,12 @@ graph TB - **Runtime Access**: All nodes can read and write to the shared store - **Automatic Cleanup**: Store data is automatically cleaned up when workflows complete +### 7. **Automatic Triggers** +- **Cron Scheduling**: Schedule automatic graph execution using standard cron expressions +- **Unattended Operation**: Workflows run automatically without manual intervention +- **Multiple Schedules**: Each graph can have multiple triggers with different schedules +- **Beta**: Available in `beta-latest` Docker tag and SDK version `0.0.3b1` + ## How They Work Together These concepts combine to create a powerful workflow system: @@ -63,6 +69,7 @@ These concepts combine to create a powerful workflow system: 4. **Signals** give nodes control over execution flow and error handling 5. **Retry policies** ensure resilience against transient failures 6. **Store** provides persistent state across workflow executions +7. **Triggers** enable automatic, scheduled execution for unattended operations ## Benefits @@ -80,4 +87,5 @@ Explore each concept in detail: - **[Unite](./unite.md)** - Understand synchronization of parallel paths - **[Signals](./signals.md)** - Control workflow execution flow - **[Retry Policy](./retry-policy.md)** - Build resilient workflows -- **[Store](./store.md)** - Persist data across workflow execution +- **[Store](./store.md)** - Persist data across workflow execution +- **[Triggers](./triggers.md)** - Schedule automatic graph execution diff --git a/docs/docs/exosphere/create-graph.md b/docs/docs/exosphere/create-graph.md index da497d12..11f5d9b9 100644 --- a/docs/docs/exosphere/create-graph.md +++ b/docs/docs/exosphere/create-graph.md @@ -11,6 +11,7 @@ A graph template consists of: - **Input Mapping**: How data flows between nodes using `${{ ... }}` syntax - **Retry Policy**: `Optional` failure handling configuration - **Store Configuration**: `Optional` graph-level key-value store +- **Triggers**: `Optional` cron-based automatic execution scheduling ## Basic Example of a Graph Template diff --git a/docs/docs/exosphere/graph-components.md b/docs/docs/exosphere/graph-components.md index 0431671a..de596d2d 100644 --- a/docs/docs/exosphere/graph-components.md +++ b/docs/docs/exosphere/graph-components.md @@ -97,10 +97,47 @@ Graph-level key-value storage for shared state: - `required_keys`: Keys that must be present in the store - `default_values`: Default values for store keys +## 6. Triggers + +Schedule automatic graph execution using cron expressions: + +!!! info "Beta Feature" + Available in `beta-latest` Docker tag and SDK version `0.0.3b1` + +```json +{ + "triggers": [ + { + "type": "CRON", + "value": { + "expression": "0 9 * * 1-5" + } + }, + { + "type": "CRON", + "value": { + "expression": "0 */6 * * *" + } + } + ] +} +``` + +**Fields:** +- `type`: Currently only "CRON" is supported +- `value.expression`: Standard cron expression (5-field format) + +**Common Cron Expressions:** +- `"0 9 * * 1-5"` - Every weekday at 9:00 AM +- `"0 */6 * * *"` - Every 6 hours +- `"0 0 * * 0"` - Every Sunday at midnight +- `"*/15 * * * *"` - Every 15 minutes + ## Next Steps - **[Create Graph](./create-graph.md)** - Return to main guide -- **[Graph Models](./python-sdk-graph.md)** - Use Python SDK for type-safe graph creation. +- **[Graph Models](./python-sdk-graph.md)** - Use Python SDK for type-safe graph creation +- **[Triggers](./triggers.md)** - Schedule automatic graph execution ## Related Concepts diff --git a/docs/docs/exosphere/python-sdk-graph.md b/docs/docs/exosphere/python-sdk-graph.md index 51bc649c..f97a360c 100644 --- a/docs/docs/exosphere/python-sdk-graph.md +++ b/docs/docs/exosphere/python-sdk-graph.md @@ -5,7 +5,7 @@ Use the Exosphere Python SDK with Pydantic models for type-safe graph creation. ## Basic Usage ```python hl_lines="42-48" -from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, RetryStrategyEnum +from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, CronTrigger, RetryStrategyEnum async def create_graph(): state_manager = StateManager( @@ -46,12 +46,18 @@ async def create_graph(): } ) + triggers = [ + CronTrigger(expression="0 9 * * 1-5"), # Every weekday at 9 AM + CronTrigger(expression="0 */6 * * *") # Every 6 hours + ] + result = await state_manager.upsert_graph( graph_name="my-workflow", graph_nodes=graph_nodes, secrets={"api_key": "your-key"}, retry_policy=retry_policy, - store_config=store_config + store_config=store_config, + triggers=triggers # Beta: SDK version 0.0.3b1 ) return result ``` diff --git a/docs/docs/exosphere/triggers.md b/docs/docs/exosphere/triggers.md new file mode 100644 index 00000000..67de4cfd --- /dev/null +++ b/docs/docs/exosphere/triggers.md @@ -0,0 +1,211 @@ +# Triggers + +!!! warning "Beta Feature" + Triggers functionality is currently in beta and available under the `beta-latest` Docker tag and SDK version `0.0.3b1`. The API may change in future versions. + +Triggers allow you to schedule automatic execution of your graphs using cron expressions. When a trigger is defined, Exosphere will automatically execute your graph at the specified times without requiring manual intervention. + +## Overview + +Triggers provide **scheduled execution** for your workflows, enabling automation for: + +- **Regular data processing** (daily reports, hourly syncs) +- **Maintenance tasks** (cleanup jobs, backups) +- **Monitoring workflows** (health checks, alerts) +- **Business processes** (invoice generation, notifications) + +```mermaid +graph TB + A[Cron Trigger] --> B[Scheduled Time Reached] + B --> C[Graph Execution Triggered] + C --> D[Workflow Runs Automatically] + + E[Multiple Triggers] -.->|Different schedules| A + + style A fill:#e8f5e8 + style E fill:#e8f5e8 +``` + +## How Triggers Work + +### Trigger Lifecycle + +1. **Definition**: Triggers are defined when creating/updating a graph template +2. **Scheduling**: Exosphere schedules the next execution based on cron expression +3. **Execution**: At the scheduled time, the graph is triggered automatically +4. **Rescheduling**: After execution, the next occurrence is automatically scheduled + +### Trigger Types + +Currently, **CRON** is the only supported trigger type, using standard 5-field cron expressions: + +``` +* * * * * +│ │ │ │ │ +│ │ │ │ └── Day of Week (0-7, Sunday = 0 or 7) +│ │ │ └──── Month (1-12) +│ │ └────── Day of Month (1-31) +│ └──────── Hour (0-23) +└────────── Minute (0-59) +``` + +## Implementation + +### JSON Configuration + +Define triggers in your graph template: + +```json +{ + "triggers": [ + { + "type": "CRON", + "value": { + "expression": "0 9 * * 1-5" + } + }, + { + "type": "CRON", + "value": { + "expression": "0 0 * * 0" + } + } + ], + "nodes": [ + // ... your graph nodes + ] +} +``` + +### Python SDK Example + +```python +from exospherehost import StateManager, GraphNodeModel, CronTrigger + +async def create_scheduled_graph(): + state_manager = StateManager( + namespace="DataPipeline", + state_manager_uri=EXOSPHERE_STATE_MANAGER_URI, + key=EXOSPHERE_API_KEY + ) + + # Define your graph nodes + graph_nodes = [ + GraphNodeModel( + node_name="DataExtractorNode", + namespace="DataPipeline", + identifier="extractor", + inputs={"source": "initial"}, + next_nodes=["processor"] + ), + GraphNodeModel( + node_name="DataProcessorNode", + namespace="DataPipeline", + identifier="processor", + inputs={"raw_data": "${{ extractor.outputs.data }}"}, + next_nodes=[] + ) + ] + + # Define triggers for automatic execution + triggers = [ + CronTrigger(expression="0 2 * * *"), # Daily at 2:00 AM + CronTrigger(expression="0 */4 * * *") # Every 4 hours + ] + + # Create the graph with triggers + result = await state_manager.upsert_graph( + graph_name="data-pipeline", + graph_nodes=graph_nodes, + secrets={"api_key": "your-api-key"}, + triggers=triggers + ) + + print(f"Graph created with {len(triggers)} triggers") + return result + +# Run the function +import asyncio +asyncio.run(create_scheduled_graph()) +``` + +## Common Cron Expressions + +### Basic Patterns + +| Expression | Description | Example Use Case | +|------------|-------------|------------------| +| `"0 9 * * 1-5"` | Every weekday at 9:00 AM | Business reports | +| `"0 */6 * * *"` | Every 6 hours | Data synchronization | +| `"0 0 * * 0"` | Every Sunday at midnight | Weekly cleanup | +| `"*/15 * * * *"` | Every 15 minutes | Health checks | +| `"0 2 * * *"` | Daily at 2:00 AM | Nightly batch jobs | +| `"0 0 1 * *"` | First day of every month | Monthly reports | + +### Advanced Patterns + +| Expression | Description | +|------------|-------------| +| `"0 9-17 * * 1-5"` | Every hour from 9 AM to 5 PM, weekdays only | +| `"0 0 * * 1,3,5"` | Monday, Wednesday, Friday at midnight | +| `"30 8 * * 1-5"` | Weekdays at 8:30 AM | +| `"0 12 1,15 * *"` | 1st and 15th of each month at noon | +| `"0 0 1 1,7 *"` | January 1st and July 1st at midnight | + +## Best Practices + +### Scheduling Considerations + +1. **Avoid Peak Times**: Schedule resource-intensive workflows during off-peak hours +2. **Stagger Executions**: If you have multiple graphs, stagger their execution times +3. **Consider Time Zones**: Cron expressions use server time (UTC by default) +4. **Resource Planning**: Ensure your infrastructure can handle scheduled workloads + +### Error Handling + +- **Retry Policies**: Combine triggers with retry policies for resilient automation +- **Monitoring**: Set up alerts for failed scheduled executions +- **Logging**: Ensure adequate logging for troubleshooting scheduled runs + +### Example with Retry Policy + +```python +from exospherehost import RetryPolicyModel, RetryStrategyEnum + +# Define retry policy for scheduled executions +retry_policy = RetryPolicyModel( + max_retries=3, + strategy=RetryStrategyEnum.EXPONENTIAL, + backoff_factor=2000, + exponent=2 +) + +# Create graph with both triggers and retry policy +result = await state_manager.upsert_graph( + graph_name="robust-pipeline", + # Assuming `graph_nodes`, `secrets`, and `triggers` are defined as in previous examples + graph_nodes=graph_nodes, + secrets=secrets, + triggers=triggers, + retry_policy=retry_policy # Handles failures in scheduled runs +) +``` + +## Limitations + +- **CRON Only**: Currently only cron-based scheduling is supported +- **No Manual Override**: Scheduled executions cannot be manually cancelled once triggered +- **Time Zone**: All cron expressions are evaluated in server time (UTC) +- **Minimum Interval**: Avoid scheduling more frequently than every minute + +## Next Steps + +- **[Create Graph](./create-graph.md)** - Learn about graph creation +- **[Retry Policy](./retry-policy.md)** - Add resilience to scheduled executions +- **[Store](./store.md)** - Persist data across scheduled runs +- **[Dashboard](./dashboard.md)** - Monitor scheduled executions + +## Related Concepts + +- **[Graph Components](./graph-components.md)** - Complete overview of graph features +- **[Python SDK](./python-sdk-graph.md)** - Type-safe graph creation with triggers \ No newline at end of file diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 9ffebd59..81b4f2a4 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -150,7 +150,8 @@ nav: - Unite: exosphere/unite.md - Signals: exosphere/signals.md - Retry Policy: exosphere/retry-policy.md - - Store: exosphere/store.md + - Store: exosphere/store.md + - Triggers: exosphere/triggers.md - Local Setup: - Overview: exosphere/local-setup.md - Docker Compose: docker-compose-setup.md diff --git a/python-sdk/README.md b/python-sdk/README.md index 43719f3b..4d969cf1 100644 --- a/python-sdk/README.md +++ b/python-sdk/README.md @@ -274,7 +274,7 @@ state_manager = StateManager(namespace="MyProject") #### Creating/Updating Graph Definitions (Beta) ```python -from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, RetryStrategyEnum +from exospherehost import StateManager, GraphNodeModel, RetryPolicyModel, StoreConfigModel, RetryStrategyEnum, CronTrigger async def create_graph(): state_manager = StateManager(namespace="MyProject") @@ -320,6 +320,12 @@ async def create_graph(): } ) + # Define triggers for automatic execution + triggers = [ + CronTrigger(expression="0 9 * * 1-5"), # Every weekday at 9 AM + CronTrigger(expression="0 */6 * * *") # Every 6 hours + ] + # Create or update the graph (beta) result = await state_manager.upsert_graph( graph_name="my-workflow", @@ -330,6 +336,7 @@ async def create_graph(): }, retry_policy=retry_policy, # beta store_config=store_config, # beta + triggers=triggers, # automatic scheduling validation_timeout=60, polling_interval=1 ) @@ -345,6 +352,7 @@ async def create_graph(): - `secrets` (dict[str, str]): Key/value secrets available to all nodes - `retry_policy` (RetryPolicyModel | None): Optional retry policy configuration (beta) - `store_config` (StoreConfigModel | None): Graph-level store configuration (beta) +- `triggers` (list[CronTrigger] | None): Optional list of cron triggers for automatic graph execution (beta: SDK version 0.0.3b1) - `validation_timeout` (int): Seconds to wait for validation (default: 60) - `polling_interval` (int): Polling interval in seconds (default: 1) diff --git a/python-sdk/exospherehost/models.py b/python-sdk/exospherehost/models.py index 78fabeba..92ed3381 100644 --- a/python-sdk/exospherehost/models.py +++ b/python-sdk/exospherehost/models.py @@ -160,4 +160,4 @@ def validate_default_values(cls, v: dict[str, str]) -> dict[str, str]: return normalized_dict class CronTrigger(BaseModel): - expression: str = Field(..., description="Cron expression") \ No newline at end of file + expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.") \ No newline at end of file diff --git a/python-sdk/exospherehost/statemanager.py b/python-sdk/exospherehost/statemanager.py index 85e94130..13cf6890 100644 --- a/python-sdk/exospherehost/statemanager.py +++ b/python-sdk/exospherehost/statemanager.py @@ -143,6 +143,9 @@ async def upsert_graph(self, graph_name: str, graph_nodes: list[GraphNodeModel], retry_policy (RetryPolicyModel | None): Optional per-node retry policy configuration. store_config (StoreConfigModel | None): Beta configuration for the graph-level store (schema is subject to change). + triggers (list[CronTrigger] | None): Optional list of cron triggers for automatic + graph execution. Each trigger contains a cron expression that schedules when + the graph should be executed automatically. validation_timeout (int): Seconds to wait for validation (default 60). polling_interval (int): Polling interval in seconds (default 1). diff --git a/state-manager/app/controller/upsert_graph_template.py b/state-manager/app/controller/upsert_graph_template.py index 574988dc..72e70e29 100644 --- a/state-manager/app/controller/upsert_graph_template.py +++ b/state-manager/app/controller/upsert_graph_template.py @@ -3,6 +3,9 @@ from app.models.db.graph_template_model import GraphTemplate from app.models.graph_template_validation_status import GraphTemplateValidationStatus from app.tasks.verify_graph import verify_graph +from app.models.db.trigger import DatabaseTriggers +from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum +from beanie.operators import In from fastapi import BackgroundTasks, HTTPException @@ -58,7 +61,15 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse logger.error("Error validating graph template", error=e, x_exosphere_request_id=x_exosphere_request_id) raise HTTPException(status_code=400, detail=f"Error validating graph template: {str(e)}") - background_tasks.add_task(verify_graph, graph_template, old_triggers) + if len(old_triggers) > 0: + await DatabaseTriggers.find( + DatabaseTriggers.graph_name == graph_name, + DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING, + DatabaseTriggers.type == TriggerTypeEnum.CRON, + In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) + ).delete_many() + + background_tasks.add_task(verify_graph, graph_template) return UpsertGraphTemplateResponse( nodes=graph_template.nodes, diff --git a/state-manager/app/tasks/trigger_cron.py b/state-manager/app/tasks/trigger_cron.py index b53d3aa7..ab0771bb 100644 --- a/state-manager/app/tasks/trigger_cron.py +++ b/state-manager/app/tasks/trigger_cron.py @@ -6,6 +6,7 @@ from app.controller.trigger_graph import trigger_graph from app.models.trigger_graph_model import TriggerGraphRequestModel from pymongo import ReturnDocument +from pymongo.errors import DuplicateKeyError from app.config.settings import get_settings import croniter import asyncio @@ -41,17 +42,28 @@ async def mark_as_failed(trigger: DatabaseTriggers): async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime): assert trigger.expression is not None - iter = croniter.croniter(trigger.expression, cron_time) - next_trigger_time = iter.get_next(datetime) + iter = croniter.croniter(trigger.expression, trigger.trigger_time) - await DatabaseTriggers( - type=TriggerTypeEnum.CRON, - expression=trigger.expression, - graph_name=trigger.graph_name, - namespace=trigger.namespace, - trigger_time=next_trigger_time, - trigger_status=TriggerStatusEnum.PENDING - ).insert() + while True: + next_trigger_time = iter.get_next(datetime) + + try: + await DatabaseTriggers( + type=TriggerTypeEnum.CRON, + expression=trigger.expression, + graph_name=trigger.graph_name, + namespace=trigger.namespace, + trigger_time=next_trigger_time, + trigger_status=TriggerStatusEnum.PENDING + ).insert() + except DuplicateKeyError: + logger.error(f"Duplicate trigger found for expression {trigger.expression}") + except Exception as e: + logger.error(f"Error creating next trigger: {e}") + raise + + if next_trigger_time > cron_time: + break async def mark_as_triggered(trigger: DatabaseTriggers): await DatabaseTriggers.get_pymongo_collection().update_one( @@ -63,11 +75,12 @@ async def handle_trigger(cron_time: datetime): while(trigger:= await get_due_triggers(cron_time)): try: await call_trigger_graph(trigger) - await create_next_triggers(trigger, cron_time) await mark_as_triggered(trigger) except Exception as e: await mark_as_failed(trigger) logger.error(f"Error calling trigger graph: {e}") + finally: + await create_next_triggers(trigger, cron_time) async def trigger_cron(): cron_time = datetime.now() diff --git a/state-manager/app/tasks/verify_graph.py b/state-manager/app/tasks/verify_graph.py index 172e6055..7fbb021d 100644 --- a/state-manager/app/tasks/verify_graph.py +++ b/state-manager/app/tasks/verify_graph.py @@ -2,14 +2,13 @@ import croniter from datetime import datetime -from beanie.operators import In from json_schema_to_pydantic import create_model from app.models.db.graph_template_model import GraphTemplate from app.models.graph_template_validation_status import GraphTemplateValidationStatus from app.models.db.registered_node import RegisteredNode from app.singletons.logs_manager import LogsManager -from app.models.trigger_models import Trigger, TriggerStatusEnum, TriggerTypeEnum +from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum from app.models.db.trigger import DatabaseTriggers logger = LogsManager().get_logger() @@ -101,30 +100,8 @@ async def verify_inputs(graph_template: GraphTemplate, registered_nodes: list[Re return errors -async def cancel_crons(graph_template: GraphTemplate, old_triggers: list[Trigger]): - old_cron_expressions = set([trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) - new_cron_expressions = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) - - removed_expressions = old_cron_expressions - new_cron_expressions - - await DatabaseTriggers.find( - DatabaseTriggers.graph_name == graph_template.name, - DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING, - DatabaseTriggers.type == TriggerTypeEnum.CRON, - In(DatabaseTriggers.expression, list(removed_expressions)) - ).update( - { - "$set": { - "trigger_status": TriggerStatusEnum.CANCELLED - } - } - ) # type: ignore - -async def create_crons(graph_template: GraphTemplate, old_triggers: list[Trigger]): - old_cron_expressions = set([trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) - new_cron_expressions = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) - - expressions_to_create = new_cron_expressions - old_cron_expressions +async def create_crons(graph_template: GraphTemplate): + expressions_to_create = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) current_time = datetime.now() @@ -133,7 +110,6 @@ async def create_crons(graph_template: GraphTemplate, old_triggers: list[Trigger iter = croniter.croniter(expression, current_time) next_trigger_time = iter.get_next(datetime) - print(next_trigger_time) new_db_triggers.append( DatabaseTriggers( @@ -149,7 +125,7 @@ async def create_crons(graph_template: GraphTemplate, old_triggers: list[Trigger if len(new_db_triggers) > 0: await DatabaseTriggers.insert_many(new_db_triggers) -async def verify_graph(graph_template: GraphTemplate, old_triggers: list[Trigger]): +async def verify_graph(graph_template: GraphTemplate): try: errors = [] registered_nodes = await RegisteredNode.list_nodes_by_templates(graph_template.nodes) @@ -173,9 +149,8 @@ async def verify_graph(graph_template: GraphTemplate, old_triggers: list[Trigger graph_template.validation_status = GraphTemplateValidationStatus.VALID graph_template.validation_errors = [] - await asyncio.gather(*[cancel_crons(graph_template, old_triggers), create_crons(graph_template, old_triggers)]) - await graph_template.save() + await create_crons(graph_template) except Exception as e: logger.error(f"Exception during graph validation for graph template {graph_template.id}: {str(e)}", exc_info=True) diff --git a/state-manager/tests/unit/controller/test_upsert_graph_template.py b/state-manager/tests/unit/controller/test_upsert_graph_template.py index db3e2640..ae826296 100644 --- a/state-manager/tests/unit/controller/test_upsert_graph_template.py +++ b/state-manager/tests/unit/controller/test_upsert_graph_template.py @@ -194,7 +194,7 @@ async def test_upsert_graph_template_create_new( mock_graph_template_class.insert.assert_called_once() # Verify background task was added - mock_background_tasks.add_task.assert_called_once_with(mock_verify_graph, mock_new_template, []) + mock_background_tasks.add_task.assert_called_once_with(mock_verify_graph, mock_new_template) @patch('app.controller.upsert_graph_template.GraphTemplate') async def test_upsert_graph_template_database_error( diff --git a/state-manager/tests/unit/tasks/test_verify_graph.py b/state-manager/tests/unit/tasks/test_verify_graph.py index d3124885..e16d583a 100644 --- a/state-manager/tests/unit/tasks/test_verify_graph.py +++ b/state-manager/tests/unit/tasks/test_verify_graph.py @@ -385,13 +385,12 @@ async def test_verify_graph_success(self): with patch('app.tasks.verify_graph.verify_node_exists', new_callable=AsyncMock) as mock_verify_nodes: with patch('app.tasks.verify_graph.verify_secrets', new_callable=AsyncMock) as mock_verify_secrets: with patch('app.tasks.verify_graph.verify_inputs', new_callable=AsyncMock) as mock_verify_inputs: - with patch('app.tasks.verify_graph.cancel_crons', new_callable=AsyncMock) as _: - with patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: + with patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: mock_verify_nodes.return_value = [] mock_verify_secrets.return_value = [] mock_verify_inputs.return_value = [] - await verify_graph(graph_template, []) + await verify_graph(graph_template) assert graph_template.validation_status == GraphTemplateValidationStatus.VALID assert graph_template.validation_errors == [] @@ -427,7 +426,7 @@ async def test_verify_graph_with_errors(self): mock_verify_secrets.return_value = ["Secret error"] mock_verify_inputs.return_value = ["Input error"] - await verify_graph(graph_template, []) + await verify_graph(graph_template) assert graph_template.validation_status == GraphTemplateValidationStatus.INVALID assert graph_template.validation_errors == ["Node error", "Secret error", "Input error"] @@ -450,7 +449,7 @@ async def test_verify_graph_exception(self): # The verify_graph function should catch the exception, log it, set status, and re-raise it with pytest.raises(Exception, match="Database error"): - await verify_graph(graph_template, []) + await verify_graph(graph_template) assert graph_template.validation_status == GraphTemplateValidationStatus.INVALID assert graph_template.validation_errors == ["Validation failed due to unexpected error: Database error"] @@ -475,7 +474,7 @@ async def test_verify_graph_with_exception(): # This should handle the exception and mark the graph as invalid, then re-raise with pytest.raises(Exception, match="Database connection error"): - await verify_graph(graph_template, []) + await verify_graph(graph_template) # Verify that the graph was marked as invalid with error assert graph_template.validation_status == GraphTemplateValidationStatus.INVALID @@ -499,7 +498,6 @@ async def test_verify_graph_with_validation_errors(): patch('app.tasks.verify_graph.verify_node_exists') as mock_verify_nodes, \ patch('app.tasks.verify_graph.verify_secrets') as mock_verify_secrets, \ patch('app.tasks.verify_graph.verify_inputs') as mock_verify_inputs, \ - patch('app.tasks.verify_graph.cancel_crons', new_callable=AsyncMock) as _, \ patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: # Mock registered nodes to return empty list @@ -515,7 +513,7 @@ async def test_verify_graph_with_validation_errors(): graph_template.name = "test_graph" # This should mark the graph as invalid due to validation errors - await verify_graph(graph_template, []) + await verify_graph(graph_template) # Verify that the graph was marked as invalid assert graph_template.validation_status == GraphTemplateValidationStatus.INVALID @@ -539,7 +537,6 @@ async def test_verify_graph_with_valid_graph(): patch('app.tasks.verify_graph.verify_node_exists') as mock_verify_nodes, \ patch('app.tasks.verify_graph.verify_secrets') as mock_verify_secrets, \ patch('app.tasks.verify_graph.verify_inputs') as mock_verify_inputs, \ - patch('app.tasks.verify_graph.cancel_crons', new_callable=AsyncMock) as _, \ patch('app.tasks.verify_graph.create_crons', new_callable=AsyncMock) as _: # Mock registered nodes to return a valid node @@ -563,7 +560,7 @@ async def test_verify_graph_with_valid_graph(): graph_template.name = "test_graph" # This should mark the graph as valid - await verify_graph(graph_template, []) + await verify_graph(graph_template) # Verify that the graph was processed (status may vary based on actual validation) # The specific status depends on the actual validation logic