diff --git a/README-zh.md b/README-zh.md index ed21abe..d675571 100644 --- a/README-zh.md +++ b/README-zh.md @@ -166,7 +166,7 @@ examples 与 helper scripts 只用于说明各层,不定义各层。 - 本地 batch + streaming 共用一个内核 - `read_csv`, `readStream(...)`, `readStreamCsvDir(...)` - query-local 反压、有界 backlog、progress snapshot、checkpoint path -- 执行模式:`single-process`、`local-workers`、`actor-credit`、`auto` +- 执行模式:`single-process`、`local-workers` - 文件 source/sink - 基础 streaming operators:`select / filter / withColumn / drop / limit / window` - stateful streaming 聚合:`sum / count / min / max / avg` diff --git a/README.md b/README.md index f1c5a46..70e2910 100644 --- a/README.md +++ b/README.md @@ -166,7 +166,7 @@ Available today: - local batch + streaming execution through one kernel - `read_csv`, `readStream(...)`, `readStreamCsvDir(...)` - query-local backpressure, bounded backlog, progress snapshots, checkpoint path -- execution modes: `single-process`, `local-workers`, `actor-credit`, `auto` +- execution modes: `single-process`, `local-workers` - file source/sink support - basic stream operators: `select / filter / withColumn / drop / limit / window` - stateful stream aggregates: `sum / count / min / max / avg` diff --git a/docs/streaming_runtime_design.md b/docs/streaming_runtime_design.md index 1233aa4..ca31418 100644 --- a/docs/streaming_runtime_design.md +++ b/docs/streaming_runtime_design.md @@ -19,15 +19,11 @@ This document describes runtime internals and current implementation shape. For - `SingleProcess` - `LocalWorkers` -- `ActorCredit` -- `Auto` 语义: - `SingleProcess`:所有 batch 在本进程内执行。 -- `LocalWorkers`:partition-local 阶段使用本进程线程并行,barrier 阶段仍在本地汇总。 -- `ActorCredit`:仅对明确支持的热路径下推到本地 actor-stream runtime。 -- `Auto`:先采样,再在 `SingleProcess` 和 `ActorCredit` 之间做一次 query-local 选择。 +- `LocalWorkers`:普通路径下使用本进程线程并行;对明确支持的热路径,可在同一 mode 下启用 credit-based 本地加速。 ## Strategy Decision and Explain @@ -59,7 +55,7 @@ This document describes runtime internals and current implementation shape. 
For ## Current Actor Pushdown Boundary -当前只有两类 query 会被 `ActorCredit` / `Auto` 接住: +当前只有少数固定形状的 query 会被 `LocalWorkers` 内部的 credit-based hot path 接住: - 前置变换全部为 `PartitionLocal` - 最后一个 barrier 是以下其一: @@ -73,7 +69,7 @@ This document describes runtime internals and current implementation shape. For - `value` 的窗口分组求和热路径 - `COUNT(*)` 的窗口分组计数热路径 -`MIN / MAX / AVG` 以及多 aggregate 输出当前仍走本地执行链;`Auto` 会明确写出 fallback reason。 +`MIN / MAX / AVG` 以及多 aggregate 输出当前仍走本地执行链;`LocalWorkers` 会明确写出 fallback reason。 如果 query 不满足这个形状,`StreamingQuery` 会回退到普通执行链,并把原因写入: @@ -84,37 +80,6 @@ This document describes runtime internals and current implementation shape. For - `SingleProcess` - `LocalWorkers` -- `ActorCredit` -- `Auto` - -其中 `Auto` 若未命中热路径,必须稳定回退到 `SingleProcess` 并给出明确 reason。 - -## Auto Selection Rule - -`Auto` 当前采用“前若干批采样”的启发式规则。 - -采样输出: - -- `rows_per_batch` -- `average_projected_payload_bytes` -- `single_rows_per_s` -- `actor_rows_per_s` -- `actor_speedup` -- `compute_to_overhead_ratio` - -默认阈值: - -- `sample_batches = 2` -- `min_rows_per_batch = 64K` -- `min_projected_payload_bytes = 256KB` -- `min_compute_to_overhead_ratio = 1.5` -- `min_actor_speedup = 1.05` -- `strong_actor_speedup = 1.25` - -选择规则: - -- 常规情况下,要求 `rows`、`payload`、`speedup`、`compute/overhead` 都满足阈值。 -- 若 `actor_speedup` 已经足够强,则允许越过 `compute/overhead` 的软阈值。 ## Shared Memory Transport @@ -182,19 +147,17 @@ actor-stream payload 当前使用 typed binary batch: ## Known Limits -- `ActorCredit` 还不是通用执行器,只是定向热路径。 +- credit-based local acceleration 还不是通用执行器,只是定向热路径。 - 最终状态回并仍在 coordinator 本地完成。 -- query 级 `Auto` 当前阈值更接近“保守正确”,还不是最终调优状态。 - `split_ms / merge_ms` 仍是毫秒级指标,对极短阶段不够敏感。 - SQL 路径仍未自动下推到 actor runtime;当前优化主要落在 streaming 执行内核。 - 同机 observability 仍是 experiment profile,不是完整 distributed telemetry 体系。 ## Recommended Next Steps -1. 扩展 actor pushdown 到更多 group aggregate 与多 aggregate 输出。 -2. 调整 query 级 `Auto` 阈值,使其更贴近真实 `StreamingQuery` workload。 -3. 给 query 级 progress 增加更细粒度的 actor 指标拆分。 -4. 
继续把 source 到执行内核的中间 `Table/Row/Value` 转换收紧到更早的列式表示。 +1. 扩展 credit-based local acceleration 到 `min / max / avg`、多 aggregate 输出和更多 group aggregate。 +2. 给 query 级 progress 增加更细粒度的 accelerator 指标拆分。 +3. 继续把 source 到执行内核的中间 `Table/Row/Value` 转换收紧到更早的列式表示。 ## Source/Sink ABI Bridge (v0.4) diff --git a/src/dataflow/core/contract/api/session.cc b/src/dataflow/core/contract/api/session.cc index bd6d8ac..397f36f 100644 --- a/src/dataflow/core/contract/api/session.cc +++ b/src/dataflow/core/contract/api/session.cc @@ -45,24 +45,16 @@ std::string formatStreamStrategyExplain(const StreamingStrategyDecision& strateg out << "backpressure_high_watermark=" << strategy.backpressure_high_watermark << "\n"; out << "backpressure_low_watermark=" << strategy.backpressure_low_watermark << "\n"; out << "checkpoint_delivery_mode=" << strategy.checkpoint_delivery_mode << "\n"; - out << "actor_workers=" << std::max(2, options.effectiveActorWorkers()) << "\n"; - out << "actor_max_inflight_partitions=" - << std::max(1, options.actor_max_inflight_partitions > 0 - ? options.actor_max_inflight_partitions - : options.effectiveActorWorkers()) + out << "local_worker_count=" << std::max(1, options.effectiveLocalWorkers()) << "\n"; + out << "max_inflight_partitions=" + << std::max(1, options.max_inflight_partitions > 0 + ? options.max_inflight_partitions + : options.effectiveLocalWorkers()) << "\n"; - out << "actor_shared_memory_transport=" - << (options.actor_shared_memory_transport ? 
"true" : "false") << "\n"; - out << "actor_shared_memory_min_payload_bytes=" - << options.actor_shared_memory_min_payload_bytes << "\n"; - out << "auto_sample_batches=" << options.actor_auto_options.sample_batches << "\n"; - out << "auto_min_rows_per_batch=" << options.actor_auto_options.min_rows_per_batch << "\n"; - out << "auto_min_projected_payload_bytes=" - << options.actor_auto_options.min_projected_payload_bytes << "\n"; - out << "auto_min_compute_to_overhead_ratio=" - << options.actor_auto_options.min_compute_to_overhead_ratio << "\n"; - out << "auto_min_actor_speedup=" << options.actor_auto_options.min_actor_speedup << "\n"; - out << "auto_strong_actor_speedup=" << options.actor_auto_options.strong_actor_speedup << "\n"; + out << "shared_memory_transport=" + << (options.shared_memory_transport ? "true" : "false") << "\n"; + out << "shared_memory_min_payload_bytes=" + << options.shared_memory_min_payload_bytes << "\n"; return out.str(); } diff --git a/src/dataflow/core/execution/stream/stream.cc b/src/dataflow/core/execution/stream/stream.cc index 39b298b..e45b289 100644 --- a/src/dataflow/core/execution/stream/stream.cc +++ b/src/dataflow/core/execution/stream/stream.cc @@ -333,10 +333,6 @@ const char* streamingExecutionModeName(StreamingExecutionMode mode) { return "single-process"; case StreamingExecutionMode::LocalWorkers: return "local-workers"; - case StreamingExecutionMode::ActorCredit: - return "actor-credit"; - case StreamingExecutionMode::Auto: - return "auto"; } return "single-process"; } @@ -395,32 +391,21 @@ struct ActorAccelerationAnalysis { bool eligible = false; size_t transform_index = 0; StreamAcceleratorSpec accelerator; - std::string reason = "query plan is not eligible for actor acceleration"; + std::string reason = "query plan is not eligible for local-worker credit acceleration"; }; -LocalActorStreamOptions makeActorOptions(const StreamingQueryOptions& options) { +LocalActorStreamOptions makeAcceleratorOptions(const 
StreamingQueryOptions& options) { LocalActorStreamOptions actor; - actor.worker_count = std::max(2, options.effectiveActorWorkers()); + actor.worker_count = options.effectiveLocalWorkers(); actor.max_inflight_partitions = - std::max(1, options.actor_max_inflight_partitions > 0 - ? options.actor_max_inflight_partitions + std::max(1, options.max_inflight_partitions > 0 + ? options.max_inflight_partitions : actor.worker_count); - actor.shared_memory_transport = options.actor_shared_memory_transport; - actor.shared_memory_min_payload_bytes = options.actor_shared_memory_min_payload_bytes; + actor.shared_memory_transport = options.shared_memory_transport; + actor.shared_memory_min_payload_bytes = options.shared_memory_min_payload_bytes; return actor; } -LocalExecutionAutoOptions makeActorAutoOptions(const StreamingAutoExecutionOptions& options) { - LocalExecutionAutoOptions out; - out.sample_batches = options.sample_batches; - out.min_rows_per_batch = options.min_rows_per_batch; - out.min_projected_payload_bytes = options.min_projected_payload_bytes; - out.min_compute_to_overhead_ratio = options.min_compute_to_overhead_ratio; - out.min_actor_speedup = options.min_actor_speedup; - out.strong_actor_speedup = options.strong_actor_speedup; - return out; -} - bool findActorAcceleratorTransform(const std::vector& transforms, size_t* index, StreamAcceleratorSpec* accelerator) { if (index == nullptr || accelerator == nullptr) return false; @@ -795,23 +780,20 @@ StreamingStrategyDecision describeStreamingStrategy(const StreamingDataFrame& ro const auto actor = analyzeActorAcceleration(root.transforms_); decision.actor_eligible = actor.eligible; - decision.reason = actor.reason; + decision.reason = actor.eligible ? 
"configured single-process execution" + : actor.reason; if (options.execution_mode == StreamingExecutionMode::SingleProcess) { decision.reason = "configured single-process execution"; } else if (options.execution_mode == StreamingExecutionMode::LocalWorkers) { - decision.reason = "configured local-workers execution"; - } else if (options.execution_mode == StreamingExecutionMode::ActorCredit) { - if (actor.eligible) { - decision.reason = "configured actor-credit execution"; - } else { - decision.resolved_execution_mode = streamingExecutionModeName(StreamingExecutionMode::SingleProcess); - } - } else if (options.execution_mode == StreamingExecutionMode::Auto) { - if (actor.eligible) { - decision.reason = "actor-eligible query requires runtime auto sampling"; + if (actor.eligible && options.effectiveLocalWorkers() > 1) { + decision.reason = + "configured local-workers execution; credit-based scheduling is available for the eligible hot path"; + } else if (actor.eligible) { + decision.reason = + "configured local-workers execution; credit acceleration requires local_workers > 1"; } else { - decision.resolved_execution_mode = streamingExecutionModeName(StreamingExecutionMode::SingleProcess); + decision.reason = "configured local-workers execution"; } } @@ -1865,7 +1847,6 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { const StreamAcceleratorSpec actor_accelerator = actor_analysis.accelerator; const bool actor_pipeline_supported = actor_analysis.eligible; strategy_decision_.actor_eligible = actor_analysis.eligible; - strategy_decision_.reason = actor_analysis.reason; applyStrategyDecision(strategy_decision_); std::vector actor_prefix_transforms; if (actor_pipeline_supported) { @@ -2014,101 +1995,42 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { size_t partitions_used = 1; Table out; bool used_actor_runtime = false; - const bool wants_actor_mode = - options_.execution_mode == StreamingExecutionMode::ActorCredit || - options_.execution_mode 
== StreamingExecutionMode::Auto; - if (wants_actor_mode && actor_pipeline_supported) { + const bool credit_acceleration_enabled = + options_.execution_mode == StreamingExecutionMode::LocalWorkers && + actor_pipeline_supported && options_.effectiveLocalWorkers() > 1; + if (credit_acceleration_enabled) { Table actor_input = buildActorAggregateInput( executePartitionStage(envelope.table, actor_prefix_transforms, options_, 1), actor_accelerator); if (canRunActorWindowKeyAggregate(actor_input, actor_accelerator)) { - const auto actor_options = makeActorOptions(options_); - if (options_.execution_mode == StreamingExecutionMode::ActorCredit) { - auto actor_result = - runLocalActorStreamWindowKeySum(std::vector{actor_input}, actor_options); - out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), - actor_accelerator, root_.state_, options_, - &state_ms); - partitions_used = std::max(1, actor_result.processed_partitions); - used_actor_runtime = true; - execution_decided_ = true; - resolved_execution_mode_ = StreamingExecutionMode::ActorCredit; - execution_reason_ = "configured actor-credit execution"; - strategy_decision_.resolved_execution_mode = - streamingExecutionModeName(resolved_execution_mode_); - strategy_decision_.reason = execution_reason_; - strategy_decision_.transport_mode = actor_result.used_shared_memory - ? streamingTransportModeName( - StreamingTransportMode::SharedMemory) - : streamingTransportModeName( - StreamingTransportMode::RpcCopy); - strategy_decision_.used_actor_runtime = true; - strategy_decision_.used_shared_memory = actor_result.used_shared_memory; - applyStrategyDecision(strategy_decision_); - } else if (!execution_decided_) { - LocalExecutionDecision decision; - auto actor_result = runAutoLocalActorStreamWindowKeySum(std::vector
{actor_input}, - actor_options, - makeActorAutoOptions(options_.actor_auto_options), - &decision); - resolved_execution_mode_ = - decision.chosen_mode == LocalExecutionMode::ActorCredit - ? StreamingExecutionMode::ActorCredit - : StreamingExecutionMode::SingleProcess; - execution_reason_ = decision.reason; - execution_decided_ = true; - strategy_decision_.resolved_execution_mode = - streamingExecutionModeName(resolved_execution_mode_); - strategy_decision_.reason = execution_reason_; - strategy_decision_.sampled_batches = decision.sampled_batches; - strategy_decision_.sampled_rows_per_batch = decision.rows_per_batch; - strategy_decision_.average_projected_payload_bytes = - decision.average_projected_payload_bytes; - strategy_decision_.actor_speedup = decision.actor_speedup; - strategy_decision_.compute_to_overhead_ratio = - decision.compute_to_overhead_ratio; - if (resolved_execution_mode_ == StreamingExecutionMode::ActorCredit) { - out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), - actor_accelerator, root_.state_, options_, - &state_ms); - partitions_used = std::max(1, actor_result.processed_partitions); - used_actor_runtime = true; - strategy_decision_.transport_mode = actor_result.used_shared_memory - ? streamingTransportModeName( - StreamingTransportMode::SharedMemory) - : streamingTransportModeName( - StreamingTransportMode::RpcCopy); - strategy_decision_.used_actor_runtime = true; - strategy_decision_.used_shared_memory = actor_result.used_shared_memory; - } else { - strategy_decision_.transport_mode = - streamingTransportModeName(StreamingTransportMode::InProcess); - strategy_decision_.used_actor_runtime = false; - strategy_decision_.used_shared_memory = false; - } - applyStrategyDecision(strategy_decision_); - } else if (resolved_execution_mode_ == StreamingExecutionMode::ActorCredit) { - auto actor_result = - runLocalActorStreamWindowKeySum(std::vector
{actor_input}, actor_options); - out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), - actor_accelerator, root_.state_, options_, - &state_ms); - partitions_used = std::max(1, actor_result.processed_partitions); - used_actor_runtime = true; - strategy_decision_.used_actor_runtime = true; - strategy_decision_.used_shared_memory = actor_result.used_shared_memory; - strategy_decision_.transport_mode = actor_result.used_shared_memory - ? streamingTransportModeName( - StreamingTransportMode::SharedMemory) - : streamingTransportModeName( - StreamingTransportMode::RpcCopy); - applyStrategyDecision(strategy_decision_); - } - } else if (options_.execution_mode == StreamingExecutionMode::Auto && !execution_decided_) { + const auto actor_options = makeAcceleratorOptions(options_); + auto actor_result = + runLocalActorStreamWindowKeySum(std::vector
{actor_input}, actor_options); + out = finalizeActorWindowKeyAggregateOutput(std::move(actor_result.final_table), + actor_accelerator, root_.state_, options_, + &state_ms); + partitions_used = std::max(1, actor_result.processed_partitions); + used_actor_runtime = true; execution_decided_ = true; - resolved_execution_mode_ = StreamingExecutionMode::SingleProcess; + resolved_execution_mode_ = StreamingExecutionMode::LocalWorkers; execution_reason_ = - "actor path requires window_start/key with supported SUM(value) or COUNT(*) shape"; + "configured local-workers execution; using credit-based scheduling for the eligible hot path"; + strategy_decision_.resolved_execution_mode = + streamingExecutionModeName(resolved_execution_mode_); + strategy_decision_.reason = execution_reason_; + strategy_decision_.transport_mode = actor_result.used_shared_memory + ? streamingTransportModeName( + StreamingTransportMode::SharedMemory) + : streamingTransportModeName( + StreamingTransportMode::RpcCopy); + strategy_decision_.used_actor_runtime = true; + strategy_decision_.used_shared_memory = actor_result.used_shared_memory; + applyStrategyDecision(strategy_decision_); + } else if (!execution_decided_) { + execution_decided_ = true; + resolved_execution_mode_ = StreamingExecutionMode::LocalWorkers; + execution_reason_ = + "configured local-workers execution; using generic partition workers because credit scheduling requires window_start/key with supported SUM(value) or COUNT(*) shape"; strategy_decision_.resolved_execution_mode = streamingExecutionModeName(resolved_execution_mode_); strategy_decision_.reason = execution_reason_; @@ -2116,10 +2038,13 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { streamingTransportModeName(StreamingTransportMode::InProcess); applyStrategyDecision(strategy_decision_); } - } else if (wants_actor_mode && !actor_pipeline_supported && !execution_decided_) { + } else if (options_.execution_mode == StreamingExecutionMode::LocalWorkers && + 
!execution_decided_) { execution_decided_ = true; - resolved_execution_mode_ = StreamingExecutionMode::SingleProcess; - execution_reason_ = "query plan is not eligible for actor acceleration"; + resolved_execution_mode_ = StreamingExecutionMode::LocalWorkers; + execution_reason_ = !actor_pipeline_supported + ? "configured local-workers execution; using generic partition workers because the query plan is not eligible for credit acceleration" + : "configured local-workers execution; using generic partition workers because credit acceleration requires local_workers > 1"; strategy_decision_.resolved_execution_mode = streamingExecutionModeName(resolved_execution_mode_); strategy_decision_.reason = execution_reason_; @@ -2134,10 +2059,16 @@ size_t StreamingQuery::awaitTermination(size_t maxBatches) { strategy_decision_.used_shared_memory = false; strategy_decision_.transport_mode = streamingTransportModeName(StreamingTransportMode::InProcess); + if (!execution_decided_) { + execution_decided_ = true; + resolved_execution_mode_ = options_.execution_mode; + execution_reason_ = options_.execution_mode == StreamingExecutionMode::LocalWorkers + ? "configured local-workers execution" + : "configured single-process execution"; + } strategy_decision_.resolved_execution_mode = - streamingExecutionModeName(options_.execution_mode == StreamingExecutionMode::Auto - ? 
resolved_execution_mode_ - : options_.execution_mode); + streamingExecutionModeName(resolved_execution_mode_); + strategy_decision_.reason = execution_reason_; applyStrategyDecision(strategy_decision_); } const auto before_sink = std::chrono::steady_clock::now(); diff --git a/src/dataflow/core/execution/stream/stream.h b/src/dataflow/core/execution/stream/stream.h index bfa833b..7efffbf 100644 --- a/src/dataflow/core/execution/stream/stream.h +++ b/src/dataflow/core/execution/stream/stream.h @@ -137,20 +137,11 @@ std::shared_ptr makeStateStore( const std::string& backend, const std::unordered_map& options = {}); -enum class StreamingExecutionMode { SingleProcess, LocalWorkers, ActorCredit, Auto }; +enum class StreamingExecutionMode { SingleProcess, LocalWorkers }; enum class StreamingTransportMode { InProcess, RpcCopy, SharedMemory, Auto }; enum class CheckpointDeliveryMode { AtLeastOnce, BestEffort }; -struct StreamingAutoExecutionOptions { - size_t sample_batches = 2; - size_t min_rows_per_batch = 64 * 1024; - size_t min_projected_payload_bytes = 256 * 1024; - double min_compute_to_overhead_ratio = 1.5; - double min_actor_speedup = 1.05; - double strong_actor_speedup = 1.25; -}; - struct StreamingQueryOptions { uint64_t trigger_interval_ms = 1000; size_t max_inflight_batches = 2; @@ -160,11 +151,9 @@ struct StreamingQueryOptions { std::string checkpoint_path; StreamingExecutionMode execution_mode = StreamingExecutionMode::SingleProcess; size_t local_workers = 1; - size_t actor_workers = 0; - size_t actor_max_inflight_partitions = 0; - bool actor_shared_memory_transport = true; - size_t actor_shared_memory_min_payload_bytes = 64 * 1024; - StreamingAutoExecutionOptions actor_auto_options; + size_t max_inflight_partitions = 0; + bool shared_memory_transport = true; + size_t shared_memory_min_payload_bytes = 64 * 1024; uint64_t idle_wait_ms = 100; size_t max_retained_windows = 0; CheckpointDeliveryMode checkpoint_delivery_mode = CheckpointDeliveryMode::AtLeastOnce; 
@@ -174,8 +163,6 @@ struct StreamingQueryOptions { ? local_workers : 1; } - - size_t effectiveActorWorkers() const { return actor_workers > 0 ? actor_workers : local_workers; } }; struct StreamPullContext { diff --git a/src/dataflow/examples/stream_benchmark.cc b/src/dataflow/examples/stream_benchmark.cc index 2c61f13..0057fd5 100644 --- a/src/dataflow/examples/stream_benchmark.cc +++ b/src/dataflow/examples/stream_benchmark.cc @@ -42,8 +42,7 @@ void runCase(const std::string& name, dataflow::StreamingExecutionMode mode, siz options.trigger_interval_ms = 0; options.execution_mode = mode; options.local_workers = workers; - options.actor_workers = workers; - options.actor_max_inflight_partitions = workers; + options.max_inflight_partitions = workers; options.max_inflight_batches = 4; options.max_queued_partitions = 16; options.max_retained_windows = 4; @@ -116,9 +115,5 @@ int main(int argc, char** argv) { rows_per_batch); runCase("stateful-local-workers", dataflow::StreamingExecutionMode::LocalWorkers, worker_count, true, batch_count, rows_per_batch); - runCase("stateful-actor-credit", dataflow::StreamingExecutionMode::ActorCredit, worker_count, true, - batch_count, rows_per_batch); - runCase("stateful-auto", dataflow::StreamingExecutionMode::Auto, worker_count, true, batch_count, - rows_per_batch); return 0; } diff --git a/src/dataflow/tests/sql_regression_test.cc b/src/dataflow/tests/sql_regression_test.cc index 2599a71..5b88dbc 100644 --- a/src/dataflow/tests/sql_regression_test.cc +++ b/src/dataflow/tests/sql_regression_test.cc @@ -431,9 +431,9 @@ void runStreamSqlRegression() { dataflow::StreamingQueryOptions window_options; window_options.trigger_interval_ms = 0; - window_options.execution_mode = dataflow::StreamingExecutionMode::ActorCredit; - window_options.actor_workers = 2; - window_options.actor_max_inflight_partitions = 2; + window_options.execution_mode = dataflow::StreamingExecutionMode::LocalWorkers; + window_options.local_workers = 2; + 
window_options.max_inflight_partitions = 2; auto window_query = s.startStreamSql( "INSERT INTO stream_window_summary_v1 " @@ -443,8 +443,10 @@ void runStreamSqlRegression() { "GROUP BY window_start, key", window_options); expect(window_query.awaitTermination() == 1, "stream_sql_window_processed_batches"); - expect(window_query.progress().execution_mode == "actor-credit", - "stream_sql_window_actor_hot_path"); + expect(window_query.progress().execution_mode == "local-workers", + "stream_sql_window_local_workers_mode"); + expect(window_query.progress().used_actor_runtime, + "stream_sql_window_credit_accelerator_used"); const auto window_sink_table = s.read_csv(window_sink_path).toTable(); expect(window_sink_table.rows.size() == 2, "stream_sql_window_sink_rows"); @@ -484,8 +486,10 @@ void runStreamSqlRegression() { "GROUP BY window_start, key", window_options); expect(window_count_query.awaitTermination() == 1, "stream_sql_window_count_processed_batches"); - expect(window_count_query.progress().execution_mode == "actor-credit", - "stream_sql_window_count_actor_hot_path"); + expect(window_count_query.progress().execution_mode == "local-workers", + "stream_sql_window_count_local_workers_mode"); + expect(window_count_query.progress().used_actor_runtime, + "stream_sql_window_count_credit_accelerator_used"); const auto window_count_sink_table = s.read_csv(window_count_sink_path).toTable(); expect(window_count_sink_table.rows.size() == 2, "stream_sql_window_count_sink_rows"); diff --git a/src/dataflow/tests/stream_strategy_explain_test.cc b/src/dataflow/tests/stream_strategy_explain_test.cc index d31a536..7d935ce 100644 --- a/src/dataflow/tests/stream_strategy_explain_test.cc +++ b/src/dataflow/tests/stream_strategy_explain_test.cc @@ -83,14 +83,7 @@ QueryRun runHotPath(StreamingExecutionMode mode) { options.trigger_interval_ms = 0; options.execution_mode = mode; options.local_workers = 4; - options.actor_workers = 4; - options.actor_max_inflight_partitions = 4; - 
options.actor_auto_options.sample_batches = 1; - options.actor_auto_options.min_rows_per_batch = 0; - options.actor_auto_options.min_projected_payload_bytes = 0; - options.actor_auto_options.min_compute_to_overhead_ratio = 0.0; - options.actor_auto_options.min_actor_speedup = 0.0; - options.actor_auto_options.strong_actor_speedup = 0.0; + options.max_inflight_partitions = 4; auto query = session.readStream(std::make_shared(std::vector
{makeHotPathBatch()})) .withStateStore(dataflow::makeMemoryStateStore()) @@ -110,14 +103,7 @@ QueryRun runCountHotPath(StreamingExecutionMode mode) { options.trigger_interval_ms = 0; options.execution_mode = mode; options.local_workers = 4; - options.actor_workers = 4; - options.actor_max_inflight_partitions = 4; - options.actor_auto_options.sample_batches = 1; - options.actor_auto_options.min_rows_per_batch = 0; - options.actor_auto_options.min_projected_payload_bytes = 0; - options.actor_auto_options.min_compute_to_overhead_ratio = 0.0; - options.actor_auto_options.min_actor_speedup = 0.0; - options.actor_auto_options.strong_actor_speedup = 0.0; + options.max_inflight_partitions = 4; auto query = session.readStream(std::make_shared(std::vector
{makeHotPathBatch()})) .withStateStore(dataflow::makeMemoryStateStore()) @@ -146,15 +132,9 @@ void testExplainStreamSql() { "USING csv OPTIONS(path '/tmp/velaria-stream-strategy-multi-explain.csv', delimiter ',')"); StreamingQueryOptions options; - options.execution_mode = StreamingExecutionMode::Auto; - options.actor_workers = 4; - options.actor_max_inflight_partitions = 4; - options.actor_auto_options.sample_batches = 1; - options.actor_auto_options.min_rows_per_batch = 0; - options.actor_auto_options.min_projected_payload_bytes = 0; - options.actor_auto_options.min_compute_to_overhead_ratio = 0.0; - options.actor_auto_options.min_actor_speedup = 0.0; - options.actor_auto_options.strong_actor_speedup = 0.0; + options.execution_mode = StreamingExecutionMode::LocalWorkers; + options.local_workers = 4; + options.max_inflight_partitions = 4; const std::string explain = session.explainStreamSql( "INSERT INTO strategy_hot_sink " @@ -169,9 +149,9 @@ void testExplainStreamSql() { "explain should describe aggregate keys"); expect(explain.find("actor_eligible=true") != std::string::npos, "explain should expose actor eligibility"); - expect(explain.find("selected_mode=auto") != std::string::npos, - "explain should expose the initial selected mode"); - expect(explain.find("actor_shared_memory_transport=true") != std::string::npos, + expect(explain.find("selected_mode=local-workers") != std::string::npos, + "explain should expose the selected local-workers mode"); + expect(explain.find("shared_memory_transport=true") != std::string::npos, "explain should expose shared-memory knobs"); const std::string count_explain = session.explainStreamSql( @@ -213,52 +193,38 @@ void testExplainStreamSql() { void testExecutionModeConsistency() { const auto single = runHotPath(StreamingExecutionMode::SingleProcess); const auto local = runHotPath(StreamingExecutionMode::LocalWorkers); - const auto actor = runHotPath(StreamingExecutionMode::ActorCredit); - const auto automatic = 
runHotPath(StreamingExecutionMode::Auto); const auto baseline = sumTableToMap(single.table); expect(sumTableToMap(local.table) == baseline, "local-workers result should match single-process"); - expect(sumTableToMap(actor.table) == baseline, "actor-credit result should match single-process"); - expect(sumTableToMap(automatic.table) == baseline, "auto result should match single-process"); expect(single.progress.execution_mode == "single-process", "single-process progress should keep single-process mode"); expect(local.progress.execution_mode == "local-workers", "local-workers progress should keep local-workers mode"); - expect(actor.progress.execution_mode == "actor-credit", - "actor-credit progress should keep actor-credit mode"); - expect(automatic.progress.execution_mode == "actor-credit", - "auto hot path should resolve to actor-credit with permissive thresholds"); - expect(automatic.progress.transport_mode == "shared-memory" || - automatic.progress.transport_mode == "rpc-copy", - "auto hot path should expose actor transport mode"); - + expect(local.progress.used_actor_runtime, + "eligible local-workers hot path should use the credit accelerator"); + expect(local.progress.transport_mode == "shared-memory" || + local.progress.transport_mode == "rpc-copy", + "eligible local-workers hot path should expose accelerator transport mode"); const auto count_single = runCountHotPath(StreamingExecutionMode::SingleProcess); - const auto count_actor = runCountHotPath(StreamingExecutionMode::ActorCredit); - const auto count_auto = runCountHotPath(StreamingExecutionMode::Auto); - expect(countTableToMap(count_actor.table) == countTableToMap(count_single.table), - "count actor-credit result should match single-process"); - expect(countTableToMap(count_auto.table) == countTableToMap(count_single.table), - "count auto result should match single-process"); - expect(count_auto.progress.execution_mode == "actor-credit", - "count auto hot path should resolve to actor-credit"); + const 
auto count_local = runCountHotPath(StreamingExecutionMode::LocalWorkers); + expect(countTableToMap(count_local.table) == countTableToMap(count_single.table), + "count local-workers result should match single-process"); + expect(count_local.progress.execution_mode == "local-workers", + "count local-workers hot path should stay in local-workers mode"); + expect(count_local.progress.used_actor_runtime, + "count local-workers hot path should use the credit accelerator"); } -void testAutoFallbackForNonHotPath() { +void testLocalWorkersFallbackForNonHotPath() { DataflowSession& session = DataflowSession::builder(); auto sink = std::make_shared(); StreamingQueryOptions options; options.trigger_interval_ms = 0; - options.execution_mode = StreamingExecutionMode::Auto; - options.actor_workers = 4; - options.actor_max_inflight_partitions = 4; - options.actor_auto_options.sample_batches = 1; - options.actor_auto_options.min_rows_per_batch = 0; - options.actor_auto_options.min_projected_payload_bytes = 0; - options.actor_auto_options.min_compute_to_overhead_ratio = 0.0; - options.actor_auto_options.min_actor_speedup = 0.0; - options.actor_auto_options.strong_actor_speedup = 0.0; + options.execution_mode = StreamingExecutionMode::LocalWorkers; + options.local_workers = 4; + options.max_inflight_partitions = 4; auto query = session.readStream(std::make_shared(std::vector
{makeNonHotBatch()})) .withStateStore(dataflow::makeMemoryStateStore()) @@ -269,10 +235,41 @@ void testAutoFallbackForNonHotPath() { expect(query.awaitTermination() == 1, "non-hot-path query should process one batch"); const auto progress = query.progress(); - expect(progress.execution_mode == "single-process", - "non-hot-path auto query should fall back to single-process"); - expect(progress.execution_reason.find("not eligible") != std::string::npos, - "non-hot-path auto query should expose fallback reason"); + expect(progress.execution_mode == "local-workers", + "non-hot-path local-workers query should stay in local-workers mode"); + expect(!progress.used_actor_runtime, + "non-hot-path local-workers query should fall back to generic partition workers"); + expect(progress.execution_reason.find("generic partition workers") != std::string::npos, + "non-hot-path local-workers query should expose fallback reason"); +} + +void testLocalWorkersSingleWorkerDisablesCreditAcceleration() { + DataflowSession& session = DataflowSession::builder(); + auto sink = std::make_shared(); + + StreamingQueryOptions options; + options.trigger_interval_ms = 0; + options.execution_mode = StreamingExecutionMode::LocalWorkers; + options.local_workers = 1; + options.max_inflight_partitions = 4; + + auto query = session.readStream(std::make_shared(std::vector
{makeHotPathBatch()})) + .withStateStore(dataflow::makeMemoryStateStore()) + .groupBy({"window_start", "key"}) + .sum("value", true, "value_sum") + .writeStream(sink, options); + query.start(); + expect(query.awaitTermination() == 1, "single-worker local-workers query should process one batch"); + const auto progress = query.progress(); + + expect(progress.execution_mode == "local-workers", + "single-worker local-workers query should stay in local-workers mode"); + expect(!progress.used_actor_runtime, + "single-worker local-workers query should not use credit acceleration"); + expect(progress.transport_mode == "inproc", + "single-worker local-workers query should stay in inproc transport"); + expect(progress.execution_reason.find("local_workers > 1") != std::string::npos, + "single-worker local-workers query should explain why credit acceleration was skipped"); } } // namespace @@ -281,7 +278,8 @@ int main() { try { testExplainStreamSql(); testExecutionModeConsistency(); - testAutoFallbackForNonHotPath(); + testLocalWorkersFallbackForNonHotPath(); + testLocalWorkersSingleWorkerDisablesCreditAcceleration(); std::cout << "[test] stream strategy explain ok" << std::endl; return 0; } catch (const std::exception& ex) {