diff --git a/Snakefile b/Snakefile
index cf075b0f..f0c74549 100644
--- a/Snakefile
+++ b/Snakefile
@@ -1,7 +1,10 @@
 import os
 from spras import runner
 import shutil
+import json
 import yaml
+from pathlib import Path
+from spras.containers import TimeoutError
 from spras.dataset import Dataset
 from spras.evaluation import Evaluation
 from spras.analysis import ml, summary, cytoscape
@@ -260,6 +263,22 @@ def collect_prepared_input(wildcards):
 
     return prepared_inputs
 
+def mark_error(file, **details):
+    """Marks a file as an error with associated details."""
+    Path(file).write_text(json.dumps({"status": "error", **details}))
+
+def is_error(file):
+    """Checks whether a file was produced by mark_error."""
+    try:
+        with open(file, 'r') as f:
+            return json.load(f)["status"] == "error"
+    except ValueError:
+        return False
+
+def filter_successful(files):
+    """Convenience function that filters error files out of an iterable of files."""
+    return [file for file in files if not is_error(file)]
+
 # Run the pathway reconstruction algorithm
 rule reconstruct:
     input: collect_prepared_input
@@ -268,25 +287,47 @@ rule reconstruct:
     # Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the
     # same name regardless of the inputs or parameters, and these aren't renamed until after the container command
     # terminates
-    output: pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt'])
+    output:
+        pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
+        # Despite this being a 'log' file, we don't use the log directive because this rule doesn't actually throw errors.
+        resource_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'resource-log.json'])
+    params:
+        # Get the timeout from the config and expose it as a parameter.
+        # TODO: This has unexpected behavior when this rule succeeds but the timeout is later changed,
+        # which makes this rule run again.
+        timeout = lambda wildcards: _config.config.algorithm_timeouts[wildcards.algorithm]
     run:
         # Create a copy so that the updates are not written to the parameters logfile
-        params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
+        algorithm_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
         # Declare the input files as a dictionary.
         inputs = dict(zip(runner.get_required_inputs(wildcards.algorithm), *{input}, strict=True))
         # Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml
-        if '_spras_run_name' in params:
-            params.pop('_spras_run_name')
-        runner.run(wildcards.algorithm, inputs, output.pathway_file, params, container_settings)
+        if '_spras_run_name' in algorithm_params:
+            algorithm_params.pop('_spras_run_name')
+        try:
+            runner.run(wildcards.algorithm, inputs, output.pathway_file, algorithm_params, container_settings, params.timeout)
+            Path(output.resource_info).write_text(json.dumps({"status": "success"}))
+        except TimeoutError as err:
+            # We don't raise the error here, and we rely on `--keep-going` to avoid unnecessarily re-running this rule (or others).
+            Path(output.resource_info).write_text(json.dumps({"status": "error", "type": "timeout", "duration": params.timeout}))
+            # We still touch pathway_file: Snakemake doesn't have optional outputs, so
+            # we pass around empty files and filter out the timed-out pathways downstream.
+            Path(output.pathway_file).touch()
 
 # Original pathway reconstruction output to universal output
 # Use PRRunner as a wrapper to call the algorithm-specific parse_output
 rule parse_output:
-    input:
-        raw_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
+    input:
+        raw_file = rules.reconstruct.output.pathway_file,
+        resource_info = rules.reconstruct.output.resource_info,
         dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle'])
     output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt'])
     run:
+        resource_info = json.loads(Path(input.resource_info).read_bytes())
+        if resource_info["status"] != "success":
+            mark_error(output.standardized_file)
+            return
+
         params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
         params['dataset'] = input.dataset_file
         runner.parse_output(wildcards.algorithm, input.raw_file, output.standardized_file, params)
@@ -308,7 +349,7 @@ rule viz_cytoscape:
     output: session = SEP.join([out_dir, '{dataset}-cytoscape.cys'])
     run:
-        cytoscape.run_cytoscape(input.pathways, output.session, container_settings)
+        cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings)
 
 
 # Write a single summary table for all pathways for each dataset
@@ -321,7 +362,7 @@ rule summary_table:
     run:
         # Load the node table from the pickled dataset file
         node_table = Dataset.from_file(input.dataset_file).node_table
-        summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params)
+        summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params)
         summary_df.to_csv(output.summary_table, sep='\t', index=False)
 
 # Cluster the output pathways for each dataset
