From 8a11fbdaca517bb7a2e6deeec169421549aa5295 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 7 May 2025 10:38:30 -0400 Subject: [PATCH 01/38] Create GA4GH WES Implementation of PAML::Platform class using Claude 3.7 --- docs/wes_platform.md | 132 +++++++ examples/wes_platform_example.py | 100 +++++ src/cwl_platform/__init__.py | 4 +- src/cwl_platform/wes_platform.py | 615 +++++++++++++++++++++++++++++++ tests/test_wes_platform.py | 293 +++++++++++++++ 5 files changed, 1143 insertions(+), 1 deletion(-) create mode 100644 docs/wes_platform.md create mode 100644 examples/wes_platform_example.py create mode 100644 src/cwl_platform/wes_platform.py create mode 100644 tests/test_wes_platform.py diff --git a/docs/wes_platform.md b/docs/wes_platform.md new file mode 100644 index 00000000..fae8f17d --- /dev/null +++ b/docs/wes_platform.md @@ -0,0 +1,132 @@ +# GA4GH WES Platform Implementation + +This document describes the GA4GH Workflow Execution Service (WES) platform implementation for the PAML library. + +## Overview + +The GA4GH WES API provides a standard way to submit and manage workflows across different workflow execution systems. This implementation allows you to use the PAML library to submit workflows to any WES-compatible service. + +The WES platform implementation (`WESPlatform`) inherits from the `Platform` abstract base class and implements all the required methods to interact with a WES API endpoint. + +## Features + +- Connect to any WES API endpoint +- Submit CWL workflows +- Monitor workflow execution status +- Retrieve workflow outputs + +## Limitations + +Since the WES API is focused on workflow execution and doesn't include concepts like projects, folders, or file management, some features of the Platform API are not fully supported: + +- File operations (upload, download, copy) are limited +- Project management is simulated using virtual projects +- User management is not supported + +## Usage + +### Initialization and Connection + +```python +from src.cwl_platform import PlatformFactory + +# Initialize the platform factory +factory = PlatformFactory() + +# Get the WES platform +platform = factory.get_platform('WES') + +# Connect to the WES API +platform.connect( + api_endpoint="https://wes.example.com/ga4gh/wes/v1", + auth_token="your_auth_token" # Optional +) +``` + +### Submitting a Workflow + +```python +# Create a virtual project (not used by WES but required by the API) +project = platform.create_project('wes-example', 'WES Example Project') + +# Define workflow parameters +parameters = { + "input_file": { + "class": "File", + "path": "https://example.com/input.txt" + }, + "output_filename": "output.txt" +} + +# Submit the workflow +task = platform.submit_task( + name="My Workflow", + project=project, + workflow="https://example.com/workflow.cwl", # URL or local file path + parameters=parameters +) + +# Get the run ID +run_id = task.run_id +``` + +### Monitoring Workflow Execution + +```python +# Check the workflow state +state = platform.get_task_state(task) +print(f"Workflow state: {state}") + +# Refresh the state from the server +state = platform.get_task_state(task, refresh=True) +print(f"Updated workflow state: {state}") + +# Monitor until completion +import time +while True: + state = platform.get_task_state(task, refresh=True) + print(f"Workflow state: {state}") + + if state in ['Complete', 'Failed', 'Cancelled']: + break + + time.sleep(10) # Check every 10 seconds +``` + +### Retrieving Workflow Outputs + +```python +# Get all outputs +outputs = platform.get_task_outputs(task) +print(f"Outputs: {outputs}") + +# Get a specific output +output_file = platform.get_task_output(task, "output_file") +print(f"Output file: {output_file}") +``` + +## Example Script + +See the `examples/wes_platform_example.py` script for a complete example of using the WES platform implementation. + +## Configuration + +The WES platform can be configured using the following environment variables: + +- `WES_API_ENDPOINT`: The URL of the WES API endpoint +- `WES_AUTH_TOKEN`: Authentication token for the WES API + +## Supported WES Implementations + +This implementation should work with any WES-compatible service, including: + +- [Cromwell](https://github.com/broadinstitute/cromwell) +- [Toil](https://github.com/DataBiosphere/toil) +- [WES-ELIXIR](https://github.com/elixir-cloud-aai/cwl-WES) +- [TESK](https://github.com/EMBL-EBI-TSI/TESK) +- [DNAstack WES](https://docs.dnastack.com/docs/workflow-execution-service-wes) + +## References + +- [GA4GH WES API Specification](https://github.com/ga4gh/workflow-execution-service-schemas) +- [WES API Documentation](https://ga4gh.github.io/workflow-execution-service-schemas/) \ No newline at end of file diff --git a/examples/wes_platform_example.py b/examples/wes_platform_example.py new file mode 100644 index 00000000..01f6b683 --- /dev/null +++ b/examples/wes_platform_example.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +''' +Example script demonstrating how to use the WES Platform implementation +''' +import os +import sys +import time +import logging +import argparse + +# Add the src directory to the Python path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.cwl_platform import PlatformFactory + +def main(): + ''' + Main function + ''' + # Set up logging + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + logger = logging.getLogger(__name__) + + # Parse command line arguments + parser = argparse.ArgumentParser(description='WES Platform Example') + parser.add_argument('--api-endpoint', required=True, help='WES API endpoint URL') + parser.add_argument('--auth-token', help='Authentication token for the WES API') + parser.add_argument('--workflow', required=True, help='Path to workflow file or URL') + parser.add_argument('--params', required=True, help='Path to workflow parameters JSON file') + args = parser.parse_args() + + # Initialize the platform factory + factory = PlatformFactory() + + try: + # Get the WES platform + platform = factory.get_platform('WES') + logger.info("Initialized WES platform") + + # Connect to the WES API + connected = platform.connect( + api_endpoint=args.api_endpoint, + auth_token=args.auth_token + ) + + if not connected: + logger.error("Failed to connect to WES API") + return 1 + + logger.info("Connected to WES API") + + # Create a virtual project (not used by WES but required by the API) + project = platform.create_project('wes-example', 'WES Example Project') + logger.info("Created virtual project: %s", project['name']) + + # Load workflow parameters + with open(args.params, 'r') as f: + import json + parameters = json.load(f) + + # Submit the workflow + task = platform.submit_task( + name="WES Example Task", + project=project, + workflow=args.workflow, + parameters=parameters + ) + + if not task: + logger.error("Failed to submit workflow") + return 1 + + logger.info("Submitted workflow with run ID: %s", task.run_id) + + # Monitor the workflow execution + while True: + state = platform.get_task_state(task, refresh=True) + logger.info("Workflow state: %s", state) + + if state in ['Complete', 'Failed', 'Cancelled']: + break + + time.sleep(10) # Check every 10 seconds + + # Get the workflow outputs + if state == 'Complete': + outputs = platform.get_task_outputs(task) + logger.info("Workflow completed successfully") + logger.info("Outputs: %s", outputs) + return 0 + else: + logger.error("Workflow failed or was cancelled") + return 1 + + except Exception as e: + logger.exception("Error: %s", e) + return 1 + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/src/cwl_platform/__init__.py b/src/cwl_platform/__init__.py index 25fc722f..a7d1b0d3 100644 --- a/src/cwl_platform/__init__.py +++ b/src/cwl_platform/__init__.py @@ -6,13 +6,15 @@ from .arvados_platform import ArvadosPlatform from .sevenbridges_platform import SevenBridgesPlatform +from .wes_platform import WESPlatform #from .omics_platform import OmicsPlatform # Move this for a config file SUPPORTED_PLATFORMS = { 'Arvados': ArvadosPlatform, # 'Omics': OmicsPlatform, - 'SevenBridges': SevenBridgesPlatform + 'SevenBridges': SevenBridgesPlatform, + 'WES': WESPlatform } class PlatformFactory(): diff --git a/src/cwl_platform/wes_platform.py b/src/cwl_platform/wes_platform.py new file mode 100644 index 00000000..97583594 --- /dev/null +++ b/src/cwl_platform/wes_platform.py @@ -0,0 +1,615 @@ +''' +GA4GH WES Platform class + +This module implements the Platform abstract base class using the GA4GH Workflow Execution Service (WES) API. +The WES API provides a standard way to submit and manage workflows across different workflow execution systems. +''' +import os +import json +import logging +import time +import uuid +import requests +from urllib.parse import urljoin + +from .base_platform import Platform + +class WESTask: + ''' + WES Task class to encapsulate task functionality + ''' + def __init__(self, run_id, name, state=None, outputs=None, inputs=None): + self.run_id = run_id + self.name = name + self.state = state + self.outputs = outputs or {} + self.inputs = inputs or {} + + def to_dict(self): + ''' Convert to dictionary ''' + return { + 'run_id': self.run_id, + 'name': self.name, + 'state': self.state, + 'outputs': self.outputs, + 'inputs': self.inputs + } + + @classmethod + def from_dict(cls, task_dict): + ''' Convert from dictionary ''' + return cls( + task_dict['run_id'], + task_dict['name'], + task_dict.get('state'), + task_dict.get('outputs'), + task_dict.get('inputs') + ) + +class WESPlatform(Platform): + ''' GA4GH WES Platform class ''' + + # WES API state mapping to Platform states + STATE_MAP = { + 'UNKNOWN': 'Unknown', + 'QUEUED': 'Queued', + 'INITIALIZING': 'Queued', + 'RUNNING': 'Running', + 'PAUSED': 'Running', + 'COMPLETE': 'Complete', + 'EXECUTOR_ERROR': 'Failed', + 'SYSTEM_ERROR': 'Failed', + 'CANCELED': 'Cancelled', + 'CANCELING': 'Cancelled' + } + + def __init__(self, name): + ''' + Initialize WES Platform + ''' + super().__init__(name) + self.logger = logging.getLogger(__name__) + self.api_endpoint = None + self.auth_token = None + self.projects = {} # Map project names to project objects + self.workflows = {} # Map workflow names to workflow objects + self.files = {} # Map file paths to file objects + + def connect(self, **kwargs): + ''' + Connect to the WES API + + :param kwargs: Connection parameters + - api_endpoint: WES API endpoint URL + - auth_token: Authentication token for the WES API + ''' + self.api_endpoint = kwargs.get('api_endpoint') + self.auth_token = kwargs.get('auth_token') + + if not self.api_endpoint: + raise ValueError("WES API endpoint URL is required") + + # Test connection by getting service info + try: + response = self._make_request('GET', 'service-info') + self.logger.info(f"Connected to WES API: {response.get('workflow_type_versions', {})}") + self.connected = True + return True + except Exception as e: + self.logger.error(f"Failed to connect to WES API: {e}") + self.connected = False + return False + + def _make_request(self, method, path, data=None, files=None, params=None): + ''' + Make a request to the WES API + + :param method: HTTP method (GET, POST, etc.) + :param path: API path + :param data: Request data + :param files: Files to upload + :param params: Query parameters + :return: Response JSON + ''' + url = urljoin(self.api_endpoint, path) + headers = {} + + if self.auth_token: + headers['Authorization'] = f'Bearer {self.auth_token}' + + response = requests.request( + method=method, + url=url, + headers=headers, + json=data, + files=files, + params=params + ) + + response.raise_for_status() + + if response.content: + return response.json() + return {} + + # File methods + def copy_folder(self, source_project, source_folder, destination_project): + ''' + Copy source folder to destination project + + Note: WES API doesn't have a direct concept of folders, so this is a no-op + ''' + self.logger.warning("WES API doesn't support folder operations directly") + return None + + def download_file(self, file, dest_folder): + """ + Download a file to a local directory + + :param file: File ID to download + :param dest_folder: Destination folder to download file to + :return: Name of local file downloaded or None + """ + if not file or not dest_folder: + return None + + # In WES context, file might be a URL + if file.startswith('http'): + filename = os.path.basename(file) + dest_path = os.path.join(dest_folder, filename) + + response = requests.get(file, stream=True) + response.raise_for_status() + + with open(dest_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + return dest_path + else: + self.logger.error(f"Unsupported file format: {file}") + return None + + def export_file(self, file, bucket_name, prefix): + """ + Use platform specific functionality to copy a file from a platform to an S3 bucket. + + :param file: File to export + :param bucket_name: S3 bucket name + :param prefix: Destination S3 folder to export file to, path/to/folder + :return: s3 file path or None + """ + self.logger.warning("WES API doesn't support direct S3 export") + return None + + def get_file_id(self, project, file_path): + ''' + Get a file id by its full path name + + Note: WES API doesn't have a direct concept of files, so this returns the path + ''' + return file_path + + def get_files(self, project, filters=None): + """ + Retrieve files in a project matching the filter criteria + + Note: WES API doesn't have a direct concept of files, so this returns an empty list + + :param project: Project to search for files + :param filters: Dictionary containing filter criteria + :return: List of tuples (file path, file object) matching filter criteria + """ + self.logger.warning("WES API doesn't support listing files") + return [] + + def get_folder_id(self, project, folder_path): + ''' + Get a folder id by its full path name + + Note: WES API doesn't have a direct concept of folders, so this returns None + ''' + self.logger.warning("WES API doesn't support folder operations") + return None + + def rename_file(self, fileid, new_filename): + ''' + Rename a file to new_filename. + + Note: WES API doesn't have a direct concept of files, so this is a no-op + ''' + self.logger.warning("WES API doesn't support file operations") + return None + + def roll_file(self, project, file_name): + ''' + Roll (find and rename) a file in a project. + + Note: WES API doesn't have a direct concept of files, so this is a no-op + ''' + self.logger.warning("WES API doesn't support file operations") + return None + + def stage_output_files(self, project, output_files): + ''' + Stage output files to a project + + Note: WES API doesn't have a direct concept of files, so this is a no-op + ''' + self.logger.warning("WES API doesn't support file staging operations") + return None + + def upload_file(self, filename, project, dest_folder=None, destination_filename=None, overwrite=False): + ''' + Upload a local file to project + + Note: WES API doesn't have a direct concept of files, so this returns the filename + + :param filename: filename of local file to be uploaded. + :param project: project that the file is uploaded to. + :param dest_folder: The target path to the folder that file will be uploaded to. None will upload to root. + :param destination_filename: File name after uploaded to destination folder. + :param overwrite: Overwrite the file if it already exists. + :return: ID of uploaded file. + ''' + self.logger.warning("WES API doesn't support file upload operations directly") + return filename + + # Task/Workflow methods + def copy_workflow(self, src_workflow, destination_project): + ''' + Copy a workflow from one project to another + + Note: WES API doesn't have a direct concept of projects, so this returns the workflow + ''' + return src_workflow + + def copy_workflows(self, reference_project, destination_project): + ''' + Copy all workflows from the reference_project to project + + Note: WES API doesn't have a direct concept of projects, so this returns an empty list + ''' + return [] + + def get_workflows(self, project): + ''' + Get workflows in a project + + Note: WES API doesn't have a direct concept of projects, so this returns an empty list + ''' + return [] + + def delete_task(self, task): + ''' + Delete a task/workflow/process + + :param task: WESTask object + :return: True if successful, False otherwise + ''' + if not task or not hasattr(task, 'run_id'): + return False + + try: + self._make_request('DELETE', f'runs/{task.run_id}') + return True + except Exception as e: + self.logger.error(f"Failed to delete task: {e}") + return False + + def get_current_task(self): + ''' + Get the current task + + Note: WES API doesn't have a concept of current task, so this returns None + ''' + return None + + def get_task_cost(self, task): + ''' + Return task cost + + Note: WES API doesn't provide cost information, so this returns None + ''' + return None + + def get_task_input(self, task, input_name): + ''' + Retrieve the input field of the task + + :param task: WESTask object + :param input_name: Name of the input field + :return: Value of the input field or None + ''' + if not task or not hasattr(task, 'inputs'): + return None + + return task.inputs.get(input_name) + + def get_task_state(self, task, refresh=False): + ''' + Get workflow/task state + + :param task: WESTask object + :param refresh: Refresh task state before returning (Default: False) + :return: The state of the task (Queued, Running, Complete, Failed, Cancelled) + ''' + if not task or not hasattr(task, 'run_id'): + return 'Unknown' + + if refresh: + try: + response = self._make_request('GET', f'runs/{task.run_id}') + wes_state = response.get('state', 'UNKNOWN') + task.state = self.STATE_MAP.get(wes_state, 'Unknown') + task.outputs = response.get('outputs', {}) + except Exception as e: + self.logger.error(f"Failed to refresh task state: {e}") + return 'Unknown' + + return task.state or 'Unknown' + + def get_task_output(self, task, output_name): + ''' + Retrieve the output field of the task + + :param task: WESTask object + :param output_name: Name of the output field + :return: Value of the output field or None + ''' + if not task or not hasattr(task, 'outputs'): + return None + + return task.outputs.get(output_name) + + def get_task_outputs(self, task): + ''' + Return a list of output fields of the task + + :param task: WESTask object + :return: Dictionary of output fields + ''' + if not task or not hasattr(task, 'outputs'): + return {} + + return task.outputs + + def get_task_output_filename(self, task, output_name): + ''' + Retrieve the output field of the task and return filename + NOTE: This method is deprecated as of v0.2.5 of PAML. Will be removed in v1.0. + + :param task: WESTask object + :param output_name: Name of the output field + :return: Filename of the output field or None + ''' + output = self.get_task_output(task, output_name) + if not output: + return None + + # If output is a URL, extract the filename + if isinstance(output, str) and (output.startswith('http') or output.startswith('file:')): + return os.path.basename(output) + + return str(output) + + def get_tasks_by_name(self, project, task_name=None): + ''' + Get all processes/tasks in a project with a specified name + + :param project: The project to search + :param task_name: The name of the process to search for (if None return all tasks) + :return: List of tasks + ''' + try: + params = {} + if task_name: + params['name'] = task_name + + response = self._make_request('GET', 'runs', params=params) + tasks = [] + + for run in response.get('runs', []): + task = WESTask( + run_id=run.get('run_id'), + name=run.get('name', ''), + state=self.STATE_MAP.get(run.get('state', 'UNKNOWN'), 'Unknown') + ) + tasks.append(task) + + return tasks + except Exception as e: + self.logger.error(f"Failed to get tasks: {e}") + return [] + + def stage_task_output(self, task, project, output_to_export, output_directory_name): + ''' + DEPRECATED: Use stage_output_files instead + + Prepare/Copy output files of a task for export. + + Note: WES API doesn't have a direct concept of files, so this is a no-op + ''' + self.logger.warning("WES API doesn't support file staging operations") + return None + + def submit_task(self, name, project, workflow, parameters, execution_settings=None): + ''' + Submit a workflow on the platform + + :param name: Name of the task to submit + :param project: Project to submit the task to (not used in WES) + :param workflow: Workflow to submit (URL or file path to the workflow) + :param parameters: Parameters for the workflow + :param execution_settings: {use_spot_instance: True/False} (not used in WES) + :return: WESTask object or None + ''' + if not workflow: + self.logger.error("Workflow is required") + return None + + # Prepare the request data + workflow_url = workflow + workflow_type = "CWL" # Default to CWL + workflow_type_version = "v1.0" # Default to v1.0 + + # Check if workflow is a file path or URL + if os.path.exists(workflow): + # For WES, we need to upload the workflow file + workflow_type = "CWL" # Assuming CWL, adjust as needed + with open(workflow, 'rb') as f: + workflow_content = f.read() + + files = { + 'workflow_attachment': (os.path.basename(workflow), workflow_content) + } + workflow_url = os.path.basename(workflow) + else: + files = None + + # Prepare the request data + data = { + 'workflow_params': json.dumps(parameters), + 'workflow_type': workflow_type, + 'workflow_type_version': workflow_type_version, + 'workflow_url': workflow_url, + 'tags': {'name': name} + } + + try: + response = self._make_request('POST', 'runs', data=data, files=files) + run_id = response.get('run_id') + + if not run_id: + self.logger.error("Failed to submit task: No run_id returned") + return None + + # Create a WESTask object + task = WESTask( + run_id=run_id, + name=name, + state='Queued', + inputs=parameters + ) + + return task + except Exception as e: + self.logger.error(f"Failed to submit task: {e}") + return None + + # Project methods + def create_project(self, project_name, project_description, **kwargs): + ''' + Create a project + + Note: WES API doesn't have a concept of projects, so this creates a virtual project + + :param project_name: Name of the project + :param project_description: Description of the project + :param kwargs: Additional arguments for creating a project + :return: Project object + ''' + project = { + 'id': str(uuid.uuid4()), + 'name': project_name, + 'description': project_description + } + self.projects[project_name] = project + return project + + def delete_project_by_name(self, project_name): + ''' + Delete a project on the platform + + Note: WES API doesn't have a concept of projects, so this removes the virtual project + ''' + if project_name in self.projects: + del self.projects[project_name] + return True + return False + + def get_project(self): + ''' + Determine what project we are running in + + Note: WES API doesn't have a concept of projects, so this returns None + ''' + return None + + def get_project_by_name(self, project_name): + ''' + Get a project by its name + + Note: WES API doesn't have a concept of projects, so this returns the virtual project + ''' + if project_name in self.projects: + return self.projects[project_name] + + # Create a new project if it doesn't exist + return self.create_project(project_name, f"Virtual project for {project_name}") + + def get_project_by_id(self, project_id): + ''' + Get a project by its id + + Note: WES API doesn't have a concept of projects, so this returns the virtual project + ''' + for project in self.projects.values(): + if project['id'] == project_id: + return project + return None + + def get_project_cost(self, project): + ''' + Return project cost + + Note: WES API doesn't provide cost information, so this returns None + ''' + return None + + def get_project_users(self, project): + ''' + Return a list of user objects associated with a project + + Note: WES API doesn't have a concept of projects, so this returns an empty list + ''' + return [] + + def get_projects(self): + ''' + Get list of all projects + + Note: WES API doesn't have a concept of projects, so this returns the virtual projects + ''' + return list(self.projects.values()) + + # User methods + def add_user_to_project(self, platform_user, project, permission): + """ + Add a user to a project on the platform + + Note: WES API doesn't have a concept of projects, so this is a no-op + """ + self.logger.warning("WES API doesn't support user management") + return None + + def get_user(self, user): + """ + Get a user object from their (platform) user id or email address + + Note: WES API doesn't have a concept of users, so this returns None + """ + return None + + @classmethod + def detect(cls): + ''' + Detect platform we are running on + + Note: This method checks if we're running in a WES environment + ''' + # Check if WES API endpoint is set in environment variables + wes_api_endpoint = os.environ.get('WES_API_ENDPOINT') + if wes_api_endpoint: + return True + return False \ No newline at end of file diff --git a/tests/test_wes_platform.py b/tests/test_wes_platform.py new file mode 100644 index 00000000..5ee4b42b --- /dev/null +++ b/tests/test_wes_platform.py @@ -0,0 +1,293 @@ +''' +Test WES Platform implementation +''' +import json +import unittest +from unittest.mock import patch, MagicMock + +from src.cwl_platform.wes_platform import WESPlatform, WESTask + +class TestWESPlatform(unittest.TestCase): + ''' + Test WES Platform implementation + ''' + def setUp(self): + ''' + Set up test environment + ''' + self.platform = WESPlatform('WES') + self.platform.api_endpoint = 'https://wes.example.com/ga4gh/wes/v1' + self.platform.auth_token = 'test_token' + self.platform.connected = True + + @patch('requests.request') + def test_make_request(self, mock_request): + ''' + Test _make_request method + ''' + # Mock response + mock_response = MagicMock() + mock_response.content = json.dumps({'key': 'value'}).encode('utf-8') + mock_response.json.return_value = {'key': 'value'} + mock_request.return_value = mock_response + + # Test GET request + result = self.platform._make_request('GET', 'service-info') + mock_request.assert_called_with( + method='GET', + url='https://wes.example.com/ga4gh/wes/v1/service-info', + headers={'Authorization': 'Bearer test_token'}, + json=None, + files=None, + params=None + ) + self.assertEqual(result, {'key': 'value'}) + + @patch('requests.request') + def test_connect(self, mock_request): + ''' + Test connect method + ''' + # Mock response + mock_response = MagicMock() + mock_response.content = json.dumps({ + 'workflow_type_versions': { + 'CWL': {'workflow_type_version': ['v1.0']} + } + }).encode('utf-8') + mock_response.json.return_value = { + 'workflow_type_versions': { + 'CWL': {'workflow_type_version': ['v1.0']} + } + } + mock_request.return_value = mock_response + + # Test connect + platform = WESPlatform('WES') + result = platform.connect(api_endpoint='https://wes.example.com/ga4gh/wes/v1', auth_token='test_token') + self.assertTrue(result) + self.assertTrue(platform.connected) + self.assertEqual(platform.api_endpoint, 'https://wes.example.com/ga4gh/wes/v1') + self.assertEqual(platform.auth_token, 'test_token') + + @patch('requests.request') + def test_submit_task(self, mock_request): + ''' + Test submit_task method + ''' + # Mock response + mock_response = MagicMock() + mock_response.content = json.dumps({'run_id': 'test_run_id'}).encode('utf-8') + mock_response.json.return_value = {'run_id': 'test_run_id'} + mock_request.return_value = mock_response + + # Test submit_task + project = {'id': 'test_project', 'name': 'Test Project'} + parameters = {'input': 'value'} + task = self.platform.submit_task( + name='Test Task', + project=project, + workflow='https://example.com/workflow.cwl', + parameters=parameters + ) + + # Verify request + mock_request.assert_called_with( + method='POST', + url='https://wes.example.com/ga4gh/wes/v1/runs', + headers={'Authorization': 'Bearer test_token'}, + json={ + 'workflow_params': json.dumps(parameters), + 'workflow_type': 'CWL', + 'workflow_type_version': 'v1.0', + 'workflow_url': 'https://example.com/workflow.cwl', + 'tags': {'name': 'Test Task'} + }, + files=None, + params=None + ) + + # Verify task + self.assertIsNotNone(task) + self.assertEqual(task.run_id, 'test_run_id') + self.assertEqual(task.name, 'Test Task') + self.assertEqual(task.state, 'Queued') + self.assertEqual(task.inputs, parameters) + + @patch('requests.request') + def test_get_task_state(self, mock_request): + ''' + Test get_task_state method + ''' + # Mock response + mock_response = MagicMock() + mock_response.content = json.dumps({ + 'run_id': 'test_run_id', + 'state': 'RUNNING', + 'outputs': {'output': 'value'} + }).encode('utf-8') + mock_response.json.return_value = { + 'run_id': 'test_run_id', + 'state': 'RUNNING', + 'outputs': {'output': 'value'} + } + mock_request.return_value = mock_response + + # Create task + task = WESTask('test_run_id', 'Test Task') + + # Test get_task_state with refresh + state = self.platform.get_task_state(task, refresh=True) + self.assertEqual(state, 'Running') + self.assertEqual(task.state, 'Running') + self.assertEqual(task.outputs, {'output': 'value'}) + + # Verify request + mock_request.assert_called_with( + method='GET', + url='https://wes.example.com/ga4gh/wes/v1/runs/test_run_id', + headers={'Authorization': 'Bearer test_token'}, + json=None, + files=None, + params=None + ) + + def test_get_task_output(self): + ''' + Test get_task_output method + ''' + # Create task with outputs + task = WESTask('test_run_id', 'Test Task', outputs={'output': 'value'}) + + # Test get_task_output + output = self.platform.get_task_output(task, 'output') + self.assertEqual(output, 'value') + + # Test get_task_output with non-existent output + output = self.platform.get_task_output(task, 'non_existent') + self.assertIsNone(output) + + def test_get_task_outputs(self): + ''' + Test get_task_outputs method + ''' + # Create task with outputs + task = WESTask('test_run_id', 'Test Task', outputs={'output1': 'value1', 'output2': 'value2'}) + + # Test get_task_outputs + outputs = self.platform.get_task_outputs(task) + self.assertEqual(outputs, {'output1': 'value1', 'output2': 'value2'}) + + @patch('requests.request') + def test_get_tasks_by_name(self, mock_request): + ''' + Test get_tasks_by_name method + ''' + # Mock response + mock_response = MagicMock() + mock_response.content = json.dumps({ + 'runs': [ + { + 'run_id': 'run1', + 'name': 'Task 1', + 'state': 'COMPLETE' + }, + { + 'run_id': 'run2', + 'name': 'Task 2', + 'state': 'RUNNING' + } + ] + }).encode('utf-8') + mock_response.json.return_value = { + 'runs': [ + { + 'run_id': 'run1', + 'name': 'Task 1', + 'state': 'COMPLETE' + }, + { + 'run_id': 'run2', + 'name': 'Task 2', + 'state': 'RUNNING' + } + ] + } + mock_request.return_value = mock_response + + # Test get_tasks_by_name + project = {'id': 'test_project', 'name': 'Test Project'} + tasks = self.platform.get_tasks_by_name(project) + + # Verify request + mock_request.assert_called_with( + method='GET', + url='https://wes.example.com/ga4gh/wes/v1/runs', + headers={'Authorization': 'Bearer test_token'}, + json=None, + files=None, + params={} + ) + + # Verify tasks + self.assertEqual(len(tasks), 2) + self.assertEqual(tasks[0].run_id, 'run1') + self.assertEqual(tasks[0].name, 'Task 1') + self.assertEqual(tasks[0].state, 'Complete') + self.assertEqual(tasks[1].run_id, 'run2') + self.assertEqual(tasks[1].name, 'Task 2') + self.assertEqual(tasks[1].state, 'Running') + + @patch('requests.request') + def test_delete_task(self, mock_request): + ''' + Test delete_task method + ''' + # Mock response + mock_response = MagicMock() + mock_response.content = b'' + mock_request.return_value = mock_response + + # Create task + task = WESTask('test_run_id', 'Test Task') + + # Test delete_task + result = self.platform.delete_task(task) + self.assertTrue(result) + + # Verify request + mock_request.assert_called_with( + method='DELETE', + url='https://wes.example.com/ga4gh/wes/v1/runs/test_run_id', + headers={'Authorization': 'Bearer test_token'}, + json=None, + files=None, + params=None + ) + + def test_project_methods(self): + ''' + Test project methods + ''' + # Test create_project + project = self.platform.create_project('Test Project', 'Test Description') + self.assertEqual(project['name'], 'Test Project') + self.assertEqual(project['description'], 'Test Description') + + # Test get_project_by_name + project2 = self.platform.get_project_by_name('Test Project') + self.assertEqual(project2['name'], 'Test Project') + self.assertEqual(project2['description'], 'Test Description') + + # Test get_projects + projects = self.platform.get_projects() + self.assertEqual(len(projects), 1) + self.assertEqual(projects[0]['name'], 'Test Project') + + # Test delete_project_by_name + result = self.platform.delete_project_by_name('Test Project') + self.assertTrue(result) + self.assertEqual(len(self.platform.projects), 0) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From 37d06f8a20c808916b09a07fc910af11b5e95565 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 7 May 2025 11:27:44 -0400 Subject: [PATCH 02/38] Address issues with urljoin --- src/cwl_platform/wes_platform.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/cwl_platform/wes_platform.py b/src/cwl_platform/wes_platform.py index 97583594..5dc54470 100644 --- a/src/cwl_platform/wes_platform.py +++ b/src/cwl_platform/wes_platform.py @@ -111,7 +111,16 @@ def _make_request(self, method, path, data=None, files=None, params=None): :param params: Query parameters :return: Response JSON ''' - url = urljoin(self.api_endpoint, path) + # Ensure path doesn't start with a slash to avoid urljoin issues + if path.startswith('/'): + path = path[1:] + + # Make sure the API endpoint ends with a slash for proper joining + endpoint = self.api_endpoint + if not endpoint.endswith('/'): + endpoint = endpoint + '/' + + url = urljoin(endpoint, path) headers = {} if self.auth_token: From 94dfa5cda454dbf2a603a06694a682c09431fbbd Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 7 May 2025 13:53:38 -0400 Subject: [PATCH 03/38] Address pylint issues --- examples/wes_platform_example.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/examples/wes_platform_example.py b/examples/wes_platform_example.py index 01f6b683..cd78c46a 100644 --- a/examples/wes_platform_example.py +++ b/examples/wes_platform_example.py @@ -2,16 +2,13 @@ ''' Example script demonstrating how to use the WES Platform implementation ''' -import os +import json import sys import time import logging import argparse -# Add the src directory to the Python path -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) - -from src.cwl_platform import PlatformFactory +from cwl_platform import PlatformFactory def main(): ''' @@ -42,7 +39,7 @@ def main(): api_endpoint=args.api_endpoint, auth_token=args.auth_token ) - + if not connected: logger.error("Failed to connect to WES API") return 1 @@ -54,8 +51,7 @@ def main(): logger.info("Created virtual project: %s", project['name']) # Load workflow parameters - with open(args.params, 'r') as f: - import json + with open(args.params, 'r', encoding='utf-8') as f: parameters = json.load(f) # Submit the workflow @@ -88,13 +84,13 @@ def main(): logger.info("Workflow completed successfully") logger.info("Outputs: %s", outputs) return 0 - else: - logger.error("Workflow failed or was cancelled") - return 1 - except Exception as e: + logger.error("Workflow failed or was cancelled") + return 1 + + except Exception as e: # pylint: disable=broad-except logger.exception("Error: %s", e) return 1 if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file + sys.exit(main()) From 6f38cdbf3a22b3ad8bc74b98e44b1ab78b902edc Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 7 May 2025 14:21:30 -0400 Subject: [PATCH 04/38] Address pylint issues --- src/cwl_platform/wes_platform.py | 328 +++++++++++++++---------------- tests/test_wes_platform.py | 4 +- 2 files changed, 162 insertions(+), 170 deletions(-) diff --git a/src/cwl_platform/wes_platform.py b/src/cwl_platform/wes_platform.py index 5dc54470..f041a056 100644 --- a/src/cwl_platform/wes_platform.py +++ b/src/cwl_platform/wes_platform.py @@ -7,10 +7,9 @@ import os import json import logging -import time import uuid -import requests from urllib.parse import urljoin +import requests from .base_platform import Platform @@ -48,7 +47,7 @@ def from_dict(cls, task_dict): class WESPlatform(Platform): ''' GA4GH WES Platform class ''' - + # WES API state mapping to Platform states STATE_MAP = { 'UNKNOWN': 'Unknown', @@ -62,7 +61,7 @@ class WESPlatform(Platform): 'CANCELED': 'Cancelled', 'CANCELING': 'Cancelled' } - + def __init__(self, name): ''' Initialize WES Platform @@ -74,7 +73,7 @@ def __init__(self, name): self.projects = {} # Map project names to project objects self.workflows = {} # Map workflow names to workflow objects self.files = {} # Map file paths to file objects - + def connect(self, **kwargs): ''' Connect to the WES API @@ -85,25 +84,25 @@ def connect(self, **kwargs): ''' self.api_endpoint = kwargs.get('api_endpoint') self.auth_token = kwargs.get('auth_token') - + if not self.api_endpoint: raise ValueError("WES API endpoint URL is required") - + # Test connection by getting service info try: response = self._make_request('GET', 'service-info') - self.logger.info(f"Connected to WES API: {response.get('workflow_type_versions', {})}") + self.logger.info("Connected to WES API: %s", response.get('workflow_type_versions', {})) self.connected = True return True - except Exception as e: - self.logger.error(f"Failed to connect to WES API: {e}") + except Exception as e: # pylint: disable=broad-except + self.logger.error("Failed to connect to WES API: %s", e) self.connected = False return False - + def _make_request(self, method, path, data=None, files=None, params=None): ''' Make a request to the WES API - + :param method: HTTP method (GET, POST, etc.) :param path: API path :param data: Request data @@ -114,113 +113,111 @@ def _make_request(self, method, path, data=None, files=None, params=None): # Ensure path doesn't start with a slash to avoid urljoin issues if path.startswith('/'): path = path[1:] - + # Make sure the API endpoint ends with a slash for proper joining endpoint = self.api_endpoint if not endpoint.endswith('/'): endpoint = endpoint + '/' - + url = urljoin(endpoint, path) headers = {} - + if self.auth_token: headers['Authorization'] = f'Bearer {self.auth_token}' - + response = requests.request( method=method, url=url, headers=headers, json=data, files=files, - params=params + params=params, + timeout=60 ) - + response.raise_for_status() - + if response.content: return response.json() return {} - + # File methods def copy_folder(self, source_project, source_folder, destination_project): - ''' - Copy source folder to destination project - + ''' + Copy source folder to destination project + Note: WES API doesn't have a direct concept of folders, so this is a no-op ''' self.logger.warning("WES API doesn't support folder operations directly") - return None - + def download_file(self, file, dest_folder): """ Download a file to a local directory - + :param file: File ID to download :param dest_folder: Destination folder to download file to :return: Name of local file downloaded or None """ if not file or not dest_folder: return None - + # In WES context, file might be a URL if file.startswith('http'): filename = os.path.basename(file) dest_path = os.path.join(dest_folder, filename) - - response = requests.get(file, stream=True) + + response = requests.get(file, stream=True, timeout=60) response.raise_for_status() - + with open(dest_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - + return dest_path - else: - self.logger.error(f"Unsupported file format: {file}") - return None - + + self.logger.error("Unsupported file format: %s", file) + return None + def export_file(self, file, bucket_name, prefix): """ Use platform specific functionality to copy a file from a platform to an S3 bucket. - + :param file: File to export :param bucket_name: S3 bucket name :param prefix: Destination S3 folder to export file to, path/to/folder :return: s3 file path or None """ self.logger.warning("WES API doesn't support direct S3 export") - return None - + def get_file_id(self, project, file_path): - ''' - Get a file id by its full path name - + ''' + Get a file id by its full path name + Note: WES API doesn't have a direct concept of files, so this returns the path ''' return file_path - + def get_files(self, project, filters=None): """ Retrieve files in a project matching the filter criteria - + Note: WES API doesn't have a direct concept of files, so this returns an empty list - + :param project: Project to search for files :param filters: Dictionary containing filter criteria :return: List of tuples (file path, file object) matching filter criteria """ self.logger.warning("WES API doesn't support listing files") return [] - + def get_folder_id(self, project, folder_path): - ''' - Get a folder id by its full path name - + ''' + Get a folder id by its full path name + Note: WES API doesn't have a direct concept of folders, so this returns None ''' self.logger.warning("WES API doesn't support folder operations") - return None - + def rename_file(self, fileid, new_filename): ''' Rename a file to new_filename. @@ -228,8 +225,7 @@ def rename_file(self, fileid, new_filename): Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file operations") - return None - + def roll_file(self, project, file_name): ''' Roll (find and rename) a file in a project. @@ -237,23 +233,21 @@ def roll_file(self, project, file_name): Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file operations") - return None - + def stage_output_files(self, project, output_files): ''' Stage output files to a project - + Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file staging operations") - return None - + def upload_file(self, filename, project, dest_folder=None, destination_filename=None, overwrite=False): ''' Upload a local file to project - + Note: WES API doesn't have a direct concept of files, so this returns the filename - + :param filename: filename of local file to be uploaded. :param project: project that the file is uploaded to. :param dest_folder: The target path to the folder that file will be uploaded to. None will upload to root. @@ -263,131 +257,131 @@ def upload_file(self, filename, project, dest_folder=None, destination_filename= ''' self.logger.warning("WES API doesn't support file upload operations directly") return filename - + # Task/Workflow methods def copy_workflow(self, src_workflow, destination_project): ''' Copy a workflow from one project to another - + Note: WES API doesn't have a direct concept of projects, so this returns the workflow ''' return src_workflow - + def copy_workflows(self, reference_project, destination_project): ''' Copy all workflows from the reference_project to project - + Note: WES API doesn't have a direct concept of projects, so this returns an empty list ''' return [] - + def get_workflows(self, project): ''' Get workflows in a project - + Note: WES API doesn't have a direct concept of projects, so this returns an empty list ''' return [] - + def delete_task(self, task): - ''' - Delete a task/workflow/process - + ''' + Delete a task/workflow/process + :param task: WESTask object :return: True if successful, False otherwise ''' if not task or not hasattr(task, 'run_id'): return False - + try: self._make_request('DELETE', f'runs/{task.run_id}') return True - except Exception as e: - self.logger.error(f"Failed to delete task: {e}") + except Exception as e: # pylint: disable=broad-except + self.logger.error("Failed to delete task: %s", e) return False - + def get_current_task(self): - ''' - Get the current task - + ''' + Get the current task + Note: WES API doesn't have a concept of current task, so this returns None ''' return None - + def get_task_cost(self, task): - ''' - Return task cost - + ''' + Return task cost + Note: WES API doesn't provide cost information, so this returns None ''' return None - + def get_task_input(self, task, input_name): - ''' - Retrieve the input field of the task - + ''' + Retrieve the input field of the task + :param task: WESTask object :param input_name: Name of the input field :return: Value of the input field or None ''' if not task or not hasattr(task, 'inputs'): return None - + return task.inputs.get(input_name) - + def get_task_state(self, task, refresh=False): ''' Get workflow/task state - + :param task: WESTask object :param refresh: Refresh task state before returning (Default: False) :return: The state of the task (Queued, Running, Complete, Failed, Cancelled) ''' if not task or not hasattr(task, 'run_id'): return 'Unknown' - + if refresh: try: response = self._make_request('GET', f'runs/{task.run_id}') wes_state = response.get('state', 'UNKNOWN') task.state = self.STATE_MAP.get(wes_state, 'Unknown') task.outputs = response.get('outputs', {}) - except Exception as e: - self.logger.error(f"Failed to refresh task state: {e}") + except Exception as e: # pylint: disable=broad-except + self.logger.error("Failed to refresh task state: %s", e) return 'Unknown' - + return task.state or 'Unknown' - + def get_task_output(self, task, output_name): - ''' - Retrieve the output field of the task - + ''' + Retrieve the output field of the task + :param task: WESTask object :param output_name: Name of the output field :return: Value of the output field or None ''' if not task or not hasattr(task, 'outputs'): return None - + return task.outputs.get(output_name) - + def get_task_outputs(self, task): - ''' - Return a list of output fields of the task - + ''' + Return a list of output fields of the task + :param task: WESTask object :return: Dictionary of output fields ''' if not task or not hasattr(task, 'outputs'): return {} - + return task.outputs - + def get_task_output_filename(self, task, output_name): ''' Retrieve the output field of the task and return filename NOTE: This method is deprecated as of v0.2.5 of PAML. Will be removed in v1.0. - + :param task: WESTask object :param output_name: Name of the output field :return: Filename of the output field or None @@ -395,17 +389,17 @@ def get_task_output_filename(self, task, output_name): output = self.get_task_output(task, output_name) if not output: return None - + # If output is a URL, extract the filename if isinstance(output, str) and (output.startswith('http') or output.startswith('file:')): return os.path.basename(output) - + return str(output) - + def get_tasks_by_name(self, project, task_name=None): ''' Get all processes/tasks in a project with a specified name - + :param project: The project to search :param task_name: The name of the process to search for (if None return all tasks) :return: List of tasks @@ -414,10 +408,10 @@ def get_tasks_by_name(self, project, task_name=None): params = {} if task_name: params['name'] = task_name - + response = self._make_request('GET', 'runs', params=params) tasks = [] - + for run in response.get('runs', []): task = WESTask( run_id=run.get('run_id'), @@ -425,27 +419,26 @@ def get_tasks_by_name(self, project, task_name=None): state=self.STATE_MAP.get(run.get('state', 'UNKNOWN'), 'Unknown') ) tasks.append(task) - + return tasks - except Exception as e: - self.logger.error(f"Failed to get tasks: {e}") + except Exception as e: # pylint: disable=broad-except + self.logger.error("Failed to get tasks: %s", e) return [] - + def stage_task_output(self, task, project, output_to_export, output_directory_name): ''' DEPRECATED: Use stage_output_files instead - + Prepare/Copy output files of a task for export. - + Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file staging operations") - return None - + def submit_task(self, name, project, workflow, parameters, execution_settings=None): ''' Submit a workflow on the platform - + :param name: Name of the task to submit :param project: Project to submit the task to (not used in WES) :param workflow: Workflow to submit (URL or file path to the workflow) @@ -456,26 +449,26 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No if not workflow: self.logger.error("Workflow is required") return None - + # Prepare the request data workflow_url = workflow workflow_type = "CWL" # Default to CWL workflow_type_version = "v1.0" # Default to v1.0 - + # Check if workflow is a file path or URL if os.path.exists(workflow): # For WES, we need to upload the workflow file workflow_type = "CWL" # Assuming CWL, adjust as needed with open(workflow, 'rb') as f: workflow_content = f.read() - + files = { 'workflow_attachment': (os.path.basename(workflow), workflow_content) } workflow_url = os.path.basename(workflow) else: files = None - + # Prepare the request data data = { 'workflow_params': json.dumps(parameters), @@ -484,15 +477,15 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No 'workflow_url': workflow_url, 'tags': {'name': name} } - + try: response = self._make_request('POST', 'runs', data=data, files=files) run_id = response.get('run_id') - + if not run_id: self.logger.error("Failed to submit task: No run_id returned") return None - + # Create a WESTask object task = WESTask( run_id=run_id, @@ -500,19 +493,19 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No state='Queued', inputs=parameters ) - + return task - except Exception as e: - self.logger.error(f"Failed to submit task: {e}") + except Exception as e: # pylint: disable=broad-except + self.logger.error("Failed to submit task: %s", e) return None - + # Project methods def create_project(self, project_name, project_description, **kwargs): ''' Create a project - + Note: WES API doesn't have a concept of projects, so this creates a virtual project - + :param project_name: Name of the project :param project_description: Description of the project :param kwargs: Additional arguments for creating a project @@ -525,100 +518,99 @@ def create_project(self, project_name, project_description, **kwargs): } self.projects[project_name] = project return project - + def delete_project_by_name(self, project_name): ''' Delete a project on the platform - + Note: WES API doesn't have a concept of projects, so this removes the virtual project ''' if project_name in self.projects: del self.projects[project_name] return True return False - + def get_project(self): - ''' - Determine what project we are running in - + ''' + Determine what project we are running in + Note: WES API doesn't have a concept of projects, so this returns None ''' return None - + def get_project_by_name(self, project_name): - ''' - Get a project by its name - + ''' + Get a project by its name + Note: WES API doesn't have a concept of projects, so this returns the virtual project ''' if project_name in self.projects: return self.projects[project_name] - + # Create a new project if it doesn't exist return self.create_project(project_name, f"Virtual project for {project_name}") - + def get_project_by_id(self, project_id): - ''' - Get a project by its id - + ''' + Get a project by its id + Note: WES API doesn't have a concept of projects, so this returns the virtual project ''' for project in self.projects.values(): if project['id'] == project_id: return project return None - + def get_project_cost(self, project): - ''' - Return project cost - + ''' + Return project cost + Note: WES API doesn't provide cost information, so this returns None ''' return None - + def get_project_users(self, project): - ''' - Return a list of user objects associated with a project - + ''' + Return a list of user objects associated with a project + Note: WES API doesn't have a concept of projects, so this returns an empty list ''' return [] - + def get_projects(self): - ''' - Get list of all projects - + ''' + Get list of all projects + Note: WES API doesn't have a concept of projects, so this returns the virtual projects ''' return list(self.projects.values()) - + # User methods def add_user_to_project(self, platform_user, project, permission): """ Add a user to a project on the platform - + Note: WES API doesn't have a concept of projects, so this is a no-op """ self.logger.warning("WES API doesn't support user management") - return None - + def get_user(self, user): """ Get a user object from their (platform) user id or email address - + Note: WES API doesn't have a concept of users, so this returns None """ return None - + @classmethod def detect(cls): - ''' - Detect platform we are running on - + ''' + Detect platform we are running on + Note: This method checks if we're running in a WES environment ''' # Check if WES API endpoint is set in environment variables wes_api_endpoint = os.environ.get('WES_API_ENDPOINT') if wes_api_endpoint: return True - return False \ No newline at end of file + return False diff --git a/tests/test_wes_platform.py b/tests/test_wes_platform.py index 5ee4b42b..fc358dbc 100644 --- a/tests/test_wes_platform.py +++ b/tests/test_wes_platform.py @@ -32,7 +32,7 @@ def test_make_request(self, mock_request): mock_request.return_value = mock_response # Test GET request - result = self.platform._make_request('GET', 'service-info') + result = self.platform._make_request('GET', 'service-info') # pylint: disable=protected-access mock_request.assert_called_with( method='GET', url='https://wes.example.com/ga4gh/wes/v1/service-info', @@ -290,4 +290,4 @@ def test_project_methods(self): self.assertEqual(len(self.platform.projects), 0) if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() From 70b2a6b41bc1f57b67531d4f4c6c84ab604da8ca Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 7 May 2025 14:26:09 -0400 Subject: [PATCH 05/38] Revert last commit changes to src/cwl_platform/wes_platform.py --- src/cwl_platform/wes_platform.py | 338 +++++++++++++++---------------- 1 file changed, 164 insertions(+), 174 deletions(-) diff --git a/src/cwl_platform/wes_platform.py b/src/cwl_platform/wes_platform.py index f041a056..45c4d049 100644 --- a/src/cwl_platform/wes_platform.py +++ b/src/cwl_platform/wes_platform.py @@ -7,9 +7,10 @@ import os import json import logging +import time import uuid -from urllib.parse import urljoin import requests +from urllib.parse import urljoin from .base_platform import Platform @@ -47,7 +48,7 @@ def from_dict(cls, task_dict): class WESPlatform(Platform): ''' GA4GH WES Platform class ''' - + # WES API state mapping to Platform states STATE_MAP = { 'UNKNOWN': 'Unknown', @@ -61,7 +62,7 @@ class WESPlatform(Platform): 'CANCELED': 'Cancelled', 'CANCELING': 'Cancelled' } - + def __init__(self, name): ''' Initialize WES Platform @@ -73,7 +74,7 @@ def __init__(self, name): self.projects = {} # Map project names to project objects self.workflows = {} # Map workflow names to workflow objects self.files = {} # Map file paths to file objects - + def connect(self, **kwargs): ''' Connect to the WES API @@ -84,25 +85,25 @@ def connect(self, **kwargs): ''' self.api_endpoint = kwargs.get('api_endpoint') self.auth_token = kwargs.get('auth_token') - + if not self.api_endpoint: raise ValueError("WES API endpoint URL is required") - + # Test connection by getting service info try: response = self._make_request('GET', 'service-info') - self.logger.info("Connected to WES API: %s", response.get('workflow_type_versions', {})) + self.logger.info(f"Connected to WES API: {response.get('workflow_type_versions', {})}") self.connected = True return True - except Exception as e: # pylint: disable=broad-except - self.logger.error("Failed to connect to WES API: %s", e) + except Exception as e: + self.logger.error(f"Failed to connect to WES API: {e}") self.connected = False return False - + def _make_request(self, method, path, data=None, files=None, params=None): ''' Make a request to the WES API - + :param method: HTTP method (GET, POST, etc.) :param path: API path :param data: Request data @@ -110,114 +111,107 @@ def _make_request(self, method, path, data=None, files=None, params=None): :param params: Query parameters :return: Response JSON ''' - # Ensure path doesn't start with a slash to avoid urljoin issues - if path.startswith('/'): - path = path[1:] - - # Make sure the API endpoint ends with a slash for proper joining - endpoint = self.api_endpoint - if not endpoint.endswith('/'): - endpoint = endpoint + '/' - - url = urljoin(endpoint, path) + url = urljoin(self.api_endpoint, path) headers = {} - + if self.auth_token: headers['Authorization'] = f'Bearer {self.auth_token}' - + response = requests.request( method=method, url=url, headers=headers, json=data, files=files, - params=params, - timeout=60 + params=params ) - + response.raise_for_status() - + if response.content: return response.json() return {} - + # File methods def copy_folder(self, source_project, source_folder, destination_project): - ''' - Copy source folder to destination project - + ''' + Copy source folder to destination project + Note: WES API doesn't have a direct concept of folders, so this is a no-op ''' self.logger.warning("WES API doesn't support folder operations directly") - + return None + def download_file(self, file, dest_folder): """ Download a file to a local directory - + :param file: File ID to download :param dest_folder: Destination folder to download file to :return: Name of local file downloaded or None """ if not file or not dest_folder: return None - + # In WES context, file might be a URL if file.startswith('http'): filename = os.path.basename(file) dest_path = os.path.join(dest_folder, filename) - - response = requests.get(file, stream=True, timeout=60) + + response = requests.get(file, stream=True) response.raise_for_status() - + with open(dest_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - + return dest_path - - self.logger.error("Unsupported file format: %s", file) - return None - + else: + self.logger.error(f"Unsupported file format: {file}") + return None + def export_file(self, file, bucket_name, prefix): """ Use platform specific functionality to copy a file from a platform to an S3 bucket. - + :param file: File to export :param bucket_name: S3 bucket name :param prefix: Destination S3 folder to export file to, path/to/folder :return: s3 file path or None """ self.logger.warning("WES API doesn't support direct S3 export") - + return None + def get_file_id(self, project, file_path): - ''' - Get a file id by its full path name - + ''' + Get a file id by its full path name + Note: WES API doesn't have a direct concept of files, so this returns the path ''' return file_path - + def get_files(self, project, filters=None): """ Retrieve files in a project matching the filter criteria - + Note: WES API doesn't have a direct concept of files, so this returns an empty list - + :param project: Project to search for files :param filters: Dictionary containing filter criteria :return: List of tuples (file path, file object) matching filter criteria """ self.logger.warning("WES API doesn't support listing files") return [] - + def get_folder_id(self, project, folder_path): - ''' - Get a folder id by its full path name - + ''' + Get a folder id by its full path name + Note: WES API doesn't have a direct concept of folders, so this returns None ''' self.logger.warning("WES API doesn't support folder operations") - + return None + def rename_file(self, fileid, new_filename): ''' Rename a file to new_filename. @@ -225,7 +219,8 @@ def rename_file(self, fileid, new_filename): Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file operations") - + return None + def roll_file(self, project, file_name): ''' Roll (find and rename) a file in a project. @@ -233,21 +228,23 @@ def roll_file(self, project, file_name): Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file operations") - + return None + def stage_output_files(self, project, output_files): ''' Stage output files to a project - + Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file staging operations") - + return None + def upload_file(self, filename, project, dest_folder=None, destination_filename=None, overwrite=False): ''' Upload a local file to project - + Note: WES API doesn't have a direct concept of files, so this returns the filename - + :param filename: filename of local file to be uploaded. :param project: project that the file is uploaded to. :param dest_folder: The target path to the folder that file will be uploaded to. None will upload to root. @@ -257,131 +254,131 @@ def upload_file(self, filename, project, dest_folder=None, destination_filename= ''' self.logger.warning("WES API doesn't support file upload operations directly") return filename - + # Task/Workflow methods def copy_workflow(self, src_workflow, destination_project): ''' Copy a workflow from one project to another - + Note: WES API doesn't have a direct concept of projects, so this returns the workflow ''' return src_workflow - + def copy_workflows(self, reference_project, destination_project): ''' Copy all workflows from the reference_project to project - + Note: WES API doesn't have a direct concept of projects, so this returns an empty list ''' return [] - + def get_workflows(self, project): ''' Get workflows in a project - + Note: WES API doesn't have a direct concept of projects, so this returns an empty list ''' return [] - + def delete_task(self, task): - ''' - Delete a task/workflow/process - + ''' + Delete a task/workflow/process + :param task: WESTask object :return: True if successful, False otherwise ''' if not task or not hasattr(task, 'run_id'): return False - + try: self._make_request('DELETE', f'runs/{task.run_id}') return True - except Exception as e: # pylint: disable=broad-except - self.logger.error("Failed to delete task: %s", e) + except Exception as e: + self.logger.error(f"Failed to delete task: {e}") return False - + def get_current_task(self): - ''' - Get the current task - + ''' + Get the current task + Note: WES API doesn't have a concept of current task, so this returns None ''' return None - + def get_task_cost(self, task): - ''' - Return task cost - + ''' + Return task cost + Note: WES API doesn't provide cost information, so this returns None ''' return None - + def get_task_input(self, task, input_name): - ''' - Retrieve the input field of the task - + ''' + Retrieve the input field of the task + :param task: WESTask object :param input_name: Name of the input field :return: Value of the input field or None ''' if not task or not hasattr(task, 'inputs'): return None - + return task.inputs.get(input_name) - + def get_task_state(self, task, refresh=False): ''' Get workflow/task state - + :param task: WESTask object :param refresh: Refresh task state before returning (Default: False) :return: The state of the task (Queued, Running, Complete, Failed, Cancelled) ''' if not task or not hasattr(task, 'run_id'): return 'Unknown' - + if refresh: try: response = self._make_request('GET', f'runs/{task.run_id}') wes_state = response.get('state', 'UNKNOWN') task.state = self.STATE_MAP.get(wes_state, 'Unknown') task.outputs = response.get('outputs', {}) - except Exception as e: # pylint: disable=broad-except - self.logger.error("Failed to refresh task state: %s", e) + except Exception as e: + self.logger.error(f"Failed to refresh task state: {e}") return 'Unknown' - + return task.state or 'Unknown' - + def get_task_output(self, task, output_name): - ''' - Retrieve the output field of the task - + ''' + Retrieve the output field of the task + :param task: WESTask object :param output_name: Name of the output field :return: Value of the output field or None ''' if not task or not hasattr(task, 'outputs'): return None - + return task.outputs.get(output_name) - + def get_task_outputs(self, task): - ''' - Return a list of output fields of the task - + ''' + Return a list of output fields of the task + :param task: WESTask object :return: Dictionary of output fields ''' if not task or not hasattr(task, 'outputs'): return {} - + return task.outputs - + def get_task_output_filename(self, task, output_name): ''' Retrieve the output field of the task and return filename NOTE: This method is deprecated as of v0.2.5 of PAML. Will be removed in v1.0. - + :param task: WESTask object :param output_name: Name of the output field :return: Filename of the output field or None @@ -389,17 +386,17 @@ def get_task_output_filename(self, task, output_name): output = self.get_task_output(task, output_name) if not output: return None - + # If output is a URL, extract the filename if isinstance(output, str) and (output.startswith('http') or output.startswith('file:')): return os.path.basename(output) - + return str(output) - + def get_tasks_by_name(self, project, task_name=None): ''' Get all processes/tasks in a project with a specified name - + :param project: The project to search :param task_name: The name of the process to search for (if None return all tasks) :return: List of tasks @@ -408,10 +405,10 @@ def get_tasks_by_name(self, project, task_name=None): params = {} if task_name: params['name'] = task_name - + response = self._make_request('GET', 'runs', params=params) tasks = [] - + for run in response.get('runs', []): task = WESTask( run_id=run.get('run_id'), @@ -419,26 +416,27 @@ def get_tasks_by_name(self, project, task_name=None): state=self.STATE_MAP.get(run.get('state', 'UNKNOWN'), 'Unknown') ) tasks.append(task) - + return tasks - except Exception as e: # pylint: disable=broad-except - self.logger.error("Failed to get tasks: %s", e) + except Exception as e: + self.logger.error(f"Failed to get tasks: {e}") return [] - + def stage_task_output(self, task, project, output_to_export, output_directory_name): ''' DEPRECATED: Use stage_output_files instead - + Prepare/Copy output files of a task for export. - + Note: WES API doesn't have a direct concept of files, so this is a no-op ''' self.logger.warning("WES API doesn't support file staging operations") - + return None + def submit_task(self, name, project, workflow, parameters, execution_settings=None): ''' Submit a workflow on the platform - + :param name: Name of the task to submit :param project: Project to submit the task to (not used in WES) :param workflow: Workflow to submit (URL or file path to the workflow) @@ -449,26 +447,26 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No if not workflow: self.logger.error("Workflow is required") return None - + # Prepare the request data workflow_url = workflow workflow_type = "CWL" # Default to CWL workflow_type_version = "v1.0" # Default to v1.0 - + # Check if workflow is a file path or URL if os.path.exists(workflow): # For WES, we need to upload the workflow file workflow_type = "CWL" # Assuming CWL, adjust as needed with open(workflow, 'rb') as f: workflow_content = f.read() - + files = { 'workflow_attachment': (os.path.basename(workflow), workflow_content) } workflow_url = os.path.basename(workflow) else: files = None - + # Prepare the request data data = { 'workflow_params': json.dumps(parameters), @@ -477,15 +475,15 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No 'workflow_url': workflow_url, 'tags': {'name': name} } - + try: response = self._make_request('POST', 'runs', data=data, files=files) run_id = response.get('run_id') - + if not run_id: self.logger.error("Failed to submit task: No run_id returned") return None - + # Create a WESTask object task = WESTask( run_id=run_id, @@ -493,19 +491,19 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No state='Queued', inputs=parameters ) - + return task - except Exception as e: # pylint: disable=broad-except - self.logger.error("Failed to submit task: %s", e) + except Exception as e: + self.logger.error(f"Failed to submit task: {e}") return None - + # Project methods def create_project(self, project_name, project_description, **kwargs): ''' Create a project - + Note: WES API doesn't have a concept of projects, so this creates a virtual project - + :param project_name: Name of the project :param project_description: Description of the project :param kwargs: Additional arguments for creating a project @@ -518,99 +516,91 @@ def create_project(self, project_name, project_description, **kwargs): } self.projects[project_name] = project return project - + def delete_project_by_name(self, project_name): ''' Delete a project on the platform - + Note: WES API doesn't have a concept of projects, so this removes the virtual project ''' if project_name in self.projects: del self.projects[project_name] return True return False - + def get_project(self): - ''' - Determine what project we are running in - + ''' + Determine what project we are running in + Note: WES API doesn't have a concept of projects, so this returns None ''' return None - + def get_project_by_name(self, project_name): - ''' - Get a project by its name - + ''' + Get a project by its name + Note: WES API doesn't have a concept of projects, so this returns the virtual project ''' if project_name in self.projects: return self.projects[project_name] - + # Create a new project if it doesn't exist return self.create_project(project_name, f"Virtual project for {project_name}") - + def get_project_by_id(self, project_id): - ''' - Get a project by its id - + ''' + Get a project by its id + Note: WES API doesn't have a concept of projects, so this returns the virtual project ''' for project in self.projects.values(): if project['id'] == project_id: return project return None - + def get_project_cost(self, project): - ''' - Return project cost - + ''' + Return project cost + Note: WES API doesn't provide cost information, so this returns None ''' return None - + def get_project_users(self, project): - ''' - Return a list of user objects associated with a project - + ''' + Return a list of user objects associated with a project + Note: WES API doesn't have a concept of projects, so this returns an empty list ''' return [] - + def get_projects(self): - ''' - Get list of all projects - + ''' + Get list of all projects + Note: WES API doesn't have a concept of projects, so this returns the virtual projects ''' return list(self.projects.values()) - + # User methods def add_user_to_project(self, platform_user, project, permission): """ Add a user to a project on the platform - + Note: WES API doesn't have a concept of projects, so this is a no-op """ self.logger.warning("WES API doesn't support user management") - + return None + def get_user(self, user): """ Get a user object from their (platform) user id or email address - + Note: WES API doesn't have a concept of users, so this returns None """ return None - + @classmethod def detect(cls): - ''' - Detect platform we are running on - - Note: This method checks if we're running in a WES environment - ''' - # Check if WES API endpoint is set in environment variables - wes_api_endpoint = os.environ.get('WES_API_ENDPOINT') - if wes_api_endpoint: - return True - return False + ''' From 38a33dfe463ae3abe555b0c204f0c0c01b0ff062 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 7 May 2025 15:09:22 -0400 Subject: [PATCH 06/38] Revert updates to 37d06f8a20c808916b09a07fc910af11b5e95565 --- src/cwl_platform/wes_platform.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/cwl_platform/wes_platform.py b/src/cwl_platform/wes_platform.py index 45c4d049..5dc54470 100644 --- a/src/cwl_platform/wes_platform.py +++ b/src/cwl_platform/wes_platform.py @@ -111,7 +111,16 @@ def _make_request(self, method, path, data=None, files=None, params=None): :param params: Query parameters :return: Response JSON ''' - url = urljoin(self.api_endpoint, path) + # Ensure path doesn't start with a slash to avoid urljoin issues + if path.startswith('/'): + path = path[1:] + + # Make sure the API endpoint ends with a slash for proper joining + endpoint = self.api_endpoint + if not endpoint.endswith('/'): + endpoint = endpoint + '/' + + url = urljoin(endpoint, path) headers = {} if self.auth_token: @@ -604,3 +613,12 @@ def get_user(self, user): @classmethod def detect(cls): ''' + Detect platform we are running on + + Note: This method checks if we're running in a WES environment + ''' + # Check if WES API endpoint is set in environment variables + wes_api_endpoint = os.environ.get('WES_API_ENDPOINT') + if wes_api_endpoint: + return True + return False \ No newline at end of file From 29db37a3319927a7e126a7d5ec42e52a0a948602 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Wed, 7 May 2025 15:26:24 -0400 Subject: [PATCH 07/38] Format with black --- src/cwl_platform/wes_platform.py | 560 ++++++++++++++++--------------- 1 file changed, 285 insertions(+), 275 deletions(-) diff --git a/src/cwl_platform/wes_platform.py b/src/cwl_platform/wes_platform.py index 5dc54470..9f8ff8e3 100644 --- a/src/cwl_platform/wes_platform.py +++ b/src/cwl_platform/wes_platform.py @@ -1,9 +1,10 @@ -''' +""" GA4GH WES Platform class This module implements the Platform abstract base class using the GA4GH Workflow Execution Service (WES) API. The WES API provides a standard way to submit and manage workflows across different workflow execution systems. -''' +""" + import os import json import logging @@ -14,10 +15,12 @@ from .base_platform import Platform + class WESTask: - ''' + """ WES Task class to encapsulate task functionality - ''' + """ + def __init__(self, run_id, name, state=None, outputs=None, inputs=None): self.run_id = run_id self.name = name @@ -26,47 +29,48 @@ def __init__(self, run_id, name, state=None, outputs=None, inputs=None): self.inputs = inputs or {} def to_dict(self): - ''' Convert to dictionary ''' + """Convert to dictionary""" return { - 'run_id': self.run_id, - 'name': self.name, - 'state': self.state, - 'outputs': self.outputs, - 'inputs': self.inputs + "run_id": self.run_id, + "name": self.name, + "state": self.state, + "outputs": self.outputs, + "inputs": self.inputs, } @classmethod def from_dict(cls, task_dict): - ''' Convert from dictionary ''' + """Convert from dictionary""" return cls( - task_dict['run_id'], - task_dict['name'], - task_dict.get('state'), - task_dict.get('outputs'), - task_dict.get('inputs') + task_dict["run_id"], + task_dict["name"], + task_dict.get("state"), + task_dict.get("outputs"), + task_dict.get("inputs"), ) + class WESPlatform(Platform): - ''' GA4GH WES Platform class ''' - + """GA4GH WES Platform class""" + # WES API state mapping to Platform states STATE_MAP = { - 'UNKNOWN': 'Unknown', - 'QUEUED': 'Queued', - 'INITIALIZING': 'Queued', - 'RUNNING': 'Running', - 'PAUSED': 'Running', - 'COMPLETE': 'Complete', - 'EXECUTOR_ERROR': 'Failed', - 'SYSTEM_ERROR': 'Failed', - 'CANCELED': 'Cancelled', - 'CANCELING': 'Cancelled' + "UNKNOWN": "Unknown", + "QUEUED": "Queued", + "INITIALIZING": "Queued", + "RUNNING": "Running", + "PAUSED": "Running", + "COMPLETE": "Complete", + "EXECUTOR_ERROR": "Failed", + "SYSTEM_ERROR": "Failed", + "CANCELED": "Cancelled", + "CANCELING": "Cancelled", } - + def __init__(self, name): - ''' + """ Initialize WES Platform - ''' + """ super().__init__(name) self.logger = logging.getLogger(__name__) self.api_endpoint = None @@ -74,115 +78,117 @@ def __init__(self, name): self.projects = {} # Map project names to project objects self.workflows = {} # Map workflow names to workflow objects self.files = {} # Map file paths to file objects - + def connect(self, **kwargs): - ''' + """ Connect to the WES API - + :param kwargs: Connection parameters - api_endpoint: WES API endpoint URL - auth_token: Authentication token for the WES API - ''' - self.api_endpoint = kwargs.get('api_endpoint') - self.auth_token = kwargs.get('auth_token') - + """ + self.api_endpoint = kwargs.get("api_endpoint") + self.auth_token = kwargs.get("auth_token") + if not self.api_endpoint: raise ValueError("WES API endpoint URL is required") - + # Test connection by getting service info try: - response = self._make_request('GET', 'service-info') - self.logger.info(f"Connected to WES API: {response.get('workflow_type_versions', {})}") + response = self._make_request("GET", "service-info") + self.logger.info( + f"Connected to WES API: {response.get('workflow_type_versions', {})}" + ) self.connected = True return True except Exception as e: self.logger.error(f"Failed to connect to WES API: {e}") self.connected = False return False - + def _make_request(self, method, path, data=None, files=None, params=None): - ''' + """ Make a request to the WES API - + :param method: HTTP method (GET, POST, etc.) :param path: API path :param data: Request data :param files: Files to upload :param params: Query parameters :return: Response JSON - ''' + """ # Ensure path doesn't start with a slash to avoid urljoin issues - if path.startswith('/'): + if path.startswith("/"): path = path[1:] - + # Make sure the API endpoint ends with a slash for proper joining endpoint = self.api_endpoint - if not endpoint.endswith('/'): - endpoint = endpoint + '/' - + if not endpoint.endswith("/"): + endpoint = endpoint + "/" + url = urljoin(endpoint, path) headers = {} - + if self.auth_token: - headers['Authorization'] = f'Bearer {self.auth_token}' - + headers["Authorization"] = f"Bearer {self.auth_token}" + response = requests.request( method=method, url=url, headers=headers, json=data, files=files, - params=params + params=params, ) - + response.raise_for_status() - + if response.content: return response.json() return {} - + # File methods def copy_folder(self, source_project, source_folder, destination_project): - ''' - Copy source folder to destination project - + """ + Copy source folder to destination project + Note: WES API doesn't have a direct concept of folders, so this is a no-op - ''' + """ self.logger.warning("WES API doesn't support folder operations directly") return None - + def download_file(self, file, dest_folder): """ Download a file to a local directory - + :param file: File ID to download :param dest_folder: Destination folder to download file to :return: Name of local file downloaded or None """ if not file or not dest_folder: return None - + # In WES context, file might be a URL - if file.startswith('http'): + if file.startswith("http"): filename = os.path.basename(file) dest_path = os.path.join(dest_folder, filename) - + response = requests.get(file, stream=True) response.raise_for_status() - - with open(dest_path, 'wb') as f: + + with open(dest_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - + return dest_path else: self.logger.error(f"Unsupported file format: {file}") return None - + def export_file(self, file, bucket_name, prefix): """ Use platform specific functionality to copy a file from a platform to an S3 bucket. - + :param file: File to export :param bucket_name: S3 bucket name :param prefix: Destination S3 folder to export file to, path/to/folder @@ -190,435 +196,439 @@ def export_file(self, file, bucket_name, prefix): """ self.logger.warning("WES API doesn't support direct S3 export") return None - + def get_file_id(self, project, file_path): - ''' - Get a file id by its full path name - + """ + Get a file id by its full path name + Note: WES API doesn't have a direct concept of files, so this returns the path - ''' + """ return file_path - + def get_files(self, project, filters=None): """ Retrieve files in a project matching the filter criteria - + Note: WES API doesn't have a direct concept of files, so this returns an empty list - + :param project: Project to search for files :param filters: Dictionary containing filter criteria :return: List of tuples (file path, file object) matching filter criteria """ self.logger.warning("WES API doesn't support listing files") return [] - + def get_folder_id(self, project, folder_path): - ''' - Get a folder id by its full path name - + """ + Get a folder id by its full path name + Note: WES API doesn't have a direct concept of folders, so this returns None - ''' + """ self.logger.warning("WES API doesn't support folder operations") return None - + def rename_file(self, fileid, new_filename): - ''' + """ Rename a file to new_filename. - + Note: WES API doesn't have a direct concept of files, so this is a no-op - ''' + """ self.logger.warning("WES API doesn't support file operations") return None - + def roll_file(self, project, file_name): - ''' + """ Roll (find and rename) a file in a project. - + Note: WES API doesn't have a direct concept of files, so this is a no-op - ''' + """ self.logger.warning("WES API doesn't support file operations") return None - + def stage_output_files(self, project, output_files): - ''' + """ Stage output files to a project - + Note: WES API doesn't have a direct concept of files, so this is a no-op - ''' + """ self.logger.warning("WES API doesn't support file staging operations") return None - - def upload_file(self, filename, project, dest_folder=None, destination_filename=None, overwrite=False): - ''' - Upload a local file to project - + + def upload_file( + self, + filename, + project, + dest_folder=None, + destination_filename=None, + overwrite=False, + ): + """ + Upload a local file to project + Note: WES API doesn't have a direct concept of files, so this returns the filename - + :param filename: filename of local file to be uploaded. :param project: project that the file is uploaded to. :param dest_folder: The target path to the folder that file will be uploaded to. None will upload to root. :param destination_filename: File name after uploaded to destination folder. :param overwrite: Overwrite the file if it already exists. :return: ID of uploaded file. - ''' + """ self.logger.warning("WES API doesn't support file upload operations directly") return filename - + # Task/Workflow methods def copy_workflow(self, src_workflow, destination_project): - ''' + """ Copy a workflow from one project to another - + Note: WES API doesn't have a direct concept of projects, so this returns the workflow - ''' + """ return src_workflow - + def copy_workflows(self, reference_project, destination_project): - ''' + """ Copy all workflows from the reference_project to project - + Note: WES API doesn't have a direct concept of projects, so this returns an empty list - ''' + """ return [] - + def get_workflows(self, project): - ''' + """ Get workflows in a project - + Note: WES API doesn't have a direct concept of projects, so this returns an empty list - ''' + """ return [] - + def delete_task(self, task): - ''' - Delete a task/workflow/process - + """ + Delete a task/workflow/process + :param task: WESTask object :return: True if successful, False otherwise - ''' - if not task or not hasattr(task, 'run_id'): + """ + if not task or not hasattr(task, "run_id"): return False - + try: - self._make_request('DELETE', f'runs/{task.run_id}') + self._make_request("DELETE", f"runs/{task.run_id}") return True except Exception as e: self.logger.error(f"Failed to delete task: {e}") return False - + def get_current_task(self): - ''' - Get the current task - + """ + Get the current task + Note: WES API doesn't have a concept of current task, so this returns None - ''' + """ return None - + def get_task_cost(self, task): - ''' - Return task cost - + """ + Return task cost + Note: WES API doesn't provide cost information, so this returns None - ''' + """ return None - + def get_task_input(self, task, input_name): - ''' - Retrieve the input field of the task - + """ + Retrieve the input field of the task + :param task: WESTask object :param input_name: Name of the input field :return: Value of the input field or None - ''' - if not task or not hasattr(task, 'inputs'): + """ + if not task or not hasattr(task, "inputs"): return None - + return task.inputs.get(input_name) - + def get_task_state(self, task, refresh=False): - ''' + """ Get workflow/task state - + :param task: WESTask object :param refresh: Refresh task state before returning (Default: False) :return: The state of the task (Queued, Running, Complete, Failed, Cancelled) - ''' - if not task or not hasattr(task, 'run_id'): - return 'Unknown' - + """ + if not task or not hasattr(task, "run_id"): + return "Unknown" + if refresh: try: - response = self._make_request('GET', f'runs/{task.run_id}') - wes_state = response.get('state', 'UNKNOWN') - task.state = self.STATE_MAP.get(wes_state, 'Unknown') - task.outputs = response.get('outputs', {}) + response = self._make_request("GET", f"runs/{task.run_id}") + wes_state = response.get("state", "UNKNOWN") + task.state = self.STATE_MAP.get(wes_state, "Unknown") + task.outputs = response.get("outputs", {}) except Exception as e: self.logger.error(f"Failed to refresh task state: {e}") - return 'Unknown' - - return task.state or 'Unknown' - + return "Unknown" + + return task.state or "Unknown" + def get_task_output(self, task, output_name): - ''' - Retrieve the output field of the task - + """ + Retrieve the output field of the task + :param task: WESTask object :param output_name: Name of the output field :return: Value of the output field or None - ''' - if not task or not hasattr(task, 'outputs'): + """ + if not task or not hasattr(task, "outputs"): return None - + return task.outputs.get(output_name) - + def get_task_outputs(self, task): - ''' - Return a list of output fields of the task - + """ + Return a list of output fields of the task + :param task: WESTask object :return: Dictionary of output fields - ''' - if not task or not hasattr(task, 'outputs'): + """ + if not task or not hasattr(task, "outputs"): return {} - + return task.outputs - + def get_task_output_filename(self, task, output_name): - ''' + """ Retrieve the output field of the task and return filename NOTE: This method is deprecated as of v0.2.5 of PAML. Will be removed in v1.0. - + :param task: WESTask object :param output_name: Name of the output field :return: Filename of the output field or None - ''' + """ output = self.get_task_output(task, output_name) if not output: return None - + # If output is a URL, extract the filename - if isinstance(output, str) and (output.startswith('http') or output.startswith('file:')): + if isinstance(output, str) and ( + output.startswith("http") or output.startswith("file:") + ): return os.path.basename(output) - + return str(output) - + def get_tasks_by_name(self, project, task_name=None): - ''' + """ Get all processes/tasks in a project with a specified name - + :param project: The project to search :param task_name: The name of the process to search for (if None return all tasks) :return: List of tasks - ''' + """ try: params = {} if task_name: - params['name'] = task_name - - response = self._make_request('GET', 'runs', params=params) + params["name"] = task_name + + response = self._make_request("GET", "runs", params=params) tasks = [] - - for run in response.get('runs', []): + + for run in response.get("runs", []): task = WESTask( - run_id=run.get('run_id'), - name=run.get('name', ''), - state=self.STATE_MAP.get(run.get('state', 'UNKNOWN'), 'Unknown') + run_id=run.get("run_id"), + name=run.get("name", ""), + state=self.STATE_MAP.get(run.get("state", "UNKNOWN"), "Unknown"), ) tasks.append(task) - + return tasks except Exception as e: self.logger.error(f"Failed to get tasks: {e}") return [] - + def stage_task_output(self, task, project, output_to_export, output_directory_name): - ''' + """ DEPRECATED: Use stage_output_files instead - + Prepare/Copy output files of a task for export. - + Note: WES API doesn't have a direct concept of files, so this is a no-op - ''' + """ self.logger.warning("WES API doesn't support file staging operations") return None - + def submit_task(self, name, project, workflow, parameters, execution_settings=None): - ''' + """ Submit a workflow on the platform - + :param name: Name of the task to submit :param project: Project to submit the task to (not used in WES) :param workflow: Workflow to submit (URL or file path to the workflow) :param parameters: Parameters for the workflow :param execution_settings: {use_spot_instance: True/False} (not used in WES) :return: WESTask object or None - ''' + """ if not workflow: self.logger.error("Workflow is required") return None - + # Prepare the request data workflow_url = workflow workflow_type = "CWL" # Default to CWL workflow_type_version = "v1.0" # Default to v1.0 - + # Check if workflow is a file path or URL if os.path.exists(workflow): # For WES, we need to upload the workflow file workflow_type = "CWL" # Assuming CWL, adjust as needed - with open(workflow, 'rb') as f: + with open(workflow, "rb") as f: workflow_content = f.read() - + files = { - 'workflow_attachment': (os.path.basename(workflow), workflow_content) + "workflow_attachment": (os.path.basename(workflow), workflow_content) } workflow_url = os.path.basename(workflow) else: files = None - + # Prepare the request data data = { - 'workflow_params': json.dumps(parameters), - 'workflow_type': workflow_type, - 'workflow_type_version': workflow_type_version, - 'workflow_url': workflow_url, - 'tags': {'name': name} + "workflow_params": json.dumps(parameters), + "workflow_type": workflow_type, + "workflow_type_version": workflow_type_version, + "workflow_url": workflow_url, + "tags": {"name": name}, } - + try: - response = self._make_request('POST', 'runs', data=data, files=files) - run_id = response.get('run_id') - + response = self._make_request("POST", "runs", data=data, files=files) + run_id = response.get("run_id") + if not run_id: self.logger.error("Failed to submit task: No run_id returned") return None - + # Create a WESTask object - task = WESTask( - run_id=run_id, - name=name, - state='Queued', - inputs=parameters - ) - + task = WESTask(run_id=run_id, name=name, state="Queued", inputs=parameters) + return task except Exception as e: self.logger.error(f"Failed to submit task: {e}") return None - + # Project methods def create_project(self, project_name, project_description, **kwargs): - ''' + """ Create a project - + Note: WES API doesn't have a concept of projects, so this creates a virtual project - + :param project_name: Name of the project :param project_description: Description of the project :param kwargs: Additional arguments for creating a project :return: Project object - ''' + """ project = { - 'id': str(uuid.uuid4()), - 'name': project_name, - 'description': project_description + "id": str(uuid.uuid4()), + "name": project_name, + "description": project_description, } self.projects[project_name] = project return project - + def delete_project_by_name(self, project_name): - ''' + """ Delete a project on the platform - + Note: WES API doesn't have a concept of projects, so this removes the virtual project - ''' + """ if project_name in self.projects: del self.projects[project_name] return True return False - + def get_project(self): - ''' - Determine what project we are running in - + """ + Determine what project we are running in + Note: WES API doesn't have a concept of projects, so this returns None - ''' + """ return None - + def get_project_by_name(self, project_name): - ''' - Get a project by its name - + """ + Get a project by its name + Note: WES API doesn't have a concept of projects, so this returns the virtual project - ''' + """ if project_name in self.projects: return self.projects[project_name] - + # Create a new project if it doesn't exist return self.create_project(project_name, f"Virtual project for {project_name}") - + def get_project_by_id(self, project_id): - ''' - Get a project by its id - + """ + Get a project by its id + Note: WES API doesn't have a concept of projects, so this returns the virtual project - ''' + """ for project in self.projects.values(): - if project['id'] == project_id: + if project["id"] == project_id: return project return None - + def get_project_cost(self, project): - ''' - Return project cost - + """ + Return project cost + Note: WES API doesn't provide cost information, so this returns None - ''' + """ return None - + def get_project_users(self, project): - ''' - Return a list of user objects associated with a project - + """ + Return a list of user objects associated with a project + Note: WES API doesn't have a concept of projects, so this returns an empty list - ''' + """ return [] - + def get_projects(self): - ''' - Get list of all projects - + """ + Get list of all projects + Note: WES API doesn't have a concept of projects, so this returns the virtual projects - ''' + """ return list(self.projects.values()) - + # User methods def add_user_to_project(self, platform_user, project, permission): """ Add a user to a project on the platform - + Note: WES API doesn't have a concept of projects, so this is a no-op """ self.logger.warning("WES API doesn't support user management") return None - + def get_user(self, user): """ Get a user object from their (platform) user id or email address - + Note: WES API doesn't have a concept of users, so this returns None """ return None - + @classmethod def detect(cls): - ''' - Detect platform we are running on - + """ + Detect platform we are running on + Note: This method checks if we're running in a WES environment - ''' + """ # Check if WES API endpoint is set in environment variables - wes_api_endpoint = os.environ.get('WES_API_ENDPOINT') + wes_api_endpoint = os.environ.get("WES_API_ENDPOINT") if wes_api_endpoint: return True - return False \ No newline at end of file + return False From f3d01a0ea036779740ce7f0ec1dc700349c3a588 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 19 May 2025 20:49:07 +0000 Subject: [PATCH 08/38] Update README and fix class name --- docs/wes_platform.md | 38 ++++++++----------- src/cwl_platform/__init__.py | 4 +- .../{wes_platform.py => ga4ghwes_platform.py} | 2 +- 3 files changed, 19 insertions(+), 25 deletions(-) rename src/cwl_platform/{wes_platform.py => ga4ghwes_platform.py} (99%) diff --git a/docs/wes_platform.md b/docs/wes_platform.md index fae8f17d..c3641834 100644 --- a/docs/wes_platform.md +++ b/docs/wes_platform.md @@ -4,9 +4,10 @@ This document describes the GA4GH Workflow Execution Service (WES) platform impl ## Overview -The GA4GH WES API provides a standard way to submit and manage workflows across different workflow execution systems. This implementation allows you to use the PAML library to submit workflows to any WES-compatible service. +The GA4GH WES API provides a standard way to submit and manage workflows across different workflow execution systems. +This implementation allows you to use the PAML library to submit workflows to any WES-compatible service. -The WES platform implementation (`WESPlatform`) inherits from the `Platform` abstract base class and implements all the required methods to interact with a WES API endpoint. +The WES platform implementation (`GA4GHWESPlatform`) inherits from the `Platform` abstract base class and implements all the required methods to interact with a WES API endpoint. ## Features @@ -34,7 +35,7 @@ from src.cwl_platform import PlatformFactory factory = PlatformFactory() # Get the WES platform -platform = factory.get_platform('WES') +platform = factory.get_platform('GA4GHWESPlatform') # Connect to the WES API platform.connect( @@ -46,14 +47,18 @@ platform.connect( ### Submitting a Workflow ```python + # Create a virtual project (not used by WES but required by the API) -project = platform.create_project('wes-example', 'WES Example Project') +project = { + project_name = 'GA4GH WES Example Project' +} +#project = platform.create_project('wes-example', 'WES Example Project') # Define workflow parameters -parameters = { +workflow_parameters = { "input_file": { "class": "File", - "path": "https://example.com/input.txt" + "path": "platform-specific id" }, "output_filename": "output.txt" } @@ -62,8 +67,11 @@ parameters = { task = platform.submit_task( name="My Workflow", project=project, - workflow="https://example.com/workflow.cwl", # URL or local file path - parameters=parameters + workflow="platform:// Date: Mon, 19 May 2025 20:50:20 +0000 Subject: [PATCH 09/38] Fix platform name --- src/cwl_platform/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cwl_platform/__init__.py b/src/cwl_platform/__init__.py index 502911bd..7b02249a 100644 --- a/src/cwl_platform/__init__.py +++ b/src/cwl_platform/__init__.py @@ -14,7 +14,7 @@ 'Arvados': ArvadosPlatform, # 'Omics': OmicsPlatform, 'SevenBridges': SevenBridgesPlatform, - 'WES': GA4GHWESPlatform + 'GA4GHWES': GA4GHWESPlatform } class PlatformFactory(): From a3013b58eeb25e9a6a2617e92b1ab31fff4ee387 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 19 May 2025 20:53:16 +0000 Subject: [PATCH 10/38] Add workflow_engine parameter --- src/cwl_platform/ga4ghwes_platform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cwl_platform/ga4ghwes_platform.py b/src/cwl_platform/ga4ghwes_platform.py index 09e61827..3294f030 100644 --- a/src/cwl_platform/ga4ghwes_platform.py +++ b/src/cwl_platform/ga4ghwes_platform.py @@ -497,6 +497,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No "workflow_type": workflow_type, "workflow_type_version": workflow_type_version, "workflow_url": workflow_url, + "workflow_engine": execution_settings.get("workflow_engine"), "tags": {"name": name}, } From 41273afe59075b5624d354908f8284840be1e5ce Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 19 May 2025 21:03:07 +0000 Subject: [PATCH 11/38] Rename GA4GHWESPlatform to NGS360Platform to better reflect what is happening --- src/cwl_platform/__init__.py | 4 ++-- src/cwl_platform/{ga4ghwes_platform.py => ngs360_platform.py} | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename src/cwl_platform/{ga4ghwes_platform.py => ngs360_platform.py} (99%) diff --git a/src/cwl_platform/__init__.py b/src/cwl_platform/__init__.py index 7b02249a..ea8de73f 100644 --- a/src/cwl_platform/__init__.py +++ b/src/cwl_platform/__init__.py @@ -6,7 +6,7 @@ from .arvados_platform import ArvadosPlatform from .sevenbridges_platform import SevenBridgesPlatform -from .ga4ghwes_platform import GA4GHWESPlatform +from .ngs360_platform import NGS360Platform #from .omics_platform import OmicsPlatform # Move this for a config file @@ -14,7 +14,7 @@ 'Arvados': ArvadosPlatform, # 'Omics': OmicsPlatform, 'SevenBridges': SevenBridgesPlatform, - 'GA4GHWES': GA4GHWESPlatform + 'NGS360': NGS360Platform } class PlatformFactory(): diff --git a/src/cwl_platform/ga4ghwes_platform.py b/src/cwl_platform/ngs360_platform.py similarity index 99% rename from src/cwl_platform/ga4ghwes_platform.py rename to src/cwl_platform/ngs360_platform.py index 3294f030..15271ccc 100644 --- a/src/cwl_platform/ga4ghwes_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -1,5 +1,5 @@ """ -GA4GH WES Platform class +NGS360 / GA4GH WES Platform class This module implements the Platform abstract base class using the GA4GH Workflow Execution Service (WES) API. The WES API provides a standard way to submit and manage workflows across different workflow execution systems. @@ -50,7 +50,7 @@ def from_dict(cls, task_dict): ) -class GA4GHWESPlatform(Platform): +class NGS360Platform(Platform): """GA4GH WES Platform class""" # WES API state mapping to Platform states From c03fcceaeb30dd69e99e4da9d018a3e0e081b1ed Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Thu, 9 Oct 2025 15:39:53 -0400 Subject: [PATCH 12/38] Read the endpoint & token from env vars --- src/cwl_platform/ngs360_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 15271ccc..c48d1ae7 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -87,8 +87,8 @@ def connect(self, **kwargs): - api_endpoint: WES API endpoint URL - auth_token: Authentication token for the WES API """ - self.api_endpoint = kwargs.get("api_endpoint") - self.auth_token = kwargs.get("auth_token") + self.api_endpoint = kwargs.get("api_endpoint", os.environ.get("WES_API_ENDPOINT")) + self.auth_token = kwargs.get("auth_token", os.environ.get("WES_AUTH_TOKEN")) if not self.api_endpoint: raise ValueError("WES API endpoint URL is required") From 87b183776087978fb657d465ccae32b7b1954df8 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Thu, 9 Oct 2025 16:11:12 -0400 Subject: [PATCH 13/38] Encode username/password as a token --- src/cwl_platform/ngs360_platform.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index c48d1ae7..0ec13585 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -89,6 +89,15 @@ def connect(self, **kwargs): """ self.api_endpoint = kwargs.get("api_endpoint", os.environ.get("WES_API_ENDPOINT")) self.auth_token = kwargs.get("auth_token", os.environ.get("WES_AUTH_TOKEN")) + if not self.auth_token: + self.username = os.environ.get("USERNAME") + self.password = os.environ.get("PASSWORD") + if self.username and self.password: + # Basic auth token + import base64 + + token = f"{self.username}:{self.password}" + self.auth_token = base64.b64encode(token.encode()).decode() if not self.api_endpoint: raise ValueError("WES API endpoint URL is required") From 64950d574f4a0e82c725e7517725fd74a1c7ad05 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 10 Oct 2025 16:46:48 +0000 Subject: [PATCH 14/38] Rename env vars for WES_USERNAME/WES_PASSWORD --- src/cwl_platform/ngs360_platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 0ec13585..306e92c0 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -90,8 +90,8 @@ def connect(self, **kwargs): self.api_endpoint = kwargs.get("api_endpoint", os.environ.get("WES_API_ENDPOINT")) self.auth_token = kwargs.get("auth_token", os.environ.get("WES_AUTH_TOKEN")) if not self.auth_token: - self.username = os.environ.get("USERNAME") - self.password = os.environ.get("PASSWORD") + self.username = os.environ.get("WES_USERNAME") + self.password = os.environ.get("WES_PASSWORD") if self.username and self.password: # Basic auth token import base64 From fcf3c807490f6bcdae3fd80816634e79c6e40956 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 10 Oct 2025 17:09:39 +0000 Subject: [PATCH 15/38] Add username,password auth support --- src/cwl_platform/ngs360_platform.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 306e92c0..664ee989 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -88,20 +88,16 @@ def connect(self, **kwargs): - auth_token: Authentication token for the WES API """ self.api_endpoint = kwargs.get("api_endpoint", os.environ.get("WES_API_ENDPOINT")) - self.auth_token = kwargs.get("auth_token", os.environ.get("WES_AUTH_TOKEN")) - if not self.auth_token: - self.username = os.environ.get("WES_USERNAME") - self.password = os.environ.get("WES_PASSWORD") - if self.username and self.password: - # Basic auth token - import base64 - - token = f"{self.username}:{self.password}" - self.auth_token = base64.b64encode(token.encode()).decode() - if not self.api_endpoint: raise ValueError("WES API endpoint URL is required") + # Set up auth token or username/password as auth + self.auth_token = kwargs.get("auth_token", os.environ.get("WES_AUTH_TOKEN")) + if not self.auth_token: + username = os.environ.get("WES_USERNAME") + password = os.environ.get("WES_PASSWORD") + self.auth = (username, password) + # Test connection by getting service info try: response = self._make_request("GET", "service-info") @@ -140,6 +136,7 @@ def _make_request(self, method, path, data=None, files=None, params=None): if self.auth_token: headers["Authorization"] = f"Bearer {self.auth_token}" + if self.auth: response = requests.request( method=method, @@ -148,6 +145,7 @@ def _make_request(self, method, path, data=None, files=None, params=None): json=data, files=files, params=params, + auth=self.auth ) response.raise_for_status() From 599cd2a1ca5111b2b9f029c03ca9774b258d6913 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 10 Oct 2025 17:12:33 +0000 Subject: [PATCH 16/38] Remove left over code --- src/cwl_platform/ngs360_platform.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 664ee989..c78bcbe2 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -136,8 +136,7 @@ def _make_request(self, method, path, data=None, files=None, params=None): if self.auth_token: headers["Authorization"] = f"Bearer {self.auth_token}" - if self.auth: - + response = requests.request( method=method, url=url, From e07bd4f82623dab2a1c02528a550f5d88951874b Mon Sep 17 00:00:00 2001 From: cheny252 Date: Wed, 29 Oct 2025 10:50:46 -0400 Subject: [PATCH 17/38] update get_tasks_by_name and submit_task --- src/cwl_platform/ngs360_platform.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index c78bcbe2..f219ada6 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -136,12 +136,12 @@ def _make_request(self, method, path, data=None, files=None, params=None): if self.auth_token: headers["Authorization"] = f"Bearer {self.auth_token}" - + response = requests.request( method=method, url=url, headers=headers, - json=data, + data=data, files=files, params=params, auth=self.auth @@ -423,7 +423,7 @@ def get_task_output_filename(self, task, output_name): return str(output) - def get_tasks_by_name(self, project, task_name=None): + def get_tasks_by_name(self, project, task_name=None,inputs_to_compare=None,tasks=None): """ Get all processes/tasks in a project with a specified name @@ -432,10 +432,12 @@ def get_tasks_by_name(self, project, task_name=None): :return: List of tasks """ try: - params = {} + tags = {"Project": project["name"]} if task_name: - params["name"] = task_name - + tags["Name"] = task_name + params = { + "tags": json.dumps(tags) + } response = self._make_request("GET", "runs", params=params) tasks = [] @@ -504,7 +506,10 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No "workflow_type_version": workflow_type_version, "workflow_url": workflow_url, "workflow_engine": execution_settings.get("workflow_engine"), - "tags": {"name": name}, + "tags": json.dumps({ + "Name": name, + "Project": project["name"] + }), } try: @@ -583,7 +588,7 @@ def get_project_by_id(self, project_id): for project in self.projects.values(): if project["id"] == project_id: return project - return None + return self.create_project(project_id, f"Virtual project for {project_id}") def get_project_cost(self, project): """ From bb1a371b2521fbf65ebf86fb5b7e492c1f9ab5d9 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Tue, 18 Nov 2025 11:17:33 -0500 Subject: [PATCH 18/38] update folder id --- src/cwl_platform/ngs360_platform.py | 38 ++++++++++++++++++----------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index f219ada6..71d98e2e 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -230,8 +230,7 @@ def get_folder_id(self, project, folder_path): Note: WES API doesn't have a direct concept of folders, so this returns None """ - self.logger.warning("WES API doesn't support folder operations") - return None + return folder_path def rename_file(self, fileid, new_filename): """ @@ -385,10 +384,16 @@ def get_task_output(self, task, output_name): :param output_name: Name of the output field :return: Value of the output field or None """ - if not task or not hasattr(task, "outputs"): + if not task or not hasattr(task, "run_id"): + return None + try: + response = self._make_request("GET", f"runs/{task.run_id}") + task.outputs = response.get("outputs", {}) + task.output_mapping = task.outputs.get("output_mapping",{}) + return task.output_mapping.get(output_name) + except Exception as e: + self.logger.error(f"Failed to get task output: {e}") return None - - return task.outputs.get(output_name) def get_task_outputs(self, task): """ @@ -397,10 +402,17 @@ def get_task_outputs(self, task): :param task: WESTask object :return: Dictionary of output fields """ - if not task or not hasattr(task, "outputs"): - return {} + if not task or not hasattr(task, "run_id"): + return None + try: + response = self._make_request("GET", f"runs/{task.run_id}") + task.outputs = response.get("outputs", {}) + task.output_mapping = task.outputs.get("output_mapping",{}) + except Exception as e: + self.logger.error(f"Failed to get task outputs: {e}") + return [] - return task.outputs + return list(task.output_mapping.keys()) def get_task_output_filename(self, task, output_name): """ @@ -416,12 +428,10 @@ def get_task_output_filename(self, task, output_name): return None # If output is a URL, extract the filename - if isinstance(output, str) and ( - output.startswith("http") or output.startswith("file:") - ): - return os.path.basename(output) - - return str(output) + if isinstance(output, list): + return [output_path.split('/')[-1] for output_path in output] + else: + return output.split('/')[-1] def get_tasks_by_name(self, project, task_name=None,inputs_to_compare=None,tasks=None): """ From d34067394ed008e97f2b3e3ab3f3919e556818fa Mon Sep 17 00:00:00 2001 From: cheny252 Date: Fri, 12 Dec 2025 10:38:59 -0500 Subject: [PATCH 19/38] add cache id --- src/cwl_platform/ngs360_platform.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 71d98e2e..7fb8ceaf 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -510,12 +510,18 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No files = None # Prepare the request data + workflow_engine_params = {} + workflow_engine_params["name"] = name + if execution_settings and "cacheId" in execution_settings: + workflow_engine_params["cacheId"] = execution_settings["cacheId"] + data = { "workflow_params": json.dumps(parameters), "workflow_type": workflow_type, "workflow_type_version": workflow_type_version, "workflow_url": workflow_url, "workflow_engine": execution_settings.get("workflow_engine"), + "workflow_engine_parameters": json.dumps(workflow_engine_params), "tags": json.dumps({ "Name": name, "Project": project["name"] From 8688892e53686f97f1b397d67bd1e3d6a278f1d8 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 15 Dec 2025 12:59:59 -0500 Subject: [PATCH 20/38] fix unit test --- src/cwl_platform/ngs360_platform.py | 4 +- tests/test_wes_platform.py | 90 ++++++++++++++++++++++------- 2 files changed, 70 insertions(+), 24 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 7fb8ceaf..8ac1166d 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -75,6 +75,7 @@ def __init__(self, name): self.logger = logging.getLogger(__name__) self.api_endpoint = None self.auth_token = None + self.auth = None self.projects = {} # Map project names to project objects self.workflows = {} # Map workflow names to workflow objects self.files = {} # Map file paths to file objects @@ -520,14 +521,13 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No "workflow_type": workflow_type, "workflow_type_version": workflow_type_version, "workflow_url": workflow_url, - "workflow_engine": execution_settings.get("workflow_engine"), + "workflow_engine": "CWL", "workflow_engine_parameters": json.dumps(workflow_engine_params), "tags": json.dumps({ "Name": name, "Project": project["name"] }), } - try: response = self._make_request("POST", "runs", data=data, files=files) run_id = response.get("run_id") diff --git a/tests/test_wes_platform.py b/tests/test_wes_platform.py index fc358dbc..03aa020b 100644 --- a/tests/test_wes_platform.py +++ b/tests/test_wes_platform.py @@ -5,9 +5,9 @@ import unittest from unittest.mock import patch, MagicMock -from src.cwl_platform.wes_platform import WESPlatform, WESTask +from src.cwl_platform.ngs360_platform import NGS360Platform, WESTask -class TestWESPlatform(unittest.TestCase): +class TestNGS360Platform(unittest.TestCase): ''' Test WES Platform implementation ''' @@ -15,7 +15,7 @@ def setUp(self): ''' Set up test environment ''' - self.platform = WESPlatform('WES') + self.platform = NGS360Platform('WES') self.platform.api_endpoint = 'https://wes.example.com/ga4gh/wes/v1' self.platform.auth_token = 'test_token' self.platform.connected = True @@ -37,9 +37,10 @@ def test_make_request(self, mock_request): method='GET', url='https://wes.example.com/ga4gh/wes/v1/service-info', headers={'Authorization': 'Bearer test_token'}, - json=None, + data=None, files=None, - params=None + params=None, + auth=None ) self.assertEqual(result, {'key': 'value'}) @@ -63,7 +64,7 @@ def test_connect(self, mock_request): mock_request.return_value = mock_response # Test connect - platform = WESPlatform('WES') + platform = NGS360Platform('WES') result = platform.connect(api_endpoint='https://wes.example.com/ga4gh/wes/v1', auth_token='test_token') self.assertTrue(result) self.assertTrue(platform.connected) @@ -87,8 +88,8 @@ def test_submit_task(self, mock_request): task = self.platform.submit_task( name='Test Task', project=project, - workflow='https://example.com/workflow.cwl', - parameters=parameters + workflow='1234567', + parameters=parameters, ) # Verify request @@ -96,15 +97,18 @@ def test_submit_task(self, mock_request): method='POST', url='https://wes.example.com/ga4gh/wes/v1/runs', headers={'Authorization': 'Bearer test_token'}, - json={ + data={ 'workflow_params': json.dumps(parameters), 'workflow_type': 'CWL', 'workflow_type_version': 'v1.0', - 'workflow_url': 'https://example.com/workflow.cwl', - 'tags': {'name': 'Test Task'} + 'workflow_engine': 'CWL', + 'workflow_engine_parameters': '{"name": "Test Task"}', + 'workflow_url': '1234567', + 'tags': '{"Name": "Test Task", "Project": "Test Project"}' }, files=None, - params=None + params=None, + auth=None ) # Verify task @@ -147,17 +151,37 @@ def test_get_task_state(self, mock_request): method='GET', url='https://wes.example.com/ga4gh/wes/v1/runs/test_run_id', headers={'Authorization': 'Bearer test_token'}, - json=None, + data=None, files=None, - params=None + params=None, + auth=None ) - def test_get_task_output(self): + @patch('requests.request') + def test_get_task_output(self, mock_request): ''' Test get_task_output method ''' + # Mock response + mock_response = MagicMock() + mock_response.content = json.dumps({ + "outputs": { + "output_mapping": { + 'output': 'value' + } + } + }).encode('utf-8') + mock_response.json.return_value = { + "outputs": { + "output_mapping": { + 'output': 'value' + } + } + } + mock_request.return_value = mock_response + # Create task with outputs - task = WESTask('test_run_id', 'Test Task', outputs={'output': 'value'}) + task = WESTask('test_run_id', 'Test Task') # Test get_task_output output = self.platform.get_task_output(task, 'output') @@ -167,16 +191,36 @@ def test_get_task_output(self): output = self.platform.get_task_output(task, 'non_existent') self.assertIsNone(output) - def test_get_task_outputs(self): + @patch('requests.request') + def test_get_task_outputs(self, mock_request): ''' Test get_task_outputs method ''' + # Mock response + mock_response = MagicMock() + mock_response.content = json.dumps({ + "outputs": { + "output_mapping": { + 'output1': 'value1', + 'output2': 'value2' + } + } + }).encode('utf-8') + mock_response.json.return_value = { + "outputs": { + "output_mapping": { + 'output1': 'value1', + 'output2': 'value2' + } + } + } + mock_request.return_value = mock_response # Create task with outputs task = WESTask('test_run_id', 'Test Task', outputs={'output1': 'value1', 'output2': 'value2'}) # Test get_task_outputs outputs = self.platform.get_task_outputs(task) - self.assertEqual(outputs, {'output1': 'value1', 'output2': 'value2'}) + self.assertEqual(outputs, ['output1', 'output2']) @patch('requests.request') def test_get_tasks_by_name(self, mock_request): @@ -224,9 +268,10 @@ def test_get_tasks_by_name(self, mock_request): method='GET', url='https://wes.example.com/ga4gh/wes/v1/runs', headers={'Authorization': 'Bearer test_token'}, - json=None, + data=None, files=None, - params={} + params={'tags': '{"Project": "Test Project"}'}, + auth=None ) # Verify tasks @@ -260,9 +305,10 @@ def test_delete_task(self, mock_request): method='DELETE', url='https://wes.example.com/ga4gh/wes/v1/runs/test_run_id', headers={'Authorization': 'Bearer test_token'}, - json=None, + data=None, files=None, - params=None + params=None, + auth=None ) def test_project_methods(self): From 832ea5817cf04f28560f1361499bd16176400bc4 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 15 Dec 2025 13:12:18 -0500 Subject: [PATCH 21/38] fix pylint issues --- src/cwl_platform/ngs360_platform.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 8ac1166d..3b528b11 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -8,10 +8,9 @@ import os import json import logging -import time import uuid -import requests from urllib.parse import urljoin +import requests from .base_platform import Platform @@ -103,7 +102,8 @@ def connect(self, **kwargs): try: response = self._make_request("GET", "service-info") self.logger.info( - f"Connected to WES API: {response.get('workflow_type_versions', {})}" + "Connected to WES API: %s", + response.get('workflow_type_versions', {}) ) self.connected = True return True @@ -189,7 +189,7 @@ def download_file(self, file, dest_folder): return dest_path else: - self.logger.error(f"Unsupported file format: {file}") + self.logger.error("Unsupported file format: %s", file) return None def export_file(self, file, bucket_name, prefix): @@ -322,7 +322,7 @@ def delete_task(self, task): self._make_request("DELETE", f"runs/{task.run_id}") return True except Exception as e: - self.logger.error(f"Failed to delete task: {e}") + self.logger.error("Failed to delete task: %s", e) return False def get_current_task(self): @@ -372,7 +372,7 @@ def get_task_state(self, task, refresh=False): task.state = self.STATE_MAP.get(wes_state, "Unknown") task.outputs = response.get("outputs", {}) except Exception as e: - self.logger.error(f"Failed to refresh task state: {e}") + self.logger.error("Failed to refresh task state: %s", e) return "Unknown" return task.state or "Unknown" @@ -393,7 +393,7 @@ def get_task_output(self, task, output_name): task.output_mapping = task.outputs.get("output_mapping",{}) return task.output_mapping.get(output_name) except Exception as e: - self.logger.error(f"Failed to get task output: {e}") + self.logger.error("Failed to get task output: %s", e) return None def get_task_outputs(self, task): @@ -407,10 +407,10 @@ def get_task_outputs(self, task): return None try: response = self._make_request("GET", f"runs/{task.run_id}") - task.outputs = response.get("outputs", {}) + task.outputs = response.get("outputs", {}) task.output_mapping = task.outputs.get("output_mapping",{}) except Exception as e: - self.logger.error(f"Failed to get task outputs: {e}") + self.logger.error("Failed to get task output: %s", e) return [] return list(task.output_mapping.keys()) @@ -462,7 +462,7 @@ def get_tasks_by_name(self, project, task_name=None,inputs_to_compare=None,tasks return tasks except Exception as e: - self.logger.error(f"Failed to get tasks: {e}") + self.logger.error("Failed to get tasks: %s", e) return [] def stage_task_output(self, task, project, output_to_export, output_directory_name): @@ -541,7 +541,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No return task except Exception as e: - self.logger.error(f"Failed to submit task: {e}") + self.logger.error("Failed to submit task: %s", e) return None # Project methods From a30fff4d5b4ef5e99a8d1bf9fc1290f70c504188 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 15 Dec 2025 13:25:25 -0500 Subject: [PATCH 22/38] fix pyint issues --- src/cwl_platform/ngs360_platform.py | 52 ++++++++++++++++++----------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 3b528b11..aaf8ae01 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -107,8 +107,12 @@ def connect(self, **kwargs): ) self.connected = True return True - except Exception as e: - self.logger.error(f"Failed to connect to WES API: {e}") + except requests.RequestException as e: + self.logger.error("Failed to connect to WES API: %s", e) + self.connected = False + return False + except (ValueError, KeyError) as e: + self.logger.error("Invalid response from WES API: %s", e) self.connected = False return False @@ -145,13 +149,15 @@ def _make_request(self, method, path, data=None, files=None, params=None): data=data, files=files, params=params, - auth=self.auth + auth=self.auth, + timeout=120 ) response.raise_for_status() if response.content: return response.json() + return {} # File methods @@ -162,7 +168,6 @@ def copy_folder(self, source_project, source_folder, destination_project): Note: WES API doesn't have a direct concept of folders, so this is a no-op """ self.logger.warning("WES API doesn't support folder operations directly") - return None def download_file(self, file, dest_folder): """ @@ -202,7 +207,6 @@ def export_file(self, file, bucket_name, prefix): :return: s3 file path or None """ self.logger.warning("WES API doesn't support direct S3 export") - return None def get_file_id(self, project, file_path): """ @@ -240,7 +244,6 @@ def rename_file(self, fileid, new_filename): Note: WES API doesn't have a direct concept of files, so this is a no-op """ self.logger.warning("WES API doesn't support file operations") - return None def roll_file(self, project, file_name): """ @@ -249,7 +252,6 @@ def roll_file(self, project, file_name): Note: WES API doesn't have a direct concept of files, so this is a no-op """ self.logger.warning("WES API doesn't support file operations") - return None def stage_output_files(self, project, output_files): """ @@ -258,7 +260,6 @@ def stage_output_files(self, project, output_files): Note: WES API doesn't have a direct concept of files, so this is a no-op """ self.logger.warning("WES API doesn't support file staging operations") - return None def upload_file( self, @@ -321,7 +322,7 @@ def delete_task(self, task): try: self._make_request("DELETE", f"runs/{task.run_id}") return True - except Exception as e: + except requests.RequestException as e: self.logger.error("Failed to delete task: %s", e) return False @@ -371,9 +372,12 @@ def get_task_state(self, task, refresh=False): wes_state = response.get("state", "UNKNOWN") task.state = self.STATE_MAP.get(wes_state, "Unknown") task.outputs = response.get("outputs", {}) - except Exception as e: + except requests.RequestException as e: self.logger.error("Failed to refresh task state: %s", e) return "Unknown" + except (ValueError, KeyError) as e: + self.logger.error("Invalid response format for task state: %s", e) + return "Unknown" return task.state or "Unknown" @@ -392,9 +396,12 @@ def get_task_output(self, task, output_name): task.outputs = response.get("outputs", {}) task.output_mapping = task.outputs.get("output_mapping",{}) return task.output_mapping.get(output_name) - except Exception as e: + except requests.RequestException as e: self.logger.error("Failed to get task output: %s", e) return None + except (ValueError, KeyError, AttributeError) as e: + self.logger.error("Invalid response format for task output: %s", e) + return None def get_task_outputs(self, task): """ @@ -409,8 +416,11 @@ def get_task_outputs(self, task): response = self._make_request("GET", f"runs/{task.run_id}") task.outputs = response.get("outputs", {}) task.output_mapping = task.outputs.get("output_mapping",{}) - except Exception as e: - self.logger.error("Failed to get task output: %s", e) + except requests.RequestException as e: + self.logger.error("Failed to get task outputs: %s", e) + return [] + except (ValueError, KeyError, AttributeError) as e: + self.logger.error("Invalid response format for task outputs: %s", e) return [] return list(task.output_mapping.keys()) @@ -431,8 +441,8 @@ def get_task_output_filename(self, task, output_name): # If output is a URL, extract the filename if isinstance(output, list): return [output_path.split('/')[-1] for output_path in output] - else: - return output.split('/')[-1] + + return output.split('/')[-1] def get_tasks_by_name(self, project, task_name=None,inputs_to_compare=None,tasks=None): """ @@ -461,9 +471,12 @@ def get_tasks_by_name(self, project, task_name=None,inputs_to_compare=None,tasks tasks.append(task) return tasks - except Exception as e: + except requests.RequestException as e: self.logger.error("Failed to get tasks: %s", e) return [] + except (ValueError, KeyError, TypeError) as e: + self.logger.error("Invalid response format for tasks: %s", e) + return [] def stage_task_output(self, task, project, output_to_export, output_directory_name): """ @@ -474,7 +487,6 @@ def stage_task_output(self, task, project, output_to_export, output_directory_na Note: WES API doesn't have a direct concept of files, so this is a no-op """ self.logger.warning("WES API doesn't support file staging operations") - return None def submit_task(self, name, project, workflow, parameters, execution_settings=None): """ @@ -540,9 +552,12 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No task = WESTask(run_id=run_id, name=name, state="Queued", inputs=parameters) return task - except Exception as e: + except requests.RequestException as e: self.logger.error("Failed to submit task: %s", e) return None + except (ValueError, KeyError, IOError) as e: + self.logger.error("Failed to prepare or parse task submission: %s", e) + return None # Project methods def create_project(self, project_name, project_description, **kwargs): @@ -638,7 +653,6 @@ def add_user_to_project(self, platform_user, project, permission): Note: WES API doesn't have a concept of projects, so this is a no-op """ self.logger.warning("WES API doesn't support user management") - return None def get_user(self, user): """ From 0811b2eb8db4253d434d2194a632ffeb9b48ecba Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 15 Dec 2025 13:28:41 -0500 Subject: [PATCH 23/38] fix unit test --- tests/test_wes_platform.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_wes_platform.py b/tests/test_wes_platform.py index 03aa020b..4630991c 100644 --- a/tests/test_wes_platform.py +++ b/tests/test_wes_platform.py @@ -40,6 +40,7 @@ def test_make_request(self, mock_request): data=None, files=None, params=None, + timeout=120, auth=None ) self.assertEqual(result, {'key': 'value'}) @@ -108,6 +109,7 @@ def test_submit_task(self, mock_request): }, files=None, params=None, + timeout=120, auth=None ) @@ -154,6 +156,7 @@ def test_get_task_state(self, mock_request): data=None, files=None, params=None, + timeout=120, auth=None ) @@ -271,6 +274,7 @@ def test_get_tasks_by_name(self, mock_request): data=None, files=None, params={'tags': '{"Project": "Test Project"}'}, + timeout=120, auth=None ) @@ -308,6 +312,7 @@ def test_delete_task(self, mock_request): data=None, files=None, params=None, + timeout=120, auth=None ) From 7e13a7a6dc56b739ec1f62f55f6e313eb8fa8d91 Mon Sep 17 00:00:00 2001 From: cheny252 Date: Mon, 15 Dec 2025 13:30:21 -0500 Subject: [PATCH 24/38] fix pylint --- src/cwl_platform/ngs360_platform.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index aaf8ae01..4161a171 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -185,7 +185,7 @@ def download_file(self, file, dest_folder): filename = os.path.basename(file) dest_path = os.path.join(dest_folder, filename) - response = requests.get(file, stream=True) + response = requests.get(file, stream=True, timeout=120) response.raise_for_status() with open(dest_path, "wb") as f: @@ -193,9 +193,9 @@ def download_file(self, file, dest_folder): f.write(chunk) return dest_path - else: - self.logger.error("Unsupported file format: %s", file) - return None + + self.logger.error("Unsupported file format: %s", file) + return None def export_file(self, file, bucket_name, prefix): """ From ce901b04c41063452cf23441da74fbfa0dc3a6a3 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 18:17:05 -0500 Subject: [PATCH 25/38] Rename test file --- tests/{test_wes_platform.py => test_ngs360-ga4gh_platform.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_wes_platform.py => test_ngs360-ga4gh_platform.py} (100%) diff --git a/tests/test_wes_platform.py b/tests/test_ngs360-ga4gh_platform.py similarity index 100% rename from tests/test_wes_platform.py rename to tests/test_ngs360-ga4gh_platform.py From 1e7f02ebdf99998921a6840d8a4250bf65c3dca5 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 18:20:04 -0500 Subject: [PATCH 26/38] Update code comments --- tests/test_ngs360-ga4gh_platform.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_ngs360-ga4gh_platform.py b/tests/test_ngs360-ga4gh_platform.py index 4630991c..998b7104 100644 --- a/tests/test_ngs360-ga4gh_platform.py +++ b/tests/test_ngs360-ga4gh_platform.py @@ -83,9 +83,11 @@ def test_submit_task(self, mock_request): mock_response.json.return_value = {'run_id': 'test_run_id'} mock_request.return_value = mock_response - # Test submit_task + # Set up parameters project = {'id': 'test_project', 'name': 'Test Project'} parameters = {'input': 'value'} + + # Test task = self.platform.submit_task( name='Test Task', project=project, From c881e0e4a38d1e89a1c2081af17d5e3dc133cfdf Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:00:21 -0500 Subject: [PATCH 27/38] Update submit_task test for NGS360 GA4GH WES API call --- src/cwl_platform/ngs360_platform.py | 19 ++++++----------- tests/test_ngs360-ga4gh_platform.py | 32 ++++++++++++++--------------- 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 4161a171..6dabea4c 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -490,13 +490,13 @@ def stage_task_output(self, task, project, output_to_export, output_directory_na def submit_task(self, name, project, workflow, parameters, execution_settings=None): """ - Submit a workflow on the platform + Submit a workflow to the GA4GH WES API :param name: Name of the task to submit - :param project: Project to submit the task to (not used in WES) - :param workflow: Workflow to submit (URL or file path to the workflow) + :param project: Project to submit the task to + :param workflow: Workflow to submit (ID, URL or file path to the workflow) :param parameters: Parameters for the workflow - :param execution_settings: {use_spot_instance: True/False} (not used in WES) + :param execution_settings: Not used in NGS360 WES API (yet) :return: WESTask object or None """ if not workflow: @@ -523,21 +523,14 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No files = None # Prepare the request data - workflow_engine_params = {} - workflow_engine_params["name"] = name - if execution_settings and "cacheId" in execution_settings: - workflow_engine_params["cacheId"] = execution_settings["cacheId"] - data = { "workflow_params": json.dumps(parameters), "workflow_type": workflow_type, "workflow_type_version": workflow_type_version, "workflow_url": workflow_url, - "workflow_engine": "CWL", - "workflow_engine_parameters": json.dumps(workflow_engine_params), "tags": json.dumps({ - "Name": name, - "Project": project["name"] + "ProjectName": project["name"], + "TaskName": name, }), } try: diff --git a/tests/test_ngs360-ga4gh_platform.py b/tests/test_ngs360-ga4gh_platform.py index 998b7104..6c2dd083 100644 --- a/tests/test_ngs360-ga4gh_platform.py +++ b/tests/test_ngs360-ga4gh_platform.py @@ -75,39 +75,37 @@ def test_connect(self, mock_request): @patch('requests.request') def test_submit_task(self, mock_request): ''' - Test submit_task method + Test submit_task method calls the GA4GH API correctly ''' - # Mock response + # Set up parameters + workflow_url = 'workflow_id' + workflow_parameters = {'input': 'value'} + + # Mock the GA4GH response for submit_task mock_response = MagicMock() - mock_response.content = json.dumps({'run_id': 'test_run_id'}).encode('utf-8') mock_response.json.return_value = {'run_id': 'test_run_id'} mock_request.return_value = mock_response - # Set up parameters - project = {'id': 'test_project', 'name': 'Test Project'} - parameters = {'input': 'value'} - # Test task = self.platform.submit_task( name='Test Task', - project=project, - workflow='1234567', - parameters=parameters, + project={'id': "P-1234567", 'name': 'Test Project'}, + workflow=workflow_url, + parameters=workflow_parameters, + execution_settings={"use_spot_instance": False} ) - # Verify request + # Verify GA4GH API request withing submit_task was made correctly mock_request.assert_called_with( method='POST', url='https://wes.example.com/ga4gh/wes/v1/runs', headers={'Authorization': 'Bearer test_token'}, data={ - 'workflow_params': json.dumps(parameters), + 'workflow_params': json.dumps(workflow_parameters), 'workflow_type': 'CWL', 'workflow_type_version': 'v1.0', - 'workflow_engine': 'CWL', - 'workflow_engine_parameters': '{"name": "Test Task"}', - 'workflow_url': '1234567', - 'tags': '{"Name": "Test Task", "Project": "Test Project"}' + 'workflow_url': workflow_url, + 'tags': '{"ProjectName": "Test Project", "TaskName": "Test Task"}' }, files=None, params=None, @@ -120,7 +118,7 @@ def test_submit_task(self, mock_request): self.assertEqual(task.run_id, 'test_run_id') self.assertEqual(task.name, 'Test Task') self.assertEqual(task.state, 'Queued') - self.assertEqual(task.inputs, parameters) + self.assertEqual(task.inputs, workflow_parameters) @patch('requests.request') def test_get_task_state(self, mock_request): From cd91753ebbea41dfc6680b043351f05d2d421b66 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:08:02 -0500 Subject: [PATCH 28/38] Resolve too many arguments and too many positional arguments in WESTask __init_ --- src/cwl_platform/ngs360_platform.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 6dabea4c..76b53f34 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -20,7 +20,8 @@ class WESTask: WES Task class to encapsulate task functionality """ - def __init__(self, run_id, name, state=None, outputs=None, inputs=None): + # * separator forces optional parameters to be keyword-only: + def __init__(self, run_id, name, *, state=None, outputs=None, inputs=None): self.run_id = run_id self.name = name self.state = state @@ -43,9 +44,9 @@ def from_dict(cls, task_dict): return cls( task_dict["run_id"], task_dict["name"], - task_dict.get("state"), - task_dict.get("outputs"), - task_dict.get("inputs"), + state=task_dict.get("state"), + outputs=task_dict.get("outputs"), + inputs=task_dict.get("inputs"), ) From 06e50eee059d33a5b7ca1239123a3bbabb2f3d59 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:11:25 -0500 Subject: [PATCH 29/38] Resolve too many arguments in WESTask __init__ --- src/cwl_platform/ngs360_platform.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 76b53f34..8cd574b5 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -20,13 +20,12 @@ class WESTask: WES Task class to encapsulate task functionality """ - # * separator forces optional parameters to be keyword-only: - def __init__(self, run_id, name, *, state=None, outputs=None, inputs=None): + def __init__(self, run_id, name, **kwargs): self.run_id = run_id self.name = name - self.state = state - self.outputs = outputs or {} - self.inputs = inputs or {} + self.state = kwargs.get("state") + self.outputs = kwargs.get("outputs") or {} + self.inputs = kwargs.get("inputs") or {} def to_dict(self): """Convert to dictionary""" From 285b99272323696f3a2cfd53e858dcdb17ef6d95 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:17:55 -0500 Subject: [PATCH 30/38] Reduice instance attribute count by combining auth and auth_token into _auth_config --- src/cwl_platform/ngs360_platform.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 8cd574b5..957ccfc4 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -73,8 +73,7 @@ def __init__(self, name): super().__init__(name) self.logger = logging.getLogger(__name__) self.api_endpoint = None - self.auth_token = None - self.auth = None + self._auth_config = {} self.projects = {} # Map project names to project objects self.workflows = {} # Map workflow names to workflow objects self.files = {} # Map file paths to file objects @@ -92,11 +91,13 @@ def connect(self, **kwargs): raise ValueError("WES API endpoint URL is required") # Set up auth token or username/password as auth - self.auth_token = kwargs.get("auth_token", os.environ.get("WES_AUTH_TOKEN")) - if not self.auth_token: + auth_token = kwargs.get("auth_token", os.environ.get("WES_AUTH_TOKEN")) + if auth_token: + self._auth_config['token'] = auth_token + else: username = os.environ.get("WES_USERNAME") password = os.environ.get("WES_PASSWORD") - self.auth = (username, password) + self._auth_config['credentials'] = (username, password) # Test connection by getting service info try: @@ -138,9 +139,12 @@ def _make_request(self, method, path, data=None, files=None, params=None): url = urljoin(endpoint, path) headers = {} + auth = None - if self.auth_token: - headers["Authorization"] = f"Bearer {self.auth_token}" + if 'token' in self._auth_config: + headers["Authorization"] = f"Bearer {self._auth_config['token']}" + elif 'credentials' in self._auth_config: + auth = self._auth_config['credentials'] response = requests.request( method=method, @@ -149,7 +153,7 @@ def _make_request(self, method, path, data=None, files=None, params=None): data=data, files=files, params=params, - auth=self.auth, + auth=auth, timeout=120 ) From c7c7ccd5dde91e856cc3ccc09cc6f732b061cef4 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:22:39 -0500 Subject: [PATCH 31/38] Update tests to setup _auth_config correctly --- tests/test_ngs360-ga4gh_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ngs360-ga4gh_platform.py b/tests/test_ngs360-ga4gh_platform.py index 6c2dd083..1e383d90 100644 --- a/tests/test_ngs360-ga4gh_platform.py +++ b/tests/test_ngs360-ga4gh_platform.py @@ -17,7 +17,7 @@ def setUp(self): ''' self.platform = NGS360Platform('WES') self.platform.api_endpoint = 'https://wes.example.com/ga4gh/wes/v1' - self.platform.auth_token = 'test_token' + self.platform._auth_config['token'] = 'test_token' self.platform.connected = True @patch('requests.request') From 1d7d9e0f600e1869b7f39e415f8282f6d4715cd4 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:24:42 -0500 Subject: [PATCH 32/38] Fix test_connect method to use _auth_config --- tests/test_ngs360-ga4gh_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ngs360-ga4gh_platform.py b/tests/test_ngs360-ga4gh_platform.py index 1e383d90..11971564 100644 --- a/tests/test_ngs360-ga4gh_platform.py +++ b/tests/test_ngs360-ga4gh_platform.py @@ -70,7 +70,7 @@ def test_connect(self, mock_request): self.assertTrue(result) self.assertTrue(platform.connected) self.assertEqual(platform.api_endpoint, 'https://wes.example.com/ga4gh/wes/v1') - self.assertEqual(platform.auth_token, 'test_token') + self.assertEqual(platform._auth_config['token'], 'test_token') @patch('requests.request') def test_submit_task(self, mock_request): From eef6c5fd799c2790f249a0a246748b0bec072720 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:27:14 -0500 Subject: [PATCH 33/38] Reduce arguments to _make_request --- src/cwl_platform/ngs360_platform.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 957ccfc4..4672fc17 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -117,7 +117,7 @@ def connect(self, **kwargs): self.connected = False return False - def _make_request(self, method, path, data=None, files=None, params=None): + def _make_request(self, method, path, **kwargs): """ Make a request to the WES API @@ -150,9 +150,9 @@ def _make_request(self, method, path, data=None, files=None, params=None): method=method, url=url, headers=headers, - data=data, - files=files, - params=params, + data=kwargs.get('data'), + files=kwargs.get('files'), + params=kwargs.get('params'), auth=auth, timeout=120 ) From 562a7bed57c3e487cecaadb2d70cc11f007b02b5 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:35:40 -0500 Subject: [PATCH 34/38] Remove id from create_project --- src/cwl_platform/ngs360_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 4672fc17..4b45c0ce 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -569,7 +569,7 @@ def create_project(self, project_name, project_description, **kwargs): :return: Project object """ project = { - "id": str(uuid.uuid4()), + # "id": str(uuid.uuid4()), "name": project_name, "description": project_description, } From 3257b6a6cda7ebe45ed3eaaed9bc30216fdb140c Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:39:08 -0500 Subject: [PATCH 35/38] Use ProjectName instead of Project and TaskName instead of Name in get_tasks_by_name --- src/cwl_platform/ngs360_platform.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index 4b45c0ce..e17e5b77 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -448,7 +448,7 @@ def get_task_output_filename(self, task, output_name): return output.split('/')[-1] - def get_tasks_by_name(self, project, task_name=None,inputs_to_compare=None,tasks=None): + def get_tasks_by_name(self, project, task_name=None, inputs_to_compare=None, tasks=None): """ Get all processes/tasks in a project with a specified name @@ -457,9 +457,9 @@ def get_tasks_by_name(self, project, task_name=None,inputs_to_compare=None,tasks :return: List of tasks """ try: - tags = {"Project": project["name"]} + tags = {"ProjectName": project["name"]} if task_name: - tags["Name"] = task_name + tags["TaskName"] = task_name params = { "tags": json.dumps(tags) } From c8c5114b92685a5d5b4b13912b54551c1fe61038 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:41:19 -0500 Subject: [PATCH 36/38] Update test get_tasks_by_name --- tests/test_ngs360-ga4gh_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ngs360-ga4gh_platform.py b/tests/test_ngs360-ga4gh_platform.py index 11971564..936464e8 100644 --- a/tests/test_ngs360-ga4gh_platform.py +++ b/tests/test_ngs360-ga4gh_platform.py @@ -273,7 +273,7 @@ def test_get_tasks_by_name(self, mock_request): headers={'Authorization': 'Bearer test_token'}, data=None, files=None, - params={'tags': '{"Project": "Test Project"}'}, + params={'tags': '{"ProjectName": "Test Project"}'}, timeout=120, auth=None ) From 3f47a5f68d84d2cf0a6bc1fffb0ab6f8be7cdb52 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Fri, 2 Jan 2026 20:57:15 -0500 Subject: [PATCH 37/38] Remove unused import --- src/cwl_platform/ngs360_platform.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index e17e5b77..ad93bcfc 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -8,7 +8,6 @@ import os import json import logging -import uuid from urllib.parse import urljoin import requests From 8a06f509271cb2e55eadd2a104aedb26a7fea3f7 Mon Sep 17 00:00:00 2001 From: Ryan Golhar Date: Mon, 12 Jan 2026 14:17:52 -0500 Subject: [PATCH 38/38] Update NGS360 Platform to mimick explicity platform behaviour for file uploading --- src/cwl_platform/ngs360_platform.py | 89 +++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 18 deletions(-) diff --git a/src/cwl_platform/ngs360_platform.py b/src/cwl_platform/ngs360_platform.py index ad93bcfc..ec11e18a 100644 --- a/src/cwl_platform/ngs360_platform.py +++ b/src/cwl_platform/ngs360_platform.py @@ -47,9 +47,18 @@ def from_dict(cls, task_dict): inputs=task_dict.get("inputs"), ) +class NGS360Project(): + """ + NGS360 Project class to encapsulate project functionality + """ + + def __init__(self, project_id, name): + self.project_id = project_id + self.name = name + class NGS360Platform(Platform): - """GA4GH WES Platform class""" + """ NGS360 + GA4GH WES Platform class """ # WES API state mapping to Platform states STATE_MAP = { @@ -106,7 +115,7 @@ def connect(self, **kwargs): response.get('workflow_type_versions', {}) ) self.connected = True - return True + # return True except requests.RequestException as e: self.logger.error("Failed to connect to WES API: %s", e) self.connected = False @@ -116,6 +125,22 @@ def connect(self, **kwargs): self.connected = False return False + self.ngs360_endpoint = kwargs.get("ngs360_endpoint", os.environ.get("NGS360_API_ENDPOINT")) + if not self.ngs360_endpoint: + raise ValueError("NGS360 API endpoint URL is required") + try: + response = requests.get( + f"{self.ngs360_endpoint}/", + timeout=30 + ) + response.raise_for_status() + self.logger.info("Connected to NGS360 API") + return True + except requests.RequestException as e: + self.logger.error("Failed to connect to NGS360 API: %s", e) + self.connected = False + return False + def _make_request(self, method, path, **kwargs): """ Make a request to the WES API @@ -284,8 +309,27 @@ def upload_file( :param overwrite: Overwrite the file if it already exists. :return: ID of uploaded file. """ - self.logger.warning("WES API doesn't support file upload operations directly") - return filename + with open(filename, "rb") as f: + response = requests.post( + f"{self.ngs360_endpoint}/api/v1/files", + data={ + "filename": filename, + "entity_type": 'project', + "entity_id": project["project_id"], + "relative_path": dest_folder, + "destination_filename": destination_filename or os.path.basename(filename), + "overwrite": overwrite, + }, + files={"content": f}, + ) + + if response.status_code == 201: + file_info = response.json() + return f"ngs360://{file_info['file_id']}" + else: + self.logger.error(f"Error uploading file: {response.status_code}") + self.logger.error(response.json()) + return None # Task/Workflow methods def copy_workflow(self, src_workflow, destination_project): @@ -456,7 +500,7 @@ def get_tasks_by_name(self, project, task_name=None, inputs_to_compare=None, tas :return: List of tasks """ try: - tags = {"ProjectName": project["name"]} + tags = {"ProjectId": project["project_id"]} if task_name: tags["TaskName"] = task_name params = { @@ -532,7 +576,7 @@ def submit_task(self, name, project, workflow, parameters, execution_settings=No "workflow_type_version": workflow_type_version, "workflow_url": workflow_url, "tags": json.dumps({ - "ProjectName": project["name"], + "ProjectId": project["project_id"], "TaskName": name, }), } @@ -597,25 +641,34 @@ def get_project(self): def get_project_by_name(self, project_name): """ Get a project by its name - - Note: WES API doesn't have a concept of projects, so this returns the virtual project """ - if project_name in self.projects: - return self.projects[project_name] - - # Create a new project if it doesn't exist - return self.create_project(project_name, f"Virtual project for {project_name}") + response = requests.get( + f"{self.ngs360_endpoint}/api/v1/projects/search?query={project_name}", + timeout=30 + ) + if response.status_code == 200: + projects = response.json().get("data", []) + for project in projects: + if project.get("name") == project_name: + return project + self.logger.error(f"Project '{project_name}' not found") + return None def get_project_by_id(self, project_id): """ Get a project by its id - Note: WES API doesn't have a concept of projects, so this returns the virtual project """ - for project in self.projects.values(): - if project["id"] == project_id: - return project - return self.create_project(project_id, f"Virtual project for {project_id}") + response = requests.get( + f"{self.ngs360_endpoint}/api/v1/projects/{project_id}", + timeout=30 + ) + if response.status_code == 200: + return response.json() + else: + self.logger.error(f"Error retrieving project: {response.status_code}") + self.logger.error(response.json()) + return None def get_project_cost(self, project): """