From 71487428536b2cefec8e652ebf590c5c76d4c1d3 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 10:49:13 -0400 Subject: [PATCH 01/43] Adding AWS Omics support --- src/cwl_platform/__init__.py | 4 +- src/cwl_platform/omics_platform.py | 178 +++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 src/cwl_platform/omics_platform.py diff --git a/src/cwl_platform/__init__.py b/src/cwl_platform/__init__.py index 25fc722f..86cbe43d 100644 --- a/src/cwl_platform/__init__.py +++ b/src/cwl_platform/__init__.py @@ -6,12 +6,12 @@ from .arvados_platform import ArvadosPlatform from .sevenbridges_platform import SevenBridgesPlatform -#from .omics_platform import OmicsPlatform +from .omics_platform import OmicsPlatform # Move this for a config file SUPPORTED_PLATFORMS = { 'Arvados': ArvadosPlatform, -# 'Omics': OmicsPlatform, + 'Omics': OmicsPlatform, 'SevenBridges': SevenBridgesPlatform } diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py new file mode 100644 index 00000000..d3a373a9 --- /dev/null +++ b/src/cwl_platform/omics_platform.py @@ -0,0 +1,178 @@ +''' +AWS HealthOmics class +''' + +import logging +import boto3 +import botocore + +from .base_platform import Platform + +logger = logging.getLogger(__name__) + +class OmicsPlatform(Platform): + ''' AWS HealthOmics Platform class ''' + def __init__(self, name): + super().__init__(name) + self.api = None + + def connect(self): + ''' Connect to AWS Omics platform''' + self.api = boto3.client('omics') + + def copy_folder(self, reference_project, reference_folder, destination_project): + ''' + Do nothing and return reference folder, which should be an S3 path. + ''' + return reference_folder + + def copy_workflow(self, src_workflow, destination_project): + '''Do nothing and return workflow id''' + return src_workflow + + def copy_workflows(self, reference_project, destination_project): + '''Do nothing. This function seems not used in launcher?''' + pass + + def delete_task(self, task): + ''' Delete a task/workflow/process ''' + self.logger.info('TBD: Deleting task %s', task) + + @classmethod + def detect(cls): + return False + + def get_current_task(self): + ''' Get the current task ''' + return None + + def get_file_id(self, project, file_path): + '''Return file s3 path for Omics job input''' + return file_path + + def get_folder_id(self, project, folder_path): + ''' + There is not unique ID for a folder in s3, so just return the folder_path + The one caveat is that Omics wants trailing slashes on folder paths, so add one. + ''' + return folder_path + "/" + + def get_task_input(self, task, input_name): + ''' Retrieve the input field of the task ''' + self.logger.info("TBD: Getting input for task %s", task) + return None + + def get_task_state(self, task, refresh=False): + ''' + Get status of run by task_id. + task: A dictionary of omics response from start_run. Includes Run ID, Name, Tags, etc. + return status of the run (Complete, Failed, Running, Cancelled, Queued). + ''' + + try: + run_info = self.api.get_run(id=task['id']) + job_status = run_info['status'] + except: + raise ValueError('No Status information found for job %s. Check job status.', task['id']) + + if job_status == 'COMPLETED': + return 'Complete' + if job_status == 'FAILED': + return 'Failed' + if job_status in ['STARTING','RUNNING','STOPPING']: + return 'Running' + if job_status in ['CANCELLED','DELETED']: + return 'Cancelled' + if job_status == 'PENDING': + return 'Queued' + + raise ValueError('Unknown task state: %s : %s', task['id'], job_status) + + def get_task_output(self, task, output_name): + ''' Retrieve the output field of the task ''' + taskinfo = self.api.get_run(id=task) + # TODO: get_run only returns OutputUri. Get file path based on output_name (filename)? + filename = None + # TODO: We shouldn't be hard-coding stuff like this. these functions should be very generic. + if output_name == 'RecalibratedBAM': + filename = taskinfo.name + '.bam' + if filename == None: + raise ValueError(f"Cannot find output file for: {output_name}") + return taskinfo['outputUri'] + filename + + def get_task_output_filename(self, task, output_name): + ''' Retrieve the output field of the task and return filename''' + self.logger.info("TBD: Getting output filename for task %s", task) + return None + + def get_tasks_by_name(self, project, task_name): + ''' Get a tasks by its name ''' + tasks = [] + runs = self.api.list_runs(name=task_name) + for item in runs['items']: + run = self.api.get_run(id=item['id']) + if 'ProjectId' in project: + if run['tags']['ProjectId'] == project['ProjectId']: + tasks.append(run) + elif 'ProjectName' in project: + if run['tags']['ProjectName'] == project['ProjectName']: + tasks.append(run) + return tasks + + def get_project(self): + ''' + Since there is no concept of project in Omics, raise an error. + ''' + raise ValueError("Omics does not support project. Use get_project_by_id or get_project_by_name instead.") + + def get_project_by_name(self, project_name): + ''' Return a dictionary of project to provide project_name tag info for omics jobs ''' + project = { + 'ProjectName': project_name + } + return project + + def get_project_by_id(self, project_id): + ''' Return a dictionary of project to provide project_id tag info for omics jobs''' + project = { + 'ProjectId': project_id + } + return project + + def stage_task_output(self, task, project, output_to_export, output_directory_name, download=False): + ''' TODO ''' + return + + def submit_task(self, name, project, workflow, parameters): + ''' + Submit workflow for one sample. + name: sample ID. + project: dictionary of {'ProjectName':'string'} or {'ProjectId':'string'}, used for add run tag. + workflow: workflow ID in omics. + parameters: dictionary of input parameters. + + return omics response for start_run. + ''' + # A subfolder is created with the Run ID assigned by Omics such that task output is seperate. + base_output_path = 's3://bmsrd-ngs-omics/runs' + + try: + logger.debug("Starting run for %s", name) + # TODO: The roleArn should be a parameter to this function, and not hard-coded. + # Put this in the pipeline_config.py. + job = self.api.start_run(workflowId=workflow, + workflowType='PRIVATE', + roleArn='arn:aws:iam::483421617021:role/ngs360-servicerole', + parameters=parameters, + name=name, + tags=project, + outputUri=base_output_path) + logger.info('Started run for %s, RunID: %s',name,job['id']) + return job + except botocore.exceptions.ClientError as err: + logger.error('Could not start run for %s: %s', name, err) + return None + + def upload_file_to_project(self, filename, project, dest_folder, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments + self.logger.info("TBD: Uploading file %s to project %s", filename, project) + return None From 2aab38583dc9d3653d47c70cf5beedb641568c13 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 10:50:24 -0400 Subject: [PATCH 02/43] Remove hard-coded role arn --- src/cwl_platform/omics_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index d3a373a9..c23c9ac8 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -162,7 +162,7 @@ def submit_task(self, name, project, workflow, parameters): # Put this in the pipeline_config.py. job = self.api.start_run(workflowId=workflow, workflowType='PRIVATE', - roleArn='arn:aws:iam::483421617021:role/ngs360-servicerole', + roleArn=self.role_arn, parameters=parameters, name=name, tags=project, From b47fad37552d295941c8d2e73c93e63690a04629 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 10:52:35 -0400 Subject: [PATCH 03/43] Remove hard-coded output path --- src/cwl_platform/omics_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index c23c9ac8..eff4872b 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -15,6 +15,7 @@ class OmicsPlatform(Platform): def __init__(self, name): super().__init__(name) self.api = None + self.role_arn = None def connect(self): ''' Connect to AWS Omics platform''' @@ -153,8 +154,7 @@ def submit_task(self, name, project, workflow, parameters): return omics response for start_run. ''' - # A subfolder is created with the Run ID assigned by Omics such that task output is seperate. - base_output_path = 's3://bmsrd-ngs-omics/runs' + base_output_path = parameters.pop('OutputUri') try: logger.debug("Starting run for %s", name) From f4b1d196daa7301067f39cd739561d0bdce47889 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 10:54:06 -0400 Subject: [PATCH 04/43] Add boto3 to requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 5099bf4e..d1edd6b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +boto3 chardet arvados-python-client sevenbridges-python From aeda697361986b5639acacaeeef28f68c47ea020 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:05:22 -0400 Subject: [PATCH 05/43] Get role arn from connect() parameters --- src/cwl_platform/omics_platform.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index eff4872b..4b67daa4 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -17,10 +17,11 @@ def __init__(self, name): self.api = None self.role_arn = None - def connect(self): + def connect(self, **kwargs): ''' Connect to AWS Omics platform''' self.api = boto3.client('omics') - + self.role_arn = kwargs.get('role_arn') + def copy_folder(self, reference_project, reference_folder, destination_project): ''' Do nothing and return reference folder, which should be an S3 path. From 4107460216d83757b376964bc8d45104afde0c0b Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:08:25 -0400 Subject: [PATCH 06/43] Add stage_output_files --- src/cwl_platform/omics_platform.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 4b67daa4..109dff4f 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -141,6 +141,10 @@ def get_project_by_id(self, project_id): } return project + def stage_output_files(self, project, output_files): + ''' TODO ''' + return + def stage_task_output(self, task, project, output_to_export, output_directory_name, download=False): ''' TODO ''' return From 7f190ab5554b30eb19baeee1a934af3810612848 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:09:53 -0400 Subject: [PATCH 07/43] Update get_project --- src/cwl_platform/omics_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 109dff4f..b187794a 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -123,9 +123,9 @@ def get_tasks_by_name(self, project, task_name): def get_project(self): ''' - Since there is no concept of project in Omics, raise an error. + Since there is no concept of project in Omics, return None ''' - raise ValueError("Omics does not support project. Use get_project_by_id or get_project_by_name instead.") + return None def get_project_by_name(self, project_name): ''' Return a dictionary of project to provide project_name tag info for omics jobs ''' From 3e45a6ffe0b8269c4652523b0d6e5551a3bf3391 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:11:24 -0400 Subject: [PATCH 08/43] Revert get_project --- src/cwl_platform/omics_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index b187794a..da83370b 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -123,9 +123,9 @@ def get_tasks_by_name(self, project, task_name): def get_project(self): ''' - Since there is no concept of project in Omics, return None + Since there is no concept of project in Omics, raise an error. ''' - return None + raise ValueError("Omics does not support get_project. Use get_project_by_id or get_project_by_name instead.") def get_project_by_name(self, project_name): ''' Return a dictionary of project to provide project_name tag info for omics jobs ''' From 4e5cdfae2441d5cfd3a76b55baafc1951d58efda Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:24:28 -0400 Subject: [PATCH 09/43] Add notes for copy_folder --- src/cwl_platform/omics_platform.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index da83370b..77c64569 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -25,6 +25,9 @@ def connect(self, **kwargs): def copy_folder(self, reference_project, reference_folder, destination_project): ''' Do nothing and return reference folder, which should be an S3 path. + NOTE: Launchers copy the reference folder to the destination project so that everything is co-located. + However this can cause lots of data duplication in S3. For now we will just use the reference folder + until another use-case is identified that we need to copy the data. ''' return reference_folder From f6f1dd8ce7601a818655c2219237987359c355e4 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:26:00 -0400 Subject: [PATCH 10/43] Add trailing slash only if its missing --- src/cwl_platform/omics_platform.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 77c64569..8e93c130 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -60,6 +60,8 @@ def get_folder_id(self, project, folder_path): There is not unique ID for a folder in s3, so just return the folder_path The one caveat is that Omics wants trailing slashes on folder paths, so add one. ''' + if folder_path.endswith("/"): + return folder_path return folder_path + "/" def get_task_input(self, task, input_name): From e551a391ba9dcd10099a410c46241955f6d3992b Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:44:41 -0400 Subject: [PATCH 11/43] Add output_bucket --- src/cwl_platform/omics_platform.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 8e93c130..f098d5f9 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -15,13 +15,15 @@ class OmicsPlatform(Platform): def __init__(self, name): super().__init__(name) self.api = None + self.output_bucket = None self.role_arn = None def connect(self, **kwargs): ''' Connect to AWS Omics platform''' self.api = boto3.client('omics') + self.outout_bucket = kwargs.get('output_bucket') self.role_arn = kwargs.get('role_arn') - + def copy_folder(self, reference_project, reference_folder, destination_project): ''' Do nothing and return reference folder, which should be an S3 path. @@ -158,13 +160,17 @@ def submit_task(self, name, project, workflow, parameters): ''' Submit workflow for one sample. name: sample ID. - project: dictionary of {'ProjectName':'string'} or {'ProjectId':'string'}, used for add run tag. + project: dictionary of {'ProjectName': 'string'} or {'ProjectId': 'string'} workflow: workflow ID in omics. parameters: dictionary of input parameters. return omics response for start_run. ''' - base_output_path = parameters.pop('OutputUri') + base_output_path = f"s3://{self.output_bucket}/" + if 'ProjectName' in project: + base_output_path += f"{project['ProjectName']}/{workflow}/{name}/" + else: + base_output_path += f"{project['ProjectId']}/{workflow}/{name}/" try: logger.debug("Starting run for %s", name) From 83803c438418912fa39a8a2c8725b40ae617597e Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 11:47:50 -0400 Subject: [PATCH 12/43] Add output_bucket --- src/cwl_platform/omics_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index f098d5f9..90877ff4 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -21,7 +21,7 @@ def __init__(self, name): def connect(self, **kwargs): ''' Connect to AWS Omics platform''' self.api = boto3.client('omics') - self.outout_bucket = kwargs.get('output_bucket') + self.output_bucket = kwargs.get('output_bucket') self.role_arn = kwargs.get('role_arn') def copy_folder(self, reference_project, reference_folder, destination_project): From 008806804076fcec4e41f54731f8af0dc0599731 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 14:59:46 -0400 Subject: [PATCH 13/43] Implement throttling to 1 TPS on Omics --- requirements.txt | 1 + src/cwl_platform/omics_platform.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/requirements.txt b/requirements.txt index d1edd6b2..dd8bb0e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ boto3 chardet arvados-python-client sevenbridges-python +tenacity diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 90877ff4..0dace8cd 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -6,6 +6,8 @@ import boto3 import botocore +from tenacity import retry + from .base_platform import Platform logger = logging.getLogger(__name__) @@ -156,6 +158,7 @@ def stage_task_output(self, task, project, output_to_export, output_directory_na ''' TODO ''' return + @retry(wait=wait_fixed(2)) def submit_task(self, name, project, workflow, parameters): ''' Submit workflow for one sample. From 01e5f6fdf2a2bccec41aa5c2a9d38bbeeb41be89 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 13 May 2024 14:59:55 -0400 Subject: [PATCH 14/43] Implement throttling to 1 TPS on Omics --- src/cwl_platform/omics_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 0dace8cd..1d494fcc 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -6,7 +6,7 @@ import boto3 import botocore -from tenacity import retry +from tenacity import retry, wait_fixed from .base_platform import Platform @@ -158,7 +158,7 @@ def stage_task_output(self, task, project, output_to_export, output_directory_na ''' TODO ''' return - @retry(wait=wait_fixed(2)) + @retry(wait=wait_fixed(1)) def submit_task(self, name, project, workflow, parameters): ''' Submit workflow for one sample. From 16670134dc2f3cc454591fec6b6355c7842c61ff Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Sun, 25 Aug 2024 00:27:36 +0000 Subject: [PATCH 15/43] Adding unit tests for AWS Omics --- tests/test_omics_platform.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 tests/test_omics_platform.py diff --git a/tests/test_omics_platform.py b/tests/test_omics_platform.py new file mode 100644 index 00000000..60530290 --- /dev/null +++ b/tests/test_omics_platform.py @@ -0,0 +1,23 @@ +''' +Test Module for AWS Omics Platform +''' +import unittest +import os +import mock +from mock import MagicMock, patch + +from cwl_platform.omics_platform import OmicsPlatform + +class TestOmicsPlaform(unittest.TestCase): + ''' + Test Class for Omics Platform + ''' + def setUp(self) -> None: + self.platform = OmicsPlatform('Omics') + return super().setUp() + + def runTest(self): + pass + +if __name__ == '__main__': + unittest.main() From 406bc2b35183310eaf7787bdb6d2a47d34f4e7a4 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Sun, 25 Aug 2024 00:32:51 +0000 Subject: [PATCH 16/43] Add missing implementations for functions --- .coverage | Bin 53248 -> 0 bytes src/cwl_platform/omics_platform.py | 9 +++++++++ 2 files changed, 9 insertions(+) delete mode 100644 .coverage diff --git a/.coverage b/.coverage deleted file mode 100644 index dd294d0d04811627f6eb5fc516b248039f21ffd2..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 53248 zcmeI)*>4*~90%~(BX*oPY*1<4G*lTOAPvn))mF3*O?o_3RO&&+3li?y9w$rf-DG#2 zLluy;NJU8e1Dvl&Ji%YUAJ7NNf51!C7eFc?Ar%3?-yX4@#!Yytl6+V8dXJgena^Ba z+sRv3FFB!-RnM>6p*$w~g{FyVDTNR{^z5T&x@2j?On2y2YuWC!*(1iwTA%f{$Y(wk z*1LU|t+D(UeXsW3&)0LG^nR7Kb7eY#1p*L&00j27z}nS(wlFZD-F`o`OEnexc3JuH zy6@bz3p3X*$m=s_FI|xFF?pm%(lRzCXQb~f$$90=s#8j^S)EJ{S|prt&BzilB~p|a;!IQJ#rcFS`r z7?t*V3CkYkb2*Gl>wEQy(33UCjf&|8PUv{9RLiQ|2vud%0**3Tx)5G#X(+85T%a3q z%;Z`vI?c$N#7u%Gk*kzB*GpNO5&6;-Im)x!sF#TRy+(bWD~Dpl7Y@BnF+0T#y`-?) zuA5EjvRU4?>P*SGM&now*I8(&^~xWW&6uN6F?Q=(yUJ{UU`b=9JXO{9HOEs_5-qxZP6DaB+G!~@6 zstvcyT_xZ$l)Xje+p}ssIU5fP&$=N|)0xU;3KxbtHJ$Y)P@G!d>{%PgWDA3X+U;D_ zjFQ?)Ub1aXD6Pp&hSBs?UD-jyPY05x4U%b$<|vu=k>m^&XDpj3%nWuaP7I9nQ%o^UjAUY^HIXq_vJPHNFvgCO+WxMg*Krt=` zbWJCaQ68S0mCiKN2v#_a*~Q@~PwC{s!~AvXbZP&hje6a7E6IGt({P+Bjs4s?`8x3+ zLJ{RKb;^Lc=!P`YM3b{TVx{Si&yxwKTDL24cjroJNpWdTJ&{~I8=bd96*_e#IY^Y5 zs9`PHfh_xqPSRXDGHg7p8E|bCr+CeG$=&=`Zf!`*7LFd()*4A?jK`T`K$CjiPP3it z>TQi=!^viQxl98tz1wMRCv4Zk!MpJlcyNnruoNq#p_2DHJtKUBuUU@;J+MFk0uX=z z1Rwwb2tWV=5P$##Ah7!c3{BUveE+XoKMU(OT48|z1Rwwb2tWV=5P$##AOHafKwxJI zVkljA4GC;3PIy7f?453N6UW&x3F2tWV=5P$##AOHafKmY;| zfB*zifxK~8OFsqBdyK(c@*x0!|8H8a3G2S~nN_6?ED(SI1Rwwb2tWV=5P$##AOL|q zEpW;-#I1?)Qmr^&vqSoyDD{cJFK;-Y7L{A_oyx2V)`QW1C3xL1#LW#sH^wd5fohF& zTsOqR7IAEU(XM!Hmm1RyVQ&$wSae(`EEXwe{QbXa9TnD3*5}qNZD4@_1Rwwb2tWV= z5P$##AOHaf>}`S5#w~4&8|$FM{l9t4xVcr_jrae?31eZaIGf!6>tlwU+ajAS_y73+ ze|x)8qx2Af00bZa0SG_<0uX=z1R${M1q}M7Kp%hquUUT!dSHP71Rwwb2tWV=5P$## zAOHafKwuXN7)D<%{`>zaVf}7>ZQZe^c40A5BnUtN0uX=z1Rwwb2tWV=5P-n{7r0_} zX}L9NK05W+0drcIoPsGNt-GVgkN-?-$&usJf;Rbhze!s!t*$=TeO~AX&YYveeCCB0 zC%^naJ7kzz?u7B{$z!Y9y?giaH>%ZYCjYUP5eI*`rt{Ta)*pS_HCZ_LocXW5Djw<- zl^04_h*crJ(KNxwOxlUAEza_*)-C%*oCSJ`7S65g)y!P((D&bsnnE|@zyE(AtjE?L z)-TqBI2_gxfB*y_009U<00Izz00bZa0SN3ofed}ZpqXa$Vi?hjuAB5RK=l2;=-+uI zLXIH-0SG_<0uX=z1Rwwb2tWV=5ZJx|fB%pB|LuDrBm^J;0SG_<0uX=z1Rwwb2tZ&b k3h>|m Date: Sun, 25 Aug 2024 01:02:49 +0000 Subject: [PATCH 17/43] Add comment --- src/cwl_platform/omics_platform.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 65732294..1541c0cb 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -21,7 +21,11 @@ def __init__(self, name): self.role_arn = None def connect(self, **kwargs): - ''' Connect to AWS Omics platform''' + ''' + Connect to AWS Omics platform + + If ~/.aws/credentials or ~/.aws/config does not provide a region, region should be specified in the AWS_DEFAULT_REGION environment variable. + ''' self.api = boto3.client('omics') self.output_bucket = kwargs.get('output_bucket') self.role_arn = kwargs.get('role_arn') From e65a629e1328d2c312dee152f6c65d1d9410ef55 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Thu, 16 Jan 2025 15:58:39 -0500 Subject: [PATCH 18/43] make omics consistent with base --- src/cwl_platform/omics_platform.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 1541c0cb..bab1e4e8 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -30,14 +30,14 @@ def connect(self, **kwargs): self.output_bucket = kwargs.get('output_bucket') self.role_arn = kwargs.get('role_arn') - def copy_folder(self, reference_project, reference_folder, destination_project): + def copy_folder(self, source_project, source_folder, destination_project): ''' Do nothing and return reference folder, which should be an S3 path. NOTE: Launchers copy the reference folder to the destination project so that everything is co-located. However this can cause lots of data duplication in S3. For now we will just use the reference folder until another use-case is identified that we need to copy the data. ''' - return reference_folder + return source_folder def copy_workflow(self, src_workflow, destination_project): '''Do nothing and return workflow id''' @@ -47,6 +47,10 @@ def copy_workflows(self, reference_project, destination_project): '''Do nothing. This function seems not used in launcher?''' pass + def create_project(self, project_name, project_description, **kwargs): + ''' Do nothing''' + pass + def delete_task(self, task): ''' Delete a task/workflow/process ''' self.logger.info('TBD: Deleting task %s', task) @@ -157,6 +161,10 @@ def get_project_by_id(self, project_id): } return project + def get_user(self, user): + '''Get a user object from their (platform) user id or email address''' + raise ValueError("Not yet implemented") + def rename_file(self, fileid, new_filename): raise ValueError("Not yet implemented") @@ -167,12 +175,12 @@ def stage_output_files(self, project, output_files): ''' TODO ''' return - def stage_task_output(self, task, project, output_to_export, output_directory_name, download=False): + def stage_task_output(self, task, project, output_to_export, output_directory_name): ''' TODO ''' return @retry(wait=wait_fixed(1)) - def submit_task(self, name, project, workflow, parameters): + def submit_task(self, name, project, workflow, parameters, execution_settings=None): ''' Submit workflow for one sample. name: sample ID. From 525ca415586e031169f021ef0d3eb2301ee04d72 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Sun, 26 Jan 2025 00:11:29 -0500 Subject: [PATCH 19/43] add function to support launcher; --- src/cwl_platform/omics_platform.py | 74 +++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 22 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index bab1e4e8..ab600a05 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -6,7 +6,7 @@ import boto3 import botocore -from tenacity import retry, wait_fixed +from tenacity import retry, wait_fixed, stop_after_attempt from .base_platform import Platform @@ -20,15 +20,36 @@ def __init__(self, name): self.output_bucket = None self.role_arn = None + def _list_file_in_s3(self, s3path): + ''' + List all files in S3 path. + s3path: S3 path to directory, formatted as s3://bucket/path/to/folder/. + + return list of files within the directory with full s3 path. + + ''' + bucket = s3path.split('s3://')[1].split('/')[0] + prefix = s3path.split(bucket+'/')[1] + response = self.s3_client.list_objects_v2( + Bucket=bucket, + Prefix=prefix) + files = [] + for element in response.get('Contents', []): + files += ['s3://'+bucket+'/'+element['Key']] + return files + def connect(self, **kwargs): ''' Connect to AWS Omics platform If ~/.aws/credentials or ~/.aws/config does not provide a region, region should be specified in the AWS_DEFAULT_REGION environment variable. ''' - self.api = boto3.client('omics') - self.output_bucket = kwargs.get('output_bucket') - self.role_arn = kwargs.get('role_arn') + self.api = boto3.client('omics', # TODO - Still needs to provide aws key here + region_name='us-east-1') + self.output_bucket = kwargs.get('output_bucket', "bmsrd-ngs-omics/Outputs") + self.role_arn = kwargs.get('role_arn', "arn:aws:iam::483421617021:role/ngs360-servicerole") + self.s3_client = boto3.client('s3', # TODO - Still needs to provide aws key here + region_name='us-east-1') def copy_folder(self, source_project, source_folder, destination_project): ''' @@ -112,15 +133,28 @@ def get_task_state(self, task, refresh=False): def get_task_output(self, task, output_name): ''' Retrieve the output field of the task ''' - taskinfo = self.api.get_run(id=task) - # TODO: get_run only returns OutputUri. Get file path based on output_name (filename)? - filename = None - # TODO: We shouldn't be hard-coding stuff like this. these functions should be very generic. - if output_name == 'RecalibratedBAM': - filename = taskinfo.name + '.bam' - if filename == None: - raise ValueError(f"Cannot find output file for: {output_name}") - return taskinfo['outputUri'] + filename + # Get all files in output folder + taskinfo = self.api.get_run(id=task["id"]) + outputUri = taskinfo['outputUri'] + task["id"] + "/out/" + outputUri_files = self._list_file_in_s3(outputUri) + + # Match output_name with existing files in output folder + if "*" not in output_name: + output = outputUri + output_name + if output in outputUri_files: + return output + else: + return None + else: + output = [] + prefix=output_name.split('*')[0] + suffix=output_name.split('*')[-1] + for output_file in outputUri_files: + filename = output_file.split('/')[-1] + if (prefix == "" or filename.startswith(prefix)) and \ + (suffix == "" or filename.endswith(suffix)): + output+=[output_file] + return output def get_task_output_filename(self, task, output_name): ''' Retrieve the output field of the task and return filename''' @@ -130,15 +164,10 @@ def get_task_output_filename(self, task, output_name): def get_tasks_by_name(self, project, task_name): ''' Get a tasks by its name ''' tasks = [] - runs = self.api.list_runs(name=task_name) + runs = self.api.list_runs(name=task_name, runGroupId=project['ProjectId']) for item in runs['items']: run = self.api.get_run(id=item['id']) - if 'ProjectId' in project: - if run['tags']['ProjectId'] == project['ProjectId']: - tasks.append(run) - elif 'ProjectName' in project: - if run['tags']['ProjectName'] == project['ProjectName']: - tasks.append(run) + tasks.append(run) return tasks def get_project(self): @@ -179,7 +208,7 @@ def stage_task_output(self, task, project, output_to_export, output_directory_na ''' TODO ''' return - @retry(wait=wait_fixed(1)) + @retry(wait=wait_fixed(1), stop=stop_after_attempt(3)) def submit_task(self, name, project, workflow, parameters, execution_settings=None): ''' Submit workflow for one sample. @@ -205,7 +234,8 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No roleArn=self.role_arn, parameters=parameters, name=name, - tags=project, + runGroupId=project["ProjectId"], + tags={"ProjectId": project["ProjectId"]}, outputUri=base_output_path) logger.info('Started run for %s, RunID: %s',name,job['id']) return job From 5ff2af7da4b455761946213947952735e929e887 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Sun, 26 Jan 2025 00:15:42 -0500 Subject: [PATCH 20/43] add s3 client to init --- src/cwl_platform/omics_platform.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index ab600a05..ba6c8c6c 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -19,6 +19,7 @@ def __init__(self, name): self.api = None self.output_bucket = None self.role_arn = None + self.s3_client = None def _list_file_in_s3(self, s3path): ''' @@ -44,12 +45,10 @@ def connect(self, **kwargs): If ~/.aws/credentials or ~/.aws/config does not provide a region, region should be specified in the AWS_DEFAULT_REGION environment variable. ''' - self.api = boto3.client('omics', # TODO - Still needs to provide aws key here - region_name='us-east-1') + self.api = boto3.client('omics', region_name='us-east-1') self.output_bucket = kwargs.get('output_bucket', "bmsrd-ngs-omics/Outputs") self.role_arn = kwargs.get('role_arn', "arn:aws:iam::483421617021:role/ngs360-servicerole") - self.s3_client = boto3.client('s3', # TODO - Still needs to provide aws key here - region_name='us-east-1') + self.s3_client = boto3.client('s3', region_name='us-east-1') def copy_folder(self, source_project, source_folder, destination_project): ''' From 78e4f9d25a46367ff1ad8192b18c78675a577bc5 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Fri, 14 Mar 2025 01:24:03 -0400 Subject: [PATCH 21/43] update upload_file function name --- src/cwl_platform/omics_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index ba6c8c6c..75b44b35 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -242,6 +242,6 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No logger.error('Could not start run for %s: %s', name, err) return None - def upload_file_to_project(self, filename, project, dest_folder, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments + def upload_file(self, filename, project, dest_folder, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments self.logger.info("TBD: Uploading file %s to project %s", filename, project) return None From dce5a5ea89c1e69e26e96a57e313dbf0eb98722b Mon Sep 17 00:00:00 2001 From: cheny252 Date: Fri, 14 Mar 2025 02:09:44 -0400 Subject: [PATCH 22/43] add omics function --- src/cwl_platform/omics_platform.py | 113 ++++++++++++++++++++++++++--- 1 file changed, 103 insertions(+), 10 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 75b44b35..fbfd44fc 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -19,7 +19,6 @@ def __init__(self, name): self.api = None self.output_bucket = None self.role_arn = None - self.s3_client = None def _list_file_in_s3(self, s3path): ''' @@ -45,10 +44,12 @@ def connect(self, **kwargs): If ~/.aws/credentials or ~/.aws/config does not provide a region, region should be specified in the AWS_DEFAULT_REGION environment variable. ''' - self.api = boto3.client('omics', region_name='us-east-1') - self.output_bucket = kwargs.get('output_bucket', "bmsrd-ngs-omics/Outputs") + self.api = boto3.client('omics', + region_name='us-east-1') + self.output_bucket = kwargs.get('output_bucket', "bmsrd-ngs-omics") self.role_arn = kwargs.get('role_arn', "arn:aws:iam::483421617021:role/ngs360-servicerole") - self.s3_client = boto3.client('s3', region_name='us-east-1') + self.s3_client = boto3.client('s3', + region_name='us-east-1') def copy_folder(self, source_project, source_folder, destination_project): ''' @@ -59,6 +60,30 @@ def copy_folder(self, source_project, source_folder, destination_project): ''' return source_folder + def download_file(self, file, dest_folder): + """ + Download a file to a local directory + :param file: SevenBridges file id (or object) to download + :param dest_folder: Destination folder to download file to + :return: Name of local file downloaded or None + """ + ''' TODO ''' + return + + def export_file(self, file, bucket_name, prefix): + """ + Use platform specific functionality to copy a file from a platform to an S3 bucket. + :param file: File to export + :param bucket_name: S3 bucket name + :param prefix: Destination S3 folder to export file to, path/to/folder + :return: Export job of file + For SevenBridges, there are two differences from the expected base implementation: + 1. the bucket name is translated to a volume name, replacing all dashes with underscores. + 2. the return value is the export job object, not the S3 file path. + """ + ''' TODO ''' + return + def copy_workflow(self, src_workflow, destination_project): '''Do nothing and return workflow id''' return src_workflow @@ -67,10 +92,27 @@ def copy_workflows(self, reference_project, destination_project): '''Do nothing. This function seems not used in launcher?''' pass + def get_workflows(self, project): + ''' + Get workflows in a project + + :param: Platform Project + :return: List of workflows + ''' + ''' TODO ''' + return + def create_project(self, project_name, project_description, **kwargs): ''' Do nothing''' pass + def delete_project_by_name(self, project_name): + ''' + Delete a project on the platform + ''' + ''' TODO ''' + return + def delete_task(self, task): ''' Delete a task/workflow/process ''' self.logger.info('TBD: Deleting task %s', task) @@ -87,6 +129,24 @@ def get_file_id(self, project, file_path): '''Return file s3 path for Omics job input''' return file_path + def get_files(self, project, filters=None): + """ + Retrieve files in a project matching the filter criteria + + :param project: Project to search for files + :param filters: Dictionary containing filter criteria + { + 'name': 'file_name', + 'prefix': 'file_prefix', + 'suffix': 'file_suffix', + 'folder': 'folder_name', + 'recursive': True/False + } + :return: List of tuples (file path, file object) matching filter criteria + """ + ''' TODO ''' + return + def get_folder_id(self, project, folder_path): ''' There is not unique ID for a folder in s3, so just return the folder_path @@ -102,7 +162,8 @@ def get_task_input(self, task, input_name): return None def get_task_outputs(self, task): - raise ValueError("Not yet implemented") + self.logger.info('TBD: Getting task outputs %s', task) + return [] def get_task_state(self, task, refresh=False): ''' @@ -158,7 +219,12 @@ def get_task_output(self, task, output_name): def get_task_output_filename(self, task, output_name): ''' Retrieve the output field of the task and return filename''' self.logger.info("TBD: Getting output filename for task %s", task) - return None + task_output_url = self.get_task_output(task, output_name) + if isinstance(task_output_url, list): + task_output_name = [fileurl.split('/')[-1] for fileurl in task_output_url] + else: + task_output_name = task_output_url.split('/')[-1] + return task_output_name def get_tasks_by_name(self, project, task_name): ''' Get a tasks by its name ''' @@ -189,6 +255,21 @@ def get_project_by_id(self, project_id): } return project + def get_project_users(self, project): + ''' Return a list of user objects associated with a project ''' + ''' TODO ''' + return None + + def add_user_to_project(self, platform_user, project, permission): + """ + Add a user to a project on the platform + :param platform_user: platform user (from get_user) + :param project: platform project + :param permission: permission (permission="read|write|execute|admin") + """ + ''' TODO ''' + return + def get_user(self, user): '''Get a user object from their (platform) user id or email address''' raise ValueError("Not yet implemented") @@ -218,7 +299,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No return omics response for start_run. ''' - base_output_path = f"s3://{self.output_bucket}/" + base_output_path = f"s3://{self.output_bucket}/Outputs" if 'ProjectName' in project: base_output_path += f"{project['ProjectName']}/{workflow}/{name}/" else: @@ -234,7 +315,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No parameters=parameters, name=name, runGroupId=project["ProjectId"], - tags={"ProjectId": project["ProjectId"]}, + tags={"Project": project["ProjectId"]}, outputUri=base_output_path) logger.info('Started run for %s, RunID: %s',name,job['id']) return job @@ -243,5 +324,17 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No return None def upload_file(self, filename, project, dest_folder, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments - self.logger.info("TBD: Uploading file %s to project %s", filename, project) - return None + self.logger.info("Uploading file %s to project %s", filename, project) + target_bucket = self.output_bucket + target_filepath = f"Outputs/{project['ProjectId']}" + dest_folder + if destination_filename: + target_filepath += destination_filename + else: + target_filepath += filename.split('/')[-1] + try: + self.s3_client.upload_file(filename, target_bucket, target_filepath) + file_id = f"s3://{self.output_bucket}/"+target_filepath + return file_id + except Exception as e: + self.logger.error('Could not upload file %s', filename) + return None From d2d09506ee90f33dd38e5762def0314b37de6e9b Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 17 Mar 2025 15:39:01 -0400 Subject: [PATCH 23/43] remove space in output url --- src/cwl_platform/omics_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index fbfd44fc..ab5057fb 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -301,9 +301,9 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No ''' base_output_path = f"s3://{self.output_bucket}/Outputs" if 'ProjectName' in project: - base_output_path += f"{project['ProjectName']}/{workflow}/{name}/" + base_output_path += f"{project['ProjectName']}/{workflow}/{name.replace(' ','')}/" else: - base_output_path += f"{project['ProjectId']}/{workflow}/{name}/" + base_output_path += f"{project['ProjectId']}/{workflow}/{name.replace(' ','')}/" try: logger.debug("Starting run for %s", name) From 8b101b488cf9fc83dd38e9307206d6a933456497 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Tue, 18 Mar 2025 02:55:25 +0000 Subject: [PATCH 24/43] Add TODO comments --- src/cwl_platform/omics_platform.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index ab5057fb..0093b614 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -46,7 +46,9 @@ def connect(self, **kwargs): ''' self.api = boto3.client('omics', region_name='us-east-1') + # TODO: Remove hard-coded bucket self.output_bucket = kwargs.get('output_bucket', "bmsrd-ngs-omics") + # TODO: Remove hard-coded arn self.role_arn = kwargs.get('role_arn', "arn:aws:iam::483421617021:role/ngs360-servicerole") self.s3_client = boto3.client('s3', region_name='us-east-1') From 32318d6a5be2fb43578252eac908f1ec601d0ed3 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Tue, 1 Apr 2025 11:35:17 -0400 Subject: [PATCH 25/43] update storagetype for omics --- src/cwl_platform/omics_platform.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 0093b614..7df8ebec 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -318,7 +318,8 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No name=name, runGroupId=project["ProjectId"], tags={"Project": project["ProjectId"]}, - outputUri=base_output_path) + outputUri=base_output_path, + storageType="DYNAMIC") logger.info('Started run for %s, RunID: %s',name,job['id']) return job except botocore.exceptions.ClientError as err: From 6c54636c3dd34e05eb23905ccddc4d2d2ae6d91c Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 9 Apr 2025 16:00:37 +0000 Subject: [PATCH 26/43] Create a run group for omics when calling create_project --- src/cwl_platform/omics_platform.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 7df8ebec..3df2ac51 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -105,8 +105,10 @@ def get_workflows(self, project): return def create_project(self, project_name, project_description, **kwargs): - ''' Do nothing''' - pass + ''' + Create project + ''' + self.api.create_run_group(name=project_name) def delete_project_by_name(self, project_name): ''' From 190a78ff393ab9263a607e5598138dc76c3e07e7 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 9 Apr 2025 16:10:38 +0000 Subject: [PATCH 27/43] Return the rungroupid --- src/cwl_platform/omics_platform.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 3df2ac51..0ab35107 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -247,8 +247,12 @@ def get_project(self): def get_project_by_name(self, project_name): ''' Return a dictionary of project to provide project_name tag info for omics jobs ''' + response = self.api.list_run_groups(name=project_name) + run_group_id = response['items'][0]['id'] + project = { - 'ProjectName': project_name + 'ProjectName': project_name, + 'ProjectId': run_group_id } return project From 1eb5a1ab768c74e687b65a41d19f6a196d49d4be Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 9 Apr 2025 12:22:29 -0400 Subject: [PATCH 28/43] Add missing get_project_cost --- src/cwl_platform/omics_platform.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 0ab35107..fe4a8c0c 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -263,6 +263,11 @@ def get_project_by_id(self, project_id): } return project + def get_project_cost(self, project): + ''' Return project cost ''' + # TODO: Return total cost from run_group_id + return 0 + def get_project_users(self, project): ''' Return a list of user objects associated with a project ''' ''' TODO ''' From fc4f27c5314e312cd9b121f0efd19353a783bd96 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 9 Apr 2025 12:26:55 -0400 Subject: [PATCH 29/43] Add stubs to missing methods --- src/cwl_platform/omics_platform.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index fe4a8c0c..2ddc6f40 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -129,6 +129,10 @@ def get_current_task(self): ''' Get the current task ''' return None + def get_task_cost(self, task): + ''' Return task cost ''' + return 0 + def get_file_id(self, project, file_path): '''Return file s3 path for Omics job input''' return file_path @@ -273,6 +277,11 @@ def get_project_users(self, project): ''' TODO ''' return None + def get_projects(self): + ''' Get list of all projects ''' + # TODO: I think this should return a list of project names using run group id + return None + def add_user_to_project(self, platform_user, project, permission): """ Add a user to a project on the platform From 7ab222810575ea321ed32a9fc27192b698412c81 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 9 Apr 2025 16:34:04 +0000 Subject: [PATCH 30/43] Revert run_group_id -> projectid --- src/cwl_platform/omics_platform.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 2ddc6f40..f52ca7aa 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -251,12 +251,8 @@ def get_project(self): def get_project_by_name(self, project_name): ''' Return a dictionary of project to provide project_name tag info for omics jobs ''' - response = self.api.list_run_groups(name=project_name) - run_group_id = response['items'][0]['id'] - project = { - 'ProjectName': project_name, - 'ProjectId': run_group_id + 'ProjectName': project_name } return project From d2be6eac627d28e04b92ccdb8c12dc40344416d8 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 11 Apr 2025 13:40:31 -0400 Subject: [PATCH 31/43] Remove hardcoded values from omics_platform.py --- src/cwl_platform/omics_platform.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index f52ca7aa..17877162 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -44,14 +44,10 @@ def connect(self, **kwargs): If ~/.aws/credentials or ~/.aws/config does not provide a region, region should be specified in the AWS_DEFAULT_REGION environment variable. ''' - self.api = boto3.client('omics', - region_name='us-east-1') - # TODO: Remove hard-coded bucket - self.output_bucket = kwargs.get('output_bucket', "bmsrd-ngs-omics") - # TODO: Remove hard-coded arn - self.role_arn = kwargs.get('role_arn', "arn:aws:iam::483421617021:role/ngs360-servicerole") - self.s3_client = boto3.client('s3', - region_name='us-east-1') + self.api = boto3.client('omics') + self.output_bucket = kwargs.get('output_bucket', None) + self.role_arn = kwargs.get('role_arn', None) + self.s3_client = boto3.client('s3') def copy_folder(self, source_project, source_folder, destination_project): ''' From d12a9f9d13768c739fdb1255a891219006ed6c6a Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 11 Apr 2025 13:44:10 -0400 Subject: [PATCH 32/43] fix output path in omics_platform.py --- src/cwl_platform/omics_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 17877162..1e931447 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -313,7 +313,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No return omics response for start_run. ''' - base_output_path = f"s3://{self.output_bucket}/Outputs" + base_output_path = f"s3://{self.output_bucket}/outputs/" if 'ProjectName' in project: base_output_path += f"{project['ProjectName']}/{workflow}/{name.replace(' ','')}/" else: From f3893237a1ba2e951878bfb3f0eabcdeacce807a Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 11 Apr 2025 23:23:31 +0000 Subject: [PATCH 33/43] Force failure if output_bucket and role_arn are not provided --- src/cwl_platform/omics_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 1e931447..e59cfc60 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -45,8 +45,8 @@ def connect(self, **kwargs): If ~/.aws/credentials or ~/.aws/config does not provide a region, region should be specified in the AWS_DEFAULT_REGION environment variable. ''' self.api = boto3.client('omics') - self.output_bucket = kwargs.get('output_bucket', None) - self.role_arn = kwargs.get('role_arn', None) + self.output_bucket = kwargs['output_bucket'] + self.role_arn = kwargs['role_arn'] self.s3_client = boto3.client('s3') def copy_folder(self, source_project, source_folder, destination_project): From 97a76b623226a6faa3a0d9e3a9803834f1b25b7b Mon Sep 17 00:00:00 2001 From: cheny252 Date: Wed, 7 May 2025 09:38:23 -0400 Subject: [PATCH 34/43] add get_project_by_name for omics --- src/cwl_platform/omics_platform.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index e59cfc60..d6c8db9b 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -246,9 +246,15 @@ def get_project(self): raise ValueError("Omics does not support get_project. Use get_project_by_id or get_project_by_name instead.") def get_project_by_name(self, project_name): - ''' Return a dictionary of project to provide project_name tag info for omics jobs ''' + ''' Return a dictionary of project to provide project_name and project_id tag info for omics jobs ''' + response = self.api.list_run_groups( + name=project_name, maxResults=100 + ) + run_group_id = response['items'][0]['id'] + project = { - 'ProjectName': project_name + 'ProjectName': project_name, + 'ProjectId': run_group_id } return project From 78e1d98fcf74a0fdf8b58b54a180806eaee9509e Mon Sep 17 00:00:00 2001 From: cheny252 Date: Wed, 7 May 2025 10:13:13 -0400 Subject: [PATCH 35/43] allow not finding project by name --- src/cwl_platform/omics_platform.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index d6c8db9b..8674fb54 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -250,13 +250,16 @@ def get_project_by_name(self, project_name): response = self.api.list_run_groups( name=project_name, maxResults=100 ) - run_group_id = response['items'][0]['id'] + if len(response['items'])>0: + run_group_id = response['items'][0]['id'] - project = { - 'ProjectName': project_name, - 'ProjectId': run_group_id - } - return project + project = { + 'ProjectId': run_group_id + } + return project + else: + logger.error('Could not find project with name: %s', project_name) + return {} def get_project_by_id(self, project_id): ''' Return a dictionary of project to provide project_id tag info for omics jobs''' @@ -319,7 +322,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No return omics response for start_run. ''' - base_output_path = f"s3://{self.output_bucket}/outputs/" + base_output_path = f"s3://{self.output_bucket}/Project/" if 'ProjectName' in project: base_output_path += f"{project['ProjectName']}/{workflow}/{name.replace(' ','')}/" else: @@ -347,7 +350,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No def upload_file(self, filename, project, dest_folder, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments self.logger.info("Uploading file %s to project %s", filename, project) target_bucket = self.output_bucket - target_filepath = f"Outputs/{project['ProjectId']}" + dest_folder + target_filepath = f"Project/{project['ProjectId']}" + dest_folder if destination_filename: target_filepath += destination_filename else: From ffcd06ad968e08c2c19b6be0fa3297edd36bdc4d Mon Sep 17 00:00:00 2001 From: cheny252 Date: Thu, 15 May 2025 14:28:46 -0400 Subject: [PATCH 36/43] update get task output --- src/cwl_platform/omics_platform.py | 34 ++++++++++++------------------ 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 8674fb54..9efa9398 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -5,6 +5,7 @@ import logging import boto3 import botocore +import json from tenacity import retry, wait_fixed, stop_after_attempt @@ -197,28 +198,21 @@ def get_task_state(self, task, refresh=False): def get_task_output(self, task, output_name): ''' Retrieve the output field of the task ''' - # Get all files in output folder + # Get output file mapping from outputs.json taskinfo = self.api.get_run(id=task["id"]) - outputUri = taskinfo['outputUri'] + task["id"] + "/out/" - outputUri_files = self._list_file_in_s3(outputUri) - - # Match output_name with existing files in output folder - if "*" not in output_name: - output = outputUri + output_name - if output in outputUri_files: - return output - else: - return None + output_json = taskinfo['outputUri'].split(self.output_bucket+'/')[1] + task["id"] + "/logs/outputs.json" + response = self.s3_client.get_object(Bucket=self.output_bucket, Key=output_json) + content = response['Body'].read().decode("utf-8") + mapping = json.loads(content) + + if output_name not in mapping: + raise KeyError(f"Output field '{output_name}' not found in mapping file.") + all_outputs = mapping[output_name] + if isinstance(all_outputs, list): + outputs = [c["location"] for c in all_outputs] + return outputs else: - output = [] - prefix=output_name.split('*')[0] - suffix=output_name.split('*')[-1] - for output_file in outputUri_files: - filename = output_file.split('/')[-1] - if (prefix == "" or filename.startswith(prefix)) and \ - (suffix == "" or filename.endswith(suffix)): - output+=[output_file] - return output + return all_outputs["location"] def get_task_output_filename(self, task, output_name): ''' Retrieve the output field of the task and return filename''' From 7219cf9c15a712f56a2f5b3e29d733662b3070ca Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 19 May 2025 22:51:21 -0400 Subject: [PATCH 37/43] update omics formats --- src/cwl_platform/omics_platform.py | 48 +++++++++++++++--------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 9efa9398..01b0f644 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -3,9 +3,9 @@ ''' import logging +import json import boto3 import botocore -import json from tenacity import retry, wait_fixed, stop_after_attempt @@ -43,7 +43,8 @@ def connect(self, **kwargs): ''' Connect to AWS Omics platform - If ~/.aws/credentials or ~/.aws/config does not provide a region, region should be specified in the AWS_DEFAULT_REGION environment variable. + If ~/.aws/credentials or ~/.aws/config does not provide a region, region + should be specified in the AWS_DEFAULT_REGION environment variable. ''' self.api = boto3.client('omics') self.output_bucket = kwargs['output_bucket'] @@ -206,13 +207,15 @@ def get_task_output(self, task, output_name): mapping = json.loads(content) if output_name not in mapping: - raise KeyError(f"Output field '{output_name}' not found in mapping file.") + raise KeyError(f"Output field '{output_name}' not found in output json file.") all_outputs = mapping[output_name] if isinstance(all_outputs, list): - outputs = [c["location"] for c in all_outputs] + outputs = [c["location"] for c in all_outputs if "location" in c] return outputs - else: + try: return all_outputs["location"] + except KeyError: + raise KeyError(f"Could not find path for '{output_name}'") def get_task_output_filename(self, task, output_name): ''' Retrieve the output field of the task and return filename''' @@ -224,10 +227,10 @@ def get_task_output_filename(self, task, output_name): task_output_name = task_output_url.split('/')[-1] return task_output_name - def get_tasks_by_name(self, project, task_name): + def get_tasks_by_name(self, project, task_name=None): ''' Get a tasks by its name ''' tasks = [] - runs = self.api.list_runs(name=task_name, runGroupId=project['ProjectId']) + runs = self.api.list_runs(name=task_name, runGroupId=project['RunGroupId']) for item in runs['items']: run = self.api.get_run(id=item['id']) tasks.append(run) @@ -240,7 +243,7 @@ def get_project(self): raise ValueError("Omics does not support get_project. Use get_project_by_id or get_project_by_name instead.") def get_project_by_name(self, project_name): - ''' Return a dictionary of project to provide project_name and project_id tag info for omics jobs ''' + ''' Return a dictionary of project to provide RunGroupId tag info for omics jobs ''' response = self.api.list_run_groups( name=project_name, maxResults=100 ) @@ -248,17 +251,17 @@ def get_project_by_name(self, project_name): run_group_id = response['items'][0]['id'] project = { - 'ProjectId': run_group_id + 'RunGroupId': run_group_id } return project - else: - logger.error('Could not find project with name: %s', project_name) - return {} + + logger.error('Could not find project with name: %s', project_name) + return {} def get_project_by_id(self, project_id): - ''' Return a dictionary of project to provide project_id tag info for omics jobs''' + ''' Return a dictionary of project to provide RunGroupId tag info for omics jobs''' project = { - 'ProjectId': project_id + 'RunGroupId': project_id } return project @@ -310,17 +313,14 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No ''' Submit workflow for one sample. name: sample ID. - project: dictionary of {'ProjectName': 'string'} or {'ProjectId': 'string'} + project: dictionary of {'RunGroupId': 'string'} workflow: workflow ID in omics. parameters: dictionary of input parameters. return omics response for start_run. ''' base_output_path = f"s3://{self.output_bucket}/Project/" - if 'ProjectName' in project: - base_output_path += f"{project['ProjectName']}/{workflow}/{name.replace(' ','')}/" - else: - base_output_path += f"{project['ProjectId']}/{workflow}/{name.replace(' ','')}/" + base_output_path += f"{project['RunGroupId']}/{workflow}/{name.replace(' ','')}/" try: logger.debug("Starting run for %s", name) @@ -331,8 +331,8 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No roleArn=self.role_arn, parameters=parameters, name=name, - runGroupId=project["ProjectId"], - tags={"Project": project["ProjectId"]}, + runGroupId=project["RunGroupId"], + tags={"Project": project["RunGroupId"]}, outputUri=base_output_path, storageType="DYNAMIC") logger.info('Started run for %s, RunID: %s',name,job['id']) @@ -341,10 +341,10 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No logger.error('Could not start run for %s: %s', name, err) return None - def upload_file(self, filename, project, dest_folder, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments + def upload_file(self, filename, project, dest_folder=None, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments self.logger.info("Uploading file %s to project %s", filename, project) target_bucket = self.output_bucket - target_filepath = f"Project/{project['ProjectId']}" + dest_folder + target_filepath = f"Project/{project['RunGroupId']}" + dest_folder if destination_filename: target_filepath += destination_filename else: @@ -354,5 +354,5 @@ def upload_file(self, filename, project, dest_folder, destination_filename=None, file_id = f"s3://{self.output_bucket}/"+target_filepath return file_id except Exception as e: - self.logger.error('Could not upload file %s', filename) + logger.error('Could not upload file %s', filename) return None From fa1b00e63d6fef439973d4d9ba356079a8c276b8 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 19 May 2025 23:00:20 -0400 Subject: [PATCH 38/43] add check for locations --- src/cwl_platform/omics_platform.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 01b0f644..3f88f373 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -212,9 +212,10 @@ def get_task_output(self, task, output_name): if isinstance(all_outputs, list): outputs = [c["location"] for c in all_outputs if "location" in c] return outputs - try: + + if "location" in all_outputs: return all_outputs["location"] - except KeyError: + else: raise KeyError(f"Could not find path for '{output_name}'") def get_task_output_filename(self, task, output_name): From 96b2fc1ffa50cec804857b0536bf38cd98f572fe Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 19 May 2025 23:13:51 -0400 Subject: [PATCH 39/43] add omics unit test --- tests/test_omics_platform.py | 170 ++++++++++++++++++++++++++++++++++- 1 file changed, 167 insertions(+), 3 deletions(-) diff --git a/tests/test_omics_platform.py b/tests/test_omics_platform.py index 60530290..244184e9 100644 --- a/tests/test_omics_platform.py +++ b/tests/test_omics_platform.py @@ -5,19 +5,183 @@ import os import mock from mock import MagicMock, patch +import json from cwl_platform.omics_platform import OmicsPlatform -class TestOmicsPlaform(unittest.TestCase): +class TestOmicsPlatform(unittest.TestCase): ''' Test Class for Omics Platform ''' def setUp(self) -> None: self.platform = OmicsPlatform('Omics') + self.platform.logger = MagicMock() + self.platform.output_bucket = "test-bucket" + self.platform.role_arn = "arn:aws:iam::123456789012:role/test-role" + self.platform.s3_client = MagicMock() + self.platform.api = MagicMock() return super().setUp() - def runTest(self): - pass + def test_copy_folder_returns_source(self): + self.assertEqual( + self.platform.copy_folder("proj1", "s3://bucket/folder/", "proj2"), + "s3://bucket/folder/" + ) + + def test_get_folder_id_trailing_slash(self): + self.assertEqual( + self.platform.get_folder_id("proj", "s3://bucket/folder/"), + "s3://bucket/folder/" + ) + self.assertEqual( + self.platform.get_folder_id("proj", "s3://bucket/folder"), + "s3://bucket/folder/" + ) + + def test_get_file_id(self): + self.assertEqual( + self.platform.get_file_id("proj", "s3://bucket/file.txt"), + "s3://bucket/file.txt" + ) + + def test_get_project_by_id(self): + project_id = "abc123" + result = self.platform.get_project_by_id(project_id) + self.assertEqual(result, {"RunGroupId": project_id}) + + def test_get_project_by_name_found(self): + self.platform.api.list_run_groups.return_value = { + "items": [{"id": "group1"}] + } + result = self.platform.get_project_by_name("testproj") + self.assertEqual(result, {"RunGroupId": "group1"}) + + def test_get_project_by_name_not_found(self): + self.platform.api.list_run_groups.return_value = {"items": []} + with patch("cwl_platform.omics_platform.logger") as mock_logger: + result = self.platform.get_project_by_name("notfound") + self.assertEqual(result, {}) + mock_logger.error.assert_called() + + def test_get_task_state(self): + self.platform.api.get_run.return_value = {"status": "COMPLETED"} + self.assertEqual( + self.platform.get_task_state({"id": "runid"}), "Complete" + ) + self.platform.api.get_run.return_value = {"status": "FAILED"} + self.assertEqual( + self.platform.get_task_state({"id": "runid"}), "Failed" + ) + for status in ["STARTING", "RUNNING", "STOPPING"]: + self.platform.api.get_run.return_value = {"status": status} + self.assertEqual( + self.platform.get_task_state({"id": "runid"}), "Running" + ) + for status in ["CANCELLED", "DELETED"]: + self.platform.api.get_run.return_value = {"status": status} + self.assertEqual( + self.platform.get_task_state({"id": "runid"}), "Cancelled" + ) + self.platform.api.get_run.return_value = {"status": "PENDING"} + self.assertEqual( + self.platform.get_task_state({"id": "runid"}), "Queued" + ) + self.platform.api.get_run.return_value = {"status": "UNKNOWN"} + with self.assertRaises(ValueError): + self.platform.get_task_state({"id": "runid"}) + + def test_get_task_output(self): + # Simulate S3 and API responses + self.platform.api.get_run.return_value = { + "outputUri": "s3://test-bucket/path/" + } + outputs_json = { + "output1": [{"location": "s3://bucket/file1.txt"}, {"location": "s3://bucket/file2.txt"}], + "output2": {"location": "s3://bucket/file3.txt"} + } + s3_response = { + "Body": MagicMock() + } + s3_response["Body"].read.return_value = json.dumps(outputs_json).encode("utf-8") + self.platform.s3_client.get_object.return_value = s3_response + + # List output + result = self.platform.get_task_output({"id": "runid"}, "output1") + self.assertEqual(result, ["s3://bucket/file1.txt", "s3://bucket/file2.txt"]) + # Single output + result = self.platform.get_task_output({"id": "runid"}, "output2") + self.assertEqual(result, "s3://bucket/file3.txt") + # Missing output + with self.assertRaises(KeyError): + self.platform.get_task_output({"id": "runid"}, "missing") + + def test_get_task_output_filename(self): + with patch.object(self.platform, "get_task_output") as mock_get_task_output: + mock_get_task_output.return_value = ["s3://bucket/file1.txt", "s3://bucket/file2.txt"] + result = self.platform.get_task_output_filename({"id": "runid"}, "output1") + self.assertEqual(result, ["file1.txt", "file2.txt"]) + mock_get_task_output.return_value = "s3://bucket/file3.txt" + result = self.platform.get_task_output_filename({"id": "runid"}, "output2") + self.assertEqual(result, "file3.txt") + + def test_get_project_raises(self): + with self.assertRaises(ValueError): + self.platform.get_project() + + def test_get_user_raises(self): + with self.assertRaises(ValueError): + self.platform.get_user("user") + + def test_rename_file_raises(self): + with self.assertRaises(ValueError): + self.platform.rename_file("fileid", "newname") + + def test_roll_file_raises(self): + with self.assertRaises(ValueError): + self.platform.roll_file("proj", "file.txt") + + def test_submit_task_success(self): + self.platform.output_bucket = "bucket" + self.platform.role_arn = "arn:aws:iam::123456789012:role/test-role" + self.platform.api.start_run.return_value = {"id": "runid"} + project = {"RunGroupId": "groupid"} + workflow = "workflowid" + parameters = {"input": "value"} + result = self.platform.submit_task("sample", project, workflow, parameters) + self.assertEqual(result, {"id": "runid"}) + + def test_submit_task_failure(self): + self.platform.output_bucket = "bucket" + self.platform.role_arn = "arn:aws:iam::123456789012:role/test-role" + self.platform.api.start_run.side_effect = Exception("fail") + project = {"RunGroupId": "groupid"} + workflow = "workflowid" + parameters = {"input": "value"} + result = self.platform.submit_task("sample", project, workflow, parameters) + self.assertIsNone(result) + + def test_upload_file_success(self): + self.platform.output_bucket = "bucket" + self.platform.s3_client.upload_file = MagicMock() + project = {"RunGroupId": "groupid"} + filename = "/tmp/file.txt" + dest_folder = "/dest/" + destination_filename = "newfile.txt" + file_id = self.platform.upload_file(filename, project, dest_folder, destination_filename) + self.assertTrue(file_id.startswith("s3://bucket/Project/groupid/dest/newfile.txt")) + + def test_upload_file_failure(self): + self.platform.output_bucket = "bucket" + self.platform.s3_client.upload_file.side_effect = Exception("fail") + project = {"RunGroupId": "groupid"} + filename = "/tmp/file.txt" + dest_folder = "/dest/" + destination_filename = "newfile.txt" + file_id = self.platform.upload_file(filename, project, dest_folder, destination_filename) + self.assertIsNone(file_id) + + def test_detect_classmethod(self): + self.assertFalse(OmicsPlatform.detect()) if __name__ == '__main__': unittest.main() From 591bf661528c57adb1a0e729225608064bbe548f Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 19 May 2025 23:17:09 -0400 Subject: [PATCH 40/43] fix omics unit test --- tests/test_omics_platform.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_omics_platform.py b/tests/test_omics_platform.py index 244184e9..60c81a4d 100644 --- a/tests/test_omics_platform.py +++ b/tests/test_omics_platform.py @@ -3,8 +3,7 @@ ''' import unittest import os -import mock -from mock import MagicMock, patch +from unittest.mock import MagicMock, patch import json from cwl_platform.omics_platform import OmicsPlatform @@ -157,8 +156,8 @@ def test_submit_task_failure(self): project = {"RunGroupId": "groupid"} workflow = "workflowid" parameters = {"input": "value"} - result = self.platform.submit_task("sample", project, workflow, parameters) - self.assertIsNone(result) + with self.assertRaises(Exception): + self.platform.submit_task("sample", project, workflow, parameters) def test_upload_file_success(self): self.platform.output_bucket = "bucket" From 9471af13070f2219fb4d8018996bc8a18ebc9ba4 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Tue, 17 Jun 2025 11:25:52 -0400 Subject: [PATCH 41/43] add cache --- src/cwl_platform/omics_platform.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 3f88f373..c45531dd 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -323,19 +323,36 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No base_output_path = f"s3://{self.output_bucket}/Project/" base_output_path += f"{project['RunGroupId']}/{workflow}/{name.replace(' ','')}/" + if "cacheId" in execution_settings: + cacheId = execution_settings["cacheId"] + else: + cacheId = None + try: logger.debug("Starting run for %s", name) - # TODO: The roleArn should be a parameter to this function, and not hard-coded. - # Put this in the pipeline_config.py. - job = self.api.start_run(workflowId=workflow, + if cacheId: + job = self.api.start_run(workflowId=workflow, workflowType='PRIVATE', roleArn=self.role_arn, parameters=parameters, name=name, + cacheId=cacheId, + cacheBehavior='CACHE_ON_FAILURE', runGroupId=project["RunGroupId"], tags={"Project": project["RunGroupId"]}, outputUri=base_output_path, storageType="DYNAMIC") + else: + job = self.api.start_run(workflowId=workflow, + workflowType='PRIVATE', + roleArn=self.role_arn, + parameters=parameters, + name=name, + runGroupId=project["RunGroupId"], + tags={"Project": project["RunGroupId"]}, + outputUri=base_output_path, + storageType="DYNAMIC") + logger.info('Started run for %s, RunID: %s',name,job['id']) return job except botocore.exceptions.ClientError as err: From 9620775db0cb07268c07c3cf92d542958fedf6b1 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Thu, 9 Oct 2025 12:25:18 -0400 Subject: [PATCH 42/43] update omics wes api --- requirements.txt | 1 + src/cwl_platform/omics_platform.py | 537 +++++++++++++++++++++++------ 2 files changed, 431 insertions(+), 107 deletions(-) diff --git a/requirements.txt b/requirements.txt index cf427734..0115981e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ smart_open arvados-python-client>=3.0.0,<3.2.0 sevenbridges-python tenacity +httpx diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index c45531dd..193b8943 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -6,6 +6,7 @@ import json import boto3 import botocore +import httpx from tenacity import retry, wait_fixed, stop_after_attempt @@ -20,6 +21,8 @@ def __init__(self, name): self.api = None self.output_bucket = None self.role_arn = None + self.wes_client = None + self.wes_url = None def _list_file_in_s3(self, s3path): ''' @@ -40,7 +43,7 @@ def _list_file_in_s3(self, s3path): return files def connect(self, **kwargs): - ''' + ''' Connect to AWS Omics platform If ~/.aws/credentials or ~/.aws/config does not provide a region, region @@ -50,6 +53,11 @@ def connect(self, **kwargs): self.output_bucket = kwargs['output_bucket'] self.role_arn = kwargs['role_arn'] self.s3_client = boto3.client('s3') + + # WES API connection parameters + self.wes_url = kwargs.get('wes_url', 'http://localhost:8000/ga4gh/wes/v1') + self.wes_username = kwargs.get('wes_username') + self.wes_password = kwargs.get('wes_password') def copy_folder(self, source_project, source_folder, destination_project): ''' @@ -116,8 +124,31 @@ def delete_project_by_name(self, project_name): return def delete_task(self, task): - ''' Delete a task/workflow/process ''' - self.logger.info('TBD: Deleting task %s', task) + ''' + Cancel a workflow run via WES API + task: A dictionary containing the run information, including the WES run ID + ''' + # Set up auth if provided + auth = None + if self.wes_username and self.wes_password: + auth = (self.wes_username, self.wes_password) + + # Extract the WES run ID from the task dictionary + run_id = task.get('id') + if not run_id: + logger.error("No run ID found in task object") + return False + + try: + # Cancel the run via WES API + cancel_url = f"{self.wes_url}/runs/{run_id}/cancel" + response = httpx.post(cancel_url, auth=auth, timeout=30.0) + response.raise_for_status() + logger.info(f"Successfully cancelled run {run_id}") + return True + except Exception as e: + logger.error(f"Error cancelling run {run_id}: {e}") + return False @classmethod def detect(cls): @@ -163,79 +194,338 @@ def get_folder_id(self, project, folder_path): return folder_path + "/" def get_task_input(self, task, input_name): - ''' Retrieve the input field of the task ''' - self.logger.info("TBD: Getting input for task %s", task) - return None + ''' + Retrieve the input field of the task using WES API + task: A dictionary containing the run information, including the WES run ID + input_name: Name of the input to retrieve + ''' + # Set up auth if provided + auth = None + if self.wes_username and self.wes_password: + auth = (self.wes_username, self.wes_password) + + # Extract the WES run ID from the task dictionary + run_id = task.get('id') + if not run_id: + logger.error("No run ID found in task object") + return None + + # Check if parameters are already in the task object + if task.get('parameters') and input_name in task['parameters']: + return task['parameters'][input_name] + + try: + # Get full run details from WES API + run_details_url = f"{self.wes_url}/runs/{run_id}" + response = httpx.get(run_details_url, auth=auth, timeout=30.0) + response.raise_for_status() + run_details = response.json() + + # Check if request and workflow_params are available + if not run_details.get('request') or not run_details['request'].get('workflow_params'): + logger.warning(f"No workflow parameters found for run {run_id}") + return None + + # Parse workflow parameters + try: + params = json.loads(run_details['request']['workflow_params']) + + # Check if the specific input is available + if input_name in params: + return params[input_name] + else: + logger.warning(f"Input '{input_name}' not found in run {run_id}") + return None + + except json.JSONDecodeError: + logger.error(f"Could not parse workflow parameters for run {run_id}") + return None + + except Exception as e: + logger.error(f"Error getting input {input_name} for run {run_id}: {e}") + return None def get_task_outputs(self, task): - self.logger.info('TBD: Getting task outputs %s', task) - return [] + ''' + Get all outputs for a task using WES API + task: A dictionary containing the run information, including the WES run ID + ''' + # Set up auth if provided + auth = None + if self.wes_username and self.wes_password: + auth = (self.wes_username, self.wes_password) + + # Extract the WES run ID from the task dictionary + run_id = task.get('id') + if not run_id: + logger.error("No run ID found in task object") + return [] + + try: + # Get full run details from WES API + run_details_url = f"{self.wes_url}/runs/{run_id}" + response = httpx.get(run_details_url, auth=auth, timeout=30.0) + response.raise_for_status() + run_details = response.json() + if not run_details.get('outputs'): + logger.warning(f"No outputs available for run {run_id}") + return [] + + task_outputUri = run_details['outputs'].get('output_location') + if not task_outputUri: + logger.warning(f"No output_location found for run {run_id}") + return [] + + output_json = task_outputUri.split(self.output_bucket+'/')[1] + run_id + "/logs/outputs.json" + response = self.s3_client.get_object(Bucket=self.output_bucket, Key=output_json) + content = response['Body'].read().decode("utf-8") + mapping = json.loads(content) + + return list(mapping.keys()) + + except Exception as e: + logger.error(f"Error getting outputs for run {run_id}: {e}") + return [] def get_task_state(self, task, refresh=False): - ''' - Get status of run by task_id. - task: A dictionary of omics response from start_run. Includes Run ID, Name, Tags, etc. + ''' + Get status of run by task_id using WES API. + task: A dictionary containing the run information, including the WES run ID. return status of the run (Complete, Failed, Running, Cancelled, Queued). ''' - + # Set up auth if provided + auth = None + if self.wes_username and self.wes_password: + auth = (self.wes_username, self.wes_password) + + # Extract the WES run ID from the task dictionary + run_id = task.get('id') + if not run_id: + logger.error("No run ID found in task object") + raise ValueError("No run ID found in task object") + try: - run_info = self.api.get_run(id=task['id']) - job_status = run_info['status'] - except: - raise ValueError('No Status information found for job %s. Check job status.', task['id']) - - if job_status == 'COMPLETED': - return 'Complete' - if job_status == 'FAILED': - return 'Failed' - if job_status in ['STARTING','RUNNING','STOPPING']: - return 'Running' - if job_status in ['CANCELLED','DELETED']: - return 'Cancelled' - if job_status == 'PENDING': + # Get run status from WES API + run_status_url = f"{self.wes_url}/runs/{run_id}/status" + response = httpx.get(run_status_url, auth=auth, timeout=30.0) + response.raise_for_status() + result = response.json() + + # Map WES state to PAML state + wes_state = result.get('state') + logger.debug(f"WES state for run {run_id}: {wes_state}") + + + logging.info('check state results') + logging.info(task) + logging.info(wes_state) + a=input() + + if wes_state == 'COMPLETE': + return 'Complete' + if wes_state == 'EXECUTOR_ERROR' or wes_state == 'SYSTEM_ERROR': + return 'Failed' + if wes_state == 'RUNNING' or wes_state == 'INITIALIZING': + return 'Running' + if wes_state == 'CANCELED': + return 'Cancelled' + if wes_state == 'QUEUED' or wes_state == 'PAUSED': + return 'Queued' + + # Default to queued if we don't recognize the state + logger.warning(f"Unknown WES state: {wes_state} for run {run_id}") return 'Queued' - - raise ValueError('Unknown task state: %s : %s', task['id'], job_status) + + except Exception as e: + logger.error(f"Error getting status for run {run_id}: {e}") + raise ValueError(f'No status information found for job {run_id}. Check job status.') def get_task_output(self, task, output_name): - ''' Retrieve the output field of the task ''' - # Get output file mapping from outputs.json - taskinfo = self.api.get_run(id=task["id"]) - output_json = taskinfo['outputUri'].split(self.output_bucket+'/')[1] + task["id"] + "/logs/outputs.json" - response = self.s3_client.get_object(Bucket=self.output_bucket, Key=output_json) - content = response['Body'].read().decode("utf-8") - mapping = json.loads(content) - - if output_name not in mapping: - raise KeyError(f"Output field '{output_name}' not found in output json file.") - all_outputs = mapping[output_name] - if isinstance(all_outputs, list): - outputs = [c["location"] for c in all_outputs if "location" in c] - return outputs - - if "location" in all_outputs: - return all_outputs["location"] - else: - raise KeyError(f"Could not find path for '{output_name}'") + ''' + Retrieve the output field of the task using WES API + task: A dictionary containing the run information, including the WES run ID + output_name: Name of the output to retrieve + ''' + # Set up auth if provided + auth = None + if self.wes_username and self.wes_password: + auth = (self.wes_username, self.wes_password) + + # Extract the WES run ID from the task dictionary + run_id = task.get('id') + if not run_id: + logger.error("No run ID found in task object") + raise KeyError("No run ID found in task object") + + try: + # Get full run details from WES API + run_details_url = f"{self.wes_url}/runs/{run_id}" + response = httpx.get(run_details_url, auth=auth, timeout=30.0) + response.raise_for_status() + run_details = response.json() + if not run_details.get('outputs'): + raise KeyError(f"No outputs available for run {run_id}") + + task_outputUri = run_details['outputs'].get('output_location') + if not task_outputUri: + raise KeyError(f"No output_location found for run {run_id}") + + output_json = task_outputUri.split(self.output_bucket+'/')[1] + run_id + "/logs/outputs.json" + response = self.s3_client.get_object(Bucket=self.output_bucket, Key=output_json) + content = response['Body'].read().decode("utf-8") + mapping = json.loads(content) + + if output_name not in mapping: + raise KeyError(f"Output field '{output_name}' not found in output json file.") + all_outputs = mapping[output_name] + if isinstance(all_outputs, list): + outputs = [c["location"] for c in all_outputs if "location" in c] + return outputs + + if "location" in all_outputs: + return all_outputs["location"] + else: + raise KeyError(f"Could not find path for '{output_name}'") + + except Exception as e: + logger.error(f"Error getting output {output_name} for run {run_id}: {e}") + raise KeyError(f"Could not retrieve output '{output_name}' for run {run_id}: {str(e)}") def get_task_output_filename(self, task, output_name): - ''' Retrieve the output field of the task and return filename''' - self.logger.info("TBD: Getting output filename for task %s", task) + ''' + Retrieve the output field of the task and return filename + task: A dictionary containing the run information, including the WES run ID + output_name: Name of the output to retrieve + ''' task_output_url = self.get_task_output(task, output_name) + if isinstance(task_output_url, list): + # Handle list of file URLs task_output_name = [fileurl.split('/')[-1] for fileurl in task_output_url] - else: + elif isinstance(task_output_url, str): + # Handle single file URL task_output_name = task_output_url.split('/')[-1] + else: + # Handle other types of outputs + run_id = task.get('id') + logger.warning(f"Output {output_name} for run {run_id} is not a file URL: {task_output_url}") + task_output_name = str(task_output_url) + return task_output_name - def get_tasks_by_name(self, project, task_name=None): - ''' Get a tasks by its name ''' - tasks = [] - runs = self.api.list_runs(name=task_name, runGroupId=project['RunGroupId']) - for item in runs['items']: - run = self.api.get_run(id=item['id']) - tasks.append(run) - return tasks + def get_tasks_by_name(self, + project, + task_name=None, + inputs_to_compare=None, + tasks=None): + ''' + Get all processes/tasks in a project with a specified name, or all tasks + if no name is specified. Optionally, compare task inputs to ensure + equivalency (eg for reuse). + :param project: The project to search (run group ID) + :param task_name: The name of the process to search for (if None return all tasks) + :param inputs_to_compare: Inputs to compare to ensure task equivalency + :param tasks: List of tasks to search in (if None, query all tasks in project) + :return: List of task dictionaries with run information + ''' + # Set up auth if provided + auth = None + if self.wes_username and self.wes_password: + auth = (self.wes_username, self.wes_password) + + matching_tasks = [] + + try: + # If tasks is not provided, query tasks from WES API + if tasks is None: + # Get list of runs from WES API + list_runs_url = f"{self.wes_url}/runs" + response = httpx.get(list_runs_url, auth=auth, timeout=30.0) + response.raise_for_status() + result = response.json() + + # Extract the list of runs + runs = result.get('runs', []) + tasks = [] + + # Filter by project tag if project is provided + if project: + for run in runs: + if run.get('tags') and run['tags'].get('Project') == project: + # If task_name is provided, filter by name + if task_name is None or run.get('name') == task_name: + # Create a task dictionary with the necessary information + task = { + "id": run.get('run_id'), + "name": run.get('name'), + "status": run.get('state'), + "project": project + } + tasks.append(task) + else: + # Create task dictionaries for all runs + for run in runs: + task = { + "id": run.get('run_id'), + "name": run.get('name'), + "status": run.get('state') + } + tasks.append(task) + else: + if task_name is not None: + tasks = [task for task in tasks if task.get('name',None) == task_name] + + # If inputs_to_compare is provided, we need to get full run details for each task + if inputs_to_compare: + for task in tasks: + task_id = task.get('id') + if not task_id: + continue + + # Get full run details to check inputs + try: + run_details_url = f"{self.wes_url}/runs/{task_id}" + details_response = httpx.get(run_details_url, auth=auth, timeout=30.0) + details_response.raise_for_status() + details = details_response.json() + + # Check if name matches + if task_name is not None and details.get('name') != task_name: + continue + + # Check if inputs match + if details.get('request') and details['request'].get('workflow_params'): + # Parse workflow params + try: + params = json.loads(details['request']['workflow_params']) + all_inputs_match = True + + for input_name, input_value in inputs_to_compare.items(): + if input_name not in params: + all_inputs_match = False + break + + if params[input_name] != input_value: + all_inputs_match = False + break + + if all_inputs_match: + # Update task with parameters + task['parameters'] = params + matching_tasks.append(task) + except json.JSONDecodeError: + logger.warning(f"Could not parse workflow params for run {task_id}") + except Exception as e: + logger.warning(f"Error getting details for run {task_id}: {e}") + else: + # If no inputs to compare, just return the tasks + matching_tasks = tasks + + return matching_tasks + + except Exception as e: + logger.error(f"Error getting tasks by name: {e}") + return [] def get_project(self): ''' @@ -244,27 +534,20 @@ def get_project(self): raise ValueError("Omics does not support get_project. Use get_project_by_id or get_project_by_name instead.") def get_project_by_name(self, project_name): - ''' Return a dictionary of project to provide RunGroupId tag info for omics jobs ''' + ''' Return the run group ID as the project identifier ''' response = self.api.list_run_groups( name=project_name, maxResults=100 ) if len(response['items'])>0: run_group_id = response['items'][0]['id'] - - project = { - 'RunGroupId': run_group_id - } - return project + return run_group_id logger.error('Could not find project with name: %s', project_name) - return {} + return None def get_project_by_id(self, project_id): - ''' Return a dictionary of project to provide RunGroupId tag info for omics jobs''' - project = { - 'RunGroupId': project_id - } - return project + ''' Return the project ID directly as the run group ID ''' + return project_id def get_project_cost(self, project): ''' Return project cost ''' @@ -312,57 +595,97 @@ def stage_task_output(self, task, project, output_to_export, output_directory_na @retry(wait=wait_fixed(1), stop=stop_after_attempt(3)) def submit_task(self, name, project, workflow, parameters, execution_settings=None): ''' - Submit workflow for one sample. + Submit workflow for one sample using GA4GH WES API. name: sample ID. - project: dictionary of {'RunGroupId': 'string'} + project: string containing the run group ID workflow: workflow ID in omics. parameters: dictionary of input parameters. - return omics response for start_run. + return: Dictionary containing run information including ID and name ''' base_output_path = f"s3://{self.output_bucket}/Project/" - base_output_path += f"{project['RunGroupId']}/{workflow}/{name.replace(' ','')}/" - - if "cacheId" in execution_settings: - cacheId = execution_settings["cacheId"] - else: - cacheId = None - + base_output_path += f"{project}/{workflow}/{name.replace(' ','')}/" + + # Prepare workflow engine parameters + workflow_engine_params = { + "roleArn": self.role_arn, + "runGroupId": project, + "name": name, + "outputUri": base_output_path, + "storageType": "DYNAMIC" + } + + # Add cache settings if provided + if execution_settings and "cacheId" in execution_settings: + workflow_engine_params["cacheId"] = execution_settings["cacheId"] + #workflow_engine_params["cacheBehavior"] = "CACHE_ON_FAILURE" + workflow_engine_params["cacheBehavior"] = "CACHE_ALWAYS" + + # Prepare tags + tags = {"Project": project} + try: - logger.debug("Starting run for %s", name) - if cacheId: - job = self.api.start_run(workflowId=workflow, - workflowType='PRIVATE', - roleArn=self.role_arn, - parameters=parameters, - name=name, - cacheId=cacheId, - cacheBehavior='CACHE_ON_FAILURE', - runGroupId=project["RunGroupId"], - tags={"Project": project["RunGroupId"]}, - outputUri=base_output_path, - storageType="DYNAMIC") - else: - job = self.api.start_run(workflowId=workflow, - workflowType='PRIVATE', - roleArn=self.role_arn, - parameters=parameters, - name=name, - runGroupId=project["RunGroupId"], - tags={"Project": project["RunGroupId"]}, - outputUri=base_output_path, - storageType="DYNAMIC") - - logger.info('Started run for %s, RunID: %s',name,job['id']) + logger.debug("Starting run for %s via WES API", name) + + # Create WES API request + url = f"{self.wes_url}/runs" + + # Prepare request data + data = { + "workflow_url": f"omics:{workflow}", + "workflow_type": "CWL", # Adjust as needed based on your workflow type + "workflow_type_version": "v1.0", + "workflow_params": json.dumps(parameters), + "workflow_engine_parameters": json.dumps(workflow_engine_params), + "tags": json.dumps(tags), + "name": name + } + + # Set up auth if provided + auth = None + if self.wes_username and self.wes_password: + auth = (self.wes_username, self.wes_password) + + # Submit the workflow + response = httpx.post( + url, + data=data, + auth=auth, + timeout=30.0 + ) + response.raise_for_status() + + # Parse response + result = response.json() + run_id = result.get("run_id") + + logger.info('Started run for %s, WES RunID: %s', name, run_id) + + # Create a job object with the necessary information + job = { + "id": run_id, + "name": name, + "status": "PENDING", + "project": project, + "workflow": workflow, + "parameters": parameters, + "outputUri": base_output_path + } + return job - except botocore.exceptions.ClientError as err: + + except httpx.HTTPStatusError as err: + logger.error('Could not start run for %s: HTTP error %s - %s', + name, err.response.status_code, err.response.text) + return None + except Exception as err: logger.error('Could not start run for %s: %s', name, err) return None def upload_file(self, filename, project, dest_folder=None, destination_filename=None, overwrite=False): # pylint: disable=too-many-arguments self.logger.info("Uploading file %s to project %s", filename, project) target_bucket = self.output_bucket - target_filepath = f"Project/{project['RunGroupId']}" + dest_folder + target_filepath = f"Project/{project}" + dest_folder if destination_filename: target_filepath += destination_filename else: From bbd59d778f9ab99acd7265dc4c6659c689356027 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Thu, 9 Oct 2025 14:07:56 -0400 Subject: [PATCH 43/43] get api username password from environment --- src/cwl_platform/omics_platform.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/cwl_platform/omics_platform.py b/src/cwl_platform/omics_platform.py index 193b8943..0a00ae74 100644 --- a/src/cwl_platform/omics_platform.py +++ b/src/cwl_platform/omics_platform.py @@ -7,6 +7,7 @@ import boto3 import botocore import httpx +import os from tenacity import retry, wait_fixed, stop_after_attempt @@ -55,9 +56,9 @@ def connect(self, **kwargs): self.s3_client = boto3.client('s3') # WES API connection parameters - self.wes_url = kwargs.get('wes_url', 'http://localhost:8000/ga4gh/wes/v1') - self.wes_username = kwargs.get('wes_username') - self.wes_password = kwargs.get('wes_password') + self.wes_url = kwargs.get('wes_url', os.getenv('WES_URL')) + self.wes_username = kwargs.get('wes_username',os.getenv('WES_USERNAME')) + self.wes_password = kwargs.get('wes_password',os.getenv('WES_PASSWORD')) def copy_folder(self, source_project, source_folder, destination_project): ''' @@ -276,7 +277,7 @@ def get_task_outputs(self, task): logger.warning(f"No output_location found for run {run_id}") return [] - output_json = task_outputUri.split(self.output_bucket+'/')[1] + run_id + "/logs/outputs.json" + output_json = task_outputUri.split(self.output_bucket+'/')[1] + "logs/outputs.json" response = self.s3_client.get_object(Bucket=self.output_bucket, Key=output_json) content = response['Body'].read().decode("utf-8") mapping = json.loads(content) @@ -315,12 +316,6 @@ def get_task_state(self, task, refresh=False): wes_state = result.get('state') logger.debug(f"WES state for run {run_id}: {wes_state}") - - logging.info('check state results') - logging.info(task) - logging.info(wes_state) - a=input() - if wes_state == 'COMPLETE': return 'Complete' if wes_state == 'EXECUTOR_ERROR' or wes_state == 'SYSTEM_ERROR': @@ -350,27 +345,27 @@ def get_task_output(self, task, output_name): auth = None if self.wes_username and self.wes_password: auth = (self.wes_username, self.wes_password) - + # Extract the WES run ID from the task dictionary run_id = task.get('id') if not run_id: logger.error("No run ID found in task object") raise KeyError("No run ID found in task object") - try: # Get full run details from WES API run_details_url = f"{self.wes_url}/runs/{run_id}" response = httpx.get(run_details_url, auth=auth, timeout=30.0) response.raise_for_status() run_details = response.json() + if not run_details.get('outputs'): raise KeyError(f"No outputs available for run {run_id}") task_outputUri = run_details['outputs'].get('output_location') if not task_outputUri: raise KeyError(f"No output_location found for run {run_id}") - - output_json = task_outputUri.split(self.output_bucket+'/')[1] + run_id + "/logs/outputs.json" + + output_json = task_outputUri.split(self.output_bucket+'/')[1] + "logs/outputs.json" response = self.s3_client.get_object(Bucket=self.output_bucket, Key=output_json) content = response['Body'].read().decode("utf-8") mapping = json.loads(content)