@@ -337,7 +378,7 @@ rule ml_analysis:
         hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-horizontal.png']),
         hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']),
     run:
-        summary_df = ml.summarize_networks(input.pathways)
+        summary_df = ml.summarize_networks(filter_successful(input.pathways))
         ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
         ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
         ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
@@ -351,7 +392,7 @@ rule jaccard_similarity:
         jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']),
         jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png'])
     run:
-        summary_df = ml.summarize_networks(input.pathways)
+        summary_df = ml.summarize_networks(filter_successful(input.pathways))
         ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)
 
 
@@ -362,7 +403,7 @@ rule ensemble:
     output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt'])
     run:
-        summary_df = ml.summarize_networks(input.pathways)
+        summary_df = ml.summarize_networks(filter_successful(input.pathways))
         ml.ensemble_network(summary_df, output.ensemble_network_file)
 
 # Returns all pathways for a specific algorithm
@@ -384,7 +425,7 @@ rule ml_analysis_aggregate_algo:
         hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']),
         hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']),
     run:
-        summary_df = ml.summarize_networks(input.pathways)
+        summary_df = ml.summarize_networks(filter_successful(input.pathways))
         ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
         ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
         ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
@@ -396,7 +437,7 @@ rule ensemble_per_algo:
     output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt'])
     run:
-        summary_df = ml.summarize_networks(input.pathways)
+        summary_df = ml.summarize_networks(filter_successful(input.pathways))
         ml.ensemble_network(summary_df, output.ensemble_network_file)
 
 # Calculated Jaccard similarity between output pathways for each dataset per algorithm
@@ -407,7 +448,7 @@ rule jaccard_similarity_per_algo:
         jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']),
         jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png'])
     run:
-        summary_df = ml.summarize_networks(input.pathways)
+        summary_df = ml.summarize_networks(filter_successful(input.pathways))
         ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)
 
 # Return the gold standard pickle file for a specific gold standard
@@ -438,7 +479,7 @@ rule evaluation_pr_per_pathways:
         node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']),
     run:
         node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
-        pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
+        pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
         Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png)
 
 # Returns all pathways for a specific algorithm and dataset
@@ -457,7 +498,7 @@ rule evaluation_per_algo_pr_per_pathways:
         node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']),
     run:
        node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
-        pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
+        pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
         Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval)
 
 # Return pathway summary file per dataset
diff --git a/config/config.yaml b/config/config.yaml
index f2899fb9..40aea251 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -102,6 +102,7 @@ algorithms:
 
   - name: "allpairs"
     include: true
+    timeout: 1d
 
   - name: "domino"
     include: true
diff --git a/environment.yml b/environment.yml
index e5fc75b0..c65643fa 100644
--- a/environment.yml
+++ b/environment.yml
@@ -15,6 +15,7 @@ dependencies:
   - scikit-learn=1.7.0
   - seaborn=0.13.2
   - spython=0.3.14
+  - pytimeparse=1.1.8
 
   # conda-specific for dsub
   - python-dateutil=2.9.0
