Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions docs/docs/exosphere/triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ Define triggers in your graph template:
{
"type": "CRON",
"value": {
"expression": "0 9 * * 1-5"
"expression": "0 9 * * 1-5",
"timezone": "America/New_York"
}
},
{
"type": "CRON",
"value": {
"expression": "0 0 * * 0"
"expression": "0 0 * * 0",
"timezone": "UTC"
}
}
],
Expand All @@ -77,6 +79,8 @@ Define triggers in your graph template:
}
```

**Note:** The `timezone` field is optional and defaults to `"UTC"` if not specified. Use IANA timezone names (e.g., `"America/New_York"`, `"Europe/London"`, `"Asia/Tokyo"`).

### Python SDK Example

```python
Expand Down Expand Up @@ -109,8 +113,8 @@ async def create_scheduled_graph():

# Define triggers for automatic execution
triggers = [
CronTrigger(expression="0 2 * * *"), # Daily at 2:00 AM
CronTrigger(expression="0 */4 * * *") # Every 4 hours
CronTrigger(expression="0 2 * * *", timezone="America/New_York"), # Daily at 2:00 AM EST/EDT
CronTrigger(expression="0 */4 * * *", timezone="UTC") # Every 4 hours UTC
]

# Create the graph with triggers
Expand Down Expand Up @@ -158,7 +162,7 @@ asyncio.run(create_scheduled_graph())

1. **Avoid Peak Times**: Schedule resource-intensive workflows during off-peak hours
2. **Stagger Executions**: If you have multiple graphs, stagger their execution times
3. **Consider Time Zones**: Cron expressions use server time (UTC by default)
3. **Consider Time Zones**: Specify the `timezone` parameter to ensure your cron expressions run at the correct local time. If not specified, defaults to UTC.
4. **Resource Planning**: Ensure your infrastructure can handle scheduled workloads

### Error Handling
Expand Down Expand Up @@ -191,11 +195,31 @@ result = await state_manager.upsert_graph(
)
```

## Timezone Support

Triggers now support specifying a timezone for cron expressions, allowing you to schedule jobs in your local timezone:

```python
# Schedule a report to run at 9 AM New York time (handles DST automatically)
CronTrigger(expression="0 9 * * 1-5", timezone="America/New_York")

# Schedule a job at 5 PM London time
CronTrigger(expression="0 17 * * *", timezone="Europe/London")

