From b1015d292a235bcdd4c0dbba15f4553ef548d7fb Mon Sep 17 00:00:00 2001 From: fany29 Date: Fri, 26 Apr 2024 00:41:52 -0400 Subject: [PATCH 1/3] add retry capabilities from 10X pipeline by ryan porterfield --- src/cwl_platform/arvados_platform.py | 14 +++++ src/cwl_platform/sevenbridges_platform.py | 74 +++++++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/src/cwl_platform/arvados_platform.py b/src/cwl_platform/arvados_platform.py index cb13cb2c..0f7c956d 100644 --- a/src/cwl_platform/arvados_platform.py +++ b/src/cwl_platform/arvados_platform.py @@ -625,3 +625,17 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil arv_file.write(local_content) # pylint: disable=no-member target_collection.save() return f"keep:{destination_collection['uuid']}/{target_filepath}" + + def monitor_task(self, task): + ''' + Takes a task + Waits until it is done + Placeholder for now, but room to implement automatic retries in the future + ''' + state = self.get_task_state(task, refresh=True) + while state not in ('Completed', 'Cancelled', 'Failed', 'NA'): + logger.debug(" - Sleeping for 60 seconds") + time.sleep(60) + state = self.get_task_state(task, refresh=True) + + return(task) diff --git a/src/cwl_platform/sevenbridges_platform.py b/src/cwl_platform/sevenbridges_platform.py index e6f8f176..1dcb9a2c 100644 --- a/src/cwl_platform/sevenbridges_platform.py +++ b/src/cwl_platform/sevenbridges_platform.py @@ -565,3 +565,77 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil # return file id if file already exists return existing_file[0].id + + def monitor_task(self, task): + ''' + Monitor a task on the platform, resubmit on a larger instance + if failed due to reasons in SevenBridgesInstance.errors + ''' + count = 0 + while task.status not in sevenbridges.TaskStatus.terminal_states: + logger.info(f"Waiting for task: {task.name} to finish") + task.wait(period=60) + exe_msg = task.execution_status.message + if task.status == sevenbridges.TaskStatus.FAILED: + if any(err in exe_msg for err in SevenBridgesInstance.errors) \ + and count < 3: + logger.info(f"Task {task.name} failed with message " + f"'{exe_msg}'. Retrying on a bigger " + f"instance") + new_name = task.name.rpartition(" - ")[0] if \ + "Retry" in task.name else task.name + task = self.api.tasks.create( + name=f"{new_name} - Retry {count}", + project=task.project, + app=task.app, + inputs=task.inputs, + interruptible=task.use_interruptible_instances, + execution_settings=SevenBridgesInstance.settings[count], + run=True + ) + count += 1 + return task + +class SevenBridgesInstance: + errors = [ + 'Docker container failed to start', + 'One of the instances of the task stopped responding', + 'detected an issue with the instance running our task. ' + 'If you rerun this execution it is likely to complete successfully', + 'issue with the instance it was running on. ' + 'If you restart this task it is likely to complete successfully. ' + 'If the issue persists', + 'currently unable to launch a valid instance for the task. ' + 'Please retry or contact our support', + 'Failed to execute status command', + 'Something went wrong with this task at our end. ' + 'Please try rerunning, and if the error persists contact support', + 'failed because the instance it was running on stopped responding. ' + 'Please contact our support team to investigate it further', + 'Insufficient disk space may have been the cause.', + 'Executor faced a runtime exception.', + 'No space left on device', + 'ran out of disk space', + 'Please contact our support team for additional information', + 'Failed to stage inputs' + ] + settings = { + 0: { + "instance_type": "m5.8xlarge;ebs-gp2;4096", + "max_parallel_instances": 1, + "use_elastic_disk": True, + "use_memoization": True + }, + 1: { + "instance_type": "m5.16xlarge;ebs-gp2;4096", + "max_parallel_instances": 1, + "use_elastic_disk": True, + "use_memoization": True + }, + 2: { + "instance_type": "m5.24xlarge;ebs-gp2;4096", + "max_parallel_instances": 1, + "use_elastic_disk": True, + "use_memoization": True + } + } From 3fedf7bcb840b09afa47c48ef6616175a749dcfc Mon Sep 17 00:00:00 2001 From: fany29 Date: Fri, 26 Apr 2024 09:16:39 -0400 Subject: [PATCH 2/3] add task param to headers --- src/cwl_platform/arvados_platform.py | 1 + src/cwl_platform/base_platform.py | 8 ++++++++ src/cwl_platform/sevenbridges_platform.py | 1 + 3 files changed, 10 insertions(+) diff --git a/src/cwl_platform/arvados_platform.py b/src/cwl_platform/arvados_platform.py index 0f7c956d..be1b9a1b 100644 --- a/src/cwl_platform/arvados_platform.py +++ b/src/cwl_platform/arvados_platform.py @@ -631,6 +631,7 @@ def monitor_task(self, task): Takes a task Waits until it is done Placeholder for now, but room to implement automatic retries in the future + :param task: task to monitor ''' state = self.get_task_state(task, refresh=True) while state not in ('Completed', 'Cancelled', 'Failed', 'NA'): diff --git a/src/cwl_platform/base_platform.py b/src/cwl_platform/base_platform.py index 17a7b1df..c2bdbcba 100644 --- a/src/cwl_platform/base_platform.py +++ b/src/cwl_platform/base_platform.py @@ -152,3 +152,11 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil :param overwrite: Overwrite the file if it already exists. :return: ID of uploaded file. ''' + + @abstractmethod + def monitor_task(self, task): + ''' + Monitor a task on the platform, resubmit on a larger instance + if failed due to reasons in SevenBridgesInstance.errors + :param task: task to monitor + ''' diff --git a/src/cwl_platform/sevenbridges_platform.py b/src/cwl_platform/sevenbridges_platform.py index 1dcb9a2c..1d06e983 100644 --- a/src/cwl_platform/sevenbridges_platform.py +++ b/src/cwl_platform/sevenbridges_platform.py @@ -570,6 +570,7 @@ def monitor_task(self, task): ''' Monitor a task on the platform, resubmit on a larger instance if failed due to reasons in SevenBridgesInstance.errors + :param task: task to monitor ''' count = 0 while task.status not in sevenbridges.TaskStatus.terminal_states: From 93ea0fbabf16fc6129147788c981e996bef5aeaf Mon Sep 17 00:00:00 2001 From: fany29 Date: Fri, 26 Apr 2024 12:21:02 -0400 Subject: [PATCH 3/3] unify function header comment thingys --- src/cwl_platform/arvados_platform.py | 5 ++--- src/cwl_platform/base_platform.py | 4 ++-- src/cwl_platform/sevenbridges_platform.py | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/cwl_platform/arvados_platform.py b/src/cwl_platform/arvados_platform.py index be1b9a1b..253e6d14 100644 --- a/src/cwl_platform/arvados_platform.py +++ b/src/cwl_platform/arvados_platform.py @@ -628,9 +628,8 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil def monitor_task(self, task): ''' - Takes a task - Waits until it is done - Placeholder for now, but room to implement automatic retries in the future + Checks the state of a task until it is no longer running + Returns the completed or failed task :param task: task to monitor ''' state = self.get_task_state(task, refresh=True) diff --git a/src/cwl_platform/base_platform.py b/src/cwl_platform/base_platform.py index c2bdbcba..08962228 100644 --- a/src/cwl_platform/base_platform.py +++ b/src/cwl_platform/base_platform.py @@ -156,7 +156,7 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil @abstractmethod def monitor_task(self, task): ''' - Monitor a task on the platform, resubmit on a larger instance - if failed due to reasons in SevenBridgesInstance.errors + Checks the state of a task until it is no longer running + Returns the completed or failed task :param task: task to monitor ''' diff --git a/src/cwl_platform/sevenbridges_platform.py b/src/cwl_platform/sevenbridges_platform.py index 1d06e983..897f4422 100644 --- a/src/cwl_platform/sevenbridges_platform.py +++ b/src/cwl_platform/sevenbridges_platform.py @@ -568,9 +568,9 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil def monitor_task(self, task): ''' - Monitor a task on the platform, resubmit on a larger instance - if failed due to reasons in SevenBridgesInstance.errors - :param task: task to monitor + Checks the state of a task until it is no longer running + Returns the completed or failed task + :param task: task to monitor ''' count = 0 while task.status not in sevenbridges.TaskStatus.terminal_states: