Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions docs/streaming_runtime_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,24 @@ This document describes runtime internals and current implementation shape. For

## Current Actor Pushdown Boundary

当前只有少数固定形状的 query 会被 `LocalWorkers` 内部的 credit-based hot path 接住
当前只有满足 grouped-aggregate hot path 条件的 query 会被 `LocalWorkers` 内部的 credit-based 路径接住

- 前置变换全部为 `PartitionLocal`
- 最后一个 barrier 是以下其一:
- `groupBy({"window_start", "key"}).sum("value", ...)`
- `groupBy({"window_start", "key"}).count(...)`
- 最后一个 barrier 是 stateful grouped aggregate
- aggregate 函数当前支持:
- `SUM`
- `COUNT(*)`
- `MIN`
- `MAX`
- `AVG`

也就是说,当前 actor runtime 只服务于
也就是说,当前 actor runtime 服务于

- `window_start`
- `key`
- `value` 的窗口分组求和热路径
- `COUNT(*)` 的窗口分组计数热路径
- 任意 group key 列组合
- 单聚合与多聚合输出
- `AVG` 通过内部 `sum + count` helper state 收敛

`MIN / MAX / AVG` 以及多 aggregate 输出当前仍走本地执行链;`LocalWorkers` 会明确写出 fallback reason。
如果 query 不满足“最终 transform + stateful grouped aggregate + 支持函数集合”这条边界,`LocalWorkers` 会明确写出 fallback reason。

如果 query 不满足这个形状,`StreamingQuery` 会回退到普通执行链,并把原因写入:

Expand Down Expand Up @@ -155,9 +158,9 @@ actor-stream payload 当前使用 typed binary batch:

## Recommended Next Steps

1. 扩展 credit-based local acceleration 到 `count` 和更多 group aggregate
2. 给 query 级 progress 增加更细粒度的 accelerator 指标拆分
3. 继续把 source 到执行内核的中间 `Table/Row/Value` 转换收紧到更早的列式表示
1. 给 query 级 progress 增加更细粒度的 accelerator 指标拆分
2. 继续把 source 到执行内核的中间 `Table/Row/Value` 转换收紧到更早的列式表示
3. 评估是否要给 grouped-aggregate hot path 增加更细的 payload/schema 预编译缓存


## Source/Sink ABI Bridge (v0.4)
Expand Down
129 changes: 129 additions & 0 deletions python_api/tests/test_streaming_v05.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,108 @@ def test_start_stream_sql_supports_multi_aggregate_and_having_alias(self):
self.assertEqual(float(rows[0]["max_value"]), 10.0)
self.assertAlmostEqual(float(rows[0]["avg_value"]), 22.0 / 3.0, places=5)

def test_start_stream_sql_hits_local_workers_grouped_hot_path(self):
    """A grouped multi-aggregate query should run on the actor hot path.

    The query uses SUM / COUNT(*) / MIN / MAX / AVG over two group keys, so
    the local-workers runtime must report itself as both eligible and used,
    and the CSV sink must contain the correct per-group aggregates.
    """
    session = velaria.Session()
    source_table = pa.table(
        {
            "segment": ["alpha", "alpha", "alpha", "beta", "beta"],
            "bucket": [1, 1, 2, 1, 1],
            "value": [10, 14, 3, 6, 8],
        }
    )
    session.create_temp_view(
        "events_stream_local_workers",
        session.create_stream_from_arrow(source_table),
    )

    with tempfile.TemporaryDirectory(prefix="velaria-py-stream-hot-path-") as tmp:
        sink_path = str(pathlib.Path(tmp) / "sink.csv")
        session.sql(
            "CREATE SINK TABLE sink_local_workers "
            "(segment STRING, bucket INT, value_sum INT, event_count INT, "
            "min_value INT, max_value INT, avg_value DOUBLE) "
            f"USING csv OPTIONS(path '{sink_path}', delimiter ',')"
        )
        query = session.start_stream_sql(
            "INSERT INTO sink_local_workers "
            "SELECT segment, bucket, SUM(value) AS value_sum, COUNT(*) AS event_count, "
            "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value "
            "FROM events_stream_local_workers GROUP BY segment, bucket",
            trigger_interval_ms=0,
            execution_mode="local-workers",
            local_workers=4,
            max_inflight_partitions=4,
        )
        query.start()
        batches_processed = query.await_termination()
        progress = query.progress()

        # The runtime must have actually taken the actor path, not a fallback.
        self.assertEqual(batches_processed, 1)
        self.assertEqual(progress["execution_mode"], "local-workers")
        self.assertTrue(progress["actor_eligible"])
        self.assertTrue(progress["used_actor_runtime"])
        self.assertIn(progress["transport_mode"], ["shared-memory", "rpc-copy"])

        with open(sink_path, newline="", encoding="utf-8") as handle:
            emitted_rows = list(csv.DictReader(handle))

        # Index sink rows by (segment, bucket) so assertions are order-independent.
        by_group = {
            (row["segment"], int(float(row["bucket"]))): row for row in emitted_rows
        }
        self.assertEqual(len(by_group), 3)
        self.assertEqual(float(by_group[("alpha", 1)]["value_sum"]), 24.0)
        self.assertEqual(float(by_group[("alpha", 1)]["event_count"]), 2.0)
        self.assertEqual(float(by_group[("alpha", 1)]["min_value"]), 10.0)
        self.assertEqual(float(by_group[("alpha", 1)]["max_value"]), 14.0)
        self.assertAlmostEqual(float(by_group[("alpha", 1)]["avg_value"]), 12.0, places=5)
        self.assertEqual(float(by_group[("alpha", 2)]["value_sum"]), 3.0)
        self.assertEqual(float(by_group[("beta", 1)]["value_sum"]), 14.0)
        self.assertAlmostEqual(float(by_group[("beta", 1)]["avg_value"]), 7.0, places=5)