diff --git a/pyproject.toml b/pyproject.toml
index bfc602c6..38074f77 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,6 +30,7 @@ dependencies = [
     "scikit-learn==1.7.0",
     "seaborn==0.13.2",
     "spython==0.3.14",
+    "pytimeparse==1.1.8",
 
     # toolchain deps
     "pip==25.3",
diff --git a/spras/allpairs.py b/spras/allpairs.py
index 21fca6ee..d015acfd 100644
--- a/spras/allpairs.py
+++ b/spras/allpairs.py
@@ -74,7 +74,7 @@ def generate_inputs(data: Dataset, filename_map):
header=["#Interactor1", "Interactor2", "Weight"]) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() AllPairs.validate_required_run_args(inputs) @@ -111,7 +111,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) @staticmethod def parse_output(raw_pathway_file, standardized_pathway_file, params): diff --git a/spras/analysis/cytoscape.py b/spras/analysis/cytoscape.py index e8489950..6eadfadd 100644 --- a/spras/analysis/cytoscape.py +++ b/spras/analysis/cytoscape.py @@ -58,5 +58,6 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, contai # (https://github.com/Reed-CompBio/spras/pull/390/files#r2485100875) None, container_settings, + None, env) rmtree(cytoscape_output_dir) diff --git a/spras/btb.py b/spras/btb.py index d2f18deb..8c835887 100644 --- a/spras/btb.py +++ b/spras/btb.py @@ -72,7 +72,7 @@ def generate_inputs(data, filename_map): # Skips parameter validation step @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() BowTieBuilder.validate_required_run_args(inputs) @@ -130,7 +130,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Output is already written to raw-pathway.txt file diff --git a/spras/config/algorithms.py b/spras/config/algorithms.py index 552fbc4e..5abb4dbf 100644 --- a/spras/config/algorithms.py +++ b/spras/config/algorithms.py @@ -5,7 +5,7 @@ """ import ast import copy -from typing import Annotated, Any, Callable, Literal, Union, cast, get_args +from typing import Annotated, Any, Callable, Literal, Optional, Union, cast, get_args import numpy as np from pydantic import ( @@ -167,6 +167,7 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod return create_model( f'{name}Model', name=Literal[name], + timeout=(Optional[str], None), include=bool, # For algorithms that have a default parameter config, we allow arbitrarily running an algorithm # if no runs are specified. For example, the following config diff --git a/spras/config/config.py b/spras/config/config.py index e180183c..819fd74a 100644 --- a/spras/config/config.py +++ b/spras/config/config.py @@ -16,10 +16,11 @@ import itertools as it import os import warnings -from typing import Any +from typing import Any, Optional import numpy as np import yaml +from pytimeparse import parse from spras.config.container_schema import ProcessedContainerSettings from spras.config.schema import RawConfig @@ -73,6 +74,8 @@ def __init__(self, raw_config: dict[str, Any]): self.container_settings = ProcessedContainerSettings.from_container_settings(parsed_raw_config.containers, self.hash_length) # The list of algorithms to run in the workflow. Each is a dict with 'name' as an expected key. self.algorithms = None + # Dictionary of algorithms to their respective timeout in seconds + self.algorithm_timeouts: dict[str, Optional[int]] = dict() # A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations. # Only includes algorithms that are set to be run with 'include: true'. 
         self.algorithm_params: dict[str, dict[str, Any]] = dict()
@@ -156,6 +159,16 @@ def process_algorithms(self, raw_config: RawConfig):
                 # Do not parse the rest of the parameters for this algorithm if it is not included
                 continue
 
+            if alg.timeout:
+                # pytimeparse returns a float (or None on failure), so coerce the result to an int.
+                timeout = parse(alg.timeout, granularity='seconds')
+                if timeout is None:
+                    raise RuntimeError(f"Algorithm {alg.name} has unparsable timeout string {alg.timeout}.")
+                self.algorithm_timeouts[alg.name] = int(timeout)
+            else:
+                # As per the type signature, we still record explicitly that this algorithm
+                # has no timeout configured.
+                self.algorithm_timeouts[alg.name] = None
+
             runs: dict[str, Any] = alg.runs
 
             # Each set of runs should be 1 level down in the config file
diff --git a/spras/containers.py b/spras/containers.py
index 124b9741..2f199564 100644
--- a/spras/containers.py
+++ b/spras/containers.py
@@ -8,6 +8,7 @@
 
 import docker
 import docker.errors
+import requests
 
 from spras.config.container_schema import ProcessedContainerSettings
 from spras.logging import indent
@@ -166,6 +167,20 @@ def streams_contain(self, needle: str):
     def __str__(self):
         return self.message
 
+class TimeoutError(RuntimeError):
+    """Raised when a container run exceeds its timeout."""
+    timeout: int
+    message: str
+
+    def __init__(self, timeout: int, *args):
+        self.timeout = timeout
+        self.message = f"Timed out after {timeout}s."
+
+        super(TimeoutError, self).__init__(timeout, *args)
+
+    def __str__(self):
+        return self.message
+
 def env_to_items(environment: dict[str, str]) -> Iterator[str]:
     """
     Turns an environment variable dictionary to KEY=VALUE pairs.
@@ -176,7 +191,17 @@ def env_to_items(environment: dict[str, str]) -> Iterator[str]:
 # TODO consider a better default environment variable
 # Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html)
 # Technically the argument is an image, not a container, but we use container here.
-def run_container(container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled = False):
+def run_container(
+    container_suffix: str,
+    command: List[str],
+    volumes: List[Tuple[PurePath, PurePath]],
+    working_dir: str,
+    out_dir: str | os.PathLike,
+    container_settings: ProcessedContainerSettings,
+    timeout: Optional[int],
+    environment: Optional[dict[str, str]] = None,
+    network_disabled = False
+):
     """
     Runs a command in the container using Singularity or Docker
     @param container_suffix: name of the DockerHub container without the 'docker://' prefix
