forked from arayabrain/barebone-studio
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_cluster.py
More file actions
101 lines (85 loc) · 3.34 KB
/
run_cluster.py
File metadata and controls
101 lines (85 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import argparse
import shutil
import tempfile
from pathlib import Path
from snakemake.api import (
DAGSettings,
DeploymentMethod,
DeploymentSettings,
OutputSettings,
ResourceSettings,
SnakemakeApi,
StorageSettings,
)
from studio.app.common.core.utils.filepath_creater import join_filepath
from studio.app.dir_path import DIRPATH
def _resolve_config_path(config):
    """Return the Snakemake config file path designated by ``config``.

    A value ending in ``.yml`` or ``.yaml`` is used unchanged as the config
    file path. Any other value is joined with
    ``DIRPATH.SNAKEMAKE_CONFIG_YML`` (presumably ``config`` is then a
    directory containing the config file -- confirm against callers).
    """
    if config.endswith((".yml", ".yaml")):
        return config
    return join_filepath([config, DIRPATH.SNAKEMAKE_CONFIG_YML])


def main(args):
    """Execute the Snakemake workflow using a temporary working directory.

    The config file is copied into a fresh temporary directory as
    ``snakemake.yaml`` and that directory is handed to Snakemake as its
    working directory. The temporary directory is removed when the function
    returns or raises.

    Args:
        args: Parsed command-line namespace. The attributes read are
            ``config`` (path to a ``.yml``/``.yaml`` file, or a path that is
            joined with ``DIRPATH.SNAKEMAKE_CONFIG_YML``), ``cores``,
            ``forceall`` and ``use_conda``. A missing ``use_conda`` attribute
            is treated as ``True``.

    Returns:
        ``True`` when ``execute_workflow()`` finishes without raising,
        ``False`` when it raises (the exception is printed, not re-raised).

    Raises:
        FileNotFoundError: If ``args.config`` is ``None`` or the resolved
            config file does not exist.
    """
    # Validate the config up front so that no temporary directory is created
    # for an invocation that cannot run anyway.
    if args.config is None:
        raise FileNotFoundError(
            "Please provide snakemake file path --config='my/path/snakemake.yaml'"
        )

    config_file_path = _resolve_config_path(args.config)
    if not Path(config_file_path).exists():
        # Report the path that was actually checked; it differs from
        # args.config when the default config file name was appended.
        raise FileNotFoundError(f"Config file not found: {config_file_path}")

    # Honour --use_conda / --no-use_conda. Namespaces built by other callers
    # that lack the attribute keep the previous always-conda behaviour.
    deployment_methods = []
    if getattr(args, "use_conda", True):
        deployment_methods.append(DeploymentMethod.CONDA)

    # Temp dir for snakemake config in case of permission errors on cluster.
    # Output files saved to config file path
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_workdir = Path(temp_dir)
        print(f"Temporary work directory created at: {temp_workdir}")

        shutil.copyfile(
            config_file_path,
            join_filepath([str(temp_workdir), "snakemake.yaml"]),
        )
        print(f"Config file copied from {config_file_path} to {temp_workdir}")

        # Use new SnakemakeApi pattern
        with SnakemakeApi(
            OutputSettings(
                verbose=True,
                show_failed_logs=True,
            ),
        ) as snakemake_api:
            workflow_api = snakemake_api.workflow(
                snakefile=Path(DIRPATH.SNAKEMAKE_FILEPATH),
                workdir=temp_workdir,
                storage_settings=StorageSettings(),
                resource_settings=ResourceSettings(cores=args.cores),
                deployment_settings=DeploymentSettings(
                    deployment_method=deployment_methods,
                    conda_frontend="conda",
                    conda_prefix=DIRPATH.SNAKEMAKE_CONDA_ENV_DIR,
                ),
            )
            dag_api = workflow_api.dag(
                dag_settings=DAGSettings(forceall=args.forceall),
            )

            # Only failures of the workflow run itself are converted into a
            # False result; errors while building the workflow or DAG above
            # propagate to the caller.
            try:
                dag_api.execute_workflow()
            except Exception as e:
                print(f"Workflow execution failed: {e}")
                return False
            print("Workflow execution completed successfully")
            return True
# Command-line entry point: parse the options, run the workflow, and report
# the outcome through the process exit status.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="optinist")
    parser.add_argument("--cores", type=int, default=2)
    parser.add_argument(  # Default true, use --no-forceall to disable forceall
        "--forceall", default=True, action=argparse.BooleanOptionalAction
    )
    parser.add_argument(  # Default true, use --no-use_conda to disable conda usage
        "--use_conda", default=True, action=argparse.BooleanOptionalAction
    )
    parser.add_argument("--config", type=str, default=None)

    # main() returns True on success and False on a failed workflow run.
    # Turn that into an exit status so schedulers and shell scripts can
    # detect failure instead of always seeing exit code 0.
    succeeded = main(parser.parse_args())
    raise SystemExit(0 if succeeded else 1)