Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/cwl_platform/arvados_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,17 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
arv_file.write(local_content) # pylint: disable=no-member
target_collection.save()
return f"keep:{destination_collection['uuid']}/{target_filepath}"

def monitor_task(self, task, poll_interval=60):
    '''
    Checks the state of a task until it is no longer running

    The state is re-fetched with refresh=True on every check. Polling stops as
    soon as the state is one of 'Completed', 'Cancelled', 'Failed' or 'NA'.

    :param task: task to monitor
    :param poll_interval: seconds to sleep between state checks (default 60)
    :return: the task that was passed in, once it has reached one of the
        states listed above
    '''
    terminal_states = ('Completed', 'Cancelled', 'Failed', 'NA')
    # Fetching the state in the loop condition avoids repeating the call
    # before and inside the loop.
    while self.get_task_state(task, refresh=True) not in terminal_states:
        logger.debug(" - Sleeping for %s seconds", poll_interval)
        time.sleep(poll_interval)

    return task
8 changes: 8 additions & 0 deletions src/cwl_platform/base_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,11 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil
:param overwrite: Overwrite the file if it already exists.
:return: ID of uploaded file.
'''

@abstractmethod
def monitor_task(self, task):
    '''
    Watch a task until it has stopped running.

    Implementations keep checking the task's state and hand the task back
    once it has either completed or failed.

    :param task: task to monitor
    :return: the completed or failed task
    '''
75 changes: 75 additions & 0 deletions src/cwl_platform/sevenbridges_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,78 @@ def upload_file_to_project(self, filename, project, dest_folder, destination_fil

# return file id if file already exists
return existing_file[0].id

def monitor_task(self, task, retry_errors=None, retry_settings=None):
    '''
    Checks the state of a task until it is no longer running, resubmitting
    it when it fails with a known, retryable error.

    The task is waited on until it reaches a terminal state. If it ended as
    FAILED and its execution message contains one of the retryable error
    fragments, a new task with the same project, app and inputs is created
    and started with the next set of execution settings, and monitoring
    continues on that new task. A task that is already in a terminal state
    when passed in is returned unchanged (no retry is attempted).

    :param task: task to monitor
    :param retry_errors: iterable of message fragments that mark a failure
        as retryable; defaults to SevenBridgesInstance.errors
    :param retry_settings: execution settings indexed by retry number
        (0, 1, ...). One retry is attempted per entry, so an empty sequence
        disables retries; defaults to SevenBridgesInstance.settings
    :return: the last task that was monitored (the original task or its
        most recent retry)
    '''
    if retry_errors is None:
        retry_errors = SevenBridgesInstance.errors
    if retry_settings is None:
        retry_settings = SevenBridgesInstance.settings
    max_retries = len(retry_settings)

    count = 0
    while task.status not in sevenbridges.TaskStatus.terminal_states:
        logger.info(f"Waiting for task: {task.name} to finish")
        task.wait(period=60)

        if task.status != sevenbridges.TaskStatus.FAILED or count >= max_retries:
            continue

        # Only read the message for failed tasks, and tolerate a missing one
        # so the substring checks below cannot raise TypeError.
        exe_msg = task.execution_status.message or ""
        if not any(err in exe_msg for err in retry_errors):
            continue

        logger.info(f"Task {task.name} failed with message "
                    f"'{exe_msg}'. Retrying on a bigger "
                    f"instance")
        # Strip only a real " - Retry N" suffix; a name that merely contains
        # the word "Retry" must be kept intact.
        base_name, retry_marker, _ = task.name.rpartition(" - Retry ")
        new_name = base_name if retry_marker else task.name
        task = self.api.tasks.create(
            name=f"{new_name} - Retry {count}",
            project=task.project,
            app=task.app,
            inputs=task.inputs,
            interruptible=task.use_interruptible_instances,
            execution_settings=retry_settings[count],
            run=True
        )
        count += 1
    return task

class SevenBridgesInstance:
    '''
    Static retry data for Seven Bridges tasks.

    errors holds fragments of execution messages; a failed task whose message
    contains any of them is considered retryable.
    settings maps a retry number (0, 1, 2) to the execution settings used for
    that retry; each step uses a larger m5 instance type.
    '''
    # NOTE(review): the instance types below are hard-coded and the class
    # itself offers no way to override them. Reviewers asked that this package
    # stay generic and that pipeline-specific settings live in the launcher's
    # config instead; the author agreed to move them there.
    errors = [
        'Docker container failed to start',
        'One of the instances of the task stopped responding',
        'detected an issue with the instance running our task. '
        'If you rerun this execution it is likely to complete successfully',
        'issue with the instance it was running on. '
        'If you restart this task it is likely to complete successfully. '
        'If the issue persists',
        'currently unable to launch a valid instance for the task. '
        'Please retry or contact our support',
        'Failed to execute status command',
        'Something went wrong with this task at our end. '
        'Please try rerunning, and if the error persists contact support',
        'failed because the instance it was running on stopped responding. '
        'Please contact our support team to investigate it further',
        'Insufficient disk space may have been the cause.',
        'Executor faced a runtime exception.',
        'No space left on device',
        'ran out of disk space',
        'Please contact our support team for additional information',
        'Failed to stage inputs'
    ]
    settings = {
        0: {
            "instance_type": "m5.8xlarge;ebs-gp2;4096",
            "max_parallel_instances": 1,
            "use_elastic_disk": True,
            "use_memoization": True
        },
        1: {
            "instance_type": "m5.16xlarge;ebs-gp2;4096",
            "max_parallel_instances": 1,
            "use_elastic_disk": True,
            "use_memoization": True
        },
        2: {
            "instance_type": "m5.24xlarge;ebs-gp2;4096",
            "max_parallel_instances": 1,
            "use_elastic_disk": True,
            "use_memoization": True
        }
    }