@@ -185,6 +210,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple
     @param working_dir: the working directory in the container
     @param container_settings: the settings to use to run the container
     @param out_dir: output directory for the rule's artifacts. Only passed into run_container_singularity for the purpose of profiling.
+    @param timeout: the timeout (in seconds); a TimeoutError is raised if the timeout is reached.
     @param environment: environment variables to set in the container
     @param network_disabled: Disables the network on the container. Only works for docker for now.
         This acts as a 'runtime assertion' that a container works w/o networking.
     @return: output from Singularity execute or Docker run
@@ -193,7 +219,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple
 
     container = container_settings.prefix + "/" + container_suffix
     if normalized_framework == 'docker':
-        return run_container_docker(container, command, volumes, working_dir, environment, network_disabled)
+        return run_container_docker(container, command, volumes, working_dir, environment, timeout, network_disabled)
     elif normalized_framework == 'singularity' or normalized_framework == "apptainer":
        return run_container_singularity(container, command, volumes, working_dir, out_dir, container_settings, environment)
     elif normalized_framework == 'dsub':
@@ -201,7 +227,17 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple
     else:
         raise ValueError(f'{container_settings.framework} is not a recognized container framework. Choose "docker", "dsub", "apptainer", or "singularity".')
 
-def run_container_and_log(name: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled=False):
+def run_container_and_log(
+    name: str,
+    container_suffix: str,
+    command: List[str],
+    volumes: List[Tuple[PurePath, PurePath]],
+    working_dir: str,
+    out_dir: str | os.PathLike,
+    container_settings: ProcessedContainerSettings,
+    timeout: Optional[int],
+    environment: Optional[dict[str, str]] = None,
+    network_disabled=False
+):
     """
     Runs a command in the container using Singularity or Docker with associated pretty printed messages.
     @param name: the display name of the running container for logging purposes
@@ -210,6 +246,7 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str],
     @param volumes: a list of volumes to mount where each item is a (source, destination) tuple
     @param working_dir: the working directory in the container
     @param container_settings: the container settings to use
+    @param timeout: the timeout (in seconds); a TimeoutError is raised if the timeout is reached.
     @param environment: environment variables to set in the container
     @param network_disabled: Disables the network on the container. Only works for docker for now.
         This acts as a 'runtime assertion' that a container works w/o networking.
     @return: output from Singularity execute or Docker run
     """
@@ -219,7 +256,17 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str],
     print('Running {} on container framework "{}" on env {} with command: {}'.format(name, container_settings.framework, list(env_to_items(environment)), ' '.join(command)), flush=True)
 
     try:
-        out = run_container(container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, out_dir=out_dir, container_settings=container_settings, environment=environment, network_disabled=network_disabled)
+        out = run_container(
+            container_suffix=container_suffix,
+            command=command,
+            volumes=volumes,
+            working_dir=working_dir,
+            out_dir=out_dir,
+            container_settings=container_settings,
+            timeout=timeout,
+            environment=environment,
+            network_disabled=network_disabled
+        )
         if out is not None:
             if isinstance(out, list):
                 out = ''.join(out)
@@ -250,7 +297,15 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str],
         raise ContainerError(message, err.exit_status, stdout, stderr) from None
 
 # TODO any issue with creating a new client each time inside this function?
-def run_container_docker(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None, network_disabled=False):
+def run_container_docker(
+    container: str,
+    command: List[str],
+    volumes: List[Tuple[PurePath, PurePath]],
+    working_dir: str,
+    environment: Optional[dict[str, str]] = None,
+    timeout: Optional[int] = None,
+    network_disabled=False
+):
     """
     Runs a command in the container using Docker.
     Attempts to automatically correct file owner and group for new files created by the container, setting them to the
@@ -261,6 +316,8 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple
     @param volumes: a list of volumes to mount where each item is a (source, destination) tuple
     @param working_dir: the working directory in the container
     @param environment: environment variables to set in the container
+    @param timeout: the timeout (in seconds); a TimeoutError is raised if the timeout is reached.
+    @param network_disabled: if enabled, disables the container network; useful when containers don't fetch any online resources.
     @return: output from Docker run, or will error if the container errored.
     """
 
