75 changes: 58 additions & 17 deletions Snakefile
@@ -1,7 +1,10 @@
import os
from spras import runner
import shutil
import json
import yaml
from pathlib import Path
from spras.containers import TimeoutError
from spras.dataset import Dataset
from spras.evaluation import Evaluation
from spras.analysis import ml, summary, cytoscape
@@ -260,6 +263,22 @@ def collect_prepared_input(wildcards):

return prepared_inputs

def mark_error(file, **details):
"""Marks a file as an error with associated details."""
Path(file).write_text(json.dumps({"status": "error", **details}))

def is_error(file):
"""Checks if a file was produced by mark_error."""
try:
with open(file, 'r') as f:
return json.load(f).get("status") == "error"
except ValueError:
return False

def filter_successful(files):
"""Convenient function for filtering iterators by whether or not their items are error files."""
return [file for file in files if not is_error(file)]

# Run the pathway reconstruction algorithm
rule reconstruct:
input: collect_prepared_input
@@ -268,25 +287,47 @@ rule reconstruct:
# Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the
# same name regardless of the inputs or parameters, and these aren't renamed until after the container command
# terminates
output: pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt'])
output:
pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
# Despite this being a 'log' file, we don't use the log directive as this rule doesn't actually throw errors.
resource_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'resource-log.json'])
params:
# Get the timeout from the config and use it as an input.
# TODO: This has unexpected behavior: if this rule has already succeeded but the timeout is later changed,
# the changed parameter makes this rule run again.
timeout = lambda wildcards: _config.config.algorithm_timeouts[wildcards.algorithm]
run:
# Create a copy so that the updates are not written to the parameters logfile
params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
algorithm_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
# Declare the input files as a dictionary.
inputs = dict(zip(runner.get_required_inputs(wildcards.algorithm), *{input}, strict=True))
# Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml
if '_spras_run_name' in params:
params.pop('_spras_run_name')
runner.run(wildcards.algorithm, inputs, output.pathway_file, params, container_settings)
if '_spras_run_name' in algorithm_params:
algorithm_params.pop('_spras_run_name')
try:
runner.run(wildcards.algorithm, inputs, output.pathway_file, algorithm_params, container_settings, params.timeout)
Path(output.resource_info).write_text(json.dumps({"status": "success"}))
except TimeoutError as err:
# We don't raise the error here (and rely on `--keep-going`) so this rule, and others, aren't re-run unnecessarily.
Path(output.resource_info).write_text(json.dumps({"status": "error", "type": "timeout", "duration": params.timeout}))
# We still touch pathway_file: Snakemake has no optional outputs, so a timed-out run
# leaves an empty placeholder file that downstream rules filter out.
Path(output.pathway_file).touch()

# Original pathway reconstruction output to universal output
# Use PRRunner as a wrapper to call the algorithm-specific parse_output
rule parse_output:
input:
raw_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
input:
raw_file = rules.reconstruct.output.pathway_file,
resource_info = rules.reconstruct.output.resource_info,
dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle'])
output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt'])
run:
resource_info = json.loads(Path(input.resource_info).read_bytes())
if resource_info["status"] != "success":
mark_error(output.standardized_file)
return

params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
params['dataset'] = input.dataset_file
runner.parse_output(wildcards.algorithm, input.raw_file, output.standardized_file, params)
@@ -308,7 +349,7 @@ rule viz_cytoscape:
output:
session = SEP.join([out_dir, '{dataset}-cytoscape.cys'])
run:
cytoscape.run_cytoscape(input.pathways, output.session, container_settings)
cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings)


# Write a single summary table for all pathways for each dataset
@@ -321,7 +362,7 @@ rule summary_table:
run:
# Load the node table from the pickled dataset file
node_table = Dataset.from_file(input.dataset_file).node_table
summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params)
summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params)
summary_df.to_csv(output.summary_table, sep='\t', index=False)

# Cluster the output pathways for each dataset
@@ -337,7 +378,7 @@ rule ml_analysis:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']),
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
@@ -351,7 +392,7 @@ rule jaccard_similarity:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)


@@ -362,7 +403,7 @@ rule ensemble:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)

# Returns all pathways for a specific algorithm
@@ -384,7 +425,7 @@ rule ml_analysis_aggregate_algo:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']),
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
@@ -396,7 +437,7 @@ rule ensemble_per_algo:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)

# Calculated Jaccard similarity between output pathways for each dataset per algorithm
@@ -407,7 +448,7 @@ rule jaccard_similarity_per_algo:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)

# Return the gold standard pickle file for a specific gold standard
@@ -438,7 +479,7 @@ rule evaluation_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png)

# Returns all pathways for a specific algorithm and dataset
@@ -457,7 +498,7 @@ rule evaluation_per_algo_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval)

# Return pathway summary file per dataset
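A note on the error-marker flow the Snakefile changes above introduce: reconstruct now always produces both outputs, writing {"status": "success"} or {"status": "error", "type": "timeout", ...} to resource-log.json and touching an empty raw-pathway.txt when the algorithm times out; parse_output propagates the error by calling mark_error on pathway.txt, and every downstream analysis rule drops marked files with filter_successful. A minimal, self-contained sketch of that protocol, reusing the helper bodies from the diff with throwaway temp files and a made-up pathway line:

    import json
    import tempfile
    from pathlib import Path

    def mark_error(file, **details):
        Path(file).write_text(json.dumps({"status": "error", **details}))

    def is_error(file):
        try:
            with open(file, 'r') as f:
                return json.load(f).get("status") == "error"
        except ValueError:
            return False

    def filter_successful(files):
        return [file for file in files if not is_error(file)]

    with tempfile.TemporaryDirectory() as tmp:
        ok = Path(tmp, "pathway-ok.txt")
        ok.write_text("A\tB\t1\n")                 # stand-in for a successfully parsed pathway
        timed_out = Path(tmp, "pathway-timeout.txt")
        mark_error(timed_out, type="timeout", duration=86400)
        print(filter_successful([ok, timed_out]))  # only pathway-ok.txt survives

Because marked files are valid JSON with a "status" key while real standardized pathways are tab-separated text, is_error can tell them apart without any extra bookkeeping.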
1 change: 1 addition & 0 deletions config/config.yaml
@@ -102,6 +102,7 @@ algorithms:

- name: "allpairs"
include: true
timeout: 1d

- name: "domino"
include: true
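The new timeout value is a human-readable duration string handled by pytimeparse (added to environment.yml and pyproject.toml below); leaving the key out means the algorithm runs with no limit. A quick check of what a few strings parse to; the extra examples here come from the pytimeparse documentation, not from this config:

    from pytimeparse import parse

    print(parse('1d'))        # 86400 seconds: the allpairs timeout above
    print(parse('2h32m'))     # 9120 seconds
    print(parse('nonsense'))  # None, which spras/config/config.py rejects with a RuntimeError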
1 change: 1 addition & 0 deletions environment.yml
@@ -15,6 +15,7 @@ dependencies:
- scikit-learn=1.7.0
- seaborn=0.13.2
- spython=0.3.14
- pytimeparse=1.1.8

# conda-specific for dsub
- python-dateutil=2.9.0
1 change: 1 addition & 0 deletions pyproject.toml
@@ -30,6 +30,7 @@ dependencies = [
"scikit-learn==1.7.0",
"seaborn==0.13.2",
"spython==0.3.14",
"pytimeparse==1.1.8",

# toolchain deps
"pip==25.3",
5 changes: 3 additions & 2 deletions spras/allpairs.py
@@ -74,7 +74,7 @@ def generate_inputs(data: Dataset, filename_map):
header=["#Interactor1", "Interactor2", "Weight"])

@staticmethod
def run(inputs, output_file, args=None, container_settings=None):
def run(inputs, output_file, args=None, container_settings=None, timeout=None):
if not container_settings: container_settings = ProcessedContainerSettings()
AllPairs.validate_required_run_args(inputs)

@@ -111,7 +111,7 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
container_settings)
container_settings,
timeout)

@staticmethod
def parse_output(raw_pathway_file, standardized_pathway_file, params):
1 change: 1 addition & 0 deletions spras/analysis/cytoscape.py
@@ -58,5 +58,6 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, contai
# (https://github.com/Reed-CompBio/spras/pull/390/files#r2485100875)
None,
container_settings,
None,
env)
rmtree(cytoscape_output_dir)
5 changes: 3 additions & 2 deletions spras/btb.py
@@ -72,7 +72,7 @@ def generate_inputs(data, filename_map):

# Skips parameter validation step
@staticmethod
def run(inputs, output_file, args=None, container_settings=None):
def run(inputs, output_file, args=None, container_settings=None, timeout=None):
if not container_settings: container_settings = ProcessedContainerSettings()
BowTieBuilder.validate_required_run_args(inputs)

@@ -130,7 +130,7 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
container_settings)
container_settings,
timeout)
# Output is already written to raw-pathway.txt file


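allpairs.py and btb.py follow the same pattern: run() gains an optional timeout (in seconds) and forwards it as a new trailing positional argument to the container call, and cytoscape.py passes an explicit None. The enforcement itself lives in spras/containers.py, which also defines the TimeoutError the Snakefile imports; that file is not part of this diff, so the following is only a rough sketch of one way a runner could enforce such a limit, with run_with_timeout and the sleep command being hypothetical stand-ins:

    import subprocess

    class TimeoutError(Exception):
        """Stand-in for spras.containers.TimeoutError."""

    def run_with_timeout(command, timeout=None):
        # timeout is in seconds; None waits indefinitely, matching the optional config field.
        try:
            return subprocess.run(command, check=True, capture_output=True, timeout=timeout)
        except subprocess.TimeoutExpired as err:
            raise TimeoutError(f"{command} exceeded {timeout} seconds") from err

    try:
        run_with_timeout(["sleep", "5"], timeout=1)  # a command that outlives its 1-second budget
    except TimeoutError as err:
        print(err)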
3 changes: 2 additions & 1 deletion spras/config/algorithms.py
@@ -5,7 +5,7 @@
"""
import ast
import copy
from typing import Annotated, Any, Callable, Literal, Union, cast, get_args
from typing import Annotated, Any, Callable, Literal, Optional, Union, cast, get_args

import numpy as np
from pydantic import (
@@ -167,6 +167,7 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod
return create_model(
f'{name}Model',
name=Literal[name],
timeout=(Optional[str], None),
include=bool,
# For algorithms that have a default parameter config, we allow arbitrarily running an algorithm
# if no runs are specified. For example, the following config
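At the schema level, timeout is just an optional raw string defaulting to None; the duration format is only checked later in spras/config/config.py. A small illustration of that behavior, using a simplified stand-in for the model construct_algorithm_model generates (only the relevant fields are included):

    from typing import Literal, Optional
    from pydantic import create_model

    # Simplified stand-in for a generated per-algorithm model.
    AllPairsModel = create_model(
        'AllPairsModel',
        name=(Literal['allpairs'], ...),
        timeout=(Optional[str], None),
        include=(bool, ...),
    )

    print(AllPairsModel(name='allpairs', include=True).timeout)                # None: no timeout configured
    print(AllPairsModel(name='allpairs', include=True, timeout='1d').timeout)  # '1d': converted to seconds later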
15 changes: 14 additions & 1 deletion spras/config/config.py
@@ -16,10 +16,11 @@
import itertools as it
import os
import warnings
from typing import Any
from typing import Any, Optional

import numpy as np
import yaml
from pytimeparse import parse

from spras.config.container_schema import ProcessedContainerSettings
from spras.config.schema import RawConfig
@@ -73,6 +74,8 @@ def __init__(self, raw_config: dict[str, Any]):
self.container_settings = ProcessedContainerSettings.from_container_settings(parsed_raw_config.containers, self.hash_length)
# The list of algorithms to run in the workflow. Each is a dict with 'name' as an expected key.
self.algorithms = None
# Dictionary of algorithms to their respective timeout in seconds
self.algorithm_timeouts: dict[str, Optional[int]] = dict()
# A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations.
# Only includes algorithms that are set to be run with 'include: true'.
self.algorithm_params: dict[str, dict[str, Any]] = dict()
@@ -156,6 +159,16 @@ def process_algorithms(self, raw_config: RawConfig):
# Do not parse the rest of the parameters for this algorithm if it is not included
continue

if alg.timeout:
# `parse` can return a float, so coerce the parsed timeout to an integer number of seconds.
timeout = parse(alg.timeout, granularity='seconds')
if not timeout: raise RuntimeError(f"Algorithm {alg.name} has unparsable timeout string {alg.timeout}.")
self.algorithm_timeouts[alg.name] = int(timeout)
else:
# As per the type signature, we still record explicitly that this algorithm has no timeout.
self.algorithm_timeouts[alg.name] = None

runs: dict[str, Any] = alg.runs

# Each set of runs should be 1 level down in the config file
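Putting the config.yaml and config.py changes together, the new algorithm_timeouts attribute maps each included algorithm to its timeout in seconds, or None when no timeout: key is set, and the Snakefile's params: timeout = lambda wildcards: ... reads straight from it. A rough illustration of the resulting value (the domino entry is an assumption, since its config block is cut off in this diff):

    algorithm_timeouts = {
        "allpairs": 86400,  # parse('1d', granularity='seconds')
        "domino": None,     # no 'timeout:' key, so no limit is enforced
    }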