Merged
4 changes: 3 additions & 1 deletion README-zh.md
@@ -169,7 +169,8 @@ examples and helper scripts only illustrate the layers; they do not define them.
- execution modes: `single-process`, `local-workers`, `actor-credit`, `auto`
- file source/sink
- basic streaming operators: `select / filter / withColumn / drop / limit / window`
- stateful `sum` and `count`
- stateful streaming aggregates: `sum / count / min / max / avg`
- stream SQL grouped aggregate output: `SUM(col)`, `COUNT(*)`, `MIN(col)`, `MAX(col)`, `AVG(col)`
- minimal stream SQL subset
- local vector search over fixed-dimension float vectors
- Python Arrow input/output
@@ -181,6 +182,7 @@ examples and helper scripts only illustrate the layers; they do not define them.
- pulling Python callbacks into the hot path
- Python UDFs
- a generic mechanism for extending actor parallelization to arbitrary plans
- actor acceleration beyond the current `window_start,key + SUM(value)` / `COUNT(*)` hot paths
- broad SQL expansion such as full `JOIN / CTE / subquery / UNION`
- ANN / standalone vector DB / distributed vector execution

4 changes: 3 additions & 1 deletion README.md
@@ -169,7 +169,8 @@ Available today:
- execution modes: `single-process`, `local-workers`, `actor-credit`, `auto`
- file source/sink support
- basic stream operators: `select / filter / withColumn / drop / limit / window`
- stateful `sum` and `count`
- stateful stream aggregates: `sum / count / min / max / avg`
- stream SQL grouped aggregate outputs with `SUM(col)`, `COUNT(*)`, `MIN(col)`, `MAX(col)`, `AVG(col)`
- minimal stream SQL subset
- local vector search on fixed-dimension float vectors
- Python Arrow ingestion and output
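The grouped aggregates listed above follow the usual SQL semantics. As a rough plain-Python sketch of what one `(window_start, key)` group reduces to (illustrative only, not Velaria's execution path), using the sample values from this PR's tests:

```python
from collections import defaultdict

# Toy reduction over (window_start, key) groups; this sketches the aggregate
# semantics only, not the engine's actual columnar/actor execution.
rows = [
    ("2026-03-29T10:00:00", "u1", 10),
    ("2026-03-29T10:00:00", "u1", 5),
    ("2026-03-29T10:00:00", "u1", 7),
    ("2026-03-29T10:00:00", "u2", 4),
]

groups = defaultdict(list)
for window_start, key, value in rows:
    groups[(window_start, key)].append(value)

aggregates = {
    group: {
        "COUNT(*)": len(values),
        "SUM(value)": sum(values),
        "MIN(value)": min(values),
        "MAX(value)": max(values),
        "AVG(value)": sum(values) / len(values),
    }
    for group, values in groups.items()
}

print(aggregates[("2026-03-29T10:00:00", "u1")]["SUM(value)"])  # 22
```

The `u1` group yields count 3, sum 22, min 5, max 10, and avg 22/3, matching the values asserted in the new multi-aggregate test.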
@@ -181,6 +182,7 @@ Out of scope in the current repo state:
- Python callback execution in the hot path
- Python UDFs
- generic actor parallelization for arbitrary plans
- actor acceleration beyond the current `window_start,key + SUM(value)` / `COUNT(*)` hot paths
- broad SQL expansion such as full `JOIN / CTE / subquery / UNION`
- ANN / standalone vector DB / distributed vector execution

13 changes: 8 additions & 5 deletions docs/streaming_runtime_design.md
@@ -59,18 +59,21 @@ This document describes runtime internals and current implementation shape. For

## Current Actor Pushdown Boundary

Only one class of query is currently picked up by `ActorCredit` / `Auto`:
Only two classes of query are currently picked up by `ActorCredit` / `Auto`:

- all preceding transforms are `PartitionLocal`
- the final barrier is `groupBy({"window_start", "key"}).sum("value", ...)`
- the final barrier is one of:
  - `groupBy({"window_start", "key"}).sum("value", ...)`
  - `groupBy({"window_start", "key"}).count(...)`

In other words, the current actor runtime only serves:

- `window_start`
- `key`
- `value`
- the windowed grouped-sum hot path over `value`
- the windowed grouped-count hot path for `COUNT(*)`

this windowed grouped-sum hot path.
`MIN / MAX / AVG` and multi-aggregate outputs still go through the local execution chain; `Auto` explicitly records the fallback reason.
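The boundary above can be sketched as a shape check. This is a plain-Python illustration only; `is_actor_eligible` and the stage/aggregate representations are hypothetical, not the engine's planner API:

```python
# Hypothetical sketch of the actor-eligibility rule described above.
# The real planner code is not part of this diff; only the rule is.

ACTOR_GROUP_KEYS = ("window_start", "key")


def is_actor_eligible(pre_transforms, final_barrier):
    """pre_transforms: list of stage kinds; final_barrier: (group_keys, aggregate)."""
    # All preceding transforms must be PartitionLocal.
    if any(kind != "PartitionLocal" for kind in pre_transforms):
        return False
    group_keys, aggregate = final_barrier
    # The final barrier must group by exactly {window_start, key}.
    if tuple(sorted(group_keys)) != tuple(sorted(ACTOR_GROUP_KEYS)):
        return False
    # Only the SUM(value) and COUNT(*) hot paths are accelerated today.
    return aggregate in ('sum("value")', "count(*)")


print(is_actor_eligible(["PartitionLocal"], (["window_start", "key"], 'sum("value")')))  # True
print(is_actor_eligible(["PartitionLocal"], (["window_start", "key"], 'avg("value")')))  # False
```

An `avg` barrier, a non-`PartitionLocal` preceding stage, or a different grouping key set all fall out of the fast path and take the fallback below.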

If a query does not match this shape, `StreamingQuery` falls back to the normal execution chain and writes the reason to:

@@ -188,7 +191,7 @@ The actor-stream payload currently uses typed binary batches:

## Recommended Next Steps

1. Extend actor pushdown to `count` and more group aggregates.
1. Extend actor pushdown to more group aggregates and multi-aggregate outputs.
2. Tune the query-level `Auto` thresholds to better match real `StreamingQuery` workloads.
3. Add finer-grained actor metric breakdowns to query-level progress.
4. Continue tightening the intermediate `Table/Row/Value` conversions from source to execution kernel into an earlier columnar representation.
79 changes: 79 additions & 0 deletions python_api/tests/test_streaming_v05.py
@@ -1,3 +1,4 @@
import csv
import json
import pathlib
import tempfile
@@ -63,6 +64,57 @@ def test_stream_progress_contract_with_start_stream_sql(self):
        snapshot_json = json.dumps(progress)
        self.assertIn("execution_mode", snapshot_json)

    def test_start_stream_sql_supports_multi_aggregate_and_having_alias(self):
        session = velaria.Session()
        table = pa.table(
            {
                "ts": [
                    "2026-03-29T10:00:00",
                    "2026-03-29T10:00:10",
                    "2026-03-29T10:00:20",
                    "2026-03-29T10:00:30",
                ],
                "key": ["u1", "u1", "u1", "u2"],
                "value": [10, 5, 7, 4],
            }
        )
        stream_df = session.create_stream_from_arrow(table)
        session.create_temp_view("events_stream_multi", stream_df)

        with tempfile.TemporaryDirectory(prefix="velaria-py-stream-multi-") as tmp:
            sink_path = str(pathlib.Path(tmp) / "sink.csv")
            session.sql(
                "CREATE SINK TABLE sink_multi "
                "(window_start STRING, key STRING, event_count INT, value_sum INT, "
                "min_value INT, max_value INT, avg_value DOUBLE) "
                f"USING csv OPTIONS(path '{sink_path}', delimiter ',')"
            )
            query = session.start_stream_sql(
                "INSERT INTO sink_multi "
                "SELECT window_start, key, COUNT(*) AS event_count, SUM(value) AS value_sum, "
                "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value "
                "FROM events_stream_multi "
                "WINDOW BY ts EVERY 60000 AS window_start "
                "GROUP BY window_start, key HAVING avg_value > 6",
                trigger_interval_ms=0,
            )
            query.start()
            processed = query.await_termination()

            self.assertEqual(processed, 1)

            with open(sink_path, newline="", encoding="utf-8") as handle:
                rows = list(csv.DictReader(handle))

            self.assertEqual(len(rows), 1)
            self.assertEqual(rows[0]["window_start"], "2026-03-29T10:00:00")
            self.assertEqual(rows[0]["key"], "u1")
            self.assertEqual(float(rows[0]["event_count"]), 3.0)
            self.assertEqual(float(rows[0]["value_sum"]), 22.0)
            self.assertEqual(float(rows[0]["min_value"]), 5.0)
            self.assertEqual(float(rows[0]["max_value"]), 10.0)
            self.assertAlmostEqual(float(rows[0]["avg_value"]), 22.0 / 3.0, places=5)

    def test_explain_stream_sql_returns_strategy_sections(self):
        session = velaria.Session()
        table = pa.table(
@@ -90,6 +142,33 @@ def test_explain_stream_sql_returns_strategy_sections(self):
        self.assertIn("selected_mode", explain)
        self.assertIn("checkpoint_delivery_mode=best-effort", explain)

    def test_explain_stream_sql_reports_having_actor_fallback_reason(self):
        session = velaria.Session()
        table = pa.table(
            {
                "ts": ["2026-03-29T10:00:00", "2026-03-29T10:00:10"],
                "key": ["u1", "u1"],
                "value": [1, 2],
            }
        )
        stream_df = session.create_stream_from_arrow(table)
        session.create_temp_view("events_stream_having_explain", stream_df)

        explain = session.explain_stream_sql(
            "SELECT window_start, key, SUM(value) AS value_sum "
            "FROM events_stream_having_explain "
            "WINDOW BY ts EVERY 60000 AS window_start "
            "GROUP BY window_start, key HAVING value_sum > 0",
            trigger_interval_ms=0,
        )

        reason = (
            "actor acceleration requires the aggregate hot path to be the final stream transform"
        )
        self.assertIn("actor_eligible=false", explain)
        self.assertIn(f"actor_eligibility_reason={reason}", explain)
        self.assertIn(f"reason={reason}", explain)


if __name__ == "__main__":
    unittest.main()