diff --git a/src/cwl_platform/arvados_platform.py b/src/cwl_platform/arvados_platform.py index 937fb009..31c03299 100644 --- a/src/cwl_platform/arvados_platform.py +++ b/src/cwl_platform/arvados_platform.py @@ -91,9 +91,6 @@ def _load_cwl_output(self, task: ArvadosTask): cwl_output_collection = arvados.collection.Collection(task.container_request['output_uuid'], api_client=self.api, keep_client=self.keep_client) - if cwl_output_collection.items() is None: - return None - cwl_output = None with cwl_output_collection.open('cwl.output.json') as cwl_output_file: cwl_output = json.load(cwl_output_file) @@ -392,30 +389,80 @@ def get_task_output_filename(self, task: ArvadosTask, output_name): output_file = cwl_output[output_name]['basename'] return output_file - def get_tasks_by_name(self, project, task_name): # -> list(ArvadosTask): + def get_tasks_by_name(self, project, task_name=None): # -> list(ArvadosTask): ''' Get all processes (jobs) in a project with a specified name :param project: The project to search - :param process_name: The name of the process to search for + :param task_name: The name of the process to search for (value None means return all) :return: List of container request uuids and associated containers ''' # We must add priority>0 filter so we do not capture Cancelled jobs as Queued jobs. # According to Curii, 'Cancelled' on the UI = 'Queued' with priority=0, we are not interested in Cancelled # jobs here anyway, we will submit the job again + cont_reqs = [] + conts = {} tasks = [] + setup_filters=[ + ['owner_uuid', '=', project['uuid']], ['priority', '>', 0] + ] + if task_name is not None: + setup_filters.append(["name", '=', task_name]) + #Potentially easier way suggested by Peter: + #setup_filters.append(["name", 'ilike', '%launcher%']) --> should return single process/container, and take its cumulative_cost, should match adding up all container costs + #even better: + #setup_filters.append(['requesting_container_uuid', '=',None]) --> only the launcher won't have another container request for it, so this is better way to find launcher, then just take its comulative_cost prop + #If only want to count completed launchers: + #setup_filters_append(["state","=","Final"]) (but then also need to check its corresponding container and should be ["state","=","Complete"] and also check exit_code) for container_request in arvados.util.keyset_list_all( self.api.container_requests().list, - filters=[ - ["name", '=', task_name], - ['owner_uuid', '=', project['uuid']], ['priority', '>', 0] - ] + filters=setup_filters, + select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"] ): - # Get the container - container = self.api.containers().get(uuid=container_request['container_uuid']).execute() - tasks.append(ArvadosTask(container_request, container)) + cont_reqs.append(container_request) + + chunk_size = 1000 + + for i in range(0, len(cont_reqs), chunk_size): + chunk = cont_reqs[i:i + chunk_size] + for container in arvados.util.keyset_list_all( + self.api.containers().list, + filters=[ + ["uuid", "in", [c["container_uuid"] for c in chunk if c["container_uuid"]]], + ], + select=["uuid", "started_at", "finished_at", "cost"] + ): + conts[container["uuid"]] = container + + seen_cids = set() + for cr in cont_reqs: + cid = cr["container_uuid"] + if cid in conts: + c = conts[cid] + if cid not in seen_cids: + seen_cids.add(cid) + tasks.append(ArvadosTask(cr,c)) + return tasks + def get_task_cost(self, task): + return(task.container["cost"]) + + def get_project_cost(self, project): + setup_filters=[ + ['owner_uuid', '=', project['uuid']], ['requesting_container_uuid', '=', None] + ] + + projCost = 0.0 + for container_request in arvados.util.keyset_list_all( + self.api.container_requests().list, + filters=setup_filters, + select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"] + ): + projCost += container_request["cumulative_cost"] + + return(projCost) + def get_project(self): ''' Determine what project we are running in ''' try: diff --git a/src/cwl_platform/base_platform.py b/src/cwl_platform/base_platform.py index bca4e157..803b6628 100644 --- a/src/cwl_platform/base_platform.py +++ b/src/cwl_platform/base_platform.py @@ -87,9 +87,17 @@ def get_task_output_filename(self, task, output_name): ''' Retrieve the output field of the task and return filename''' @abstractmethod - def get_tasks_by_name(self, project, task_name): + def get_tasks_by_name(self, project, task_name=None): ''' Get a tasks by its name ''' + @abstractmethod + def get_task_cost(self, task): + ''' Get a task's cost ''' + + @abstractmethod + def get_project_cost(self, project): + ''' Get a project's cost ''' + @abstractmethod def get_project(self): ''' Determine what project we are running in ''' diff --git a/src/cwl_platform/sevenbridges_platform.py b/src/cwl_platform/sevenbridges_platform.py index 0d5f83ca..6d7f3be0 100644 --- a/src/cwl_platform/sevenbridges_platform.py +++ b/src/cwl_platform/sevenbridges_platform.py @@ -412,14 +412,35 @@ def get_task_output_filename(self, task: sevenbridges.Task, output_name): return outputfile.name raise ValueError(f"Output {output_name} does not exist for task {task.name}.") - def get_tasks_by_name(self, project, task_name): # -> list(sevenbridges.Task): + def get_tasks_by_name(self, project, task_name=None): # -> list(sevenbridges.Task): ''' Get a process by its name ''' tasks = [] for task in self.api.tasks.query(project=project).all(): - if task.name == task_name: + if task_name is None or task.name == task_name: tasks.append(task) return tasks + def get_task_cost(self, task): + task_cost = 0.0 + try: + task_cost = task.price.amount + except Exception as e: + print("Error getting cost for task: " + str(task)) + + return(task_cost) + + def get_project_cost(self, project): + # 1. Get a list of tasks from the project + tasks = self.get_tasks_by_name(project) + + # 2. Iterate over tasks and sum up total cost + total_cost = 0.0 + for task in tasks: + total_cost += self.get_task_cost(task) + rounded_total_cost = round(total_cost,2) + + return(rounded_total_cost) + def get_project(self): ''' Determine what project we are running in ''' task_id = os.environ.get('TASK_ID')