# Schedule using UTC (default)
CronTrigger(expression="0 12 * * *", timezone="UTC")
```

**Important Notes:**
- Use IANA timezone names (e.g., `"America/New_York"`, `"Europe/London"`, `"Asia/Tokyo"`)
- Timezones automatically handle Daylight Saving Time (DST) transitions
- If no timezone is specified, defaults to `"UTC"`
- All trigger times are internally stored in UTC for consistency

## Limitations

- **CRON Only**: Currently only cron-based scheduling is supported
- **No Manual Override**: Scheduled executions cannot be manually cancelled once triggered
- **Time Zone**: All cron expressions are evaluated in server time (UTC)
- **Minimum Interval**: Avoid scheduling more frequently than every minute

## Next Steps
Expand Down
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.3b1"
version = "0.0.3b2"
12 changes: 11 additions & 1 deletion python-sdk/exospherehost/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from pydantic import BaseModel, Field, field_validator
from typing import Any, Optional, List
from enum import Enum
from zoneinfo import available_timezones

_AVAILABLE_TIMEZONES = available_timezones()

class UnitesStrategyEnum(str, Enum):
ALL_SUCCESS = "ALL_SUCCESS"
Expand Down Expand Up @@ -160,4 +162,12 @@ def validate_default_values(cls, v: dict[str, str]) -> dict[str, str]:
return normalized_dict

class CronTrigger(BaseModel):
expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.")
expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.")
timezone: str = Field(default="UTC", description="Timezone for the cron expression (e.g., 'America/New_York', 'Europe/London', 'UTC'). Defaults to 'UTC'.")

@field_validator('timezone')
@classmethod
def validate_timezone(cls, v: str) -> str:
if v not in _AVAILABLE_TIMEZONES:
raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')")
return v
Comment on lines +168 to +173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Strip whitespace for consistency with other validators.

All other validators in this file (validate_node_name, validate_identifier, validate_next_nodes, etc.) strip whitespace from input before validation. The timezone validator should follow this pattern to prevent confusing validation errors when users accidentally include leading/trailing spaces.

Apply this diff to maintain consistency:

 @field_validator('timezone')
 @classmethod
 def validate_timezone(cls, v: str) -> str:
-    if v not in _AVAILABLE_TIMEZONES:
+    trimmed_v = v.strip()
+    if trimmed_v not in _AVAILABLE_TIMEZONES:
         raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')")
-    return v
+    return trimmed_v
🤖 Prompt for AI Agents
In python-sdk/exospherehost/models.py around lines 168 to 173, the timezone
validator currently checks v against _AVAILABLE_TIMEZONES without trimming
whitespace; update it to strip leading/trailing whitespace from v before
validation and return the stripped value so inputs like " UTC " validate
consistently with other validators in the file. Ensure you call v = v.strip()
(or equivalent) at the start of the validator, use the stripped value for the
membership check, and return the stripped string.

5 changes: 3 additions & 2 deletions python-sdk/exospherehost/statemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ async def upsert_graph(self, graph_name: str, graph_nodes: list[GraphNodeModel],
if triggers is not None:
body["triggers"] = [
{
"type": "CRON",
"value": {
"expression": trigger.expression
"type": "CRON",
"expression": trigger.expression,
"timezone": trigger.timezone
}
}
for trigger in triggers
Expand Down
2 changes: 1 addition & 1 deletion state-manager/app/controller/upsert_graph_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
DatabaseTriggers.graph_name == graph_name,
DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING,
DatabaseTriggers.type == TriggerTypeEnum.CRON,
In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON])
In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.value.type == TriggerTypeEnum.CRON])
).delete_many()

background_tasks.add_task(verify_graph, graph_template)
Expand Down
1 change: 1 addition & 0 deletions state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
class DatabaseTriggers(Document):
type: TriggerTypeEnum = Field(..., description="Type of the trigger")
expression: Optional[str] = Field(default=None, description="Expression of the trigger")
timezone: Optional[str] = Field(default="UTC", description="Timezone for the trigger")
graph_name: str = Field(..., description="Name of the graph")
namespace: str = Field(..., description="Namespace of the graph")
trigger_time: datetime = Field(..., description="Trigger time of the trigger")
Expand Down
40 changes: 28 additions & 12 deletions state-manager/app/models/trigger_models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from pydantic import BaseModel, Field, field_validator, model_validator
from pydantic import BaseModel, Field, field_validator
from enum import Enum
from croniter import croniter
from typing import Self
from typing import Union, Annotated, Literal
from zoneinfo import available_timezones

# Cache available timezones at module level to avoid repeated filesystem queries
_AVAILABLE_TIMEZONES = available_timezones()

class TriggerTypeEnum(str, Enum):
CRON = "CRON"
Expand All @@ -14,7 +18,9 @@ class TriggerStatusEnum(str, Enum):
TRIGGERING = "TRIGGERING"

class CronTrigger(BaseModel):
type: Literal[TriggerTypeEnum.CRON] = Field(default=TriggerTypeEnum.CRON, description="Type of the trigger")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type is added again here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tells Pydantic: "Look at the type field to determine which union variant to instantiate." Without type in CronTrigger, Pydantic won't know how to deserialize the data. I have removed type from Trigger instead and refactored the code to access it via trigger.value.type

expression: str = Field(..., description="Cron expression for the trigger")
timezone: str = Field(default="UTC", description="Timezone for the cron expression (e.g., 'America/New_York', 'Europe/London', 'UTC')")

@field_validator("expression")
@classmethod
Expand All @@ -23,14 +29,24 @@ def validate_expression(cls, v: str) -> str:
raise ValueError("Invalid cron expression")
return v

@field_validator("timezone")
@classmethod
def validate_timezone(cls, v: str) -> str:
if v not in _AVAILABLE_TIMEZONES:
raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')")
return v

# Union type for all trigger types - add new trigger types here
TriggerValue = Annotated[Union[CronTrigger], Field(discriminator="type")] # type: ignore

class Trigger(BaseModel):
type: TriggerTypeEnum = Field(..., description="Type of the trigger")
value: dict = Field(default_factory=dict, description="Value of the trigger")

@model_validator(mode="after")
def validate_trigger(self) -> Self:
if self.type == TriggerTypeEnum.CRON:
CronTrigger.model_validate(self.value)
else:
raise ValueError(f"Unsupported trigger type: {self.type}")
return self
"""
Extensible trigger model using discriminated unions.
To add a new trigger type:
1. Add the enum value to TriggerTypeEnum
2. Create a new trigger class (e.g., WebhookTrigger) with type field
3. Add it to the TriggerValue Union

Note: Access trigger type via trigger.value.type
"""
value: TriggerValue = Field(..., description="Value of the trigger")
19 changes: 17 additions & 2 deletions state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
from pymongo import ReturnDocument
from pymongo.errors import DuplicateKeyError
from app.config.settings import get_settings
from zoneinfo import ZoneInfo
import croniter
import asyncio

# Cache UTC timezone at module level to avoid repeated instantiation
UTC = ZoneInfo("UTC")

logger = LogsManager().get_logger()

async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None:
Expand Down Expand Up @@ -47,16 +51,27 @@ async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int):

async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int):
assert trigger.expression is not None
iter = croniter.croniter(trigger.expression, trigger.trigger_time)

# Use the trigger's timezone, defaulting to UTC if not specified
tz = ZoneInfo(trigger.timezone or "UTC")

# Convert trigger_time to the specified timezone for croniter
trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
iter = croniter.croniter(trigger.expression, trigger_time_tz)

Comment on lines +55 to 61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Back-compat: handle missing timezone attribute and avoid ZoneInfo(None).

Tests and older records may not have timezone. Also persist a normalized tz name for reuse later.

-    # Use the trigger's timezone, defaulting to UTC if not specified
-    tz = ZoneInfo(trigger.timezone or "UTC")
+    # Resolve timezone safely for old records/tests without the attribute
+    tz_name = getattr(trigger, "timezone", None) or "UTC"
+    tz = ZoneInfo(tz_name)
 
-    # Convert trigger_time to the specified timezone for croniter
-    trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
-    iter = croniter.croniter(trigger.expression, trigger_time_tz)
+    # Convert trigger_time to the specified timezone for croniter
+    trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
+    cron_iter = croniter.croniter(trigger.expression, trigger_time_tz)

Also avoids shadowing built-in iter.

🧰 Tools
🪛 GitHub Actions: State Manager Unit Tests

[error] 56-56: pytest failed with AttributeError: Mock object has no attribute 'timezone' during create_next_triggers; the test mocks for DatabaseTriggers without providing a timezone attribute on the trigger mock. Command: uv run pytest tests/ --cov=app --cov-report=xml --cov-report=term-missing --cov-report=html -v --junitxml=full-pytest-report.xml

🤖 Prompt for AI Agents
In state-manager/app/tasks/trigger_cron.py around lines 55–61, avoid calling
ZoneInfo(None) by reading the timezone safely (e.g. tz_name = getattr(trigger,
"timezone", None) or trigger.timezone) and defaulting tz_name to "UTC" if falsy,
then create ZoneInfo(tz_name); persist the normalized tz_name back onto the
trigger record for future reuse; compute trigger_time_tz by replacing UTC on
trigger.trigger_time and astimezone(tz) as before; and stop shadowing the
built-in iter by renaming the croniter instance variable (e.g. cron_iter).

while True:
next_trigger_time = iter.get_next(datetime)
# Get next trigger time in the specified timezone
next_trigger_time_tz = iter.get_next(datetime)

# Convert back to UTC for storage
next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
expires_at = next_trigger_time + timedelta(hours=retention_hours)
Comment on lines +63 to 68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consistency: consider storing expires_at as UTC-aware to match mark_ functions.*

Minor, but keeps a single convention.

-        next_trigger_time_tz = iter.get_next(datetime)
+        next_trigger_time_tz = cron_iter.get_next(datetime)
 
-        # Convert back to UTC for storage
-        next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
-        expires_at = next_trigger_time + timedelta(hours=retention_hours)
+        # Convert back to UTC for storage
+        next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
+        expires_at = next_trigger_time.replace(tzinfo=timezone.utc) + timedelta(hours=retention_hours)


try:
await DatabaseTriggers(
type=TriggerTypeEnum.CRON,
expression=trigger.expression,
timezone=trigger.timezone,
graph_name=trigger.graph_name,
namespace=trigger.namespace,
trigger_time=next_trigger_time,
Expand Down
34 changes: 27 additions & 7 deletions state-manager/app/tasks/verify_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from datetime import datetime
from json_schema_to_pydantic import create_model
from zoneinfo import ZoneInfo

from app.models.db.graph_template_model import GraphTemplate
from app.models.graph_template_validation_status import GraphTemplateValidationStatus
Expand All @@ -13,6 +14,9 @@
from app.config.settings import get_settings
from datetime import timedelta

# Cache UTC timezone at module level to avoid repeated instantiation
UTC = ZoneInfo("UTC")

logger = LogsManager().get_logger()

settings = get_settings()
Expand Down Expand Up @@ -105,21 +109,37 @@ async def verify_inputs(graph_template: GraphTemplate, registered_nodes: list[Re
return errors

async def create_crons(graph_template: GraphTemplate):
expressions_to_create = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON])
# Build a map of (expression, timezone) -> CronTrigger for deduplication
triggers_to_create = {}
for trigger in graph_template.triggers:
if trigger.value.type == TriggerTypeEnum.CRON:
# trigger.value is already a validated CronTrigger instance
cron_trigger = trigger.value
triggers_to_create[(cron_trigger.expression, cron_trigger.timezone)] = cron_trigger

current_time = datetime.now(UTC).replace(tzinfo=None)

current_time = datetime.now()

new_db_triggers = []
for expression in expressions_to_create:
iter = croniter.croniter(expression, current_time)
for (expression, timezone), cron_trigger in triggers_to_create.items():
# Use the validated timezone (guaranteed to be valid IANA timezone, never None)
tz = ZoneInfo(timezone)

# Get current time in the specified timezone
current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
iter = croniter.croniter(expression, current_time_tz)

# Get next trigger time in the specified timezone
next_trigger_time_tz = iter.get_next(datetime)

next_trigger_time = iter.get_next(datetime)
# Convert back to UTC for storage (remove timezone info for storage)
next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
Comment on lines +127 to +135
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Name nit + consistency for expires_at.

Avoid shadowing iter and consider UTC-aware expires_at like in mark functions.

-    current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
-    iter = croniter.croniter(expression, current_time_tz)
+    current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
+    cron_iter = croniter.croniter(expression, current_time_tz)
@@
-    next_trigger_time_tz = iter.get_next(datetime)
+    next_trigger_time_tz = cron_iter.get_next(datetime)
@@
-    next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
-    expires_at = next_trigger_time + timedelta(hours=settings.trigger_retention_hours)
+    next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
+    expires_at = next_trigger_time.replace(tzinfo=timezone.utc) + timedelta(hours=settings.trigger_retention_hours)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In state-manager/app/tasks/verify_graph.py around lines 127 to 135, avoid
shadowing the built-in-like name "iter" by renaming it (e.g., cron_iter) and
keep the computed expiry timestamp UTC-aware to match other mark functions:
compute next_trigger_time_tz from cron_iter.get_next(datetime) then convert to
UTC but do NOT strip tzinfo (i.e., set tzinfo=UTC or use astimezone(UTC) and
keep tzinfo), and assign that UTC-aware datetime to expires_at (or
next_trigger_time) for consistent storage and comparisons.

expires_at = next_trigger_time + timedelta(hours=settings.trigger_retention_hours)

new_db_triggers.append(
DatabaseTriggers(
type=TriggerTypeEnum.CRON,
expression=expression,
expression=cron_trigger.expression,
timezone=cron_trigger.timezone,
graph_name=graph_template.name,
namespace=graph_template.namespace,
trigger_status=TriggerStatusEnum.PENDING,
Expand Down
3 changes: 3 additions & 0 deletions state-manager/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ markers =
unit: marks a test as a unit test
with_database: marks a test as a test that requires a database
asyncio_mode = auto
filterwarnings =
ignore::pydantic.PydanticDeprecatedSince211:lazy_model.*
ignore::DeprecationWarning:lazy_model.*

Loading