@@ -290,13 +347,25 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple
 
     bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes]
 
-    out = client.containers.run(container,
-                                command,
-                                stderr=True,
-                                volumes=bind_paths,
-                                working_dir=working_dir,
-                                network_disabled=network_disabled,
-                                environment=environment).decode('utf-8')
+    container_obj = client.containers.run(
+        container,
+        command,
+        volumes=bind_paths,
+        working_dir=working_dir,
+        network_disabled=network_disabled,
+        environment=environment,
+        detach=True
+    )
+
+    try:
+        container_obj.wait(timeout=timeout)
+    except requests.exceptions.ReadTimeout as err:
+        container_obj.stop()
+        client.close()
+        if timeout:
+            raise TimeoutError(timeout) from err
+        else:
+            raise RuntimeError("Timeout error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") from None
+
+    out = container_obj.attach(stderr=True).decode('utf-8')
 
     # TODO does this cleanup need to still run even if there was an error in the above run command?
     # On Unix, files written by the above Docker run command will be owned by root and cannot be modified
@@ -345,7 +414,7 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple
 
     return out
 
-def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str, config: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None):
+def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, config: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None):
     """
     Runs a command in the container using Singularity.
     Only available on Linux.
@@ -427,7 +496,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[
     for bind in bind_paths:
         singularity_cmd.extend(["--bind", bind])
     singularity_cmd.extend(singularity_options)
-    singularity_cmd.append(image_to_run)
+    singularity_cmd.append(str(image_to_run))
     singularity_cmd.extend(command)
 
     my_cgroup = create_peer_cgroup()
@@ -438,7 +507,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[
         proc = subprocess.run(cmd, capture_output=True, text=True, stderr=subprocess.STDOUT)
 
         print("Reading memory and CPU stats from cgroup")
-        create_apptainer_container_stats(my_cgroup, out_dir)
+        create_apptainer_container_stats(my_cgroup, str(out_dir))
 
         result = proc.stdout
     else:
diff --git a/spras/domino.py b/spras/domino.py
index 31604443..eda2934f 100644
--- a/spras/domino.py
+++ b/spras/domino.py
@@ -83,7 +83,7 @@ def generate_inputs(data, filename_map):
                      header=['ID_interactor_A', 'ppi', 'ID_interactor_B'])
 
     @staticmethod
-    def run(inputs, output_file, args=None, container_settings=None):
+    def run(inputs, output_file, args=None, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         if not args: args = DominoParams()
         DOMINO.validate_required_run_args(inputs)
@@ -121,7 +121,8 @@ def run(inputs, output_file, args=None, container_settings=None):
                                   volumes,
                                   work_dir,
                                   out_dir,
-                                  container_settings)
+                                  container_settings,
+                                  timeout)
         except ContainerError as err:
             # Occurs when DOMINO gets passed some empty dataframe from network_file.
             # This counts as an empty input, so we return an empty output.
@@ -155,7 +156,8 @@ def run(inputs, output_file, args=None, container_settings=None):
                                   volumes,
                                   work_dir,
                                   out_dir,
-                                  container_settings)
+                                  container_settings,
+                                  timeout)
         except ContainerError as err:
             # Occurs when DOMINO gets passed some empty dataframe from network_file.
             # This counts as an empty input, so we return an empty output.
diff --git a/spras/meo.py b/spras/meo.py
index 5d4630f4..94e90c92 100644
--- a/spras/meo.py
+++ b/spras/meo.py
@@ -143,10 +143,9 @@ def generate_inputs(data, filename_map):
         edges.to_csv(filename_map['edges'], sep='\t', index=False,
                      columns=['Interactor1', 'EdgeType', 'Interactor2', 'Weight'], header=False)
 
-    # TODO add parameter validation
     # TODO document required arguments
     @staticmethod
-    def run(inputs, output_file=None, args=None, container_settings=None):
+    def run(inputs, output_file, args=None, container_settings=None, timeout=None):
         """
         Run Maximum Edge Orientation in the Docker image with the provided parameters.
         The properties file is generated from the provided arguments.
@@ -203,7 +202,8 @@ def run(inputs, output_file=None, args=None, container_settings=None):
                               volumes,
                               work_dir,
                               out_dir,
