From f81a33ec3f537ff04777b075cd73d6330845d74a Mon Sep 17 00:00:00 2001 From: "Tristan F." Date: Mon, 12 Jan 2026 09:41:39 +0000 Subject: [PATCH 1/8] feat: timeout --- Snakefile | 4 +- config/config.yaml | 1 + environment.yml | 1 + pyproject.toml | 1 + spras/allpairs.py | 5 +- spras/btb.py | 5 +- spras/config/algorithms.py | 3 +- spras/config/config.py | 11 ++++- spras/containers.py | 93 ++++++++++++++++++++++++++++++++------ spras/domino.py | 5 +- spras/meo.py | 6 +-- spras/mincostflow.py | 5 +- spras/omicsintegrator1.py | 3 +- spras/omicsintegrator2.py | 3 +- spras/pathlinker.py | 5 +- spras/prm.py | 23 ++++++---- spras/responsenet.py | 5 +- spras/runner.py | 15 ++++-- spras/rwr.py | 5 +- spras/strwr.py | 5 +- 20 files changed, 154 insertions(+), 50 deletions(-) diff --git a/Snakefile b/Snakefile index cf075b0f..b9b076e3 100644 --- a/Snakefile +++ b/Snakefile @@ -274,10 +274,12 @@ rule reconstruct: params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() # Declare the input files as a dictionary. inputs = dict(zip(runner.get_required_inputs(wildcards.algorithm), *{input}, strict=True)) + # Get the timeout from the config + timeout = _config.config.algorithm_timeouts[wildcards.algorithm] # Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml if '_spras_run_name' in params: params.pop('_spras_run_name') - runner.run(wildcards.algorithm, inputs, output.pathway_file, params, container_settings) + runner.run(wildcards.algorithm, inputs, output.pathway_file, timeout, params, container_settings) # Original pathway reconstruction output to universal output # Use PRRunner as a wrapper to call the algorithm-specific parse_output diff --git a/config/config.yaml b/config/config.yaml index f2899fb9..40aea251 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -102,6 +102,7 @@ algorithms: - name: "allpairs" include: true + timeout: 1d - name: "domino" include: true diff --git a/environment.yml b/environment.yml index e5fc75b0..c65643fa 100644 --- a/environment.yml +++ b/environment.yml @@ -15,6 +15,7 @@ dependencies: - scikit-learn=1.7.0 - seaborn=0.13.2 - spython=0.3.14 + - pytimeparse=1.1.8 # conda-specific for dsub - python-dateutil=2.9.0 diff --git a/pyproject.toml b/pyproject.toml index bfc602c6..38074f77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "scikit-learn==1.7.0", "seaborn==0.13.2", "spython==0.3.14", + "pytimeparse==1.1.8", # toolchain deps "pip==25.3", diff --git a/spras/allpairs.py b/spras/allpairs.py index 21fca6ee..500e864f 100644 --- a/spras/allpairs.py +++ b/spras/allpairs.py @@ -74,7 +74,7 @@ def generate_inputs(data: Dataset, filename_map): header=["#Interactor1", "Interactor2", "Weight"]) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() AllPairs.validate_required_run_args(inputs) @@ -111,7 +111,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) @staticmethod def parse_output(raw_pathway_file, standardized_pathway_file, params): diff --git a/spras/btb.py b/spras/btb.py index d2f18deb..e19ed5d1 100644 --- a/spras/btb.py +++ b/spras/btb.py @@ -72,7 +72,7 @@ def generate_inputs(data, filename_map): # Skips parameter validation step @staticmethod - def run(inputs, output_file, args=None, 
container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() BowTieBuilder.validate_required_run_args(inputs) @@ -130,7 +130,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Output is already written to raw-pathway.txt file diff --git a/spras/config/algorithms.py b/spras/config/algorithms.py index 552fbc4e..d918f533 100644 --- a/spras/config/algorithms.py +++ b/spras/config/algorithms.py @@ -5,7 +5,7 @@ """ import ast import copy -from typing import Annotated, Any, Callable, Literal, Union, cast, get_args +from typing import Annotated, Any, Callable, Literal, Union, cast, get_args, Optional import numpy as np from pydantic import ( @@ -167,6 +167,7 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod return create_model( f'{name}Model', name=Literal[name], + timeout=(Optional[str], None), include=bool, # For algorithms that have a default parameter config, we allow arbitrarily running an algorithm # if no runs are specified. For example, the following config diff --git a/spras/config/config.py b/spras/config/config.py index e180183c..9c8bdc8a 100644 --- a/spras/config/config.py +++ b/spras/config/config.py @@ -16,8 +16,9 @@ import itertools as it import os import warnings -from typing import Any +from typing import Any, Optional +from pytimeparse import parse import numpy as np import yaml @@ -73,6 +74,8 @@ def __init__(self, raw_config: dict[str, Any]): self.container_settings = ProcessedContainerSettings.from_container_settings(parsed_raw_config.containers, self.hash_length) # The list of algorithms to run in the workflow. Each is a dict with 'name' as an expected key. self.algorithms = None + # Dictionary of algorithms to their respective timeout in seconds + self.algorithm_timeouts: dict[str, Optional[int]] = dict() # A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations. # Only includes algorithms that are set to be run with 'include: true'. self.algorithm_params: dict[str, dict[str, Any]] = dict() @@ -156,6 +159,12 @@ def process_algorithms(self, raw_config: RawConfig): # Do not parse the rest of the parameters for this algorithm if it is not included continue + if alg.timeout: + # Coerce to an `int` if an int isn't possible. + timeout = parse(alg.timeout, granularity='seconds') + if not timeout: raise RuntimeError(f"Algorithm {alg} has unparsable timeout string {alg.timeout}.") + self.algorithm_timeouts[alg.name] = int(timeout) + runs: dict[str, Any] = alg.runs # Each set of runs should be 1 level down in the config file diff --git a/spras/containers.py b/spras/containers.py index 124b9741..c4b41a52 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -1,6 +1,7 @@ import os import platform import re +import requests import subprocess import textwrap from pathlib import Path, PurePath, PurePosixPath @@ -166,6 +167,17 @@ def streams_contain(self, needle: str): def __str__(self): return self.message +class TimeoutError(RuntimeError): + """Raises when a function times out.""" + timeout: int + message: str + + def __init__(self, timeout: int, *args): + self.timeout = timeout + self.message = f"Timed out after {timeout}s." 
+ + super(TimeoutError, self).__init__(timeout, *args) + def env_to_items(environment: dict[str, str]) -> Iterator[str]: """ Turns an environment variable dictionary to KEY=VALUE pairs. @@ -176,7 +188,17 @@ def env_to_items(environment: dict[str, str]) -> Iterator[str]: # TODO consider a better default environment variable # Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html) # Technically the argument is an image, not a container, but we use container here. -def run_container(container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled = False): +def run_container( + container_suffix: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + out_dir: str | os.PathLike, + container_settings: ProcessedContainerSettings, + timeout: Optional[int], + environment: Optional[dict[str, str]] = None, + network_disabled = False +): """ Runs a command in the container using Singularity or Docker @param container_suffix: name of the DockerHub container without the 'docker://' prefix @@ -185,6 +207,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple @param working_dir: the working directory in the container @param container_settings: the settings to use to run the container @param out_dir: output directory for the rule's artifacts. Only passed into run_container_singularity for the purpose of profiling. + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. @param environment: environment variables to set in the container @param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking. @return: output from Singularity execute or Docker run @@ -193,7 +216,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple container = container_settings.prefix + "/" + container_suffix if normalized_framework == 'docker': - return run_container_docker(container, command, volumes, working_dir, environment, network_disabled) + return run_container_docker(container, command, volumes, working_dir, environment, timeout, network_disabled) elif normalized_framework == 'singularity' or normalized_framework == "apptainer": return run_container_singularity(container, command, volumes, working_dir, out_dir, container_settings, environment) elif normalized_framework == 'dsub': @@ -201,7 +224,17 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple else: raise ValueError(f'{container_settings.framework} is not a recognized container framework. 
Choose "docker", "dsub", "apptainer", or "singularity".') -def run_container_and_log(name: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled=False): +def run_container_and_log( + name: str, + container_suffix: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, out_dir: str | os.PathLike, + container_settings: ProcessedContainerSettings, + timeout: Optional[int], + environment: Optional[dict[str, str]] = None, + network_disabled=False +): """ Runs a command in the container using Singularity or Docker with associated pretty printed messages. @param name: the display name of the running container for logging purposes @@ -210,6 +243,7 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param container_settings: the container settings to use + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. @param environment: environment variables to set in the container @param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking. @return: output from Singularity execute or Docker run @@ -219,7 +253,17 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], print('Running {} on container framework "{}" on env {} with command: {}'.format(name, container_settings.framework, list(env_to_items(environment)), ' '.join(command)), flush=True) try: - out = run_container(container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, out_dir=out_dir, container_settings=container_settings, environment=environment, network_disabled=network_disabled) + out = run_container( + container_suffix=container_suffix, + command=command, + volumes=volumes, + working_dir=working_dir, + out_dir=out_dir, + container_settings=container_settings, + timeout=timeout, + environment=environment, + network_disabled=network_disabled + ) if out is not None: if isinstance(out, list): out = ''.join(out) @@ -250,7 +294,15 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], raise ContainerError(message, err.exit_status, stdout, stderr) from None # TODO any issue with creating a new client each time inside this function? -def run_container_docker(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None, network_disabled=False): +def run_container_docker( + container: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + environment: Optional[dict[str, str]] = None, + timeout: Optional[int] = None, + network_disabled=False +): """ Runs a command in the container using Docker. 
Attempts to automatically correct file owner and group for new files created by the container, setting them to the @@ -261,6 +313,8 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param environment: environment variables to set in the container + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. + @param network_disabled: if enabled, disables the underlying network: useful when containers don't fetch any online resources. @return: output from Docker run, or will error if the container errored. """ @@ -290,13 +344,22 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] - out = client.containers.run(container, - command, - stderr=True, - volumes=bind_paths, - working_dir=working_dir, - network_disabled=network_disabled, - environment=environment).decode('utf-8') + container_obj = client.containers.create( + container, + command, + volumes=bind_paths, + working_dir=working_dir, + network_disabled=network_disabled, + environment=environment + ) + + if timeout: + try: + container_obj.wait(timeout=timeout) + except requests.exceptions.ReadTimeout: + raise TimeoutError(timeout) + + out = container_obj.attach(stderr=True).decode('utf-8') # TODO does this cleanup need to still run even if there was an error in the above run command? # On Unix, files written by the above Docker run command will be owned by root and cannot be modified @@ -345,7 +408,7 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple return out -def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str, config: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None): +def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, config: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None): """ Runs a command in the container using Singularity. Only available on Linux. 
@@ -427,7 +490,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ for bind in bind_paths: singularity_cmd.extend(["--bind", bind]) singularity_cmd.extend(singularity_options) - singularity_cmd.append(image_to_run) + singularity_cmd.append(str(image_to_run)) singularity_cmd.extend(command) my_cgroup = create_peer_cgroup() @@ -438,7 +501,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ proc = subprocess.run(cmd, capture_output=True, text=True, stderr=subprocess.STDOUT) print("Reading memory and CPU stats from cgroup") - create_apptainer_container_stats(my_cgroup, out_dir) + create_apptainer_container_stats(my_cgroup, str(out_dir)) result = proc.stdout else: diff --git a/spras/domino.py b/spras/domino.py index 31604443..1faaf9d6 100644 --- a/spras/domino.py +++ b/spras/domino.py @@ -83,7 +83,7 @@ def generate_inputs(data, filename_map): header=['ID_interactor_A', 'ppi', 'ID_interactor_B']) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = DominoParams() DOMINO.validate_required_run_args(inputs) @@ -121,7 +121,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) except ContainerError as err: # Occurs when DOMINO gets passed some empty dataframe from network_file. # This counts as an empty input, so we return an empty output. diff --git a/spras/meo.py b/spras/meo.py index 5d4630f4..b7be4db9 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -143,10 +143,9 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['edges'], sep='\t', index=False, columns=['Interactor1', 'EdgeType', 'Interactor2', 'Weight'], header=False) - # TODO add parameter validation # TODO document required arguments @staticmethod - def run(inputs, output_file=None, args=None, container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): """ Run Maximum Edge Orientation in the Docker image with the provided parameters. The properties file is generated from the provided arguments. 
@@ -203,7 +202,8 @@ def run(inputs, output_file=None, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) properties_file_local.unlink(missing_ok=True) diff --git a/spras/mincostflow.py b/spras/mincostflow.py index dad1d706..cbb373d2 100644 --- a/spras/mincostflow.py +++ b/spras/mincostflow.py @@ -76,7 +76,7 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = MinCostFlowParams() MinCostFlow.validate_required_run_args(inputs) @@ -127,7 +127,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Check the output of the container out_dir_content = sorted(out_dir.glob('*.sif')) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index 3e5cbf1b..7a053ec9 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -158,7 +158,7 @@ def generate_inputs(data, filename_map): # TODO add support for knockout argument # TODO add reasonable default values @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, timeout, args, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() OmicsIntegrator1.validate_required_run_args(inputs, ["dummy_nodes"]) @@ -231,6 +231,7 @@ def run(inputs, output_file, args, container_settings=None): work_dir, out_dir, container_settings, + timeout, {'TMPDIR': mapped_out_dir}) conf_file_local.unlink(missing_ok=True) diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py index 38624d3a..8994dd1f 100644 --- a/spras/omicsintegrator2.py +++ b/spras/omicsintegrator2.py @@ -108,7 +108,7 @@ def generate_inputs(data: Dataset, filename_map): # TODO add reasonable default values @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = OmicsIntegrator2Params() OmicsIntegrator2.validate_required_run_args(inputs) @@ -160,6 +160,7 @@ def run(inputs, output_file, args=None, container_settings=None): work_dir, out_dir, container_settings, + timeout, network_disabled=True) # TODO do we want to retain other output files? 
diff --git a/spras/pathlinker.py b/spras/pathlinker.py index c534f294..eb91c49c 100644 --- a/spras/pathlinker.py +++ b/spras/pathlinker.py @@ -75,7 +75,7 @@ def generate_inputs(data, filename_map): header=["#Interactor1","Interactor2","Weight"]) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = PathLinkerParams() PathLinker.validate_required_run_args(inputs) @@ -115,7 +115,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Rename the primary output file to match the desired output filename # Currently PathLinker only writes one output file so we do not need to delete others diff --git a/spras/prm.py b/spras/prm.py index 636859cf..2f79c376 100644 --- a/spras/prm.py +++ b/spras/prm.py @@ -59,9 +59,10 @@ def get_params_generic(cls) -> type[T]: # This is used in `runner.py` to avoid a dependency diamond when trying # to import the actual algorithm schema. @classmethod - def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings): + def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, timeout: Optional[int], args: dict[str, Any], container_settings: ProcessedContainerSettings): """ - This is similar to PRA.run, but it does pydantic logic internally to re-validate argument parameters. + This is similar to PRA.run, but `args` is a dictionary and not a pydantic structure. + However, this method still re-validates `args` against the associated pydantic PRM argument model. """ T_class = cls.get_params_generic() @@ -75,17 +76,23 @@ def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | o # (Pydantic already provides nice error messages, so we don't need to worry about catching this.) T_parsed = T_class.model_validate(args) - return cls.run(inputs, output_file, T_parsed, container_settings) + return cls.run(inputs, output_file, timeout, T_parsed, container_settings) @staticmethod @abstractmethod - def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings): + def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, timeout: Optional[int], args: T, container_settings: ProcessedContainerSettings): """ - Runs an algorithm with the specified inputs, algorithm params (T), - the designated output_file, and the desired container_settings. - - See the algorithm-specific `generate_inputs` and `parse_output` + Runs an algorithm. + @param inputs: specified inputs + @param output_file: designated reconstructed pathway output + @param timeout: timeout in seconds to run the container with + @param args: (T) typed algorithm params + @param container_settings: what settings should be associated with the individual container. + + See the algorithm-specific `PRM.generate_inputs` and `PRM.parse_output` for information about the input and output format. + + Also see `PRM.run_typeless` for the non-pydantic version of this method (where `args` is a dict). 
""" raise NotImplementedError diff --git a/spras/responsenet.py b/spras/responsenet.py index 4989de48..565f02da 100644 --- a/spras/responsenet.py +++ b/spras/responsenet.py @@ -68,7 +68,7 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, timeout, args=None, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() ResponseNet.validate_required_run_args(inputs) if not args: args = ResponseNetParams() @@ -117,7 +117,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Rename the primary output file to match the desired output filename out_file_suffixed.rename(output_file) diff --git a/spras/runner.py b/spras/runner.py index d138d8e3..b5e68910 100644 --- a/spras/runner.py +++ b/spras/runner.py @@ -1,8 +1,10 @@ -from typing import Any +from os import PathLike +from typing import Any, Optional # supported algorithm imports from spras.allpairs import AllPairs from spras.btb import BowTieBuilder +from spras.config.container_schema import ProcessedContainerSettings from spras.dataset import Dataset from spras.domino import DOMINO from spras.meo import MEO @@ -35,14 +37,21 @@ def get_algorithm(algorithm: str) -> type[PRM]: except KeyError as exc: raise NotImplementedError(f'{algorithm} is not currently supported.') from exc -def run(algorithm: str, inputs, output_file, args, container_settings): +def run( + algorithm: str, + inputs: dict[str, str | PathLike], + output_file: str | PathLike, + timeout: Optional[int], + args: dict[str, Any], + container_settings: ProcessedContainerSettings +): """ A generic interface to the algorithm-specific run functions """ algorithm_runner = get_algorithm(algorithm) # We can't use config.config here else we would get a cyclic dependency. # Since args is a dict here, we use the 'run_typeless' utility PRM function. 
- algorithm_runner.run_typeless(inputs, output_file, args, container_settings) + algorithm_runner.run_typeless(inputs, output_file, timeout, args, container_settings) def get_required_inputs(algorithm: str): diff --git a/spras/rwr.py b/spras/rwr.py index e6f54d67..e359767f 100644 --- a/spras/rwr.py +++ b/spras/rwr.py @@ -56,7 +56,7 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False) @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, timeout, args, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() RWR.validate_required_run_args(inputs) @@ -103,7 +103,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') diff --git a/spras/strwr.py b/spras/strwr.py index 42928e4c..b25046a6 100644 --- a/spras/strwr.py +++ b/spras/strwr.py @@ -58,7 +58,7 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False) @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, timeout, args, container_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() ST_RWR.validate_required_run_args(inputs) @@ -110,7 +110,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') From 0342b5cbb8844222d3d79b80d69c4fbdbf2f8e61 Mon Sep 17 00:00:00 2001 From: "Tristan F." 
Date: Mon, 12 Jan 2026 11:28:17 -0800 Subject: [PATCH 2/8] feat: snakemake err checkpoint --- Snakefile | 44 ++++++++++++++++++++++++++++++++---------- spras/config/config.py | 4 ++++ spras/containers.py | 3 +++ 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/Snakefile b/Snakefile index b9b076e3..960ccf37 100644 --- a/Snakefile +++ b/Snakefile @@ -1,7 +1,10 @@ import os from spras import runner import shutil +import json import yaml +from pathlib import Path +from spras.containers import TimeoutError from spras.dataset import Dataset from spras.evaluation import Evaluation from spras.analysis import ml, summary, cytoscape @@ -261,31 +264,52 @@ def collect_prepared_input(wildcards): return prepared_inputs # Run the pathway reconstruction algorithm -rule reconstruct: +checkpoint reconstruct: input: collect_prepared_input # Each reconstruct call should be in a separate output subdirectory that is unique for the parameter combination so # that multiple instances of the container can run simultaneously without overwriting the output files # Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the # same name regardless of the inputs or parameters, and these aren't renamed until after the container command # terminates - output: pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']) + output: + pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']) + log: + resource_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'resource-log.json']) + params: + # Get the timeout from the config and use it as an input. + # TODO: This has unexpected behavior when this rule succeeds but the timeout extends, + # making this rule run again. + timeout = lambda wildcards: _config.config.algorithm_timeouts[wildcards.algorithm] run: # Create a copy so that the updates are not written to the parameters logfile - params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() + algorithm_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() # Declare the input files as a dictionary. inputs = dict(zip(runner.get_required_inputs(wildcards.algorithm), *{input}, strict=True)) - # Get the timeout from the config - timeout = _config.config.algorithm_timeouts[wildcards.algorithm] # Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml - if '_spras_run_name' in params: - params.pop('_spras_run_name') - runner.run(wildcards.algorithm, inputs, output.pathway_file, timeout, params, container_settings) + if '_spras_run_name' in algorithm_params: + algorithm_params.pop('_spras_run_name') + try: + runner.run(wildcards.algorithm, inputs, output.pathway_file, params.timeout, algorithm_params, container_settings) + Path(log.resource_info).write_text(json.dumps({"status": "success"})) + except TimeoutError as err: + # We don't raise the error here (and use `--keep-going` to avoid re-running this rule [or others!] unnecessarily.) + Path(log.resource_info).write_text(json.dumps({"status": "error", "type": "timeout", "duration": params.timeout})) + # and we touch pathway_file still: Snakemake doesn't have optional files, so + # we'll filter the ones that didn't time out in collect_successful_reconstructions. 
+ Path(output.pathway_file).touch() + +def collect_successful_reconstructions(wildcards): + reconstruct_checkpoint = checkpoints.reconstruct.get(**wildcards) + resource_info = json.loads(Path(reconstruct_checkpoint.log.resource_info).read_bytes()) + if resource_info["status"] == "success": + return [reconstruct_checkpoint.output.pathway_file] + return [] # Original pathway reconstruction output to universal output # Use PRRunner as a wrapper to call the algorithm-specific parse_output rule parse_output: - input: - raw_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']), + input: + raw_file = collect_successful_reconstructions, dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle']) output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt']) run: diff --git a/spras/config/config.py b/spras/config/config.py index 9c8bdc8a..63e8e2d6 100644 --- a/spras/config/config.py +++ b/spras/config/config.py @@ -164,6 +164,10 @@ def process_algorithms(self, raw_config: RawConfig): timeout = parse(alg.timeout, granularity='seconds') if not timeout: raise RuntimeError(f"Algorithm {alg} has unparsable timeout string {alg.timeout}.") self.algorithm_timeouts[alg.name] = int(timeout) + else: + # As per the type signature, we still want to say explicitly that this algorithm's timeout + # is uninhabited. + self.algorithm_timeouts[alg.name] = None runs: dict[str, Any] = alg.runs diff --git a/spras/containers.py b/spras/containers.py index c4b41a52..a72bf3a6 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -178,6 +178,9 @@ def __init__(self, timeout: int, *args): super(TimeoutError, self).__init__(timeout, *args) + def __str__(self): + return self.message + def env_to_items(environment: dict[str, str]) -> Iterator[str]: """ Turns an environment variable dictionary to KEY=VALUE pairs. From 841d24240eee1cc10cb3d4a9238b1dcf8095970a Mon Sep 17 00:00:00 2001 From: "Tristan F." Date: Mon, 12 Jan 2026 11:51:48 -0800 Subject: [PATCH 3/8] fix: use timeout correctly --- Snakefile | 14 +++++++------- spras/containers.py | 15 ++++++++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/Snakefile b/Snakefile index 960ccf37..758d46df 100644 --- a/Snakefile +++ b/Snakefile @@ -272,8 +272,8 @@ checkpoint reconstruct: # same name regardless of the inputs or parameters, and these aren't renamed until after the container command # terminates output: - pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']) - log: + pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']), + # Despite this being a 'log' file, we don't use the log directive as this rule doesn't actually throw errors. resource_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'resource-log.json']) params: # Get the timeout from the config and use it as an input. @@ -290,20 +290,20 @@ checkpoint reconstruct: algorithm_params.pop('_spras_run_name') try: runner.run(wildcards.algorithm, inputs, output.pathway_file, params.timeout, algorithm_params, container_settings) - Path(log.resource_info).write_text(json.dumps({"status": "success"})) + Path(output.resource_info).write_text(json.dumps({"status": "success"})) except TimeoutError as err: # We don't raise the error here (and use `--keep-going` to avoid re-running this rule [or others!] unnecessarily.) 
- Path(log.resource_info).write_text(json.dumps({"status": "error", "type": "timeout", "duration": params.timeout})) + Path(output.resource_info).write_text(json.dumps({"status": "error", "type": "timeout", "duration": params.timeout})) # and we touch pathway_file still: Snakemake doesn't have optional files, so # we'll filter the ones that didn't time out in collect_successful_reconstructions. Path(output.pathway_file).touch() def collect_successful_reconstructions(wildcards): reconstruct_checkpoint = checkpoints.reconstruct.get(**wildcards) - resource_info = json.loads(Path(reconstruct_checkpoint.log.resource_info).read_bytes()) + resource_info = json.loads(Path(reconstruct_checkpoint.output.resource_info).read_bytes()) if resource_info["status"] == "success": - return [reconstruct_checkpoint.output.pathway_file] - return [] + return reconstruct_checkpoint.output.pathway_file + return None # Original pathway reconstruction output to universal output # Use PRRunner as a wrapper to call the algorithm-specific parse_output diff --git a/spras/containers.py b/spras/containers.py index a72bf3a6..63fb7170 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -347,20 +347,21 @@ def run_container_docker( bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] - container_obj = client.containers.create( + container_obj = client.containers.run( container, command, volumes=bind_paths, working_dir=working_dir, network_disabled=network_disabled, - environment=environment + environment=environment, + detach=True ) - if timeout: - try: - container_obj.wait(timeout=timeout) - except requests.exceptions.ReadTimeout: - raise TimeoutError(timeout) + try: + container_obj.wait(timeout=timeout) + except requests.exceptions.ReadTimeout: + if timeout: raise TimeoutError(timeout) + else: raise RuntimeError("Timeout error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") out = container_obj.attach(stderr=True).decode('utf-8') From 75fd7f1cf15674b737a66d77c2c42e5d7c57a676 Mon Sep 17 00:00:00 2001 From: "Tristan F." Date: Tue, 13 Jan 2026 04:39:32 -0800 Subject: [PATCH 4/8] fix: filter files w/ errors --- Snakefile | 52 ++++++++++++++++++++++++++++----------------- spras/containers.py | 2 ++ 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/Snakefile b/Snakefile index 758d46df..afb2ad99 100644 --- a/Snakefile +++ b/Snakefile @@ -263,6 +263,21 @@ def collect_prepared_input(wildcards): return prepared_inputs +def mark_error(file, **details): + """Marks a file as an error with associated details.""" + Path(file).write_text(json.dumps({"status": "error", **details})) + +def is_error(file): + """Checks if a file was produced by mark_error.""" + try: + return json.loads(Path(file).read_bytes())["status"] == "error" + except ValueError: + return False + +def filter_successful(files): + """Convenient function for filtering iterators by whether or not their items are error files.""" + return [file for file in files if not is_error(file)] + # Run the pathway reconstruction algorithm checkpoint reconstruct: input: collect_prepared_input @@ -295,24 +310,23 @@ checkpoint reconstruct: # We don't raise the error here (and use `--keep-going` to avoid re-running this rule [or others!] unnecessarily.) 
Path(output.resource_info).write_text(json.dumps({"status": "error", "type": "timeout", "duration": params.timeout})) # and we touch pathway_file still: Snakemake doesn't have optional files, so - # we'll filter the ones that didn't time out in collect_successful_reconstructions. + # we'll filter the ones that didn't time out by passing around empty files. Path(output.pathway_file).touch() -def collect_successful_reconstructions(wildcards): - reconstruct_checkpoint = checkpoints.reconstruct.get(**wildcards) - resource_info = json.loads(Path(reconstruct_checkpoint.output.resource_info).read_bytes()) - if resource_info["status"] == "success": - return reconstruct_checkpoint.output.pathway_file - return None - # Original pathway reconstruction output to universal output # Use PRRunner as a wrapper to call the algorithm-specific parse_output rule parse_output: input: - raw_file = collect_successful_reconstructions, + raw_file = rules.reconstruct.output.pathway_file, + resource_info = rules.reconstruct.output.resource_info, dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle']) output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt']) run: + resource_info = json.loads(Path(input.resource_info).read_bytes()) + if resource_info["status"] != "success": + mark_error(output.standardized_file) + return + params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() params['dataset'] = input.dataset_file runner.parse_output(wildcards.algorithm, input.raw_file, output.standardized_file, params) @@ -334,7 +348,7 @@ rule viz_cytoscape: output: session = SEP.join([out_dir, '{dataset}-cytoscape.cys']) run: - cytoscape.run_cytoscape(input.pathways, output.session, container_settings) + cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings) # Write a single summary table for all pathways for each dataset @@ -347,7 +361,7 @@ rule summary_table: run: # Load the node table from the pickled dataset file node_table = Dataset.from_file(input.dataset_file).node_table - summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params) + summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params) summary_df.to_csv(output.summary_table, sep='\t', index=False) # Cluster the output pathways for each dataset @@ -363,7 +377,7 @@ rule ml_analysis: hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-horizontal.png']), hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']), run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params) ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params) ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params) @@ -377,7 +391,7 @@ rule jaccard_similarity: jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']), jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap) @@ -388,7 
+402,7 @@ rule ensemble: output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.ensemble_network(summary_df, output.ensemble_network_file) # Returns all pathways for a specific algorithm @@ -410,7 +424,7 @@ rule ml_analysis_aggregate_algo: hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']), hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']), run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params) ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params) ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params) @@ -422,7 +436,7 @@ rule ensemble_per_algo: output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.ensemble_network(summary_df, output.ensemble_network_file) # Calculated Jaccard similarity between output pathways for each dataset per algorithm @@ -433,7 +447,7 @@ rule jaccard_similarity_per_algo: jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']), jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap) # Return the gold standard pickle file for a specific gold standard @@ -464,7 +478,7 @@ rule evaluation_pr_per_pathways: node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']), run: node_table = Evaluation.from_file(input.node_gold_standard_file).node_table - pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table) + pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table) Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png) # Returns all pathways for a specific algorithm and dataset @@ -483,7 +497,7 @@ rule evaluation_per_algo_pr_per_pathways: node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']), run: node_table = Evaluation.from_file(input.node_gold_standard_file).node_table - pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table) + pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table) Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval) # Return pathway summary file per dataset diff --git a/spras/containers.py b/spras/containers.py index 63fb7170..57f2fc29 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -360,6 +360,8 @@ def run_container_docker( try: container_obj.wait(timeout=timeout) except requests.exceptions.ReadTimeout: + container_obj.stop() + client.close() if timeout: raise TimeoutError(timeout) else: raise RuntimeError("Timeout 
error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") From 7abd7093f4fd008407d8ae2d3e93bfec4d615cf1 Mon Sep 17 00:00:00 2001 From: "Tristan F." Date: Tue, 13 Jan 2026 07:22:14 -0800 Subject: [PATCH 5/8] fix: correct timeout order --- Snakefile | 2 +- spras/allpairs.py | 2 +- spras/btb.py | 2 +- spras/config/algorithms.py | 2 +- spras/config/config.py | 2 +- spras/containers.py | 10 +++++----- spras/domino.py | 5 +++-- spras/meo.py | 2 +- spras/mincostflow.py | 2 +- spras/omicsintegrator1.py | 2 +- spras/omicsintegrator2.py | 2 +- spras/pathlinker.py | 2 +- spras/prm.py | 8 ++++---- spras/responsenet.py | 2 +- spras/runner.py | 6 +++--- spras/rwr.py | 2 +- spras/strwr.py | 2 +- 17 files changed, 28 insertions(+), 27 deletions(-) diff --git a/Snakefile b/Snakefile index afb2ad99..5e0330a0 100644 --- a/Snakefile +++ b/Snakefile @@ -304,7 +304,7 @@ checkpoint reconstruct: if '_spras_run_name' in algorithm_params: algorithm_params.pop('_spras_run_name') try: - runner.run(wildcards.algorithm, inputs, output.pathway_file, params.timeout, algorithm_params, container_settings) + runner.run(wildcards.algorithm, inputs, output.pathway_file, algorithm_params, container_settings, params.timeout) Path(output.resource_info).write_text(json.dumps({"status": "success"})) except TimeoutError as err: # We don't raise the error here (and use `--keep-going` to avoid re-running this rule [or others!] unnecessarily.) diff --git a/spras/allpairs.py b/spras/allpairs.py index 500e864f..d015acfd 100644 --- a/spras/allpairs.py +++ b/spras/allpairs.py @@ -74,7 +74,7 @@ def generate_inputs(data: Dataset, filename_map): header=["#Interactor1", "Interactor2", "Weight"]) @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() AllPairs.validate_required_run_args(inputs) diff --git a/spras/btb.py b/spras/btb.py index e19ed5d1..8c835887 100644 --- a/spras/btb.py +++ b/spras/btb.py @@ -72,7 +72,7 @@ def generate_inputs(data, filename_map): # Skips parameter validation step @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() BowTieBuilder.validate_required_run_args(inputs) diff --git a/spras/config/algorithms.py b/spras/config/algorithms.py index d918f533..5abb4dbf 100644 --- a/spras/config/algorithms.py +++ b/spras/config/algorithms.py @@ -5,7 +5,7 @@ """ import ast import copy -from typing import Annotated, Any, Callable, Literal, Union, cast, get_args, Optional +from typing import Annotated, Any, Callable, Literal, Optional, Union, cast, get_args import numpy as np from pydantic import ( diff --git a/spras/config/config.py b/spras/config/config.py index 63e8e2d6..819fd74a 100644 --- a/spras/config/config.py +++ b/spras/config/config.py @@ -18,9 +18,9 @@ import warnings from typing import Any, Optional -from pytimeparse import parse import numpy as np import yaml +from pytimeparse import parse from spras.config.container_schema import ProcessedContainerSettings from spras.config.schema import RawConfig diff --git a/spras/containers.py b/spras/containers.py index 57f2fc29..2f199564 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -1,7 +1,6 @@ 
import os import platform import re -import requests import subprocess import textwrap from pathlib import Path, PurePath, PurePosixPath @@ -9,6 +8,7 @@ import docker import docker.errors +import requests from spras.config.container_schema import ProcessedContainerSettings from spras.logging import indent @@ -262,7 +262,7 @@ def run_container_and_log( volumes=volumes, working_dir=working_dir, out_dir=out_dir, - container_settings=container_settings, + container_settings=container_settings, timeout=timeout, environment=environment, network_disabled=network_disabled @@ -359,11 +359,11 @@ def run_container_docker( try: container_obj.wait(timeout=timeout) - except requests.exceptions.ReadTimeout: + except requests.exceptions.ReadTimeout as err: container_obj.stop() client.close() - if timeout: raise TimeoutError(timeout) - else: raise RuntimeError("Timeout error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") + if timeout: raise TimeoutError(timeout) from err + else: raise RuntimeError("Timeout error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") from None out = container_obj.attach(stderr=True).decode('utf-8') diff --git a/spras/domino.py b/spras/domino.py index 1faaf9d6..eda2934f 100644 --- a/spras/domino.py +++ b/spras/domino.py @@ -83,7 +83,7 @@ def generate_inputs(data, filename_map): header=['ID_interactor_A', 'ppi', 'ID_interactor_B']) @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = DominoParams() DOMINO.validate_required_run_args(inputs) @@ -156,7 +156,8 @@ def run(inputs, output_file, timeout, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + timeout) except ContainerError as err: # Occurs when DOMINO gets passed some empty dataframe from network_file. # This counts as an empty input, so we return an empty output. diff --git a/spras/meo.py b/spras/meo.py index b7be4db9..94e90c92 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -145,7 +145,7 @@ def generate_inputs(data, filename_map): # TODO document required arguments @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): """ Run Maximum Edge Orientation in the Docker image with the provided parameters. The properties file is generated from the provided arguments. 
diff --git a/spras/mincostflow.py b/spras/mincostflow.py index cbb373d2..2a8a8d58 100644 --- a/spras/mincostflow.py +++ b/spras/mincostflow.py @@ -76,7 +76,7 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = MinCostFlowParams() MinCostFlow.validate_required_run_args(inputs) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index 7a053ec9..0d913ec1 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -158,7 +158,7 @@ def generate_inputs(data, filename_map): # TODO add support for knockout argument # TODO add reasonable default values @staticmethod - def run(inputs, output_file, timeout, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() OmicsIntegrator1.validate_required_run_args(inputs, ["dummy_nodes"]) diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py index 8994dd1f..25b4d5f2 100644 --- a/spras/omicsintegrator2.py +++ b/spras/omicsintegrator2.py @@ -108,7 +108,7 @@ def generate_inputs(data: Dataset, filename_map): # TODO add reasonable default values @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = OmicsIntegrator2Params() OmicsIntegrator2.validate_required_run_args(inputs) diff --git a/spras/pathlinker.py b/spras/pathlinker.py index eb91c49c..2f41dcc3 100644 --- a/spras/pathlinker.py +++ b/spras/pathlinker.py @@ -75,7 +75,7 @@ def generate_inputs(data, filename_map): header=["#Interactor1","Interactor2","Weight"]) @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = PathLinkerParams() PathLinker.validate_required_run_args(inputs) diff --git a/spras/prm.py b/spras/prm.py index 2f79c376..663587f9 100644 --- a/spras/prm.py +++ b/spras/prm.py @@ -59,7 +59,7 @@ def get_params_generic(cls) -> type[T]: # This is used in `runner.py` to avoid a dependency diamond when trying # to import the actual algorithm schema. @classmethod - def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, timeout: Optional[int], args: dict[str, Any], container_settings: ProcessedContainerSettings): + def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings, timeout: Optional[int]): """ This is similar to PRA.run, but `args` is a dictionary and not a pydantic structure. However, this method still re-validates `args` against the associated pydantic PRM argument model. @@ -76,18 +76,18 @@ def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | o # (Pydantic already provides nice error messages, so we don't need to worry about catching this.) 
T_parsed = T_class.model_validate(args) - return cls.run(inputs, output_file, timeout, T_parsed, container_settings) + return cls.run(inputs, output_file, T_parsed, container_settings, timeout) @staticmethod @abstractmethod - def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, timeout: Optional[int], args: T, container_settings: ProcessedContainerSettings): + def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings, timeout: Optional[int]): """ Runs an algorithm. @param inputs: specified inputs @param output_file: designated reconstructed pathway output - @param timeout: timeout in seconds to run the container with @param args: (T) typed algorithm params @param container_settings: what settings should be associated with the individual container. + @param timeout: timeout in seconds to run the container with See the algorithm-specific `PRM.generate_inputs` and `PRM.parse_output` for information about the input and output format. diff --git a/spras/responsenet.py b/spras/responsenet.py index 565f02da..0c813df8 100644 --- a/spras/responsenet.py +++ b/spras/responsenet.py @@ -68,7 +68,7 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, timeout, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, timeout=None): if not container_settings: container_settings = ProcessedContainerSettings() ResponseNet.validate_required_run_args(inputs) if not args: args = ResponseNetParams() diff --git a/spras/runner.py b/spras/runner.py index b5e68910..a9f251ba 100644 --- a/spras/runner.py +++ b/spras/runner.py @@ -41,9 +41,9 @@ def run( algorithm: str, inputs: dict[str, str | PathLike], output_file: str | PathLike, - timeout: Optional[int], args: dict[str, Any], - container_settings: ProcessedContainerSettings + container_settings: ProcessedContainerSettings, + timeout: Optional[int] ): """ A generic interface to the algorithm-specific run functions @@ -51,7 +51,7 @@ def run( algorithm_runner = get_algorithm(algorithm) # We can't use config.config here else we would get a cyclic dependency. # Since args is a dict here, we use the 'run_typeless' utility PRM function. 
-        algorithm_runner.run_typeless(inputs, output_file, timeout, args, container_settings)
+        algorithm_runner.run_typeless(inputs, output_file, args, container_settings, timeout)
 
 
 def get_required_inputs(algorithm: str):
diff --git a/spras/rwr.py b/spras/rwr.py
index e359767f..5f495885 100644
--- a/spras/rwr.py
+++ b/spras/rwr.py
@@ -56,7 +56,7 @@ def generate_inputs(data, filename_map):
         edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False)
 
     @staticmethod
-    def run(inputs, output_file, timeout, args, container_settings=None):
+    def run(inputs, output_file, args, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         RWR.validate_required_run_args(inputs)
 
diff --git a/spras/strwr.py b/spras/strwr.py
index b25046a6..8058c0cd 100644
--- a/spras/strwr.py
+++ b/spras/strwr.py
@@ -58,7 +58,7 @@ def generate_inputs(data, filename_map):
         edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False)
 
     @staticmethod
-    def run(inputs, output_file, timeout, args, container_settings=None):
+    def run(inputs, output_file, args, container_settings=None, timeout=None):
         if not container_settings: container_settings = ProcessedContainerSettings()
         ST_RWR.validate_required_run_args(inputs)
 

From e07c96148c0246b6449b41e22a0d733fa9ff7b13 Mon Sep 17 00:00:00 2001
From: "Tristan F."
Date: Tue, 13 Jan 2026 15:34:36 +0000
Subject: [PATCH 6/8] fix(cytoscape): specify optional timeout

---
 spras/analysis/cytoscape.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/spras/analysis/cytoscape.py b/spras/analysis/cytoscape.py
index e8489950..6eadfadd 100644
--- a/spras/analysis/cytoscape.py
+++ b/spras/analysis/cytoscape.py
@@ -58,5 +58,6 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, contai
         # (https://github.com/Reed-CompBio/spras/pull/390/files#r2485100875)
         None,
         container_settings,
+        None,
         env)
     rmtree(cytoscape_output_dir)

From d5b7e18b74fe609dd444b95b1cfc05f099d65bf6 Mon Sep 17 00:00:00 2001
From: "Tristan F."
Date: Tue, 13 Jan 2026 16:03:31 +0000
Subject: [PATCH 7/8] chore(Snakefile): decheckpointify reconstruct

---
 Snakefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Snakefile b/Snakefile
index 5e0330a0..6fbb0104 100644
--- a/Snakefile
+++ b/Snakefile
@@ -279,7 +279,7 @@ def filter_successful(files):
     return [file for file in files if not is_error(file)]
 
 # Run the pathway reconstruction algorithm
-checkpoint reconstruct:
+rule reconstruct:
     input: collect_prepared_input
     # Each reconstruct call should be in a separate output subdirectory that is unique for the parameter combination so
     # that multiple instances of the container can run simultaneously without overwriting the output files

From 111e53fda43bcad5d97bc1f215e2ab51f2e85252 Mon Sep 17 00:00:00 2001
From: "Tristan F."
Date: Tue, 13 Jan 2026 17:48:22 +0000
Subject: [PATCH 8/8] perf(Snakefile): make is_error check not consume the entire file

---
 Snakefile | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/Snakefile b/Snakefile
index 6fbb0104..f0c74549 100644
--- a/Snakefile
+++ b/Snakefile
@@ -270,7 +270,8 @@ def mark_error(file, **details):
 
 def is_error(file):
     """Checks if a file was produced by mark_error."""
     try:
-        return json.loads(Path(file).read_bytes())["status"] == "error"
+        with open(file, 'r') as f:
+            return json.load(f)["status"] == "error"
     except ValueError:
         return False
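
What follows is a minimal, self-contained sketch (not part of the patches) of the two mechanisms this series wires together: turning the config-level `timeout` string (e.g. `timeout: 1d`) into seconds with pytimeparse, and enforcing that budget on a detached Docker container via docker-py's `wait(timeout=...)`, as `run_container_docker` does above. The helper names (`parse_timeout_seconds`, `run_with_timeout`) and the `busybox` image are illustrative assumptions, not part of the SPRAS API, and error handling is reduced to the essentials.

# Illustrative sketch; helper names and the busybox image are assumptions.
import docker
import requests
from pytimeparse import parse


def parse_timeout_seconds(raw: str) -> int:
    # pytimeparse understands strings such as "1d", "90m", or "1h30m" and returns
    # the duration in seconds (possibly as a float), so coerce to int as config.py does.
    seconds = parse(raw, granularity='seconds')
    if seconds is None:
        raise RuntimeError(f"Unparsable timeout string: {raw!r}")
    return int(seconds)


def run_with_timeout(image: str, command: list[str], timeout: int) -> str:
    # Same shape as run_container_docker: start the container detached, bound the wait,
    # and treat a ReadTimeout from the Docker API as the container exceeding its budget.
    client = docker.from_env()
    container = client.containers.run(image, command, detach=True)
    try:
        container.wait(timeout=timeout)
    except requests.exceptions.ReadTimeout as err:
        container.stop()
        raise RuntimeError(f"Timed out after {timeout}s.") from err
    return container.logs(stdout=True, stderr=True).decode('utf-8')


if __name__ == '__main__':
    budget = parse_timeout_seconds("1d")  # 86400 seconds
    print(run_with_timeout("busybox", ["echo", "ok"], budget))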