forked from oist/optinist
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathrun_cluster.py
More file actions
138 lines (114 loc) · 5.28 KB
/
run_cluster.py
File metadata and controls
138 lines (114 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import argparse
import os
import shutil
import tempfile
from pathlib import Path
from snakemake import snakemake
from studio.app.common.core.experiment.experiment import ExptOutputPathIds
from studio.app.common.core.utils.config_handler import ConfigReader
from studio.app.common.core.utils.filepath_creater import join_filepath
from studio.app.dir_path import DIRPATH
def main(args):
# Temp dir for snakemake config in case of permission errors on cluster.
# Output files saved to config file path
with tempfile.TemporaryDirectory() as temp_dir:
temp_workdir = Path(temp_dir)
print(f"Temporary work directory created at: {temp_workdir}")
config_file_path = None
temp_config_file_path = None
if args.config is None:
raise FileNotFoundError(
"Please provide snakemake file path --config='my/path/snakemake.yaml'"
)
else:
if not (args.config.endswith(".yml") or args.config.endswith(".yaml")):
config_file_path = join_filepath(
[args.config, DIRPATH.SNAKEMAKE_CONFIG_YML]
)
else:
config_file_path = args.config
if not Path(config_file_path).exists():
raise FileNotFoundError(f"Config file not found: {args.config}")
temp_config_file_path = join_filepath(
[str(temp_workdir), Path(config_file_path).name]
)
shutil.copyfile(config_file_path, temp_config_file_path)
print(
f"Config file copied from {config_file_path} to {temp_config_file_path}"
)
# Main snakemake execution
snakemake_args = {
"snakefile": DIRPATH.SNAKEMAKE_FILEPATH,
"forceall": args.forceall,
"cores": args.cores,
"use_conda": args.use_conda,
"workdir": f"{os.path.dirname(DIRPATH.STUDIO_DIR)}",
}
if config_file_path is not None:
snakemake_args["configfiles"] = [str(temp_config_file_path)]
if args.use_conda:
snakemake_args["conda_prefix"] = DIRPATH.SNAKEMAKE_CONDA_ENV_DIR
print(f"Snakemake arguments: {snakemake_args}")
try:
result = snakemake(**snakemake_args)
if result:
print("snakemake execution succeeded.")
# Copy config file back to output directory for future reference
try:
# Find the output directory from the last_output in config
config_data = ConfigReader.read(config_file_path)
if config_data and "last_output" in config_data:
# Extract the last output path
last_output_path = config_data["last_output"][0]
# Construct absolute path and extract directory
absolute_output_path = join_filepath(
[DIRPATH.OUTPUT_DIR, last_output_path]
)
# Extract the workspace_id and unique_id
path_ids = ExptOutputPathIds(
os.path.dirname(absolute_output_path)
)
# Create the output directory path to copy the config file
output_config_dir = join_filepath(
[
DIRPATH.OUTPUT_DIR,
path_ids.workspace_id,
path_ids.unique_id,
]
)
os.makedirs(output_config_dir, exist_ok=True)
output_config_path = join_filepath(
[output_config_dir, DIRPATH.SNAKEMAKE_CONFIG_YML]
)
shutil.copyfile(config_file_path, output_config_path)
print(
f"Config copied to output directory: {output_config_path}"
)
else:
print(
"Warning: No output path found in config, "
"skipping config copy"
)
except Exception as e:
print(f"Warning: Failed to copy config to output directory: {e}")
else:
print("snakemake execution failed.")
print("Check for errors above in the snakemake output.")
except Exception as e:
print(f"snakemake execution failed with exception: {e}")
import traceback
print("Full traceback:")
traceback.print_exc()
result = False
return result
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="optinist")
parser.add_argument("--cores", type=int, default=2)
parser.add_argument( # Default true, use --no-forceall to disable forceall
"--forceall", default=True, action=argparse.BooleanOptionalAction
)
parser.add_argument( # Default true, use --no-use_conda to disable conda usage
"--use_conda", default=True, action=argparse.BooleanOptionalAction
)
parser.add_argument("--config", type=str, default=None)
main(parser.parse_args())