def test_start_stream_sql_zero_worker_settings_fall_back_to_inproc_local_workers(self):
    """local_workers=0 / max_inflight_partitions=0 must fall back to in-proc.

    The query keeps execution_mode "local-workers" but the actor runtime is
    skipped; the progress report must carry the fallback reason and results
    must still land in the sink via the in-proc transport.
    """
    session = velaria.Session()
    source_table = pa.table(
        {
            "ts": ["2026-03-29T10:00:00", "2026-03-29T10:00:10"],
            "key": ["u1", "u1"],
            "value": [1, 2],
        }
    )
    session.create_temp_view(
        "events_stream_zero_workers",
        session.create_stream_from_arrow(source_table),
    )

    with tempfile.TemporaryDirectory(prefix="velaria-py-stream-zero-workers-") as tmp:
        sink_path = str(pathlib.Path(tmp) / "sink.csv")
        session.sql(
            "CREATE SINK TABLE sink_zero_workers "
            "(window_start STRING, key STRING, value_sum INT) "
            f"USING csv OPTIONS(path '{sink_path}', delimiter ',')"
        )
        query = session.start_stream_sql(
            "INSERT INTO sink_zero_workers "
            "SELECT window_start, key, SUM(value) AS value_sum "
            "FROM events_stream_zero_workers "
            "WINDOW BY ts EVERY 60000 AS window_start "
            "GROUP BY window_start, key",
            trigger_interval_ms=0,
            execution_mode="local-workers",
            local_workers=0,
            max_inflight_partitions=0,
        )
        query.start()
        batches_processed = query.await_termination()
        progress = query.progress()

        # Actor runtime must be bypassed and the reason surfaced to callers.
        self.assertEqual(batches_processed, 1)
        self.assertEqual(progress["execution_mode"], "local-workers")
        self.assertFalse(progress["used_actor_runtime"])
        self.assertEqual(progress["transport_mode"], "inproc")
        self.assertIn("local_workers > 1", progress["execution_reason"])

        with open(sink_path, newline="", encoding="utf-8") as handle:
            emitted_rows = list(csv.DictReader(handle))

        # Both events share one window and key, so exactly one summed row.
        self.assertEqual(len(emitted_rows), 1)
        self.assertEqual(emitted_rows[0]["window_start"], "2026-03-29T10:00:00")
        self.assertEqual(emitted_rows[0]["key"], "u1")
        self.assertEqual(float(emitted_rows[0]["value_sum"]), 3.0)

def test_explain_stream_sql_returns_strategy_sections(self):
session = velaria.Session()
table = pa.table(
Expand Down Expand Up @@ -142,6 +244,33 @@ def test_explain_stream_sql_returns_strategy_sections(self):
self.assertIn("selected_mode", explain)
self.assertIn("checkpoint_delivery_mode=best-effort", explain)

def test_explain_stream_sql_reports_local_workers_grouped_hot_path(self):
    """EXPLAIN output should mark a grouped multi-aggregate query actor-eligible.

    The explain text must show the aggregate group keys, actor eligibility,
    and that the local-workers mode was selected.
    """
    session = velaria.Session()
    source_table = pa.table(
        {
            "segment": ["alpha", "alpha", "beta"],
            "bucket": [1, 1, 1],
            "value": [10, 14, 6],
        }
    )
    session.create_temp_view(
        "events_stream_local_workers_explain",
        session.create_stream_from_arrow(source_table),
    )

    explain = session.explain_stream_sql(
        "SELECT segment, bucket, SUM(value) AS value_sum, COUNT(*) AS event_count, "
        "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value "
        "FROM events_stream_local_workers_explain "
        "GROUP BY segment, bucket",
        trigger_interval_ms=0,
        execution_mode="local-workers",
        local_workers=4,
        max_inflight_partitions=4,
    )

    for expected_fragment in (
        "Aggregate keys=[segment, bucket]",
        "actor_eligible=true",
        "selected_mode=local-workers",
    ):
        self.assertIn(expected_fragment, explain)

def test_explain_stream_sql_reports_having_actor_fallback_reason(self):
session = velaria.Session()
table = pa.table(
Expand Down
Loading
Loading