Changes from all commits
30 commits
4a71a3d
feat: add checkpoints
acoola Feb 17, 2026
a54a45c
fix: clean up docstrings
acoola Feb 17, 2026
705c94b
chore: small improvements
acoola Feb 17, 2026
7ad8541
chore: improve checkpoint context
acoola Feb 17, 2026
4e07478
fix: flow checkpoints
acoola Feb 24, 2026
f872085
feat: handle checkpoints for the iterative nodes
acoola Feb 24, 2026
aa05040
fix: node blocking issue during parallel run
acoola Feb 24, 2026
190671c
feat: handle checkpoints in orchestrators
acoola Feb 25, 2026
bfe3d72
chore: improve configs
acoola Feb 25, 2026
1d17d51
chore: add improvements and fixes
acoola Feb 26, 2026
ab0a4c9
chore: fixes and improvements
acoola Feb 26, 2026
ada344c
fix: iteration logic
acoola Feb 26, 2026
735cc65
chore: fixes and improvements
acoola Feb 26, 2026
0e2f7eb
fix: orchestrators
acoola Feb 27, 2026
a5fc97c
feat: add checkpoint behaviour, improve filesystem
acoola Feb 27, 2026
6c360d4
fix: serialization and tests
acoola Feb 28, 2026
79525ad
feat: handle checkpoints in yaml
acoola Feb 28, 2026
87f46de
feat: handle checkpoints in async runs, add tests
acoola Mar 1, 2026
bd4c0e0
fix: few issues
acoola Mar 2, 2026
244f37d
fix: sync/async locking and other issues
acoola Mar 2, 2026
aaaf66a
fix: issues and edge cases
acoola Mar 2, 2026
4b56dfb
fix: add extra check data in chk state
acoola Mar 2, 2026
97f6db2
chore: merge main branch
TrachukT Apr 6, 2026
c52f2c3
feat: update flow checkpoint creation, update with latest changes
TrachukT Apr 6, 2026
5815a23
feat: fix comments, add tests and example
TrachukT Apr 7, 2026
78f4405
feat: update default offset value, orchestrator fix
TrachukT Apr 8, 2026
e3323ba
feat: rollback graph changes
TrachukT Apr 8, 2026
aa72526
feat: update history offset tests
TrachukT Apr 8, 2026
cb912eb
feat: add save mid run for orchestrators
TrachukT Apr 8, 2026
2269908
chore: merge main branch
TrachukT Apr 14, 2026
69 changes: 69 additions & 0 deletions .cursor/BUGBOT.md
@@ -490,6 +490,75 @@ If a PR modifies prompt handling in nodes:

---

## Checkpoint & Durability

### Checkpoint state must call super().to_checkpoint_state()

If a PR adds or modifies `to_checkpoint_state()` in any node subclass without calling `super()`:
- Add a blocking Bug titled "to_checkpoint_state() must call super()"
- Body: "Node subclasses overriding `to_checkpoint_state()` must call `super().to_checkpoint_state()` and merge the result. Otherwise `approval_response` and `iteration` fields from the base `Node` are lost. Pattern:
```python
def to_checkpoint_state(self) -> MyCheckpointState:
base_fields = super().to_checkpoint_state().model_dump(exclude_none=True)
return MyCheckpointState(my_field=self._my_data, **base_fields)
```"

### Guard state reset on resume

If a PR modifies `reset_run_state()` in agents or orchestrators without checking `is_resumed`:
- Add a blocking Bug titled "reset_run_state() must guard against wiping resumed state"
- Body: "When an agent/orchestrator resumes from a checkpoint, `reset_run_state()` is called before `get_start_iteration()` restores state. Internal state (e.g., `self.state`, `self.manager`) must NOT be reset if `self.is_resumed` is `True`:
```python
def reset_run_state(self):
if not self.is_resumed:
self.state.reset()
super().reset_run_state()
```"

### Increment _completed_iterations after work succeeds

If a PR modifies orchestrator execution loops and sets `_completed_iterations` before the action completes:
- Add a blocking Bug titled "_completed_iterations set before action completion"
- Body: "In orchestrator loops, `_completed_iterations` must be incremented AFTER the work succeeds (delegation, task execution, state transition), not before. If the action fails mid-execution, the iteration would be incorrectly counted as completed, and on resume the orchestrator would skip it instead of retrying."
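
Pattern (an illustrative sketch; the loop shape and names are hypothetical, only the increment placement is the point):

```python
class Orchestrator:
    """Minimal model of resume-safe iteration counting."""

    def __init__(self):
        self._completed_iterations = 0

    def run_iterations(self, actions):
        for i, action in enumerate(actions):
            if i < self._completed_iterations:
                continue  # already completed before the checkpoint; skip on resume
            action()  # may raise; the iteration is then retried on resume
            # Increment only AFTER the work succeeded.
            self._completed_iterations = i + 1
```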

### Use IterativeCheckpointMixin for iterative nodes

If a PR adds a new node with iterative/looping execution without implementing `IterativeCheckpointMixin`:
- Add a Bug titled "Consider IterativeCheckpointMixin for loop-level durability"
- Body: "Nodes with iterative execution (loops, multi-step processing) should implement `IterativeCheckpointMixin` to enable mid-loop resume. Implement `get_iteration_state()` and `restore_iteration_state()` with a typed Pydantic model for iteration data."
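
Sketch of the expected shape (class and field names are hypothetical; the two hook methods plus a typed Pydantic model are the pattern):

```python
from pydantic import BaseModel


class LoopState(BaseModel):
    """Typed snapshot of mid-loop progress."""
    index: int = 0
    partial_results: list[str] = []


class MyIterativeNode:  # would also inherit IterativeCheckpointMixin
    def __init__(self):
        self._index = 0
        self._partial: list[str] = []

    def get_iteration_state(self) -> LoopState:
        # Capture everything needed to resume mid-loop.
        return LoopState(index=self._index, partial_results=list(self._partial))

    def restore_iteration_state(self, state: LoopState) -> None:
        # Rehydrate loop progress from the checkpointed snapshot.
        self._index = state.index
        self._partial = list(state.partial_results)
```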

### Use Prompt.serialize_messages/deserialize_messages for message persistence

If a PR serializes prompt messages using `model_dump()` or manual dict construction:
- Add a Bug titled "Use Prompt serialization methods for messages"
- Body: "Use `Prompt.serialize_messages()` and `Prompt.deserialize_messages()` for checkpoint persistence. These handle `Message` vs `VisionMessage` discrimination via the `MessageType` enum and preserve the `static` attribute that `model_dump()` excludes."
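
The discrimination problem can be shown with stand-in models (these are NOT the real `Message`/`VisionMessage` classes; a tagged union over a type field is the general pattern):

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class TextMsg(BaseModel):
    message_type: Literal["text"] = "text"
    content: str


class VisionMsg(BaseModel):
    message_type: Literal["vision"] = "vision"
    image_url: str


# The discriminator tells deserialization which concrete class to rebuild.
Messages = TypeAdapter(
    list[Annotated[Union[TextMsg, VisionMsg], Field(discriminator="message_type")]]
)


def serialize_messages(msgs):
    return Messages.dump_python(msgs, mode="json")


def deserialize_messages(data):
    return Messages.validate_python(data)
```

A bare `model_dump()` round-trip would lose which subclass each dict came from; the discriminator restores it.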

### CheckpointConfig.to_dict() must use mode='json'

If a PR modifies `CheckpointConfig.to_dict()` without `mode='json'`:
- Add a blocking Bug titled "CheckpointConfig.to_dict() must use mode='json'"
- Body: "Pydantic's `model_dump()` in default Python mode returns enum objects even with `use_enum_values=True`. Use `model_dump(mode='json')` to ensure enums serialize as string values for YAML/JSON compatibility. Exclude `backend` (serialize via `backend.to_dict()`), `context` (runtime closures), and `resume_from` (per-run parameter)."
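
The gotcha is that default values skip validation, so `use_enum_values` never runs on them; a quick demonstration with a stand-in config (enum and field names hypothetical):

```python
from enum import Enum

from pydantic import BaseModel, ConfigDict


class Behavior(str, Enum):
    FULL = "full"


class Config(BaseModel):
    model_config = ConfigDict(use_enum_values=True)
    behavior: Behavior = Behavior.FULL  # default is not validated, stays an enum member


cfg = Config()
# Python mode can leak the raw enum member; json mode always yields the plain value.
python_dump = cfg.model_dump()
json_dump = cfg.model_dump(mode="json")
```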

### Filesystem backend must use atomic writes

If a PR modifies `FileSystem.save()` to use `open()` + `write()` instead of atomic write:
- Add a blocking Bug titled "Filesystem checkpoint writes must be atomic"
- Body: "Use `tempfile.mkstemp()` + `os.replace()` for atomic writes. Never use `open(path, 'w')` directly — it truncates the file before writing, creating a race window where concurrent readers see empty/corrupted data."
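
Pattern (a minimal sketch using only the stdlib):

```python
import os
import tempfile


def atomic_write(path: str, data: str) -> None:
    """Write data to path so readers never observe a partial file."""
    # Stage the content in a temp file in the same directory (same filesystem),
    # then atomically swap it into place.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        os.replace(tmp_path, path)  # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)
        raise
```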

### Filesystem backend must reuse run directories

If a PR modifies `FileSystem.save()` without checking for existing run directories:
- Add a Bug titled "Filesystem backend must reuse existing run directories"
- Body: "When saving a checkpoint with the same `wf_run_id`, reuse the existing run directory instead of creating a new one. Use `_find_existing_run_dir()` with glob suffix matching (`*__{safe_id}`) to find existing directories. This groups all checkpoints from the same logical execution together."

### Use UTC-aware datetimes for checkpoint timestamps

If a PR uses `datetime.now()` (timezone-naive) in checkpoint-related code:
- Add a blocking Bug titled "Use utc_now() for checkpoint timestamps"
- Body: "All checkpoint timestamps must use `utc_now()` which returns `datetime.now(timezone.utc)`. Mixing timezone-aware and timezone-naive datetimes causes `TypeError` on comparison and produces inconsistent data."
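
The failure mode is easy to reproduce (`utc_now` below matches the definition quoted in the rule):

```python
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Timezone-aware current time; safe to compare with other aware datetimes."""
    return datetime.now(timezone.utc)


aware = utc_now()
naive = datetime.now()  # no tzinfo: comparing it with `aware` raises TypeError
```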

---

## Connection Management

### New connection nodes must extend ConnectionNode
24 changes: 24 additions & 0 deletions dynamiq/checkpoints/__init__.py
@@ -0,0 +1,24 @@
from dynamiq.checkpoints.checkpoint import (
BaseCheckpointState,
CheckpointFlowMixin,
CheckpointNodeMixin,
FlowCheckpoint,
IterativeCheckpointMixin,
NodeCheckpointState,
)
from dynamiq.checkpoints.config import CheckpointBehavior, CheckpointConfig, CheckpointContext
from dynamiq.checkpoints.types import CheckpointStatus, utc_now

__all__ = [
"BaseCheckpointState",
"CheckpointBehavior",
"CheckpointConfig",
"CheckpointContext",
"CheckpointFlowMixin",
"CheckpointNodeMixin",
"CheckpointStatus",
"FlowCheckpoint",
"IterativeCheckpointMixin",
"NodeCheckpointState",
"utc_now",
]
3 changes: 3 additions & 0 deletions dynamiq/checkpoints/backends/__init__.py
@@ -0,0 +1,3 @@
from dynamiq.checkpoints.backends.base import CheckpointBackend
from dynamiq.checkpoints.backends.filesystem import FileSystem
from dynamiq.checkpoints.backends.in_memory import InMemory
215 changes: 215 additions & 0 deletions dynamiq/checkpoints/backends/base.py
@@ -0,0 +1,215 @@
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from functools import cached_property
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, ConfigDict, Field, computed_field

from dynamiq.checkpoints.types import CheckpointStatus
from dynamiq.utils import generate_uuid

if TYPE_CHECKING:
from dynamiq.checkpoints.checkpoint import FlowCheckpoint

DEFAULT_CLEANUP_FETCH_LIMIT = 1000
DEFAULT_LIST_LIMIT = 10


class CheckpointBackend(ABC, BaseModel):
"""Abstract base class for checkpoint storage backends."""

model_config = ConfigDict(arbitrary_types_allowed=True)

name: str = Field(default="CheckpointBackend", description="Backend name")
id: str = Field(default_factory=generate_uuid, description="Unique backend instance ID")
max_list_results: int = Field(default=DEFAULT_LIST_LIMIT, description="Max checkpoints returned by list queries")
max_cleanup_results: int = Field(
default=DEFAULT_CLEANUP_FETCH_LIMIT, description="Max checkpoints fetched during cleanup"
)

@property
def to_dict_exclude_params(self) -> dict[str, bool]:
return {}

def to_dict(self, include_secure_params: bool = False, **kwargs) -> dict[str, Any]:
kwargs.pop("include_secure_params", None)
kwargs.pop("for_tracing", None)
return self.model_dump(exclude=kwargs.pop("exclude", self.to_dict_exclude_params), **kwargs)

@computed_field
@cached_property
def type(self) -> str:
return f"{self.__module__.rsplit('.', 1)[0]}.{self.__class__.__name__}"

@abstractmethod
def save(self, checkpoint: FlowCheckpoint) -> str:
"""Save a checkpoint and return its ID."""
raise NotImplementedError

@abstractmethod
def load(self, checkpoint_id: str) -> FlowCheckpoint | None:
"""Load a checkpoint by ID, returns None if not found."""
raise NotImplementedError

@abstractmethod
def update(self, checkpoint: FlowCheckpoint) -> None:
"""Update an existing checkpoint."""
raise NotImplementedError

@abstractmethod
def delete(self, checkpoint_id: str) -> bool:
"""Delete a checkpoint, returns True if deleted."""
raise NotImplementedError

@abstractmethod
def get_list_by_flow(
self,
flow_id: str,
*,
status: CheckpointStatus | None = None,
limit: int | None = None,
before: datetime | None = None,
) -> list[FlowCheckpoint]:
"""List checkpoints for a flow, newest first. Use limit=None to get all."""
raise NotImplementedError

@abstractmethod
def get_latest_by_flow(self, flow_id: str, *, status: CheckpointStatus | None = None) -> FlowCheckpoint | None:
"""Get the most recent checkpoint for a flow."""
raise NotImplementedError

def cleanup_by_flow(self, flow_id: str, *, keep_count: int = 10, max_ttl_minutes: int | None = None) -> int:
"""Remove old checkpoints for a flow, returns count deleted."""
checkpoints = self.get_list_by_flow(flow_id, limit=self.max_cleanup_results)
deleted = 0

for i, cp in enumerate(checkpoints):
should_delete = i >= keep_count
if max_ttl_minutes is not None and not should_delete:
age_minutes = (datetime.now(timezone.utc) - cp.created_at).total_seconds() / 60
should_delete = age_minutes > max_ttl_minutes
if should_delete and self.delete(cp.id):
deleted += 1

return deleted

def get_list_by_run(self, run_id: str, *, limit: int | None = None) -> list[FlowCheckpoint]:
"""List checkpoints matching run_id or wf_run_id. Use limit=None to get all."""
raise NotImplementedError("get_list_by_run not implemented for this backend")

def get_list_by_flow_and_run(
self,
flow_id: str,
run_id: str,
*,
wf_run_id: str | None = None,
status: CheckpointStatus | None = None,
limit: int | None = None,
) -> list[FlowCheckpoint]:
"""List checkpoints for a specific flow and run (matches run_id or wf_run_id), newest first."""
raise NotImplementedError("get_list_by_flow_and_run not implemented for this backend")

def get_latest_by_flow_and_run(
self,
flow_id: str,
run_id: str,
*,
wf_run_id: str | None = None,
status: CheckpointStatus | None = None,
) -> FlowCheckpoint | None:
"""Get the most recent checkpoint for a specific flow and run."""
results = self.get_list_by_flow_and_run(flow_id, run_id, wf_run_id=wf_run_id, status=status, limit=1)
return results[0] if results else None

def get_chain(self, checkpoint_id: str) -> list[FlowCheckpoint]:
"""Walk parent_checkpoint_id links to build a checkpoint chain (newest first)."""
chain: list[FlowCheckpoint] = []
current_id: str | None = checkpoint_id
seen: set[str] = set()
while current_id and current_id not in seen:
seen.add(current_id)
cp = self.load(current_id)
if not cp:
break
chain.append(cp)
current_id = cp.parent_checkpoint_id
return chain

async def save_async(self, checkpoint: FlowCheckpoint) -> str:
"""Async save - runs sync method in thread pool to avoid blocking event loop."""
return await asyncio.to_thread(self.save, checkpoint)

async def load_async(self, checkpoint_id: str) -> FlowCheckpoint | None:
"""Async load - runs sync method in thread pool to avoid blocking event loop."""
return await asyncio.to_thread(self.load, checkpoint_id)

async def update_async(self, checkpoint: FlowCheckpoint) -> None:
"""Async update - runs sync method in thread pool to avoid blocking event loop."""
return await asyncio.to_thread(self.update, checkpoint)

async def delete_async(self, checkpoint_id: str) -> bool:
"""Async delete - runs sync method in thread pool to avoid blocking event loop."""
return await asyncio.to_thread(self.delete, checkpoint_id)

async def get_list_by_flow_async(
self,
flow_id: str,
*,
status: CheckpointStatus | None = None,
limit: int | None = None,
before: datetime | None = None,
) -> list[FlowCheckpoint]:
"""Async list checkpoints for a flow - runs sync method in thread pool."""
return await asyncio.to_thread(self.get_list_by_flow, flow_id, status=status, limit=limit, before=before)

async def get_latest_by_flow_async(
self, flow_id: str, *, status: CheckpointStatus | None = None
) -> FlowCheckpoint | None:
"""Async get most recent checkpoint for a flow - runs sync method in thread pool."""
return await asyncio.to_thread(self.get_latest_by_flow, flow_id, status=status)

async def get_list_by_run_async(self, run_id: str, *, limit: int | None = None) -> list[FlowCheckpoint]:
"""Async list checkpoints by run ID - runs sync method in thread pool."""
return await asyncio.to_thread(self.get_list_by_run, run_id, limit=limit)

async def get_list_by_flow_and_run_async(
self,
flow_id: str,
run_id: str,
*,
wf_run_id: str | None = None,
status: CheckpointStatus | None = None,
limit: int | None = None,
) -> list[FlowCheckpoint]:
"""Async list checkpoints for a specific flow and run - runs sync method in thread pool."""
return await asyncio.to_thread(
self.get_list_by_flow_and_run, flow_id, run_id, wf_run_id=wf_run_id, status=status, limit=limit
)

async def get_latest_by_flow_and_run_async(
self,
flow_id: str,
run_id: str,
*,
wf_run_id: str | None = None,
status: CheckpointStatus | None = None,
) -> FlowCheckpoint | None:
"""Async get most recent checkpoint for a specific flow and run - runs sync method in thread pool."""
return await asyncio.to_thread(
self.get_latest_by_flow_and_run, flow_id, run_id, wf_run_id=wf_run_id, status=status
)

async def get_chain_async(self, checkpoint_id: str) -> list[FlowCheckpoint]:
"""Async walk checkpoint chain - runs sync method in thread pool."""
return await asyncio.to_thread(self.get_chain, checkpoint_id)

async def cleanup_by_flow_async(
self, flow_id: str, *, keep_count: int = 10, max_ttl_minutes: int | None = None
) -> int:
"""Async cleanup - runs sync method in thread pool to avoid blocking event loop."""
return await asyncio.to_thread(
self.cleanup_by_flow, flow_id, keep_count=keep_count, max_ttl_minutes=max_ttl_minutes
)