forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmagentic_checkpoint.py
More file actions
319 lines (262 loc) · 14.3 KB
/
magentic_checkpoint.py
File metadata and controls
319 lines (262 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
import os
from datetime import datetime
from pathlib import Path
from typing import cast
from agent_framework import (
Agent,
FileCheckpointStorage,
Message,
WorkflowCheckpoint,
WorkflowEvent,
WorkflowRunState,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.orchestrations import MagenticBuilder, MagenticPlanReviewRequest
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
# Load environment variables from a .env file at import time, before build_workflow()
# reads the AZURE_AI_* settings from os.environ.
load_dotenv()
"""
Sample: Magentic Orchestration + Checkpointing
The goal of this sample is to show the exact mechanics needed to pause a Magentic
workflow that requires human plan review, persist the outstanding request via a
checkpoint, and later resume the workflow by feeding in the saved response.
Concepts highlighted here:
1. **Deterministic executor IDs** - the orchestrator and plan-review request executor
must keep stable IDs so the checkpoint state aligns when we rebuild the graph.
2. **Executor snapshotting** - checkpoints capture the pending plan-review request
map at superstep boundaries.
3. **Resume with responses** - `Workflow.run(responses=...)` accepts a
`responses` mapping so we can inject the stored human reply during restoration.
Prerequisites:
- AZURE_AI_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- AZURE_AI_MODEL_DEPLOYMENT_NAME must be set to the model deployment name the agents should use.
- Azure OpenAI configured for AzureOpenAIResponsesClient with required environment variables.
- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample.
"""
# Prompt passed to the workflow in Stage 1 of the sample.
TASK = (
    "Draft a concise internal brief describing how our research and implementation teams "
    "should collaborate to launch a beta feature for data-driven email summarization. "
    "Highlight the key milestones, risks, and communication cadence."
)

# Checkpoints are written to a dedicated folder next to this file so the JSON blobs
# produced by each run are easy to find and inspect.
CHECKPOINT_DIR = Path(__file__).parent.joinpath("tmp", "magentic_checkpoints")
def build_workflow(checkpoint_storage: FileCheckpointStorage):
    """Assemble the Magentic workflow used by every stage of the sample.

    The same construction is repeated for each resume so that a freshly built
    workflow can be paired with a previously saved checkpoint.

    Args:
        checkpoint_storage: Checkpoint backend handed to ``MagenticBuilder`` as
            ``checkpoint_storage``.

    Returns:
        The workflow object produced by ``MagenticBuilder(...).build()``.

    Raises:
        KeyError: If ``AZURE_AI_PROJECT_ENDPOINT`` or ``AZURE_AI_MODEL_DEPLOYMENT_NAME``
            is missing from the environment.
    """

    def _new_client() -> AzureOpenAIResponsesClient:
        # Every agent receives its own client and its own credential instance.
        return AzureOpenAIResponsesClient(
            project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
            deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
            credential=AzureCliCredential(),
        )

    # Participants: two plain agents configured only with a name, description,
    # instructions, and a chat client.
    researcher = Agent(
        name="ResearcherAgent",
        description="Collects background facts and references for the project.",
        instructions=("You are the research lead. Gather crisp bullet points the team should know."),
        client=_new_client(),
    )
    writer = Agent(
        name="WriterAgent",
        description="Synthesizes the final brief for stakeholders.",
        instructions=("You convert the research notes into a structured brief with milestones and risks."),
        client=_new_client(),
    )

    # Manager agent passed to the builder as ``manager_agent``.
    manager_agent = Agent(
        name="MagenticManager",
        description="Orchestrator that coordinates the research and writing workflow",
        instructions="You coordinate a team to complete complex tasks efficiently.",
        client=_new_client(),
    )

    # Plan review is switched on and the checkpoint backend is supplied here; the
    # round and stall limits bound how long the orchestration may run.
    builder = MagenticBuilder(
        participants=[researcher, writer],
        enable_plan_review=True,
        checkpoint_storage=checkpoint_storage,
        manager_agent=manager_agent,
        max_round_count=10,
        max_stall_count=3,
    )
    return builder.build()
async def main() -> None:
    """Walk through pausing, checkpointing, and resuming a Magentic workflow.

    Stages:
        0. Remove ``*.json`` files left in ``CHECKPOINT_DIR`` by earlier runs.
        1. Run the workflow until it idles with a pending plan review request.
        2. Rebuild the workflow, resume from the latest checkpoint, and approve the plan.
        3. Rebuild once more and resume from a checkpoint recorded at a later iteration.

    Progress is reported with ``print``. The function returns early whenever a stage
    has nothing to continue from.
    """
    # Stage 0: start from an empty checkpoint folder so only files written by this
    # invocation are inspected below.
    CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
    for stale_file in CHECKPOINT_DIR.glob("*.json"):
        stale_file.unlink()

    storage = FileCheckpointStorage(CHECKPOINT_DIR)

    print("\n=== Stage 1: run until plan review request (checkpointing active) ===")
    workflow = build_workflow(storage)

    # Stream events until the run reports that it is idle with pending requests,
    # remembering the plan review request seen along the way. In a real system this
    # is where a UI would present the plan for human review.
    pending_review: MagenticPlanReviewRequest | None = None
    async for event in workflow.run(TASK, stream=True):
        is_plan_review = event.type == "request_info" and event.request_type is MagenticPlanReviewRequest
        if is_plan_review:
            pending_review = event.data
            print(f"Captured plan review request: {event.request_id}")

        went_idle = event.type == "status" and event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
        if went_idle:
            break

    if pending_review is None:
        print("No plan review request emitted; nothing to resume.")
        return

    plan_checkpoint = await storage.get_latest(workflow_name=workflow.name)
    if not plan_checkpoint:
        print("No checkpoints persisted.")
        return
    print(f"Using checkpoint {plan_checkpoint.checkpoint_id} at iteration {plan_checkpoint.iteration_count}")

    # Peek at the checkpoint file on disk to show which pending requests it recorded.
    snapshot_file = storage.storage_path / f"{plan_checkpoint.checkpoint_id}.json"
    if snapshot_file.exists():
        snapshot = json.loads(snapshot_file.read_text())
        persisted_requests = snapshot.get("pending_request_info_events", {})
        print(f"Pending plan-review requests persisted in checkpoint: {list(persisted_requests.keys())}")

    print("\n=== Stage 2: resume from checkpoint and approve plan ===")
    resumed_workflow = build_workflow(storage)

    # The approval is prepared from the request captured in Stage 1 and sent once the
    # resumed workflow has emitted its plan review request again.
    approval = pending_review.approve()

    reissued_request: WorkflowEvent | None = None
    async for event in resumed_workflow.run(checkpoint_id=plan_checkpoint.checkpoint_id, stream=True):
        if event.type == "request_info" and isinstance(event.data, MagenticPlanReviewRequest):
            reissued_request = event

    if reissued_request is None:
        print("No plan review request re-emitted on resume; cannot approve.")
        return
    print(f"Resumed plan review request: {reissued_request.request_id}")

    # Answer the re-emitted request and keep the last "output" event of the run.
    completion: WorkflowEvent | None = None
    replies = {reissued_request.request_id: approval}
    async for event in resumed_workflow.run(stream=True, responses=replies):
        if event.type == "output":
            completion = event

    if completion is None:
        print("Workflow did not complete after resume.")
        return

    # Print the answer as evidence that the run finished after being resumed.
    result = completion.data
    if not result:
        print("No result data from workflow.")
        return

    output_messages = cast(list[Message], result)
    print("\n=== Final Answer ===")
    # Only the last message of the output list is displayed.
    print(output_messages[-1].text)

    # ------------------------------------------------------------------
    # Stage 3: demonstrate resuming from a later checkpoint (post-plan)
    # ------------------------------------------------------------------

    def _count_pending(cp: WorkflowCheckpoint) -> int:
        # Total length of all list-valued entries in the checkpoint's message map.
        return sum(len(queued) for queued in cp.messages.values() if isinstance(queued, list))

    def _recorded_at(cp: WorkflowCheckpoint) -> datetime:
        return datetime.fromisoformat(cp.timestamp)

    all_checkpoints = await storage.list_checkpoints(workflow_name=plan_checkpoint.workflow_name)
    later = [cp for cp in all_checkpoints if cp.iteration_count > plan_checkpoint.iteration_count]
    later_with_messages = [cp for cp in later if _count_pending(cp) > 0]

    # Prefer later checkpoints that still hold pending messages; otherwise fall back
    # to any later checkpoint. Among the candidates the most recent timestamp wins.
    candidates = later_with_messages or later
    if not candidates:
        print("\nNo additional checkpoints recorded beyond plan approval; sample complete.")
        return
    post_plan_checkpoint = max(candidates, key=_recorded_at)

    print("\n=== Stage 3: resume from post-plan checkpoint ===")
    pending_messages = _count_pending(post_plan_checkpoint)
    print(
        f"Resuming from checkpoint {post_plan_checkpoint.checkpoint_id} at iteration "
        f"{post_plan_checkpoint.iteration_count} (pending messages: {pending_messages})"
    )
    if not pending_messages:
        print("Checkpoint has no pending messages; no additional work expected on resume.")

    post_plan_workflow = build_workflow(storage)
    post_completion: WorkflowEvent | None = None
    saw_any_event = False
    async for event in post_plan_workflow.run(checkpoint_id=post_plan_checkpoint.checkpoint_id, stream=True):
        saw_any_event = True
        if event.type == "output":
            post_completion = event

    if post_completion is None:
        if saw_any_event:
            print("Workflow did not complete after post-plan resume.")
            return
        # Nothing was emitted at all, so the Stage 2 answer is shown again.
        print("No new events were emitted; checkpoint already captured a completed run.")
        print("\n=== Final Answer (post-plan resume) ===")
        print(output_messages[-1].text)
        return

    post_result = post_completion.data
    if not post_result:
        print("No result data from post-plan resume.")
        return

    output_messages = cast(list[Message], post_result)
    print("\n=== Final Answer (post-plan resume) ===")
    # Only the last message of the output list is displayed.
    print(output_messages[-1].text)
"""
Sample Output:
=== Stage 1: run until plan review request (checkpointing active) ===
Captured plan review request: 3a1a4a09-4ed1-4c90-9cf6-9ac488d452c0
Using checkpoint 4c76d77a-6ff8-4d2b-84f6-824771ffac7e at iteration 1
Pending plan-review requests persisted in checkpoint: ['3a1a4a09-4ed1-4c90-9cf6-9ac488d452c0']
=== Stage 2: resume from checkpoint and approve plan ===
Resumed plan review request: 3a1a4a09-4ed1-4c90-9cf6-9ac488d452c0
=== Final Answer ===
Certainly! Here's your concise internal brief on how the research and implementation teams should collaborate for
the beta launch of the data-driven email summarization feature:
---
**Internal Brief: Collaboration Plan for Data-driven Email Summarization Beta Launch**
**Collaboration Approach**
- **Joint Kickoff:** Research and Implementation teams hold a project kickoff to align on objectives, requirements,
and success metrics.
- **Ongoing Coordination:** Teams collaborate closely; researchers share model developments and insights, while
implementation ensures smooth integration and user experience.
- **Real-time Feedback Loop:** Implementation provides early feedback on technical integration and UX, while
Research evaluates initial performance and user engagement signals post-integration.
**Key Milestones**
1. **Requirement Finalization & Scoping** - Define MVP feature set and success criteria.
2. **Model Prototyping & Evaluation** - Researchers develop and validate summarization models with agreed metrics.
3. **Integration & Internal Testing** - Implementation team integrates the model; internal alpha testing and
compliance checks.
4. **Beta User Onboarding** - Recruit a select cohort of beta users and guide them through onboarding.
5. **Beta Launch & Monitoring** - Soft-launch for beta group, with active monitoring of usage, feedback,
and performance.
6. **Iterative Improvements** - Address issues, refine features, and prepare for possible broader rollout.
**Top Risks**
- **Data Privacy & Compliance:** Strict protocols and compliance reviews to prevent data leakage.
- **Model Quality (Bias, Hallucination):** Careful monitoring of summary accuracy; rapid iterations if critical
errors occur.
- **User Adoption:** Ensuring the beta solves genuine user needs, collecting actionable feedback early.
- **Feedback Quality & Quantity:** Proactively schedule user outreach to ensure substantive beta feedback.
**Communication Cadence**
- **Weekly Team Syncs:** Short all-hands progress and blockers meeting.
- **Bi-Weekly Stakeholder Check-ins:** Leadership and project leads address escalations and strategic decisions.
- **Dedicated Slack Channel:** For real-time queries and updates.
- **Documentation Hub:** Up-to-date project docs and FAQs on a shared internal wiki.
- **Post-Milestone Retrospectives:** After critical phases (e.g., alpha, beta), reviewing what worked and what needs
improvement.
**Summary**
Clear alignment, consistent communication, and iterative feedback are key to a successful beta. All team members are
expected to surface issues quickly and keep documentation current as we drive toward launch.
---
=== Stage 3: resume from post-plan checkpoint ===
Resuming from checkpoint 9a3b... at iteration 3 (pending messages: 0)
Checkpoint has no pending messages; no additional work expected on resume.
No new events were emitted; checkpoint already captured a completed run.
=== Final Answer (post-plan resume) ===
(same brief as above)
"""
# Script entry point: run the async sample to completion when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    asyncio.run(main())