diff --git a/src/cwl_platform/arvados_platform.py b/src/cwl_platform/arvados_platform.py
index cb13cb2c..253e6d14 100644
--- a/src/cwl_platform/arvados_platform.py
+++ b/src/cwl_platform/arvados_platform.py
@@ -625,3 +625,18 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
             arv_file.write(local_content) # pylint: disable=no-member
         target_collection.save()
         return f"keep:{destination_collection['uuid']}/{target_filepath}"
+
+    def monitor_task(self, task):
+        '''
+        Poll a task once a minute until it reaches a finished state.
+
+        :param task: task to monitor
+        :return: the task that was passed in, once its state is
+            Completed, Cancelled, Failed or NA
+        '''
+        finished_states = ('Completed', 'Cancelled', 'Failed', 'NA')
+        while self.get_task_state(task, refresh=True) not in finished_states:
+            logger.debug(" - Sleeping for 60 seconds")
+            time.sleep(60)
+
+        return task
diff --git a/src/cwl_platform/base_platform.py b/src/cwl_platform/base_platform.py
index 17a7b1df..08962228 100644
--- a/src/cwl_platform/base_platform.py
+++ b/src/cwl_platform/base_platform.py
@@ -152,3 +152,13 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
         :param overwrite: Overwrite the file if it already exists.
         :return: ID of uploaded file.
         '''
+
+    @abstractmethod
+    def monitor_task(self, task):
+        '''
+        Checks the state of a task until it is no longer running
+
+        :param task: task to monitor
+        :return: the finished task; callers should use the returned object,
+            since an implementation may hand back a resubmitted task
+        '''
diff --git a/src/cwl_platform/sevenbridges_platform.py b/src/cwl_platform/sevenbridges_platform.py
index e6f8f176..897f4422 100644
--- a/src/cwl_platform/sevenbridges_platform.py
+++ b/src/cwl_platform/sevenbridges_platform.py
@@ -565,3 +565,103 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
 
         # return file id if file already exists
         return existing_file[0].id
+
+    def monitor_task(self, task):
+        '''
+        Wait for a task to finish, resubmitting it when it fails with one of
+        the messages listed in SevenBridgesInstance.errors
+
+        Each resubmission uses the next entry of SevenBridgesInstance.settings;
+        once every entry has been used, the failed task is returned as is.
+
+        :param task: task to monitor
+        :return: the task that reached a terminal state (the latest
+            resubmission when the original task was retried)
+        '''
+        max_retries = len(SevenBridgesInstance.settings)
+        count = 0
+        while task.status not in sevenbridges.TaskStatus.terminal_states:
+            logger.info(f"Waiting for task: {task.name} to finish")
+            task.wait(period=60)
+            if task.status != sevenbridges.TaskStatus.FAILED:
+                continue
+
+            # Robustness: a failed task without an execution message is treated
+            # as a non-retryable failure rather than raising on `err in None`.
+            exe_msg = getattr(task.execution_status, "message", None) or ""
+            retryable = any(
+                err in exe_msg for err in SevenBridgesInstance.errors
+            )
+            if not retryable or count >= max_retries:
+                continue
+
+            logger.info(f"Task {task.name} failed with message "
+                        f"'{exe_msg}'. Retrying on a bigger "
+                        f"instance")
+            # Drop only a trailing " - Retry <n>" suffix left by an earlier
+            # attempt; names that merely contain "Retry" stay untouched.
+            base_name, sep, attempt = task.name.rpartition(" - Retry ")
+            new_name = base_name if sep and attempt.isdigit() else task.name
+            task = self.api.tasks.create(
+                name=f"{new_name} - Retry {count}",
+                project=task.project,
+                app=task.app,
+                inputs=task.inputs,
+                interruptible=task.use_interruptible_instances,
+                execution_settings=SevenBridgesInstance.settings[count],
+                run=True
+            )
+            count += 1
+        return task
+
+
+class SevenBridgesInstance:
+    '''
+    Retry data for SevenBridges tasks: the failure messages that trigger a
+    resubmission and the execution settings used for each attempt
+    '''
+    # Substrings looked for in a failed task's execution message.
+    errors = [
+        'Docker container failed to start',
+        'One of the instances of the task stopped responding',
+        'detected an issue with the instance running our task. '
+        'If you rerun this execution it is likely to complete successfully',
+        'issue with the instance it was running on. '
+        'If you restart this task it is likely to complete successfully. '
+        'If the issue persists',
+        'currently unable to launch a valid instance for the task. '
+        'Please retry or contact our support',
+        'Failed to execute status command',
+        'Something went wrong with this task at our end. '
+        'Please try rerunning, and if the error persists contact support',
+        'failed because the instance it was running on stopped responding. '
+        'Please contact our support team to investigate it further',
+        'Insufficient disk space may have been the cause.',
+        'Executor faced a runtime exception.',
+        'No space left on device',
+        'ran out of disk space',
+        'Please contact our support team for additional information',
+        'Failed to stage inputs'
+    ]
+    # Execution settings keyed by retry number (0-based); keys must stay
+    # contiguous from 0 because monitor_task indexes them with its counter.
+    settings = {
+        0: {
+            "instance_type": "m5.8xlarge;ebs-gp2;4096",
+            "max_parallel_instances": 1,
+            "use_elastic_disk": True,
+            "use_memoization": True
+        },
+        1: {
+            "instance_type": "m5.16xlarge;ebs-gp2;4096",
+            "max_parallel_instances": 1,
+            "use_elastic_disk": True,
+            "use_memoization": True
+        },
+        2: {
+            "instance_type": "m5.24xlarge;ebs-gp2;4096",
+            "max_parallel_instances": 1,
+            "use_elastic_disk": True,
+            "use_memoization": True
+        }
+    }