Replies: 1 comment
This is a very good and detailed breakdown! Do you plan on adapting some of this into developer documentation? It gives a great stack trace of what happens within BEE. I actually have been working on a few of the issues / questions you mentioned (in PR #1105). Relevant questions that PR addresses:
- It now gets set to No Start in neo4j.
- It now updates as archived within neo4j too. This was actually the source of #1038.
- This also gets addressed with archiving. Neo4j now becomes a source of truth for workflow states. This functionality never existed in the past because they used to use one workflow database.
- Queries have been added that will remove all workflow nodes from neo4j if a user does a remove.
The biggest things not addressed are regarding cancellation and the Flux/Slurm states. These definitely need further examination.
This issue contains my notes from when I went through BEE specifically looking at states. Bold text describes states. Italicized text describes questions I have.
What happens when a user submits a workflow
Initial setting up and running of things
beeflow/client/bee_client.py submit function -> beeflow/common/parser/parser.py parse_workflow method creates a Workflow object and the state of the workflow is SUBMITTED (this will be the first state of the workflow in neo4j)
beeflow/client/bee_client.py submit function -> beeflow/wf_manager/resources/wf_list.py post method
beeflow/wf_manager/resources/wf_list.py post method -> beeflow/common/db/wfm_db.py init_workflow method of the Workflows class. The workflow is inserted into the wfm db and the state is set to Initializing
beeflow/wf_manager/resources/wf_list.py post method -> beeflow/wf_manager/resources/wf_list.py init_workflow function connects to the neo4j driver and runs wf_utils.setup_workflow
beeflow/wf_manager/resources/wf_utils.py setup_workflow function -> runs beeflow/common/wf_interface.py initialize_workflow method which runs beeflow/common/gdb/neo4j_driver.py initialize_workflow method which creates the Workflow, Requirement, and Hint nodes in the Neo4j database.
beeflow/common/gdb/neo4j_driver.py initialize_workflow method -> beeflow/common/gdb/neo4j_cypher.py create_workflow_node method sets the state of the workflow. The state of the workflow comes from the workflow object that is passed to wf_utils.setup_workflow, which is SUBMITTED
beeflow/wf_manager/resources/wf_utils.py setup_workflow function -> beeflow/wf_manager/resources/wf_utils.py create_wf_metadata function calls `create_wf_name`, which creates the bee_wf_name file, and calls `create_wf_status`, which creates the bee_wf_status file and sets the state in there to Pending
beeflow/wf_manager/resources/wf_utils.py setup_workflow function, for task in tasks loop:
task_state = "" if no_start else "WAITING", then the `wfi.add_task` call calls the load_task method in beeflow/common/gdb/neo4j_driver.py which creates the task hint, requirement, input, output and metadata nodes (the state in the metadata is either the "" or WAITING we just determined). `wfi.set_task_metadata(task, metadata)` updates the metadata in neo4j to include the workdir. `db.workflows.add_task(task.id, wf_id, task.name, task_state)` adds the task to the wfm db (the state of the tasks is either the "" or WAITING we just determined). (A sketch of this loop follows below.)
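For reference, a minimal Python sketch of that per-task loop. It only restates the notes above; `wfi` and `db` stand in for the graph-database interface and the wfm db handle, and the exact `add_task` signature and metadata shape are assumptions, not the real beeflow code.

```python
def setup_tasks(wfi, db, wf_id, tasks, no_start, workdir):
    """Illustrative sketch only: initial task states as described above."""
    for task in tasks:
        # --no-start leaves the task with an empty state; otherwise it is WAITING.
        task_state = "" if no_start else "WAITING"
        # Creates the task's hint, requirement, input, output and metadata nodes
        # in neo4j; the metadata node records the state chosen above.
        wfi.add_task(task, task_state)
        # The metadata in neo4j is updated to also include the workdir
        # (the dict shape here is a guess).
        wfi.set_task_metadata(task, {'workdir': workdir, 'state': task_state})
        # The same task and state are mirrored into the wfm db.
        db.workflows.add_task(task.id, wf_id, task.name, task_state)
```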
beeflow/wf_manager/resources/wf_utils.py setup_workflow function, if no_start:
The state in bee_wf_status is set to No Start. The workflow state in the wfm db is set to No Start. *What about the state in neo4j? Right now it just stays as SUBMITTED. Should it get changed to No Start too?*
beeflow/wf_manager/resources/wf_utils.py setup_workflow function, the else block of if no_start:
The state in bee_wf_status is set to Waiting. The workflow state in the wfm db is set to Waiting. The `log.info('Starting workflow')` happens and then the workflow state in the wfm db is set to Running and the beeflow/wf_manager/resources/wf_utils.py start_workflow function is called. *Is setting the workflow state in the wfm db to Running before the start_workflow function runs correct? I know in the start_workflow function the neo4j workflow state gets set to RUNNING, the bee_wf_status file gets set to Running, and the wfm db workflow state gets set to Running (so it gets set to Running again). So maybe the wfm db workflow state shouldn't get set to Running in both the setup_workflow function and the start_workflow function -- it should probably just be the start_workflow function.*
beeflow/wf_manager/resources/wf_utils.py start_workflow function:
Gets the workflow state from neo4j. If that state is in ('RUNNING', 'PAUSED', 'COMPLETED') the function returns False.
for task in tasks loop: goes through and gets the task state from neo4j. If the neo4j task state is '' then the neo4j task state gets set to WAITING and the wfm db task state gets set to WAITING
`wfi.execute_workflow()` calls the execute_workflow method in beeflow/common/gdb/neo4j_driver.py. This method sets tasks that have their inputs and are in the WAITING state to the READY state in neo4j, and sets the neo4j workflow state to RUNNING. `submit_tasks_tm` is then called, which sends the tasks to the tm db submit queue (I don't think the tasks have a state in the tm db yet) and sets the task state in neo4j to SUBMIT. Then the bee_wf_status file is set to Running and the workflow state in the wfm db is set to Running. (A sketch of this function follows below.)
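A rough sketch of start_workflow as I read it, for orientation only. The helper names (`get_tasks`, `get_task_state`, `set_task_state`, `update_task_state`, `update_workflow_state`, `update_wf_status`, `get_ready_tasks`) are placeholders for whatever beeflow actually calls; only the overall flow is taken from the notes above.

```python
def start_workflow(wfi, db, wf_id, submit_tasks_tm, update_wf_status):
    """Illustrative sketch only: the state handling in start_workflow."""
    wf_state = wfi.get_workflow_state()
    # Nothing to do if the workflow is already running, paused, or finished.
    if wf_state in ('RUNNING', 'PAUSED', 'COMPLETED'):
        return False
    # Tasks left with the empty (--no-start) state are promoted to WAITING
    # in both neo4j and the wfm db.
    for task in wfi.get_tasks():
        if wfi.get_task_state(task) == '':
            wfi.set_task_state(task, 'WAITING')
            db.workflows.update_task_state(task.id, wf_id, 'WAITING')
    # WAITING tasks whose inputs are satisfied become READY; the neo4j
    # workflow state becomes RUNNING.
    wfi.execute_workflow()
    # Ready tasks go to the Task Manager's submit queue and are marked SUBMIT in neo4j.
    submit_tasks_tm(wf_id, wfi.get_ready_tasks())
    # Finally the bee_wf_status file and the wfm db workflow state both become Running.
    update_wf_status(wf_id, 'Running')
    db.workflows.update_workflow_state(wf_id, 'Running')
    return True
```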
What gets run because the process_queues function in beeflow/task_manager/background.py runs every 5 seconds (or the background value specified in the config)
beeflow/task_manager/background.py process_queues -> beeflow/task_manager/background.py submit_jobs function
beeflow/task_manager/background.py submit_jobs function:
while db.submit_queue.count() >= 1 and db.job_queue.count() < jobs_limit:
pops the task from the bottom of the submit queue and assigns it to the task variable. Then the job_state variable is defined by calling the submit_task function with the tm db, the worker and the task.
beeflow/task_manager/background.py submit_task function (returns the job_state): gets the job state and id by submitting the task to `worker.submit_task`. The job state is the state the worker returns (Slurm or Flux) or it's NOT_RESPONDING. The job state and id that just got defined are then pushed to the tm db job queue along with the task (so now the task in the tm db definitely has a state)
if there was a container build error the job state is BUILD_FAIL. If there was some other error the job state is SUBMIT_FAIL
submit_task function returns the job_state we just defined (either the state from the worker, NOT_RESPONDING, BUILD_FAIL, or SUBMIT_FAIL) back to `submit_jobs`, and submit_jobs pushes the info (including this job_state) to the tm db update queue. (A sketch of this path follows below.)
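Here is a hedged sketch of the submit path. The `ContainerBuildError` class, the queue method names (`push`, `pop`, `count`), and the return order of `worker.submit_task` are all assumptions used purely to illustrate how job_state is decided; only the states themselves (NOT_RESPONDING, BUILD_FAIL, SUBMIT_FAIL) come from the notes above.

```python
class ContainerBuildError(Exception):
    """Stand-in for whatever beeflow raises on a container build failure."""


def submit_task(db, worker, task):
    """Illustrative sketch only: how submit_task settles on a job_state."""
    try:
        # The worker reports whatever state the scheduler (Slurm or Flux)
        # returns, or NOT_RESPONDING if the scheduler cannot be reached.
        job_id, job_state = worker.submit_task(task)
    except ContainerBuildError:
        job_id, job_state = None, 'BUILD_FAIL'
    except Exception:
        job_id, job_state = None, 'SUBMIT_FAIL'
    # The task, job id, and state land on the tm db job queue, so from here
    # on the task definitely has a state in the tm db.
    db.job_queue.push(task=task, job_id=job_id, job_state=job_state)
    return job_state


def submit_jobs(db, worker, jobs_limit):
    """Illustrative sketch only: the submit loop in background.py."""
    while db.submit_queue.count() >= 1 and db.job_queue.count() < jobs_limit:
        task = db.submit_queue.pop()
        job_state = submit_task(db, worker, task)
        # Whatever state came back is forwarded to the update queue for the WFM.
        db.update_queue.push(task=task, job_state=job_state)
```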
beeflow/task_manager/background.py process_queues -> beeflow/task_manager/background.py update_jobs function
beeflow/task_manager/background.py update_jobs function:
gets a list of jobs from the tm db job_queue. The job_state variable is assigned to be the value of the job state from the job_queue. If the job state is in COMPLETED_STATES then the job is removed from the job queue.
we assign new_job_state to either be the job state from the worker or UNKNOWN. If the new_job_state doesn't equal the job_state:
the tm db job queue is updated with this new job state.
if new_job_state in ('FAILED', 'TIMEOUT'): we get the checkpoint info (if applicable) and send the new job state to the tm db update queue, unless there was a CheckpointRestartError, in which case we send the task to the db update queue with the FAILED state.
elif new_job_state in ('BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED'): we remove the job from the tm db job queue, get the state from submit_task and send that to the tm db update queue (i.e., we just resubmit the task)
else (so the new job state is not in ('FAILED', 'TIMEOUT') or ('BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED')): we send the new_job_state and other task info to the tm db update queue. (See the sketch below.)
check again if the job state is in COMPLETED_STATES. If it is then the job is removed from the job queue.
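The branching above can be summarized in a sketch like the following. The queue method names, `gather_checkpoint_info`, and the `CheckpointRestartError` stub are placeholders (it reuses the `submit_task` sketch from the earlier block); the three branches and the resubmission behaviour are what the notes describe.

```python
RESUBMIT_STATES = ('BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED')


class CheckpointRestartError(Exception):
    """Stand-in for beeflow's checkpoint/restart error."""


def gather_checkpoint_info(task):
    """Placeholder for however beeflow collects checkpoint/restart info."""
    return None


def handle_new_job_state(db, worker, job, new_job_state):
    """Illustrative sketch only: the three branches of update_jobs."""
    db.job_queue.update_job_state(job.id, new_job_state)
    if new_job_state in ('FAILED', 'TIMEOUT'):
        try:
            # Collect checkpoint info, if the task uses checkpoint/restart.
            task_info = gather_checkpoint_info(job.task)
        except CheckpointRestartError:
            # A bad checkpoint downgrades the reported state to FAILED.
            db.update_queue.push(task=job.task, job_state='FAILED')
        else:
            db.update_queue.push(task=job.task, job_state=new_job_state,
                                 task_info=task_info)
    elif new_job_state in RESUBMIT_STATES:
        # Drop the job and simply resubmit the task; whatever state
        # submit_task returns is what the WFM hears about.
        db.job_queue.remove(job.id)
        db.update_queue.push(task=job.task,
                             job_state=submit_task(db, worker, job.task))
    else:
        db.update_queue.push(task=job.task, job_state=new_job_state)
```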
beeflow/task_manager/background.py process_queues continued:
Get the state updates from the tm db update queue and send them to the put method of WFUpdate defined in beeflow/wf_manager/resources/wf_update.py. If all goes well you can then clear the tm db update queue. (A sketch follows below.)
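A tiny sketch of that hand-off, assuming hypothetical method names (`updates`, `clear`) and a `wfm_client.put` stand-in for the HTTP PUT that reaches WFUpdate.put:

```python
def flush_state_updates(db, wfm_client):
    """Illustrative sketch only: forward queued state updates to the WFM."""
    updates = db.update_queue.updates()
    if not updates:
        return
    # WFUpdate.put on the workflow manager receives the whole batch.
    response = wfm_client.put(updates)
    # Only a successful hand-off clears the tm db update queue, so nothing
    # is lost if the WFM was unreachable.
    if response.ok:
        db.update_queue.clear()
```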
beeflow/wf_manager/resources/wf_update.py WFUpdate put method sends each of the state updates to the update_task_state method of WFUpdate
beeflow/wf_manager/resources/wf_update.py update_task_state method:
Gets the tasks from neo4j. Then the neo4j task state and the wfm db task state are updated with the new task state
updates the neo4j metadata
handle_checkpoint_restart gets run: this code will skip and return False if state_update.task_info is None (I think state_update.task_info is not None if background.update_jobs gets the checkpoint file). If state_update.task_info is not None, then we assign new_task from wfi.restart_task. If the number of tries from wfi.restart_task is greater than 0: a new task id, name, the wf_id and the state set to WAITING are added to the wfm db and the tasks are submitted to the task manager. If the number of tries from wfi.restart_task is less than or equal to 0: the task state and workflow state in neo4j are set to FAILED and then the workflow is archived by `archive_fail_workflow`. *Should the whole workflow be archived or just the dependent tasks?* (A sketch of this path follows below.)
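A sketch of the checkpoint/restart path. The retry-count attribute, the restart_task arguments, and the archive_fail_workflow call signature are assumed rather than taken from the code; only the branch logic mirrors the notes.

```python
def handle_checkpoint_restart(wfi, db, wf_id, state_update,
                              submit_tasks_tm, archive_fail_workflow):
    """Illustrative sketch only: restart-on-checkpoint as described above."""
    if state_update.task_info is None:
        # No checkpoint info came along with this update: nothing to do here.
        return False
    new_task = wfi.restart_task(state_update.task, state_update.task_info)
    if new_task.retries_remaining > 0:     # attribute name is a stand-in
        # Retries remain: register the restarted task as WAITING and resubmit it.
        db.workflows.add_task(new_task.id, wf_id, new_task.name, 'WAITING')
        submit_tasks_tm(wf_id, [new_task])
    else:
        # Out of retries: fail the task and the workflow, then archive it.
        wfi.set_task_state(state_update.task, 'FAILED')
        wfi.set_workflow_state('FAILED')
        archive_fail_workflow(wf_id)
    return True
```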
if not handle_checkpoint_restart, then the handle_state_change method is run:
if the new job state is COMPLETED then the task outputs are set and copied. tasks are assigned by `wfi.finalize_task(task)`. wfi.finalize_task (returns ready tasks) runs: the task state in neo4j is set to COMPLETED, the runnable tasks are set to READY in neo4j and the ready tasks are returned.
if tasks and wf_state not in ('PAUSED', 'Cancelled'): then the ready tasks are submitted to the task manager. *I don't know why tasks are being considered in the set because I don't think the task state is in the tasks variable.*
if the new job state is in ('FAILED', 'SUBMIT_FAIL', 'BUILD_FAIL', 'TIMEOUT', 'CANCELLED') then the dependent tasks are sent to DEP_FAIL
if all the final tasks in neo4j have a state that's in the set ('FAILED', 'SUBMIT_FAIL', 'BUILD_FAIL', 'DEP_FAIL', 'TIMEOUT', 'CANCELLED', 'COMPLETED'), then we run `wfi.get_workflow_final_state`, which returns None if all tasks succeeded, Failed if all the tasks failed, or Partial-Fail if some of the tasks failed. Then we archive the workflow with that final state.
checks if the workflow state in neo4j is cancelled and if wfi.cancelled_workflow_completed(). If these are true the workflow is archived with the Cancelled state.
cancelled_workflow_completed is true if each of the workflow's final task states are not in (PENDING, RUNNING, COMPLETING). *See Issue #1058 for one of my problems with this. My other problem is that (PENDING, RUNNING, COMPLETING) doesn't cover all of the states Slurm or Flux could produce.* (A sketch of handle_state_change follows below.)
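And a sketch of handle_state_change, again with placeholder helper names (`set_dependent_tasks_dep_fail`, `get_final_task_states`); only the branch logic is taken from the notes.

```python
TERMINAL_TASK_STATES = ('FAILED', 'SUBMIT_FAIL', 'BUILD_FAIL', 'DEP_FAIL',
                        'TIMEOUT', 'CANCELLED', 'COMPLETED')


def handle_state_change(wfi, wf_id, state_update, submit_tasks_tm, archive_workflow):
    """Illustrative sketch only: the completion/failure handling above."""
    if state_update.job_state == 'COMPLETED':
        # Outputs are recorded, the task becomes COMPLETED in neo4j, and any
        # tasks that just became runnable come back as READY.
        ready_tasks = wfi.finalize_task(state_update.task)
        if ready_tasks and wfi.get_workflow_state() not in ('PAUSED', 'Cancelled'):
            submit_tasks_tm(wf_id, ready_tasks)
    elif state_update.job_state in ('FAILED', 'SUBMIT_FAIL', 'BUILD_FAIL',
                                    'TIMEOUT', 'CANCELLED'):
        # Anything downstream of a failed task can never run.
        wfi.set_dependent_tasks_dep_fail(state_update.task)
    # Once every final task is in a terminal state, the workflow is done:
    # None -> all succeeded, 'Failed' -> all failed, 'Partial-Fail' -> mixed.
    if all(s in TERMINAL_TASK_STATES for s in wfi.get_final_task_states()):
        archive_workflow(wf_id, final_state=wfi.get_workflow_final_state())
    elif wfi.get_workflow_state() == 'Cancelled' and wfi.cancelled_workflow_completed():
        archive_workflow(wf_id, final_state='Cancelled')
```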
beeflow/wf_manager/resources/wf_update.py archive_workflow function checks the workflow state in the wfm db; if the state starts with Archived that means the workflow has already been archived and the code skips. *The code says that we get the workflow state from the wfm db because that's the only way to retrieve the wf state after archiving, but is it? I know you can't get it from the bee_wf_status file, but what about the neo4j database?*
wf_state is assigned by:
wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'. Then the wfm db workflow state gets updated with that wf_state and the bee_wf_status file gets updated with that wf_state. *What about the workflow state in neo4j? That state is still RUNNING.* (A sketch follows below.)
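A sketch of archive_workflow's state handling (the db method names and the update_wf_status helper are assumptions):

```python
def archive_workflow(db, wf_id, final_state, update_wf_status):
    """Illustrative sketch only: how the archived state string is built."""
    wf_state = db.workflows.get_workflow_state(wf_id)
    if wf_state.startswith('Archived'):
        # Already archived on a previous pass; nothing to do.
        return
    wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
    db.workflows.update_workflow_state(wf_id, wf_state)   # wfm db
    update_wf_status(wf_id, wf_state)                     # bee_wf_status file
    # Per the note above, the neo4j workflow state is left as RUNNING here.
```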
What happens when a user starts a workflow
bee_client calls the post method of WFActions in beeflow/wf_manager/resources/wf_actions.py.
beeflow/wf_manager/resources/wf_actions.py post method:
calls the start_workflow function in beeflow/wf_manager/resources/wf_utils.py (this was covered in the submit section). The workflow state in wfm db gets updated to Running
What happens when a user queries a workflow
bee_client calls the get method of WFActions in beeflow/wf_manager/resources/wf_actions.py. It gets task states and the workflow state from the wfm db and returns them
What happens when a user runs `beeflow list`
bee_client calls get_wf_list() which calls the get method of WFList in beeflow/wf_manager/resources/wf_list.py. It gets all of the workflow ids, statuses, and names from the wfm db.
What happens when a user pauses a workflow
bee_client calls the patch method of WFActions in beeflow/wf_manager/resources/wf_actions.py
beeflow/wf_manager/resources/wf_actions.py patch method:
gets the workflow state from neo4j. if the option == 'pause' and wf_state in ('RUNNING', 'INITIALIZING'):
*Problem: the neo4j workflow state can be RUNNING even if the workflow has been completed and archived, since it's never changed from RUNNING to something like COMPLETED -- I made a note about this in the submit section of this issue. Also see Issue #1038.*
the workflow state in neo4j gets set to PAUSED. The workflow state in the bee_wf_status file and the wfm db get set to Paused. (A sketch of the patch method follows after the resume section below.)
What happens when a user resumes a workflow:
bee_client calls the patch method of WFActions in beeflow/wf_manager/resources/wf_actions.py
beeflow/wf_manager/resources/wf_actions.py patch method:
gets the workflow state from neo4j. if the option == 'resume' and wf_state == 'PAUSED':
the workflow state in neo4j gets set to RUNNING. The ready tasks in neo4j are sent to the task manager. The workflow status in bee_wf_status and the wfm db are set to RUNNING. (A combined sketch of the pause/resume patch method follows below.)
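For reference, a combined sketch of the pause and resume branches of the patch method (state strings and helper names are approximated from the notes, not copied from the code):

```python
def patch_workflow(wfi, db, wf_id, option, submit_tasks_tm, update_wf_status):
    """Illustrative sketch only: pause/resume handling in WFActions.patch."""
    wf_state = wfi.get_workflow_state()
    if option == 'pause' and wf_state in ('RUNNING', 'INITIALIZING'):
        wfi.set_workflow_state('PAUSED')
        update_wf_status(wf_id, 'Paused')                     # bee_wf_status file
        db.workflows.update_workflow_state(wf_id, 'Paused')   # wfm db
    elif option == 'resume' and wf_state == 'PAUSED':
        wfi.set_workflow_state('RUNNING')
        # The tasks already READY in neo4j are re-dispatched to the Task Manager.
        submit_tasks_tm(wf_id, wfi.get_ready_tasks())
        update_wf_status(wf_id, 'Running')
        db.workflows.update_workflow_state(wf_id, 'Running')
```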
What happens when a user cancels a workflow:
bee_client calls `get_wf_status`, which calls the get method of WFActions in beeflow/wf_manager/resources/wf_actions.py. It gets task states and the workflow state from the wfm db and returns them. get_wf_status then returns just the workflow state.
if the workflow state is Initializing then the user is told to try to cancel later.
if the workflow state is in ('Running', 'Paused', 'No Start') then bee_client calls the delete method of WFActions in beeflow/wf_manager/resources/wf_actions.py
beeflow/wf_manager/resources/wf_actions.py delete method:
gets the workflow state from neo4j. Sets the workflow state in neo4j, bee_wf_status, and wfm db to Cancelled. *The state in neo4j should be CANCELLED instead of Cancelled to match the capitalization of other neo4j workflow states -- note: this will have to be reflected in places like wf_update.*
the workflow is archived if it was originally paused -- this happens in wf_actions instead of wf_update because the tasks won't have been updated, so wf_update wouldn't have been receiving any updates. (A sketch follows below.)
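A sketch of the cancel path in the delete method (helper names assumed; the capitalization question raised above is kept as a comment):

```python
def cancel_workflow(wfi, db, wf_id, update_wf_status, archive_workflow):
    """Illustrative sketch only: the cancel branch of WFActions.delete."""
    wf_state = wfi.get_workflow_state()
    # Note from the walkthrough: neo4j gets 'Cancelled' here rather than
    # 'CANCELLED', unlike the other all-caps neo4j workflow states.
    wfi.set_workflow_state('Cancelled')
    update_wf_status(wf_id, 'Cancelled')                    # bee_wf_status file
    db.workflows.update_workflow_state(wf_id, 'Cancelled')  # wfm db
    if wf_state == 'PAUSED':
        # A paused workflow produces no further task updates, so wf_update
        # would never get the chance to archive it; archive it here instead.
        archive_workflow(wf_id)
```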
What happens when a user removes a workflow:
bee_client calls `get_wf_status`, which calls the get method of WFActions in beeflow/wf_manager/resources/wf_actions.py. It gets task states and the workflow state from the wfm db and returns them. get_wf_status then returns just the workflow state.
if wf_status in ('Cancelled', 'Paused') or 'Archived' in wf_status, the user is asked if they want to continue to remove. *I think Archived should just be in the set.*
Then bee_client calls the delete method of WFActions in beeflow/wf_manager/resources/wf_actions.py
beeflow/wf_manager/resources/wf_actions.py delete method:
if the option is remove, the workflow gets deleted from the wfm db. *Does the workflow not get/need to get deleted from the neo4j database?* (A sketch follows below.)
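Finally, a one-line sketch of the remove branch, which keeps the open question visible (the delete method name is assumed):

```python
def remove_workflow(db, wf_id):
    """Illustrative sketch only: the remove branch of WFActions.delete."""
    # Only the wfm db entry is deleted; whether the neo4j workflow/task nodes
    # should also be removed is the open question raised above.
    db.workflows.delete_workflow(wf_id)
```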