Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 59 additions & 12 deletions src/cwl_platform/arvados_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ def _load_cwl_output(self, task: ArvadosTask):
cwl_output_collection = arvados.collection.Collection(task.container_request['output_uuid'],
api_client=self.api,
keep_client=self.keep_client)
if cwl_output_collection.items() is None:
return None

cwl_output = None
with cwl_output_collection.open('cwl.output.json') as cwl_output_file:
cwl_output = json.load(cwl_output_file)
Expand Down Expand Up @@ -392,30 +389,80 @@ def get_task_output_filename(self, task: ArvadosTask, output_name):
output_file = cwl_output[output_name]['basename']
return output_file

def get_tasks_by_name(self, project, task_name=None): # -> list(ArvadosTask):
    '''
    Get all processes (jobs) in a project, optionally restricted to one name

    :param project: The project to search (its 'uuid' is matched against owner_uuid)
    :param task_name: The name of the process to search for (value None means return all)
    :return: List of ArvadosTask, one per distinct container; container requests
        whose container is not returned by the containers listing are skipped
    '''
    # We must add priority>0 filter so we do not capture Cancelled jobs as Queued jobs.
    # According to Curii, 'Cancelled' on the UI = 'Queued' with priority=0, we are not interested in Cancelled
    # jobs here anyway, we will submit the job again
    setup_filters = [
        ['owner_uuid', '=', project['uuid']], ['priority', '>', 0]
    ]
    if task_name is not None:
        setup_filters.append(["name", '=', task_name])
    # NOTE: alternative suggested by Peter for costing a whole run: select only the
    # launcher request, either with ["name", 'ilike', '%launcher%'] or, better, with
    # ['requesting_container_uuid', '=', None] (only the launcher has no requesting
    # container), and read its cumulative_cost. To count only completed launchers,
    # also filter on ["state", "=", "Final"] and check that the matching container
    # has ["state", "=", "Complete"] and a successful exit_code.
    cont_reqs = list(arvados.util.keyset_list_all(
        self.api.container_requests().list,
        filters=setup_filters,
        select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]
    ))

    # Fetch the containers in bulk (one listing per chunk of requests) instead of
    # issuing one containers().get call per container request.
    chunk_size = 1000
    conts = {}
    for start in range(0, len(cont_reqs), chunk_size):
        container_uuids = [
            c["container_uuid"]
            for c in cont_reqs[start:start + chunk_size]
            if c["container_uuid"]
        ]
        if not container_uuids:
            # Nothing to look up for this chunk; avoid a pointless API call.
            continue
        for container in arvados.util.keyset_list_all(
            self.api.containers().list,
            filters=[
                ["uuid", "in", container_uuids],
            ],
            select=["uuid", "started_at", "finished_at", "cost"]
        ):
            conts[container["uuid"]] = container

    # Several container requests can share one container; keep only the first
    # request seen for each container.
    tasks = []
    seen_cids = set()
    for cr in cont_reqs:
        cid = cr["container_uuid"]
        if cid in conts and cid not in seen_cids:
            seen_cids.add(cid)
            tasks.append(ArvadosTask(cr, conts[cid]))

    return tasks

def get_task_cost(self, task):
    '''
    Get the cost recorded on a task's container

    :param task: Task object whose container mapping holds a "cost" entry
    :return: The value stored under "cost" in task.container
    '''
    container = task.container
    return container["cost"]

def get_project_cost(self, project):
    '''
    Add up cumulative_cost over a project's container requests that have no requesting container

    :param project: The project to total (its 'uuid' is matched against owner_uuid)
    :return: Sum of cumulative_cost over the matching container requests (0.0 if none)
    '''
    top_level_filters = [
        ['owner_uuid', '=', project['uuid']], ['requesting_container_uuid', '=', None]
    ]
    matching_requests = arvados.util.keyset_list_all(
        self.api.container_requests().list,
        filters=top_level_filters,
        select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]
    )
    # Start from 0.0 so an empty project still yields a float total.
    return sum((request["cumulative_cost"] for request in matching_requests), 0.0)

def get_project(self):
''' Determine what project we are running in '''
try:
Expand Down
10 changes: 9 additions & 1 deletion src/cwl_platform/base_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,17 @@ def get_task_output_filename(self, task, output_name):
''' Retrieve the output field of the task and return filename'''

@abstractmethod
def get_tasks_by_name(self, project, task_name=None):
    '''
    Get the tasks in a project by name

    :param project: The project to search
    :param task_name: The name of the tasks to return (value None means return all)
    '''

@abstractmethod
def get_task_cost(self, task):
    '''
    Get the cost of a single task

    :param task: The task whose cost is requested
    '''

@abstractmethod
def get_project_cost(self, project):
    '''
    Get the cost of a whole project

    :param project: The project whose cost is requested
    '''

@abstractmethod
def get_project(self):
''' Determine what project we are running in '''
Expand Down
25 changes: 23 additions & 2 deletions src/cwl_platform/sevenbridges_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,35 @@ def get_task_output_filename(self, task: sevenbridges.Task, output_name):
return outputfile.name
raise ValueError(f"Output {output_name} does not exist for task {task.name}.")

def get_tasks_by_name(self, project, task_name=None): # -> list(sevenbridges.Task):
    '''
    Get the tasks in a project, optionally restricted to an exact name

    :param project: The project passed to the tasks query
    :param task_name: The name a task must match exactly (value None means return all)
    :return: List of the matching tasks, in the order the query yields them
    '''
    return [
        task
        for task in self.api.tasks.query(project=project).all()
        if task_name is None or task.name == task_name
    ]

def get_task_cost(self, task):
    '''
    Get the price amount of a task, falling back to 0.0 when it is unavailable

    :param task: Task object; its price.amount attribute is read
    :return: task.price.amount, or 0.0 if it is None or reading it fails
    '''
    try:
        amount = task.price.amount
    except Exception as err:
        # Best effort: a task without a readable price must not abort cost
        # reporting, but the reason is printed instead of being discarded.
        print("Error getting cost for task: " + str(task) + " (" + repr(err) + ")")
        return 0.0

    # A None amount would break callers that add costs together, so report it as zero.
    return 0.0 if amount is None else amount

def get_project_cost(self, project):
    '''
    Get the total cost of all tasks in a project, rounded to 2 decimal places

    :param project: The project whose tasks are listed via get_tasks_by_name
    :return: Sum of get_task_cost over every task, rounded to 2 decimal places
    '''
    # No task name is given, so every task in the project is included.
    project_tasks = self.get_tasks_by_name(project)
    unrounded_total = sum((self.get_task_cost(item) for item in project_tasks), 0.0)
    return round(unrounded_total, 2)

def get_project(self):
''' Determine what project we are running in '''
task_id = os.environ.get('TASK_ID')
Expand Down