Skip to content

Commit d69f011

Browse files
committed
Add LangGraph Functional API Temporal integration proposal
This sample demonstrates the proposed integration between LangGraph's Functional API (@task/@entrypoint) and Temporal. Key findings: - @entrypoint returns Pregel (same as StateGraph.compile()) - Tasks discovered dynamically via CONFIG_KEY_CALL at runtime - No explicit task registration needed - plugin uses identifier() to get module.qualname and executes via dynamic activity Proposed API: - LangGraphFunctionalPlugin(entrypoints={...}) - register entrypoints only - compile("name") in workflows - same API as Graph integration - User writes @workflow.defn with full Temporal feature access
1 parent bb741b3 commit d69f011

File tree

7 files changed

+732
-0
lines changed

7 files changed

+732
-0
lines changed
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
# LangGraph Functional API + Temporal Integration Proposal
2+
3+
This sample demonstrates the **proposed** integration between LangGraph's Functional API and Temporal using `LangGraphFunctionalPlugin`.
4+
5+
> ⚠️ **Note**: `LangGraphFunctionalPlugin` is a **proposal** and does not exist yet. This sample shows the intended developer experience.
6+
7+
## Key Insights
8+
9+
### 1. `@entrypoint` returns `Pregel`
10+
11+
`@entrypoint` returns a `Pregel` object (same as `StateGraph.compile()`), so we can use the same `compile("name")` API in workflows.
12+
13+
### 2. No Explicit Task Registration Needed
14+
15+
LangGraph **doesn't pre-register tasks**. When `@task` functions are called:
16+
1. They go through `CONFIG_KEY_CALL` callback in the config
17+
2. The callback receives the **actual function object**
18+
3. `identifier(func)` returns `module.qualname` (e.g., `mymodule.research_topic`)
19+
20+
This means the Temporal plugin can discover tasks **dynamically at runtime**:
21+
- Inject `CONFIG_KEY_CALL` callback that schedules a dynamic activity
22+
- The activity receives function identifier + serialized args
23+
- The activity imports the function by module path and executes it
24+
25+
**The worker just needs the task modules to be importable.**
26+
27+
## Overview
28+
29+
```python
30+
# NO explicit task registration!
31+
# Pass entrypoints as list - names extracted from func.__name__
32+
plugin = LangGraphFunctionalPlugin(
33+
entrypoints=[document_workflow, review_workflow],
34+
)
35+
```
36+
37+
Key mappings:
38+
- **`@task` calls → Dynamic Activities**: Discovered at runtime via `CONFIG_KEY_CALL`
39+
- **`@entrypoint` functions → Pregel graphs**: Executed via `compile()` in workflows
40+
- **`interrupt()` → User-handled signals**: Workflow controls pause/resume
41+
42+
## How It Works Internally
43+
44+
```python
45+
# When you call a @task function:
46+
result = await research_topic("AI")
47+
48+
# Internally, @task wraps this in call():
49+
fut = call(research_topic_func, "AI", ...)
50+
51+
# call() reads CONFIG_KEY_CALL from config:
52+
config = get_config()
53+
impl = config[CONF][CONFIG_KEY_CALL]
54+
fut = impl(func, args, ...) # func is the actual function object!
55+
56+
# The plugin's callback:
57+
# 1. Gets identifier: "langgraph_plugin.functional_api_proposal.tasks.research_topic"
58+
# 2. Schedules dynamic activity with identifier + args
59+
# 3. Activity imports function and executes it
60+
```
61+
62+
## Developer Experience
63+
64+
### 1. Define Tasks
65+
66+
```python
67+
# tasks.py
68+
from langgraph.func import task
69+
70+
@task
71+
async def research_topic(topic: str) -> dict:
72+
"""Discovered dynamically when called."""
73+
return {"facts": [...]}
74+
75+
@task
76+
async def write_section(topic: str, section: str, research: dict) -> str:
77+
return f"Content about {topic}..."
78+
```
79+
80+
### 2. Define Entrypoints
81+
82+
```python
83+
# entrypoint.py
84+
from langgraph.func import entrypoint
85+
from langgraph.types import interrupt
86+
from .tasks import research_topic, write_section
87+
88+
@entrypoint()
89+
async def document_workflow(topic: str) -> dict:
90+
# Task calls discovered at runtime via CONFIG_KEY_CALL
91+
research = await research_topic(topic)
92+
93+
intro = write_section(topic, "intro", research)
94+
body = write_section(topic, "body", research)
95+
sections = [await intro, await body]
96+
97+
return {"sections": sections}
98+
```
99+
100+
### 3. Define Temporal Workflows
101+
102+
```python
103+
# workflow.py
104+
from temporalio import workflow
105+
from temporalio.contrib.langgraph import compile
106+
107+
@workflow.defn
108+
class DocumentWorkflow:
109+
@workflow.run
110+
async def run(self, topic: str) -> dict:
111+
app = compile("document_workflow")
112+
result = await app.ainvoke(topic)
113+
return result
114+
```
115+
116+
### 4. Register with Plugin (No Task Registration!)
117+
118+
```python
119+
# run_worker.py
120+
from temporalio.contrib.langgraph import LangGraphFunctionalPlugin
121+
122+
# NO tasks={} needed!
123+
# Pass entrypoints as list - names extracted from func.__name__
124+
plugin = LangGraphFunctionalPlugin(
125+
entrypoints=[document_workflow, review_workflow],
126+
# Optional: default timeout for all task activities
127+
default_task_timeout=timedelta(minutes=10),
128+
# Optional: per-task options by function name
129+
task_options={
130+
"research_topic": {
131+
"start_to_close_timeout": timedelta(minutes=15),
132+
},
133+
},
134+
)
135+
136+
worker = Worker(
137+
client,
138+
task_queue="langgraph-functional",
139+
workflows=[DocumentWorkflow, ReviewWorkflow],
140+
)
141+
```
142+
143+
Note: In workflows, you still use `compile("document_workflow")` by name string
144+
because the workflow sandbox restricts imports (Pregel isn't sandbox-safe).
145+
146+
## Sample Structure
147+
148+
```
149+
functional_api_proposal/
150+
├── tasks.py # @task functions (discovered dynamically)
151+
├── entrypoint.py # @entrypoint functions (→ Pregel)
152+
├── workflow.py # User-defined Temporal workflows
153+
├── run_worker.py # Plugin setup (no task registration!)
154+
├── run_workflow.py # Execute workflows
155+
└── README.md
156+
```
157+
158+
## Running the Sample
159+
160+
```bash
161+
# 1. Start Temporal
162+
temporal server start-dev
163+
164+
# 2. Start Worker
165+
python -m langgraph_plugin.functional_api_proposal.run_worker
166+
167+
# 3. Run Workflows
168+
python -m langgraph_plugin.functional_api_proposal.run_workflow document
169+
python -m langgraph_plugin.functional_api_proposal.run_workflow review
170+
```
171+
172+
## Implementation Details
173+
174+
### Dynamic Activity Execution
175+
176+
The plugin provides a single dynamic activity:
177+
178+
```python
179+
@activity.defn(name="execute_langgraph_task")
180+
async def execute_task(task_id: str, args: bytes, kwargs: bytes) -> bytes:
181+
"""Execute any @task function by module path."""
182+
# Import the function
183+
module_name, func_name = task_id.rsplit(".", 1)
184+
module = importlib.import_module(module_name)
185+
func = getattr(module, func_name)
186+
187+
# Execute
188+
result = await func(*deserialize(args), **deserialize(kwargs))
189+
return serialize(result)
190+
```
191+
192+
### CONFIG_KEY_CALL Injection
193+
194+
When `compile()` is called in a workflow, the plugin injects a custom callback:
195+
196+
```python
197+
def temporal_call_callback(func, args, retry_policy, cache_policy, callbacks):
198+
task_id = identifier(func) # e.g., "mymodule.research_topic"
199+
200+
# Schedule the dynamic activity
201+
return workflow.execute_activity(
202+
"execute_langgraph_task",
203+
args=(task_id, serialize(args)),
204+
start_to_close_timeout=get_timeout(task_id),
205+
retry_policy=convert_retry_policy(retry_policy),
206+
)
207+
```
208+
209+
## Comparison with Graph API
210+
211+
| Aspect | Graph API | Functional API |
212+
|--------|-----------|----------------|
213+
| Definition | `StateGraph` + `add_node()` | `@task` + `@entrypoint` |
214+
| Control flow | Graph edges | Python code |
215+
| Returns | `Pregel` | `Pregel` |
216+
| In-workflow API | `compile("name")` | `compile("name")` |
217+
| Activity discovery | From graph nodes | Dynamic via `CONFIG_KEY_CALL` |
218+
| Registration | `graphs={name: builder}` | `entrypoints=[func, ...]` |
219+
220+
## Why This Works
221+
222+
1. **LangGraph's extensibility**: `CONFIG_KEY_CALL` is designed for custom execution backends
223+
2. **Function identification**: `identifier()` provides stable module paths
224+
3. **Dynamic activities**: Temporal supports activity execution by name
225+
4. **Serialization**: Args/results serialized for activity transport
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""LangGraph Functional API + Temporal Integration Proposal."""
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
"""Entrypoint definitions for the Functional API sample.
2+
3+
Each @entrypoint becomes a Temporal workflow when registered
4+
with LangGraphFunctionalPlugin.
5+
"""
6+
7+
from typing import Any
8+
9+
from langgraph.func import entrypoint
10+
from langgraph.types import interrupt
11+
12+
from .tasks import compile_document, research_topic, write_section
13+
14+
15+
@entrypoint()
16+
async def document_workflow(topic: str) -> dict[str, Any]:
17+
"""Create a document about a topic.
18+
19+
Demonstrates:
20+
- Sequential task execution (research)
21+
- Parallel task execution (writing sections)
22+
- Task composition (compiling results)
23+
24+
Each task call becomes a Temporal activity execution.
25+
"""
26+
# Step 1: Research (single activity)
27+
research = await research_topic(topic)
28+
29+
# Step 2: Write sections in parallel (3 concurrent activities)
30+
intro_future = write_section(topic, "introduction", research)
31+
body_future = write_section(topic, "body", research)
32+
conclusion_future = write_section(topic, "conclusion", research)
33+
34+
intro = await intro_future
35+
body = await body_future
36+
conclusion = await conclusion_future
37+
38+
# Step 3: Compile (single activity)
39+
document = await compile_document(
40+
sections=[intro, body, conclusion],
41+
title=f"A Guide to {topic}",
42+
)
43+
44+
return {
45+
"document": document,
46+
"research": research,
47+
"status": "completed",
48+
}
49+
50+
51+
@entrypoint()
52+
async def review_workflow(topic: str) -> dict[str, Any]:
53+
"""Document workflow with human-in-the-loop review.
54+
55+
Demonstrates interrupt() for human review:
56+
- interrupt() pauses the Temporal workflow
57+
- Workflow waits for signal to resume
58+
- Resume with Command(resume=value)
59+
"""
60+
# Generate draft
61+
research = await research_topic(topic)
62+
draft_sections = []
63+
64+
for section_name in ["introduction", "body", "conclusion"]:
65+
section = await write_section(topic, section_name, research)
66+
draft_sections.append(section)
67+
68+
draft = await compile_document(draft_sections, f"Draft: {topic}")
69+
70+
# Human review - pauses workflow until signal received
71+
review_response = interrupt({
72+
"action": "review_document",
73+
"document": draft,
74+
"options": ["approve", "revise", "reject"],
75+
})
76+
77+
decision = review_response.get("decision", "reject")
78+
79+
if decision == "approve":
80+
return {"document": draft, "status": "approved"}
81+
elif decision == "revise":
82+
feedback = review_response.get("feedback", "")
83+
revised_sections = []
84+
for section_name in ["introduction", "body", "conclusion"]:
85+
section = await write_section(
86+
f"{topic} (revised: {feedback})", section_name, research
87+
)
88+
revised_sections.append(section)
89+
revised = await compile_document(revised_sections, f"Revised: {topic}")
90+
return {"document": revised, "status": "revised", "feedback": feedback}
91+
else:
92+
return {"document": None, "status": "rejected"}

0 commit comments

Comments
 (0)