Skip to content

Commit 8cb3c27

Browse files
committed
Update worker versioning sample to use deployments
1 parent 0c66e2f commit 8cb3c27

File tree

13 files changed

+490
-236
lines changed

13 files changed

+490
-236
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
__pycache__
44
.vscode
55
.DS_Store
6+
.claude

worker_versioning/README.md

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
1-
# Worker Versioning Sample
1+
## Worker Versioning
22

3-
This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning)
4-
feature to deploy incompatible changes to workflow code more easily.
3+
This sample demonstrates how to use Temporal's Worker Versioning feature to safely deploy updates to workflow and activity code. It shows the difference between auto-upgrading and pinned workflows, and how to manage worker deployments with different build IDs.
54

6-
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory:
5+
The sample creates multiple worker versions (1.0, 1.1, and 2.0) within one deployment and demonstrates:
6+
- **Auto-upgrading workflows**: Automatically and controllably migrate to newer worker versions
7+
- **Pinned workflows**: Stay on the original worker version throughout their lifecycle
8+
- **Compatible vs incompatible changes**: How to make safe updates using `workflow.patched`
79

8-
uv run worker_versioning/example.py
10+
### Steps to run this sample:
911

10-
This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can
11-
mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll
12-
see that only the workers only process Workflow Tasks assigned versions they are compatible with.
12+
1) Run a [Temporal service](https://github.com/temporalio/samples-python/tree/main/#how-to-use).
13+
14+
2) Start the main application (this will guide you through the sample):
15+
```bash
16+
uv run worker_versioning/app.py
17+
```
18+
19+
3) Follow the prompts to start workers in separate terminals:
20+
- When prompted, run: `uv run worker_versioning/workerv1.py`
21+
- When prompted, run: `uv run worker_versioning/workerv1_1.py`
22+
- When prompted, run: `uv run worker_versioning/workerv2.py`
23+
24+
The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows remain on their original version.

worker_versioning/activities.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
1+
from dataclasses import dataclass
2+
13
from temporalio import activity
24

35

6+
@dataclass
7+
class IncompatibleActivityInput:
8+
"""Input for the incompatible activity."""
9+
10+
called_by: str
11+
more_data: str
12+
13+
414
@activity.defn
5-
async def greet(inp: str) -> str:
6-
return f"Hi from {inp}"
15+
async def some_activity(called_by: str) -> str:
16+
"""Basic activity for the workflow."""
17+
return f"SomeActivity called by {called_by}"
718

819

920
@activity.defn
10-
async def super_greet(inp: str, some_number: int) -> str:
11-
return f"Hi from {inp} with {some_number}"
21+
async def some_incompatible_activity(input_data: IncompatibleActivityInput) -> str:
22+
"""Incompatible activity that takes different input."""
23+
return f"SomeIncompatibleActivity called by {input_data.called_by} with {input_data.more_data}"

