-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
78 lines (62 loc) · 2.34 KB
/
run.py
File metadata and controls
78 lines (62 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
#
# █████╗ ██████╗ █████╗ ███████╗
# ██╔══██╗██╔══██╗██╔══██╗██╔════╝
# ███████║██████╔╝███████║███████╗
# ██╔══██║██╔══██╗██╔══██║╚════██║
# ██║ ██║██║ ██║██║ ██║███████║
# ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝
# Copyright (C) 2026 Riza Emre ARAS <r.emrearas@proton.me>
#
# Licensed under the MIT License.
# See LICENSE and THIRD_PARTY_LICENSES for details.
"""Run a workflow definition to build a corpus.
Usage:
python run.py --workflow workflows/eu-ai-act.yaml
"""
from __future__ import annotations
import argparse
import sys
from datetime import datetime, timezone
from pathlib import Path
# Engine lives in workflow-engine/
# Bootstrap: make the engine package importable. The path insert MUST happen
# before the `src.*` imports below, so these statements are order-sensitive
# and deliberately sit between the stdlib imports and the project imports.
_ENGINE_DIR = Path(__file__).resolve().parent / "workflow-engine"
# Prepend (index 0) so the bundled engine wins over any same-named package
# elsewhere on sys.path.
sys.path.insert(0, str(_ENGINE_DIR))
from src.config import load_config
from src.logger import get_logger
from src.pipeline import run_pipeline
# Module-level logger shared by main(); named after this script.
log = get_logger("run")
def main() -> int:
    """Parse CLI arguments, load the workflow config, and run the pipeline.

    Returns:
        Process exit code: 0 on success, 1 on any failure (missing file,
        invalid config, or pipeline error).
    """
    parser = argparse.ArgumentParser(
        prog="run",
        description="Run a workflow definition to build a corpus",
    )
    parser.add_argument(
        "--workflow",
        type=Path,
        required=True,
        help="Path to workflow YAML file (e.g. workflows/eu-ai-act.yaml)",
    )
    args = parser.parse_args()

    workflow = args.workflow.resolve()
    # is_file() rather than exists(): a directory at this path is not a
    # usable workflow definition either.
    if not workflow.is_file():
        log.error("Workflow file not found: %s", workflow)
        return 1

    cfg_result = load_config(workflow)
    if not cfg_result.ok:
        # Pass the error as a lazy %-argument, never as the format string:
        # a literal '%' in the message would otherwise break the log call.
        log.error("%s", cfg_result.error)
        return 1

    # UTC timestamp in the directory name keeps successive runs of the same
    # workflow from clobbering each other's output.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    output_dir = (Path("dist") / f"{workflow.stem}-{stamp}" / "corpus").resolve()

    log.info("Workflow: %s", workflow.name)
    log.info("Output: %s", output_dir)

    result = run_pipeline(cfg_result.data, output_dir=output_dir)
    if not result.ok:
        log.error("Pipeline failed: %s", result.error)
        return 1
    return 0
if __name__ == "__main__":
sys.exit(main())