From fdfa3e0457feba0a67a4bcec745734d06dad7ded Mon Sep 17 00:00:00 2001 From: zuolingxuan Date: Wed, 1 Apr 2026 11:44:52 +0800 Subject: [PATCH] expand streaming aggregate support --- README-zh.md | 4 +- README.md | 4 +- docs/streaming_runtime_design.md | 13 +- python_api/tests/test_streaming_v05.py | 79 +++ src/dataflow/core/execution/stream/stream.cc | 484 ++++++++++++++---- src/dataflow/core/execution/stream/stream.h | 27 +- src/dataflow/core/logical/sql/sql_planner.cc | 192 +++++-- src/dataflow/core/logical/sql/sql_planner.h | 2 +- src/dataflow/tests/sql_regression_test.cc | 106 +++- src/dataflow/tests/stream_runtime_test.cc | 376 ++++++++++++++ .../tests/stream_strategy_explain_test.cc | 90 ++++ 11 files changed, 1197 insertions(+), 180 deletions(-) diff --git a/README-zh.md b/README-zh.md index 31d515c..ed21abe 100644 --- a/README-zh.md +++ b/README-zh.md @@ -169,7 +169,8 @@ examples 与 helper scripts 只用于说明各层,不定义各层。 - 执行模式:`single-process`、`local-workers`、`actor-credit`、`auto` - 文件 source/sink - 基础 streaming operators:`select / filter / withColumn / drop / limit / window` -- stateful `sum` 和 `count` +- stateful streaming 聚合:`sum / count / min / max / avg` +- stream SQL grouped aggregate output:`SUM(col)`、`COUNT(*)`、`MIN(col)`、`MAX(col)`、`AVG(col)` - 最小 stream SQL 子集 - 固定维度 float vector 的本地检索 - Python Arrow 输入/输出 @@ -181,6 +182,7 @@ examples 与 helper scripts 只用于说明各层,不定义各层。 - 把 Python callback 拉进热路径 - Python UDF - 把 actor 并行化扩成任意 plan 的通用机制 +- 超出当前 `window_start,key + SUM(value)` / `COUNT(*)` 热路径之外的 actor acceleration - 宽泛 SQL 扩展,例如完整 `JOIN / CTE / subquery / UNION` - ANN / 独立 vector DB / 分布式 vector 执行 diff --git a/README.md b/README.md index 7b33a06..f1c5a46 100644 --- a/README.md +++ b/README.md @@ -169,7 +169,8 @@ Available today: - execution modes: `single-process`, `local-workers`, `actor-credit`, `auto` - file source/sink support - basic stream operators: `select / filter / withColumn / drop / limit / window` -- stateful `sum` and `count` +- stateful stream aggregates: `sum / count / min / max / avg` +- stream SQL grouped aggregate outputs with `SUM(col)`, `COUNT(*)`, `MIN(col)`, `MAX(col)`, `AVG(col)` - minimal stream SQL subset - local vector search on fixed-dimension float vectors - Python Arrow ingestion and output @@ -181,6 +182,7 @@ Out of scope in the current repo state: - Python callback execution in the hot path - Python UDFs - generic actor parallelization for arbitrary plans +- actor acceleration beyond the current `window_start,key + SUM(value)` / `COUNT(*)` hot paths - broad SQL expansion such as full `JOIN / CTE / subquery / UNION` - ANN / standalone vector DB / distributed vector execution diff --git a/docs/streaming_runtime_design.md b/docs/streaming_runtime_design.md index 9893ee8..1233aa4 100644 --- a/docs/streaming_runtime_design.md +++ b/docs/streaming_runtime_design.md @@ -59,18 +59,21 @@ This document describes runtime internals and current implementation shape. For ## Current Actor Pushdown Boundary -当前只有一类 query 会被 `ActorCredit` / `Auto` 接住: +当前只有两类 query 会被 `ActorCredit` / `Auto` 接住: - 前置变换全部为 `PartitionLocal` -- 最后一个 barrier 是 `groupBy({"window_start", "key"}).sum("value", ...)` +- 最后一个 barrier 是以下其一: + - `groupBy({"window_start", "key"}).sum("value", ...)` + - `groupBy({"window_start", "key"}).count(...)` 也就是说,当前 actor runtime 只服务于: - `window_start` - `key` -- `value` +- `value` 的窗口分组求和热路径 +- `COUNT(*)` 的窗口分组计数热路径 -这条窗口分组求和热路径。 +`MIN / MAX / AVG` 以及多 aggregate 输出当前仍走本地执行链;`Auto` 会明确写出 fallback reason。 如果 query 不满足这个形状,`StreamingQuery` 会回退到普通执行链,并把原因写入: @@ -188,7 +191,7 @@ actor-stream payload 当前使用 typed binary batch: ## Recommended Next Steps -1. 扩展 actor pushdown 到 `count` 和更多 group aggregate。 +1. 扩展 actor pushdown 到更多 group aggregate 与多 aggregate 输出。 2. 调整 query 级 `Auto` 阈值,使其更贴近真实 `StreamingQuery` workload。 3. 给 query 级 progress 增加更细粒度的 actor 指标拆分。 4. 继续把 source 到执行内核的中间 `Table/Row/Value` 转换收紧到更早的列式表示。 diff --git a/python_api/tests/test_streaming_v05.py b/python_api/tests/test_streaming_v05.py index 3cc07c7..52144cb 100644 --- a/python_api/tests/test_streaming_v05.py +++ b/python_api/tests/test_streaming_v05.py @@ -1,3 +1,4 @@ +import csv import json import pathlib import tempfile @@ -63,6 +64,57 @@ def test_stream_progress_contract_with_start_stream_sql(self): snapshot_json = json.dumps(progress) self.assertIn("execution_mode", snapshot_json) + def test_start_stream_sql_supports_multi_aggregate_and_having_alias(self): + session = velaria.Session() + table = pa.table( + { + "ts": [ + "2026-03-29T10:00:00", + "2026-03-29T10:00:10", + "2026-03-29T10:00:20", + "2026-03-29T10:00:30", + ], + "key": ["u1", "u1", "u1", "u2"], + "value": [10, 5, 7, 4], + } + ) + stream_df = session.create_stream_from_arrow(table) + session.create_temp_view("events_stream_multi", stream_df) + + with tempfile.TemporaryDirectory(prefix="velaria-py-stream-multi-") as tmp: + sink_path = str(pathlib.Path(tmp) / "sink.csv") + session.sql( + "CREATE SINK TABLE sink_multi " + "(window_start STRING, key STRING, event_count INT, value_sum INT, " + "min_value INT, max_value INT, avg_value DOUBLE) " + f"USING csv OPTIONS(path '{sink_path}', delimiter ',')" + ) + query = session.start_stream_sql( + "INSERT INTO sink_multi " + "SELECT window_start, key, COUNT(*) AS event_count, SUM(value) AS value_sum, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM events_stream_multi " + "WINDOW BY ts EVERY 60000 AS window_start " + "GROUP BY window_start, key HAVING avg_value > 6", + trigger_interval_ms=0, + ) + query.start() + processed = query.await_termination() + + self.assertEqual(processed, 1) + + with open(sink_path, newline="", encoding="utf-8") as handle: + rows = list(csv.DictReader(handle)) + + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["window_start"], "2026-03-29T10:00:00") + self.assertEqual(rows[0]["key"], "u1") + self.assertEqual(float(rows[0]["event_count"]), 3.0) + self.assertEqual(float(rows[0]["value_sum"]), 22.0) + self.assertEqual(float(rows[0]["min_value"]), 5.0) + self.assertEqual(float(rows[0]["max_value"]), 10.0) + self.assertAlmostEqual(float(rows[0]["avg_value"]), 22.0 / 3.0, places=5) + def test_explain_stream_sql_returns_strategy_sections(self): session = velaria.Session() table = pa.table( @@ -90,6 +142,33 @@ def test_explain_stream_sql_returns_strategy_sections(self): self.assertIn("selected_mode", explain) self.assertIn("checkpoint_delivery_mode=best-effort", explain) + def test_explain_stream_sql_reports_having_actor_fallback_reason(self): + session = velaria.Session() + table = pa.table( + { + "ts": ["2026-03-29T10:00:00", "2026-03-29T10:00:10"], + "key": ["u1", "u1"], + "value": [1, 2], + } + ) + stream_df = session.create_stream_from_arrow(table) + session.create_temp_view("events_stream_having_explain", stream_df) + + explain = session.explain_stream_sql( + "SELECT window_start, key, SUM(value) AS value_sum " + "FROM events_stream_having_explain " + "WINDOW BY ts EVERY 60000 AS window_start " + "GROUP BY window_start, key HAVING value_sum > 0", + trigger_interval_ms=0, + ) + + reason = ( + "actor acceleration requires the aggregate hot path to be the final stream transform" + ) + self.assertIn("actor_eligible=false", explain) + self.assertIn(f"actor_eligibility_reason={reason}", explain) + self.assertIn(f"reason={reason}", explain) + if __name__ == "__main__": unittest.main() diff --git a/src/dataflow/core/execution/stream/stream.cc b/src/dataflow/core/execution/stream/stream.cc index 7601988..39b298b 100644 --- a/src/dataflow/core/execution/stream/stream.cc +++ b/src/dataflow/core/execution/stream/stream.cc @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -386,7 +387,7 @@ bool queryHasStatefulOps(const std::vector& transforms) { bool queryHasWindowOps(const std::vector& transforms) { return std::any_of(transforms.begin(), transforms.end(), [](const StreamTransform& transform) { return transform.label().find("window") != std::string::npos || - transform.accelerator().kind == StreamAcceleratorKind::WindowKeySum; + transform.accelerator().kind != StreamAcceleratorKind::None; }); } @@ -458,9 +459,268 @@ size_t estimateInitialBatchCost(const std::vector& transforms) return transform_cost + (has_stateful ? 256 : 0); } -bool canRunActorWindowKeySum(const Table& table, const StreamAcceleratorSpec& accelerator) { - return accelerator.kind == StreamAcceleratorKind::WindowKeySum && table.schema.has("window_start") && - table.schema.has("key") && table.schema.has("value"); +std::string stateLabelForAggregate(const StreamAggregateSpec& spec) { + if (!spec.state_label.empty()) return spec.state_label; + std::string prefix = "group_"; + switch (spec.function) { + case AggregateFunction::Sum: + prefix += "sum"; + break; + case AggregateFunction::Count: + prefix += "count"; + break; + case AggregateFunction::Avg: + prefix += "avg"; + break; + case AggregateFunction::Min: + prefix += "min"; + break; + case AggregateFunction::Max: + prefix += "max"; + break; + } + return prefix + ":" + spec.output_column; +} + +std::string serializeStateValue(const Value& value) { + switch (value.type()) { + case DataType::Nil: + return "n:"; + case DataType::Int64: + return "i:" + std::to_string(value.asInt64()); + case DataType::Double: + return "d:" + value.toString(); + case DataType::String: + return "s:" + value.toString(); + case DataType::FixedVector: + return "v:" + value.toString(); + } + return "n:"; +} + +bool parseStateValue(const std::string& raw, Value* out) { + if (out == nullptr || raw.size() < 2 || raw[1] != ':') return false; + const std::string payload = raw.substr(2); + try { + switch (raw[0]) { + case 'n': + *out = Value(); + return true; + case 'i': + *out = Value(static_cast(std::stoll(payload))); + return true; + case 'd': + *out = Value(std::stod(payload)); + return true; + case 's': + *out = Value(payload); + return true; + case 'v': + *out = Value(Value::parseFixedVector(payload)); + return true; + default: + break; + } + } catch (...) { + } + return false; +} + +std::string serializeAverageState(double sum, int64_t count) { + std::ostringstream out; + out << std::fixed << std::setprecision(6) << sum << "|" << count; + return out.str(); +} + +bool parseAverageState(const std::string& raw, double* sum, int64_t* count) { + if (sum == nullptr || count == nullptr) return false; + const auto sep = raw.find('|'); + if (sep == std::string::npos) return false; + try { + *sum = std::stod(raw.substr(0, sep)); + *count = static_cast(std::stoll(raw.substr(sep + 1))); + return true; + } catch (...) { + return false; + } +} + +struct StreamAggregateRuntimeSpec { + StreamAggregateSpec user_spec; + std::optional batch_value_output; + std::optional batch_count_output; +}; + +std::vector buildBatchAggregateSpecs( + const Table& input, const std::vector& specs, + std::vector* runtime_specs) { + if (runtime_specs == nullptr) { + throw std::invalid_argument("runtime_specs cannot be null"); + } + runtime_specs->clear(); + + std::vector batch_specs; + batch_specs.reserve(specs.size() * 2); + for (size_t i = 0; i < specs.size(); ++i) { + const auto& spec = specs[i]; + StreamAggregateRuntimeSpec runtime; + runtime.user_spec = spec; + switch (spec.function) { + case AggregateFunction::Sum: + case AggregateFunction::Min: + case AggregateFunction::Max: { + const auto helper_name = "__agg_" + spec.output_column; + runtime.batch_value_output = helper_name; + batch_specs.push_back( + AggregateSpec{spec.function, input.schema.indexOf(spec.value_column), helper_name}); + break; + } + case AggregateFunction::Count: { + const auto helper_name = "__agg_" + spec.output_column; + runtime.batch_count_output = helper_name; + batch_specs.push_back( + AggregateSpec{AggregateFunction::Count, static_cast(-1), helper_name}); + break; + } + case AggregateFunction::Avg: { + const auto sum_name = "__agg_sum_" + spec.output_column; + const auto count_name = "__agg_count_" + spec.output_column; + runtime.batch_value_output = sum_name; + runtime.batch_count_output = count_name; + const auto value_index = input.schema.indexOf(spec.value_column); + batch_specs.push_back(AggregateSpec{AggregateFunction::Sum, value_index, sum_name}); + batch_specs.push_back( + AggregateSpec{AggregateFunction::Count, static_cast(-1), count_name}); + break; + } + } + runtime_specs->push_back(std::move(runtime)); + } + return batch_specs; +} + +Table applyStatefulGroupedAggregates(const Table& input, const std::vector& keys, + const std::vector& specs, + std::shared_ptr state_store, + const StreamingQueryOptions& options) { + std::vector key_indices; + key_indices.reserve(keys.size()); + for (const auto& key : keys) { + key_indices.push_back(input.schema.indexOf(key)); + } + + std::vector runtime_specs; + const auto batch_specs = buildBatchAggregateSpecs(input, specs, &runtime_specs); + Table batch_out = DataFrame(input).aggregate(key_indices, batch_specs).toTable(); + + Table out; + out.schema.fields = keys; + for (const auto& spec : specs) { + out.schema.fields.push_back(spec.output_column); + } + for (size_t i = 0; i < out.schema.fields.size(); ++i) { + out.schema.index[out.schema.fields[i]] = i; + } + + size_t window_pos = 0; + const bool has_window = findWindowKeyPosition(keys, &window_pos); + std::unordered_set touched_labels; + const size_t key_cols = keys.size(); + for (const auto& batch_row : batch_out.rows) { + Row out_row; + out_row.reserve(key_cols + specs.size()); + for (size_t i = 0; i < key_cols; ++i) { + out_row.push_back(i < batch_row.size() ? batch_row[i] : Value()); + } + + std::vector state_key_indices; + state_key_indices.reserve(key_cols); + for (size_t i = 0; i < key_cols; ++i) state_key_indices.push_back(i); + + for (size_t i = 0; i < runtime_specs.size(); ++i) { + const auto& runtime = runtime_specs[i]; + const auto label = stateLabelForAggregate(runtime.user_spec); + touched_labels.insert(label); + const auto state_key = makeStateKey(out_row, state_key_indices, label + ":"); + + if (runtime.user_spec.function == AggregateFunction::Sum) { + const double delta = batch_row[batch_out.schema.indexOf(*runtime.batch_value_output)].asDouble(); + out_row.push_back(Value(state_store->addDouble(state_key, delta))); + } else if (runtime.user_spec.function == AggregateFunction::Count) { + const double delta = + static_cast(batch_row[batch_out.schema.indexOf(*runtime.batch_count_output)].asInt64()); + out_row.push_back(Value(static_cast(state_store->addDouble(state_key, delta)))); + } else if (runtime.user_spec.function == AggregateFunction::Avg) { + const double delta_sum = + batch_row[batch_out.schema.indexOf(*runtime.batch_value_output)].asDouble(); + const int64_t delta_count = + batch_row[batch_out.schema.indexOf(*runtime.batch_count_output)].asInt64(); + std::string raw; + double current_sum = 0.0; + int64_t current_count = 0; + if (state_store->get(state_key, &raw)) { + parseAverageState(raw, ¤t_sum, ¤t_count); + } + current_sum += delta_sum; + current_count += delta_count; + state_store->put(state_key, serializeAverageState(current_sum, current_count)); + out_row.push_back( + Value(current_count == 0 ? 0.0 : current_sum / static_cast(current_count))); + } else { + const Value batch_value = batch_row[batch_out.schema.indexOf(*runtime.batch_value_output)]; + std::string raw; + Value current_value; + const bool has_current = state_store->get(state_key, &raw) && parseStateValue(raw, ¤t_value); + bool replace = !has_current; + if (runtime.user_spec.function == AggregateFunction::Min) { + replace = replace || batch_value < current_value; + } else { + replace = replace || batch_value > current_value; + } + if (replace) { + current_value = batch_value; + state_store->put(state_key, serializeStateValue(current_value)); + } + out_row.push_back(current_value); + } + + if (has_window && window_pos < out_row.size()) { + registerWindowState(state_store.get(), label, out_row[window_pos].toString(), state_key); + } + } + out.rows.push_back(std::move(out_row)); + } + + for (const auto& label : touched_labels) { + evictExpiredWindows(state_store.get(), label, options); + } + return out; +} + +bool canRunActorWindowKeyAggregate(const Table& table, const StreamAcceleratorSpec& accelerator) { + if (!table.schema.has("window_start") || !table.schema.has("key")) { + return false; + } + if (accelerator.kind == StreamAcceleratorKind::WindowKeySum) { + return table.schema.has("value"); + } + return accelerator.kind == StreamAcceleratorKind::WindowKeyCount; +} + +Table buildActorAggregateInput(const Table& table, const StreamAcceleratorSpec& accelerator) { + if (accelerator.kind != StreamAcceleratorKind::WindowKeyCount) { + return table; + } + + Table out(Schema({"window_start", "key", "value"}), {}); + const auto window_idx = table.schema.indexOf("window_start"); + const auto key_idx = table.schema.indexOf("key"); + out.rows.reserve(table.rows.size()); + for (const auto& row : table.rows) { + out.rows.push_back( + {row[window_idx], row[key_idx], Value(static_cast(1))}); + } + return out; } void renameColumn(Table* table, const std::string& from, const std::string& to) { @@ -471,12 +731,21 @@ void renameColumn(Table* table, const std::string& from, const std::string& to) table->schema.index[to] = idx; } -Table finalizeActorWindowKeySumOutput(Table aggregated, const StreamAcceleratorSpec& accelerator, - std::shared_ptr state, - const StreamingQueryOptions& options, uint64_t* state_ms) { +Table finalizeActorWindowKeyAggregateOutput(Table aggregated, const StreamAcceleratorSpec& accelerator, + std::shared_ptr state, + const StreamingQueryOptions& options, + uint64_t* state_ms) { auto started = std::chrono::steady_clock::now(); - renameColumn(&aggregated, "value_sum", accelerator.output_column.empty() ? "value_sum" - : accelerator.output_column); + const auto output_name = accelerator.aggregate.output_column.empty() + ? "value_sum" + : accelerator.aggregate.output_column; + renameColumn(&aggregated, "value_sum", output_name); + const auto value_idx = aggregated.schema.indexOf(output_name); + if (accelerator.kind == StreamAcceleratorKind::WindowKeyCount) { + for (auto& row : aggregated.rows) { + row[value_idx] = Value(static_cast(row[value_idx].asDouble())); + } + } if (!accelerator.stateful) { if (state_ms != nullptr) *state_ms = 0; return aggregated; @@ -485,15 +754,19 @@ Table finalizeActorWindowKeySumOutput(Table aggregated, const StreamAcceleratorS auto state_store = state ? state : makeMemoryStateStore(); const size_t window_idx = aggregated.schema.indexOf("window_start"); const size_t key_idx = aggregated.schema.indexOf("key"); - const size_t sum_idx = - aggregated.schema.indexOf(accelerator.output_column.empty() ? "value_sum" : accelerator.output_column); + const auto label = stateLabelForAggregate(accelerator.aggregate); for (auto& row : aggregated.rows) { - const auto state_key = makeStateKey(row, {window_idx, key_idx}, "group_sum:"); - const double delta = row[sum_idx].asDouble(); - row[sum_idx] = Value(state_store->addDouble(state_key, delta)); - registerWindowState(state_store.get(), "group_sum", row[window_idx].toString(), state_key); + const auto state_key = makeStateKey(row, {window_idx, key_idx}, label + ":"); + const double delta = row[value_idx].asDouble(); + const double next = state_store->addDouble(state_key, delta); + if (accelerator.kind == StreamAcceleratorKind::WindowKeyCount) { + row[value_idx] = Value(static_cast(next)); + } else { + row[value_idx] = Value(next); + } + registerWindowState(state_store.get(), label, row[window_idx].toString(), state_key); } - evictExpiredWindows(state_store.get(), "group_sum", options); + evictExpiredWindows(state_store.get(), label, options); if (state_ms != nullptr) { *state_ms = toMillis(std::chrono::steady_clock::now() - started); } @@ -1002,7 +1275,7 @@ std::shared_ptr makeStateStore( MemoryStreamSource::MemoryStreamSource(std::vector batches) : batches_(std::move(batches)) {} -void MemoryStreamSource::open(const StreamSourceContext&) {} +void MemoryStreamSource::open(const StreamSourceContext&) { index_ = 0; } bool MemoryStreamSource::nextBatch(const StreamPullContext&, Table& batch) { if (index_ >= batches_.size()) return false; @@ -1024,7 +1297,10 @@ bool MemoryStreamSource::restoreOffsetToken(const std::string& token) { DirectoryCsvStreamSource::DirectoryCsvStreamSource(std::string directory, char delimiter) : directory_(std::move(directory)), delimiter_(delimiter) {} -void DirectoryCsvStreamSource::open(const StreamSourceContext&) {} +void DirectoryCsvStreamSource::open(const StreamSourceContext&) { + resume_after_.clear(); + last_processed_.clear(); +} bool DirectoryCsvStreamSource::nextBatch(const StreamPullContext&, Table& batch) { namespace fs = std::filesystem; @@ -1404,110 +1680,92 @@ GroupedStreamingDataFrame::GroupedStreamingDataFrame(std::shared_ptr& specs, bool stateful) const { auto t = transforms_; StreamAcceleratorSpec accelerator; - if (keys_ == std::vector{"window_start", "key"} && valueColumn == "value") { - accelerator.kind = StreamAcceleratorKind::WindowKeySum; - accelerator.stateful = stateful; - accelerator.output_column = outputColumn; - } - if (!stateful) { - t.emplace_back( - [keys = keys_, valueColumn, outputColumn](const Table& input, - const StreamingQueryOptions&) { - auto grouped = DataFrame(input).groupBy(keys).sum(valueColumn, outputColumn); - return grouped.toTable(); - }, - StreamTransformMode::GlobalBarrier, false, "group_sum", accelerator); - return StreamingDataFrame(source_, std::move(t), nullptr); + if (specs.size() == 1 && keys_ == std::vector{"window_start", "key"}) { + const auto& spec = specs.front(); + if (spec.function == AggregateFunction::Sum && spec.value_column == "value") { + accelerator.kind = StreamAcceleratorKind::WindowKeySum; + accelerator.stateful = stateful; + accelerator.aggregate = spec; + } else if (spec.function == AggregateFunction::Count && spec.is_count_star) { + accelerator.kind = StreamAcceleratorKind::WindowKeyCount; + accelerator.stateful = stateful; + accelerator.aggregate = spec; + } } - auto state = state_ ? state_ : makeMemoryStateStore(); - auto stateStore = state; - t.emplace_back( - [keys = keys_, valueColumn, outputColumn, stateStore](const Table& input, - const StreamingQueryOptions& options) { - auto grouped = DataFrame(input).groupBy(keys).sum(valueColumn, outputColumn); - Table out = grouped.toTable(); - - const size_t keyCols = keys.size(); - const size_t sumIndex = out.schema.indexOf(outputColumn); - size_t window_pos = 0; - const bool has_window = findWindowKeyPosition(keys, &window_pos); - for (auto& row : out.rows) { - std::vector idx; - idx.reserve(keyCols); - for (size_t k = 0; k < keyCols && k < row.size(); ++k) idx.push_back(k); - const auto stateKey = makeStateKey(row, idx, "group_sum:"); - const double delta = sumIndex < row.size() ? row[sumIndex].asDouble() : 0.0; - row[sumIndex] = Value(stateStore->addDouble(stateKey, delta)); - if (has_window && window_pos < row.size()) { - registerWindowState(stateStore.get(), "group_sum", row[window_pos].toString(), stateKey); - } - } - evictExpiredWindows(stateStore.get(), "group_sum", options); - return out; - }, - StreamTransformMode::GlobalBarrier, true, "stateful_group_sum", accelerator); - - return StreamingDataFrame(source_, std::move(t), state); -} - -StreamingDataFrame GroupedStreamingDataFrame::count(bool stateful, - const std::string& outputColumn) const { - auto t = transforms_; if (!stateful) { t.emplace_back( - [keys = keys_, outputColumn](const Table& input, const StreamingQueryOptions&) { + [keys = keys_, specs](const Table& input, const StreamingQueryOptions&) { std::vector key_indices; key_indices.reserve(keys.size()); for (const auto& key : keys) { key_indices.push_back(input.schema.indexOf(key)); } - AggregateSpec spec{AggregateFunction::Count, 0, outputColumn}; - return DataFrame(input).aggregate(key_indices, {spec}).toTable(); + std::vector batch_specs; + batch_specs.reserve(specs.size()); + for (const auto& spec : specs) { + batch_specs.push_back(AggregateSpec{ + spec.function, + spec.is_count_star ? static_cast(-1) : input.schema.indexOf(spec.value_column), + spec.output_column, + }); + } + return DataFrame(input).aggregate(key_indices, batch_specs).toTable(); }, - StreamTransformMode::GlobalBarrier, false, "group_count"); + StreamTransformMode::GlobalBarrier, false, "group_aggregate", accelerator); return StreamingDataFrame(source_, std::move(t), nullptr); } auto state = state_ ? state_ : makeMemoryStateStore(); auto stateStore = state; t.emplace_back( - [keys = keys_, outputColumn, stateStore](const Table& input, - const StreamingQueryOptions& options) { - std::vector key_indices; - key_indices.reserve(keys.size()); - for (const auto& key : keys) { - key_indices.push_back(input.schema.indexOf(key)); - } - AggregateSpec spec{AggregateFunction::Count, 0, outputColumn}; - Table out = DataFrame(input).aggregate(key_indices, {spec}).toTable(); - const size_t keyCols = keys.size(); - const size_t countIndex = out.schema.indexOf(outputColumn); - size_t window_pos = 0; - const bool has_window = findWindowKeyPosition(keys, &window_pos); - for (auto& row : out.rows) { - std::vector idx; - idx.reserve(keyCols); - for (size_t k = 0; k < keyCols && k < row.size(); ++k) idx.push_back(k); - const auto stateKey = makeStateKey(row, idx, "group_count:"); - const double delta = - countIndex < row.size() ? static_cast(row[countIndex].asInt64()) : 0.0; - row[countIndex] = Value(static_cast(stateStore->addDouble(stateKey, delta))); - if (has_window && window_pos < row.size()) { - registerWindowState(stateStore.get(), "group_count", row[window_pos].toString(), stateKey); - } - } - evictExpiredWindows(stateStore.get(), "group_count", options); - return out; + [keys = keys_, specs, stateStore](const Table& input, const StreamingQueryOptions& options) { + return applyStatefulGroupedAggregates(input, keys, specs, stateStore, options); }, - StreamTransformMode::GlobalBarrier, true, "stateful_group_count"); + StreamTransformMode::GlobalBarrier, true, "stateful_group_aggregate", accelerator); + return StreamingDataFrame(source_, std::move(t), state); } +StreamingDataFrame GroupedStreamingDataFrame::sum(const std::string& valueColumn, bool stateful, + const std::string& outputColumn) const { + return aggregate({StreamAggregateSpec{ + AggregateFunction::Sum, valueColumn, outputColumn, false, "group_sum:" + outputColumn}}, + stateful); +} + +StreamingDataFrame GroupedStreamingDataFrame::count(bool stateful, + const std::string& outputColumn) const { + return aggregate({StreamAggregateSpec{ + AggregateFunction::Count, "", outputColumn, true, "group_count:" + outputColumn}}, + stateful); +} + +StreamingDataFrame GroupedStreamingDataFrame::min(const std::string& valueColumn, bool stateful, + const std::string& outputColumn) const { + return aggregate({StreamAggregateSpec{ + AggregateFunction::Min, valueColumn, outputColumn, false, "group_min:" + outputColumn}}, + stateful); +} + +StreamingDataFrame GroupedStreamingDataFrame::max(const std::string& valueColumn, bool stateful, + const std::string& outputColumn) const { + return aggregate({StreamAggregateSpec{ + AggregateFunction::Max, valueColumn, outputColumn, false, "group_max:" + outputColumn}}, + stateful); +} + +StreamingDataFrame GroupedStreamingDataFrame::avg(const std::string& valueColumn, bool stateful, + const std::string& outputColumn) const { + return aggregate({StreamAggregateSpec{ + AggregateFunction::Avg, valueColumn, outputColumn, false, "group_avg:" + outputColumn}}, + stateful); +} + // ---------- Query ---------- StreamingQuery::StreamingQuery(StreamingDataFrame root, std::shared_ptr sink, @@ -1760,15 +2018,17 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { options_.execution_mode == StreamingExecutionMode::ActorCredit || options_.execution_mode == StreamingExecutionMode::Auto; if (wants_actor_mode && actor_pipeline_supported) { - Table actor_input = - executePartitionStage(envelope.table, actor_prefix_transforms, options_, 1); - if (canRunActorWindowKeySum(actor_input, actor_accelerator)) { + Table actor_input = buildActorAggregateInput( + executePartitionStage(envelope.table, actor_prefix_transforms, options_, 1), + actor_accelerator); + if (canRunActorWindowKeyAggregate(actor_input, actor_accelerator)) { const auto actor_options = makeActorOptions(options_); if (options_.execution_mode == StreamingExecutionMode::ActorCredit) { auto actor_result = runLocalActorStreamWindowKeySum(std::vector
{actor_input}, actor_options); - out = finalizeActorWindowKeySumOutput(std::move(actor_result.final_table), actor_accelerator, - root_.state_, options_, &state_ms); + out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), + actor_accelerator, root_.state_, options_, + &state_ms); partitions_used = std::max(1, actor_result.processed_partitions); used_actor_runtime = true; execution_decided_ = true; @@ -1808,9 +2068,9 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { strategy_decision_.compute_to_overhead_ratio = decision.compute_to_overhead_ratio; if (resolved_execution_mode_ == StreamingExecutionMode::ActorCredit) { - out = finalizeActorWindowKeySumOutput(std::move(actor_result.final_table), - actor_accelerator, root_.state_, options_, - &state_ms); + out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), + actor_accelerator, root_.state_, options_, + &state_ms); partitions_used = std::max(1, actor_result.processed_partitions); used_actor_runtime = true; strategy_decision_.transport_mode = actor_result.used_shared_memory @@ -1830,8 +2090,9 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { } else if (resolved_execution_mode_ == StreamingExecutionMode::ActorCredit) { auto actor_result = runLocalActorStreamWindowKeySum(std::vector
{actor_input}, actor_options); - out = finalizeActorWindowKeySumOutput(std::move(actor_result.final_table), actor_accelerator, - root_.state_, options_, &state_ms); + out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), + actor_accelerator, root_.state_, options_, + &state_ms); partitions_used = std::max(1, actor_result.processed_partitions); used_actor_runtime = true; strategy_decision_.used_actor_runtime = true; @@ -1846,7 +2107,8 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { } else if (options_.execution_mode == StreamingExecutionMode::Auto && !execution_decided_) { execution_decided_ = true; resolved_execution_mode_ = StreamingExecutionMode::SingleProcess; - execution_reason_ = "actor path requires window_start/key/value schema after partition-local transforms"; + execution_reason_ = + "actor path requires window_start/key with supported SUM(value) or COUNT(*) shape"; strategy_decision_.resolved_execution_mode = streamingExecutionModeName(resolved_execution_mode_); strategy_decision_.reason = execution_reason_; diff --git a/src/dataflow/core/execution/stream/stream.h b/src/dataflow/core/execution/stream/stream.h index dd74606..bfa833b 100644 --- a/src/dataflow/core/execution/stream/stream.h +++ b/src/dataflow/core/execution/stream/stream.h @@ -11,11 +11,16 @@ #include #include +#include "src/dataflow/core/logical/planner/plan.h" #include "src/dataflow/core/execution/table.h" #include "src/dataflow/core/contract/source_sink_abi.h" namespace dataflow { +namespace sql { +class SqlPlanner; +} + using StreamId = std::uint64_t; class StreamingDataFrame; @@ -390,12 +395,20 @@ class MemoryStreamSink : public StreamSink { enum class StreamTransformMode { PartitionLocal, GlobalBarrier }; -enum class StreamAcceleratorKind { None, WindowKeySum }; +enum class StreamAcceleratorKind { None, WindowKeySum, WindowKeyCount }; + +struct StreamAggregateSpec { + AggregateFunction function = AggregateFunction::Sum; + std::string value_column; + std::string output_column; + bool is_count_star = false; + std::string state_label; +}; struct StreamAcceleratorSpec { StreamAcceleratorKind kind = StreamAcceleratorKind::None; bool stateful = false; - std::string output_column; + StreamAggregateSpec aggregate; }; class RuntimeSourceAdapter : public StreamSource { @@ -508,8 +521,18 @@ class GroupedStreamingDataFrame { const std::string& outputColumn = "sum") const; StreamingDataFrame count(bool stateful = false, const std::string& outputColumn = "count") const; + StreamingDataFrame min(const std::string& valueColumn, bool stateful = false, + const std::string& outputColumn = "min") const; + StreamingDataFrame max(const std::string& valueColumn, bool stateful = false, + const std::string& outputColumn = "max") const; + StreamingDataFrame avg(const std::string& valueColumn, bool stateful = false, + const std::string& outputColumn = "avg") const; private: + friend class sql::SqlPlanner; + StreamingDataFrame aggregate(const std::vector& specs, + bool stateful) const; + std::shared_ptr source_; std::vector transforms_; std::vector keys_; diff --git a/src/dataflow/core/logical/sql/sql_planner.cc b/src/dataflow/core/logical/sql/sql_planner.cc index 0b10802..0a60fa1 100644 --- a/src/dataflow/core/logical/sql/sql_planner.cc +++ b/src/dataflow/core/logical/sql/sql_planner.cc @@ -140,37 +140,70 @@ std::string aggregateOutputName(const SelectItem& item) { if (!item.is_aggregate) { throw SQLSemanticError("expected aggregate select item"); } - if (!item.alias.empty()) return item.alias; - switch (item.aggregate.function) { - case AggregateFunctionKind::Sum: - return "sum"; - case AggregateFunctionKind::Count: - return "count"; - case AggregateFunctionKind::Avg: - return "avg"; - case AggregateFunctionKind::Min: - return "min"; - case AggregateFunctionKind::Max: - return "max"; - } - return "agg"; + return defaultAggregateAlias(item.aggregate.function, item.aggregate.argument.name, item.alias); } void validateStreamAggregate(const AggregateExpr& aggregate) { switch (aggregate.function) { case AggregateFunctionKind::Sum: - return; case AggregateFunctionKind::Count: - if (!aggregate.count_all) { - throw SQLSemanticError("stream SQL only supports COUNT(*)"); - } - return; case AggregateFunctionKind::Avg: case AggregateFunctionKind::Min: case AggregateFunctionKind::Max: - break; + return; + } + throw SQLSemanticError("unsupported stream SQL aggregate"); +} + +std::string streamAggregateStateLabel(const StreamAggregateSpec& aggregate) { + switch (aggregate.function) { + case AggregateFunction::Sum: + return "group_sum:" + aggregate.output_column; + case AggregateFunction::Count: + return "group_count:" + aggregate.output_column; + case AggregateFunction::Avg: + return "group_avg:" + aggregate.output_column; + case AggregateFunction::Min: + return "group_min:" + aggregate.output_column; + case AggregateFunction::Max: + return "group_max:" + aggregate.output_column; } - throw SQLSemanticError("stream SQL aggregate only supports SUM and COUNT(*)"); + return "group_aggregate:" + aggregate.output_column; +} + +StreamAggregateSpec toStreamAggregateSpec(const SelectItem& item, const FromItem& from) { + if (!item.is_aggregate) { + throw SQLSemanticError("expected aggregate select item"); + } + validateStreamAggregate(item.aggregate); + + StreamAggregateSpec spec; + spec.function = toPlanFunction(item.aggregate.function); + spec.output_column = aggregateOutputName(item); + spec.is_count_star = item.aggregate.function == AggregateFunctionKind::Count; + if (!spec.is_count_star) { + spec.value_column = resolveStreamColumnName(item.aggregate.argument, from); + } else if (!item.aggregate.count_all) { + throw SQLSemanticError("stream SQL only supports COUNT(*)"); + } + spec.state_label = streamAggregateStateLabel(spec); + return spec; +} + +std::string streamAggregateExplain(const StreamAggregateSpec& aggregate) { + switch (aggregate.function) { + case AggregateFunction::Sum: + return "SUM(" + aggregate.value_column + ") AS " + aggregate.output_column; + case AggregateFunction::Count: + return "COUNT(*) AS " + aggregate.output_column; + case AggregateFunction::Avg: + return "AVG(" + aggregate.value_column + ") AS " + aggregate.output_column; + case AggregateFunction::Min: + return "MIN(" + aggregate.value_column + ") AS " + aggregate.output_column; + case AggregateFunction::Max: + return "MAX(" + aggregate.value_column + ") AS " + aggregate.output_column; + } + return aggregate.output_column; } struct RelationView { @@ -325,6 +358,17 @@ bool streamPlanIsPartitionLocal(const StreamPlanNode& node) { node.kind == StreamPlanNodeKind::WindowAssign; } +bool isActorHotPathAggregate(const StreamPlanNode& node) { + if (node.kind != StreamPlanNodeKind::Aggregate || !node.stateful || + node.group_keys != std::vector{"window_start", "key"} || + node.aggregates.size() != 1) { + return false; + } + const auto& aggregate = node.aggregates.front(); + return (aggregate.function == AggregateFunction::Sum && aggregate.value_column == "value") || + (aggregate.function == AggregateFunction::Count && aggregate.is_count_star); +} + } // namespace LogicalPlan SqlPlanner::buildLogicalPlan(const SqlQuery& query, const ViewCatalog& catalog) const { @@ -736,17 +780,20 @@ StreamLogicalPlan SqlPlanner::buildStreamLogicalPlan(const SqlQuery& query) cons group_keys.push_back(resolveStreamColumnName(key, query.from)); } - std::optional aggregate_item; + std::vector aggregates; for (const auto& item : query.select_items) { if (item.is_all || item.is_table_all) { throw SQLSemanticError("stream SQL aggregate query does not support star projection"); } if (item.is_aggregate) { - if (aggregate_item.has_value()) { - throw SQLSemanticError("stream SQL aggregate query supports only one aggregate output"); + const auto aggregate = toStreamAggregateSpec(item, query.from); + if (std::any_of(aggregates.begin(), aggregates.end(), + [&](const StreamAggregateSpec& spec) { + return spec.output_column == aggregate.output_column; + })) { + throw SQLSemanticError("duplicate aggregate alias: " + aggregate.output_column); } - validateStreamAggregate(item.aggregate); - aggregate_item = item; + aggregates.push_back(aggregate); continue; } const auto column = resolveStreamColumnName(item.column, query.from); @@ -754,29 +801,61 @@ StreamLogicalPlan SqlPlanner::buildStreamLogicalPlan(const SqlQuery& query) cons throw SQLSemanticError("non-aggregate field must appear in GROUP BY: " + column); } } - if (!aggregate_item.has_value()) { - throw SQLSemanticError("stream SQL aggregate query requires one aggregate output"); + if (aggregates.empty()) { + throw SQLSemanticError("stream SQL aggregate query requires at least one aggregate output"); } StreamPlanNode aggregate; aggregate.kind = StreamPlanNodeKind::Aggregate; aggregate.group_keys = group_keys; - aggregate.output_column = aggregateOutputName(*aggregate_item); aggregate.stateful = true; - aggregate.is_count_star = - aggregate_item->aggregate.function == AggregateFunctionKind::Count; - if (!aggregate.is_count_star) { - aggregate.value_column = - resolveStreamColumnName(aggregate_item->aggregate.argument, query.from); - } + aggregate.aggregates = aggregates; logical.nodes.push_back(aggregate); if (query.having.has_value()) { const auto& predicate = *query.having; StreamPlanNode filter; filter.kind = StreamPlanNodeKind::Filter; - filter.column = predicate.lhs_is_aggregate ? aggregate.output_column - : resolveStreamColumnName(predicate.lhs, query.from); + if (predicate.lhs_is_aggregate) { + const auto target_output = + defaultAggregateAlias(predicate.lhs_aggregate.function, + predicate.lhs_aggregate.argument.name, ""); + auto it = std::find_if(aggregates.begin(), aggregates.end(), + [&](const StreamAggregateSpec& spec) { + if (spec.function != toPlanFunction(predicate.lhs_aggregate.function)) { + return false; + } + if (spec.is_count_star != predicate.lhs_aggregate.count_all) { + return false; + } + if (!spec.is_count_star && + spec.value_column != + resolveStreamColumnName(predicate.lhs_aggregate.argument, query.from)) { + return false; + } + return true; + }); + if (it == aggregates.end()) { + auto alias_match = std::find_if(aggregates.begin(), aggregates.end(), + [&](const StreamAggregateSpec& spec) { + return spec.output_column == target_output; + }); + if (alias_match == aggregates.end()) { + throw SQLSemanticError("HAVING aggregate not found in SELECT: " + target_output); + } + filter.column = alias_match->output_column; + } else { + filter.column = it->output_column; + } + } else { + auto alias_match = std::find_if(aggregates.begin(), aggregates.end(), + [&](const StreamAggregateSpec& spec) { + return spec.output_column == predicate.lhs.name; + }); + filter.column = alias_match != aggregates.end() + ? alias_match->output_column + : resolveStreamColumnName(predicate.lhs, query.from); + } filter.op = opToString(predicate.op); filter.value = predicate.rhs; logical.nodes.push_back(filter); @@ -786,13 +865,15 @@ StreamLogicalPlan SqlPlanner::buildStreamLogicalPlan(const SqlQuery& query) cons project.kind = StreamPlanNodeKind::Project; for (const auto& item : query.select_items) { if (item.is_aggregate) { - project.columns.push_back(aggregate.output_column); + project.columns.push_back(aggregateOutputName(item)); } else { project.columns.push_back(resolveStreamColumnName(item.column, query.from)); } } std::vector natural_columns = group_keys; - natural_columns.push_back(aggregate.output_column); + for (const auto& agg : aggregates) { + natural_columns.push_back(agg.output_column); + } if (project.columns != natural_columns) { logical.nodes.push_back(project); } @@ -876,15 +957,23 @@ StreamPhysicalPlan SqlPlanner::buildStreamPhysicalPlan(const StreamLogicalPlan& } } - for (const auto& node : logical.nodes) { - if (node.kind == StreamPlanNodeKind::Aggregate && node.stateful && - node.group_keys == std::vector{"window_start", "key"} && - !node.is_count_star && node.value_column == "value") { - physical.actor_eligible = true; + for (std::size_t i = 0; i < logical.nodes.size(); ++i) { + const auto& node = logical.nodes[i]; + if (!isActorHotPathAggregate(node)) { + continue; + } + if (i + 1 != logical.nodes.size()) { physical.actor_eligibility_reason = - "window_start/key/value aggregate shape is eligible for actor acceleration"; + "actor acceleration requires the aggregate hot path to be the final stream transform"; return physical; } + const auto& aggregate = node.aggregates.front(); + physical.actor_eligible = true; + physical.actor_eligibility_reason = + aggregate.function == AggregateFunction::Sum + ? "window_start/key SUM(value) shape is eligible for actor acceleration" + : "window_start/key COUNT(*) shape is eligible for actor acceleration"; + return physical; } physical.actor_eligibility_reason = "query plan is not eligible for actor acceleration"; @@ -912,8 +1001,12 @@ std::string SqlPlanner::explainStreamLogicalPlan(const StreamLogicalPlan& logica << " output=" << node.output_column; } else if (node.kind == StreamPlanNodeKind::Aggregate) { out << " keys=[" << joinStrings(node.group_keys, ", ") << "]" - << " output=" << node.output_column - << " agg=" << (node.is_count_star ? "COUNT(*)" : "SUM(" + node.value_column + ")") + << " aggs=["; + for (size_t j = 0; j < node.aggregates.size(); ++j) { + if (j > 0) out << ", "; + out << streamAggregateExplain(node.aggregates[j]); + } + out << "]" << " stateful=" << (node.stateful ? "true" : "false"); } else if (node.kind == StreamPlanNodeKind::Sink) { out << " sink=" << node.sink_name; @@ -971,10 +1064,7 @@ StreamingDataFrame SqlPlanner::materializeStreamFromPhysical( current = current.window(node.column, node.window_ms, node.output_column); break; case StreamPlanNodeKind::Aggregate: - current = node.is_count_star - ? current.groupBy(node.group_keys).count(node.stateful, node.output_column) - : current.groupBy(node.group_keys) - .sum(node.value_column, node.stateful, node.output_column); + current = current.groupBy(node.group_keys).aggregate(node.aggregates, node.stateful); break; case StreamPlanNodeKind::Sink: break; diff --git a/src/dataflow/core/logical/sql/sql_planner.h b/src/dataflow/core/logical/sql/sql_planner.h index d3ed70d..a1ae71a 100644 --- a/src/dataflow/core/logical/sql/sql_planner.h +++ b/src/dataflow/core/logical/sql/sql_planner.h @@ -77,10 +77,10 @@ struct StreamPlanNode { std::vector columns; std::vector> aliases; std::vector group_keys; + std::vector aggregates; std::string value_column; std::string output_column; bool stateful = false; - bool is_count_star = false; std::size_t limit = 0; uint64_t window_ms = 0; }; diff --git a/src/dataflow/tests/sql_regression_test.cc b/src/dataflow/tests/sql_regression_test.cc index cc513ed..2599a71 100644 --- a/src/dataflow/tests/sql_regression_test.cc +++ b/src/dataflow/tests/sql_regression_test.cc @@ -344,14 +344,63 @@ void runStreamSqlRegression() { }, "does not support JOIN"); - expectThrows( - "stream_sql_avg_rejected", - [&]() { - s.streamSql( - "SELECT key, AVG(value) AS avg_value " - "FROM stream_events_csv_v1 GROUP BY key"); - }, - "only supports SUM and COUNT(*)"); + const std::string multi_sink_path = "/tmp/velaria-stream-sql-multi-aggregate-output.csv"; + fs::remove(multi_sink_path); + dataflow::Table multi_batch; + multi_batch.schema = dataflow::Schema({"key", "value"}); + multi_batch.rows = { + {Value("userA"), Value(int64_t(10))}, + {Value("userA"), Value(int64_t(5))}, + {Value("userA"), Value(int64_t(7))}, + {Value("userB"), Value(int64_t(20))}, + }; + s.createTempView( + "stream_multi_events_v1", + s.readStream(std::make_shared(std::vector
{multi_batch}))); + s.sql( + "CREATE SINK TABLE stream_multi_summary_v1 " + "(key STRING, value_sum INT, event_count INT, min_value INT, max_value INT, avg_value DOUBLE) " + "USING csv OPTIONS(path '" + + multi_sink_path + "', delimiter ',')"); + + dataflow::StreamingQueryOptions multi_options; + multi_options.trigger_interval_ms = 0; + + auto multi_query = s.startStreamSql( + "INSERT INTO stream_multi_summary_v1 " + "SELECT key, SUM(value) AS value_sum, COUNT(*) AS event_count, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM stream_multi_events_v1 " + "GROUP BY key " + "HAVING avg_value > 7", + multi_options); + expect(multi_query.awaitTermination(1) == 1, "stream_sql_multi_aggregate_processed_batches"); + + const auto multi_sink_table = s.read_csv(multi_sink_path).toTable(); + expect(multi_sink_table.rows.size() == 2, "stream_sql_multi_aggregate_rows"); + bool has_multi_user_a = false; + bool has_multi_user_b = false; + const auto multi_key_idx = multi_sink_table.schema.indexOf("key"); + const auto multi_sum_idx = multi_sink_table.schema.indexOf("value_sum"); + const auto multi_count_idx = multi_sink_table.schema.indexOf("event_count"); + const auto multi_min_idx = multi_sink_table.schema.indexOf("min_value"); + const auto multi_max_idx = multi_sink_table.schema.indexOf("max_value"); + const auto multi_avg_idx = multi_sink_table.schema.indexOf("avg_value"); + for (const auto& row : multi_sink_table.rows) { + if (row[multi_key_idx].toString() == "userA" && row[multi_sum_idx].asInt64() == 22 && + row[multi_count_idx].asInt64() == 3 && row[multi_min_idx].asInt64() == 5 && + row[multi_max_idx].asInt64() == 10 && row[multi_avg_idx].asDouble() > 7.3 && + row[multi_avg_idx].asDouble() < 7.4) { + has_multi_user_a = true; + } + if (row[multi_key_idx].toString() == "userB" && row[multi_sum_idx].asInt64() == 20 && + row[multi_count_idx].asInt64() == 1 && row[multi_min_idx].asInt64() == 20 && + row[multi_max_idx].asInt64() == 20 && row[multi_avg_idx].asDouble() == 20.0) { + has_multi_user_b = true; + } + } + expect(has_multi_user_a, "stream_sql_multi_aggregate_user_a"); + expect(has_multi_user_b, "stream_sql_multi_aggregate_user_b"); expectThrows( "stream_sql_regular_csv_rejected", @@ -418,6 +467,47 @@ void runStreamSqlRegression() { } expect(has_window_user_a, "stream_sql_window_sink_has_user_a"); expect(has_window_user_b, "stream_sql_window_sink_has_user_b"); + + const std::string window_count_sink_path = "/tmp/velaria-stream-window-count-sql-regression-output.csv"; + fs::remove(window_count_sink_path); + s.sql( + "CREATE SINK TABLE stream_window_count_summary_v1 " + "(window_start STRING, key STRING, event_count INT) " + "USING csv OPTIONS(path '" + + window_count_sink_path + "', delimiter ',')"); + + auto window_count_query = s.startStreamSql( + "INSERT INTO stream_window_count_summary_v1 " + "SELECT window_start, key, COUNT(*) AS event_count " + "FROM stream_window_events_v1 " + "WINDOW BY ts EVERY 60000 AS window_start " + "GROUP BY window_start, key", + window_options); + expect(window_count_query.awaitTermination() == 1, "stream_sql_window_count_processed_batches"); + expect(window_count_query.progress().execution_mode == "actor-credit", + "stream_sql_window_count_actor_hot_path"); + + const auto window_count_sink_table = s.read_csv(window_count_sink_path).toTable(); + expect(window_count_sink_table.rows.size() == 2, "stream_sql_window_count_sink_rows"); + bool has_count_user_a = false; + bool has_count_user_b = false; + const auto count_window_idx = window_count_sink_table.schema.indexOf("window_start"); + const auto count_key_idx = window_count_sink_table.schema.indexOf("key"); + const auto count_value_idx = window_count_sink_table.schema.indexOf("event_count"); + for (const auto& row : window_count_sink_table.rows) { + if (row[count_window_idx].toString() == "2026-03-29T10:00:00" && + row[count_key_idx].toString() == "userA" && + row[count_value_idx].asInt64() == 2) { + has_count_user_a = true; + } + if (row[count_window_idx].toString() == "2026-03-29T10:01:00" && + row[count_key_idx].toString() == "userB" && + row[count_value_idx].asInt64() == 1) { + has_count_user_b = true; + } + } + expect(has_count_user_a, "stream_sql_window_count_sink_has_user_a"); + expect(has_count_user_b, "stream_sql_window_count_sink_has_user_b"); } } // namespace diff --git a/src/dataflow/tests/stream_runtime_test.cc b/src/dataflow/tests/stream_runtime_test.cc index db45cd7..98706dd 100644 --- a/src/dataflow/tests/stream_runtime_test.cc +++ b/src/dataflow/tests/stream_runtime_test.cc @@ -1,12 +1,16 @@ #include +#include #include #include #include +#include +#include #include #include #include #include #include +#include #include #include "src/dataflow/core/contract/api/session.h" @@ -52,6 +56,174 @@ class CollectSink : public dataflow::StreamSink { std::vector batches_; }; +class PersistentStateStore : public dataflow::StateStore { + public: + explicit PersistentStateStore(std::string path) : path_(std::move(path)) { load(); } + + bool get(const std::string& key, std::string* out) const override { + const auto it = values_.find(key); + if (it == values_.end()) return false; + if (out != nullptr) *out = it->second; + return true; + } + + void put(const std::string& key, const std::string& value) override { + values_[key] = value; + flush(); + } + + bool remove(const std::string& key) override { + const bool removed = values_.erase(key) > 0; + if (removed) flush(); + return removed; + } + + bool getMapField(const std::string& mapKey, const std::string& field, + std::string* out) const override { + const auto map_it = maps_.find(mapKey); + if (map_it == maps_.end()) return false; + const auto field_it = map_it->second.find(field); + if (field_it == map_it->second.end()) return false; + if (out != nullptr) *out = field_it->second; + return true; + } + + void putMapField(const std::string& mapKey, const std::string& field, + const std::string& value) override { + maps_[mapKey][field] = value; + flush(); + } + + bool removeMapField(const std::string& mapKey, const std::string& field) override { + const auto map_it = maps_.find(mapKey); + if (map_it == maps_.end()) return false; + const bool removed = map_it->second.erase(field) > 0; + if (map_it->second.empty()) { + maps_.erase(map_it); + } + if (removed) flush(); + return removed; + } + + bool getMapFields(const std::string& mapKey, std::vector* fields) const override { + if (fields == nullptr) return false; + fields->clear(); + const auto map_it = maps_.find(mapKey); + if (map_it == maps_.end()) return false; + for (const auto& entry : map_it->second) { + fields->push_back(entry.first); + } + return !fields->empty(); + } + + bool getValueList(const std::string& listKey, std::vector* values) const override { + if (values == nullptr) return false; + values->clear(); + const auto it = lists_.find(listKey); + if (it == lists_.end()) return false; + *values = it->second; + return true; + } + + bool setValueList(const std::string& listKey, + const std::vector& values) override { + lists_[listKey] = values; + flush(); + return true; + } + + bool appendValueToList(const std::string& listKey, const std::string& value) override { + lists_[listKey].push_back(value); + flush(); + return true; + } + + bool popValueFromList(const std::string& listKey, std::string* value) override { + const auto it = lists_.find(listKey); + if (it == lists_.end() || it->second.empty()) return false; + if (value != nullptr) *value = it->second.back(); + it->second.pop_back(); + if (it->second.empty()) { + lists_.erase(it); + } + flush(); + return true; + } + + bool listKeys(std::vector* keys) const override { + if (keys == nullptr) return false; + keys->clear(); + for (const auto& entry : values_) { + keys->push_back(entry.first); + } + return !keys->empty(); + } + + void close() override { flush(); } + + private: + void load() { + std::ifstream in(path_); + if (!in) return; + std::string tag; + while (in >> tag) { + if (tag == "K") { + std::string key; + std::string value; + in >> std::quoted(key) >> std::quoted(value); + values_[key] = value; + } else if (tag == "M") { + std::string map_key; + std::string field; + std::string value; + in >> std::quoted(map_key) >> std::quoted(field) >> std::quoted(value); + maps_[map_key][field] = value; + } else if (tag == "L") { + std::string list_key; + size_t size = 0; + in >> std::quoted(list_key) >> size; + auto& list = lists_[list_key]; + list.clear(); + for (size_t i = 0; i < size; ++i) { + std::string value; + in >> std::quoted(value); + list.push_back(value); + } + } else { + throw std::runtime_error("persistent state store encountered unknown record"); + } + } + } + + void flush() const { + std::ofstream out(path_, std::ios::trunc); + if (!out) { + throw std::runtime_error("cannot write persistent state store: " + path_); + } + for (const auto& entry : values_) { + out << "K " << std::quoted(entry.first) << " " << std::quoted(entry.second) << "\n"; + } + for (const auto& map_entry : maps_) { + for (const auto& field_entry : map_entry.second) { + out << "M " << std::quoted(map_entry.first) << " " << std::quoted(field_entry.first) + << " " << std::quoted(field_entry.second) << "\n"; + } + } + for (const auto& list_entry : lists_) { + out << "L " << std::quoted(list_entry.first) << " " << list_entry.second.size(); + for (const auto& value : list_entry.second) { + out << " " << std::quoted(value); + } + out << "\n"; + } + } + + std::string path_; + std::unordered_map values_; + std::unordered_map> maps_; + std::unordered_map> lists_; +}; + std::vector readNonEmptyLines(const std::string& path) { std::ifstream in(path); std::vector lines; @@ -87,6 +259,24 @@ void expectContains(const std::string& haystack, const std::string& needle, } } +void expectNear(double actual, double expected, double tolerance, const std::string& message) { + if (std::abs(actual - expected) > tolerance) { + throw std::runtime_error(message); + } +} + +double lookupMetric(const dataflow::Table& table, const std::string& output_column, + const std::string& user) { + const auto key_idx = table.schema.indexOf("key"); + const auto value_idx = table.schema.indexOf(output_column); + for (const auto& row : table.rows) { + if (row[key_idx].toString() == user) { + return row[value_idx].asDouble(); + } + } + throw std::runtime_error("missing metric row for user " + user); +} + void testBackpressure() { dataflow::DataflowSession& session = dataflow::DataflowSession::builder(); std::vector batches; @@ -164,6 +354,141 @@ void testStatefulWindowCount() { expect(user_b == 2, "userB count should accumulate to 2"); } +void testAggregateVariants() { + dataflow::DataflowSession& session = dataflow::DataflowSession::builder(); + std::vector batches = { + makeWindowBatch({{"2026-03-28T09:00:00", "userA", 10}, + {"2026-03-28T09:00:10", "userA", 5}, + {"2026-03-28T09:00:20", "userB", 7}}), + makeWindowBatch({{"2026-03-28T09:00:30", "userA", 3}, + {"2026-03-28T09:00:40", "userB", 9}}), + }; + + const auto run_and_capture = + [&](const std::function& build) { + auto sink = std::make_shared(); + auto query = build(session.readStream(std::make_shared(batches)) + .withStateStore(dataflow::makeMemoryStateStore()) + .window("ts", 60000, "window_start")) + .writeStream(sink, dataflow::StreamingQueryOptions{}); + query.start(); + expect(query.awaitTermination() == 2, "aggregate variant query should process all batches"); + const auto outputs = sink->batches(); + expect(outputs.size() == 2, "aggregate variant query should emit two batches"); + return outputs.back(); + }; + + const auto sum_table = run_and_capture([](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).sum("value", true, "value_sum"); + }); + const auto count_table = run_and_capture([](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).count(true, "event_count"); + }); + const auto min_table = run_and_capture([](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).min("value", true, "min_value"); + }); + const auto max_table = run_and_capture([](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).max("value", true, "max_value"); + }); + const auto avg_table = run_and_capture([](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).avg("value", true, "avg_value"); + }); + + expect(lookupMetric(sum_table, "value_sum", "userA") == 18.0, + "sum aggregate should keep userA running sum"); + expect(lookupMetric(sum_table, "value_sum", "userB") == 16.0, + "sum aggregate should keep userB running sum"); + expect(lookupMetric(count_table, "event_count", "userA") == 3.0, + "count aggregate should keep userA running count"); + expect(lookupMetric(count_table, "event_count", "userB") == 2.0, + "count aggregate should keep userB running count"); + expect(lookupMetric(min_table, "min_value", "userA") == 3.0, + "min aggregate should keep userA running min"); + expect(lookupMetric(min_table, "min_value", "userB") == 7.0, + "min aggregate should keep userB running min"); + expect(lookupMetric(max_table, "max_value", "userA") == 10.0, + "max aggregate should keep userA running max"); + expect(lookupMetric(max_table, "max_value", "userB") == 9.0, + "max aggregate should keep userB running max"); + expect(lookupMetric(avg_table, "avg_value", "userA") == 6.0, + "avg aggregate should keep userA running avg"); + expect(lookupMetric(avg_table, "avg_value", "userB") == 8.0, + "avg aggregate should keep userB running avg"); +} + +void testCheckpointRestoreAggregateVariants() { + namespace fs = std::filesystem; + dataflow::DataflowSession& session = dataflow::DataflowSession::builder(); + const std::vector batches = { + makeWindowBatch({{"2026-03-28T12:00:00", "userA", 10}}), + makeWindowBatch({{"2026-03-28T12:00:10", "userA", 4}}), + makeWindowBatch({{"2026-03-28T12:00:20", "userA", 8}}), + }; + + const auto run_checkpoint_restore = + [&](const std::string& checkpoint, const std::string& state_path, + const std::function& build) { + fs::remove(checkpoint); + fs::remove(state_path); + dataflow::StreamingQueryOptions options; + options.trigger_interval_ms = 0; + options.checkpoint_path = checkpoint; + options.checkpoint_delivery_mode = dataflow::CheckpointDeliveryMode::BestEffort; + + auto first_state = std::make_shared(state_path); + auto first = build(session.readStream(std::make_shared(batches)) + .withStateStore(first_state) + .window("ts", 60000, "window_start")) + .writeStream(std::make_shared(), options); + first.start(); + expect(first.awaitTermination(1) == 1, + "aggregate restore first run should stop after one batch"); + first_state->close(); + first_state.reset(); + + auto second_sink = std::make_shared(); + auto second_state = std::make_shared(state_path); + auto second = build(session.readStream(std::make_shared(batches)) + .withStateStore(second_state) + .window("ts", 60000, "window_start")) + .writeStream(second_sink, options); + second.start(); + expect(second.awaitTermination() == 2, + "aggregate restore second run should resume remaining batches"); + second_state->close(); + const auto outputs = second_sink->batches(); + expect(!outputs.empty(), "aggregate restore should emit output after resume"); + return outputs.back(); + }; + + const auto min_table = run_checkpoint_restore( + "/tmp/velaria-stream-runtime-test-min.checkpoint", + "/tmp/velaria-stream-runtime-test-min.state", + [](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).min("value", true, "min_value"); + }); + expect(lookupMetric(min_table, "min_value", "userA") == 4.0, + "min aggregate should restore its running minimum"); + + const auto max_table = run_checkpoint_restore( + "/tmp/velaria-stream-runtime-test-max.checkpoint", + "/tmp/velaria-stream-runtime-test-max.state", + [](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).max("value", true, "max_value"); + }); + expect(lookupMetric(max_table, "max_value", "userA") == 10.0, + "max aggregate should restore its running maximum"); + + const auto avg_table = run_checkpoint_restore( + "/tmp/velaria-stream-runtime-test-avg.checkpoint", + "/tmp/velaria-stream-runtime-test-avg.state", + [](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).avg("value", true, "avg_value"); + }); + expectNear(lookupMetric(avg_table, "avg_value", "userA"), 22.0 / 3.0, 1e-9, + "avg aggregate should restore its running average"); +} + void testCheckpointRestoreAtLeastOnceDefault() { namespace fs = std::filesystem; const std::string checkpoint = "/tmp/velaria-stream-runtime-test.checkpoint"; @@ -297,6 +622,54 @@ void testWindowEviction() { "window eviction should keep newest window key"); } +void testWindowEvictionAggregateVariants() { + dataflow::DataflowSession& session = dataflow::DataflowSession::builder(); + const std::vector batches = { + makeWindowBatch({{"2026-03-28T13:00:00", "userA", 10}}), + makeWindowBatch({{"2026-03-28T13:01:05", "userA", 4}}), + makeWindowBatch({{"2026-03-28T13:02:10", "userA", 8}}), + }; + + const auto run_eviction = + [&](const std::shared_ptr& state, + const std::function& build, + const std::string& prefix) { + dataflow::StreamingQueryOptions options; + options.trigger_interval_ms = 0; + options.max_retained_windows = 1; + + auto query = build(session.readStream(std::make_shared(batches)) + .withStateStore(state) + .window("ts", 60000, "window_start")) + .writeStream(std::make_shared(), options); + query.start(); + expect(query.awaitTermination() == 3, "aggregate eviction query should process all batches"); + + std::vector state_keys; + state->listKeysByPrefix(prefix, &state_keys); + expect(state_keys.size() == 1, + "aggregate eviction should retain only the latest window state"); + expect(state_keys.front().find("2026-03-28T13:02:00") != std::string::npos, + "aggregate eviction should keep the newest window key"); + }; + + run_eviction(dataflow::makeMemoryStateStore(), + [](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).min("value", true, "min_value"); + }, + "group_min:min_value"); + run_eviction(dataflow::makeMemoryStateStore(), + [](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).max("value", true, "max_value"); + }, + "group_max:max_value"); + run_eviction(dataflow::makeMemoryStateStore(), + [](const dataflow::StreamingDataFrame& df) { + return df.groupBy({"window_start", "key"}).avg("value", true, "avg_value"); + }, + "group_avg:avg_value"); +} + void testSnapshotJsonContract() { dataflow::DataflowSession& session = dataflow::DataflowSession::builder(); auto sink = std::make_shared(); @@ -370,10 +743,13 @@ int main() { try { testBackpressure(); testStatefulWindowCount(); + testAggregateVariants(); + testCheckpointRestoreAggregateVariants(); testCheckpointRestoreAtLeastOnceDefault(); testCheckpointRestoreBestEffort(); testCheckpointRestoreDuplicatesSinkOutputAtLeastOnce(); testWindowEviction(); + testWindowEvictionAggregateVariants(); testSnapshotJsonContract(); std::cout << "[test] stream runtime ok" << std::endl; return 0; diff --git a/src/dataflow/tests/stream_strategy_explain_test.cc b/src/dataflow/tests/stream_strategy_explain_test.cc index 6bdf558..d31a536 100644 --- a/src/dataflow/tests/stream_strategy_explain_test.cc +++ b/src/dataflow/tests/stream_strategy_explain_test.cc @@ -59,6 +59,17 @@ std::unordered_map sumTableToMap(const Table& table) { return out; } +std::unordered_map countTableToMap(const Table& table) { + std::unordered_map out; + const auto window_idx = table.schema.indexOf("window_start"); + const auto key_idx = table.schema.indexOf("key"); + const auto value_idx = table.schema.indexOf("event_count"); + for (const auto& row : table.rows) { + out[row[window_idx].toString() + "|" + row[key_idx].toString()] = row[value_idx].asDouble(); + } + return out; +} + struct QueryRun { Table table; dataflow::StreamingQueryProgress progress; @@ -91,6 +102,33 @@ QueryRun runHotPath(StreamingExecutionMode mode) { return {sink->lastTable(), query.progress()}; } +QueryRun runCountHotPath(StreamingExecutionMode mode) { + DataflowSession& session = DataflowSession::builder(); + auto sink = std::make_shared(); + + StreamingQueryOptions options; + options.trigger_interval_ms = 0; + options.execution_mode = mode; + options.local_workers = 4; + options.actor_workers = 4; + options.actor_max_inflight_partitions = 4; + options.actor_auto_options.sample_batches = 1; + options.actor_auto_options.min_rows_per_batch = 0; + options.actor_auto_options.min_projected_payload_bytes = 0; + options.actor_auto_options.min_compute_to_overhead_ratio = 0.0; + options.actor_auto_options.min_actor_speedup = 0.0; + options.actor_auto_options.strong_actor_speedup = 0.0; + + auto query = session.readStream(std::make_shared(std::vector
{makeHotPathBatch()})) + .withStateStore(dataflow::makeMemoryStateStore()) + .groupBy({"window_start", "key"}) + .count(true, "event_count") + .writeStream(sink, options); + query.start(); + expect(query.awaitTermination() == 1, "count hot-path query should process one batch"); + return {sink->lastTable(), query.progress()}; +} + void testExplainStreamSql() { DataflowSession& session = DataflowSession::builder(); session.createTempView( @@ -99,6 +137,13 @@ void testExplainStreamSql() { session.sql( "CREATE SINK TABLE strategy_hot_sink (window_start STRING, key STRING, value_sum INT) " "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-explain.csv', delimiter ',')"); + session.sql( + "CREATE SINK TABLE strategy_count_sink (window_start STRING, key STRING, event_count INT) " + "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-count-explain.csv', delimiter ',')"); + session.sql( + "CREATE SINK TABLE strategy_multi_sink " + "(window_start STRING, key STRING, event_count INT, avg_value DOUBLE) " + "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-multi-explain.csv', delimiter ',')"); StreamingQueryOptions options; options.execution_mode = StreamingExecutionMode::Auto; @@ -128,6 +173,41 @@ void testExplainStreamSql() { "explain should expose the initial selected mode"); expect(explain.find("actor_shared_memory_transport=true") != std::string::npos, "explain should expose shared-memory knobs"); + + const std::string count_explain = session.explainStreamSql( + "INSERT INTO strategy_count_sink " + "SELECT window_start, key, COUNT(*) AS event_count " + "FROM strategy_hot_events GROUP BY window_start, key", + options); + expect(count_explain.find("COUNT(*) AS event_count") != std::string::npos, + "count explain should describe count aggregate"); + expect(count_explain.find("actor_eligible=true") != std::string::npos, + "count explain should expose actor eligibility"); + + const std::string multi_explain = session.explainStreamSql( + "INSERT INTO strategy_multi_sink " + "SELECT window_start, key, COUNT(*) AS event_count, AVG(value) AS avg_value " + "FROM strategy_hot_events GROUP BY window_start, key", + options); + expect(multi_explain.find("AVG(value) AS avg_value") != std::string::npos, + "multi explain should describe avg aggregate"); + expect(multi_explain.find("actor_eligible=false") != std::string::npos, + "multi explain should disable actor eligibility for multi aggregate"); + + const std::string having_explain = session.explainStreamSql( + "INSERT INTO strategy_hot_sink " + "SELECT window_start, key, SUM(value) AS value_sum " + "FROM strategy_hot_events GROUP BY window_start, key HAVING value_sum > 0", + options); + const std::string final_transform_reason = + "actor acceleration requires the aggregate hot path to be the final stream transform"; + expect(having_explain.find("actor_eligible=false") != std::string::npos, + "having explain should disable actor eligibility after aggregate"); + expect(having_explain.find("actor_eligibility_reason=" + final_transform_reason) != + std::string::npos, + "having explain should expose the final-transform eligibility reason"); + expect(having_explain.find("reason=" + final_transform_reason) != std::string::npos, + "having explain strategy reason should stay aligned with physical eligibility reason"); } void testExecutionModeConsistency() { @@ -152,6 +232,16 @@ void testExecutionModeConsistency() { expect(automatic.progress.transport_mode == "shared-memory" || automatic.progress.transport_mode == "rpc-copy", "auto hot path should expose actor transport mode"); + + const auto count_single = runCountHotPath(StreamingExecutionMode::SingleProcess); + const auto count_actor = runCountHotPath(StreamingExecutionMode::ActorCredit); + const auto count_auto = runCountHotPath(StreamingExecutionMode::Auto); + expect(countTableToMap(count_actor.table) == countTableToMap(count_single.table), + "count actor-credit result should match single-process"); + expect(countTableToMap(count_auto.table) == countTableToMap(count_single.table), + "count auto result should match single-process"); + expect(count_auto.progress.execution_mode == "actor-credit", + "count auto hot path should resolve to actor-credit"); } void testAutoFallbackForNonHotPath() {