-
Notifications
You must be signed in to change notification settings - Fork 41
feat: add timezone support for CRON triggers (#438) #463
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
db9e0a0
93071da
770eb4e
4b065e4
9d55322
421c4bf
077ca50
8142f52
b425b31
530b136
7594831
2437624
cacf060
e7fc2f7
9510f29
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| version = "0.0.3b1" | ||
| version = "0.0.3b2" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,11 @@ | ||
| from pydantic import BaseModel, Field, field_validator, model_validator | ||
| from pydantic import BaseModel, Field, field_validator | ||
| from enum import Enum | ||
| from croniter import croniter | ||
| from typing import Self | ||
| from typing import Union, Annotated, Literal | ||
| from zoneinfo import available_timezones | ||
|
|
||
| # Cache available timezones at module level to avoid repeated filesystem queries | ||
| _AVAILABLE_TIMEZONES = available_timezones() | ||
|
|
||
| class TriggerTypeEnum(str, Enum): | ||
| CRON = "CRON" | ||
|
|
@@ -14,7 +18,9 @@ class TriggerStatusEnum(str, Enum): | |
| TRIGGERING = "TRIGGERING" | ||
|
|
||
| class CronTrigger(BaseModel): | ||
| type: Literal[TriggerTypeEnum.CRON] = Field(default=TriggerTypeEnum.CRON, description="Type of the trigger") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. type is added again here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This tells Pydantic: "Look at the type field to determine which union variant to instantiate." Without type in CronTrigger, Pydantic won't know how to deserialize the data. I have removed type from Trigger instead and refactored the code to access it via trigger.value.type |
||
| expression: str = Field(..., description="Cron expression for the trigger") | ||
| timezone: str = Field(default="UTC", description="Timezone for the cron expression (e.g., 'America/New_York', 'Europe/London', 'UTC')") | ||
|
|
||
| @field_validator("expression") | ||
| @classmethod | ||
|
|
@@ -23,14 +29,24 @@ def validate_expression(cls, v: str) -> str: | |
| raise ValueError("Invalid cron expression") | ||
| return v | ||
|
|
||
| @field_validator("timezone") | ||
| @classmethod | ||
| def validate_timezone(cls, v: str) -> str: | ||
| if v not in _AVAILABLE_TIMEZONES: | ||
| raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')") | ||
| return v | ||
|
|
||
| # Union type for all trigger types - add new trigger types here | ||
| TriggerValue = Annotated[Union[CronTrigger], Field(discriminator="type")] # type: ignore | ||
|
|
||
| class Trigger(BaseModel): | ||
| type: TriggerTypeEnum = Field(..., description="Type of the trigger") | ||
| value: dict = Field(default_factory=dict, description="Value of the trigger") | ||
|
|
||
| @model_validator(mode="after") | ||
| def validate_trigger(self) -> Self: | ||
| if self.type == TriggerTypeEnum.CRON: | ||
| CronTrigger.model_validate(self.value) | ||
| else: | ||
| raise ValueError(f"Unsupported trigger type: {self.type}") | ||
| return self | ||
| """ | ||
| Extensible trigger model using discriminated unions. | ||
| To add a new trigger type: | ||
| 1. Add the enum value to TriggerTypeEnum | ||
| 2. Create a new trigger class (e.g., WebhookTrigger) with type field | ||
| 3. Add it to the TriggerValue Union | ||
|
|
||
| Note: Access trigger type via trigger.value.type | ||
| """ | ||
| value: TriggerValue = Field(..., description="Value of the trigger") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,9 +8,13 @@ | |
| from pymongo import ReturnDocument | ||
| from pymongo.errors import DuplicateKeyError | ||
| from app.config.settings import get_settings | ||
| from zoneinfo import ZoneInfo | ||
| import croniter | ||
| import asyncio | ||
|
|
||
| # Cache UTC timezone at module level to avoid repeated instantiation | ||
| UTC = ZoneInfo("UTC") | ||
|
|
||
| logger = LogsManager().get_logger() | ||
|
|
||
| async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None: | ||
|
|
@@ -47,16 +51,27 @@ async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int): | |
|
|
||
| async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int): | ||
| assert trigger.expression is not None | ||
| iter = croniter.croniter(trigger.expression, trigger.trigger_time) | ||
|
|
||
| # Use the trigger's timezone, defaulting to UTC if not specified | ||
| tz = ZoneInfo(trigger.timezone or "UTC") | ||
|
|
||
| # Convert trigger_time to the specified timezone for croniter | ||
| trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz) | ||
| iter = croniter.croniter(trigger.expression, trigger_time_tz) | ||
|
|
||
|
Comment on lines
+55
to
61
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Back-compat: handle missing timezone attribute and avoid Tests and older records may not have - # Use the trigger's timezone, defaulting to UTC if not specified
- tz = ZoneInfo(trigger.timezone or "UTC")
+ # Resolve timezone safely for old records/tests without the attribute
+ tz_name = getattr(trigger, "timezone", None) or "UTC"
+ tz = ZoneInfo(tz_name)
- # Convert trigger_time to the specified timezone for croniter
- trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
- iter = croniter.croniter(trigger.expression, trigger_time_tz)
+ # Convert trigger_time to the specified timezone for croniter
+ trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
+ cron_iter = croniter.croniter(trigger.expression, trigger_time_tz)Also avoids shadowing built-in 🧰 Tools🪛 GitHub Actions: State Manager Unit Tests[error] 56-56: pytest failed with AttributeError: Mock object has no attribute 'timezone' during create_next_triggers; the test mocks for DatabaseTriggers without providing a timezone attribute on the trigger mock. Command: uv run pytest tests/ --cov=app --cov-report=xml --cov-report=term-missing --cov-report=html -v --junitxml=full-pytest-report.xml 🤖 Prompt for AI Agents |
||
| while True: | ||
| next_trigger_time = iter.get_next(datetime) | ||
| # Get next trigger time in the specified timezone | ||
| next_trigger_time_tz = iter.get_next(datetime) | ||
|
|
||
| # Convert back to UTC for storage | ||
| next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None) | ||
| expires_at = next_trigger_time + timedelta(hours=retention_hours) | ||
|
Comment on lines
+63
to
68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Consistency: consider storing Minor, but keeps a single convention. - next_trigger_time_tz = iter.get_next(datetime)
+ next_trigger_time_tz = cron_iter.get_next(datetime)
- # Convert back to UTC for storage
- next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
- expires_at = next_trigger_time + timedelta(hours=retention_hours)
+ # Convert back to UTC for storage
+ next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
+ expires_at = next_trigger_time.replace(tzinfo=timezone.utc) + timedelta(hours=retention_hours) |
||
|
|
||
| try: | ||
| await DatabaseTriggers( | ||
| type=TriggerTypeEnum.CRON, | ||
| expression=trigger.expression, | ||
| timezone=trigger.timezone, | ||
| graph_name=trigger.graph_name, | ||
| namespace=trigger.namespace, | ||
| trigger_time=next_trigger_time, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
|
|
||
| from datetime import datetime | ||
| from json_schema_to_pydantic import create_model | ||
| from zoneinfo import ZoneInfo | ||
|
|
||
| from app.models.db.graph_template_model import GraphTemplate | ||
| from app.models.graph_template_validation_status import GraphTemplateValidationStatus | ||
|
|
@@ -13,6 +14,9 @@ | |
| from app.config.settings import get_settings | ||
| from datetime import timedelta | ||
|
|
||
| # Cache UTC timezone at module level to avoid repeated instantiation | ||
| UTC = ZoneInfo("UTC") | ||
|
|
||
| logger = LogsManager().get_logger() | ||
|
|
||
| settings = get_settings() | ||
|
|
@@ -105,21 +109,37 @@ async def verify_inputs(graph_template: GraphTemplate, registered_nodes: list[Re | |
| return errors | ||
|
|
||
| async def create_crons(graph_template: GraphTemplate): | ||
| expressions_to_create = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON]) | ||
| # Build a map of (expression, timezone) -> CronTrigger for deduplication | ||
| triggers_to_create = {} | ||
| for trigger in graph_template.triggers: | ||
| if trigger.value.type == TriggerTypeEnum.CRON: | ||
| # trigger.value is already a validated CronTrigger instance | ||
| cron_trigger = trigger.value | ||
| triggers_to_create[(cron_trigger.expression, cron_trigger.timezone)] = cron_trigger | ||
|
|
||
| current_time = datetime.now(UTC).replace(tzinfo=None) | ||
|
|
||
| current_time = datetime.now() | ||
|
|
||
| new_db_triggers = [] | ||
| for expression in expressions_to_create: | ||
| iter = croniter.croniter(expression, current_time) | ||
| for (expression, timezone), cron_trigger in triggers_to_create.items(): | ||
| # Use the validated timezone (guaranteed to be valid IANA timezone, never None) | ||
| tz = ZoneInfo(timezone) | ||
|
|
||
| # Get current time in the specified timezone | ||
| current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz) | ||
| iter = croniter.croniter(expression, current_time_tz) | ||
|
|
||
| # Get next trigger time in the specified timezone | ||
| next_trigger_time_tz = iter.get_next(datetime) | ||
|
|
||
| next_trigger_time = iter.get_next(datetime) | ||
| # Convert back to UTC for storage (remove timezone info for storage) | ||
| next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None) | ||
|
Comment on lines
+127
to
+135
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Name nit + consistency for Avoid shadowing - current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
- iter = croniter.croniter(expression, current_time_tz)
+ current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
+ cron_iter = croniter.croniter(expression, current_time_tz)
@@
- next_trigger_time_tz = iter.get_next(datetime)
+ next_trigger_time_tz = cron_iter.get_next(datetime)
@@
- next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
- expires_at = next_trigger_time + timedelta(hours=settings.trigger_retention_hours)
+ next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
+ expires_at = next_trigger_time.replace(tzinfo=timezone.utc) + timedelta(hours=settings.trigger_retention_hours)
🤖 Prompt for AI Agents |
||
| expires_at = next_trigger_time + timedelta(hours=settings.trigger_retention_hours) | ||
|
|
||
| new_db_triggers.append( | ||
| DatabaseTriggers( | ||
| type=TriggerTypeEnum.CRON, | ||
| expression=expression, | ||
| expression=cron_trigger.expression, | ||
| timezone=cron_trigger.timezone, | ||
| graph_name=graph_template.name, | ||
| namespace=graph_template.namespace, | ||
| trigger_status=TriggerStatusEnum.PENDING, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Strip whitespace for consistency with other validators.
All other validators in this file (
validate_node_name,validate_identifier,validate_next_nodes, etc.) strip whitespace from input before validation. The timezone validator should follow this pattern to prevent confusing validation errors when users accidentally include leading/trailing spaces.Apply this diff to maintain consistency:
@field_validator('timezone') @classmethod def validate_timezone(cls, v: str) -> str: - if v not in _AVAILABLE_TIMEZONES: + trimmed_v = v.strip() + if trimmed_v not in _AVAILABLE_TIMEZONES: raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')") - return v + return trimmed_v🤖 Prompt for AI Agents