-                              container_settings)
+                              container_settings,
+                              timeout)
 
         properties_file_local.unlink(missing_ok=True)
 
diff --git a/spras/mincostflow.py b/spras/mincostflow.py
index dad1d706..2a8a8d58 100644
--- a/spras/mincostflow.py
+++ b/spras/mincostflow.py
@@ -76,7 +76,7 @@ def generate_inputs(data, filename_map):
                      header=False)
 
     @staticmethod
-    def run(inputs, output_file, args=None, container_settings=None):
+    def run(inputs, output_file, args=None, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         if not args: args = MinCostFlowParams()
         MinCostFlow.validate_required_run_args(inputs)
@@ -127,7 +127,8 @@ def run(inputs, output_file, args=None, container_settings=None):
                               volumes,
                               work_dir,
                               out_dir,
-                              container_settings)
+                              container_settings,
+                              timeout)
 
         # Check the output of the container
         out_dir_content = sorted(out_dir.glob('*.sif'))
diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py
index 3e5cbf1b..0d913ec1 100644
--- a/spras/omicsintegrator1.py
+++ b/spras/omicsintegrator1.py
@@ -158,7 +158,7 @@ def generate_inputs(data, filename_map):
     # TODO add support for knockout argument
     # TODO add reasonable default values
     @staticmethod
-    def run(inputs, output_file, args, container_settings=None):
+    def run(inputs, output_file, args, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         OmicsIntegrator1.validate_required_run_args(inputs, ["dummy_nodes"])
@@ -231,6 +231,7 @@ def run(inputs, output_file, args, container_settings=None):
                               work_dir,
                               out_dir,
                               container_settings,
+                              timeout,
                               {'TMPDIR': mapped_out_dir})
 
         conf_file_local.unlink(missing_ok=True)
diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py
index 38624d3a..25b4d5f2 100644
--- a/spras/omicsintegrator2.py
+++ b/spras/omicsintegrator2.py
@@ -108,7 +108,7 @@ def generate_inputs(data: Dataset, filename_map):
 
     # TODO add reasonable default values
     @staticmethod
-    def run(inputs, output_file, args=None, container_settings=None):
+    def run(inputs, output_file, args=None, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         if not args: args = OmicsIntegrator2Params()
         OmicsIntegrator2.validate_required_run_args(inputs)
@@ -160,6 +160,7 @@ def run(inputs, output_file, args=None, container_settings=None):
                               work_dir,
                               out_dir,
                               container_settings,
+                              timeout,
                               network_disabled=True)
 
         # TODO do we want to retain other output files?
diff --git a/spras/pathlinker.py b/spras/pathlinker.py
index c534f294..2f41dcc3 100644
--- a/spras/pathlinker.py
+++ b/spras/pathlinker.py
@@ -75,7 +75,7 @@ def generate_inputs(data, filename_map):
                      header=["#Interactor1","Interactor2","Weight"])
 
     @staticmethod
-    def run(inputs, output_file, args=None, container_settings=None):
+    def run(inputs, output_file, args=None, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         if not args: args = PathLinkerParams()
         PathLinker.validate_required_run_args(inputs)
@@ -115,7 +115,8 @@ def run(inputs, output_file, args=None, container_settings=None):
                               volumes,
                               work_dir,
                               out_dir,
-                              container_settings)
+                              container_settings,
+                              timeout)
 
         # Rename the primary output file to match the desired output filename
         # Currently PathLinker only writes one output file so we do not need to delete others
diff --git a/spras/prm.py b/spras/prm.py
index 636859cf..663587f9 100644
--- a/spras/prm.py
+++ b/spras/prm.py
@@ -59,9 +59,10 @@ def get_params_generic(cls) -> type[T]:
     # This is used in `runner.py` to avoid a dependency diamond when trying
     # to import the actual algorithm schema.
     @classmethod
-    def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings):
+    def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings, timeout: Optional[int]):
         """
-        This is similar to PRA.run, but it does pydantic logic internally to re-validate argument parameters.
+        This is similar to PRM.run, but `args` is a dictionary and not a pydantic structure.
+        However, this method still re-validates `args` against the associated pydantic PRM argument model.
         """
         T_class = cls.get_params_generic()
 
@@ -75,17 +76,23 @@ def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | o
         # (Pydantic already provides nice error messages, so we don't need to worry about catching this.)
         T_parsed = T_class.model_validate(args)
 
