Merged
2 changes: 1 addition & 1 deletion README-zh.md
@@ -166,7 +166,7 @@ examples and helper scripts only illustrate the layers; they do not define them.
- local batch + streaming share one kernel
- `read_csv`, `readStream(...)`, `readStreamCsvDir(...)`
- query-local backpressure, bounded backlog, progress snapshot, checkpoint path
-- execution modes: `single-process`, `local-workers`, `actor-credit`, `auto`
+- execution modes: `single-process`, `local-workers`
- file source/sink
- basic streaming operators: `select / filter / withColumn / drop / limit / window`
- stateful streaming aggregates: `sum / count / min / max / avg`
2 changes: 1 addition & 1 deletion README.md
@@ -166,7 +166,7 @@ Available today:
- local batch + streaming execution through one kernel
- `read_csv`, `readStream(...)`, `readStreamCsvDir(...)`
- query-local backpressure, bounded backlog, progress snapshots, checkpoint path
-- execution modes: `single-process`, `local-workers`, `actor-credit`, `auto`
+- execution modes: `single-process`, `local-workers`
- file source/sink support
- basic stream operators: `select / filter / withColumn / drop / limit / window`
- stateful stream aggregates: `sum / count / min / max / avg`
51 changes: 7 additions & 44 deletions docs/streaming_runtime_design.md
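@@ -19,15 +19,11 @@ This document describes runtime internals and current implementation shape. For

- `SingleProcess`
- `LocalWorkers`
-- `ActorCredit`
-- `Auto`

Semantics:

- `SingleProcess`: all batches execute inside the current process.
-- `LocalWorkers`: partition-local stages run in parallel on threads of the current process; barrier stages are still merged locally.
-- `ActorCredit`: pushes down to the local actor-stream runtime only for explicitly supported hot paths.
-- `Auto`: samples first, then makes a one-time query-local choice between `SingleProcess` and `ActorCredit`.
+- `LocalWorkers`: the regular path runs in parallel on threads of the current process; for explicitly supported hot paths, credit-based local acceleration can be enabled under the same mode.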
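
Reviewer note: the two remaining modes and the in-mode hot path can be sketched in C++ roughly as below. This is only an illustration of the semantics described in this hunk; `QueryShape`, `Plan`, and `planFor` are hypothetical names that do not come from the repository, and the real decision logic may look quite different.

```cpp
#include <string>

// Hypothetical mirror of the two modes kept by this PR.
enum class ExecutionMode { SingleProcess, LocalWorkers };

// Hypothetical summary of a query's shape (not a real type in the codebase).
struct QueryShape {
  bool all_prefix_partition_local;  // every transform before the barrier is partition-local
  bool hot_path_aggregate;          // e.g. a windowed grouped SUM(value) or COUNT(*)
};

// Hypothetical outcome: what the runtime would do, plus a human-readable reason.
struct Plan {
  bool use_threads;
  bool use_credit_hot_path;
  std::string reason;
};

Plan planFor(ExecutionMode mode, const QueryShape& q) {
  if (mode == ExecutionMode::SingleProcess) {
    return {false, false, "mode=single-process"};
  }
  // LocalWorkers: the credit-based path is an optimization inside the same mode.
  if (q.all_prefix_partition_local && q.hot_path_aggregate) {
    return {true, true, "hot path matched"};
  }
  return {true, false, "fallback: query shape not supported by credit-based hot path"};
}
```

The point of the sketch is that callers now pick only between two modes, while the acceleration decision and its fallback reason stay internal to `LocalWorkers`.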

## Strategy Decision and Explain

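@@ -59,7 +55,7 @@ This document describes runtime internals and current implementation shape. For

## Current Actor Pushdown Boundary

-Currently only two kinds of query are picked up by `ActorCredit` / `Auto`:
+Currently only a few fixed-shape queries are picked up by the credit-based hot path inside `LocalWorkers`:

- all preceding transforms are `PartitionLocal`
- the last barrier is one of the following:
@@ -73,7 +69,7 @@ This document describes runtime internals and current implementation shape. For
- the windowed grouped-sum hot path over `value`
- the windowed grouped-count hot path for `COUNT(*)`

-`MIN / MAX / AVG` and multi-aggregate output currently still use the local execution chain; `Auto` writes out an explicit fallback reason.
+`MIN / MAX / AVG` and multi-aggregate output currently still use the local execution chain; `LocalWorkers` writes out an explicit fallback reason.

If a query does not match this shape, `StreamingQuery` falls back to the regular execution chain and writes the reason to: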

@@ -84,37 +80,6 @@ This document describes runtime internals and current implementation shape. For

- `SingleProcess`
- `LocalWorkers`
-- `ActorCredit`
-- `Auto`
-
-If `Auto` misses the hot path, it must reliably fall back to `SingleProcess` and give an explicit reason.
-
-## Auto Selection Rule
-
-`Auto` currently uses a heuristic that samples the first few batches.
-
-Sampled outputs:
-
-- `rows_per_batch`
-- `average_projected_payload_bytes`
-- `single_rows_per_s`
-- `actor_rows_per_s`
-- `actor_speedup`
-- `compute_to_overhead_ratio`
-
-Default thresholds:
-
-- `sample_batches = 2`
-- `min_rows_per_batch = 64K`
-- `min_projected_payload_bytes = 256KB`
-- `min_compute_to_overhead_ratio = 1.5`
-- `min_actor_speedup = 1.05`
-- `strong_actor_speedup = 1.25`
-
-Selection rule:
-
-- In the normal case, `rows`, `payload`, `speedup`, and `compute/overhead` must all meet their thresholds.
-- If `actor_speedup` is already strong enough, the soft `compute/overhead` threshold may be bypassed.
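
Reviewer note: for context on what this hunk deletes, the removed `Auto` selection rule can be sketched as below. The field names and default thresholds are taken from the removed text. Everything else is an assumption: that `64K` and `256KB` are binary units, that `actor_speedup` is `actor_rows_per_s / single_rows_per_s`, and that a strong speedup bypasses only the `compute/overhead` check while the row and payload minimums still apply. `AutoOptions`, `AutoSample`, and `chooseActor` are hypothetical names.

```cpp
#include <cstddef>

// Default thresholds as listed in the removed section (binary units assumed).
struct AutoOptions {
  std::size_t sample_batches = 2;  // leading batches to sample; not used by the decision itself
  std::size_t min_rows_per_batch = 64 * 1024;
  std::size_t min_projected_payload_bytes = 256 * 1024;
  double min_compute_to_overhead_ratio = 1.5;
  double min_actor_speedup = 1.05;
  double strong_actor_speedup = 1.25;
};

// Averages collected over the sampled batches.
struct AutoSample {
  std::size_t rows_per_batch;
  std::size_t average_projected_payload_bytes;
  double single_rows_per_s;
  double actor_rows_per_s;
  double compute_to_overhead_ratio;

  // Assumed definition of actor_speedup: throughput ratio of the two paths.
  double actorSpeedup() const {
    return single_rows_per_s > 0.0 ? actor_rows_per_s / single_rows_per_s : 0.0;
  }
};

// Returns true when the sampled query would be sent to the actor path.
bool chooseActor(const AutoSample& s, const AutoOptions& o = AutoOptions{}) {
  const bool big_enough =
      s.rows_per_batch >= o.min_rows_per_batch &&
      s.average_projected_payload_bytes >= o.min_projected_payload_bytes;
  if (!big_enough) return false;

  const double speedup = s.actorSpeedup();
  if (speedup < o.min_actor_speedup) return false;

  // Normal case: the compute/overhead ratio must also clear its threshold.
  if (s.compute_to_overhead_ratio >= o.min_compute_to_overhead_ratio) return true;

  // Soft threshold: a strong enough speedup may bypass the ratio check.
  return speedup >= o.strong_actor_speedup;
}
```

Under these assumptions, a batch of 100,000 rows with a 512 KiB payload and a 1.1x speedup is accepted only if its compute/overhead ratio reaches 1.5, whereas the same batch with a 1.3x speedup is accepted regardless of the ratio.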

## Shared Memory Transport

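@@ -182,19 +147,17 @@ actor-stream payload currently uses a typed binary batch:

## Known Limits

-- `ActorCredit` is not yet a general-purpose executor, only a targeted hot path.
+- credit-based local acceleration is not yet a general-purpose executor, only a targeted hot path.
- The final state merge still happens locally on the coordinator.
-- The query-level `Auto` thresholds currently lean toward "conservatively correct" and are not yet in their final tuned state.
- `split_ms / merge_ms` are still millisecond-granularity metrics and are not sensitive enough for very short stages.
- The SQL path is still not pushed down to the actor runtime automatically; current optimization is concentrated in the streaming execution kernel.
- Same-host observability is still an experiment profile, not a complete distributed telemetry system.

## Recommended Next Steps

-1. Extend actor pushdown to more group aggregates and multi-aggregate output.
-2. Tune the query-level `Auto` thresholds so they better match real `StreamingQuery` workloads.
-3. Add finer-grained actor metric breakdowns to query-level progress.
-4. Continue tightening the intermediate `Table/Row/Value` conversions between source and execution kernel toward an earlier columnar representation.
+1. Extend credit-based local acceleration to `count` and more group aggregates.
+2. Add finer-grained accelerator metric breakdowns to query-level progress.
+3. Continue tightening the intermediate `Table/Row/Value` conversions between source and execution kernel toward an earlier columnar representation.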


## Source/Sink ABI Bridge (v0.4)
26 changes: 9 additions & 17 deletions src/dataflow/core/contract/api/session.cc
@@ -45,24 +45,16 @@ std::string formatStreamStrategyExplain(const StreamingStrategyDecision& strateg
out << "backpressure_high_watermark=" << strategy.backpressure_high_watermark << "\n";
out << "backpressure_low_watermark=" << strategy.backpressure_low_watermark << "\n";
out << "checkpoint_delivery_mode=" << strategy.checkpoint_delivery_mode << "\n";
-out << "actor_workers=" << std::max<size_t>(2, options.effectiveActorWorkers()) << "\n";
-out << "actor_max_inflight_partitions="
-<< std::max<size_t>(1, options.actor_max_inflight_partitions > 0
-? options.actor_max_inflight_partitions
-: options.effectiveActorWorkers())
+out << "local_worker_count=" << std::max<size_t>(1, options.effectiveLocalWorkers()) << "\n";
+out << "max_inflight_partitions="
+<< std::max<size_t>(1, options.max_inflight_partitions > 0
+? options.max_inflight_partitions
+: options.effectiveLocalWorkers())
<< "\n";
-out << "actor_shared_memory_transport="
-<< (options.actor_shared_memory_transport ? "true" : "false") << "\n";
-out << "actor_shared_memory_min_payload_bytes="
-<< options.actor_shared_memory_min_payload_bytes << "\n";
-out << "auto_sample_batches=" << options.actor_auto_options.sample_batches << "\n";
-out << "auto_min_rows_per_batch=" << options.actor_auto_options.min_rows_per_batch << "\n";
-out << "auto_min_projected_payload_bytes="
-<< options.actor_auto_options.min_projected_payload_bytes << "\n";
-out << "auto_min_compute_to_overhead_ratio="
-<< options.actor_auto_options.min_compute_to_overhead_ratio << "\n";
-out << "auto_min_actor_speedup=" << options.actor_auto_options.min_actor_speedup << "\n";
-out << "auto_strong_actor_speedup=" << options.actor_auto_options.strong_actor_speedup << "\n";
+out << "shared_memory_transport="
+<< (options.shared_memory_transport ? "true" : "false") << "\n";
+out << "shared_memory_min_payload_bytes="
+<< options.shared_memory_min_payload_bytes << "\n";
return out.str();
}
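
Reviewer note: to see what the renamed explain keys print, here is a small self-contained C++ sketch of the new tail of the function. `LocalOptions` and `formatLocalWorkerExplain` are hypothetical stand-ins for the real options type and formatter, and the body of `effectiveLocalWorkers()` is assumed; only the key names and the `std::max` clamping are taken from the diff.

```cpp
#include <algorithm>
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical stand-in for the real streaming options type.
struct LocalOptions {
  std::size_t local_workers = 0;
  std::size_t max_inflight_partitions = 0;  // 0 means "derive from the worker count"
  bool shared_memory_transport = true;
  std::size_t shared_memory_min_payload_bytes = 0;

  // Assumed behavior: simply returns the configured worker count.
  std::size_t effectiveLocalWorkers() const { return local_workers; }
};

// Formats the key=value lines added by this PR.
std::string formatLocalWorkerExplain(const LocalOptions& options) {
  std::ostringstream out;
  // The worker count is clamped to at least 1 (the old actor key clamped to 2).
  out << "local_worker_count="
      << std::max<std::size_t>(1, options.effectiveLocalWorkers()) << "\n";
  // An explicit in-flight limit wins; otherwise fall back to the worker count.
  out << "max_inflight_partitions="
      << std::max<std::size_t>(1, options.max_inflight_partitions > 0
                                      ? options.max_inflight_partitions
                                      : options.effectiveLocalWorkers())
      << "\n";
  out << "shared_memory_transport="
      << (options.shared_memory_transport ? "true" : "false") << "\n";
  out << "shared_memory_min_payload_bytes="
      << options.shared_memory_min_payload_bytes << "\n";
  return out.str();
}
```

With four workers and no explicit in-flight limit, both `local_worker_count` and `max_inflight_partitions` print as 4. With everything left at zero, both clamp to 1.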
