diff --git a/src/cwl_platform/base_platform.py b/src/cwl_platform/base_platform.py
index 17a7b1df..bafa43f2 100644
--- a/src/cwl_platform/base_platform.py
+++ b/src/cwl_platform/base_platform.py
@@ -152,3 +152,12 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
         :param overwrite: Overwrite the file if it already exists.
         :return: ID of uploaded file.
         '''
+
+    @abstractmethod
+    def monitor_task(self, task):
+        '''
+        Checks the state of a task until it is no longer running
+        Returns the completed or failed task
+        :param task: task to monitor
+        :return: the task once it has reached a terminal state
+        '''
diff --git a/src/cwl_platform/sevenbridges_platform.py b/src/cwl_platform/sevenbridges_platform.py
index e6f8f176..32ad5a33 100644
--- a/src/cwl_platform/sevenbridges_platform.py
+++ b/src/cwl_platform/sevenbridges_platform.py
@@ -5,6 +5,7 @@ import logging
 import sevenbridges
 from sevenbridges.http.error_handlers import rate_limit_sleeper, maintenance_sleeper, general_error_sleeper
 
+from time import sleep
 from .base_platform import Platform
 
 
@@ -565,3 +566,62 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
 
         # return file id if file already exists
         return existing_file[0].id
+
+    def monitor_task(self, task, sb_max_retry_count=3,
+                     retry=False, instance_type=None):
+        '''
+        Checks the state of a task until it is no longer running
+        Returns the completed or failed task
+        :param task: task to monitor
+        :param sb_max_retry_count: maximum number of times a failed task is
+            re-submitted when retry is True
+        :param retry: boolean; should task be retried upon failure
+        :param instance_type: dict with integer keys and instance type values
+            eg: {0: "m5.8xlarge;ebs-gp2;4096", 1: "m5.8xlarge;ebs-gp2;4096"}
+            The entry for key N is used for retry number N (first retry is 1)
+        :return: the last task that was run, in a terminal state
+        '''
+        # The retry counter lives outside the loop so it survives across
+        # attempts; resetting it per iteration would retry forever.
+        count = 1
+        while True:
+            while task.status not in sevenbridges.TaskStatus.terminal_states:
+                logger.info(f"Waiting for task: {task.name} to finish")
+                task.wait(period=60)
+
+            if not retry or count > sb_max_retry_count or \
+                    task.status != sevenbridges.TaskStatus.FAILED:
+                return task
+
+            status = getattr(task, 'execution_status', None)
+            exe_msg = getattr(status, 'message', None)
+            logger.info(f"Task {task.name} failed with message "
+                        f"'{exe_msg}'. Retrying on a bigger "
+                        f"instance")
+            task_name = task.name.rpartition(" - ")[0] if \
+                "Retry" in task.name else task.name
+            new_name = f"{task_name} - Retry {count}"
+            try:
+                # Only the settings lookup is guarded, so errors raised by
+                # the API call itself are not mistaken for missing settings.
+                exe_settings = dict(task.execution_settings,
+                                    instance_type=instance_type[count])
+            except (KeyError, IndexError, TypeError):
+                logger.info(f"Execution settings not defined for "
+                            f"task: {task.name}. Retrying on "
+                            f"same instance")
+                task = task.clone()
+                task.name = new_name
+                sleep(10)
+                task.save()
+            else:
+                task = self.api.tasks.create(
+                    name=new_name,
+                    project=task.project,
+                    app=task.app,
+                    inputs=task.inputs,
+                    interruptible=task.use_interruptible_instances,
+                    execution_settings=exe_settings,
+                    run=True
+                )
+            count += 1