diff --git a/docs/docs/exosphere/triggers.md b/docs/docs/exosphere/triggers.md index 67de4cfd..aa220cd8 100644 --- a/docs/docs/exosphere/triggers.md +++ b/docs/docs/exosphere/triggers.md @@ -61,13 +61,15 @@ Define triggers in your graph template: { "type": "CRON", "value": { - "expression": "0 9 * * 1-5" + "expression": "0 9 * * 1-5", + "timezone": "America/New_York" } }, { "type": "CRON", "value": { - "expression": "0 0 * * 0" + "expression": "0 0 * * 0", + "timezone": "UTC" } } ], @@ -77,6 +79,8 @@ Define triggers in your graph template: } ``` +**Note:** The `timezone` field is optional and defaults to `"UTC"` if not specified. Use IANA timezone names (e.g., `"America/New_York"`, `"Europe/London"`, `"Asia/Tokyo"`). + ### Python SDK Example ```python @@ -109,8 +113,8 @@ async def create_scheduled_graph(): # Define triggers for automatic execution triggers = [ - CronTrigger(expression="0 2 * * *"), # Daily at 2:00 AM - CronTrigger(expression="0 */4 * * *") # Every 4 hours + CronTrigger(expression="0 2 * * *", timezone="America/New_York"), # Daily at 2:00 AM EST/EDT + CronTrigger(expression="0 */4 * * *", timezone="UTC") # Every 4 hours UTC ] # Create the graph with triggers @@ -158,7 +162,7 @@ asyncio.run(create_scheduled_graph()) 1. **Avoid Peak Times**: Schedule resource-intensive workflows during off-peak hours 2. **Stagger Executions**: If you have multiple graphs, stagger their execution times -3. **Consider Time Zones**: Cron expressions use server time (UTC by default) +3. **Consider Time Zones**: Specify the `timezone` parameter to ensure your cron expressions run at the correct local time. If not specified, defaults to UTC. 4. **Resource Planning**: Ensure your infrastructure can handle scheduled workloads ### Error Handling @@ -191,11 +195,31 @@ result = await state_manager.upsert_graph( ) ``` +## Timezone Support + +Triggers now support specifying a timezone for cron expressions, allowing you to schedule jobs in your local timezone: + +```python +# Schedule a report to run at 9 AM New York time (handles DST automatically) +CronTrigger(expression="0 9 * * 1-5", timezone="America/New_York") + +# Schedule a job at 5 PM London time +CronTrigger(expression="0 17 * * *", timezone="Europe/London") + +# Schedule using UTC (default) +CronTrigger(expression="0 12 * * *", timezone="UTC") +``` + +**Important Notes:** +- Use IANA timezone names (e.g., `"America/New_York"`, `"Europe/London"`, `"Asia/Tokyo"`) +- Timezones automatically handle Daylight Saving Time (DST) transitions +- If no timezone is specified, defaults to `"UTC"` +- All trigger times are internally stored in UTC for consistency + ## Limitations - **CRON Only**: Currently only cron-based scheduling is supported - **No Manual Override**: Scheduled executions cannot be manually cancelled once triggered -- **Time Zone**: All cron expressions are evaluated in server time (UTC) - **Minimum Interval**: Avoid scheduling more frequently than every minute ## Next Steps diff --git a/python-sdk/exospherehost/_version.py b/python-sdk/exospherehost/_version.py index 77621c99..5d12bc38 100644 --- a/python-sdk/exospherehost/_version.py +++ b/python-sdk/exospherehost/_version.py @@ -1 +1 @@ -version = "0.0.3b1" \ No newline at end of file +version = "0.0.3b2" \ No newline at end of file diff --git a/python-sdk/exospherehost/models.py b/python-sdk/exospherehost/models.py index 92ed3381..b5be5a29 100644 --- a/python-sdk/exospherehost/models.py +++ b/python-sdk/exospherehost/models.py @@ -1,7 +1,9 @@ from pydantic import BaseModel, Field, field_validator from typing import Any, Optional, List from enum import Enum +from zoneinfo import available_timezones +_AVAILABLE_TIMEZONES = available_timezones() class UnitesStrategyEnum(str, Enum): ALL_SUCCESS = "ALL_SUCCESS" @@ -160,4 +162,12 @@ def validate_default_values(cls, v: dict[str, str]) -> dict[str, str]: return normalized_dict class CronTrigger(BaseModel): - expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.") \ No newline at end of file + expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.") + timezone: str = Field(default="UTC", description="Timezone for the cron expression (e.g., 'America/New_York', 'Europe/London', 'UTC'). Defaults to 'UTC'.") + + @field_validator('timezone') + @classmethod + def validate_timezone(cls, v: str) -> str: + if v not in _AVAILABLE_TIMEZONES: + raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')") + return v \ No newline at end of file diff --git a/python-sdk/exospherehost/statemanager.py b/python-sdk/exospherehost/statemanager.py index 13cf6890..e4d4b010 100644 --- a/python-sdk/exospherehost/statemanager.py +++ b/python-sdk/exospherehost/statemanager.py @@ -171,9 +171,10 @@ async def upsert_graph(self, graph_name: str, graph_nodes: list[GraphNodeModel], if triggers is not None: body["triggers"] = [ { - "type": "CRON", "value": { - "expression": trigger.expression + "type": "CRON", + "expression": trigger.expression, + "timezone": trigger.timezone } } for trigger in triggers diff --git a/state-manager/app/controller/upsert_graph_template.py b/state-manager/app/controller/upsert_graph_template.py index 72e70e29..125bae53 100644 --- a/state-manager/app/controller/upsert_graph_template.py +++ b/state-manager/app/controller/upsert_graph_template.py @@ -66,7 +66,7 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse DatabaseTriggers.graph_name == graph_name, DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING, DatabaseTriggers.type == TriggerTypeEnum.CRON, - In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON]) + In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.value.type == TriggerTypeEnum.CRON]) ).delete_many() background_tasks.add_task(verify_graph, graph_template) diff --git a/state-manager/app/models/db/trigger.py b/state-manager/app/models/db/trigger.py index a416193e..2b6c40ae 100644 --- a/state-manager/app/models/db/trigger.py +++ b/state-manager/app/models/db/trigger.py @@ -9,6 +9,7 @@ class DatabaseTriggers(Document): type: TriggerTypeEnum = Field(..., description="Type of the trigger") expression: Optional[str] = Field(default=None, description="Expression of the trigger") + timezone: Optional[str] = Field(default="UTC", description="Timezone for the trigger") graph_name: str = Field(..., description="Name of the graph") namespace: str = Field(..., description="Namespace of the graph") trigger_time: datetime = Field(..., description="Trigger time of the trigger") diff --git a/state-manager/app/models/trigger_models.py b/state-manager/app/models/trigger_models.py index a8dbddb0..c44ef168 100644 --- a/state-manager/app/models/trigger_models.py +++ b/state-manager/app/models/trigger_models.py @@ -1,7 +1,11 @@ -from pydantic import BaseModel, Field, field_validator, model_validator +from pydantic import BaseModel, Field, field_validator from enum import Enum from croniter import croniter -from typing import Self +from typing import Union, Annotated, Literal +from zoneinfo import available_timezones + +# Cache available timezones at module level to avoid repeated filesystem queries +_AVAILABLE_TIMEZONES = available_timezones() class TriggerTypeEnum(str, Enum): CRON = "CRON" @@ -14,7 +18,9 @@ class TriggerStatusEnum(str, Enum): TRIGGERING = "TRIGGERING" class CronTrigger(BaseModel): + type: Literal[TriggerTypeEnum.CRON] = Field(default=TriggerTypeEnum.CRON, description="Type of the trigger") expression: str = Field(..., description="Cron expression for the trigger") + timezone: str = Field(default="UTC", description="Timezone for the cron expression (e.g., 'America/New_York', 'Europe/London', 'UTC')") @field_validator("expression") @classmethod @@ -23,14 +29,24 @@ def validate_expression(cls, v: str) -> str: raise ValueError("Invalid cron expression") return v + @field_validator("timezone") + @classmethod + def validate_timezone(cls, v: str) -> str: + if v not in _AVAILABLE_TIMEZONES: + raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')") + return v + +# Union type for all trigger types - add new trigger types here +TriggerValue = Annotated[Union[CronTrigger], Field(discriminator="type")] # type: ignore + class Trigger(BaseModel): - type: TriggerTypeEnum = Field(..., description="Type of the trigger") - value: dict = Field(default_factory=dict, description="Value of the trigger") - - @model_validator(mode="after") - def validate_trigger(self) -> Self: - if self.type == TriggerTypeEnum.CRON: - CronTrigger.model_validate(self.value) - else: - raise ValueError(f"Unsupported trigger type: {self.type}") - return self \ No newline at end of file + """ + Extensible trigger model using discriminated unions. + To add a new trigger type: + 1. Add the enum value to TriggerTypeEnum + 2. Create a new trigger class (e.g., WebhookTrigger) with type field + 3. Add it to the TriggerValue Union + + Note: Access trigger type via trigger.value.type + """ + value: TriggerValue = Field(..., description="Value of the trigger") \ No newline at end of file diff --git a/state-manager/app/tasks/trigger_cron.py b/state-manager/app/tasks/trigger_cron.py index 3fca36ea..eb2d3c9d 100644 --- a/state-manager/app/tasks/trigger_cron.py +++ b/state-manager/app/tasks/trigger_cron.py @@ -8,9 +8,13 @@ from pymongo import ReturnDocument from pymongo.errors import DuplicateKeyError from app.config.settings import get_settings +from zoneinfo import ZoneInfo import croniter import asyncio +# Cache UTC timezone at module level to avoid repeated instantiation +UTC = ZoneInfo("UTC") + logger = LogsManager().get_logger() async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None: @@ -47,16 +51,27 @@ async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int): async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int): assert trigger.expression is not None - iter = croniter.croniter(trigger.expression, trigger.trigger_time) + + # Use the trigger's timezone, defaulting to UTC if not specified + tz = ZoneInfo(trigger.timezone or "UTC") + + # Convert trigger_time to the specified timezone for croniter + trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz) + iter = croniter.croniter(trigger.expression, trigger_time_tz) while True: - next_trigger_time = iter.get_next(datetime) + # Get next trigger time in the specified timezone + next_trigger_time_tz = iter.get_next(datetime) + + # Convert back to UTC for storage + next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None) expires_at = next_trigger_time + timedelta(hours=retention_hours) try: await DatabaseTriggers( type=TriggerTypeEnum.CRON, expression=trigger.expression, + timezone=trigger.timezone, graph_name=trigger.graph_name, namespace=trigger.namespace, trigger_time=next_trigger_time, diff --git a/state-manager/app/tasks/verify_graph.py b/state-manager/app/tasks/verify_graph.py index 3ac7ee36..ceef0fc2 100644 --- a/state-manager/app/tasks/verify_graph.py +++ b/state-manager/app/tasks/verify_graph.py @@ -3,6 +3,7 @@ from datetime import datetime from json_schema_to_pydantic import create_model +from zoneinfo import ZoneInfo from app.models.db.graph_template_model import GraphTemplate from app.models.graph_template_validation_status import GraphTemplateValidationStatus @@ -13,6 +14,9 @@ from app.config.settings import get_settings from datetime import timedelta +# Cache UTC timezone at module level to avoid repeated instantiation +UTC = ZoneInfo("UTC") + logger = LogsManager().get_logger() settings = get_settings() @@ -105,21 +109,37 @@ async def verify_inputs(graph_template: GraphTemplate, registered_nodes: list[Re return errors async def create_crons(graph_template: GraphTemplate): - expressions_to_create = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) + # Build a map of (expression, timezone) -> CronTrigger for deduplication + triggers_to_create = {} + for trigger in graph_template.triggers: + if trigger.value.type == TriggerTypeEnum.CRON: + # trigger.value is already a validated CronTrigger instance + cron_trigger = trigger.value + triggers_to_create[(cron_trigger.expression, cron_trigger.timezone)] = cron_trigger + + current_time = datetime.now(UTC).replace(tzinfo=None) - current_time = datetime.now() - new_db_triggers = [] - for expression in expressions_to_create: - iter = croniter.croniter(expression, current_time) + for (expression, timezone), cron_trigger in triggers_to_create.items(): + # Use the validated timezone (guaranteed to be valid IANA timezone, never None) + tz = ZoneInfo(timezone) + + # Get current time in the specified timezone + current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz) + iter = croniter.croniter(expression, current_time_tz) + + # Get next trigger time in the specified timezone + next_trigger_time_tz = iter.get_next(datetime) - next_trigger_time = iter.get_next(datetime) + # Convert back to UTC for storage (remove timezone info for storage) + next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None) expires_at = next_trigger_time + timedelta(hours=settings.trigger_retention_hours) new_db_triggers.append( DatabaseTriggers( type=TriggerTypeEnum.CRON, - expression=expression, + expression=cron_trigger.expression, + timezone=cron_trigger.timezone, graph_name=graph_template.name, namespace=graph_template.namespace, trigger_status=TriggerStatusEnum.PENDING, diff --git a/state-manager/pytest.ini b/state-manager/pytest.ini index 0e617e9f..e38d3c98 100644 --- a/state-manager/pytest.ini +++ b/state-manager/pytest.ini @@ -7,4 +7,7 @@ markers = unit: marks a test as a unit test with_database: marks a test as a test that requires a database asyncio_mode = auto +filterwarnings = + ignore::pydantic.PydanticDeprecatedSince211:lazy_model.* + ignore::DeprecationWarning:lazy_model.* diff --git a/state-manager/tests/unit/models/test_trigger_models.py b/state-manager/tests/unit/models/test_trigger_models.py new file mode 100644 index 00000000..293b9c8b --- /dev/null +++ b/state-manager/tests/unit/models/test_trigger_models.py @@ -0,0 +1,168 @@ +import pytest +from pydantic import ValidationError +from app.models.trigger_models import CronTrigger, Trigger, TriggerTypeEnum + + +class TestCronTrigger: + """Test cases for CronTrigger model""" + + def test_valid_cron_trigger_with_timezone(self): + """Test creating a valid cron trigger with timezone""" + trigger = CronTrigger(expression="0 9 * * *", timezone="America/New_York") + assert trigger.expression == "0 9 * * *" + assert trigger.timezone == "America/New_York" + + def test_valid_cron_trigger_without_timezone(self): + """Test creating a valid cron trigger without timezone defaults to UTC""" + trigger = CronTrigger(expression="0 9 * * *") + assert trigger.expression == "0 9 * * *" + assert trigger.timezone == "UTC" + + def test_valid_cron_trigger_with_utc_timezone(self): + """Test creating a valid cron trigger with UTC timezone""" + trigger = CronTrigger(expression="0 9 * * *", timezone="UTC") + assert trigger.expression == "0 9 * * *" + assert trigger.timezone == "UTC" + + def test_valid_cron_trigger_with_europe_london(self): + """Test creating a valid cron trigger with Europe/London timezone""" + trigger = CronTrigger(expression="0 17 * * *", timezone="Europe/London") + assert trigger.expression == "0 17 * * *" + assert trigger.timezone == "Europe/London" + + def test_valid_cron_trigger_with_asia_tokyo(self): + """Test creating a valid cron trigger with Asia/Tokyo timezone""" + trigger = CronTrigger(expression="30 8 * * 1-5", timezone="Asia/Tokyo") + assert trigger.expression == "30 8 * * 1-5" + assert trigger.timezone == "Asia/Tokyo" + + def test_invalid_cron_expression(self): + """Test creating a cron trigger with invalid expression""" + with pytest.raises(ValidationError) as exc_info: + CronTrigger(expression="invalid cron", timezone="UTC") + + errors = exc_info.value.errors() + assert len(errors) == 1 + # Check the error message (Pydantic v2 doesn't always populate ctx) + error_msg = errors[0].get("msg") or str(errors[0]) + assert "Invalid cron expression" in error_msg + + def test_invalid_timezone(self): + """Test creating a cron trigger with invalid timezone""" + with pytest.raises(ValidationError) as exc_info: + CronTrigger(expression="0 9 * * *", timezone="Invalid/Timezone") + + errors = exc_info.value.errors() + assert len(errors) == 1 + # Check the error message (Pydantic v2 doesn't always populate ctx) + error_msg = errors[0].get("msg") or str(errors[0]) + assert "Invalid timezone" in error_msg + assert "Invalid/Timezone" in error_msg + + def test_complex_cron_expression_with_timezone(self): + """Test complex cron expression with timezone""" + trigger = CronTrigger(expression="0 0 1,15 * *", timezone="America/Los_Angeles") + assert trigger.expression == "0 0 1,15 * *" + assert trigger.timezone == "America/Los_Angeles" + + def test_every_15_minutes_cron_with_timezone(self): + """Test every 15 minutes cron with timezone""" + trigger = CronTrigger(expression="*/15 * * * *", timezone="Europe/Paris") + assert trigger.expression == "*/15 * * * *" + assert trigger.timezone == "Europe/Paris" + + +class TestTrigger: + """Test cases for Trigger model""" + + def test_valid_trigger_with_cron_and_timezone(self): + """Test creating a valid trigger with CRON type and timezone""" + trigger = Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"} + ) + assert trigger.value.type == TriggerTypeEnum.CRON + assert trigger.value.expression == "0 9 * * *" + assert trigger.value.timezone == "America/New_York" + + def test_valid_trigger_with_cron_without_timezone(self): + """Test creating a valid trigger with CRON type without timezone""" + trigger = Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *"} + ) + assert trigger.value.type == TriggerTypeEnum.CRON + assert trigger.value.expression == "0 9 * * *" + assert trigger.value.timezone == "UTC" # Should default to UTC + + def test_invalid_trigger_with_invalid_cron_expression(self): + """Test creating a trigger with invalid cron expression""" + with pytest.raises(ValidationError) as exc_info: + Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "invalid cron"} + ) + + errors = exc_info.value.errors() + assert len(errors) > 0 + + def test_invalid_trigger_with_invalid_timezone(self): + """Test creating a trigger with invalid timezone""" + with pytest.raises(ValidationError) as exc_info: + Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "Invalid/Zone"} + ) + + errors = exc_info.value.errors() + assert len(errors) > 0 + + def test_trigger_model_equality_with_identical_values(self): + """Test Pydantic model equality for triggers with identical values. + + Note: This test verifies model-level equality only, not deduplication logic. + The actual deduplication of triggers is handled in verify_graph.py's create_crons function. + """ + # Create first trigger + trigger1 = Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"} + ) + + # Create second trigger with identical values + trigger2 = Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"} + ) + + # Both triggers should be created successfully + assert trigger1.value.type == TriggerTypeEnum.CRON + assert trigger2.value.type == TriggerTypeEnum.CRON + assert trigger1.value.expression == "0 9 * * *" + assert trigger2.value.expression == "0 9 * * *" + assert trigger1.value.timezone == "America/New_York" + assert trigger2.value.timezone == "America/New_York" + + # Test equality - Pydantic models with same values should be equal + assert trigger1 == trigger2 + + # Test that they are different objects in memory + assert trigger1 is not trigger2 + + # Test model_dump produces identical output + assert trigger1.model_dump() == trigger2.model_dump() + + def test_duplicate_triggers_different_timezones(self): + """Test creating two triggers with same expression but different timezones""" + trigger1 = Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"} + ) + + trigger2 = Trigger( + value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "Europe/London"} + ) + + # Both triggers should be created successfully + assert trigger1.value.type == TriggerTypeEnum.CRON + assert trigger2.value.type == TriggerTypeEnum.CRON + + # They should NOT be equal due to different timezones + assert trigger1 != trigger2 + + # Verify the difference is in timezone + assert trigger1.value.timezone == "America/New_York" + assert trigger2.value.timezone == "Europe/London" \ No newline at end of file diff --git a/state-manager/tests/unit/tasks/test_create_crons.py b/state-manager/tests/unit/tasks/test_create_crons.py new file mode 100644 index 00000000..1ec136df --- /dev/null +++ b/state-manager/tests/unit/tasks/test_create_crons.py @@ -0,0 +1,235 @@ +""" +Tests for create_crons function to improve code coverage. +These are pure unit tests that mock DatabaseTriggers to avoid database dependency. +Environment variables are provided by CI (see .github/workflows/test-state-manager.yml). +""" +import pytest +from unittest.mock import MagicMock, AsyncMock, patch +from datetime import datetime + +# Environment variables are set by CI workflow +# For local testing, these should be set before running tests +# See: .github/workflows/test-state-manager.yml + +from app.tasks.verify_graph import create_crons +from app.models.trigger_models import Trigger, TriggerTypeEnum + +# Helper function to create trigger value dict +def make_trigger_value(expression: str, timezone: str = None): + """Helper to create trigger value dict with required 'type' field""" + result = {"type": TriggerTypeEnum.CRON, "expression": expression} + if timezone is not None: + result["timezone"] = timezone + return result + + +@pytest.mark.asyncio +async def test_create_crons_with_america_new_york_timezone(): + """Test create_crons processes America/New_York timezone correctly""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [ + Trigger( + value=make_trigger_value("0 9 * * *", "America/New_York") + ) + ] + + # Mock DatabaseTriggers class and insert_many method + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + # Mock instances that will be created + mock_instance = MagicMock() + mock_db_class.return_value = mock_instance + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + # Verify DatabaseTriggers was instantiated with correct parameters + assert mock_db_class.called + call_kwargs = mock_db_class.call_args[1] + assert call_kwargs['timezone'] == "America/New_York" + assert call_kwargs['expression'] == "0 9 * * *" + assert call_kwargs['graph_name'] == "test_graph" + assert call_kwargs['namespace'] == "test_ns" + + # Verify insert_many was called with the list of triggers + assert mock_db_class.insert_many.called + + +@pytest.mark.asyncio +async def test_create_crons_with_default_utc_timezone(): + """Test create_crons uses UTC as default when timezone not specified""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [ + Trigger( + value=make_trigger_value("0 9 * * *") # No timezone specified + ) + ] + + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + mock_db_class.return_value = MagicMock() + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + # Verify default UTC timezone was used + call_kwargs = mock_db_class.call_args[1] + assert call_kwargs['timezone'] == "UTC" + assert mock_db_class.insert_many.called + + +@pytest.mark.asyncio +async def test_create_crons_with_europe_london_timezone(): + """Test create_crons handles Europe/London timezone""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [ + Trigger( + value=make_trigger_value("0 17 * * *", "Europe/London") + ) + ] + + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + mock_db_class.return_value = MagicMock() + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + call_kwargs = mock_db_class.call_args[1] + assert call_kwargs['timezone'] == "Europe/London" + assert call_kwargs['expression'] == "0 17 * * *" + assert mock_db_class.insert_many.called + + +@pytest.mark.asyncio +async def test_create_crons_with_multiple_different_timezones(): + """Test create_crons handles multiple triggers with different timezones""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [ + Trigger( + value=make_trigger_value("0 9 * * *", "America/New_York") + ), + Trigger( + value=make_trigger_value("0 17 * * *", "Europe/London") + ) + ] + + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + mock_db_class.return_value = MagicMock() + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + # Should be called twice (one for each trigger) + assert mock_db_class.call_count == 2 + + # Verify insert_many was called once with list of 2 triggers + assert mock_db_class.insert_many.call_count == 1 + insert_call_args = mock_db_class.insert_many.call_args[0][0] + assert len(insert_call_args) == 2 + + +@pytest.mark.asyncio +async def test_create_crons_skips_insert_when_no_triggers(): + """Test create_crons doesn't call insert_many when no CRON triggers exist""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [] + + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + # Verify insert_many was NOT called (no triggers to insert) + assert not mock_db_class.insert_many.called + # DatabaseTriggers should not have been instantiated + assert mock_db_class.call_count == 0 + + +@pytest.mark.asyncio +async def test_create_crons_deduplicates_same_expression_and_timezone(): + """Test create_crons deduplicates triggers with same expression and timezone""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [ + Trigger( + value=make_trigger_value("0 9 * * *", "America/New_York") + ), + Trigger( + value=make_trigger_value("0 9 * * *", "America/New_York") + ) + ] + + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + mock_db_class.return_value = MagicMock() + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + # Should only create one DatabaseTriggers instance (deduplicated) + assert mock_db_class.call_count == 1 + + # insert_many should be called with list of 1 + insert_call_args = mock_db_class.insert_many.call_args[0][0] + assert len(insert_call_args) == 1 + + +@pytest.mark.asyncio +async def test_create_crons_keeps_same_expression_different_timezones(): + """Test create_crons keeps triggers with same expression but different timezones""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [ + Trigger( + value=make_trigger_value("0 9 * * *", "America/New_York") + ), + Trigger( + value=make_trigger_value("0 9 * * *", "Europe/London") + ) + ] + + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + mock_db_class.return_value = MagicMock() + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + # Should create two DatabaseTriggers instances (different timezones) + assert mock_db_class.call_count == 2 + + # insert_many should be called with list of 2 + insert_call_args = mock_db_class.insert_many.call_args[0][0] + assert len(insert_call_args) == 2 + + +@pytest.mark.asyncio +async def test_create_crons_trigger_time_is_datetime(): + """Test that trigger_time is set to a datetime object""" + graph_template = MagicMock() + graph_template.name = "test_graph" + graph_template.namespace = "test_ns" + graph_template.triggers = [ + Trigger( + value=make_trigger_value("0 9 * * *", "America/New_York") + ) + ] + + with patch('app.tasks.verify_graph.DatabaseTriggers') as mock_db_class: + mock_db_class.return_value = MagicMock() + mock_db_class.insert_many = AsyncMock() + + await create_crons(graph_template) + + # Verify trigger_time is a datetime object + call_kwargs = mock_db_class.call_args[1] + assert isinstance(call_kwargs['trigger_time'], datetime) + diff --git a/state-manager/tests/unit/tasks/test_trigger_cron.py b/state-manager/tests/unit/tasks/test_trigger_cron.py index 7179e08a..fe8d6f38 100644 --- a/state-manager/tests/unit/tasks/test_trigger_cron.py +++ b/state-manager/tests/unit/tasks/test_trigger_cron.py @@ -1,6 +1,7 @@ """ -Tests for trigger TTL (Time To Live) expiration logic. -Verifies that completed/failed triggers are properly marked for cleanup. +Tests for trigger_cron functions to improve code coverage. +These are pure unit tests that mock database operations. +Environment variables are provided by CI (see .github/workflows/test-state-manager.yml). """ import pytest from unittest.mock import MagicMock, AsyncMock, patch @@ -20,6 +21,107 @@ from app.models.trigger_models import TriggerStatusEnum +@pytest.mark.asyncio +async def test_create_next_triggers_with_america_new_york_timezone(): + """Test create_next_triggers processes America/New_York timezone correctly""" + trigger = MagicMock() + trigger.expression = "0 9 * * *" + trigger.timezone = "America/New_York" + trigger.trigger_time = datetime(2025, 10, 4, 13, 0, 0) # Naive UTC time + trigger.graph_name = "test_graph" + trigger.namespace = "test_namespace" + + cron_time = datetime(2025, 10, 6, 0, 0, 0) + + with patch('app.tasks.trigger_cron.DatabaseTriggers') as mock_db_class: + mock_instance = MagicMock() + mock_instance.insert = AsyncMock() + mock_db_class.return_value = mock_instance + + await create_next_triggers(trigger, cron_time, 24) + + # Verify DatabaseTriggers was instantiated with timezone + assert mock_db_class.called + call_kwargs = mock_db_class.call_args[1] + assert call_kwargs['timezone'] == "America/New_York" + assert call_kwargs['expression'] == "0 9 * * *" + + +@pytest.mark.asyncio +async def test_create_next_triggers_with_utc_timezone(): + """Test create_next_triggers with UTC timezone""" + trigger = MagicMock() + trigger.expression = "0 9 * * *" + trigger.timezone = "UTC" + trigger.trigger_time = datetime(2025, 10, 4, 9, 0, 0) + trigger.graph_name = "test_graph" + trigger.namespace = "test_namespace" + + cron_time = datetime(2025, 10, 6, 0, 0, 0) + + with patch('app.tasks.trigger_cron.DatabaseTriggers') as mock_db_class: + mock_instance = MagicMock() + mock_instance.insert = AsyncMock() + mock_db_class.return_value = mock_instance + + await create_next_triggers(trigger, cron_time, 24) + + # Verify timezone was passed correctly + call_kwargs = mock_db_class.call_args[1] + assert call_kwargs['timezone'] == "UTC" + + +@pytest.mark.asyncio +async def test_create_next_triggers_with_none_timezone_defaults_to_utc(): + """Test create_next_triggers with None timezone defaults to UTC""" + trigger = MagicMock() + trigger.expression = "0 9 * * *" + trigger.timezone = None + trigger.trigger_time = datetime(2025, 10, 4, 9, 0, 0) + trigger.graph_name = "test_graph" + trigger.namespace = "test_namespace" + + cron_time = datetime(2025, 10, 6, 0, 0, 0) + + with patch('app.tasks.trigger_cron.DatabaseTriggers') as mock_db_class: + mock_instance = MagicMock() + mock_instance.insert = AsyncMock() + mock_db_class.return_value = mock_instance + + await create_next_triggers(trigger, cron_time, 24) + + # Verify None timezone is passed through (will default to UTC in ZoneInfo call) + call_kwargs = mock_db_class.call_args[1] + assert call_kwargs['timezone'] is None + + +@pytest.mark.asyncio +async def test_create_next_triggers_with_europe_london_timezone(): + """Test create_next_triggers with Europe/London timezone""" + trigger = MagicMock() + trigger.expression = "0 17 * * *" + trigger.timezone = "Europe/London" + trigger.trigger_time = datetime(2025, 10, 4, 16, 0, 0) # UTC time + trigger.graph_name = "test_graph" + trigger.namespace = "test_namespace" + + cron_time = datetime(2025, 10, 6, 0, 0, 0) + + with patch('app.tasks.trigger_cron.DatabaseTriggers') as mock_db_class: + mock_instance = MagicMock() + mock_instance.insert = AsyncMock() + mock_db_class.return_value = mock_instance + + await create_next_triggers(trigger, cron_time, 24) + + # Verify Europe/London timezone was used + call_kwargs = mock_db_class.call_args[1] + assert call_kwargs['timezone'] == "Europe/London" + +""" +Tests for trigger TTL (Time To Live) expiration logic. +Verifies that completed/failed triggers are properly marked for cleanup. +""" @pytest.mark.asyncio @pytest.mark.parametrize("mark_function,expected_status", [ (mark_as_triggered, TriggerStatusEnum.TRIGGERED), @@ -155,9 +257,10 @@ async def test_call_trigger_graph(): @pytest.mark.asyncio async def test_create_next_triggers_creates_future_trigger(): """Test create_next_triggers creates next trigger in the future""" - cron_time = datetime.now(timezone.utc) + cron_time = datetime.now(timezone.utc).replace(tzinfo=None) trigger = MagicMock(spec=DatabaseTriggers) trigger.expression = "0 9 * * *" + trigger.timezone = "UTC" trigger.trigger_time = cron_time - timedelta(days=1) trigger.graph_name = "test_graph" trigger.namespace = "test_ns" @@ -177,28 +280,88 @@ async def test_create_next_triggers_creates_future_trigger(): @pytest.mark.asyncio async def test_create_next_triggers_handles_duplicate_key_error(): """Test create_next_triggers handles DuplicateKeyError gracefully""" - cron_time = datetime.now(timezone.utc) - trigger = MagicMock(spec=DatabaseTriggers) + trigger = MagicMock() trigger.expression = "0 9 * * *" - trigger.trigger_time = cron_time - timedelta(days=1) + trigger.timezone = "America/New_York" + trigger.trigger_time = datetime(2025, 10, 4, 13, 0, 0) trigger.graph_name = "test_graph" - trigger.namespace = "test_ns" + trigger.namespace = "test_namespace" - with patch('app.tasks.trigger_cron.DatabaseTriggers') as MockDatabaseTriggers: + cron_time = datetime(2025, 10, 6, 0, 0, 0) + + with patch('app.tasks.trigger_cron.DatabaseTriggers') as mock_db_class: mock_instance = MagicMock() - mock_instance.insert = AsyncMock(side_effect=DuplicateKeyError("duplicate")) - MockDatabaseTriggers.return_value = mock_instance + # First call raises DuplicateKeyError, second succeeds + mock_instance.insert = AsyncMock(side_effect=[ + DuplicateKeyError("Duplicate"), + None + ]) + mock_db_class.return_value = mock_instance + + with patch('app.tasks.trigger_cron.logger') as mock_logger: + # Should not raise exception + await create_next_triggers(trigger, cron_time, 24) + + # Verify error was logged + assert mock_logger.error.called + error_msg = mock_logger.error.call_args[0][0] + assert "Duplicate trigger found" in error_msg + + +@pytest.mark.asyncio +async def test_create_next_triggers_trigger_time_is_datetime(): + """Test that next trigger_time is a datetime object""" + trigger = MagicMock() + trigger.expression = "0 9 * * *" + trigger.timezone = "America/New_York" + trigger.trigger_time = datetime(2025, 10, 4, 13, 0, 0) + trigger.graph_name = "test_graph" + trigger.namespace = "test_namespace" + + cron_time = datetime(2025, 10, 6, 0, 0, 0) + + with patch('app.tasks.trigger_cron.DatabaseTriggers') as mock_db_class: + mock_instance = MagicMock() + mock_instance.insert = AsyncMock() + mock_db_class.return_value = mock_instance - # Should not raise exception await create_next_triggers(trigger, cron_time, 24) + # Verify trigger_time is a datetime + call_kwargs = mock_db_class.call_args[1] + assert isinstance(call_kwargs['trigger_time'], datetime) + + +@pytest.mark.asyncio +async def test_create_next_triggers_creates_multiple_triggers(): + """Test create_next_triggers creates multiple future triggers""" + trigger = MagicMock() + trigger.expression = "0 */6 * * *" # Every 6 hours + trigger.timezone = "UTC" + trigger.trigger_time = datetime(2025, 10, 4, 0, 0, 0) + trigger.graph_name = "test_graph" + trigger.namespace = "test_namespace" + + cron_time = datetime(2025, 10, 5, 0, 0, 0) # 24 hours later + + with patch('app.tasks.trigger_cron.DatabaseTriggers') as mock_db_class: + mock_instance = MagicMock() + mock_instance.insert = AsyncMock() + mock_db_class.return_value = mock_instance + + await create_next_triggers(trigger, cron_time, 24) + + # Should create multiple triggers (every 6 hours until past cron_time) + assert mock_db_class.call_count >= 4 # At least 4 triggers in 24 hours + @pytest.mark.asyncio async def test_create_next_triggers_raises_on_other_exceptions(): """Test create_next_triggers raises on non-DuplicateKeyError exceptions""" - cron_time = datetime.now(timezone.utc) + cron_time = datetime.now(timezone.utc).replace(tzinfo=None) trigger = MagicMock(spec=DatabaseTriggers) trigger.expression = "0 9 * * *" + trigger.timezone = "UTC" trigger.trigger_time = cron_time - timedelta(days=1) trigger.graph_name = "test_graph" trigger.namespace = "test_ns" diff --git a/state-manager/tests/unit/with_database/test_trigger_deduplication.py b/state-manager/tests/unit/with_database/test_trigger_deduplication.py new file mode 100644 index 00000000..8343957a --- /dev/null +++ b/state-manager/tests/unit/with_database/test_trigger_deduplication.py @@ -0,0 +1,176 @@ +""" +Integration tests for trigger deduplication logic in create_crons. +These tests verify that duplicate triggers with identical expression and timezone +result in only one DatabaseTriggers row being created. +""" +import pytest +from typing import List, Optional, Set, Tuple + +from app.models.db.graph_template_model import GraphTemplate +from app.models.graph_template_validation_status import GraphTemplateValidationStatus +from app.models.node_template_model import NodeTemplate +from app.models.trigger_models import Trigger, TriggerTypeEnum +from app.models.db.trigger import DatabaseTriggers +from app.tasks.verify_graph import create_crons + + +async def run_deduplication_test( + graph_name: str, + triggers: List[Trigger], + expected_count: int, + expected_pairs: Optional[Set[Tuple[str, str]]] = None +): + """Helper to test trigger deduplication with actual database. + + Args: + graph_name: Unique name for the test graph + triggers: List of Trigger objects to create + expected_count: Expected number of unique triggers in database + expected_pairs: Optional set of (expression, timezone) tuples to verify + """ + namespace = "test_namespace" + + # Pre-test cleanup + await DatabaseTriggers.find( + DatabaseTriggers.graph_name == graph_name, + DatabaseTriggers.namespace == namespace + ).delete_many() + + # Create graph template with triggers + graph_template = GraphTemplate( + name=graph_name, + namespace=namespace, + nodes=[ + NodeTemplate( + node_name="test_node", + namespace=namespace, + identifier="test_node", + inputs={}, + next_nodes=None, + unites=None + ) + ], + validation_status=GraphTemplateValidationStatus.PENDING, + triggers=triggers + ) + + # Call create_crons + await create_crons(graph_template) + + # Query database + triggers_in_db = await DatabaseTriggers.find( + DatabaseTriggers.graph_name == graph_name, + DatabaseTriggers.namespace == namespace + ).to_list() + + # Assert count + assert len(triggers_in_db) == expected_count + + # Assert pairs if provided + if expected_pairs is not None: + actual_pairs = {(t.expression, t.timezone) for t in triggers_in_db} + assert actual_pairs == expected_pairs + + # Post-test cleanup + await DatabaseTriggers.find( + DatabaseTriggers.graph_name == graph_name, + DatabaseTriggers.namespace == namespace + ).delete_many() + + +@pytest.mark.asyncio(loop_scope="session") +async def test_create_crons_deduplicates_identical_triggers(app_started): + """Test that create_crons deduplicates triggers with identical expression and timezone + + This integration test verifies the actual database behavior, not mocks. + It creates a GraphTemplate with duplicate CRON triggers and verifies that only + one DatabaseTriggers row exists per (expression, timezone) pair. + """ + await run_deduplication_test( + graph_name="test_dedup_graph", + triggers=[ + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"}), + ], + expected_count=1, + expected_pairs={("0 9 * * *", "America/New_York")} + ) + + +@pytest.mark.asyncio(loop_scope="session") +async def test_create_crons_keeps_triggers_with_different_timezones(app_started): + """Test that create_crons keeps triggers with same expression but different timezones + + This verifies that triggers are only deduplicated when BOTH expression AND timezone match. + """ + await run_deduplication_test( + graph_name="test_timezone_graph", + triggers=[ + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "Europe/London"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "Asia/Tokyo"}), + ], + expected_count=3, + expected_pairs={ + ("0 9 * * *", "America/New_York"), + ("0 9 * * *", "Europe/London"), + ("0 9 * * *", "Asia/Tokyo") + } + ) + + +@pytest.mark.asyncio(loop_scope="session") +async def test_create_crons_keeps_triggers_with_different_expressions(app_started): + """Test that create_crons keeps triggers with different expressions + + This verifies basic functionality - triggers with different expressions should all be created. + """ + await run_deduplication_test( + graph_name="test_expr_graph", + triggers=[ + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "UTC"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 12 * * *", "timezone": "UTC"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "*/15 * * * *", "timezone": "UTC"}), + ], + expected_count=3, + expected_pairs={ + ("0 9 * * *", "UTC"), + ("0 12 * * *", "UTC"), + ("*/15 * * * *", "UTC") + } + ) + + +@pytest.mark.asyncio(loop_scope="session") +async def test_create_crons_complex_deduplication_scenario(app_started): + """Test complex deduplication scenario with mix of duplicates and unique triggers + + This tests a realistic scenario where a graph template has: + - Some duplicate triggers (same expression + timezone) + - Some unique triggers + - Triggers with same expression but different timezone + """ + await run_deduplication_test( + graph_name="test_complex_graph", + triggers=[ + # Three duplicates - should result in 1 DB row + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "America/New_York"}), + # Same expression, different timezone - should result in 1 DB row + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 9 * * *", "timezone": "Europe/London"}), + # Two duplicates of a different expression - should result in 1 DB row + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "*/15 * * * *", "timezone": "UTC"}), + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "*/15 * * * *", "timezone": "UTC"}), + # Unique trigger - should result in 1 DB row + Trigger(value={"type": TriggerTypeEnum.CRON, "expression": "0 0 1 * *", "timezone": "Asia/Tokyo"}), + ], + expected_count=4, + expected_pairs={ + ("0 9 * * *", "America/New_York"), + ("0 9 * * *", "Europe/London"), + ("*/15 * * * *", "UTC"), + ("0 0 1 * *", "Asia/Tokyo"), + } + ) \ No newline at end of file