@@ -65,8 +65,17 @@ class ClusterManagerAssignNodesToJobResult:
 # These updates must run atomically.
 @workflow.defn
 class ClusterManagerWorkflow:
-    def __init__(self) -> None:
-        self.state = ClusterManagerState()
+    @workflow.init
+    def __init__(self, input: ClusterManagerInput) -> None:
+        if input.state:
+            self.state = input.state
+        else:
+            self.state = ClusterManagerState()
+
+        if input.test_continue_as_new:
+            self.max_history_length = 120
+            self.sleep_interval_seconds = 1
+
         # Protects workflow state from interleaved access
         self.nodes_lock = asyncio.Lock()
         self.max_history_length: Optional[int] = None
@@ -202,29 +211,8 @@ async def perform_health_checks(self) -> None:
202211 f"Health check failed with error { type (e ).__name__ } :{ e } "
203212 )
204213
205- # The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
206- # continue-as-new.
207- def init (self , input : ClusterManagerInput ) -> None :
208- if input .state :
209- self .state = input .state
210- if input .test_continue_as_new :
211- self .max_history_length = 120
212- self .sleep_interval_seconds = 1
213-
214- def should_continue_as_new (self ) -> bool :
215- if workflow .info ().is_continue_as_new_suggested ():
216- return True
217- # This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
218- if (
219- self .max_history_length
220- and workflow .info ().get_current_history_length () > self .max_history_length
221- ):
222- return True
223- return False
224-
225214 @workflow .run
226215 async def run (self , input : ClusterManagerInput ) -> ClusterManagerResult :
227- self .init (input )
228216 await workflow .wait_condition (lambda : self .state .cluster_started )
229217 # Perform health checks at intervals.
230218 while True :
@@ -239,6 +227,8 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
                 pass
             if self.state.cluster_shutdown:
                 break
+            # The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
+            # continue-as-new.
             if self.should_continue_as_new():
                 # We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
                 await workflow.wait_condition(lambda: workflow.all_handlers_finished())
@@ -255,3 +245,14 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
             len(self.get_assigned_nodes()),
             len(self.get_bad_nodes()),
         )
+
+    def should_continue_as_new(self) -> bool:
+        if workflow.info().is_continue_as_new_suggested():
+            return True
+        # This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
+        if (
+            self.max_history_length
+            and workflow.info().get_current_history_length() > self.max_history_length
+        ):
+            return True
+        return False
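
The hunks above show the state being re-hydrated in __init__ and the decision to continue-as-new, but the workflow.continue_as_new call that hands self.state to the next run is outside the visible context. Below is a minimal, self-contained sketch of the same checkpoint-and-continue-as-new pattern, assuming temporalio >= 1.7 for @workflow.init and workflow.all_handlers_finished(); the EntityInput / EntityState / EntityWorkflow names are illustrative only and not part of this commit:

# Illustrative sketch of checkpointing entity-workflow state across continue-as-new.
import asyncio
from dataclasses import dataclass
from typing import Optional

from temporalio import workflow


@dataclass
class EntityState:
    counter: int = 0


@dataclass
class EntityInput:
    # Carries the previous run's state across continue-as-new.
    state: Optional[EntityState] = None


@workflow.defn
class EntityWorkflow:
    @workflow.init
    def __init__(self, input: EntityInput) -> None:
        # Re-hydrate state handed over by the previous run, or start fresh.
        self.state = input.state or EntityState()

    @workflow.run
    async def run(self, input: EntityInput) -> None:
        while True:
            await asyncio.sleep(1)
            self.state.counter += 1
            if workflow.info().is_continue_as_new_suggested():
                # Let in-flight signal/update handlers finish, then checkpoint
                # the current state into the next run's input.
                await workflow.wait_condition(lambda: workflow.all_handlers_finished())
                workflow.continue_as_new(EntityInput(state=self.state))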