Skip to content

Commit d72c2de

Browse files
committed
Diverge
1 parent d892661 commit d72c2de

File tree

8 files changed

+104
-220
lines changed

8 files changed

+104
-220
lines changed

message_passing/waiting_for_handlers/README.md

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
# Waiting for message handlers, and performing compensation and cleanup in message handlers
1+
# Waiting for message handlers in message handlers
22

3-
This sample demonstrates how to do the following:
3+
This workflow demonstrates how to wait for signal and update handlers to
4+
finish in the following circumstances:
45

5-
1. Ensure that all update/signal handlers are finished before a successful
6-
workflow return, and on workflow cancellation and failure.
7-
2. Perform compensation/cleanup in an update handler when the workflow is
8-
cancelled or fails.
6+
- Before a successful return
7+
- On failure
8+
- On cancellation
99

10+
Your workflow can also exit via Continue-As-New. In that case you would
11+
usually wait for the handlers to finish immediately before the call to
12+
continue_as_new(); that's not illustrated in this sample.
1013

1114

1215
To run, open two terminals and `cd` to this directory in them.
@@ -29,11 +32,10 @@ workflow exit type: SUCCESS
2932
3033
3134
workflow exit type: FAILURE
32-
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
35+
🟢 caller received update result
3336
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow
3437
3538
3639
workflow exit type: CANCELLATION
37-
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
38-
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled
40+
🟢 caller received update result
3941
```

message_passing/waiting_for_handlers/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,8 @@ class WorkflowExitType(IntEnum):
1414
@dataclass
1515
class WorkflowInput:
1616
exit_type: WorkflowExitType
17+
18+
19+
@dataclass
20+
class WorkflowResult:
21+
data: str

message_passing/waiting_for_handlers/activities.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,6 @@
33
from temporalio import activity
44

55

6-
@activity.defn
7-
async def activity_executed_to_perform_workflow_compensation():
8-
await asyncio.sleep(1)
9-
10-
116
@activity.defn
127
async def activity_executed_by_update_handler():
138
await asyncio.sleep(1)
14-
15-
16-
@activity.defn
17-
async def activity_executed_by_update_handler_to_perform_compensation():
18-
await asyncio.sleep(1)

message_passing/waiting_for_handlers/starter.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@
22

33
from temporalio import client, common
44

5-
from message_passing.waiting_for_handlers_and_compensation import (
5+
from message_passing.waiting_for_handlers import (
66
TASK_QUEUE,
77
WORKFLOW_ID,
88
WorkflowExitType,
99
WorkflowInput,
1010
)
11-
from message_passing.waiting_for_handlers_and_compensation.workflows import (
12-
WaitingForHandlersAndCompensationWorkflow,
11+
from message_passing.waiting_for_handlers.workflows import (
12+
WaitingForHandlersWorkflow,
1313
)
1414

1515

1616
async def starter(exit_type: WorkflowExitType):
1717
cl = await client.Client.connect("localhost:7233")
1818
wf_handle = await cl.start_workflow(
19-
WaitingForHandlersAndCompensationWorkflow.run,
19+
WaitingForHandlersWorkflow.run,
2020
WorkflowInput(exit_type=exit_type),
2121
id=WORKFLOW_ID,
2222
task_queue=TASK_QUEUE,
@@ -31,11 +31,13 @@ async def _check_run(
3131
):
3232
try:
3333
up_handle = await wf_handle.start_update(
34-
WaitingForHandlersAndCompensationWorkflow.my_update,
34+
WaitingForHandlersWorkflow.my_update,
3535
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED,
3636
)
3737
except Exception as e:
38-
print(f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}")
38+
print(
39+
f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}"
40+
)
3941

4042
if exit_type == WorkflowExitType.CANCELLATION:
4143
await wf_handle.cancel()
@@ -51,7 +53,7 @@ async def _check_run(
5153
try:
5254
await wf_handle.result()
5355
print(" 🟢 caller received workflow result")
54-
except Exception as e:
56+
except BaseException as e:
5557
print(
5658
f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}"
5759
)

message_passing/waiting_for_handlers/worker.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44
from temporalio.client import Client
55
from temporalio.worker import Worker
66

7-
from message_passing.waiting_for_handlers_and_compensation import TASK_QUEUE
8-
from message_passing.waiting_for_handlers_and_compensation.activities import (
7+
from message_passing.waiting_for_handlers import TASK_QUEUE
8+
from message_passing.waiting_for_handlers.activities import (
99
activity_executed_by_update_handler,
10-
activity_executed_by_update_handler_to_perform_compensation,
11-
activity_executed_to_perform_workflow_compensation,
1210
)
13-
from message_passing.waiting_for_handlers_and_compensation.workflows import (
14-
WaitingForHandlersAndCompensationWorkflow,
11+
from message_passing.waiting_for_handlers.workflows import (
12+
WaitingForHandlersWorkflow,
1513
)
1614

1715
interrupt_event = asyncio.Event()
@@ -25,11 +23,9 @@ async def main():
2523
async with Worker(
2624
client,
2725
task_queue=TASK_QUEUE,
28-
workflows=[WaitingForHandlersAndCompensationWorkflow],
26+
workflows=[WaitingForHandlersWorkflow],
2927
activities=[
3028
activity_executed_by_update_handler,
31-
activity_executed_by_update_handler_to_perform_compensation,
32-
activity_executed_to_perform_workflow_compensation,
3329
],
3430
):
3531
logging.info("Worker started, ctrl+c to exit")
Lines changed: 41 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,180 +1,95 @@
11
import asyncio
22
from datetime import timedelta
3-
from typing import cast
43

54
from temporalio import exceptions, workflow
65

7-
from message_passing.waiting_for_handlers_and_compensation import (
6+
from message_passing.waiting_for_handlers import (
87
WorkflowExitType,
98
WorkflowInput,
9+
WorkflowResult,
1010
)
11-
from message_passing.waiting_for_handlers_and_compensation.activities import (
11+
from message_passing.waiting_for_handlers.activities import (
1212
activity_executed_by_update_handler,
13-
activity_executed_by_update_handler_to_perform_compensation,
14-
activity_executed_to_perform_workflow_compensation,
1513
)
1614

1715

18-
@workflow.defn
19-
class WaitingForHandlersAndCompensationWorkflow:
16+
def is_workflow_exit_exception(e: BaseException) -> bool:
2017
"""
21-
This Workflow demonstrates how to wait for message handlers to finish and
22-
perform compensation/cleanup:
18+
True if the exception is of a type that will cause the workflow to exit.
2319
24-
1. It ensures that all signal and update handlers have finished before a
25-
successful return, and on failure and cancellation.
26-
2. The update handler performs any necessary compensation/cleanup when the
27-
workflow is cancelled or fails.
20+
This is as opposed to exceptions that cause a workflow task failure, which
21+
are retried automatically by Temporal.
2822
"""
23+
# 👉 If you have set additional failure_exception_types you should also
24+
# check for these here.
25+
return isinstance(e, (asyncio.CancelledError, exceptions.FailureError))
2926

30-
def __init__(self) -> None:
31-
# 👉 If the workflow exits prematurely, this future will be completed
32-
# with the associated exception as its value. Message handlers can then
33-
# "race" this future against a task performing the message handler's own
34-
# application logic; if this future completes before the message handler
35-
# task then the handler should abort and perform compensation.
36-
self.workflow_exit: asyncio.Future[None] = asyncio.Future()
37-
38-
# The following two attributes are implementation detail of this sample
39-
# and can be ignored
40-
self._update_started = False
41-
self._update_compensation_done = False
42-
self._workflow_compensation_done = False
4327

28+
@workflow.defn
29+
class WaitingForHandlersWorkflow:
4430
@workflow.run
45-
async def run(self, input: WorkflowInput) -> str:
31+
async def run(self, input: WorkflowInput) -> WorkflowResult:
32+
"""
33+
This workflow.run method demonstrates a pattern that can be used to wait for signal and
34+
update handlers to finish in the following circumstances:
35+
36+
- On successful workflow return
37+
- On workflow cancellation
38+
- On workflow failure
39+
40+
Your workflow can also exit via Continue-As-New. In that case you would usually wait for
41+
the handlers to finish immediately before the call to continue_as_new(); that's not
42+
illustrated in this sample.
43+
44+
If you additionally need to perform cleanup or compensation on workflow failure or
45+
cancellation, see the message_passing/waiting_for_handlers_and_compensation sample.
46+
"""
4647
try:
4748
# 👉 Use this `try...except` style, instead of waiting for message
4849
# handlers to finish in a `finally` block. The reason is that some
4950
# exception types cause a workflow task failure as opposed to
5051
# workflow exit, in which case we do *not* want to wait for message
5152
# handlers to finish.
52-
53-
# 👉 self._run contains your actual application logic. This is
54-
# implemented in a separate method in order to separate
55-
# "platform-level" concerns (waiting for handlers to finish and
56-
# ensuring that compensation is performed when appropriate) from
57-
# application logic. In this sample, its actual implementation is
58-
# below but contains nothing relevant.
59-
result = await self._run(input)
60-
self.workflow_exit.set_result(None)
53+
result = await self._my_workflow_application_logic(input)
6154
await workflow.wait_condition(workflow.all_handlers_finished)
6255
return result
6356
# 👉 Catch BaseException since asyncio.CancelledError does not inherit
6457
# from Exception.
6558
except BaseException as e:
6659
if is_workflow_exit_exception(e):
67-
self.workflow_exit.set_exception(e)
6860
await workflow.wait_condition(workflow.all_handlers_finished)
69-
await self.workflow_compensation()
70-
self._workflow_compensation_done = True
7161
raise
7262

73-
async def workflow_compensation(self):
74-
await workflow.execute_activity(
75-
activity_executed_to_perform_workflow_compensation,
76-
start_to_close_timeout=timedelta(seconds=10),
77-
)
78-
self._update_compensation_done = True
63+
# Methods below this point can be ignored unless you are interested in
64+
# the implementation details of this sample.
65+
66+
def __init__(self) -> None:
67+
self._update_started = False
7968

8069
@workflow.update
8170
async def my_update(self) -> str:
82-
"""
83-
An update handler that handles exceptions raised in its own execution
84-
and in that of the main workflow method.
85-
86-
It ensures that:
87-
- Compensation/cleanup is always performed when appropriate
88-
- The update caller gets the update result, or WorkflowUpdateFailedError
89-
"""
90-
# 👉 As with the main workflow method, the update application logic is
91-
# implemented in a separate method in order to separate "platform-level"
92-
# error-handling and compensation concerns from application logic. Note
93-
# that coroutines must be wrapped in tasks in order to use
94-
# workflow.wait.
95-
update_task = asyncio.create_task(self._my_update())
96-
97-
# 👉 "Race" the workflow_exit future against the handler's own application
98-
# logic. Always use `workflow.wait` instead of `asyncio.wait` in
99-
# Workflow code: asyncio's version is non-deterministic.
100-
await workflow.wait( # type: ignore
101-
[update_task, self.workflow_exit], return_when=asyncio.FIRST_EXCEPTION
102-
)
103-
try:
104-
if update_task.done():
105-
# 👉 The update has finished (whether successfully or not).
106-
# Regardless of whether the main workflow method is about to
107-
# exit or not, the update caller should receive a response
108-
# informing them of the outcome of the update. So return the
109-
# result, or raise the exception that caused the update handler
110-
# to exit.
111-
return await update_task
112-
else:
113-
# 👉 The main workflow method exited prematurely due to an
114-
# error, and this happened before the update finished. Fail the
115-
# update with the workflow exception as cause.
116-
raise exceptions.ApplicationError(
117-
"The update failed because the workflow run exited"
118-
) from cast(BaseException, self.workflow_exit.exception())
119-
# 👉 Catch BaseException since asyncio.CancelledError does not inherit
120-
# from Exception.
121-
except BaseException as e:
122-
if is_workflow_exit_exception(e):
123-
try:
124-
await self.my_update_compensation()
125-
except BaseException as e:
126-
raise exceptions.ApplicationError(
127-
"Update compensation failed"
128-
) from e
129-
raise
130-
131-
async def my_update_compensation(self):
132-
await workflow.execute_activity(
133-
activity_executed_by_update_handler_to_perform_compensation,
134-
start_to_close_timeout=timedelta(seconds=10),
135-
)
136-
self._update_compensation_done = True
137-
138-
@workflow.query
139-
def workflow_compensation_done(self) -> bool:
140-
return self._workflow_compensation_done
141-
142-
@workflow.query
143-
def update_compensation_done(self) -> bool:
144-
return self._update_compensation_done
145-
146-
# The following methods are placeholders for the actual application logic
147-
# that you would perform in your main workflow method or update handler.
148-
# Their implementation can be ignored.
149-
150-
async def _my_update(self) -> str:
151-
# Ignore this method unless you are interested in the implementation
152-
# details of this sample.
15371
self._update_started = True
15472
await workflow.execute_activity(
15573
activity_executed_by_update_handler,
15674
start_to_close_timeout=timedelta(seconds=10),
15775
)
15876
return "update-result"
15977

160-
async def _run(self, input: WorkflowInput) -> str:
161-
# Ignore this method unless you are interested in the implementation
162-
# details of this sample.
78+
async def _my_workflow_application_logic(
79+
self, input: WorkflowInput
80+
) -> WorkflowResult:
81+
# The main workflow logic is implemented in a separate method in order
82+
# to separate "platform-level" concerns (waiting for handlers to finish
83+
# and error handling) from application logic.
16384

16485
# Wait until handlers have started, so that we are demonstrating that we
16586
# wait for them to finish.
16687
await workflow.wait_condition(lambda: self._update_started)
16788
if input.exit_type == WorkflowExitType.SUCCESS:
168-
return "workflow-result"
89+
return WorkflowResult(data="workflow-result")
16990
elif input.exit_type == WorkflowExitType.FAILURE:
17091
raise exceptions.ApplicationError("deliberately failing workflow")
17192
elif input.exit_type == WorkflowExitType.CANCELLATION:
17293
# Block forever; the starter will send a workflow cancellation request.
17394
await asyncio.Future()
17495
raise AssertionError("unreachable")
175-
176-
177-
def is_workflow_exit_exception(e: BaseException) -> bool:
178-
# 👉 If you have set additional failure_exception_types you should also
179-
# check for these here.
180-
return isinstance(e, (asyncio.CancelledError, exceptions.FailureError))

0 commit comments

Comments
 (0)