-        return cls.run(inputs, output_file, T_parsed, container_settings)
+        return cls.run(inputs, output_file, T_parsed, container_settings, timeout)
 
     @staticmethod
     @abstractmethod
-    def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings):
+    def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings, timeout: Optional[int]):
         """
-        Runs an algorithm with the specified inputs, algorithm params (T),
-        the designated output_file, and the desired container_settings.
-
-        See the algorithm-specific `generate_inputs` and `parse_output`
+        Runs an algorithm.
+        @param inputs: specified inputs
+        @param output_file: designated reconstructed pathway output
+        @param args: (T) typed algorithm params
+        @param container_settings: the settings to associate with the individual container
+        @param timeout: timeout in seconds to run the container with
+
+        See the algorithm-specific `PRM.generate_inputs` and `PRM.parse_output`
         for information about the input and output format.
+
+        Also see `PRM.run_typeless` for the non-pydantic version of this method (where `args` is a dict).
""" raise NotImplementedError diff --git a/spras/responsenet.py b/spras/responsenet.py index 4989de48..0c813df8 100644 --- a/spras/responsenet.py +++ b/spras/responsenet.py @@ -68,7 +68,7 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() ResponseNet.validate_required_run_args(inputs) if not args: args = ResponseNetParams() @@ -117,7 +117,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Rename the primary output file to match the desired output filename out_file_suffixed.rename(output_file) diff --git a/spras/runner.py b/spras/runner.py index d138d8e3..a9f251ba 100644 --- a/spras/runner.py +++ b/spras/runner.py @@ -1,8 +1,10 @@ -from typing import Any +from os import PathLike +from typing import Any, Optional # supported algorithm imports from spras.allpairs import AllPairs from spras.btb import BowTieBuilder +from spras.config.container_schema import ProcessedContainerSettings from spras.dataset import Dataset from spras.domino import DOMINO from spras.meo import MEO @@ -35,14 +37,21 @@ def get_algorithm(algorithm: str) -> type[PRM]: except KeyError as exc: raise NotImplementedError(f'{algorithm} is not currently supported.') from exc -def run(algorithm: str, inputs, output_file, args, container_settings): +def run( + algorithm: str, + inputs: dict[str, str | PathLike], + output_file: str | PathLike, + args: dict[str, Any], + container_settings: ProcessedContainerSettings, + timeout: Optional[int] +): """ A generic interface to the algorithm-specific run functions """ algorithm_runner = get_algorithm(algorithm) # We can't use config.config here else we would get a cyclic dependency. # Since args is a dict here, we use the 'run_typeless' utility PRM function. 
-    algorithm_runner.run_typeless(inputs, output_file, args, container_settings)
+    algorithm_runner.run_typeless(inputs, output_file, args, container_settings, timeout)
 
 
 def get_required_inputs(algorithm: str):
diff --git a/spras/rwr.py b/spras/rwr.py
index e6f54d67..5f495885 100644
--- a/spras/rwr.py
+++ b/spras/rwr.py
@@ -56,7 +56,7 @@ def generate_inputs(data, filename_map):
         edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False)
 
     @staticmethod
-    def run(inputs, output_file, args, container_settings=None):
+    def run(inputs, output_file, args, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         RWR.validate_required_run_args(inputs)
@@ -103,7 +103,8 @@ def run(inputs, output_file, args, container_settings=None):
                               volumes,
                               work_dir,
                               out_dir,
-                              container_settings)
+                              container_settings,
+                              timeout)
 
         # Rename the primary output file to match the desired output filename
         output_edges = Path(out_dir, 'output.txt')
diff --git a/spras/strwr.py b/spras/strwr.py
index 42928e4c..8058c0cd 100644
--- a/spras/strwr.py
+++ b/spras/strwr.py
@@ -58,7 +58,7 @@ def generate_inputs(data, filename_map):
         edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False)
 
     @staticmethod
-    def run(inputs, output_file, args, container_settings=None):
+    def run(inputs, output_file, args, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         ST_RWR.validate_required_run_args(inputs)
@@ -110,7 +110,8 @@ def run(inputs, output_file, args, container_settings=None):
                               volumes,
                               work_dir,
                               out_dir,
-                              container_settings)
+                              container_settings,
+                              timeout)
 
         # Rename the primary output file to match the desired output filename
         output_edges = Path(out_dir, 'output.txt')