feat(checkpoint): add automatic checkpoint recovery system #137
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Overview
Implements automatic state persistence and recovery for agent execution, enabling agents to resume from the last successful checkpoint after failures.
Problem Solved
Currently, when agent execution fails mid-workflow, there is no mechanism to resume from the last successful step. This results in:
Solution
Added a comprehensive checkpoint/recovery system that automatically:
Changes Made
New Files (3)
core/framework/schemas/checkpoint.py- Checkpoint data models (Pydantic schemas)core/framework/graph/checkpoint_manager.py- Checkpoint orchestration and recovery logiccore/tests/test_checkpoint_recovery.py- Comprehensive test suite (20 tests)Modified Files (4)
core/framework/storage/backend.py- Added checkpoint persistence methodscore/framework/graph/executor.py- Integrated checkpointing into GraphExecutorcore/framework/graph/flexible_executor.py- Integrated checkpointing into FlexibleGraphExecutorcore/framework/runner/runner.py- Wired CheckpointManager into AgentRunnerFeatures
Testing
Code Quality
Backward Compatibility
checkpoint_managerparameterExample Usage
from framework.graph.checkpoint_manager import CheckpointManager
from framework.schemas.checkpoint import CheckpointConfig
Create checkpoint manager (opt-in)
checkpoint_manager = CheckpointManager(storage, CheckpointConfig())
Use with GraphExecutor
executor = GraphExecutor(
runtime=runtime,
checkpoint_manager=checkpoint_manager,
)
result = await executor.execute(graph, goal, input_data)