worker_versioning/app.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""Main application for the worker versioning sample."""
2+
3+
import asyncio
4+
import logging
5+
import uuid
6+
7+
from temporalio.client import Client
8+
9+
from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE
10+
11+
logging.basicConfig(level=logging.INFO)
12+
13+
14+
async def main() -> None:
15+
client = await Client.connect("localhost:7233")
16+
17+
# Wait for v1 worker and set as current version
18+
logging.info(
19+
"Waiting for v1 worker to appear. Run `python worker_versioning/workerv1.py` in another terminal"
20+
)
21+
await wait_for_worker_and_make_current(client, "1.0")
22+
23+
# Start auto-upgrading and pinned workflows
24+
auto_upgrade_workflow_id = "worker-versioning-versioning-autoupgrade_" + str(
25+
uuid.uuid4()
26+
)
27+
auto_upgrade_execution = await client.start_workflow(
28+
"AutoUpgrading",
29+
id=auto_upgrade_workflow_id,
30+
task_queue=TASK_QUEUE,
31+
)
32+
33+
pinned_workflow_id = "worker-versioning-versioning-pinned_" + str(uuid.uuid4())
34+
pinned_execution = await client.start_workflow(
35+
"Pinned",
36+
id=pinned_workflow_id,
37+
task_queue=TASK_QUEUE,
38+
)
39+
40+
logging.info("Started auto-upgrading workflow: %s", auto_upgrade_execution.id)
41+
logging.info("Started pinned workflow: %s", pinned_execution.id)
42+
43+
# Signal both workflows a few times to drive them
44+
await advance_workflows(auto_upgrade_execution, pinned_execution)
45+
46+
# Now wait for the v1.1 worker to appear and become current
47+
logging.info(
48+
"Waiting for v1.1 worker to appear. Run `python worker_versioning/workerv1_1.py` in another terminal"
49+
)
50+
await wait_for_worker_and_make_current(client, "1.1")
51+
52+
# Once it has, we will continue to advance the workflows.
53+
# The auto-upgrade workflow will now make progress on the new worker, while the pinned one will
54+
# keep progressing on the old worker.
55+
await advance_workflows(auto_upgrade_execution, pinned_execution)
56+
57+
# Finally we'll start the v2 worker, and again it'll become the new current version
58+
logging.info(
59+
"Waiting for v2 worker to appear. Run `python worker_versioning/workerv2.py` in another terminal"
60+
)
61+
await wait_for_worker_and_make_current(client, "2.0")
62+
63+
# Once it has we'll start one more new workflow, another pinned one, to demonstrate that new
64+
# pinned workflows start on the current version.
65+
pinned_workflow_2_id = "worker-versioning-versioning-pinned-2_" + str(uuid.uuid4())
66+
pinned_execution_2 = await client.start_workflow(
67+
"Pinned",
68+
id=pinned_workflow_2_id,
69+
task_queue=TASK_QUEUE,
70+
)
71+
logging.info("Started pinned workflow v2: %s", pinned_execution_2.id)
72+
73+
# Now we'll conclude all workflows. You should be able to see in your server UI that the pinned
74+
# workflow always stayed on 1.0, while the auto-upgrading workflow migrated.
75+
for handle in [auto_upgrade_execution, pinned_execution, pinned_execution_2]:
76+
await handle.signal("do_next_signal", "conclude")
77+
await handle.result()
78+
79+
logging.info("All workflows completed")
80+
81+
82+
async def advance_workflows(auto_upgrade_execution, pinned_execution):
83+
"""Signal both workflows a few times to drive them."""
84+
for i in range(3):
85+
await auto_upgrade_execution.signal("do_next_signal", "do-activity")
86+
await pinned_execution.signal("do_next_signal", "some-signal")
87+
88+
89+
async def wait_for_worker_and_make_current(client: Client, build_id: str) -> None:
90+
import temporalio.api.workflowservice.v1 as wsv1
91+
from temporalio.common import WorkerDeploymentVersion
92+
93+
target_version = WorkerDeploymentVersion(
94+
deployment_name=DEPLOYMENT_NAME, build_id=build_id
95+
)
96+
97+
# Wait for the worker to appear
98+
while True:
99+
try:
100+
describe_request = wsv1.DescribeWorkerDeploymentRequest(
101+
namespace=client.namespace,
102+
deployment_name=DEPLOYMENT_NAME,
103+
)
104+
response = await client.workflow_service.describe_worker_deployment(
105+
describe_request
106+
)
107+
108+
# Check if our version is present in the version summaries
109+
for version_summary in response.worker_deployment_info.version_summaries:
110+
if (
111+
version_summary.deployment_version.deployment_name
112+
== target_version.deployment_name
113+
and version_summary.deployment_version.build_id
114+
== target_version.build_id
115+
):
116+
break
117+
else:
118+
await asyncio.sleep(1)
119+
continue
120+
121+
break
122+
123+
except Exception:
124+
await asyncio.sleep(1)
125+
continue
126+
127+
# Once the version is available, set it as current
128+
set_request = wsv1.SetWorkerDeploymentCurrentVersionRequest(
129+
namespace=client.namespace,
130+
deployment_name=DEPLOYMENT_NAME,
131+
build_id=target_version.build_id,
132+
)
133+
await client.workflow_service.set_worker_deployment_current_version(set_request)
134+
135+
136+
if __name__ == "__main__":
137+
asyncio.run(main())

worker_versioning/constants.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""Constants for the worker versioning sample."""
2+
3+
# Task queue name
4+
TASK_QUEUE = "worker-versioning"
5+
6+
# Deployment name
7+
DEPLOYMENT_NAME = "my-deployment"

worker_versioning/example.py

Lines changed: 0 additions & 116 deletions
This file was deleted.

worker_versioning/workerv1.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Worker v1 for the worker versioning sample."""
2+
3+
import asyncio
4+
import logging
5+
6+
from temporalio.client import Client
7+
from temporalio.common import WorkerDeploymentVersion
8+
from temporalio.worker import Worker, WorkerDeploymentConfig
9+
10+
from worker_versioning.activities import some_activity, some_incompatible_activity
11+
from worker_versioning.constants import DEPLOYMENT_NAME, TASK_QUEUE
12+
from worker_versioning.workflows import AutoUpgradingWorkflowV1, PinnedWorkflowV1
13+
14+
logging.basicConfig(level=logging.INFO)
15+
16+
17+
async def main() -> None:
18+
"""Run worker v1."""
19+
client = await Client.connect("localhost:7233")
20+
21+
# Create worker v1
22+
worker = Worker(
23+
client,
24+
task_queue=TASK_QUEUE,
25+
workflows=[AutoUpgradingWorkflowV1, PinnedWorkflowV1],
26+
activities=[some_activity, some_incompatible_activity],
27+
deployment_config=WorkerDeploymentConfig(
28+
version=WorkerDeploymentVersion(
29+
deployment_name=DEPLOYMENT_NAME, build_id="1.0"
30+
),
31+
use_worker_versioning=True,
32+
),
33+
)
34+
35+
logging.info("Starting worker v1 (build 1.0)")
36+
await worker.run()
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())

0 commit comments

Comments
 (0)