From 5a1f5dcd2b0e075d52216109b66698ab3943bb8a Mon Sep 17 00:00:00 2001 From: zuolingxuan Date: Wed, 1 Apr 2026 21:53:28 +0800 Subject: [PATCH] add python workspace run tracking --- AGENTS.md | 2 + README-zh.md | 238 +++-- README.md | 240 +++-- python_api/BUILD.bazel | 38 + python_api/README.md | 97 ++ python_api/sync_native_extension.py | 27 + python_api/tests/test_artifact_index.py | 113 +++ python_api/tests/test_python_cli_contract.py | 85 +- python_api/tests/test_streaming_v05.py | 4 +- python_api/tests/test_workspace_runs.py | 135 +++ python_api/velaria/workspace/__init__.py | 31 + .../velaria/workspace/artifact_index.py | 399 ++++++++ python_api/velaria/workspace/paths.py | 31 + python_api/velaria/workspace/run_store.py | 135 +++ python_api/velaria/workspace/types.py | 46 + python_api/velaria_cli.py | 902 ++++++++++++++++-- scripts/build_py_cli_executable.sh | 12 +- scripts/run_python_ci_checks.sh | 1 + scripts/run_python_ecosystem_regression.sh | 1 + skills/velaria_python_local/SKILL.md | 208 +++- src/dataflow/interop/python/python_module.cc | 26 + 21 files changed, 2433 insertions(+), 338 deletions(-) create mode 100644 python_api/sync_native_extension.py create mode 100644 python_api/tests/test_artifact_index.py create mode 100644 python_api/tests/test_workspace_runs.py create mode 100644 python_api/velaria/workspace/__init__.py create mode 100644 python_api/velaria/workspace/artifact_index.py create mode 100644 python_api/velaria/workspace/paths.py create mode 100644 python_api/velaria/workspace/run_store.py create mode 100644 python_api/velaria/workspace/types.py diff --git a/AGENTS.md b/AGENTS.md index d6af52a..e1f4d3c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -20,6 +20,8 @@ - 单节点示例命令保持可用,不要为了多进程实验破坏 `sql_demo / df_demo / stream_demo`。 - 所有 Python 相关命令统一显式使用 `uv` 执行,包括测试、脚本、依赖安装;不要直接调用 `python` / `pip`。 - `README.md` 保持英文,`README-zh.md` 保持中文;后续修改 README 内容时必须同步更新这两份文档。 +- `skills/*.md` 面向最终用户使用说明,不写仓库内部编译、Bazel 构建、源码同步或其他实现侧操作;只保留用户可直接执行的使用方式、参数说明与输入输出约束。 +- 仓库文档若展示 Python CLI 命令,必须使用仓库内真实可见入口:源码脚本 `uv run --project python_api python python_api/velaria_cli.py ...`,或已打包产物 `./dist/velaria-cli ...`;不要默认写成全局可执行的 `velaria-cli ...`,除非文档已明确提供安装该命令的步骤。 ## 命名与术语约束 diff --git a/README-zh.md b/README-zh.md index d675571..0caefc4 100644 --- a/README-zh.md +++ b/README-zh.md @@ -1,56 +1,25 @@ # Velaria:纯 C++17 本地数据流内核 -`README-zh.md` 是中文镜像文档,对应英文主文档位于 [README.md](./README.md)。后续修改必须保持这两份文件结构和语义同步。 +`README-zh.md` 是中文镜像文档,对应英文主文档位于 [README.md](./README.md)。两份文档必须保持同步。 -Velaria 是一个本地优先的 C++17 数据流引擎研究项目。仓库现在围绕“一个内核 + 两个非内核层”组织: +Velaria 是一个本地优先的 C++17 数据流引擎研究项目。当前目标保持收敛: -- `Core Kernel` - - 本地执行语义 - - batch + stream 共用一个模型 - - 稳定的 explain / progress / checkpoint contract -- `Python Ecosystem` - - 正式支持 Arrow / wheel / CLI / `uv` / Excel / Bitable / custom stream adapter - - 向外投影内核能力,但不进入热路径 -- `Experimental Runtime` - - 同机 `actor/rpc/jobmaster` - - 用于执行与观测研究,不是第二套内核 +- 保持一个 native kernel 作为执行真相来源 +- 稳住单机链路 +- 通过正式支持的 Python 生态层向外暴露能力 +- 把同机 actor/rpc 路径放在实验通道,而不是做成第二套内核 -## 黄金路径 - -唯一黄金路径是: - -```text -Arrow / CSV / Python ingress - -> DataflowSession / DataFrame / StreamingDataFrame - -> local runtime kernel - -> sink - -> explain / progress / checkpoint -``` - -公开 session 入口: - -- `DataflowSession` - -核心对外对象: - -- `DataFrame` -- `StreamingDataFrame` -- `StreamingQuery` - -同机 actor/rpc 路径仍保留在仓库里,但不再作为主叙事。 - -## 仓库分层 +## 分层模型 ### Core Kernel -Core 负责: +负责: +- 本地 batch 与 streaming 执行 - logical planning 与最小 SQL 映射 -- table/value 执行模型 -- 本地 batch 与 streaming runtime - source/sink ABI -- runtime contract surface -- 本地 vector search 能力 +- explain / progress / checkpoint contract +- 本地 vector search 仓库入口: @@ -58,80 +27,89 @@ Core 负责: - [docs/core-boundary.md](./docs/core-boundary.md) - [docs/runtime-contract.md](./docs/runtime-contract.md) - [docs/streaming_runtime_design.md](./docs/streaming_runtime_design.md) -- Bazel source group: +- source group: - `//:velaria_core_logical_sources` - `//:velaria_core_execution_sources` - `//:velaria_core_contract_sources` -- 回归套件: +- regression: - `//:core_regression` ### Python Ecosystem -Python 是正式支持的生态层,不是顺手附带的 wrapper。 - -它包括: +负责: - `python_api` 里的 native binding - Arrow 输入与输出 - `uv` 工作流 - wheel / native wheel / CLI 打包 -- Excel 与 Bitable 适配 -- custom source / custom sink adapter -- `python_api/velaria_cli.py` 里的正式 CLI 工具入口 -- `python_api/examples` 里的 Python 生态 demo -- `python_api/benchmarks` 里的 Python benchmark +- Excel / Bitable / custom stream adapter +- 本地 run 与 artifact 的 workspace 跟踪 -它不定义: +不负责: -- 执行热路径行为 -- 独立的 progress/checkpoint 语义 -- 独立的 vector-search 语义 +- 执行热路径语义 +- 独立的 explain / progress / checkpoint 语义 +- 替代内核 checkpoint 存储 +- 把 SQLite 当成大结果输出引擎 仓库入口: - 文档: - [python_api/README.md](./python_api/README.md) -- Bazel source group: +- source group: - `//:velaria_python_ecosystem_sources` -- Python 层 source group: - `//python_api:velaria_python_supported_sources` - `//python_api:velaria_python_example_sources` - `//python_api:velaria_python_experimental_sources` -- 回归套件: +- regression: - `//:python_ecosystem_regression` -- Python 层回归套件: - `//python_api:velaria_python_supported_regression` -- shell 入口: - `./scripts/run_python_ecosystem_regression.sh` ### Experimental Runtime -Experimental runtime 包括: +负责: -- actor runtime -- rpc codec / transport 实验 -- scheduler / worker / client 链路 +- 同机 `actor/rpc/jobmaster` 实验 +- transport / codec / scheduler 观测 - 同机 smoke 与 benchmark 工具 +不代表: + +- 已完成 distributed scheduling +- 已完成 distributed fault recovery +- 已完成 cluster resource governance +- 已支持 production 级 distributed execution + 仓库入口: -- Bazel source group: +- source group: - `//:velaria_experimental_sources` -- 回归套件: +- regression: - `//:experimental_regression` -- shell 入口: - `./scripts/run_experimental_regression.sh` -### Examples +## 黄金路径 + +```text +Arrow / CSV / Python ingress + -> DataflowSession / DataFrame / StreamingDataFrame + -> local runtime kernel + -> sink + -> explain / progress / checkpoint +``` + +公开 session 入口: -examples 与 helper scripts 只用于说明各层,不定义各层。 +- `DataflowSession` -- Bazel source group: - - `//:velaria_examples_sources` +核心对外对象: -## Runtime Contract +- `DataFrame` +- `StreamingDataFrame` +- `StreamingQuery` -稳定的 runtime contract 文档位于 [docs/runtime-contract.md](./docs/runtime-contract.md)。 +## 稳定 Runtime Contract 主要 stream 入口: @@ -142,7 +120,7 @@ examples 与 helper scripts 只用于说明各层,不定义各层。 - `session.startStreamSql(sql, options)` - `StreamingDataFrame.writeStream(sink, options)` -稳定 stream contract surface: +稳定 contract surface: - `StreamingQueryProgress` - `snapshotJson()` @@ -157,39 +135,40 @@ examples 与 helper scripts 只用于说明各层,不定义各层。 - `physical` - `strategy` -其中 `strategy` 是 mode 选择、fallback reason、transport、backpressure threshold 与 checkpoint delivery mode 的唯一解释出口。 +`strategy` 统一解释 mode 选择、fallback reason、transport、backpressure 与 checkpoint delivery mode。 -## 当前能力边界 +workspace 落盘会保留内核 contract,不会重定义它们: + +- `explain.json` 保存 `logical / physical / strategy` +- `progress.jsonl` 逐行追加原生 `snapshotJson()` 输出 +- 大结果保留为文件;SQLite 只保存索引元数据和小 preview + +## 当前范围 当前已具备: -- 本地 batch + streaming 共用一个内核 +- 一个 native kernel 同时支持 batch + streaming - `read_csv`, `readStream(...)`, `readStreamCsvDir(...)` - query-local 反压、有界 backlog、progress snapshot、checkpoint path - 执行模式:`single-process`、`local-workers` - 文件 source/sink - 基础 streaming operators:`select / filter / withColumn / drop / limit / window` - stateful streaming 聚合:`sum / count / min / max / avg` -- stream SQL grouped aggregate output:`SUM(col)`、`COUNT(*)`、`MIN(col)`、`MAX(col)`、`AVG(col)` - 最小 stream SQL 子集 - 固定维度 float vector 的本地检索 - Python Arrow 输入/输出 +- 本地 tracked run、run 目录落盘与 artifact 索引 - 同机 actor/rpc/jobmaster smoke 链路 -当前明确不做: +当前不做: - 宣称已完成 distributed runtime -- 把 Python callback 拉进热路径 -- Python UDF -- 把 actor 并行化扩成任意 plan 的通用机制 -- 超出当前 `window_start,key + SUM(value)` / `COUNT(*)` 热路径之外的 actor acceleration +- 把 Python callback 或 Python UDF 拉进热路径 - 宽泛 SQL 扩展,例如完整 `JOIN / CTE / subquery / UNION` - ANN / 独立 vector DB / 分布式 vector 执行 ## Python Ecosystem -Python 继续是正式支持的 ingress 与打包层,但不成为执行内核。 - 当前支持的 Python surface: - `Session.read_csv(...)` @@ -206,39 +185,71 @@ Python 继续是正式支持的 ingress 与打包层,但不成为执行内核 - `read_excel(...)` - custom source / custom sink adapter -本仓库中的 Python 命令统一使用 `uv`: +### Workspace 模型 + +- `Workspace` + - 根目录位于 `VELARIA_HOME` 或 `~/.velaria` +- `RunStore` + - 每次执行对应一个 run 目录 + - 持久化 `run.json`、`inputs.json`、`explain.json`、`progress.jsonl`、日志和 `artifacts/` +- `ArtifactIndex` + - 默认使用 SQLite 做元数据索引 + - SQLite 不可用时退化到 JSONL + - 只缓存小结果 preview + +这一层主要服务于 agent / skill 调用、本地可追踪性和机器可读 CLI 集成;它不是第二套执行引擎。 + +### CLI 真实入口 + +仓库内真实可见的 CLI 入口是: + +- 源码目录: + - `uv run --project python_api python python_api/velaria_cli.py ...` +- 打包产物: + - `./dist/velaria-cli ...` + +不要默认存在全局 `velaria-cli` 命令,除非你另外安装并暴露了这个入口。 + +### Python 工作流 + +初始化: ```bash bazel build //:velaria_pyext +bazel run //python_api:sync_native_extension uv sync --project python_api --python python3.12 +``` + +运行示例: + +```bash uv run --project python_api python python_api/examples/demo_batch_sql_arrow.py uv run --project python_api python python_api/examples/demo_stream_sql.py uv run --project python_api python python_api/examples/demo_vector_search.py ``` -Python ecosystem 构建 / 测试前提: - -- `uv` -- 一个带 `Python.h` 的本地 CPython -- 当 Bazel 不能自动发现可用解释器时,设置 `VELARIA_PYTHON_BIN` - -推荐回归入口: +tracked run 示例: ```bash -./scripts/run_python_ecosystem_regression.sh +uv run --project python_api python python_api/velaria_cli.py run start -- csv-sql \ + --csv /path/to/input.csv \ + --query "SELECT * FROM input_table LIMIT 5" + +uv run --project python_api python python_api/velaria_cli.py run show --run-id +uv run --project python_api python python_api/velaria_cli.py artifacts list --run-id +uv run --project python_api python python_api/velaria_cli.py artifacts preview --artifact-id ``` ## Local Vector Search -vector search 是本地内核能力,不是新子系统。 +vector search 是本地内核能力,不是独立子系统。 -`v0.1` 范围: +当前范围: - fixed-dimension `float32` - 指标:`cosine`、`dot`、`l2` - `top-k` - exact scan only -- `DataFrame` / `DataflowSession` - Python `Session.vector_search(...)` - Arrow `FixedSizeList` - explain 输出 @@ -255,8 +266,7 @@ vector search 是本地内核能力,不是新子系统。 CLI 示例: ```bash -bazel build //:velaria_cli -./bazel-bin/velaria_cli \ +uv run --project python_api python python_api/velaria_cli.py csv-sql \ --csv /path/to/input.csv \ --query "SELECT * FROM input_table LIMIT 5" @@ -268,7 +278,7 @@ bazel build //:velaria_cli --top-k 5 ``` -vector explain 是稳定 contract 的一部分,当前要求至少包含: +vector explain 属于稳定 contract,当前字段包括: - `mode=exact-scan` - `metric=` @@ -278,35 +288,20 @@ vector explain 是稳定 contract 的一部分,当前要求至少包含: - `filter_pushdown=false` - `acceleration=flat-buffer+heap-topk` -benchmark 基线入口: +benchmark 基线: ```bash ./scripts/run_vector_search_benchmark.sh ``` -该脚本默认跑轻量 `--quick` 基线;如需完整 sweep,直接执行 `bazel run //:vector_search_benchmark`。 - ## Experimental Runtime -同机路径继续刻意保持收敛: +同机执行链路: ```text client -> scheduler(jobmaster) -> worker -> in-proc operator chain -> result ``` -它存在的目的: - -- 同机执行实验 -- transport 与 codec 观测 -- benchmark 与 observability 开发 - -它不代表: - -- 已完成 distributed scheduling -- 已完成 distributed fault recovery -- 已完成 cluster resource governance -- 已支持 production 级 distributed vector execution - 构建: ```bash @@ -343,6 +338,7 @@ bazel run //:stream_demo ./scripts/run_core_regression.sh ./scripts/run_python_ecosystem_regression.sh ./scripts/run_experimental_regression.sh +./scripts/run_stream_observability_regression.sh ``` 直接使用 Bazel suite: @@ -353,12 +349,6 @@ bazel test //:python_ecosystem_regression bazel test //:experimental_regression ``` -同机 observability regression: - -```bash -./scripts/run_stream_observability_regression.sh -``` - ## 仓库规则 - 语言基线:`C++17` diff --git a/README.md b/README.md index 70e2910..3f9e3ac 100644 --- a/README.md +++ b/README.md @@ -2,55 +2,24 @@ `README.md` is the English source of truth. The Chinese mirror lives in [README-zh.md](./README-zh.md). Keep both files aligned. -Velaria is a local-first C++17 dataflow engine research project. The repository is now organized around one kernel plus two explicit non-kernel layers: - -- `Core Kernel` - - local execution semantics - - batch + stream in one model - - stable explain / progress / checkpoint contract -- `Python Ecosystem` - - supported Arrow / wheel / CLI / `uv` / Excel / Bitable / custom stream adapters - - projects the kernel outward without becoming the hot path -- `Experimental Runtime` - - same-host `actor/rpc/jobmaster` - - execution and observability research lane, not a second kernel +Velaria is a local-first C++17 dataflow engine research project. The current goal is narrow and explicit: -## Golden Path - -The only golden path is: - -```text -Arrow / CSV / Python ingress - -> DataflowSession / DataFrame / StreamingDataFrame - -> local runtime kernel - -> sink - -> explain / progress / checkpoint -``` - -Public session entry: - -- `DataflowSession` - -Core user-facing objects: - -- `DataFrame` -- `StreamingDataFrame` -- `StreamingQuery` +- keep one native kernel as the execution source of truth +- keep the single-node path stable +- expose that kernel through a supported Python ecosystem layer +- use the same-host actor/rpc path as an experiment lane, not as a second kernel -The same-host actor/rpc path stays in the repo, but it is not the main product story. - -## Repository Layers +## Layer Model ### Core Kernel -Core owns: +Owns: +- local batch and streaming execution - logical planning and minimal SQL mapping -- table/value execution model -- local batch and streaming runtime - source/sink ABI -- runtime contract surfaces -- local vector search capability +- explain / progress / checkpoint contract +- local vector search Repository entrypoints: @@ -58,80 +27,89 @@ Repository entrypoints: - [docs/core-boundary.md](./docs/core-boundary.md) - [docs/runtime-contract.md](./docs/runtime-contract.md) - [docs/streaming_runtime_design.md](./docs/streaming_runtime_design.md) -- Bazel source groups: +- source groups: - `//:velaria_core_logical_sources` - `//:velaria_core_execution_sources` - `//:velaria_core_contract_sources` -- regression suite: +- regression: - `//:core_regression` ### Python Ecosystem -Python is a supported ecosystem layer, not a convenience-only wrapper. - -It includes: +Owns: - native binding in `python_api` -- Arrow ingestion and output +- Arrow ingress and output - `uv` workflow - wheel / native wheel / CLI packaging -- Excel and Bitable adapters -- custom source / custom sink adapters -- supported CLI tooling in `python_api/velaria_cli.py` -- Python ecosystem demos in `python_api/examples` -- Python benchmarks in `python_api/benchmarks` +- Excel / Bitable / custom stream adapters +- local workspace tracking for runs and artifacts -It does not define: +Does not own: -- execution hot-path behavior -- independent progress/checkpoint semantics -- independent vector-search semantics +- execution hot-path semantics +- independent explain / progress / checkpoint semantics +- replacement checkpoint storage +- SQLite as a large-result engine Repository entrypoints: - docs: - [python_api/README.md](./python_api/README.md) -- Bazel source group: +- source groups: - `//:velaria_python_ecosystem_sources` -- Python-layer source groups: - `//python_api:velaria_python_supported_sources` - `//python_api:velaria_python_example_sources` - `//python_api:velaria_python_experimental_sources` -- regression suite: +- regression: - `//:python_ecosystem_regression` -- Python-layer regression suite: - `//python_api:velaria_python_supported_regression` -- shell entrypoint: - `./scripts/run_python_ecosystem_regression.sh` ### Experimental Runtime -Experimental runtime includes: +Owns: + +- same-host `actor/rpc/jobmaster` experiments +- transport / codec / scheduler observation +- same-host smoke and benchmark tooling + +Does not imply: -- actor runtime -- rpc codec / transport experiments -- scheduler / worker / client flow -- same-host smoke and benchmark tools +- distributed scheduling +- distributed fault recovery +- cluster resource governance +- production distributed execution Repository entrypoints: -- Bazel source group: +- source group: - `//:velaria_experimental_sources` -- regression suite: +- regression: - `//:experimental_regression` -- shell entrypoint: - `./scripts/run_experimental_regression.sh` -### Examples +## Golden Path -Examples and helper scripts illustrate layers; they do not define them. +```text +Arrow / CSV / Python ingress + -> DataflowSession / DataFrame / StreamingDataFrame + -> local runtime kernel + -> sink + -> explain / progress / checkpoint +``` -- Bazel source group: - - `//:velaria_examples_sources` +Public session entry: -## Runtime Contract +- `DataflowSession` -The stable runtime-facing contract is documented in [docs/runtime-contract.md](./docs/runtime-contract.md). +Core user-facing objects: + +- `DataFrame` +- `StreamingDataFrame` +- `StreamingQuery` + +## Stable Runtime Contract Main stream entry points: @@ -142,7 +120,7 @@ Main stream entry points: - `session.startStreamSql(sql, options)` - `StreamingDataFrame.writeStream(sink, options)` -Stable stream contract surfaces: +Stable contract surfaces: - `StreamingQueryProgress` - `snapshotJson()` @@ -157,39 +135,40 @@ Stable stream contract surfaces: - `physical` - `strategy` -`strategy` is the single explanation outlet for mode selection, fallback reason, transport, backpressure thresholds, and checkpoint delivery mode. +`strategy` is the single outlet for mode selection, fallback reason, transport, backpressure, and checkpoint delivery mode. + +Workspace persistence keeps the kernel contract unchanged: -## Current Capability Boundary +- `explain.json` stores `logical / physical / strategy` +- `progress.jsonl` appends native `snapshotJson()` output line by line +- large results stay in files; SQLite stores only index rows and small previews + +## Current Scope Available today: -- local batch + streaming execution through one kernel +- one native kernel for batch + streaming - `read_csv`, `readStream(...)`, `readStreamCsvDir(...)` - query-local backpressure, bounded backlog, progress snapshots, checkpoint path - execution modes: `single-process`, `local-workers` - file source/sink support - basic stream operators: `select / filter / withColumn / drop / limit / window` - stateful stream aggregates: `sum / count / min / max / avg` -- stream SQL grouped aggregate outputs with `SUM(col)`, `COUNT(*)`, `MIN(col)`, `MAX(col)`, `AVG(col)` - minimal stream SQL subset - local vector search on fixed-dimension float vectors -- Python Arrow ingestion and output +- Python Arrow ingress/output +- tracked local runs with run directory persistence and artifact indexing - same-host actor/rpc/jobmaster smoke path -Out of scope in the current repo state: +Out of scope: - completed distributed runtime claims -- Python callback execution in the hot path -- Python UDFs -- generic actor parallelization for arbitrary plans -- actor acceleration beyond the current `window_start,key + SUM(value)` / `COUNT(*)` hot paths +- Python callbacks or Python UDFs in the hot path - broad SQL expansion such as full `JOIN / CTE / subquery / UNION` - ANN / standalone vector DB / distributed vector execution ## Python Ecosystem -Python remains a supported ingress and packaging layer. It does not become the execution core. - Main supported Python surfaces: - `Session.read_csv(...)` @@ -206,39 +185,71 @@ Main supported Python surfaces: - `read_excel(...)` - custom source / custom sink adapters -Python ecosystem commands in this repo use `uv`: +### Workspace Model + +- `Workspace` + - root under `VELARIA_HOME` or `~/.velaria` +- `RunStore` + - one run directory per execution + - persists `run.json`, `inputs.json`, `explain.json`, `progress.jsonl`, logs, and `artifacts/` +- `ArtifactIndex` + - SQLite-first metadata index + - JSONL fallback when SQLite is unavailable + - preview cache for small result slices only + +This layer is for agent/skill invocation, local traceability, and machine-readable CLI integration. It is not a second execution engine. + +### CLI Entry Points + +Repo-visible CLI entrypoints are: + +- source checkout: + - `uv run --project python_api python python_api/velaria_cli.py ...` +- packaged binary: + - `./dist/velaria-cli ...` + +Do not assume a global `velaria-cli` command exists unless you have installed one separately. + +### Python Workflow + +Bootstrap: ```bash bazel build //:velaria_pyext +bazel run //python_api:sync_native_extension uv sync --project python_api --python python3.12 +``` + +Run examples: + +```bash uv run --project python_api python python_api/examples/demo_batch_sql_arrow.py uv run --project python_api python python_api/examples/demo_stream_sql.py uv run --project python_api python python_api/examples/demo_vector_search.py ``` -Python ecosystem build/test prerequisites: - -- `uv` -- a local CPython interpreter with `Python.h` -- `VELARIA_PYTHON_BIN` when Bazel cannot auto-discover a usable interpreter - -Recommended regression entrypoint: +Tracked run examples: ```bash -./scripts/run_python_ecosystem_regression.sh +uv run --project python_api python python_api/velaria_cli.py run start -- csv-sql \ + --csv /path/to/input.csv \ + --query "SELECT * FROM input_table LIMIT 5" + +uv run --project python_api python python_api/velaria_cli.py run show --run-id +uv run --project python_api python python_api/velaria_cli.py artifacts list --run-id +uv run --project python_api python python_api/velaria_cli.py artifacts preview --artifact-id ``` ## Local Vector Search -Vector search is a local kernel capability, not a new subsystem. +Vector search is a local kernel capability, not a separate subsystem. -Scope in `v0.1`: +Current scope: - fixed-dimension `float32` - metrics: `cosine`, `dot`, `l2` - `top-k` - exact scan only -- `DataFrame` / `DataflowSession` - Python `Session.vector_search(...)` - Arrow `FixedSizeList` - explain output @@ -255,8 +266,7 @@ Design doc: CLI examples: ```bash -bazel build //:velaria_cli -./bazel-bin/velaria_cli \ +uv run --project python_api python python_api/velaria_cli.py csv-sql \ --csv /path/to/input.csv \ --query "SELECT * FROM input_table LIMIT 5" @@ -268,7 +278,7 @@ bazel build //:velaria_cli --top-k 5 ``` -Vector explain is part of the stable contract. Current required fields include: +Vector explain is part of the stable contract. Current fields include: - `mode=exact-scan` - `metric=` @@ -284,29 +294,14 @@ Benchmark baseline: ./scripts/run_vector_search_benchmark.sh ``` -The script runs a quick exact-scan baseline. Use `bazel run //:vector_search_benchmark` for the full sweep. - ## Experimental Runtime -The same-host path stays intentionally narrow: +Same-host flow: ```text client -> scheduler(jobmaster) -> worker -> in-proc operator chain -> result ``` -It exists for: - -- same-host execution experiments -- transport and codec observation -- benchmark and observability development - -It does not imply: - -- distributed scheduling -- distributed fault recovery -- cluster resource governance -- production distributed vector execution - Build: ```bash @@ -343,6 +338,7 @@ Layered regression entrypoints: ./scripts/run_core_regression.sh ./scripts/run_python_ecosystem_regression.sh ./scripts/run_experimental_regression.sh +./scripts/run_stream_observability_regression.sh ``` Direct Bazel suites: @@ -353,12 +349,6 @@ bazel test //:python_ecosystem_regression bazel test //:experimental_regression ``` -Same-host observability regression: - -```bash -./scripts/run_stream_observability_regression.sh -``` - ## Repository Rules - language baseline: `C++17` diff --git a/python_api/BUILD.bazel b/python_api/BUILD.bazel index 2fec7f9..9881f4b 100644 --- a/python_api/BUILD.bazel +++ b/python_api/BUILD.bazel @@ -5,6 +5,15 @@ load("@rules_python//python:py_test.bzl", "py_test") load("@pypi//:requirements.bzl", "requirement") load("//python_api:version.bzl", "VELARIA_PY_VERSION") +py_binary( + name = "sync_native_extension", + srcs = ["sync_native_extension.py"], + main = "sync_native_extension.py", + args = ["$(location //:velaria_pyext)"], + data = ["//:velaria_pyext"], + visibility = ["//visibility:public"], +) + genrule( name = "velaria_native_py_pkg", srcs = ["//:velaria_pyext"], @@ -20,6 +29,11 @@ py_library( "velaria/excel.py", "velaria/custom_stream.py", "velaria/bitable.py", + "velaria/workspace/__init__.py", + "velaria/workspace/artifact_index.py", + "velaria/workspace/paths.py", + "velaria/workspace/run_store.py", + "velaria/workspace/types.py", ], data = [":velaria_native_py_pkg"], imports = ["."], @@ -251,10 +265,33 @@ py_test( ], ) +py_test( + name = "workspace_runs_test", + srcs = ["tests/test_workspace_runs.py"], + main = "tests/test_workspace_runs.py", + imports = ["."], + deps = [ + ":velaria_cli_lib", + ":velaria_py_pkg", + requirement("pyarrow"), + ], +) + +py_test( + name = "artifact_index_test", + srcs = ["tests/test_artifact_index.py"], + main = "tests/test_artifact_index.py", + imports = ["."], + deps = [ + ":velaria_py_pkg", + ], +) + test_suite( name = "velaria_python_supported_regression", tests = [ ":arrow_stream_ingestion_test", + ":artifact_index_test", ":bitable_group_by_owner_integration_test", ":bitable_stream_source_test", ":custom_stream_source_test", @@ -262,5 +299,6 @@ test_suite( ":read_excel_test", ":streaming_v05_test", ":vector_search_test", + ":workspace_runs_test", ], ) diff --git a/python_api/README.md b/python_api/README.md index e406e3b..b0cb6ae 100644 --- a/python_api/README.md +++ b/python_api/README.md @@ -118,9 +118,17 @@ Bootstrap: ```bash bazel build //:velaria_pyext +bazel run //python_api:sync_native_extension uv sync --project python_api --python python3.12 ``` +If you run `python_api/velaria_cli.py` or other source-checkout Python entrypoints directly, +keep `python_api/velaria/_velaria.so` in sync with: + +```bash +bazel run //python_api:sync_native_extension +``` + Run demos: ```bash @@ -149,6 +157,8 @@ Build targets: - native extension: - `//:velaria_pyext` +- sync built native extension into the source checkout: + - `//python_api:sync_native_extension` - pure-Python wheel wrapper: - `//python_api:velaria_whl` - native wheel: @@ -167,6 +177,93 @@ Single-file CLI packaging: The CLI is part of the ecosystem layer. For supported paths, it should delegate to the same native session contract as Python and C++. +Repo-visible CLI entrypoints are: + +- source checkout: + - `uv run --project python_api python python_api/velaria_cli.py ...` +- packaged binary: + - `./dist/velaria-cli ...` + +Do not assume a global `velaria-cli` command exists unless you have separately installed and exposed one in your environment. + +### Workspace + Artifacts + +The CLI also supports a local workspace layout for tracked runs and artifact indexing. + +Default paths: + +- runs: `~/.velaria/runs//` +- index: `~/.velaria/index/artifacts.sqlite` + +You can override the root with: + +```bash +export VELARIA_HOME=/tmp/velaria-home +``` + +Tracked run commands: + +```bash +uv run --project python_api python python_api/velaria_cli.py run start -- csv-sql \ + --csv /path/to/input.csv \ + --query "SELECT * FROM input_table LIMIT 5" + +./dist/velaria-cli run start -- csv-sql \ + --csv /path/to/input.csv \ + --query "SELECT * FROM input_table LIMIT 5" + +uv run --project python_api python python_api/velaria_cli.py run show --run-id +uv run --project python_api python python_api/velaria_cli.py run status --run-id +uv run --project python_api python python_api/velaria_cli.py artifacts list --run-id +uv run --project python_api python python_api/velaria_cli.py artifacts preview --artifact-id +uv run --project python_api python python_api/velaria_cli.py run cleanup --keep-last 10 +``` + +The tracked workspace contract is: + +- stdout returns JSON only +- logs go to `stdout.log` / `stderr.log` +- stream progress appends native `snapshotJson()` output to `progress.jsonl` +- stream explain keeps the native `logical` / `physical` / `strategy` structure +- large results stay in files under `artifacts/`; SQLite stores only index rows and small previews +- deleting run directories requires the explicit `--delete-files` switch + +End-to-end examples: + +CSV SQL to parquet plus preview: + +```bash +uv run --project python_api python python_api/velaria_cli.py run start -- csv-sql \ + --csv /path/to/input.csv \ + --query "SELECT name, score FROM input_table WHERE score > 10" + +uv run --project python_api python python_api/velaria_cli.py artifacts list --run-id +uv run --project python_api python python_api/velaria_cli.py artifacts preview --artifact-id +``` + +Stream SQL once plus status: + +```bash +uv run --project python_api python python_api/velaria_cli.py run start -- stream-sql-once \ + --source-csv-dir /path/to/source_dir \ + --sink-schema "key STRING, value_sum INT" \ + --query "INSERT INTO output_sink SELECT key, SUM(value) AS value_sum FROM input_stream GROUP BY key" + +uv run --project python_api python python_api/velaria_cli.py run status --run-id +``` + +Vector search plus explain artifact: + +```bash +uv run --project python_api python python_api/velaria_cli.py run start -- vector-search \ + --csv /path/to/vectors.csv \ + --vector-column embedding \ + --query-vector "0.1,0.2,0.3" \ + --top-k 5 + +uv run --project python_api python python_api/velaria_cli.py artifacts list --run-id +``` + Python ecosystem source groups: - supported: diff --git a/python_api/sync_native_extension.py b/python_api/sync_native_extension.py new file mode 100644 index 0000000..d3eb5b4 --- /dev/null +++ b/python_api/sync_native_extension.py @@ -0,0 +1,27 @@ +import pathlib +import shutil +import sys +import os +import stat + + +def main() -> int: + if len(sys.argv) != 2: + raise SystemExit("usage: sync_native_extension.py ") + workspace = pathlib.Path(os.environ.get("BUILD_WORKSPACE_DIRECTORY", pathlib.Path.cwd())).resolve() + src = pathlib.Path(sys.argv[1]).resolve() + dst = workspace / "python_api" / "velaria" / "_velaria.so" + dst.parent.mkdir(parents=True, exist_ok=True) + if dst.exists(): + current_mode = dst.stat().st_mode + dst.chmod(current_mode | stat.S_IWUSR) + dst.unlink() + shutil.copy2(src, dst) + synced_mode = dst.stat().st_mode + dst.chmod(synced_mode | stat.S_IWUSR) + print(f"[sync-native-extension] copied {src} -> {dst}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/python_api/tests/test_artifact_index.py b/python_api/tests/test_artifact_index.py new file mode 100644 index 0000000..5050e55 --- /dev/null +++ b/python_api/tests/test_artifact_index.py @@ -0,0 +1,113 @@ +import os +import pathlib +import tempfile +import unittest +from unittest import mock + +from velaria.workspace.artifact_index import ArtifactIndex + + +class ArtifactIndexTest(unittest.TestCase): + def test_insert_list_preview_and_cleanup(self): + with tempfile.TemporaryDirectory(prefix="velaria-artifact-index-") as tmp: + run_dir = pathlib.Path(tmp) / "runs" / "run-1" + run_dir.mkdir(parents=True) + with mock.patch.dict(os.environ, {"VELARIA_HOME": tmp}): + index = ArtifactIndex() + index.upsert_run( + { + "run_id": "run-1", + "created_at": "2026-04-01T10:00:00Z", + "finished_at": "2026-04-01T10:00:02Z", + "status": "succeeded", + "action": "csv-sql", + "cli_args": {"query": "SELECT 1"}, + "velaria_version": "0.0.test", + "run_dir": str(run_dir), + } + ) + index.insert_artifact( + { + "artifact_id": "artifact-1", + "run_id": "run-1", + "created_at": "2026-04-01T10:00:01Z", + "type": "file", + "uri": "file:///tmp/result.parquet", + "format": "parquet", + "row_count": 3, + "schema_json": ["name", "score"], + "preview_json": {"rows": [{"name": "alice", "score": 10}]}, + "tags_json": ["result"], + } + ) + artifacts = index.list_artifacts() + self.assertEqual(len(artifacts), 1) + self.assertEqual(artifacts[0]["artifact_id"], "artifact-1") + self.assertEqual(artifacts[0]["schema_json"], ["name", "score"]) + self.assertEqual(artifacts[0]["preview_json"]["rows"][0]["name"], "alice") + + index.update_artifact_preview( + "artifact-1", + {"rows": [{"name": "bob", "score": 20}], "truncated": False}, + ) + artifact = index.get_artifact("artifact-1") + self.assertIsNotNone(artifact) + self.assertEqual(artifact["preview_json"]["rows"][0]["name"], "bob") + + cleanup = index.cleanup_runs(keep_last_n=0, delete_files=False) + self.assertEqual(cleanup["deleted_run_ids"], ["run-1"]) + self.assertTrue(run_dir.exists()) + self.assertEqual(index.list_artifacts(), []) + + def test_cleanup_skips_running_runs_even_with_delete_files(self): + with tempfile.TemporaryDirectory(prefix="velaria-artifact-running-") as tmp: + run_dir = pathlib.Path(tmp) / "runs" / "run-1" + run_dir.mkdir(parents=True) + with mock.patch.dict(os.environ, {"VELARIA_HOME": tmp}): + index = ArtifactIndex() + index.upsert_run( + { + "run_id": "run-1", + "created_at": "2026-04-01T10:00:00Z", + "finished_at": None, + "status": "running", + "action": "stream-sql-once", + "cli_args": {"query": "INSERT INTO sink SELECT * FROM source"}, + "velaria_version": "0.0.test", + "run_dir": str(run_dir), + } + ) + cleanup = index.cleanup_runs(keep_last_n=0, delete_files=True) + self.assertEqual(cleanup["deleted_run_ids"], []) + self.assertTrue(run_dir.exists()) + self.assertIsNotNone(index.get_run("run-1")) + + def test_keep_last_and_delete_files(self): + with tempfile.TemporaryDirectory(prefix="velaria-artifact-keep-") as tmp: + with mock.patch.dict(os.environ, {"VELARIA_HOME": tmp}): + index = ArtifactIndex() + run_ids = ["run-1", "run-2", "run-3"] + for offset, run_id in enumerate(run_ids, start=1): + run_dir = pathlib.Path(tmp) / "runs" / run_id + run_dir.mkdir(parents=True) + index.upsert_run( + { + "run_id": run_id, + "created_at": f"2026-04-0{offset}T10:00:00Z", + "finished_at": None, + "status": "succeeded", + "action": "csv-sql", + "cli_args": {}, + "velaria_version": "0.0.test", + "run_dir": str(run_dir), + } + ) + cleanup = index.cleanup_runs(keep_last_n=1, delete_files=True) + self.assertEqual(set(cleanup["deleted_run_ids"]), {"run-1", "run-2"}) + self.assertFalse((pathlib.Path(tmp) / "runs" / "run-1").exists()) + self.assertFalse((pathlib.Path(tmp) / "runs" / "run-2").exists()) + self.assertTrue((pathlib.Path(tmp) / "runs" / "run-3").exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/python_api/tests/test_python_cli_contract.py b/python_api/tests/test_python_cli_contract.py index e55da7c..4a8f07a 100644 --- a/python_api/tests/test_python_cli_contract.py +++ b/python_api/tests/test_python_cli_contract.py @@ -1,12 +1,22 @@ import io import json +import importlib +import os import pathlib +import sys import tempfile import unittest -from contextlib import redirect_stdout +from contextlib import redirect_stderr, redirect_stdout from unittest import mock -import velaria_cli +import pyarrow as pa +import pyarrow.parquet as pq + +try: + velaria_cli = importlib.import_module("velaria_cli") +except ModuleNotFoundError: + sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[1])) + velaria_cli = importlib.import_module("velaria_cli") class _FakeArrowResult: @@ -24,6 +34,77 @@ def to_arrow(self): class PythonCliContractTest(unittest.TestCase): + def test_workspace_errors_return_json_without_stderr_noise(self): + with tempfile.TemporaryDirectory(prefix="velaria-cli-errors-") as tmp: + with mock.patch.dict(os.environ, {"VELARIA_HOME": tmp}): + cases = [ + (["run", "start", "--"], "run start requires an action"), + (["run", "show", "--run-id", "missing-run"], "run not found: missing-run"), + ( + ["artifacts", "preview", "--artifact-id", "missing-artifact"], + "artifact not found: missing-artifact", + ), + ] + for argv, expected_error in cases: + stdout = io.StringIO() + stderr = io.StringIO() + with self.subTest(argv=argv): + with redirect_stdout(stdout), redirect_stderr(stderr): + exit_code = velaria_cli.main(argv) + self.assertEqual(exit_code, 1) + self.assertEqual(stderr.getvalue(), "") + payload = json.loads(stdout.getvalue()) + self.assertFalse(payload["ok"]) + self.assertIn(expected_error, payload["error"]) + + def test_artifact_preview_cache_miss_reports_full_row_count_for_parquet(self): + workspace = importlib.import_module("velaria.workspace") + with tempfile.TemporaryDirectory(prefix="velaria-cli-preview-") as tmp: + parquet_path = pathlib.Path(tmp) / "artifact.parquet" + table = pa.table({"name": ["alice", "bob", "carol"], "score": [1, 2, 3]}) + pq.write_table(table, parquet_path) + with mock.patch.dict(os.environ, {"VELARIA_HOME": tmp}): + index = workspace.ArtifactIndex() + run_dir = pathlib.Path(tmp) / "runs" / "run-1" + run_dir.mkdir(parents=True) + index.upsert_run( + { + "run_id": "run-1", + "created_at": "2026-04-01T10:00:00Z", + "finished_at": "2026-04-01T10:00:01Z", + "status": "succeeded", + "action": "csv-sql", + "cli_args": {}, + "velaria_version": "0.0.test", + "run_dir": str(run_dir), + } + ) + index.insert_artifact( + { + "artifact_id": "artifact-1", + "run_id": "run-1", + "created_at": "2026-04-01T10:00:01Z", + "type": "file", + "uri": parquet_path.resolve().as_uri(), + "format": "parquet", + "row_count": 3, + "schema_json": ["name", "score"], + "preview_json": None, + "tags_json": ["result"], + } + ) + stdout = io.StringIO() + with redirect_stdout(stdout): + exit_code = velaria_cli.main( + ["artifacts", "preview", "--artifact-id", "artifact-1", "--limit", "2"] + ) + self.assertEqual(exit_code, 0) + payload = json.loads(stdout.getvalue()) + self.assertTrue(payload["ok"]) + self.assertEqual(payload["preview"]["row_count"], 3) + self.assertTrue(payload["preview"]["truncated"]) + self.assertEqual(len(payload["preview"]["rows"]), 2) + def test_vector_cli_delegates_to_session_contract(self): fake_session = mock.Mock() fake_session.read_csv.return_value = mock.Mock(name="df") diff --git a/python_api/tests/test_streaming_v05.py b/python_api/tests/test_streaming_v05.py index 3ae9261..ba03ef9 100644 --- a/python_api/tests/test_streaming_v05.py +++ b/python_api/tests/test_streaming_v05.py @@ -1,5 +1,4 @@ import csv -import json import pathlib import tempfile import unittest @@ -61,8 +60,9 @@ def test_stream_progress_contract_with_start_stream_sql(self): self.assertIn("backpressure_high_watermark", progress) self.assertIn("backpressure_low_watermark", progress) - snapshot_json = json.dumps(progress) + snapshot_json = query.snapshot_json() self.assertIn("execution_mode", snapshot_json) + self.assertIn("checkpoint_delivery_mode", snapshot_json) def test_start_stream_sql_supports_multi_aggregate_and_having_alias(self): session = velaria.Session() diff --git a/python_api/tests/test_workspace_runs.py b/python_api/tests/test_workspace_runs.py new file mode 100644 index 0000000..c3c6b72 --- /dev/null +++ b/python_api/tests/test_workspace_runs.py @@ -0,0 +1,135 @@ +import io +import importlib +import json +import os +import pathlib +import sys +import tempfile +import unittest +from contextlib import redirect_stdout +from unittest import mock + +try: + velaria_cli = importlib.import_module("velaria_cli") + workspace = importlib.import_module("velaria.workspace") +except ModuleNotFoundError: + sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[1])) + velaria_cli = importlib.import_module("velaria_cli") + workspace = importlib.import_module("velaria.workspace") + +ArtifactIndex = workspace.ArtifactIndex +create_run = workspace.create_run +read_run = workspace.read_run + + +class WorkspaceRunsTest(unittest.TestCase): + def test_run_start_csv_sql_writes_run_dir_and_preview(self): + with tempfile.TemporaryDirectory(prefix="velaria-workspace-run-") as tmp: + csv_path = pathlib.Path(tmp) / "scores.csv" + csv_path.write_text("name,score\nalice,10\nbob,22\ncarol,31\n", encoding="utf-8") + with mock.patch.dict(os.environ, {"VELARIA_HOME": tmp}): + stdout = io.StringIO() + with redirect_stdout(stdout): + exit_code = velaria_cli.main( + [ + "run", + "start", + "--", + "csv-sql", + "--csv", + str(csv_path), + "--query", + "SELECT name, score FROM input_table WHERE score > 20", + ] + ) + self.assertEqual(exit_code, 0) + payload = json.loads(stdout.getvalue()) + self.assertEqual(payload["status"], "succeeded") + + run_id = payload["run_id"] + run_dir = pathlib.Path(payload["run_dir"]) + self.assertTrue((run_dir / "run.json").exists()) + self.assertTrue((run_dir / "inputs.json").exists()) + self.assertTrue((run_dir / "stdout.log").exists()) + self.assertTrue((run_dir / "stderr.log").exists()) + self.assertTrue((run_dir / "explain.json").exists()) + self.assertTrue((run_dir / "artifacts" / "result.parquet").exists()) + + run_meta = read_run(run_id) + self.assertEqual(run_meta["status"], "succeeded") + self.assertEqual(run_meta["action"], "csv-sql") + + index = ArtifactIndex() + artifacts = index.list_artifacts(run_id=run_id) + self.assertEqual(len(artifacts), 1) + artifact_id = artifacts[0]["artifact_id"] + + preview_stdout = io.StringIO() + with redirect_stdout(preview_stdout): + preview_exit = velaria_cli.main( + [ + "artifacts", + "preview", + "--artifact-id", + artifact_id, + "--limit", + "5", + ] + ) + self.assertEqual(preview_exit, 0) + preview_payload = json.loads(preview_stdout.getvalue()) + self.assertEqual(preview_payload["artifact_id"], artifact_id) + self.assertEqual(len(preview_payload["preview"]["rows"]), 2) + + status_stdout = io.StringIO() + with redirect_stdout(status_stdout): + status_exit = velaria_cli.main(["run", "status", "--run-id", run_id]) + self.assertEqual(status_exit, 0) + status_payload = json.loads(status_stdout.getvalue()) + self.assertEqual(status_payload["status"], "succeeded") + self.assertEqual(len(status_payload["artifacts"]), 1) + + def test_stream_sql_once_writes_snapshot_json_progress(self): + with tempfile.TemporaryDirectory(prefix="velaria-stream-run-") as tmp: + source_dir = pathlib.Path(tmp) / "source" + source_dir.mkdir(parents=True) + (source_dir / "part-000.csv").write_text( + "key,value\nu1,1\nu1,2\nu2,4\n", + encoding="utf-8", + ) + with mock.patch.dict(os.environ, {"VELARIA_HOME": tmp}): + run_id, run_dir = create_run("stream-sql-once", {"query": "demo"}, "0.0.test") + result = velaria_cli._execute_stream_sql_once( + source_csv_dir=source_dir, + source_table="input_stream", + source_delimiter=",", + sink_table="output_sink", + sink_schema="key STRING, value_sum INT", + sink_path=run_dir / "artifacts" / "stream_result.csv", + sink_delimiter=",", + query=( + "INSERT INTO output_sink " + "SELECT key, SUM(value) AS value_sum FROM input_stream GROUP BY key" + ), + trigger_interval_ms=0, + checkpoint_delivery_mode="best-effort", + execution_mode="single-process", + local_workers=1, + max_inflight_partitions=0, + max_batches=0, + run_id=run_id, + ) + self.assertIn("progress", result["payload"]) + progress_path = run_dir / "progress.jsonl" + lines = [line for line in progress_path.read_text(encoding="utf-8").splitlines() if line] + self.assertGreaterEqual(len(lines), 2) + snapshots = [json.loads(line) for line in lines] + self.assertIn("execution_mode", snapshots[-1]) + explain = json.loads((run_dir / "explain.json").read_text(encoding="utf-8")) + self.assertIn("logical", explain) + self.assertIn("physical", explain) + self.assertIn("strategy", explain) + + +if __name__ == "__main__": + unittest.main() diff --git a/python_api/velaria/workspace/__init__.py b/python_api/velaria/workspace/__init__.py new file mode 100644 index 0000000..3918b7f --- /dev/null +++ b/python_api/velaria/workspace/__init__.py @@ -0,0 +1,31 @@ +from .artifact_index import ArtifactIndex, SQLITE_SCHEMA +from .paths import ensure_dirs, get_index_dir, get_runs_dir, get_velaria_home +from .run_store import ( + append_progress_snapshot, + append_stderr, + append_stdout, + create_run, + finalize_run, + read_run, + update_run, + write_explain, + write_inputs, +) + +__all__ = [ + "ArtifactIndex", + "SQLITE_SCHEMA", + "append_progress_snapshot", + "append_stderr", + "append_stdout", + "create_run", + "ensure_dirs", + "finalize_run", + "get_index_dir", + "get_runs_dir", + "get_velaria_home", + "read_run", + "update_run", + "write_explain", + "write_inputs", +] diff --git a/python_api/velaria/workspace/artifact_index.py b/python_api/velaria/workspace/artifact_index.py new file mode 100644 index 0000000..c817749 --- /dev/null +++ b/python_api/velaria/workspace/artifact_index.py @@ -0,0 +1,399 @@ +from __future__ import annotations + +import json +import pathlib +import shutil +import sqlite3 +from datetime import datetime, timedelta, timezone +from typing import Any + +from .paths import ensure_dirs, get_index_dir + +SQLITE_SCHEMA = """ +CREATE TABLE IF NOT EXISTS runs ( + run_id TEXT PRIMARY KEY, + created_at TEXT NOT NULL, + finished_at TEXT, + status TEXT NOT NULL, + action TEXT NOT NULL, + args_json TEXT NOT NULL, + velaria_version TEXT, + run_dir TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS artifacts ( + artifact_id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + created_at TEXT NOT NULL, + type TEXT NOT NULL, + uri TEXT NOT NULL, + format TEXT NOT NULL, + row_count INTEGER, + schema_json TEXT, + preview_json TEXT, + tags_json TEXT, + FOREIGN KEY(run_id) REFERENCES runs(run_id) +); + +CREATE INDEX IF NOT EXISTS idx_artifacts_run_id ON artifacts(run_id); +CREATE INDEX IF NOT EXISTS idx_runs_created_at ON runs(created_at); +""" + +TERMINAL_RUN_STATUSES = frozenset({"succeeded", "failed", "timed_out"}) + + +def _json_dumps(payload: Any) -> str: + return json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + + +def _json_loads(payload: str | None) -> Any: + if not payload: + return None + return json.loads(payload) + + +def _utc_now() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _parse_timestamp(value: str | None) -> datetime | None: + if not value: + return None + return datetime.fromisoformat(value.replace("Z", "+00:00")) + + +class ArtifactIndex: + def __init__(self) -> None: + ensure_dirs() + self.sqlite_path = get_index_dir() / "artifacts.sqlite" + self.fallback_path = get_index_dir() / "artifacts.jsonl" + self.backend = "sqlite" + self._conn: sqlite3.Connection | None = None + try: + self._conn = sqlite3.connect(self.sqlite_path) + self._conn.row_factory = sqlite3.Row + self._conn.executescript(SQLITE_SCHEMA) + self._conn.commit() + except sqlite3.Error: + self.backend = "jsonl" + self._conn = None + self.fallback_path.touch(exist_ok=True) + + def _append_event(self, payload: dict[str, Any]) -> None: + with self.fallback_path.open("a", encoding="utf-8") as handle: + handle.write(_json_dumps(payload)) + handle.write("\n") + + def _load_fallback_state(self) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]: + runs: dict[str, dict[str, Any]] = {} + artifacts: dict[str, dict[str, Any]] = {} + if not self.fallback_path.exists(): + return runs, artifacts + with self.fallback_path.open("r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + event = json.loads(line) + kind = event["kind"] + if kind == "run_upsert": + payload = dict(event["payload"]) + runs[payload["run_id"]] = payload + elif kind == "artifact_upsert": + payload = dict(event["payload"]) + artifacts[payload["artifact_id"]] = payload + elif kind == "artifact_preview": + artifact_id = event["artifact_id"] + if artifact_id in artifacts: + artifacts[artifact_id]["preview_json"] = event["preview_json"] + elif kind == "run_delete": + run_id = event["run_id"] + runs.pop(run_id, None) + for artifact_id in [ + artifact["artifact_id"] + for artifact in artifacts.values() + if artifact["run_id"] == run_id + ]: + artifacts.pop(artifact_id, None) + return runs, artifacts + + def upsert_run(self, run_meta: dict[str, Any]) -> None: + payload = { + "run_id": run_meta["run_id"], + "created_at": run_meta["created_at"], + "finished_at": run_meta.get("finished_at"), + "status": run_meta["status"], + "action": run_meta["action"], + "args_json": _json_dumps(run_meta.get("cli_args", {})), + "velaria_version": run_meta.get("velaria_version"), + "run_dir": run_meta["run_dir"], + } + if self.backend == "sqlite": + assert self._conn is not None + self._conn.execute( + """ + INSERT INTO runs ( + run_id, created_at, finished_at, status, action, args_json, velaria_version, run_dir + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(run_id) DO UPDATE SET + created_at=excluded.created_at, + finished_at=excluded.finished_at, + status=excluded.status, + action=excluded.action, + args_json=excluded.args_json, + velaria_version=excluded.velaria_version, + run_dir=excluded.run_dir + """, + ( + payload["run_id"], + payload["created_at"], + payload["finished_at"], + payload["status"], + payload["action"], + payload["args_json"], + payload["velaria_version"], + payload["run_dir"], + ), + ) + self._conn.commit() + return + self._append_event({"kind": "run_upsert", "payload": payload}) + + def get_run(self, run_id: str) -> dict[str, Any] | None: + if self.backend == "sqlite": + assert self._conn is not None + row = self._conn.execute( + "SELECT * FROM runs WHERE run_id = ?", + (run_id,), + ).fetchone() + if row is None: + return None + return { + "run_id": row["run_id"], + "created_at": row["created_at"], + "finished_at": row["finished_at"], + "status": row["status"], + "action": row["action"], + "cli_args": _json_loads(row["args_json"]) or {}, + "velaria_version": row["velaria_version"], + "run_dir": row["run_dir"], + } + runs, _ = self._load_fallback_state() + row = runs.get(run_id) + if row is None: + return None + return { + "run_id": row["run_id"], + "created_at": row["created_at"], + "finished_at": row.get("finished_at"), + "status": row["status"], + "action": row["action"], + "cli_args": _json_loads(row.get("args_json")) or {}, + "velaria_version": row.get("velaria_version"), + "run_dir": row["run_dir"], + } + + def insert_artifact(self, artifact_meta: dict[str, Any]) -> None: + payload = dict(artifact_meta) + payload["schema_json"] = _json_dumps(payload.get("schema_json")) + payload["preview_json"] = _json_dumps(payload.get("preview_json")) + payload["tags_json"] = _json_dumps(payload.get("tags_json")) + if self.backend == "sqlite": + assert self._conn is not None + self._conn.execute( + """ + INSERT OR REPLACE INTO artifacts ( + artifact_id, run_id, created_at, type, uri, format, row_count, schema_json, preview_json, + tags_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + payload["artifact_id"], + payload["run_id"], + payload["created_at"], + payload["type"], + payload["uri"], + payload["format"], + payload.get("row_count"), + payload.get("schema_json"), + payload.get("preview_json"), + payload.get("tags_json"), + ), + ) + self._conn.commit() + return + self._append_event({"kind": "artifact_upsert", "payload": payload}) + + def update_artifact_preview(self, artifact_id: str, preview_json: dict[str, Any]) -> None: + encoded = _json_dumps(preview_json) + if self.backend == "sqlite": + assert self._conn is not None + self._conn.execute( + "UPDATE artifacts SET preview_json = ? WHERE artifact_id = ?", + (encoded, artifact_id), + ) + self._conn.commit() + return + self._append_event( + { + "kind": "artifact_preview", + "artifact_id": artifact_id, + "preview_json": encoded, + } + ) + + def _artifact_from_row(self, row: dict[str, Any] | sqlite3.Row) -> dict[str, Any]: + return { + "artifact_id": row["artifact_id"], + "run_id": row["run_id"], + "created_at": row["created_at"], + "type": row["type"], + "uri": row["uri"], + "format": row["format"], + "row_count": row["row_count"], + "schema_json": _json_loads(row["schema_json"]), + "preview_json": _json_loads(row["preview_json"]), + "tags_json": _json_loads(row["tags_json"]) or [], + } + + def list_artifacts( + self, + limit: int = 50, + run_id: str | None = None, + since: str | None = None, + tag: str | None = None, + ) -> list[dict[str, Any]]: + if self.backend == "sqlite": + assert self._conn is not None + clauses: list[str] = [] + params: list[Any] = [] + if run_id: + clauses.append("run_id = ?") + params.append(run_id) + if since: + clauses.append("created_at >= ?") + params.append(since) + where = f"WHERE {' AND '.join(clauses)}" if clauses else "" + rows = self._conn.execute( + f""" + SELECT * FROM artifacts + {where} + ORDER BY created_at DESC + LIMIT ? + """, + (*params, limit), + ).fetchall() + artifacts = [self._artifact_from_row(row) for row in rows] + else: + _, state = self._load_fallback_state() + artifacts = list(state.values()) + if run_id: + artifacts = [artifact for artifact in artifacts if artifact["run_id"] == run_id] + if since: + artifacts = [artifact for artifact in artifacts if artifact["created_at"] >= since] + artifacts = [self._artifact_from_row(artifact) for artifact in artifacts] + artifacts.sort(key=lambda item: item["created_at"], reverse=True) + artifacts = artifacts[:limit] + if tag: + artifacts = [artifact for artifact in artifacts if tag in artifact["tags_json"]] + return artifacts + + def get_artifact(self, artifact_id: str) -> dict[str, Any] | None: + if self.backend == "sqlite": + assert self._conn is not None + row = self._conn.execute( + "SELECT * FROM artifacts WHERE artifact_id = ?", + (artifact_id,), + ).fetchone() + if row is None: + return None + return self._artifact_from_row(row) + _, artifacts = self._load_fallback_state() + row = artifacts.get(artifact_id) + if row is None: + return None + return self._artifact_from_row(row) + + def _select_runs_for_cleanup( + self, + keep_last_n: int | None, + ttl_days: int | None, + ) -> list[dict[str, Any]]: + if self.backend == "sqlite": + assert self._conn is not None + rows = self._conn.execute( + "SELECT * FROM runs ORDER BY created_at DESC" + ).fetchall() + runs = [ + { + "run_id": row["run_id"], + "created_at": row["created_at"], + "run_dir": row["run_dir"], + "status": row["status"], + } + for row in rows + ] + else: + runs_state, _ = self._load_fallback_state() + runs = [ + { + "run_id": row["run_id"], + "created_at": row["created_at"], + "run_dir": row["run_dir"], + "status": row["status"], + } + for row in runs_state.values() + ] + runs.sort(key=lambda item: item["created_at"], reverse=True) + keep_ids = {run["run_id"] for run in runs[:keep_last_n]} if keep_last_n else set() + threshold = None + if ttl_days is not None: + threshold = datetime.now(timezone.utc) - timedelta(days=ttl_days) + selected: list[dict[str, Any]] = [] + for run in runs: + if run["run_id"] in keep_ids: + continue + if run.get("status") not in TERMINAL_RUN_STATUSES: + continue + created_at = _parse_timestamp(run["created_at"]) + expired = threshold is not None and created_at is not None and created_at < threshold + keep_overflow = keep_last_n is not None and run["run_id"] not in keep_ids + if expired or keep_overflow: + selected.append(run) + return selected + + def cleanup_runs( + self, + keep_last_n: int | None = None, + ttl_days: int | None = None, + delete_files: bool = False, + ) -> dict[str, Any]: + selected = self._select_runs_for_cleanup(keep_last_n, ttl_days) + deleted_run_ids = [run["run_id"] for run in selected] + if self.backend == "sqlite": + assert self._conn is not None + for run_id in deleted_run_ids: + self._conn.execute("DELETE FROM artifacts WHERE run_id = ?", (run_id,)) + self._conn.execute("DELETE FROM runs WHERE run_id = ?", (run_id,)) + self._conn.commit() + else: + for run_id in deleted_run_ids: + self._append_event( + { + "kind": "run_delete", + "run_id": run_id, + "created_at": _utc_now(), + } + ) + deleted_dirs: list[str] = [] + if delete_files: + for run in selected: + run_dir = pathlib.Path(run["run_dir"]) + if run_dir.exists(): + shutil.rmtree(run_dir) + deleted_dirs.append(str(run_dir)) + return { + "deleted_run_ids": deleted_run_ids, + "deleted_run_dirs": deleted_dirs, + "backend": self.backend, + } diff --git a/python_api/velaria/workspace/paths.py b/python_api/velaria/workspace/paths.py new file mode 100644 index 0000000..9aea03e --- /dev/null +++ b/python_api/velaria/workspace/paths.py @@ -0,0 +1,31 @@ +import os +import pathlib + + +def get_velaria_home() -> pathlib.Path: + raw = os.environ.get("VELARIA_HOME") + if raw: + return pathlib.Path(raw).expanduser().resolve() + return (pathlib.Path.home() / ".velaria").resolve() + + +def get_runs_dir() -> pathlib.Path: + return get_velaria_home() / "runs" + + +def get_index_dir() -> pathlib.Path: + return get_velaria_home() / "index" + + +def ensure_dirs() -> dict[str, pathlib.Path]: + home = get_velaria_home() + runs = get_runs_dir() + index = get_index_dir() + home.mkdir(parents=True, exist_ok=True) + runs.mkdir(parents=True, exist_ok=True) + index.mkdir(parents=True, exist_ok=True) + return { + "home": home, + "runs": runs, + "index": index, + } diff --git a/python_api/velaria/workspace/run_store.py b/python_api/velaria/workspace/run_store.py new file mode 100644 index 0000000..b092270 --- /dev/null +++ b/python_api/velaria/workspace/run_store.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +import json +import pathlib +import secrets +import tempfile +from datetime import datetime, timezone +from typing import Any + +from .paths import ensure_dirs, get_runs_dir +from .types import RunRecord + + +def utc_now() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _make_run_id() -> str: + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + return f"{ts}_{secrets.token_hex(4)}" + + +def get_run_dir(run_id: str) -> pathlib.Path: + return get_runs_dir() / run_id + + +def get_run_file(run_id: str) -> pathlib.Path: + return get_run_dir(run_id) / "run.json" + + +def _write_json(path: pathlib.Path, payload: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile( + "w", + encoding="utf-8", + dir=str(path.parent), + delete=False, + ) as handle: + json.dump(payload, handle, indent=2, ensure_ascii=False) + handle.write("\n") + tmp_path = pathlib.Path(handle.name) + tmp_path.replace(path) + + +def read_run(run_id: str) -> dict[str, Any]: + with get_run_file(run_id).open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def update_run(run_id: str, **updates: Any) -> dict[str, Any]: + payload = read_run(run_id) + for key, value in updates.items(): + if value is not None: + payload[key] = value + _write_json(get_run_file(run_id), payload) + return payload + + +def create_run( + action: str, + args: dict[str, Any], + velaria_version: str | None, + run_name: str | None = None, +) -> tuple[str, pathlib.Path]: + ensure_dirs() + run_id = _make_run_id() + run_dir = get_run_dir(run_id) + artifacts_dir = run_dir / "artifacts" + artifacts_dir.mkdir(parents=True, exist_ok=True) + (run_dir / "stdout.log").touch() + (run_dir / "stderr.log").touch() + (run_dir / "progress.jsonl").touch() + record = RunRecord( + run_id=run_id, + created_at=utc_now(), + action=action, + cli_args=args, + velaria_version=velaria_version, + run_dir=str(run_dir), + run_name=run_name, + ) + _write_json(run_dir / "run.json", record.to_dict()) + return run_id, run_dir + + +def write_inputs(run_id: str, payload: dict[str, Any]) -> pathlib.Path: + path = get_run_dir(run_id) / "inputs.json" + _write_json(path, payload) + return path + + +def write_explain(run_id: str, payload: dict[str, Any]) -> pathlib.Path: + path = get_run_dir(run_id) / "explain.json" + _write_json(path, payload) + return path + + +def append_progress_snapshot(run_id: str, snapshot_json: str) -> pathlib.Path: + path = get_run_dir(run_id) / "progress.jsonl" + with path.open("a", encoding="utf-8") as handle: + handle.write(snapshot_json.rstrip("\n")) + handle.write("\n") + return path + + +def _append_log(run_id: str, filename: str, message: str) -> pathlib.Path: + path = get_run_dir(run_id) / filename + with path.open("a", encoding="utf-8") as handle: + handle.write(message) + return path + + +def append_stdout(run_id: str, message: str) -> pathlib.Path: + return _append_log(run_id, "stdout.log", message) + + +def append_stderr(run_id: str, message: str) -> pathlib.Path: + return _append_log(run_id, "stderr.log", message) + + +def finalize_run( + run_id: str, + status: str, + finished_at: str | None = None, + error: str | None = None, + details: dict[str, Any] | None = None, +) -> dict[str, Any]: + updates: dict[str, Any] = { + "status": status, + "finished_at": finished_at or utc_now(), + "error": error, + } + if details: + updates["details"] = details + return update_run(run_id, **updates) diff --git a/python_api/velaria/workspace/types.py b/python_api/velaria/workspace/types.py new file mode 100644 index 0000000..02e19bf --- /dev/null +++ b/python_api/velaria/workspace/types.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from typing import Any + + +JsonValue = dict[str, Any] | list[Any] | str | int | float | bool | None + +PREVIEW_LIMIT_ROWS = 50 +PREVIEW_LIMIT_BYTES = 200 * 1024 + + +@dataclass +class RunRecord: + run_id: str + created_at: str + action: str + cli_args: dict[str, Any] + velaria_version: str | None + run_dir: str + status: str = "running" + finished_at: str | None = None + run_name: str | None = None + error: str | None = None + details: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +@dataclass +class ArtifactRecord: + artifact_id: str + run_id: str + created_at: str + type: str + uri: str + format: str + row_count: int | None = None + schema_json: JsonValue = None + preview_json: JsonValue = None + tags_json: list[str] | None = None + path: str | None = None + + def to_dict(self) -> dict[str, Any]: + return asdict(self) diff --git a/python_api/velaria_cli.py b/python_api/velaria_cli.py index 121ee15..cbdd473 100644 --- a/python_api/velaria_cli.py +++ b/python_api/velaria_cli.py @@ -1,28 +1,100 @@ +from __future__ import annotations + import argparse +import contextlib +import csv +import io import json +import multiprocessing import pathlib +import secrets +import sys +import traceback +from datetime import datetime, timezone +from typing import Any +from urllib.parse import urlparse -from velaria import Session +import pyarrow as pa +import pyarrow.csv as pa_csv +import pyarrow.ipc as pa_ipc +import pyarrow.parquet as pq +from velaria import Session, __version__ +from velaria.workspace import ( + ArtifactIndex, + append_progress_snapshot, + append_stderr, + create_run, + finalize_run, + read_run, + update_run, + write_explain, + write_inputs, +) +from velaria.workspace.types import PREVIEW_LIMIT_BYTES, PREVIEW_LIMIT_ROWS -def _run_csv_sql(csv_path: pathlib.Path, table: str, query: str) -> int: - session = Session() - df = session.read_csv(str(csv_path)) - session.create_temp_view(table, df) - result = session.sql(query).to_arrow() + +class CliUsageError(ValueError): + pass + + +class JsonArgumentParser(argparse.ArgumentParser): + def error(self, message: str) -> None: + raise CliUsageError(message) + + def exit(self, status: int = 0, message: str | None = None) -> None: + if status == 0: + raise SystemExit(0) + raise CliUsageError(message.strip() if message else "invalid arguments") + + +def _json_dumps(payload: Any) -> str: + return json.dumps(payload, indent=2, ensure_ascii=False) + + +def _emit_json(payload: Any) -> int: + print(_json_dumps(payload)) + return 0 + + +def _emit_error_json(error: str, *, error_type: str = "cli_error") -> int: print( - json.dumps( + _json_dumps( { - "table": table, - "query": query, - "schema": result.schema.names, - "rows": result.to_pylist(), - }, - indent=2, - ensure_ascii=False, + "ok": False, + "error": error, + "error_type": error_type, + } ) ) - return 0 + return 1 + + +def _utc_now() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _new_artifact_id() -> str: + return f"artifact_{secrets.token_hex(8)}" + + +def _normalize_metric(metric: str) -> str: + return "cosine" if metric in ("cosine", "cosin") else metric + + +def _normalize_path(path: pathlib.Path) -> pathlib.Path: + return path.expanduser().resolve() + + +def _uri_from_path(path: pathlib.Path) -> str: + return _normalize_path(path).as_uri() + + +def _path_from_uri(uri: str) -> pathlib.Path: + parsed = urlparse(uri) + if parsed.scheme == "file": + return pathlib.Path(parsed.path) + raise ValueError(f"unsupported artifact uri: {uri}") def _parse_vector_text(text: str) -> list[float]: @@ -31,16 +103,241 @@ def _parse_vector_text(text: str) -> list[float]: value = value[1:-1].strip() if not value: return [] - return [float(part.strip()) for part in value.split(",") if part.strip()] + if "," in value: + parts = [part.strip() for part in value.split(",")] + else: + parts = value.split() + return [float(part) for part in parts if part] -def _run_vector_search( +def _parse_explain_sections(explain: str) -> dict[str, str]: + sections = { + "logical": "", + "physical": "", + "strategy": "", + } + current: str | None = None + for line in explain.splitlines(): + stripped = line.strip() + if stripped in sections: + current = stripped + continue + if current is None: + continue + if sections[current]: + sections[current] += "\n" + sections[current] += line + return sections + + +def _build_batch_explain(logical: str, output_path: pathlib.Path) -> dict[str, str]: + return { + "logical": logical, + "physical": ( + "local-batch\n" + f"result_sink=file://{_normalize_path(output_path)}\n" + "materialization=pyarrow-table" + ), + "strategy": ( + "selected_mode=single-process\n" + "transport_mode=inproc\n" + "execution_reason=batch query executed through DataflowSession" + ), + } + + +def _preview_payload_from_table( + table: pa.Table, + limit: int = PREVIEW_LIMIT_ROWS, + max_bytes: int = PREVIEW_LIMIT_BYTES, +) -> dict[str, Any]: + rows = table.slice(0, limit).to_pylist() + preview: dict[str, Any] = { + "schema": table.schema.names, + "rows": rows, + "row_count": table.num_rows, + "truncated": table.num_rows > limit, + } + encoded = json.dumps(preview, ensure_ascii=False) + while len(encoded.encode("utf-8")) > max_bytes and preview["rows"]: + preview["rows"].pop() + preview["truncated"] = True + encoded = json.dumps(preview, ensure_ascii=False) + if len(encoded.encode("utf-8")) > max_bytes: + preview = { + "schema": table.schema.names, + "rows": [], + "row_count": table.num_rows, + "truncated": True, + "message": "preview truncated to satisfy size limit", + } + return preview + + +def _preview_payload_from_csv( + csv_path: pathlib.Path, + limit: int = PREVIEW_LIMIT_ROWS, + max_bytes: int = PREVIEW_LIMIT_BYTES, +) -> dict[str, Any]: + rows: list[dict[str, Any]] = [] + row_count = 0 + with csv_path.open("r", encoding="utf-8", newline="") as handle: + reader = csv.DictReader(handle) + schema = reader.fieldnames or [] + for row in reader: + row_count += 1 + if len(rows) < limit: + rows.append(dict(row)) + truncated = row_count > len(rows) + preview: dict[str, Any] = { + "schema": schema, + "rows": rows, + "row_count": row_count, + "truncated": truncated, + } + encoded = json.dumps(preview, ensure_ascii=False) + while len(encoded.encode("utf-8")) > max_bytes and preview["rows"]: + preview["rows"].pop() + preview["truncated"] = True + encoded = json.dumps(preview, ensure_ascii=False) + return preview + + +def _finalize_preview_payload( + preview: dict[str, Any], + artifact_row_count: int | None, +) -> dict[str, Any]: + if artifact_row_count is not None: + preview["row_count"] = artifact_row_count + preview["truncated"] = preview.get("truncated", False) or artifact_row_count > len( + preview.get("rows", []) + ) + return preview + + +def _infer_format(path: pathlib.Path) -> str: + suffix = path.suffix.lower() + if suffix == ".csv": + return "csv" + if suffix in (".parquet", ".pq"): + return "parquet" + if suffix in (".arrow", ".feather"): + return "arrow" + raise ValueError(f"unsupported output format for path: {path}") + + +def _write_table(path: pathlib.Path, table: pa.Table) -> str: + fmt = _infer_format(path) + path.parent.mkdir(parents=True, exist_ok=True) + if fmt == "csv": + pa_csv.write_csv(table, str(path)) + elif fmt == "parquet": + pq.write_table(table, str(path)) + elif fmt == "arrow": + with pa.OSFile(str(path), "wb") as sink: + with pa_ipc.new_file(sink, table.schema) as writer: + writer.write_table(table) + return fmt + + +def _table_artifact(path: pathlib.Path, table: pa.Table, tags: list[str]) -> dict[str, Any]: + fmt = _write_table(path, table) + preview = _preview_payload_from_table(table) + return { + "type": "file", + "uri": _uri_from_path(path), + "format": fmt, + "row_count": table.num_rows, + "schema_json": table.schema.names, + "preview_json": preview, + "tags_json": tags, + } + + +def _text_artifact(path: pathlib.Path, text: str, tags: list[str]) -> dict[str, Any]: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8") + return { + "type": "file", + "uri": _uri_from_path(path), + "format": "text", + "row_count": None, + "schema_json": None, + "preview_json": None, + "tags_json": tags, + } + + +def _read_preview_for_artifact( + artifact: dict[str, Any], + limit: int = PREVIEW_LIMIT_ROWS, +) -> dict[str, Any]: + if artifact.get("preview_json") is not None: + return artifact["preview_json"] + path = _path_from_uri(artifact["uri"]) + fmt = artifact["format"] + if fmt == "csv": + return _finalize_preview_payload( + _preview_payload_from_csv(path, limit=limit), + artifact.get("row_count"), + ) + if fmt == "parquet": + return _finalize_preview_payload( + _preview_payload_from_table(pq.read_table(str(path)), limit=limit), + artifact.get("row_count"), + ) + if fmt == "arrow": + with path.open("rb") as handle: + try: + table = pa_ipc.open_file(handle).read_all() + except pa.ArrowInvalid: + handle.seek(0) + table = pa_ipc.open_stream(handle).read_all() + return _finalize_preview_payload( + _preview_payload_from_table(table, limit=limit), + artifact.get("row_count"), + ) + raise ValueError(f"preview unsupported for format: {fmt}") + + +def _execute_csv_sql( + csv_path: pathlib.Path, + table: str, + query: str, + output_path: pathlib.Path | None = None, + run_id: str | None = None, +) -> dict[str, Any]: + session = Session() + df = session.read_csv(str(csv_path)) + session.create_temp_view(table, df) + result_df = session.sql(query) + logical = result_df.explain() if hasattr(result_df, "explain") else "" + result = result_df.to_arrow() + artifacts: list[dict[str, Any]] = [] + if output_path is not None: + artifacts.append(_table_artifact(output_path, result, ["result", "csv-sql"])) + if run_id is not None: + write_explain(run_id, _build_batch_explain(logical, output_path)) + return { + "payload": { + "table": table, + "query": query, + "schema": result.schema.names, + "rows": result.to_pylist(), + }, + "artifacts": artifacts, + } + + +def _execute_vector_search( csv_path: pathlib.Path, vector_column: str, query_vector: str, metric: str, top_k: int, -) -> int: + output_path: pathlib.Path | None = None, + explain_path: pathlib.Path | None = None, +) -> dict[str, Any]: session = Session() df = session.read_csv(str(csv_path)) session.create_temp_view("input_table", df) @@ -62,91 +359,544 @@ def _run_vector_search( top_k=top_k, metric=metric, ) - payload = { - "metric": "cosine" if metric in ("cosine", "cosin") else metric, - "top_k": top_k, - "schema": result.schema.names, - "rows": result.to_pylist(), - "explain": explain, + artifacts: list[dict[str, Any]] = [] + if output_path is not None: + artifacts.append(_table_artifact(output_path, result, ["result", "vector-search"])) + if explain_path is not None: + artifacts.append(_text_artifact(explain_path, explain, ["explain", "vector-search"])) + return { + "payload": { + "metric": _normalize_metric(metric), + "top_k": top_k, + "schema": result.schema.names, + "rows": result.to_pylist(), + "explain": explain, + }, + "artifacts": artifacts, } - print(json.dumps(payload, indent=2, ensure_ascii=False)) - return 0 -def _build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - prog="velaria-cli", - description="Velaria CLI for SQL query execution.", +def _execute_stream_sql_once( + source_csv_dir: pathlib.Path, + source_table: str, + source_delimiter: str, + sink_table: str, + sink_schema: str, + sink_path: pathlib.Path, + sink_delimiter: str, + query: str, + trigger_interval_ms: int, + checkpoint_delivery_mode: str, + execution_mode: str, + local_workers: int, + max_inflight_partitions: int, + max_batches: int, + run_id: str, +) -> dict[str, Any]: + if not query.lstrip().upper().startswith("INSERT INTO"): + raise ValueError("--query for stream-sql-once must start with INSERT INTO") + effective_max_batches = max_batches if max_batches > 0 else 1 + session = Session() + stream_df = session.read_stream_csv_dir(str(source_csv_dir), delimiter=source_delimiter) + session.create_temp_view(source_table, stream_df) + session.sql( + f"CREATE SINK TABLE {sink_table} ({sink_schema}) " + f"USING csv OPTIONS(path '{_normalize_path(sink_path)}', delimiter '{sink_delimiter}')" ) - subparsers = parser.add_subparsers(dest="command", required=True) + explain = session.explain_stream_sql( + query, + trigger_interval_ms=trigger_interval_ms, + checkpoint_delivery_mode=checkpoint_delivery_mode, + execution_mode=execution_mode, + local_workers=local_workers, + max_inflight_partitions=max_inflight_partitions, + ) + write_explain(run_id, _parse_explain_sections(explain)) + streaming_query = session.start_stream_sql( + query, + trigger_interval_ms=trigger_interval_ms, + checkpoint_delivery_mode=checkpoint_delivery_mode, + execution_mode=execution_mode, + local_workers=local_workers, + max_inflight_partitions=max_inflight_partitions, + ) + streaming_query.start() + append_progress_snapshot(run_id, streaming_query.snapshot_json()) + processed = streaming_query.await_termination(max_batches=effective_max_batches) + append_progress_snapshot(run_id, streaming_query.snapshot_json()) + result = session.read_csv(str(sink_path)).to_arrow() + artifacts = [_table_artifact(sink_path, result, ["result", "stream-sql-once"])] + return { + "payload": { + "processed_batches": processed, + "schema": result.schema.names, + "rows": result.to_pylist(), + "progress": streaming_query.progress(), + }, + "artifacts": artifacts, + } - csv_sql = subparsers.add_parser( - "csv-sql", - help="Read CSV and run a SQL query through DataflowSession.", + +def _run_csv_sql(csv_path: pathlib.Path, table: str, query: str) -> int: + return _emit_json(_execute_csv_sql(csv_path, table, query)["payload"]) + + +def _run_vector_search( + csv_path: pathlib.Path, + vector_column: str, + query_vector: str, + metric: str, + top_k: int, +) -> int: + return _emit_json( + _execute_vector_search(csv_path, vector_column, query_vector, metric, top_k)["payload"] ) - csv_sql.add_argument( - "--csv", - required=True, - help="CSV file path.", + + +def _child_result_path(run_dir: pathlib.Path) -> pathlib.Path: + return run_dir / "_action_result.json" + + +def _worker_stdout_path(run_dir: pathlib.Path) -> pathlib.Path: + return run_dir / "stdout.log" + + +def _worker_stderr_path(run_dir: pathlib.Path) -> pathlib.Path: + return run_dir / "stderr.log" + + +def _execute_action_for_run(spec: dict[str, Any]) -> dict[str, Any]: + action = spec["action"] + args = spec["action_args"] + run_id = spec["run_id"] + run_dir = pathlib.Path(spec["run_dir"]) + artifacts_dir = run_dir / "artifacts" + if action == "csv-sql": + output_path = pathlib.Path(args["output_path"]) if args.get("output_path") else artifacts_dir / "result.parquet" + return _execute_csv_sql( + csv_path=pathlib.Path(args["csv"]), + table=args["table"], + query=args["query"], + output_path=output_path, + run_id=run_id, + ) + if action == "vector-search": + output_path = pathlib.Path(args["output_path"]) if args.get("output_path") else artifacts_dir / "result.parquet" + explain_path = artifacts_dir / "vector_explain.txt" + return _execute_vector_search( + csv_path=pathlib.Path(args["csv"]), + vector_column=args["vector_column"], + query_vector=args["query_vector"], + metric=args["metric"], + top_k=args["top_k"], + output_path=output_path, + explain_path=explain_path, + ) + if action == "stream-sql-once": + sink_path = pathlib.Path(args["sink_path"]) if args.get("sink_path") else artifacts_dir / "stream_result.csv" + return _execute_stream_sql_once( + source_csv_dir=pathlib.Path(args["source_csv_dir"]), + source_table=args["source_table"], + source_delimiter=args["source_delimiter"], + sink_table=args["sink_table"], + sink_schema=args["sink_schema"], + sink_path=sink_path, + sink_delimiter=args["sink_delimiter"], + query=args["query"], + trigger_interval_ms=args["trigger_interval_ms"], + checkpoint_delivery_mode=args["checkpoint_delivery_mode"], + execution_mode=args["execution_mode"], + local_workers=args["local_workers"], + max_inflight_partitions=args["max_inflight_partitions"], + max_batches=args["max_batches"], + run_id=run_id, + ) + raise ValueError(f"unsupported run action: {action}") + + +def _run_action_subprocess_target(spec: dict[str, Any], result_path: str, stdout_path: str, stderr_path: str) -> None: + with open(stdout_path, "a", encoding="utf-8") as stdout_handle, open( + stderr_path, "a", encoding="utf-8" + ) as stderr_handle, contextlib.redirect_stdout(stdout_handle), contextlib.redirect_stderr( + stderr_handle + ): + try: + result = _execute_action_for_run(spec) + pathlib.Path(result_path).write_text(_json_dumps(result), encoding="utf-8") + except Exception as exc: # pragma: no cover - exercised via parent behavior + traceback.print_exc(file=stderr_handle) + failure = { + "error": str(exc), + "traceback": traceback.format_exc(), + } + pathlib.Path(result_path).write_text(_json_dumps(failure), encoding="utf-8") + raise + + +def _run_action_with_timeout(spec: dict[str, Any], timeout_ms: int | None) -> dict[str, Any]: + result_path = _child_result_path(pathlib.Path(spec["run_dir"])) + stdout_path = _worker_stdout_path(pathlib.Path(spec["run_dir"])) + stderr_path = _worker_stderr_path(pathlib.Path(spec["run_dir"])) + context = multiprocessing.get_context("spawn") + process = context.Process( + target=_run_action_subprocess_target, + args=(spec, str(result_path), str(stdout_path), str(stderr_path)), ) - csv_sql.add_argument( + process.start() + process.join(None if timeout_ms is None else timeout_ms / 1000.0) + if process.is_alive(): + process.terminate() + process.join() + return { + "timed_out": True, + "error": f"run timed out after {timeout_ms} ms", + } + if not result_path.exists(): + return { + "timed_out": False, + "error": f"action process exited with code {process.exitcode}", + } + payload = json.loads(result_path.read_text(encoding="utf-8")) + if process.exitcode not in (0, None): + payload.setdefault("error", f"action process exited with code {process.exitcode}") + return payload + + +def _register_artifacts( + index: ArtifactIndex, + run_id: str, + artifacts: list[dict[str, Any]], +) -> list[dict[str, Any]]: + created: list[dict[str, Any]] = [] + for artifact in artifacts: + record = dict(artifact) + record["artifact_id"] = _new_artifact_id() + record["run_id"] = run_id + record["created_at"] = _utc_now() + index.insert_artifact(record) + created.append(record) + return created + + +def _add_csv_sql_arguments(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--csv", required=True, help="CSV file path.") + parser.add_argument( "--table", default="input_table", help="Temporary table name exposed to session.sql(...).", ) - csv_sql.add_argument( - "--query", - required=True, - help="SQL query text.", - ) + parser.add_argument("--query", required=True, help="SQL query text.") - vector_search = subparsers.add_parser( - "vector-search", - help="Read CSV and run fixed-length vector nearest search.", - ) - vector_search.add_argument("--csv", required=True, help="CSV file path.") - vector_search.add_argument( + +def _add_vector_search_arguments(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--csv", required=True, help="CSV file path.") + parser.add_argument( "--vector-column", required=True, help="Vector column name. CSV row value format should use bracketed vectors like '[1 2 3]' or '[1,2,3]'.", ) - vector_search.add_argument( - "--query-vector", - required=True, - help="Query vector, e.g. '0.1,0.2,0.3'.", - ) - vector_search.add_argument( + parser.add_argument("--query-vector", required=True, help="Query vector, e.g. '0.1,0.2,0.3'.") + parser.add_argument( "--metric", default="cosine", choices=["cosine", "cosin", "dot", "l2"], help="Distance metric.", ) - vector_search.add_argument( - "--top-k", - type=int, - default=5, - help="Return top-k nearest rows.", + parser.add_argument("--top-k", type=int, default=5, help="Return top-k nearest rows.") + + +def _add_stream_sql_once_arguments(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--source-csv-dir", required=True, help="Streaming source CSV directory.") + parser.add_argument("--source-table", default="input_stream", help="Streaming source temp view name.") + parser.add_argument("--source-delimiter", default=",", help="Streaming source CSV delimiter.") + parser.add_argument("--sink-table", default="output_sink", help="Sink table name.") + parser.add_argument("--sink-schema", required=True, help="Sink schema body used in CREATE SINK TABLE.") + parser.add_argument("--sink-path", help="Sink CSV path. Defaults to run_dir/artifacts/stream_result.csv.") + parser.add_argument("--sink-delimiter", default=",", help="Sink CSV delimiter.") + parser.add_argument("--query", required=True, help="INSERT INTO ... SELECT ... streaming SQL.") + parser.add_argument("--trigger-interval-ms", type=int, default=0) + parser.add_argument("--checkpoint-delivery-mode", default="at-least-once") + parser.add_argument("--execution-mode", default="single-process") + parser.add_argument("--local-workers", type=int, default=1) + parser.add_argument("--max-inflight-partitions", type=int, default=0) + parser.add_argument("--max-batches", type=int, default=1) + + +def _parse_action_args(action: str, argv: list[str]) -> dict[str, Any]: + parser = JsonArgumentParser(prog=f"velaria-cli {action}") + if action == "csv-sql": + _add_csv_sql_arguments(parser) + parser.add_argument("--output-path") + elif action == "vector-search": + _add_vector_search_arguments(parser) + parser.add_argument("--output-path") + elif action == "stream-sql-once": + _add_stream_sql_once_arguments(parser) + else: + raise ValueError(f"unsupported run action: {action}") + return vars(parser.parse_args(argv)) + + +def _parse_passthrough(command_args: list[str]) -> tuple[str, dict[str, Any]]: + if command_args and command_args[0] == "--": + command_args = command_args[1:] + if not command_args: + raise ValueError("run start requires an action after '--'") + return command_args[0], _parse_action_args(command_args[0], command_args[1:]) + + +def _load_latest_progress(run_id: str) -> dict[str, Any] | None: + progress_path = pathlib.Path(_read_run_or_raise(run_id)["run_dir"]) / "progress.jsonl" + if not progress_path.exists(): + return None + last_line = "" + with progress_path.open("r", encoding="utf-8") as handle: + for line in handle: + if line.strip(): + last_line = line.strip() + return json.loads(last_line) if last_line else None + + +def _read_run_or_raise(run_id: str) -> dict[str, Any]: + try: + return read_run(run_id) + except FileNotFoundError as exc: + raise ValueError(f"run not found: {run_id}") from exc + + +def _run_start(args: argparse.Namespace) -> int: + action, action_args = _parse_passthrough(args.command_args) + run_id, run_dir = create_run(action, action_args, __version__, run_name=args.run_name) + write_inputs( + run_id, + { + "action": action, + "action_args": action_args, + "timeout_ms": args.timeout_ms, + }, ) + index = ArtifactIndex() + index.upsert_run(read_run(run_id)) + spec = { + "run_id": run_id, + "run_dir": str(run_dir), + "action": action, + "action_args": action_args, + } + result = _run_action_with_timeout(spec, args.timeout_ms) + if result.get("timed_out"): + append_stderr(run_id, f"{result['error']}\n") + finalized = finalize_run(run_id, "timed_out", error=result["error"]) + index.upsert_run(finalized) + _emit_json( + { + "ok": False, + "run_id": run_id, + "run_dir": str(run_dir), + "status": "timed_out", + "error": result["error"], + } + ) + return 1 + if "error" in result and "payload" not in result: + append_stderr(run_id, f"{result['error']}\n") + if result.get("traceback"): + append_stderr(run_id, result["traceback"]) + finalized = finalize_run(run_id, "failed", error=result["error"]) + index.upsert_run(finalized) + _emit_json( + { + "ok": False, + "run_id": run_id, + "run_dir": str(run_dir), + "status": "failed", + "error": result["error"], + } + ) + return 1 + created_artifacts = _register_artifacts(index, run_id, result.get("artifacts", [])) + details: dict[str, Any] = {} + for artifact in created_artifacts: + if "explain" in artifact.get("tags_json", []): + details["explain_artifact_id"] = artifact["artifact_id"] + details["explain_artifact_uri"] = artifact["uri"] + if details: + update_run(run_id, details=details) + finalized = finalize_run(run_id, "succeeded") + index.upsert_run(finalized) + return _emit_json( + { + "ok": True, + "run_id": run_id, + "run_dir": str(run_dir), + "status": "succeeded", + "action": action, + "result": result["payload"], + "artifacts": created_artifacts, + } + ) + + +def _run_show(args: argparse.Namespace) -> int: + index = ArtifactIndex() + return _emit_json( + { + "ok": True, + "run": _read_run_or_raise(args.run_id), + "artifacts": index.list_artifacts(run_id=args.run_id, limit=args.limit), + } + ) + + +def _run_status(args: argparse.Namespace) -> int: + index = ArtifactIndex() + run = _read_run_or_raise(args.run_id) + payload: dict[str, Any] = { + "ok": True, + "run_id": args.run_id, + "status": run["status"], + "action": run["action"], + "artifacts": index.list_artifacts(run_id=args.run_id, limit=args.limit), + } + latest_progress = _load_latest_progress(args.run_id) + if latest_progress is not None: + payload["latest_progress"] = latest_progress + return _emit_json(payload) + + +def _artifacts_list(args: argparse.Namespace) -> int: + index = ArtifactIndex() + return _emit_json( + { + "ok": True, + "artifacts": index.list_artifacts(limit=args.limit, run_id=args.run_id), + } + ) + + +def _artifacts_preview(args: argparse.Namespace) -> int: + index = ArtifactIndex() + artifact = index.get_artifact(args.artifact_id) + if artifact is None: + raise ValueError(f"artifact not found: {args.artifact_id}") + preview = _read_preview_for_artifact(artifact, limit=args.limit) + if artifact.get("preview_json") is None: + index.update_artifact_preview(args.artifact_id, preview) + artifact = index.get_artifact(args.artifact_id) or artifact + return _emit_json( + { + "ok": True, + "artifact_id": args.artifact_id, + "preview": preview, + "artifact": artifact, + } + ) + + +def _run_cleanup(args: argparse.Namespace) -> int: + index = ArtifactIndex() + payload = index.cleanup_runs( + keep_last_n=args.keep_last, + ttl_days=args.ttl_days, + delete_files=args.delete_files, + ) + payload["ok"] = True + return _emit_json(payload) + + +def _build_parser() -> argparse.ArgumentParser: + parser = JsonArgumentParser( + prog="velaria-cli", + description="Velaria CLI for SQL query execution and workspace run management.", + ) + subparsers = parser.add_subparsers(dest="command", required=True) + + csv_sql = subparsers.add_parser( + "csv-sql", + help="Read CSV and run a SQL query through DataflowSession.", + ) + _add_csv_sql_arguments(csv_sql) + + vector_search = subparsers.add_parser( + "vector-search", + help="Read CSV and run fixed-length vector nearest search.", + ) + _add_vector_search_arguments(vector_search) + + run_parser = subparsers.add_parser("run", help="Run management commands.") + run_subparsers = run_parser.add_subparsers(dest="run_command", required=True) + + run_start = run_subparsers.add_parser("start", help="Start a tracked run.") + run_start.add_argument("--run-name") + run_start.add_argument("--timeout-ms", type=int) + run_start.add_argument("command_args", nargs=argparse.REMAINDER) + + run_show = run_subparsers.add_parser("show", help="Show a run and its artifacts.") + run_show.add_argument("--run-id", required=True) + run_show.add_argument("--limit", type=int, default=50) + + run_status = run_subparsers.add_parser("status", help="Show run status.") + run_status.add_argument("--run-id", required=True) + run_status.add_argument("--limit", type=int, default=20) + + run_cleanup = run_subparsers.add_parser("cleanup", help="Cleanup indexed runs.") + run_cleanup.add_argument("--keep-last", type=int) + run_cleanup.add_argument("--ttl-days", type=int) + run_cleanup.add_argument("--delete-files", action="store_true") + + artifacts_parser = subparsers.add_parser("artifacts", help="Artifact index commands.") + artifacts_subparsers = artifacts_parser.add_subparsers(dest="artifacts_command", required=True) + + artifacts_list = artifacts_subparsers.add_parser("list", help="List artifacts.") + artifacts_list.add_argument("--run-id") + artifacts_list.add_argument("--limit", type=int, default=50) + + artifacts_preview = artifacts_subparsers.add_parser("preview", help="Preview an artifact.") + artifacts_preview.add_argument("--artifact-id", required=True) + artifacts_preview.add_argument("--limit", type=int, default=PREVIEW_LIMIT_ROWS) + return parser -def main() -> int: - parser = _build_parser() - args = parser.parse_args() +def main(argv: list[str] | None = None) -> int: + try: + parser = _build_parser() + args = parser.parse_args(argv) - if args.command == "csv-sql": - return _run_csv_sql(pathlib.Path(args.csv), args.table, args.query) - if args.command == "vector-search": - return _run_vector_search( - csv_path=pathlib.Path(args.csv), - vector_column=args.vector_column, - query_vector=args.query_vector, - metric=args.metric, - top_k=args.top_k, - ) + if args.command == "csv-sql": + return _run_csv_sql(pathlib.Path(args.csv), args.table, args.query) + if args.command == "vector-search": + return _run_vector_search( + csv_path=pathlib.Path(args.csv), + vector_column=args.vector_column, + query_vector=args.query_vector, + metric=args.metric, + top_k=args.top_k, + ) + if args.command == "run": + if args.run_command == "start": + return _run_start(args) + if args.run_command == "show": + return _run_show(args) + if args.run_command == "status": + return _run_status(args) + if args.run_command == "cleanup": + return _run_cleanup(args) + if args.command == "artifacts": + if args.artifacts_command == "list": + return _artifacts_list(args) + if args.artifacts_command == "preview": + return _artifacts_preview(args) - parser.error(f"unsupported command: {args.command}") - return 2 + raise CliUsageError("unsupported command") + except CliUsageError as exc: + return _emit_error_json(str(exc), error_type="usage_error") + except ValueError as exc: + return _emit_error_json(str(exc), error_type="value_error") + except FileNotFoundError as exc: + return _emit_error_json(str(exc), error_type="file_not_found") + except Exception as exc: + return _emit_error_json(str(exc), error_type="internal_error") if __name__ == "__main__": diff --git a/scripts/build_py_cli_executable.sh b/scripts/build_py_cli_executable.sh index 890948d..1b8e6d1 100755 --- a/scripts/build_py_cli_executable.sh +++ b/scripts/build_py_cli_executable.sh @@ -20,10 +20,18 @@ if [[ ! -f "${NATIVE_SO}" ]]; then exit 1 fi -cp "${NATIVE_SO}" "${PY_DIR}/velaria/_velaria.so" +TARGET_SO="${PY_DIR}/velaria/_velaria.so" +if [[ -f "${TARGET_SO}" ]]; then + chmod u+w "${TARGET_SO}" || true + rm -f "${TARGET_SO}" +fi +cp "${NATIVE_SO}" "${TARGET_SO}" cleanup() { - rm -f "${PY_DIR}/velaria/_velaria.so" + if [[ -f "${TARGET_SO}" ]]; then + chmod u+w "${TARGET_SO}" || true + fi + rm -f "${TARGET_SO}" } trap cleanup EXIT diff --git a/scripts/run_python_ci_checks.sh b/scripts/run_python_ci_checks.sh index fbcbd34..bdd50fc 100755 --- a/scripts/run_python_ci_checks.sh +++ b/scripts/run_python_ci_checks.sh @@ -17,6 +17,7 @@ if ! command -v uv >/dev/null 2>&1; then fi bazel build //:velaria_pyext +bazel run //python_api:sync_native_extension uv sync --project python_api --python "${VELARIA_PYTHON_BIN}" bazel test //:python_ecosystem_regression diff --git a/scripts/run_python_ecosystem_regression.sh b/scripts/run_python_ecosystem_regression.sh index d0e2e05..f2957bb 100755 --- a/scripts/run_python_ecosystem_regression.sh +++ b/scripts/run_python_ecosystem_regression.sh @@ -20,6 +20,7 @@ if ! command -v uv >/dev/null 2>&1; then fi bazel build //:velaria_pyext //python_api:velaria_whl //python_api:velaria_native_whl //python_api:velaria_cli +bazel run //python_api:sync_native_extension bazel test //:python_ecosystem_regression uv sync --project python_api --python "${VELARIA_PYTHON_BIN}" diff --git a/skills/velaria_python_local/SKILL.md b/skills/velaria_python_local/SKILL.md index 2693042..1b1ea9d 100644 --- a/skills/velaria_python_local/SKILL.md +++ b/skills/velaria_python_local/SKILL.md @@ -42,9 +42,202 @@ result = session.sql("SELECT region, SUM(amount) AS amount_sum FROM sales GROUP print(result.to_pylist()) ``` -## 4. 脚本示例 +如果你更适合用 CLI 而不是临时 Python 片段,现在也可以直接使用 workspace/run store。 +下文统一用 `` 表示你环境里实际可用的 Velaria CLI 入口;它可能是安装后提供的命令,也可能是分发包里的同名可执行文件。 -### 4.1 CSV 到 SQL +```bash + run start -- csv-sql \ + --csv path/to/file.csv \ + --query "SELECT region, COUNT(*) AS cnt FROM input_table GROUP BY region" + + run show --run-id + artifacts list --run-id +``` + +## 4. 新增功能与参数说明 + +### 4.1 `velaria-cli run start` + +用途: + +- 启动一次被 workspace 跟踪的执行 +- 自动生成 `run_id` +- 自动创建 run 目录 +- 自动记录 `run.json`、`inputs.json`、`stdout.log`、`stderr.log`、`progress.jsonl`、`artifacts/` + +基础语法: + +```bash + run start -- ... +``` + +公共参数: + +- `--run-name`:给本次执行起一个更易读的名字,便于人工检索 +- `--timeout-ms`:超时毫秒数;超时后 run 会标记为 `timed_out` + +当前支持的 action: + +- `csv-sql` +- `vector-search` +- `stream-sql-once` + +### 4.2 `csv-sql`(tracked run 模式) + +用途: + +- 读取本地 CSV +- 注册成临时视图 +- 执行 `session.sql(...)` +- 把结果落到 artifact 文件,并生成 preview + +关键参数: + +- `--csv`:输入 CSV 路径 +- `--table`:临时视图名,默认 `input_table` +- `--query`:要执行的 SQL +- `--output-path`:结果文件路径;不传时默认写到 `run_dir/artifacts/result.parquet` + +附加行为: + +- 会生成 `explain.json` +- `logical` 来自 `DataFrame.explain()` +- `physical` / `strategy` 是 Python 生态层补齐的本地 batch 执行说明 + +### 4.3 `vector-search`(tracked run 模式) + +用途: + +- 从 CSV 读入向量列 +- 调用 `Session.vector_search(...)` +- 落结果 artifact,并把 native vector explain 单独保存 + +关键参数: + +- `--csv`:输入 CSV 路径 +- `--vector-column`:向量列名 +- `--query-vector`:查询向量,例如 `1.0,0.0,0.0` +- `--metric`:距离指标,可选 `cosine`、`cosin`、`dot`、`l2` +- `--top-k`:返回前 K 个结果 +- `--output-path`:结果文件路径;不传时默认写到 `run_dir/artifacts/result.parquet` + +附加行为: + +- explain 不会包装成 `logical/physical/strategy` +- native explain 会单独写到 `run_dir/artifacts/vector_explain.txt` + +### 4.4 `stream-sql-once` + +用途: + +- 用目录流 CSV 作为 source +- 建立 CSV sink +- 执行一次 `INSERT INTO ... SELECT ...` 的流式 SQL +- 把流式 progress 逐行写到 `progress.jsonl` + +适合场景: + +- agent 需要拿到一次性的流式执行快照 +- 需要保留 native `snapshotJson()` 结果做状态观察 +- 需要把 sink 文件当成 artifact 管理 + +关键参数: + +- `--source-csv-dir`:输入 source 目录 +- `--source-table`:source 临时视图名,默认 `input_stream` +- `--source-delimiter`:source CSV 分隔符,默认 `,` +- `--sink-table`:sink 表名,默认 `output_sink` +- `--sink-schema`:sink schema,必填,用于 `CREATE SINK TABLE` +- `--sink-path`:sink 文件路径;默认 `run_dir/artifacts/stream_result.csv` +- `--sink-delimiter`:sink CSV 分隔符,默认 `,` +- `--query`:流式 SQL,必须以 `INSERT INTO` 开头 +- `--trigger-interval-ms`:trigger 间隔 +- `--checkpoint-delivery-mode`:checkpoint 投递模式,例如 `at-least-once`、`best-effort` +- `--execution-mode`:执行模式,例如 `single-process`、`local-workers` +- `--local-workers`:本地 worker 数 +- `--max-inflight-partitions`:最大并发 partition 数 +- `--max-batches`:最多处理批次数;默认 `1`,保证“once”语义 + +附加行为: + +- `explain.json` 会保留 native `logical/physical/strategy` +- `progress.jsonl` 每一行都直接写 native `snapshotJson()`,不改字段名 + +### 4.5 `run show` + +用途: + +- 查看单个 run 的完整元数据 +- 同时列出该 run 关联的 artifacts + +关键参数: + +- `--run-id`:目标 run id +- `--limit`:返回 artifact 条数上限 + +### 4.6 `run status` + +用途: + +- 查看 run 当前状态 +- 对 stream run 返回最后一条 progress snapshot +- 对 batch / vector run 返回状态和 artifact 摘要 + +关键参数: + +- `--run-id`:目标 run id +- `--limit`:返回 artifact 条数上限 + +### 4.7 `artifacts list` + +用途: + +- 从 artifact 索引列出结果文件、explain 文件、preview 缓存对应的记录 + +关键参数: + +- `--run-id`:只看某个 run 的 artifacts +- `--limit`:最多返回多少条 + +### 4.8 `artifacts preview` + +用途: + +- 读取某个 artifact 的前 N 行 preview +- 若索引中已有 preview,直接复用 +- 若没有,则现算并回写索引 + +关键参数: + +- `--artifact-id`:目标 artifact id +- `--limit`:最多预览多少行,默认 `50` + +支持的预览格式: + +- `csv` +- `parquet` +- `arrow` + +约束: + +- preview 会限制大小,避免 SQLite / JSONL 索引膨胀 + +### 4.9 `run cleanup` + +用途: + +- 清理旧 run 的索引记录 +- 可选删除 run 目录文件 + +关键参数: + +- `--keep-last N`:保留最新 N 个 run +- `--ttl-days D`:删除超过 D 天的 run +- `--delete-files`:显式删除 run 目录;不带该参数时默认只清索引,不删文件 + +## 5. 脚本示例 + +### 5.1 CSV 到 SQL ```bash uv run --with velaria --with "pyarrow==23.0.1" \\ @@ -53,7 +246,7 @@ uv run --with velaria --with "pyarrow==23.0.1" \\ --query "SELECT region, COUNT(*) AS cnt FROM csv_data GROUP BY region" ``` -### 4.2 Excel 到 SQL +### 5.2 Excel 到 SQL ```bash uv run --with velaria --with pandas --with openpyxl \\ @@ -63,7 +256,7 @@ uv run --with velaria --with pandas --with openpyxl \\ --query "SELECT name, dept, COUNT(*) AS cnt FROM excel_data GROUP BY name, dept" ``` -### 4.3 Bitable 到 SQL +### 5.3 Bitable 到 SQL ```bash FEISHU_BITABLE_APP_ID=... \\ @@ -74,7 +267,7 @@ uv run --with velaria --with pandas --with openpyxl \\ --query "SELECT owner, COUNT(*) AS cnt FROM bitable_data GROUP BY owner" ``` -### 4.4 读取本地 Excel 示例 +### 5.4 读取本地 Excel 示例 ```bash uv run --with velaria --with pandas --with openpyxl \\ @@ -83,7 +276,7 @@ uv run --with velaria --with pandas --with openpyxl \\ --query "SELECT name, COUNT(*) AS cnt FROM sheet_data GROUP BY name" ``` -## 5. Skill 自检 +## 6. Skill 自检 ```bash uv run --with velaria --with "pyarrow==23.0.1" \\ @@ -92,8 +285,9 @@ uv run --with velaria --with "pyarrow==23.0.1" \\ 输出 `ok` 代表最小场景(CSV batch + streaming sink)通过。 -## 6. 最小执行清单 +## 7. 最小执行清单 1. 确认 `uv` 环境可运行并已解析 `velaria` 2. 选一个脚本加载数据,确保 `session.create_temp_view(...)` 成功 3. 用 `session.sql(...)`/脚本 `--query` 将你的业务分析逻辑落到 SQL +4. 若需可追踪执行、状态查看和 artifact 预览,优先使用 `velaria-cli run ...` 与 `artifacts ...` diff --git a/src/dataflow/interop/python/python_module.cc b/src/dataflow/interop/python/python_module.cc index 4af3c49..5d265c1 100644 --- a/src/dataflow/interop/python/python_module.cc +++ b/src/dataflow/interop/python/python_module.cc @@ -1270,6 +1270,17 @@ PyObject* dataFrameCount(PyVelariaDataFrame* self, PyObject*) { }); } +PyObject* dataFrameExplain(PyVelariaDataFrame* self, PyObject*) { + return withExceptionTranslation([&]() -> PyObject* { + std::string explain; + { + AllowThreads allow; + explain = self->df_ptr->explain(); + } + return PyUnicode_FromStringAndSize(explain.c_str(), static_cast(explain.size())); + }); +} + PyObject* dataFrameShow(PyVelariaDataFrame* self, PyObject* args, PyObject* kwargs) { return withExceptionTranslation([&]() -> PyObject* { unsigned long long max_rows = 20; @@ -1525,6 +1536,17 @@ PyObject* queryProgress(PyVelariaStreamingQuery* self, PyObject*) { }); } +PyObject* querySnapshotJson(PyVelariaStreamingQuery* self, PyObject*) { + return withExceptionTranslation([&]() -> PyObject* { + std::string snapshot; + { + AllowThreads allow; + snapshot = self->query_ptr->snapshotJson(); + } + return PyUnicode_FromStringAndSize(snapshot.c_str(), static_cast(snapshot.size())); + }); +} + PyMethodDef sessionMethods[] = { {"read_csv", reinterpret_cast(sessionReadCsv), METH_VARARGS | METH_KEYWORDS, "Read a CSV file into a DataFrame."}, @@ -1559,6 +1581,8 @@ PyMethodDef dataFrameMethods[] = { {"__arrow_c_array__", reinterpret_cast(dataFrameArrowCapsules), METH_VARARGS | METH_KEYWORDS, "Export the DataFrame through the Arrow PyCapsule interface."}, {"count", reinterpret_cast(dataFrameCount), METH_NOARGS, "Return the row count."}, + {"explain", reinterpret_cast(dataFrameExplain), METH_NOARGS, + "Return the logical plan explain string."}, {"show", reinterpret_cast(dataFrameShow), METH_VARARGS | METH_KEYWORDS, "Print the DataFrame."}, {nullptr, nullptr, 0, nullptr}, @@ -1596,6 +1620,8 @@ PyMethodDef streamingQueryMethods[] = { "Stop the streaming query."}, {"progress", reinterpret_cast(queryProgress), METH_NOARGS, "Return the current query progress snapshot."}, + {"snapshot_json", reinterpret_cast(querySnapshotJson), METH_NOARGS, + "Return the current query snapshotJson() payload."}, {nullptr, nullptr, 0, nullptr}, };