Skip to content

Commit f336fcc

Browse files
committed
Start cluster automatically; use update to wait until started
1 parent 950681e commit f336fcc

File tree

6 files changed

+53
-15
lines changed

6 files changed

+53
-15
lines changed

message_passing/safe_message_handlers/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
This sample shows off important techniques for handling signals and updates, aka messages. In particular, it illustrates how message handlers can interleave or not be completed before the workflow completes, and how you can manage that.
44

55
* Here, using workflow.wait_condition, signal and update handlers will only operate when the workflow is within a certain state--between cluster_started and cluster_shutdown.
6-
* You can run start_workflow with an initializer signal that you want to run before anything else other than the workflow's constructor. This pattern is known as "signal-with-start."
76
* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so you can use a lock to protect shared state from interleaved access.
87
* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time.
98
* Most people want their message handlers to finish before the workflow run completes or continues as new. Use `await workflow.wait_condition(lambda: workflow.all_handlers_finished())` to achieve this.

message_passing/safe_message_handlers/activities.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ class AssignNodesToJobInput:
1111
job_name: str
1212

1313

14+
@dataclass
15+
class ClusterState:
16+
node_ids: List[str]
17+
18+
19+
@activity.defn
20+
async def start_cluster() -> ClusterState:
21+
return ClusterState(node_ids=[f"{i}" for i in range(25)])
22+
23+
1424
@activity.defn
1525
async def assign_nodes_to_job(input: AssignNodesToJobInput) -> None:
1626
print(f"Assigning nodes {input.nodes} to job {input.job_name}")

message_passing/safe_message_handlers/starter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717

1818
async def do_cluster_lifecycle(wf: WorkflowHandle, delay_seconds: Optional[int] = None):
19-
20-
await wf.signal(ClusterManagerWorkflow.start_cluster)
19+
cluster_status = await wf.execute_update(
20+
ClusterManagerWorkflow.wait_until_cluster_started
21+
)
22+
print(f"Cluster started with {len(cluster_status.nodes)} nodes")
2123

2224
print("Assigning jobs to nodes...")
2325
allocation_updates = []

message_passing/safe_message_handlers/worker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
ClusterManagerWorkflow,
99
assign_nodes_to_job,
1010
find_bad_nodes,
11+
start_cluster,
1112
unassign_nodes_for_job,
1213
)
1314

@@ -21,7 +22,12 @@ async def main():
2122
client,
2223
task_queue="safe-message-handlers-task-queue",
2324
workflows=[ClusterManagerWorkflow],
24-
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
25+
activities=[
26+
assign_nodes_to_job,
27+
unassign_nodes_for_job,
28+
find_bad_nodes,
29+
start_cluster,
30+
],
2531
):
2632
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
2733
await interrupt_event.wait()

message_passing/safe_message_handlers/workflow.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
UnassignNodesForJobInput,
1515
assign_nodes_to_job,
1616
find_bad_nodes,
17+
start_cluster,
1718
unassign_nodes_for_job,
1819
)
1920

@@ -81,11 +82,10 @@ def __init__(self, input: ClusterManagerInput) -> None:
8182
self.max_history_length: Optional[int] = None
8283
self.sleep_interval_seconds: int = 600
8384

84-
@workflow.signal
85-
async def start_cluster(self) -> None:
86-
self.state.cluster_started = True
87-
self.state.nodes = {str(k): None for k in range(25)}
88-
workflow.logger.info("Cluster started")
85+
@workflow.update
86+
async def wait_until_cluster_started(self) -> ClusterManagerState:
87+
await workflow.wait_condition(lambda: self.state.cluster_started)
88+
return self.state
8989

9090
@workflow.signal
9191
async def shutdown_cluster(self) -> None:
@@ -213,6 +213,13 @@ async def perform_health_checks(self) -> None:
213213

214214
@workflow.run
215215
async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
216+
cluster_state = await workflow.execute_activity(
217+
start_cluster, schedule_to_close_timeout=timedelta(seconds=10)
218+
)
219+
self.state.nodes = {k: None for k in cluster_state.node_ids}
220+
self.state.cluster_started = True
221+
workflow.logger.info("Cluster started")
222+
216223
await workflow.wait_condition(lambda: self.state.cluster_started)
217224
# Perform health checks at intervals.
218225
while True:

tests/message_passing/safe_message_handlers/workflow_test.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from message_passing.safe_message_handlers.activities import (
1111
assign_nodes_to_job,
1212
find_bad_nodes,
13+
start_cluster,
1314
unassign_nodes_for_job,
1415
)
1516
from message_passing.safe_message_handlers.workflow import (
@@ -19,6 +20,13 @@
1920
ClusterManagerWorkflow,
2021
)
2122

23+
ACTIVITIES = [
24+
assign_nodes_to_job,
25+
unassign_nodes_for_job,
26+
find_bad_nodes,
27+
start_cluster,
28+
]
29+
2230

2331
async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):
2432
if env.supports_time_skipping:
@@ -30,15 +38,17 @@ async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):
3038
client,
3139
task_queue=task_queue,
3240
workflows=[ClusterManagerWorkflow],
33-
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
41+
activities=ACTIVITIES,
3442
):
3543
cluster_manager_handle = await client.start_workflow(
3644
ClusterManagerWorkflow.run,
3745
ClusterManagerInput(),
3846
id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
3947
task_queue=task_queue,
4048
)
41-
await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)
49+
await cluster_manager_handle.execute_update(
50+
ClusterManagerWorkflow.wait_until_cluster_started
51+
)
4252

4353
allocation_updates = []
4454
for i in range(6):
@@ -82,7 +92,7 @@ async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
8292
client,
8393
task_queue=task_queue,
8494
workflows=[ClusterManagerWorkflow],
85-
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
95+
activities=ACTIVITIES,
8696
):
8797
cluster_manager_handle = await client.start_workflow(
8898
ClusterManagerWorkflow.run,
@@ -91,7 +101,9 @@ async def test_update_idempotency(client: Client, env: WorkflowEnvironment):
91101
task_queue=task_queue,
92102
)
93103

94-
await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)
104+
await cluster_manager_handle.execute_update(
105+
ClusterManagerWorkflow.wait_until_cluster_started
106+
)
95107

96108
result_1 = await cluster_manager_handle.execute_update(
97109
ClusterManagerWorkflow.assign_nodes_to_job,
@@ -121,7 +133,7 @@ async def test_update_failure(client: Client, env: WorkflowEnvironment):
121133
client,
122134
task_queue=task_queue,
123135
workflows=[ClusterManagerWorkflow],
124-
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
136+
activities=ACTIVITIES,
125137
):
126138
cluster_manager_handle = await client.start_workflow(
127139
ClusterManagerWorkflow.run,
@@ -130,7 +142,9 @@ async def test_update_failure(client: Client, env: WorkflowEnvironment):
130142
task_queue=task_queue,
131143
)
132144

133-
await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)
145+
await cluster_manager_handle.execute_update(
146+
ClusterManagerWorkflow.wait_until_cluster_started
147+
)
134148

135149
await cluster_manager_handle.execute_update(
136150
ClusterManagerWorkflow.assign_nodes_to_job,

0 commit comments

Comments
 (0)