diff --git a/docs/streaming_runtime_design.md b/docs/streaming_runtime_design.md index ca31418..d603bda 100644 --- a/docs/streaming_runtime_design.md +++ b/docs/streaming_runtime_design.md @@ -55,21 +55,24 @@ This document describes runtime internals and current implementation shape. For ## Current Actor Pushdown Boundary -当前只有少数固定形状的 query 会被 `LocalWorkers` 内部的 credit-based hot path 接住: +当前只有满足 grouped-aggregate hot path 条件的 query 会被 `LocalWorkers` 内部的 credit-based 路径接住: - 前置变换全部为 `PartitionLocal` -- 最后一个 barrier 是以下其一: - - `groupBy({"window_start", "key"}).sum("value", ...)` - - `groupBy({"window_start", "key"}).count(...)` +- 最后一个 barrier 是 stateful grouped aggregate +- aggregate 函数当前支持: + - `SUM` + - `COUNT(*)` + - `MIN` + - `MAX` + - `AVG` -也就是说,当前 actor runtime 只服务于: +也就是说,当前 actor runtime 现在服务于: -- `window_start` -- `key` -- `value` 的窗口分组求和热路径 -- `COUNT(*)` 的窗口分组计数热路径 +- 任意 group key 列组合 +- 单聚合与多聚合输出 +- `AVG` 通过内部 `sum + count` helper state 收敛 -`MIN / MAX / AVG` 以及多 aggregate 输出当前仍走本地执行链;`LocalWorkers` 会明确写出 fallback reason。 +如果 query 不满足“最终 transform + stateful grouped aggregate + 支持函数集合”这条边界,`LocalWorkers` 会明确写出 fallback reason。 如果 query 不满足这个形状,`StreamingQuery` 会回退到普通执行链,并把原因写入: @@ -155,9 +158,9 @@ actor-stream payload 当前使用 typed binary batch: ## Recommended Next Steps -1. 扩展 credit-based local acceleration 到 `count` 和更多 group aggregate。 -2. 给 query 级 progress 增加更细粒度的 accelerator 指标拆分。 -3. 继续把 source 到执行内核的中间 `Table/Row/Value` 转换收紧到更早的列式表示。 +1. 给 query 级 progress 增加更细粒度的 accelerator 指标拆分。 +2. 继续把 source 到执行内核的中间 `Table/Row/Value` 转换收紧到更早的列式表示。 +3. 评估是否要给 grouped-aggregate hot path 增加更细的 payload/schema 预编译缓存。 ## Source/Sink ABI Bridge (v0.4) diff --git a/python_api/tests/test_streaming_v05.py b/python_api/tests/test_streaming_v05.py index 52144cb..3ae9261 100644 --- a/python_api/tests/test_streaming_v05.py +++ b/python_api/tests/test_streaming_v05.py @@ -115,6 +115,108 @@ def test_start_stream_sql_supports_multi_aggregate_and_having_alias(self): self.assertEqual(float(rows[0]["max_value"]), 10.0) self.assertAlmostEqual(float(rows[0]["avg_value"]), 22.0 / 3.0, places=5) + def test_start_stream_sql_hits_local_workers_grouped_hot_path(self): + session = velaria.Session() + table = pa.table( + { + "segment": ["alpha", "alpha", "alpha", "beta", "beta"], + "bucket": [1, 1, 2, 1, 1], + "value": [10, 14, 3, 6, 8], + } + ) + stream_df = session.create_stream_from_arrow(table) + session.create_temp_view("events_stream_local_workers", stream_df) + + with tempfile.TemporaryDirectory(prefix="velaria-py-stream-hot-path-") as tmp: + sink_path = str(pathlib.Path(tmp) / "sink.csv") + session.sql( + "CREATE SINK TABLE sink_local_workers " + "(segment STRING, bucket INT, value_sum INT, event_count INT, " + "min_value INT, max_value INT, avg_value DOUBLE) " + f"USING csv OPTIONS(path '{sink_path}', delimiter ',')" + ) + query = session.start_stream_sql( + "INSERT INTO sink_local_workers " + "SELECT segment, bucket, SUM(value) AS value_sum, COUNT(*) AS event_count, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM events_stream_local_workers GROUP BY segment, bucket", + trigger_interval_ms=0, + execution_mode="local-workers", + local_workers=4, + max_inflight_partitions=4, + ) + query.start() + processed = query.await_termination() + progress = query.progress() + + self.assertEqual(processed, 1) + self.assertEqual(progress["execution_mode"], "local-workers") + self.assertTrue(progress["actor_eligible"]) + self.assertTrue(progress["used_actor_runtime"]) + self.assertIn(progress["transport_mode"], ["shared-memory", "rpc-copy"]) + + with open(sink_path, newline="", encoding="utf-8") as handle: + rows = list(csv.DictReader(handle)) + + keyed = {(row["segment"], int(float(row["bucket"]))): row for row in rows} + self.assertEqual(len(keyed), 3) + self.assertEqual(float(keyed[("alpha", 1)]["value_sum"]), 24.0) + self.assertEqual(float(keyed[("alpha", 1)]["event_count"]), 2.0) + self.assertEqual(float(keyed[("alpha", 1)]["min_value"]), 10.0) + self.assertEqual(float(keyed[("alpha", 1)]["max_value"]), 14.0) + self.assertAlmostEqual(float(keyed[("alpha", 1)]["avg_value"]), 12.0, places=5) + self.assertEqual(float(keyed[("alpha", 2)]["value_sum"]), 3.0) + self.assertEqual(float(keyed[("beta", 1)]["value_sum"]), 14.0) + self.assertAlmostEqual(float(keyed[("beta", 1)]["avg_value"]), 7.0, places=5) + + def test_start_stream_sql_zero_worker_settings_fall_back_to_inproc_local_workers(self): + session = velaria.Session() + table = pa.table( + { + "ts": ["2026-03-29T10:00:00", "2026-03-29T10:00:10"], + "key": ["u1", "u1"], + "value": [1, 2], + } + ) + stream_df = session.create_stream_from_arrow(table) + session.create_temp_view("events_stream_zero_workers", stream_df) + + with tempfile.TemporaryDirectory(prefix="velaria-py-stream-zero-workers-") as tmp: + sink_path = str(pathlib.Path(tmp) / "sink.csv") + session.sql( + "CREATE SINK TABLE sink_zero_workers " + "(window_start STRING, key STRING, value_sum INT) " + f"USING csv OPTIONS(path '{sink_path}', delimiter ',')" + ) + query = session.start_stream_sql( + "INSERT INTO sink_zero_workers " + "SELECT window_start, key, SUM(value) AS value_sum " + "FROM events_stream_zero_workers " + "WINDOW BY ts EVERY 60000 AS window_start " + "GROUP BY window_start, key", + trigger_interval_ms=0, + execution_mode="local-workers", + local_workers=0, + max_inflight_partitions=0, + ) + query.start() + processed = query.await_termination() + progress = query.progress() + + self.assertEqual(processed, 1) + self.assertEqual(progress["execution_mode"], "local-workers") + self.assertFalse(progress["used_actor_runtime"]) + self.assertEqual(progress["transport_mode"], "inproc") + self.assertIn("local_workers > 1", progress["execution_reason"]) + + with open(sink_path, newline="", encoding="utf-8") as handle: + rows = list(csv.DictReader(handle)) + + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["window_start"], "2026-03-29T10:00:00") + self.assertEqual(rows[0]["key"], "u1") + self.assertEqual(float(rows[0]["value_sum"]), 3.0) + def test_explain_stream_sql_returns_strategy_sections(self): session = velaria.Session() table = pa.table( @@ -142,6 +244,33 @@ def test_explain_stream_sql_returns_strategy_sections(self): self.assertIn("selected_mode", explain) self.assertIn("checkpoint_delivery_mode=best-effort", explain) + def test_explain_stream_sql_reports_local_workers_grouped_hot_path(self): + session = velaria.Session() + table = pa.table( + { + "segment": ["alpha", "alpha", "beta"], + "bucket": [1, 1, 1], + "value": [10, 14, 6], + } + ) + stream_df = session.create_stream_from_arrow(table) + session.create_temp_view("events_stream_local_workers_explain", stream_df) + + explain = session.explain_stream_sql( + "SELECT segment, bucket, SUM(value) AS value_sum, COUNT(*) AS event_count, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM events_stream_local_workers_explain " + "GROUP BY segment, bucket", + trigger_interval_ms=0, + execution_mode="local-workers", + local_workers=4, + max_inflight_partitions=4, + ) + + self.assertIn("Aggregate keys=[segment, bucket]", explain) + self.assertIn("actor_eligible=true", explain) + self.assertIn("selected_mode=local-workers", explain) + def test_explain_stream_sql_reports_having_actor_fallback_reason(self): session = velaria.Session() table = pa.table( diff --git a/src/dataflow/core/execution/stream/stream.cc b/src/dataflow/core/execution/stream/stream.cc index e45b289..5f05870 100644 --- a/src/dataflow/core/execution/stream/stream.cc +++ b/src/dataflow/core/execution/stream/stream.cc @@ -382,8 +382,7 @@ bool queryHasStatefulOps(const std::vector& transforms) { bool queryHasWindowOps(const std::vector& transforms) { return std::any_of(transforms.begin(), transforms.end(), [](const StreamTransform& transform) { - return transform.label().find("window") != std::string::npos || - transform.accelerator().kind != StreamAcceleratorKind::None; + return transform.label().find("window") != std::string::npos; }); } @@ -432,10 +431,32 @@ ActorAccelerationAnalysis analyzeActorAcceleration(const std::vector& transforms) { const size_t transform_cost = std::max(1, transforms.size()) * 64; const bool has_stateful = @@ -682,28 +703,45 @@ Table applyStatefulGroupedAggregates(const Table& input, const std::vector columns = accelerator.group_keys; + for (const auto& aggregate : accelerator.aggregates) { + if (aggregate.function == AggregateFunction::Count) continue; + if (std::find(columns.begin(), columns.end(), aggregate.value_column) == columns.end()) { + columns.push_back(aggregate.value_column); + } + } + + Table out; + out.schema = Schema(columns); out.rows.reserve(table.rows.size()); for (const auto& row : table.rows) { - out.rows.push_back( - {row[window_idx], row[key_idx], Value(static_cast(1))}); + Row projected; + projected.reserve(columns.size()); + for (const auto& column : columns) { + projected.push_back(row[table.schema.indexOf(column)]); + } + out.rows.push_back(std::move(projected)); } return out; } @@ -716,46 +754,113 @@ void renameColumn(Table* table, const std::string& from, const std::string& to) table->schema.index[to] = idx; } -Table finalizeActorWindowKeyAggregateOutput(Table aggregated, const StreamAcceleratorSpec& accelerator, - std::shared_ptr state, - const StreamingQueryOptions& options, - uint64_t* state_ms) { +Table finalizeActorGroupedAggregateOutput(Table aggregated, const StreamAcceleratorSpec& accelerator, + std::shared_ptr state, + const StreamingQueryOptions& options, + uint64_t* state_ms) { auto started = std::chrono::steady_clock::now(); - const auto output_name = accelerator.aggregate.output_column.empty() - ? "value_sum" - : accelerator.aggregate.output_column; - renameColumn(&aggregated, "value_sum", output_name); - const auto value_idx = aggregated.schema.indexOf(output_name); - if (accelerator.kind == StreamAcceleratorKind::WindowKeyCount) { - for (auto& row : aggregated.rows) { - row[value_idx] = Value(static_cast(row[value_idx].asDouble())); - } + Table out; + out.schema.fields = accelerator.group_keys; + for (const auto& aggregate : accelerator.aggregates) { + out.schema.fields.push_back(aggregate.output_column); } + for (size_t i = 0; i < out.schema.fields.size(); ++i) { + out.schema.index[out.schema.fields[i]] = i; + } + if (!accelerator.stateful) { + out.rows.reserve(aggregated.rows.size()); + for (const auto& row : aggregated.rows) { + Row out_row; + out_row.reserve(out.schema.fields.size()); + for (const auto& key : accelerator.group_keys) { + out_row.push_back(row[aggregated.schema.indexOf(key)]); + } + for (const auto& aggregate : accelerator.aggregates) { + out_row.push_back(row[aggregated.schema.indexOf(aggregate.output_column)]); + } + out.rows.push_back(std::move(out_row)); + } if (state_ms != nullptr) *state_ms = 0; - return aggregated; + return out; } auto state_store = state ? state : makeMemoryStateStore(); - const size_t window_idx = aggregated.schema.indexOf("window_start"); - const size_t key_idx = aggregated.schema.indexOf("key"); - const auto label = stateLabelForAggregate(accelerator.aggregate); - for (auto& row : aggregated.rows) { - const auto state_key = makeStateKey(row, {window_idx, key_idx}, label + ":"); - const double delta = row[value_idx].asDouble(); - const double next = state_store->addDouble(state_key, delta); - if (accelerator.kind == StreamAcceleratorKind::WindowKeyCount) { - row[value_idx] = Value(static_cast(next)); - } else { - row[value_idx] = Value(next); + std::vector state_key_indices; + state_key_indices.reserve(accelerator.group_keys.size()); + for (size_t i = 0; i < accelerator.group_keys.size(); ++i) { + state_key_indices.push_back(i); + } + size_t window_pos = 0; + const bool has_window = findWindowKeyPosition(accelerator.group_keys, &window_pos); + std::unordered_set touched_labels; + out.rows.reserve(aggregated.rows.size()); + for (const auto& row : aggregated.rows) { + Row out_row; + out_row.reserve(out.schema.fields.size()); + for (const auto& key : accelerator.group_keys) { + out_row.push_back(row[aggregated.schema.indexOf(key)]); } - registerWindowState(state_store.get(), label, row[window_idx].toString(), state_key); + for (const auto& aggregate : accelerator.aggregates) { + const auto label = stateLabelForAggregate(aggregate); + touched_labels.insert(label); + const auto state_key = makeStateKey(out_row, state_key_indices, label + ":"); + if (aggregate.function == AggregateFunction::Sum) { + const double delta = row[aggregated.schema.indexOf(aggregate.output_column)].asDouble(); + out_row.push_back(Value(state_store->addDouble(state_key, delta))); + } else if (aggregate.function == AggregateFunction::Count) { + const double delta = static_cast( + row[aggregated.schema.indexOf(aggregate.output_column)].asInt64()); + out_row.push_back(Value(static_cast(state_store->addDouble(state_key, delta)))); + } else if (aggregate.function == AggregateFunction::Avg) { + const double delta_sum = + row[aggregated.schema.indexOf(actorStateAverageSumColumnName(aggregate.output_column))] + .asDouble(); + const int64_t delta_count = row[aggregated.schema.indexOf( + actorStateAverageCountColumnName(aggregate.output_column))] + .asInt64(); + std::string raw; + double current_sum = 0.0; + int64_t current_count = 0; + if (state_store->get(state_key, &raw)) { + parseAverageState(raw, ¤t_sum, ¤t_count); + } + current_sum += delta_sum; + current_count += delta_count; + state_store->put(state_key, serializeAverageState(current_sum, current_count)); + out_row.push_back( + Value(current_count == 0 ? 0.0 : current_sum / static_cast(current_count))); + } else { + const Value batch_value = row[aggregated.schema.indexOf(aggregate.output_column)]; + std::string raw; + Value current_value; + const bool has_current = + state_store->get(state_key, &raw) && parseStateValue(raw, ¤t_value); + bool replace = !has_current; + if (aggregate.function == AggregateFunction::Min) { + replace = replace || batch_value < current_value; + } else { + replace = replace || batch_value > current_value; + } + if (replace) { + current_value = batch_value; + state_store->put(state_key, serializeStateValue(current_value)); + } + out_row.push_back(current_value); + } + if (has_window && window_pos < out_row.size()) { + registerWindowState(state_store.get(), label, out_row[window_pos].toString(), state_key); + } + } + out.rows.push_back(std::move(out_row)); + } + for (const auto& label : touched_labels) { + evictExpiredWindows(state_store.get(), label, options); } - evictExpiredWindows(state_store.get(), label, options); if (state_ms != nullptr) { *state_ms = toMillis(std::chrono::steady_clock::now() - started); } - return aggregated; + return out; } } // namespace @@ -1666,17 +1771,11 @@ StreamingDataFrame GroupedStreamingDataFrame::aggregate( const std::vector& specs, bool stateful) const { auto t = transforms_; StreamAcceleratorSpec accelerator; - if (specs.size() == 1 && keys_ == std::vector{"window_start", "key"}) { - const auto& spec = specs.front(); - if (spec.function == AggregateFunction::Sum && spec.value_column == "value") { - accelerator.kind = StreamAcceleratorKind::WindowKeySum; - accelerator.stateful = stateful; - accelerator.aggregate = spec; - } else if (spec.function == AggregateFunction::Count && spec.is_count_star) { - accelerator.kind = StreamAcceleratorKind::WindowKeyCount; - accelerator.stateful = stateful; - accelerator.aggregate = spec; - } + if (stateful && !keys_.empty() && !specs.empty()) { + accelerator.kind = StreamAcceleratorKind::GroupedAggregate; + accelerator.stateful = true; + accelerator.group_keys = keys_; + accelerator.aggregates = specs; } if (!stateful) { @@ -2002,19 +2101,20 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { Table actor_input = buildActorAggregateInput( executePartitionStage(envelope.table, actor_prefix_transforms, options_, 1), actor_accelerator); - if (canRunActorWindowKeyAggregate(actor_input, actor_accelerator)) { + if (canRunActorGroupedAggregate(actor_input, actor_accelerator)) { const auto actor_options = makeAcceleratorOptions(options_); - auto actor_result = - runLocalActorStreamWindowKeySum(std::vector{actor_input}, actor_options); - out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), - actor_accelerator, root_.state_, options_, - &state_ms); + auto actor_result = runLocalActorStreamGroupedAggregate( + std::vector
{actor_input}, toLocalGroupedAggregateSpec(actor_accelerator), + actor_options); + out = finalizeActorGroupedAggregateOutput(std::move(actor_result.final_table), + actor_accelerator, root_.state_, options_, + &state_ms); partitions_used = std::max(1, actor_result.processed_partitions); used_actor_runtime = true; execution_decided_ = true; resolved_execution_mode_ = StreamingExecutionMode::LocalWorkers; execution_reason_ = - "configured local-workers execution; using credit-based scheduling for the eligible hot path"; + "configured local-workers execution; using credit-based scheduling for the eligible grouped aggregate hot path"; strategy_decision_.resolved_execution_mode = streamingExecutionModeName(resolved_execution_mode_); strategy_decision_.reason = execution_reason_; @@ -2030,7 +2130,7 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { execution_decided_ = true; resolved_execution_mode_ = StreamingExecutionMode::LocalWorkers; execution_reason_ = - "configured local-workers execution; using generic partition workers because credit scheduling requires window_start/key with supported SUM(value) or COUNT(*) shape"; + "configured local-workers execution; using generic partition workers because credit scheduling requires a final stateful grouped aggregate with supported SUM/COUNT/MIN/MAX/AVG outputs"; strategy_decision_.resolved_execution_mode = streamingExecutionModeName(resolved_execution_mode_); strategy_decision_.reason = execution_reason_; diff --git a/src/dataflow/core/execution/stream/stream.h b/src/dataflow/core/execution/stream/stream.h index 7efffbf..102a293 100644 --- a/src/dataflow/core/execution/stream/stream.h +++ b/src/dataflow/core/execution/stream/stream.h @@ -382,7 +382,7 @@ class MemoryStreamSink : public StreamSink { enum class StreamTransformMode { PartitionLocal, GlobalBarrier }; -enum class StreamAcceleratorKind { None, WindowKeySum, WindowKeyCount }; +enum class StreamAcceleratorKind { None, GroupedAggregate }; struct StreamAggregateSpec { AggregateFunction function = AggregateFunction::Sum; @@ -395,7 +395,8 @@ struct StreamAggregateSpec { struct StreamAcceleratorSpec { StreamAcceleratorKind kind = StreamAcceleratorKind::None; bool stateful = false; - StreamAggregateSpec aggregate; + std::vector group_keys; + std::vector aggregates; }; class RuntimeSourceAdapter : public StreamSource { diff --git a/src/dataflow/core/logical/sql/sql_planner.cc b/src/dataflow/core/logical/sql/sql_planner.cc index 0a60fa1..f761fae 100644 --- a/src/dataflow/core/logical/sql/sql_planner.cc +++ b/src/dataflow/core/logical/sql/sql_planner.cc @@ -359,14 +359,22 @@ bool streamPlanIsPartitionLocal(const StreamPlanNode& node) { } bool isActorHotPathAggregate(const StreamPlanNode& node) { - if (node.kind != StreamPlanNodeKind::Aggregate || !node.stateful || - node.group_keys != std::vector{"window_start", "key"} || - node.aggregates.size() != 1) { + if (node.kind != StreamPlanNodeKind::Aggregate || !node.stateful || node.group_keys.empty() || + node.aggregates.empty()) { return false; } - const auto& aggregate = node.aggregates.front(); - return (aggregate.function == AggregateFunction::Sum && aggregate.value_column == "value") || - (aggregate.function == AggregateFunction::Count && aggregate.is_count_star); + return std::all_of( + node.aggregates.begin(), node.aggregates.end(), [](const StreamAggregateSpec& aggregate) { + switch (aggregate.function) { + case AggregateFunction::Sum: + case AggregateFunction::Count: + case AggregateFunction::Avg: + case AggregateFunction::Min: + case AggregateFunction::Max: + return true; + } + return false; + }); } } // namespace @@ -967,12 +975,9 @@ StreamPhysicalPlan SqlPlanner::buildStreamPhysicalPlan(const StreamLogicalPlan& "actor acceleration requires the aggregate hot path to be the final stream transform"; return physical; } - const auto& aggregate = node.aggregates.front(); physical.actor_eligible = true; physical.actor_eligibility_reason = - aggregate.function == AggregateFunction::Sum - ? "window_start/key SUM(value) shape is eligible for actor acceleration" - : "window_start/key COUNT(*) shape is eligible for actor acceleration"; + "final stateful grouped aggregate with supported SUM/COUNT/MIN/MAX/AVG outputs is eligible for actor acceleration"; return physical; } diff --git a/src/dataflow/experimental/stream/actor_stream_runtime.cc b/src/dataflow/experimental/stream/actor_stream_runtime.cc index a276d14..3c28dab 100644 --- a/src/dataflow/experimental/stream/actor_stream_runtime.cc +++ b/src/dataflow/experimental/stream/actor_stream_runtime.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -49,11 +50,290 @@ uint64_t actorOverheadMs(const LocalActorStreamResult& result) { result.worker_deserialize_ms + result.worker_serialize_ms; } +std::string partialValueColumnName(const std::string& output_column) { + return "__partial_" + output_column; +} + +std::string partialAverageSumColumnName(const std::string& output_column) { + return "__partial_sum_" + output_column; +} + +std::string partialAverageCountColumnName(const std::string& output_column) { + return "__partial_count_" + output_column; +} + +std::string stateAverageSumColumnName(const std::string& output_column) { + return "__state_sum_" + output_column; +} + +std::string stateAverageCountColumnName(const std::string& output_column) { + return "__state_count_" + output_column; +} + +struct RuntimeAggregateLayout { + LocalGroupedAggregateSpec::Aggregate aggregate; + std::string partial_value_column; + std::string partial_count_column; +}; + +double spinValue(double value, size_t cpu_spin_per_row); + +std::vector resolveGroupKeyIndices(const Table& input, + const std::vector& group_keys) { + std::vector indices; + indices.reserve(group_keys.size()); + for (const auto& key : group_keys) { + indices.push_back(input.schema.indexOf(key)); + } + return indices; +} + +std::vector buildRuntimePartialAggregateSpecs( + const Table& input, const LocalGroupedAggregateSpec& aggregate, + std::vector* layouts) { + if (layouts == nullptr) { + throw std::invalid_argument("layouts cannot be null"); + } + layouts->clear(); + + std::vector specs; + specs.reserve(aggregate.aggregates.size() * 2); + for (const auto& aggregate_spec : aggregate.aggregates) { + RuntimeAggregateLayout layout; + layout.aggregate = aggregate_spec; + switch (aggregate_spec.function) { + case AggregateFunction::Sum: + case AggregateFunction::Min: + case AggregateFunction::Max: { + layout.partial_value_column = partialValueColumnName(aggregate_spec.output_column); + specs.push_back(AggregateSpec{aggregate_spec.function, + input.schema.indexOf(aggregate_spec.value_column), + layout.partial_value_column}); + break; + } + case AggregateFunction::Count: { + layout.partial_count_column = partialValueColumnName(aggregate_spec.output_column); + specs.push_back(AggregateSpec{AggregateFunction::Count, static_cast(-1), + layout.partial_count_column}); + break; + } + case AggregateFunction::Avg: { + layout.partial_value_column = partialAverageSumColumnName(aggregate_spec.output_column); + layout.partial_count_column = partialAverageCountColumnName(aggregate_spec.output_column); + specs.push_back(AggregateSpec{AggregateFunction::Sum, + input.schema.indexOf(aggregate_spec.value_column), + layout.partial_value_column}); + specs.push_back(AggregateSpec{AggregateFunction::Count, static_cast(-1), + layout.partial_count_column}); + break; + } + } + layouts->push_back(std::move(layout)); + } + return specs; +} + +std::vector buildRuntimeMergeAggregateSpecs( + const Table& partials, const std::vector& layouts) { + std::vector specs; + specs.reserve(layouts.size() * 2); + for (const auto& layout : layouts) { + switch (layout.aggregate.function) { + case AggregateFunction::Sum: + specs.push_back(AggregateSpec{AggregateFunction::Sum, + partials.schema.indexOf(layout.partial_value_column), + layout.partial_value_column}); + break; + case AggregateFunction::Count: + specs.push_back(AggregateSpec{AggregateFunction::Sum, + partials.schema.indexOf(layout.partial_count_column), + layout.partial_count_column}); + break; + case AggregateFunction::Min: + specs.push_back(AggregateSpec{AggregateFunction::Min, + partials.schema.indexOf(layout.partial_value_column), + layout.partial_value_column}); + break; + case AggregateFunction::Max: + specs.push_back(AggregateSpec{AggregateFunction::Max, + partials.schema.indexOf(layout.partial_value_column), + layout.partial_value_column}); + break; + case AggregateFunction::Avg: + specs.push_back(AggregateSpec{AggregateFunction::Sum, + partials.schema.indexOf(layout.partial_value_column), + layout.partial_value_column}); + specs.push_back(AggregateSpec{AggregateFunction::Sum, + partials.schema.indexOf(layout.partial_count_column), + layout.partial_count_column}); + break; + } + } + return specs; +} + +Table makeAggregateOutputSkeleton(const LocalGroupedAggregateSpec& aggregate, + bool include_avg_state_helpers) { + Table table; + table.schema.fields = aggregate.group_keys; + for (const auto& aggregate_spec : aggregate.aggregates) { + table.schema.fields.push_back(aggregate_spec.output_column); + if (include_avg_state_helpers && aggregate_spec.function == AggregateFunction::Avg) { + table.schema.fields.push_back(stateAverageSumColumnName(aggregate_spec.output_column)); + table.schema.fields.push_back(stateAverageCountColumnName(aggregate_spec.output_column)); + } + } + for (size_t i = 0; i < table.schema.fields.size(); ++i) { + table.schema.index[table.schema.fields[i]] = i; + } + return table; +} + +Table applyCpuSpin(Table input, const LocalGroupedAggregateSpec& aggregate, size_t cpu_spin_per_row) { + if (cpu_spin_per_row == 0) { + return input; + } + + std::unordered_set value_columns; + for (const auto& aggregate_spec : aggregate.aggregates) { + if (aggregate_spec.function != AggregateFunction::Count) { + value_columns.insert(aggregate_spec.value_column); + } + } + for (const auto& column : value_columns) { + if (!input.schema.has(column)) continue; + const auto column_index = input.schema.indexOf(column); + for (auto& row : input.rows) { + if (column_index >= row.size() || row[column_index].isNull()) continue; + row[column_index] = Value(spinValue(row[column_index].asDouble(), cpu_spin_per_row)); + } + } + return input; +} + +std::vector buildInputProjectionColumns(const LocalGroupedAggregateSpec& aggregate) { + std::vector columns = aggregate.group_keys; + std::unordered_set seen(columns.begin(), columns.end()); + for (const auto& aggregate_spec : aggregate.aggregates) { + if (aggregate_spec.function == AggregateFunction::Count) continue; + if (seen.insert(aggregate_spec.value_column).second) { + columns.push_back(aggregate_spec.value_column); + } + } + return columns; +} + +Table appendTables(const std::vector
& tables) { + Table merged; + for (const auto& table : tables) { + if (merged.schema.fields.empty()) { + merged.schema = table.schema; + } + merged.rows.insert(merged.rows.end(), table.rows.begin(), table.rows.end()); + } + return merged; +} + +Table aggregateInputToPartial(const Table& input, const LocalGroupedAggregateSpec& aggregate, + size_t cpu_spin_per_row) { + if (aggregate.group_keys.empty() || aggregate.aggregates.empty()) { + return makeAggregateOutputSkeleton(aggregate, false); + } + Table spun = applyCpuSpin(input, aggregate, cpu_spin_per_row); + std::vector layouts; + const auto partial_specs = buildRuntimePartialAggregateSpecs(spun, aggregate, &layouts); + const auto key_indices = resolveGroupKeyIndices(spun, aggregate.group_keys); + return DataFrame(spun).aggregate(key_indices, partial_specs).toTable(); +} + +Table finalizeMergedPartialAggregates(const Table& partials, + const LocalGroupedAggregateSpec& aggregate) { + if (aggregate.group_keys.empty() || aggregate.aggregates.empty() || partials.rows.empty()) { + return makeAggregateOutputSkeleton(aggregate, true); + } + + std::vector layouts; + for (const auto& aggregate_spec : aggregate.aggregates) { + RuntimeAggregateLayout layout; + layout.aggregate = aggregate_spec; + switch (aggregate_spec.function) { + case AggregateFunction::Sum: + case AggregateFunction::Min: + case AggregateFunction::Max: + layout.partial_value_column = partialValueColumnName(aggregate_spec.output_column); + break; + case AggregateFunction::Count: + layout.partial_count_column = partialValueColumnName(aggregate_spec.output_column); + break; + case AggregateFunction::Avg: + layout.partial_value_column = partialAverageSumColumnName(aggregate_spec.output_column); + layout.partial_count_column = partialAverageCountColumnName(aggregate_spec.output_column); + break; + } + layouts.push_back(std::move(layout)); + } + + const auto key_indices = resolveGroupKeyIndices(partials, aggregate.group_keys); + const auto merge_specs = buildRuntimeMergeAggregateSpecs(partials, layouts); + const Table merged = DataFrame(partials).aggregate(key_indices, merge_specs).toTable(); + + Table output = makeAggregateOutputSkeleton(aggregate, true); + output.rows.reserve(merged.rows.size()); + for (const auto& merged_row : merged.rows) { + Row output_row; + output_row.reserve(output.schema.fields.size()); + for (size_t i = 0; i < aggregate.group_keys.size(); ++i) { + output_row.push_back(merged_row[i]); + } + for (const auto& layout : layouts) { + switch (layout.aggregate.function) { + case AggregateFunction::Sum: + case AggregateFunction::Min: + case AggregateFunction::Max: + output_row.push_back( + merged_row[merged.schema.indexOf(layout.partial_value_column)]); + break; + case AggregateFunction::Count: + output_row.push_back(Value(static_cast( + merged_row[merged.schema.indexOf(layout.partial_count_column)].asDouble()))); + break; + case AggregateFunction::Avg: { + const double sum = + merged_row[merged.schema.indexOf(layout.partial_value_column)].asDouble(); + const int64_t count = static_cast( + merged_row[merged.schema.indexOf(layout.partial_count_column)].asDouble()); + output_row.push_back(Value(count == 0 ? 0.0 : sum / static_cast(count))); + output_row.push_back(Value(sum)); + output_row.push_back(Value(count)); + break; + } + } + } + output.rows.push_back(std::move(output_row)); + } + return output; +} + +Table runSingleProcessGroupedAggregateImpl(const std::vector
& batches, + const LocalGroupedAggregateSpec& aggregate, + size_t cpu_spin_per_row) { + std::vector
partials; + partials.reserve(batches.size()); + for (const auto& batch : batches) { + partials.push_back(aggregateInputToPartial(batch, aggregate, cpu_spin_per_row)); + } + return finalizeMergedPartialAggregates(appendTables(partials), aggregate); +} + LocalActorStreamResult measureSingleProcessWindowKeySum(const std::vector
& batches, size_t cpu_spin_per_row) { LocalActorStreamResult result; auto started = std::chrono::steady_clock::now(); - result.final_table = runSingleProcessWindowKeySum(batches, cpu_spin_per_row); + LocalGroupedAggregateSpec aggregate; + aggregate.group_keys = {"window_start", "key"}; + aggregate.aggregates.push_back( + {AggregateFunction::Sum, "value", "value_sum", false}); + result.final_table = runSingleProcessGroupedAggregateImpl(batches, aggregate, cpu_spin_per_row); result.processed_batches = batches.size(); result.processed_partitions = batches.size(); result.elapsed_ms = static_cast( @@ -577,12 +857,10 @@ Table aggregatePartitionWithWork(const Table& input, size_t cpu_spin_per_row) { return aggregateVectorizedBatch(buildColumnarFromTable(input), cpu_spin_per_row); } -void workerLoop(int fd, uint64_t delay_ms, size_t cpu_spin_per_row, - size_t shared_memory_min_payload_bytes) { +void workerLoop(int fd, const LocalGroupedAggregateSpec& aggregate, uint64_t delay_ms, + size_t cpu_spin_per_row, size_t shared_memory_min_payload_bytes) { LengthPrefixedFrameCodec codec; BinaryRowBatchCodec batch_codec; - BinaryRowBatchOptions output_projection; - output_projection.projected_columns = {"window_start", "key", "partial_sum"}; ByteBufferPool payload_pool; ByteBufferPool frame_pool; @@ -609,26 +887,21 @@ void workerLoop(int fd, uint64_t delay_ms, size_t cpu_spin_per_row, request_region = openSharedMemoryReadRegion( SharedMemoryPayload{request.shared_memory_name, request.shared_memory_size}); } - WindowKeyValueColumnarBatch input; - const bool decoded = + const Table input = request.shared_memory - ? batch_codec.deserializeWindowKeyValueFromBuffer( - static_cast(request_region.mapped), request_region.size, &input) - : batch_codec.deserializeWindowKeyValue(request.table_payload, &input); + ? batch_codec.deserializeFromBuffer( + static_cast(request_region.mapped), request_region.size) + : batch_codec.deserialize(request.table_payload); closeSharedMemoryReadRegion(&request_region); - if (!decoded) { - throw std::runtime_error("deserializeWindowKeyValue failed"); - } const auto compute_started = std::chrono::steady_clock::now(); - const Table partial = aggregateVectorizedBatch(input, cpu_spin_per_row); + const Table partial = aggregateInputToPartial(input, aggregate, cpu_spin_per_row); const auto serialize_started = std::chrono::steady_clock::now(); result.ok = true; result.metrics.deserialize_ms = toMillis(compute_started - deserialize_started); result.metrics.compute_ms = toMillis(serialize_started - compute_started); - const PreparedBinaryRowBatch prepared_output = - batch_codec.prepare(partial, output_projection); + const PreparedBinaryRowBatch prepared_output = batch_codec.prepare(partial); const size_t estimated_payload = prepared_output.estimated_size; if (estimated_payload >= shared_memory_min_payload_bytes) { auto region = createSharedMemoryWriteRegion(estimated_payload); @@ -676,7 +949,8 @@ void workerLoop(int fd, uint64_t delay_ms, size_t cpu_spin_per_row, _exit(0); } -std::vector startWorkers(const LocalActorStreamOptions& options) { +std::vector startWorkers(const LocalGroupedAggregateSpec& aggregate, + const LocalActorStreamOptions& options) { std::vector workers; workers.reserve(options.worker_count); for (size_t i = 0; i < std::max(1, options.worker_count); ++i) { @@ -692,7 +966,7 @@ std::vector startWorkers(const LocalActorStreamOptions& options) { } if (pid == 0) { ::close(fds[0]); - workerLoop(fds[1], options.worker_delay_ms, options.cpu_spin_per_row, + workerLoop(fds[1], aggregate, options.worker_delay_ms, options.cpu_spin_per_row, options.shared_memory_min_payload_bytes); } ::close(fds[1]); @@ -734,27 +1008,36 @@ const char* localExecutionModeName(LocalExecutionMode mode) { } Table runSingleProcessWindowKeySum(const std::vector
& batches, size_t cpu_spin_per_row) { - std::unordered_map sums; - std::vector ordered_keys; - for (const auto& batch : batches) { - mergePartialTable(aggregatePartitionWithWork(batch, cpu_spin_per_row), &sums, &ordered_keys); - } - return materializeStateTable(sums, ordered_keys); + LocalGroupedAggregateSpec aggregate; + aggregate.group_keys = {"window_start", "key"}; + aggregate.aggregates.push_back( + {AggregateFunction::Sum, "value", "value_sum", false}); + return runSingleProcessGroupedAggregateImpl(batches, aggregate, cpu_spin_per_row); } -LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& batches, - const LocalActorStreamOptions& options) { +Table runSingleProcessGroupedAggregate(const std::vector
& batches, + const LocalGroupedAggregateSpec& aggregate, + size_t cpu_spin_per_row) { + return runSingleProcessGroupedAggregateImpl(batches, aggregate, cpu_spin_per_row); +} + +LocalActorStreamResult runLocalActorStreamGroupedAggregate( + const std::vector
& batches, const LocalGroupedAggregateSpec& aggregate, + const LocalActorStreamOptions& options) { if (options.max_inflight_partitions == 0) { throw std::invalid_argument("max_inflight_partitions must be positive"); } + if (aggregate.group_keys.empty() || aggregate.aggregates.empty()) { + throw std::invalid_argument("grouped aggregate spec cannot be empty"); + } LocalActorStreamResult result; auto started = std::chrono::steady_clock::now(); - auto workers = startWorkers(options); + auto workers = startWorkers(aggregate, options); LengthPrefixedFrameCodec codec; BinaryRowBatchCodec batch_codec; BinaryRowBatchOptions input_projection; - input_projection.projected_columns = {"window_start", "key", "value"}; + input_projection.projected_columns = buildInputProjectionColumns(aggregate); ByteBufferPool payload_pool; ByteBufferPool frame_pool; @@ -797,8 +1080,7 @@ LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& } } - std::unordered_map sums; - std::vector ordered_keys; + Table merged_partials; size_t next_pending = 0; size_t inflight = 0; std::unordered_map inflight_input_shared_memory; @@ -929,7 +1211,11 @@ LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& result.worker_serialize_ms += msg.metrics.serialize_ms; const auto merge_started = std::chrono::steady_clock::now(); - mergePartialTable(partial, &sums, &ordered_keys); + if (merged_partials.schema.fields.empty()) { + merged_partials.schema = partial.schema; + } + merged_partials.rows.insert(merged_partials.rows.end(), partial.rows.begin(), + partial.rows.end()); result.coordinator_merge_ms += toMillis(std::chrono::steady_clock::now() - merge_started); ++result.processed_partitions; } else { @@ -943,7 +1229,11 @@ LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& result.worker_serialize_ms += msg.metrics.serialize_ms; const auto merge_started = std::chrono::steady_clock::now(); - mergePartialTable(partial, &sums, &ordered_keys); + if (merged_partials.schema.fields.empty()) { + merged_partials.schema = partial.schema; + } + merged_partials.rows.insert(merged_partials.rows.end(), partial.rows.begin(), + partial.rows.end()); result.coordinator_merge_ms += toMillis(std::chrono::steady_clock::now() - merge_started); ++result.processed_partitions; } @@ -963,7 +1253,7 @@ LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& } stopWorkers(&workers); - result.final_table = materializeStateTable(sums, ordered_keys); + result.final_table = finalizeMergedPartialAggregates(merged_partials, aggregate); result.elapsed_ms = static_cast( std::chrono::duration_cast( std::chrono::steady_clock::now() - started) @@ -971,6 +1261,15 @@ LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& return result; } +LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& batches, + const LocalActorStreamOptions& options) { + LocalGroupedAggregateSpec aggregate; + aggregate.group_keys = {"window_start", "key"}; + aggregate.aggregates.push_back( + {AggregateFunction::Sum, "value", "value_sum", false}); + return runLocalActorStreamGroupedAggregate(batches, aggregate, options); +} + LocalActorStreamResult runAutoLocalActorStreamWindowKeySum( const std::vector
& batches, const LocalActorStreamOptions& actor_options, const LocalExecutionAutoOptions& auto_options, LocalExecutionDecision* decision) { @@ -994,11 +1293,15 @@ LocalActorStreamResult runAutoLocalActorStreamWindowKeySum( local_decision.sampled_batches = sampled_batches; const std::vector
sample_batches_vec(batches.begin(), batches.begin() + sampled_batches); + LocalGroupedAggregateSpec aggregate; + aggregate.group_keys = {"window_start", "key"}; + aggregate.aggregates.push_back( + {AggregateFunction::Sum, "value", "value_sum", false}); const LocalActorStreamResult sample_single = measureSingleProcessWindowKeySum(sample_batches_vec, actor_options.cpu_spin_per_row); const LocalActorStreamResult sample_actor = - runLocalActorStreamWindowKeySum(sample_batches_vec, actor_options); + runLocalActorStreamGroupedAggregate(sample_batches_vec, aggregate, actor_options); const uint64_t total_projected_bytes = sample_actor.input_payload_bytes + sample_actor.input_shared_memory_bytes; @@ -1056,7 +1359,7 @@ LocalActorStreamResult runAutoLocalActorStreamWindowKeySum( : sample_single; } if (local_decision.chosen_mode == LocalExecutionMode::ActorCredit) { - return runLocalActorStreamWindowKeySum(batches, actor_options); + return runLocalActorStreamGroupedAggregate(batches, aggregate, actor_options); } return measureSingleProcessWindowKeySum(batches, actor_options.cpu_spin_per_row); } diff --git a/src/dataflow/experimental/stream/actor_stream_runtime.h b/src/dataflow/experimental/stream/actor_stream_runtime.h index 14bb1de..fa09233 100644 --- a/src/dataflow/experimental/stream/actor_stream_runtime.h +++ b/src/dataflow/experimental/stream/actor_stream_runtime.h @@ -5,6 +5,7 @@ #include #include +#include "src/dataflow/core/logical/planner/plan.h" #include "src/dataflow/core/execution/table.h" namespace dataflow { @@ -67,7 +68,25 @@ struct LocalExecutionDecision { std::string reason; }; +struct LocalGroupedAggregateSpec { + struct Aggregate { + AggregateFunction function = AggregateFunction::Sum; + std::string value_column; + std::string output_column; + bool is_count_star = false; + }; + + std::vector group_keys; + std::vector aggregates; +}; + const char* localExecutionModeName(LocalExecutionMode mode); +Table runSingleProcessGroupedAggregate(const std::vector
& batches, + const LocalGroupedAggregateSpec& aggregate, + size_t cpu_spin_per_row = 0); +LocalActorStreamResult runLocalActorStreamGroupedAggregate( + const std::vector
& batches, const LocalGroupedAggregateSpec& aggregate, + const LocalActorStreamOptions& options); Table runSingleProcessWindowKeySum(const std::vector
& batches, size_t cpu_spin_per_row = 0); LocalActorStreamResult runLocalActorStreamWindowKeySum(const std::vector
& batches, diff --git a/src/dataflow/interop/python/python_module.cc b/src/dataflow/interop/python/python_module.cc index c3993a3..4af3c49 100644 --- a/src/dataflow/interop/python/python_module.cc +++ b/src/dataflow/interop/python/python_module.cc @@ -985,14 +985,31 @@ PyObject* sessionNew(PyTypeObject* type, PyObject*, PyObject*) { return reinterpret_cast(self); } +df::StreamingExecutionMode parseExecutionMode(const char* execution_mode) { + const std::string mode = execution_mode == nullptr ? "single-process" : execution_mode; + if (mode == "single-process") { + return df::StreamingExecutionMode::SingleProcess; + } + if (mode == "local-workers") { + return df::StreamingExecutionMode::LocalWorkers; + } + throw std::runtime_error("execution_mode must be 'single-process' or 'local-workers'"); +} + df::StreamingQueryOptions parseQueryOptions(uint64_t trigger_interval_ms, const char* checkpoint_path, - const char* checkpoint_delivery_mode) { + const char* checkpoint_delivery_mode, + const char* execution_mode = "single-process", + unsigned long long local_workers = 1, + unsigned long long max_inflight_partitions = 0) { df::StreamingQueryOptions options; options.trigger_interval_ms = trigger_interval_ms; if (checkpoint_path != nullptr) { options.checkpoint_path = checkpoint_path; } + options.execution_mode = parseExecutionMode(execution_mode); + options.local_workers = static_cast(local_workers); + options.max_inflight_partitions = static_cast(max_inflight_partitions); const std::string mode = checkpoint_delivery_mode == nullptr ? "at-least-once" : checkpoint_delivery_mode; if (mode == "at-least-once") { @@ -1120,11 +1137,16 @@ PyObject* sessionStartStreamSql(PyVelariaSession* self, PyObject* args, PyObject unsigned long long trigger_interval_ms = 1000; const char* checkpoint_path = ""; const char* checkpoint_delivery_mode = "at-least-once"; + const char* execution_mode = "single-process"; + unsigned long long local_workers = 1; + unsigned long long max_inflight_partitions = 0; static const char* kwlist[] = {"sql", "trigger_interval_ms", "checkpoint_path", - "checkpoint_delivery_mode", nullptr}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|Kss", const_cast(kwlist), &sql, + "checkpoint_delivery_mode", "execution_mode", + "local_workers", "max_inflight_partitions", nullptr}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|KsssKK", const_cast(kwlist), &sql, &trigger_interval_ms, &checkpoint_path, - &checkpoint_delivery_mode)) { + &checkpoint_delivery_mode, &execution_mode, &local_workers, + &max_inflight_partitions)) { return nullptr; } std::unique_ptr out; @@ -1132,7 +1154,8 @@ PyObject* sessionStartStreamSql(PyVelariaSession* self, PyObject* args, PyObject AllowThreads allow; out = std::make_unique(self->session->startStreamSql( sql, parseQueryOptions(static_cast(trigger_interval_ms), checkpoint_path, - checkpoint_delivery_mode))); + checkpoint_delivery_mode, execution_mode, local_workers, + max_inflight_partitions))); } return wrapStreamingQuery(std::move(*out)); }); @@ -1144,11 +1167,16 @@ PyObject* sessionExplainStreamSql(PyVelariaSession* self, PyObject* args, PyObje unsigned long long trigger_interval_ms = 1000; const char* checkpoint_path = ""; const char* checkpoint_delivery_mode = "at-least-once"; + const char* execution_mode = "single-process"; + unsigned long long local_workers = 1; + unsigned long long max_inflight_partitions = 0; static const char* kwlist[] = {"sql", "trigger_interval_ms", "checkpoint_path", - "checkpoint_delivery_mode", nullptr}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|Kss", const_cast(kwlist), &sql, + "checkpoint_delivery_mode", "execution_mode", + "local_workers", "max_inflight_partitions", nullptr}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|KsssKK", const_cast(kwlist), &sql, &trigger_interval_ms, &checkpoint_path, - &checkpoint_delivery_mode)) { + &checkpoint_delivery_mode, &execution_mode, &local_workers, + &max_inflight_partitions)) { return nullptr; } std::string explain; @@ -1156,7 +1184,8 @@ PyObject* sessionExplainStreamSql(PyVelariaSession* self, PyObject* args, PyObje AllowThreads allow; explain = self->session->explainStreamSql( sql, parseQueryOptions(static_cast(trigger_interval_ms), checkpoint_path, - checkpoint_delivery_mode)); + checkpoint_delivery_mode, execution_mode, local_workers, + max_inflight_partitions)); } return PyUnicode_FromStringAndSize(explain.c_str(), static_cast(explain.size())); diff --git a/src/dataflow/tests/sql_regression_test.cc b/src/dataflow/tests/sql_regression_test.cc index 5b88dbc..2087ea0 100644 --- a/src/dataflow/tests/sql_regression_test.cc +++ b/src/dataflow/tests/sql_regression_test.cc @@ -362,9 +362,36 @@ void runStreamSqlRegression() { "(key STRING, value_sum INT, event_count INT, min_value INT, max_value INT, avg_value DOUBLE) " "USING csv OPTIONS(path '" + multi_sink_path + "', delimiter ',')"); + const std::string multi_hot_sink_path = "/tmp/velaria-stream-sql-multi-aggregate-hot-output.csv"; + fs::remove(multi_hot_sink_path); + s.sql( + "CREATE SINK TABLE stream_multi_hot_summary_v1 " + "(key STRING, value_sum INT, event_count INT, min_value INT, max_value INT, avg_value DOUBLE) " + "USING csv OPTIONS(path '" + + multi_hot_sink_path + "', delimiter ',')"); dataflow::StreamingQueryOptions multi_options; multi_options.trigger_interval_ms = 0; + multi_options.execution_mode = dataflow::StreamingExecutionMode::LocalWorkers; + multi_options.local_workers = 2; + multi_options.max_inflight_partitions = 2; + + auto multi_hot_query = s.startStreamSql( + "INSERT INTO stream_multi_hot_summary_v1 " + "SELECT key, SUM(value) AS value_sum, COUNT(*) AS event_count, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM stream_multi_events_v1 " + "GROUP BY key", + multi_options); + expect(multi_hot_query.awaitTermination(1) == 1, + "stream_sql_multi_aggregate_hot_processed_batches"); + expect(multi_hot_query.progress().execution_mode == "local-workers", + "stream_sql_multi_aggregate_hot_local_workers_mode"); + expect(multi_hot_query.progress().used_actor_runtime, + "stream_sql_multi_aggregate_hot_credit_accelerator_used"); + + const auto multi_hot_sink_table = s.read_csv(multi_hot_sink_path).toTable(); + expect(multi_hot_sink_table.rows.size() == 2, "stream_sql_multi_aggregate_hot_rows"); auto multi_query = s.startStreamSql( "INSERT INTO stream_multi_summary_v1 " diff --git a/src/dataflow/tests/stream_actor_credit_test.cc b/src/dataflow/tests/stream_actor_credit_test.cc index ff7cd08..aec2bdc 100644 --- a/src/dataflow/tests/stream_actor_credit_test.cc +++ b/src/dataflow/tests/stream_actor_credit_test.cc @@ -1,3 +1,4 @@ +#include #include #include #include @@ -13,11 +14,11 @@ std::vector makeBatches() { std::vector batches; for (size_t batch = 0; batch < 6; ++batch) { dataflow::Table table; - table.schema = dataflow::Schema({"window_start", "key", "value"}); - const std::string window = "2026-03-28T09:0" + std::to_string(static_cast(batch % 3)); + table.schema = dataflow::Schema({"segment", "bucket", "value"}); + const int64_t bucket = static_cast((batch % 3) + 1); for (size_t row = 0; row < 64; ++row) { - table.rows.push_back({dataflow::Value(window), - dataflow::Value("user_" + std::to_string(row % 8)), + table.rows.push_back({dataflow::Value("group_" + std::to_string(row % 4)), + dataflow::Value(bucket), dataflow::Value(int64_t((row % 5) + 1))}); } batches.push_back(std::move(table)); @@ -25,13 +26,31 @@ std::vector makeBatches() { return batches; } -std::unordered_map toMap(const dataflow::Table& table) { - std::unordered_map out; - const auto window_idx = table.schema.indexOf("window_start"); - const auto key_idx = table.schema.indexOf("key"); - const auto value_idx = table.schema.indexOf("value_sum"); +struct AggregateRow { + int64_t sum = 0; + int64_t count = 0; + int64_t min = 0; + int64_t max = 0; + double avg = 0.0; +}; + +std::unordered_map toMap(const dataflow::Table& table) { + std::unordered_map out; + const auto segment_idx = table.schema.indexOf("segment"); + const auto bucket_idx = table.schema.indexOf("bucket"); + const auto sum_idx = table.schema.indexOf("value_sum"); + const auto count_idx = table.schema.indexOf("event_count"); + const auto min_idx = table.schema.indexOf("min_value"); + const auto max_idx = table.schema.indexOf("max_value"); + const auto avg_idx = table.schema.indexOf("avg_value"); for (const auto& row : table.rows) { - out[row[window_idx].toString() + "|" + row[key_idx].toString()] = row[value_idx].asDouble(); + out[row[segment_idx].toString() + "|" + row[bucket_idx].toString()] = AggregateRow{ + row[sum_idx].asInt64(), + row[count_idx].asInt64(), + row[min_idx].asInt64(), + row[max_idx].asInt64(), + row[avg_idx].asDouble(), + }; } return out; } @@ -42,18 +61,43 @@ void expect(bool condition, const std::string& message) { } } +void expectAggregateMapsEqual(const std::unordered_map& lhs, + const std::unordered_map& rhs, + const std::string& message) { + expect(lhs.size() == rhs.size(), message + ": size"); + for (const auto& entry : lhs) { + const auto it = rhs.find(entry.first); + expect(it != rhs.end(), message + ": missing key"); + expect(entry.second.sum == it->second.sum, message + ": sum"); + expect(entry.second.count == it->second.count, message + ": count"); + expect(entry.second.min == it->second.min, message + ": min"); + expect(entry.second.max == it->second.max, message + ": max"); + expect(std::fabs(entry.second.avg - it->second.avg) < 1e-9, message + ": avg"); + } +} + } // namespace int main() { try { const auto batches = makeBatches(); - const auto baseline = dataflow::runSingleProcessWindowKeySum(batches); + dataflow::LocalGroupedAggregateSpec aggregate; + aggregate.group_keys = {"segment", "bucket"}; + aggregate.aggregates = { + {dataflow::AggregateFunction::Sum, "value", "value_sum", false}, + {dataflow::AggregateFunction::Count, "", "event_count", true}, + {dataflow::AggregateFunction::Min, "value", "min_value", false}, + {dataflow::AggregateFunction::Max, "value", "max_value", false}, + {dataflow::AggregateFunction::Avg, "value", "avg_value", false}, + }; + + const auto baseline = dataflow::runSingleProcessGroupedAggregate(batches, aggregate); dataflow::LocalActorStreamOptions options; options.worker_count = 3; options.max_inflight_partitions = 2; options.worker_delay_ms = 20; - const auto actor = dataflow::runLocalActorStreamWindowKeySum(batches, options); + const auto actor = dataflow::runLocalActorStreamGroupedAggregate(batches, aggregate, options); expect(actor.processed_batches == batches.size(), "actor result should process all batches"); expect(actor.processed_partitions == batches.size() * options.worker_count, @@ -61,7 +105,8 @@ int main() { expect(actor.max_inflight_partitions <= options.max_inflight_partitions, "credit controller should bound inflight partitions"); expect(actor.blocked_count > 0, "credit controller should observe blocking under delay"); - expect(toMap(actor.final_table) == toMap(baseline), "actor result should match baseline aggregate"); + expectAggregateMapsEqual(toMap(actor.final_table), toMap(baseline), + "actor result should match grouped aggregate baseline"); std::cout << "[test] stream actor credit ok" << std::endl; return 0; diff --git a/src/dataflow/tests/stream_strategy_explain_test.cc b/src/dataflow/tests/stream_strategy_explain_test.cc index 7d935ce..1a46854 100644 --- a/src/dataflow/tests/stream_strategy_explain_test.cc +++ b/src/dataflow/tests/stream_strategy_explain_test.cc @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -17,6 +19,7 @@ using dataflow::StreamingExecutionMode; using dataflow::StreamingQueryOptions; using dataflow::Table; using dataflow::Value; +namespace fs = std::filesystem; void expect(bool condition, const std::string& message) { if (!condition) { @@ -48,6 +51,19 @@ Table makeNonHotBatch() { return table; } +Table makeGenericMultiAggregateBatch() { + Table table; + table.schema = dataflow::Schema({"segment", "bucket", "value"}); + table.rows = { + {Value("alpha"), Value(int64_t(1)), Value(int64_t(10))}, + {Value("alpha"), Value(int64_t(1)), Value(int64_t(14))}, + {Value("alpha"), Value(int64_t(2)), Value(int64_t(3))}, + {Value("beta"), Value(int64_t(1)), Value(int64_t(6))}, + {Value("beta"), Value(int64_t(1)), Value(int64_t(8))}, + }; + return table; +} + std::unordered_map sumTableToMap(const Table& table) { std::unordered_map out; const auto window_idx = table.schema.indexOf("window_start"); @@ -70,6 +86,51 @@ std::unordered_map countTableToMap(const Table& table) { return out; } +struct AggregateRow { + int64_t sum = 0; + int64_t count = 0; + int64_t min = 0; + int64_t max = 0; + double avg = 0.0; +}; + +std::unordered_map multiAggregateTableToMap(const Table& table) { + std::unordered_map out; + const auto segment_idx = table.schema.indexOf("segment"); + const auto bucket_idx = table.schema.indexOf("bucket"); + const auto sum_idx = table.schema.indexOf("value_sum"); + const auto count_idx = table.schema.indexOf("event_count"); + const auto min_idx = table.schema.indexOf("min_value"); + const auto max_idx = table.schema.indexOf("max_value"); + const auto avg_idx = table.schema.indexOf("avg_value"); + for (const auto& row : table.rows) { + out[row[segment_idx].toString() + "|" + row[bucket_idx].toString()] = AggregateRow{ + row[sum_idx].asInt64(), + row[count_idx].asInt64(), + row[min_idx].asInt64(), + row[max_idx].asInt64(), + row[avg_idx].asDouble(), + }; + } + return out; +} + +void expectAggregateMapsEqual(const std::unordered_map& lhs, + const std::unordered_map& rhs, + const std::string& message) { + expect(lhs.size() == rhs.size(), message + ": size"); + for (const auto& entry : lhs) { + const auto it = rhs.find(entry.first); + expect(it != rhs.end(), message + ": missing key " + entry.first); + expect(entry.second.sum == it->second.sum, message + ": sum mismatch for " + entry.first); + expect(entry.second.count == it->second.count, message + ": count mismatch for " + entry.first); + expect(entry.second.min == it->second.min, message + ": min mismatch for " + entry.first); + expect(entry.second.max == it->second.max, message + ": max mismatch for " + entry.first); + expect(std::fabs(entry.second.avg - it->second.avg) < 1e-9, + message + ": avg mismatch for " + entry.first); + } +} + struct QueryRun { Table table; dataflow::StreamingQueryProgress progress; @@ -120,6 +181,10 @@ void testExplainStreamSql() { session.createTempView( "strategy_hot_events", session.readStream(std::make_shared(std::vector
{makeHotPathBatch()}))); + session.createTempView( + "strategy_generic_events", + session.readStream( + std::make_shared(std::vector
{makeGenericMultiAggregateBatch()}))); session.sql( "CREATE SINK TABLE strategy_hot_sink (window_start STRING, key STRING, value_sum INT) " "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-explain.csv', delimiter ',')"); @@ -128,7 +193,7 @@ void testExplainStreamSql() { "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-count-explain.csv', delimiter ',')"); session.sql( "CREATE SINK TABLE strategy_multi_sink " - "(window_start STRING, key STRING, event_count INT, avg_value DOUBLE) " + "(segment STRING, bucket INT, value_sum INT, event_count INT, min_value INT, max_value INT, avg_value DOUBLE) " "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-multi-explain.csv', delimiter ',')"); StreamingQueryOptions options; @@ -141,7 +206,6 @@ void testExplainStreamSql() { "SELECT window_start, key, SUM(value) AS value_sum " "FROM strategy_hot_events GROUP BY window_start, key", options); - expect(explain.find("logical\n") != std::string::npos, "explain should contain logical section"); expect(explain.find("physical\n") != std::string::npos, "explain should contain physical section"); expect(explain.find("strategy\n") != std::string::npos, "explain should contain strategy section"); @@ -166,13 +230,16 @@ void testExplainStreamSql() { const std::string multi_explain = session.explainStreamSql( "INSERT INTO strategy_multi_sink " - "SELECT window_start, key, COUNT(*) AS event_count, AVG(value) AS avg_value " - "FROM strategy_hot_events GROUP BY window_start, key", + "SELECT segment, bucket, SUM(value) AS value_sum, COUNT(*) AS event_count, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM strategy_generic_events GROUP BY segment, bucket", options); expect(multi_explain.find("AVG(value) AS avg_value") != std::string::npos, "multi explain should describe avg aggregate"); - expect(multi_explain.find("actor_eligible=false") != std::string::npos, - "multi explain should disable actor eligibility for multi aggregate"); + expect(multi_explain.find("Aggregate keys=[segment, bucket]") != std::string::npos, + "multi explain should describe generic aggregate keys"); + expect(multi_explain.find("actor_eligible=true") != std::string::npos, + "multi explain should enable actor eligibility for grouped multi aggregate"); const std::string having_explain = session.explainStreamSql( "INSERT INTO strategy_hot_sink " @@ -196,7 +263,6 @@ void testExecutionModeConsistency() { const auto baseline = sumTableToMap(single.table); expect(sumTableToMap(local.table) == baseline, "local-workers result should match single-process"); - expect(single.progress.execution_mode == "single-process", "single-process progress should keep single-process mode"); expect(local.progress.execution_mode == "local-workers", @@ -206,6 +272,7 @@ void testExecutionModeConsistency() { expect(local.progress.transport_mode == "shared-memory" || local.progress.transport_mode == "rpc-copy", "eligible local-workers hot path should expose accelerator transport mode"); + const auto count_single = runCountHotPath(StreamingExecutionMode::SingleProcess); const auto count_local = runCountHotPath(StreamingExecutionMode::LocalWorkers); expect(countTableToMap(count_local.table) == countTableToMap(count_single.table), @@ -216,6 +283,63 @@ void testExecutionModeConsistency() { "count local-workers hot path should use the credit accelerator"); } +void testMultiAggregateHotPathWithGenericKeys() { + DataflowSession& session = DataflowSession::builder(); + fs::remove("/tmp/velaria-stream-strategy-generic-single.csv"); + fs::remove("/tmp/velaria-stream-strategy-generic-local.csv"); + session.createTempView( + "strategy_generic_multi_events_single", + session.readStream( + std::make_shared(std::vector
{makeGenericMultiAggregateBatch()}))); + session.createTempView( + "strategy_generic_multi_events_local", + session.readStream( + std::make_shared(std::vector
{makeGenericMultiAggregateBatch()}))); + session.sql( + "CREATE SINK TABLE strategy_generic_multi_single " + "(segment STRING, bucket INT, value_sum INT, event_count INT, min_value INT, max_value INT, avg_value DOUBLE) " + "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-generic-single.csv', delimiter ',')"); + session.sql( + "CREATE SINK TABLE strategy_generic_multi_local " + "(segment STRING, bucket INT, value_sum INT, event_count INT, min_value INT, max_value INT, avg_value DOUBLE) " + "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-generic-local.csv', delimiter ',')"); + + StreamingQueryOptions single_options; + single_options.trigger_interval_ms = 0; + single_options.execution_mode = StreamingExecutionMode::SingleProcess; + + StreamingQueryOptions local_options; + local_options.trigger_interval_ms = 0; + local_options.execution_mode = StreamingExecutionMode::LocalWorkers; + local_options.local_workers = 4; + local_options.max_inflight_partitions = 4; + + auto single_query = session.startStreamSql( + "INSERT INTO strategy_generic_multi_single " + "SELECT segment, bucket, SUM(value) AS value_sum, COUNT(*) AS event_count, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM strategy_generic_multi_events_single GROUP BY segment, bucket", + single_options); + auto local_query = session.startStreamSql( + "INSERT INTO strategy_generic_multi_local " + "SELECT segment, bucket, SUM(value) AS value_sum, COUNT(*) AS event_count, " + "MIN(value) AS min_value, MAX(value) AS max_value, AVG(value) AS avg_value " + "FROM strategy_generic_multi_events_local GROUP BY segment, bucket", + local_options); + expect(single_query.awaitTermination(1) == 1, "single-process multi aggregate should process one batch"); + expect(local_query.awaitTermination(1) == 1, "local-workers multi aggregate should process one batch"); + + const auto single_table = session.read_csv("/tmp/velaria-stream-strategy-generic-single.csv").toTable(); + const auto local_table = session.read_csv("/tmp/velaria-stream-strategy-generic-local.csv").toTable(); + expectAggregateMapsEqual(multiAggregateTableToMap(local_table), + multiAggregateTableToMap(single_table), + "generic multi aggregate output should match single-process"); + expect(local_query.progress().execution_mode == "local-workers", + "generic multi aggregate should stay in local-workers mode"); + expect(local_query.progress().used_actor_runtime, + "generic multi aggregate should use the credit accelerator"); +} + void testLocalWorkersFallbackForNonHotPath() { DataflowSession& session = DataflowSession::builder(); auto sink = std::make_shared(); @@ -230,6 +354,7 @@ void testLocalWorkersFallbackForNonHotPath() { .withStateStore(dataflow::makeMemoryStateStore()) .groupBy({"key"}) .count(true, "event_count") + .filter("event_count", ">", Value(int64_t(0))) .writeStream(sink, options); query.start(); expect(query.awaitTermination() == 1, "non-hot-path query should process one batch"); @@ -278,6 +403,7 @@ int main() { try { testExplainStreamSql(); testExecutionModeConsistency(); + testMultiAggregateHotPathWithGenericKeys(); testLocalWorkersFallbackForNonHotPath(); testLocalWorkersSingleWorkerDisablesCreditAcceleration(); std::cout << "[test] stream strategy explain ok" << std::endl;