From 5b84f92e95aa605459c15ff1932cf425ec0296f4 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 26 Jan 2026 15:39:40 +0800 Subject: [PATCH 1/7] refind StatefulPlanner::plan --- .../experimental/stateful/StatefulPlanner.cpp | 51 ++++++++++++++++++- velox/experimental/stateful/StatefulPlanner.h | 12 ++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 8409a1b3b505..cef16fb7cf70 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -61,19 +61,68 @@ #include "velox/experimental/stateful/rank/AppendOnlyTopNRanker.h" #include "velox/experimental/stateful/agg/AggsHandleFunction.h" #include "velox/experimental/stateful/agg/GroupAggregator.h" +#include namespace facebook::velox::stateful { static std::atomic opId = 0; +static int nextOperatorId() { + return opId.fetch_add(1); +} + // static StatefulOperatorPtr StatefulPlanner::plan( const core::PlanFragment& planFragment, exec::DriverCtx* ctx, StateBackend* stateBackend) { - return nodeToStatefulOperator(planFragment.planNode, ctx, stateBackend); + // return nodeToStatefulOperator(planFragment.planNode, ctx, stateBackend); + StatefulPlanner planner(ctx, stateBackend); + return planner.buildOperators(planFragment.planNode); +} + +#define CHECK_NODE_TYPE(TYPE, node) std::dynamic_pointer_cast(node->node()) != nullptr + +StatefulOperatorPtr StatefulPlanner::buildOperators(const core::PlanNodePtr& planNode) { + auto statefulNode = std::dynamic_pointer_cast(planNode); + VELOX_CHECK(statefulNode, "Not stateful node: {}", planNode->toString()); + StatefulOperatorPtr result; + if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = buildWatermarkAssignerOperator(*statefulNode); + } + VELOX_CHECK(result, "Failed to build operator for node: {}", planNode->toString()); + return result; +} + +std::vector StatefulPlanner::buildOperators(const std::vector& targets) { + std::vector operators; + operators.resize(targets.size()); + std::transform(targets.begin(), targets.end(), operators.begin(), [this](const core::PlanNodePtr& target) { + return buildOperators(target); + }); + return operators; +} + +StatefulOperatorPtr StatefulPlanner::buildWatermarkAssignerOperator(const StatefulPlanNode& planNode) { + std::vector targets = buildOperators(planNode.targets()); + + auto watermarkAssignerNode = std::dynamic_pointer_cast(planNode.node()); + + auto op = std::make_unique( + nextOperatorId(), + ctx_, + nullptr, + watermarkAssignerNode->project()); + + return std::make_unique( + std::move(op), + std::move(targets), + watermarkAssignerNode->idleTimeout(), + watermarkAssignerNode->rowtimeFieldIndex(), + watermarkAssignerNode->watermarkInterval()); } + //static StatefulOperatorPtr StatefulPlanner::nodeToStatefulOperator( const core::PlanNodePtr& planNode, diff --git a/velox/experimental/stateful/StatefulPlanner.h b/velox/experimental/stateful/StatefulPlanner.h index 06f4aea5b271..990683443373 100644 --- a/velox/experimental/stateful/StatefulPlanner.h +++ b/velox/experimental/stateful/StatefulPlanner.h @@ -17,6 +17,7 @@ #include "velox/exec/Operator.h" #include "velox/experimental/stateful/StatefulOperator.h" +#include "velox/experimental/stateful/StatefulPlanNode.h" #include "velox/experimental/stateful/state/StateBackend.h" namespace facebook::velox::core { @@ -33,8 +34,14 @@ class StatefulPlanner { const core::PlanFragment& planFragment, exec::DriverCtx* ctx, StateBackend* stateBackend); - + protected: + StatefulPlanner(exec::DriverCtx* ctx, StateBackend* stateBackend) : ctx_(ctx),stateBackend_(stateBackend) {} + private: + exec::DriverCtx* ctx_ = nullptr; + StateBackend* stateBackend_ = nullptr; + + static std::unique_ptr nodeToStatefulOperator( const core::PlanNodePtr& planNode, exec::DriverCtx* ctx, @@ -43,5 +50,8 @@ class StatefulPlanner { static std::unique_ptr nodeToOperator( const core::PlanNodePtr& planNode, exec::DriverCtx* ctx); + StatefulOperatorPtr buildOperators(const core::PlanNodePtr& planNode); + std::vector buildOperators(const std::vector& targets); + StatefulOperatorPtr buildWatermarkAssignerOperator(const StatefulPlanNode& planNode); }; } // namespace facebook::velox::stateful From e2c291ca407006786e9f3ee2a903e2d77787b46c Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 27 Jan 2026 16:08:30 +0800 Subject: [PATCH 2/7] refine StatefulPlanner Co-authored-by: Cursor --- .../experimental/stateful/StatefulPlanner.cpp | 367 +++++++++++++++++- velox/experimental/stateful/StatefulPlanner.h | 16 +- 2 files changed, 372 insertions(+), 11 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index cef16fb7cf70..14bc49ca1db6 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -78,33 +78,49 @@ StatefulOperatorPtr StatefulPlanner::plan( StateBackend* stateBackend) { // return nodeToStatefulOperator(planFragment.planNode, ctx, stateBackend); StatefulPlanner planner(ctx, stateBackend); - return planner.buildOperators(planFragment.planNode); + return planner.transformStatefulOperators(planFragment.planNode); } #define CHECK_NODE_TYPE(TYPE, node) std::dynamic_pointer_cast(node->node()) != nullptr -StatefulOperatorPtr StatefulPlanner::buildOperators(const core::PlanNodePtr& planNode) { +StatefulOperatorPtr StatefulPlanner::transformStatefulOperators(const core::PlanNodePtr& planNode) { auto statefulNode = std::dynamic_pointer_cast(planNode); VELOX_CHECK(statefulNode, "Not stateful node: {}", planNode->toString()); StatefulOperatorPtr result; if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = buildWatermarkAssignerOperator(*statefulNode); + result = transformWatermarkAssignerOperator(*statefulNode); + } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = transformStreamPartitionOperator(*statefulNode); + } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = transformStreamJoinOperator(*statefulNode); + } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = transformStreamWindowJoinOperator(*statefulNode); + } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = transformStreamWindowAggregationOperator(*statefulNode); + } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = transformGroupWindowAggregationOperator(*statefulNode); + } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = transformStreamRankOperator(*statefulNode); + } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { + result = transformGroupAggregationOperator(*statefulNode); + } else { + result = transformGenericOperator(*statefulNode); } VELOX_CHECK(result, "Failed to build operator for node: {}", planNode->toString()); return result; } -std::vector StatefulPlanner::buildOperators(const std::vector& targets) { +std::vector StatefulPlanner::transformStatefulOperators(const std::vector& targets) { std::vector operators; operators.resize(targets.size()); std::transform(targets.begin(), targets.end(), operators.begin(), [this](const core::PlanNodePtr& target) { - return buildOperators(target); + return transformStatefulOperators(target); }); return operators; } -StatefulOperatorPtr StatefulPlanner::buildWatermarkAssignerOperator(const StatefulPlanNode& planNode) { - std::vector targets = buildOperators(planNode.targets()); +StatefulOperatorPtr StatefulPlanner::transformWatermarkAssignerOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); auto watermarkAssignerNode = std::dynamic_pointer_cast(planNode.node()); @@ -122,6 +138,341 @@ StatefulOperatorPtr StatefulPlanner::buildWatermarkAssignerOperator(const Statef watermarkAssignerNode->watermarkInterval()); } +StatefulOperatorPtr StatefulPlanner::transformStreamPartitionOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + VELOX_CHECK(targets.empty(), "StreamPartitionNode should have no targets"); + + auto partitionNode = std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(partitionNode, "Failed to cast to StreamPartitionNode"); + + auto op = std::make_unique( + nextOperatorId(), + ctx_, + partitionNode->partition()); + + return std::make_unique( + std::move(op), + partitionNode->partition()->partitionFunctionSpec(), + partitionNode->numPartitions()); +} + +StatefulOperatorPtr StatefulPlanner::transformStreamJoinOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + + auto joinNode = std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(joinNode, "Failed to cast to StreamJoinNode"); + VELOX_CHECK(joinNode->sources().size() == 2, "StreamJoinNode should have 2 sources"); + + std::unique_ptr left = transformOperator(joinNode->sources()[0]); + std::unique_ptr right = transformOperator(joinNode->sources()[1]); + std::unique_ptr probe = transformOperator(joinNode->probe()); + + std::unique_ptr leftKeySelector = + std::make_unique( + joinNode->leftPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + std::unique_ptr rightKeySelector = + std::make_unique( + joinNode->rightPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + + return std::make_unique( + std::move(left), + std::move(right), + std::move(leftKeySelector), + std::move(rightKeySelector), + std::move(probe), + std::move(targets)); +} + +StatefulOperatorPtr StatefulPlanner::transformStreamWindowJoinOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + + auto joinNode = std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(joinNode, "Failed to cast to StreamWindowJoinNode"); + VELOX_CHECK(joinNode->sources().size() == 2, "StreamWindowJoinNode should have 2 sources"); + + std::unique_ptr left = transformOperator(joinNode->sources()[0]); + std::unique_ptr right = transformOperator(joinNode->sources()[1]); + std::unique_ptr probe = transformOperator(joinNode->probe()); + + std::unique_ptr leftKeySelector = + std::make_unique( + joinNode->leftPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + std::unique_ptr rightKeySelector = + std::make_unique( + joinNode->rightPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + + return std::make_unique( + std::move(left), + std::move(right), + std::move(leftKeySelector), + std::move(rightKeySelector), + std::move(probe), + std::move(targets), + joinNode->leftWindowEndIndex(), + joinNode->rightWindowEndIndex()); +} + +StatefulOperatorPtr StatefulPlanner::transformStreamWindowAggregationOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + + auto windowAggNode = std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(windowAggNode, "Failed to cast to StreamWindowAggregationNode"); + + auto op = transformOperator(windowAggNode->aggregation()); + + std::unique_ptr keySelector = + std::make_unique( + windowAggNode->keySelectorSpec()->create(INT_MAX, true), + op->pool()); + std::unique_ptr sliceAssigner = + std::make_unique( + windowAggNode->sliceAssignerSpec()->create(INT_MAX, true), + op->pool()); + + if (windowAggNode->isLocalAgg()) { + return std::make_unique( + std::move(op), + std::move(targets), + std::move(keySelector), + std::move(sliceAssigner), + windowAggNode->windowInterval(), + windowAggNode->useDayLightSaving(), + windowAggNode->outputType()); + } else { + auto localAggregator = transformOperator(windowAggNode->localAgg()); + std::unique_ptr globalSliceAssigner = + std::make_unique( + std::move(sliceAssigner), + windowAggNode->size(), + windowAggNode->step(), + windowAggNode->offset(), + windowAggNode->windowType(), + windowAggNode->rowtimeIndex()); + return std::make_unique( + std::move(localAggregator), + std::move(op), + std::move(targets), + std::move(keySelector), + std::move(globalSliceAssigner), + windowAggNode->windowInterval(), + windowAggNode->useDayLightSaving()); + } +} + +StatefulOperatorPtr StatefulPlanner::transformGroupWindowAggregationOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + + auto windowAggNode = std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(windowAggNode, "Failed to cast to GroupWindowAggregationNode"); + + VELOX_MEM_LOG(ERROR)<< "transformGroupWindowAggregationOperator:" << planNode.toString(); + auto op = transformOperator(windowAggNode->aggregation()); + + std::unique_ptr keySelector = + std::make_unique( + windowAggNode->keySelectorSpec()->create(INT_MAX, true), + op->pool()); + std::unique_ptr sliceAssigner = + std::make_unique( + windowAggNode->sliceAssignerSpec()->create(INT_MAX, true), + op->pool()); + std::unique_ptr windowAssigner = + std::make_unique( + std::move(sliceAssigner), + 0, + 0, + 0, + windowAggNode->windowType(), + windowAggNode->rowtimeIndex()); + + return std::make_unique( + std::unique_ptr(dynamic_cast(op.release())), + // TODO: support window parameters + std::make_unique(10, windowAggNode->isEventTime()), + std::move(targets), + std::move(keySelector), + std::move(windowAssigner), + windowAggNode->allowedLateness(), + windowAggNode->produceUpdates(), + windowAggNode->rowtimeIndex(), + windowAggNode->isEventTime()); +} + +StatefulOperatorPtr StatefulPlanner::transformStreamRankOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + + auto rankNode = std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(rankNode, "Failed to cast to StreamRankNode"); + + auto op = transformOperator(rankNode->ranker()); + + std::unique_ptr keySelector = + std::make_unique( + rankNode->keySelectorSpec()->create(INT_MAX, true), + op->pool()); + + return std::make_unique( + std::move(op), + std::move(keySelector), + std::move(targets)); +} + +StatefulOperatorPtr StatefulPlanner::transformGroupAggregationOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + + auto aggNode = std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(aggNode, "Failed to cast to GroupAggregationNode"); + + auto op = transformOperator(aggNode->aggregation()); + + std::unique_ptr keySelector = + std::make_unique( + aggNode->keySelectorSpec()->create(INT_MAX, true), + op->pool()); + + return std::make_unique( + std::move(op), + std::move(keySelector), + std::move(targets)); +} + +std::unique_ptr StatefulPlanner::transformOperator(const core::PlanNodePtr& planNode) { + if (auto filterNode = std::dynamic_pointer_cast(planNode)) { + if (planNode->sources().size() == 1) { + auto next = planNode->sources()[0]; + if (auto projectNode = std::dynamic_pointer_cast(next)) { + return std::make_unique( + nextOperatorId(), + ctx_, + filterNode, + projectNode); + } + } + return std::make_unique( + nextOperatorId(), + ctx_, + filterNode, + nullptr); + } else if (auto projectNode = std::dynamic_pointer_cast(planNode)) { + std::shared_ptr filterNode = nullptr; + const std::vector& sources = projectNode->sources(); + if (sources.size() == 1) { + filterNode = std::dynamic_pointer_cast(sources[0]); + } + return std::make_unique(nextOperatorId(), ctx_, filterNode, projectNode); + } else if (auto valuesNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, valuesNode); + } else if (auto tableScanNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, tableScanNode); + } else if (auto tableWriteNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, tableWriteNode); + } else if (auto tableWriteMergeNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, tableWriteMergeNode); + } else if (auto joinNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, joinNode); + } else if (auto joinNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, joinNode); + } else if (auto joinNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, joinNode); + } else if (auto aggregationNode = std::dynamic_pointer_cast(planNode)) { + if (aggregationNode->isPreGrouped()) { + return std::make_unique(nextOperatorId(), ctx_, aggregationNode); + } else { + return std::make_unique(nextOperatorId(), ctx_, aggregationNode); + } + } else if (auto expandNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, expandNode); + } else if (auto groupIdNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, groupIdNode); + } else if (auto topNNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, topNNode); + } else if (auto limitNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, limitNode); + } else if (auto orderByNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, orderByNode); + } else if (auto windowNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, windowNode); + } else if (auto rowNumberNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, rowNumberNode); + } else if (auto topNRowNumberNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, topNRowNumberNode); + } else if (auto markDistinctNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, markDistinctNode); + } else if (auto mergeJoin = std::dynamic_pointer_cast(planNode)) { + auto mergeJoinOp = std::make_unique(nextOperatorId(), ctx_, mergeJoin); + ctx_->task->createMergeJoinSource(ctx_->splitGroupId, mergeJoin->id()); + return std::move(mergeJoinOp); + } else if (auto unnest = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, unnest); + } else if (auto enforceSingleRow = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, enforceSingleRow); + } else if (auto assignUniqueIdNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), + ctx_, + assignUniqueIdNode, + assignUniqueIdNode->taskUniqueId(), + assignUniqueIdNode->uniqueIdCounter()); + } else if (auto aggsHandlerNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, aggsHandlerNode); + } else if (auto aggsHandlerNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), + ctx_, + aggsHandlerNode, + std::make_unique(), // TODO: not complete yet + 0, // stateRetentionTime: default to 0 + aggsHandlerNode->generateUpdateBefore()); + } else if (auto deduplicateNode = std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), + ctx_, + deduplicateNode, + deduplicateNode->minRetentionTime(), + deduplicateNode->rowtimeIndex(), + deduplicateNode->generateUpdateBefore(), + deduplicateNode->generateInsert(), + deduplicateNode->keepLastRow()); + } else if (auto topNNode = std::dynamic_pointer_cast(planNode)) { + auto op = transformOperator(topNNode->topN()); + std::shared_ptr sortKeySelector = + std::make_shared( + topNNode->sortKeySelectorSpec()->create(INT_MAX, true), + op->pool()); + return std::make_unique( + nextOperatorId(), + ctx_, + topNNode, + std::move(op), + sortKeySelector, + topNNode->generateUpdateBefore(), + topNNode->outputRankNumber(), + topNNode->cacheSize()); + } + std::unique_ptr extended; + extended = exec::Operator::fromPlanNode(ctx_, nextOperatorId(), planNode); + if (!extended) { + VELOX_MEM_LOG(ERROR)<< "Failed to create operator for plan node:" << process::StackTrace().toString(); + } + VELOX_CHECK(extended, "Unsupported plan node: {}", planNode->toString()); + return extended; +} + +StatefulOperatorPtr StatefulPlanner::transformGenericOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + std::unique_ptr op = transformOperator(planNode.node()); + return std::make_unique( + std::move(op), + std::move(targets)); +} //static StatefulOperatorPtr StatefulPlanner::nodeToStatefulOperator( @@ -468,7 +819,7 @@ std::unique_ptr StatefulPlanner::nodeToOperator( auto op = nodeToOperator(topNNode->topN(), ctx); std::unique_ptr sortKeySelector = std::make_unique( - std::move(topNNode->sortKeySelectorSpec()->create(INT_MAX, true)), + topNNode->sortKeySelectorSpec()->create(INT_MAX, true), op->pool()); return std::make_unique( opId.fetch_add(1), diff --git a/velox/experimental/stateful/StatefulPlanner.h b/velox/experimental/stateful/StatefulPlanner.h index 990683443373..03eebb3b5b2a 100644 --- a/velox/experimental/stateful/StatefulPlanner.h +++ b/velox/experimental/stateful/StatefulPlanner.h @@ -50,8 +50,18 @@ class StatefulPlanner { static std::unique_ptr nodeToOperator( const core::PlanNodePtr& planNode, exec::DriverCtx* ctx); - StatefulOperatorPtr buildOperators(const core::PlanNodePtr& planNode); - std::vector buildOperators(const std::vector& targets); - StatefulOperatorPtr buildWatermarkAssignerOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStatefulOperators(const core::PlanNodePtr& planNode); + std::vector transformStatefulOperators(const std::vector& targets); + std::unique_ptr transformOperator(const core::PlanNodePtr& planNode); + StatefulOperatorPtr transformWatermarkAssignerOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamPartitionOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamJoinOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamWindowJoinOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamWindowAggregationOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformGroupWindowAggregationOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamRankOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformGroupAggregationOperator(const StatefulPlanNode& planNode); + StatefulOperatorPtr transformGenericOperator(const StatefulPlanNode& planNode); + }; } // namespace facebook::velox::stateful From c530fc1162382fd5f692bb626f7c6825fe806e09 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 28 Jan 2026 18:30:36 +0800 Subject: [PATCH 3/7] clear --- .../experimental/stateful/StatefulPlanner.cpp | 383 ------------------ velox/experimental/stateful/StatefulPlanner.h | 9 - 2 files changed, 392 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 14bc49ca1db6..45ba143a50bb 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -76,7 +76,6 @@ StatefulOperatorPtr StatefulPlanner::plan( const core::PlanFragment& planFragment, exec::DriverCtx* ctx, StateBackend* stateBackend) { - // return nodeToStatefulOperator(planFragment.planNode, ctx, stateBackend); StatefulPlanner planner(ctx, stateBackend); return planner.transformStatefulOperators(planFragment.planNode); } @@ -474,386 +473,4 @@ StatefulOperatorPtr StatefulPlanner::transformGenericOperator(const StatefulPlan std::move(targets)); } -//static -StatefulOperatorPtr StatefulPlanner::nodeToStatefulOperator( - const core::PlanNodePtr& planNode, - exec::DriverCtx* ctx, - StateBackend* stateBackend) { - auto statefulNode = - std::dynamic_pointer_cast(planNode); - VELOX_CHECK(statefulNode, "Not stateful node: {}", planNode->toString()); - std::vector targets; - std::unique_ptr op = std::move(nodeToOperator(statefulNode->node(), ctx)); - for (auto target : statefulNode->targets()) { - targets.push_back(std::move(nodeToStatefulOperator(target, ctx, stateBackend))); - } - if (auto watermarkAssignerNode = - std::dynamic_pointer_cast(statefulNode->node())) { - return std::make_unique( - std::move(op), - std::move(targets), - watermarkAssignerNode->idleTimeout(), - watermarkAssignerNode->rowtimeFieldIndex(), - watermarkAssignerNode->watermarkInterval()); - } else if (auto partitionNode = - std::dynamic_pointer_cast(statefulNode->node())) { - VELOX_CHECK(targets.size() == 0, "StreamPartitionNode should have no targets"); - int numPartitions = partitionNode->numPartitions(); - return std::make_unique( - std::move(op), - partitionNode->partition()->partitionFunctionSpec(), - numPartitions); - } else if ( - auto joinNode = - std::dynamic_pointer_cast(statefulNode->node())) { - VELOX_CHECK(joinNode->sources().size() == 2, "StreamJoinNode should have 2 sources"); - std::unique_ptr left = std::move(nodeToOperator(joinNode->sources()[0], ctx)); - std::unique_ptr right = std::move(nodeToOperator(joinNode->sources()[1], ctx)); - std::unique_ptr leftKeySelector = - std::make_unique( - std::move(joinNode->leftPartFuncSpec()->create(INT_MAX, false)), - op->pool(), - joinNode->numPartitions()); - std::unique_ptr rightKeySelector = - std::make_unique( - std::move(joinNode->rightPartFuncSpec()->create(INT_MAX, false)), - op->pool(), - joinNode->numPartitions()); - return std::make_unique( - std::move(left), - std::move(right), - std::move(leftKeySelector), - std::move(rightKeySelector), - std::move(op), - std::move(targets)); - } else if ( - auto joinNode = - std::dynamic_pointer_cast(statefulNode->node())) { - VELOX_CHECK(joinNode->sources().size() == 2, "StreamWindowJoinNode should have 2 sources"); - std::unique_ptr left = std::move(nodeToOperator(joinNode->sources()[0], ctx)); - std::unique_ptr right = std::move(nodeToOperator(joinNode->sources()[1], ctx)); - std::unique_ptr leftKeySelector = - std::make_unique( - std::move(joinNode->leftPartFuncSpec()->create(INT_MAX, false)), - op->pool(), - joinNode->numPartitions()); - std::unique_ptr rightKeySelector = - std::make_unique( - std::move(joinNode->rightPartFuncSpec()->create(INT_MAX, false)), - op->pool(), - joinNode->numPartitions()); - return std::make_unique( - std::move(left), - std::move(right), - std::move(leftKeySelector), - std::move(rightKeySelector), - std::move(op), - std::move(targets), - joinNode->leftWindowEndIndex(), - joinNode->rightWindowEndIndex()); - } else if ( - auto windowAggNode = - std::dynamic_pointer_cast(statefulNode->node())) { - std::unique_ptr keySelector = - std::make_unique( - std::move(windowAggNode->keySelectorSpec()->create(INT_MAX, true)), - op->pool()); - std::unique_ptr sliceAssigner = - std::make_unique( - std::move(windowAggNode->sliceAssignerSpec()->create(INT_MAX, true)), - op->pool()); - if (windowAggNode->isLocalAgg()) { - return std::make_unique( - std::move(op), - std::move(targets), - std::move(keySelector), - std::move(sliceAssigner), - windowAggNode->windowInterval(), - windowAggNode->useDayLightSaving(), - windowAggNode->outputType()); - } else { - auto localAggregator = nodeToOperator(windowAggNode->localAgg(), ctx); - std::unique_ptr globalSliceAssigner = - std::make_unique( - std::move(sliceAssigner), - windowAggNode->size(), - windowAggNode->step(), - windowAggNode->offset(), - windowAggNode->windowType(), - windowAggNode->rowtimeIndex()); - return std::make_unique( - std::move(localAggregator), - std::move(op), - std::move(targets), - std::move(keySelector), - std::move(globalSliceAssigner), - windowAggNode->windowInterval(), - windowAggNode->useDayLightSaving()); - } - } else if ( - auto windowAggNode = - std::dynamic_pointer_cast(statefulNode->node())) { - std::unique_ptr keySelector = - std::make_unique( - std::move(windowAggNode->keySelectorSpec()->create(INT_MAX, true)), - op->pool()); - std::unique_ptr sliceAssigner = - std::make_unique( - std::move(windowAggNode->sliceAssignerSpec()->create(INT_MAX, true)), - op->pool()); - std::unique_ptr windowAssigner = - std::make_unique( - std::move(sliceAssigner), - 0, - 0, - 0, - windowAggNode->windowType(), - windowAggNode->rowtimeIndex()); - return std::make_unique( - std::unique_ptr(dynamic_cast(op.release())), - // TODO: support window parameters - std::make_unique(10, windowAggNode->isEventTime()), - std::move(targets), - std::move(keySelector), - std::move(windowAssigner), - windowAggNode->allowedLateness(), - windowAggNode->produceUpdates(), - windowAggNode->rowtimeIndex(), - windowAggNode->isEventTime()); - } else if ( - auto rankNode = - std::dynamic_pointer_cast(statefulNode->node())) { - std::unique_ptr keySelector = - std::make_unique( - std::move(rankNode->keySelectorSpec()->create(INT_MAX, true)), - op->pool()); - return std::make_unique( - std::move(op), - std::move(keySelector), - std::move(targets)); - } else if ( - auto aggNode = - std::dynamic_pointer_cast(statefulNode->node())) { - std::unique_ptr keySelector = - std::make_unique( - std::move(aggNode->keySelectorSpec()->create(INT_MAX, true)), - op->pool()); - return std::make_unique( - std::move(op), - std::move(keySelector), - std::move(targets)); - } - - return std::make_unique(std::move(op), std::move(targets)); -} - -//static -std::unique_ptr StatefulPlanner::nodeToOperator( - const core::PlanNodePtr& planNode, - exec::DriverCtx* ctx) { - if (auto filterNode = - std::dynamic_pointer_cast(planNode)) { - if (planNode->sources().size() == 1) { - auto next = planNode->sources()[0]; - if (auto projectNode = - std::dynamic_pointer_cast(next)) { - return std::make_unique( - opId.fetch_add(1), - ctx, - filterNode, - projectNode); - } - } - return std::make_unique(opId.fetch_add(1), ctx, filterNode, nullptr); - } else if ( - auto projectNode = - std::dynamic_pointer_cast(planNode)) { - std::shared_ptr filterNode = nullptr; - const std::vector& sources = projectNode->sources(); - if (sources.size() == 1) { - filterNode = std::dynamic_pointer_cast(sources[0]); - } - return std::make_unique(opId.fetch_add(1), ctx, filterNode, projectNode); - } else if ( - auto joinNode = - std::dynamic_pointer_cast(planNode)) { - return nodeToOperator(joinNode->probe(), ctx); - } else if ( - auto partitionNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, partitionNode->partition()); - } else if ( - auto valuesNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, valuesNode); - } else if ( - auto tableScanNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, tableScanNode); - } else if ( - auto tableWriteNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, tableWriteNode); - } else if ( - auto tableWriteMergeNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, tableWriteMergeNode); - } else if ( - auto joinNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, joinNode); - } else if ( - auto joinNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, joinNode); - } else if ( - auto joinNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, joinNode); - } else if ( - auto aggregationNode = - std::dynamic_pointer_cast(planNode)) { - if (aggregationNode->isPreGrouped()) { - return std::make_unique(opId.fetch_add(1), ctx, aggregationNode); - } else { - return std::make_unique(opId.fetch_add(1), ctx, aggregationNode); - } - } else if ( - auto expandNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, expandNode); - } else if ( - auto groupIdNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, groupIdNode); - } else if ( - auto topNNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, topNNode); - } else if ( - auto limitNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, limitNode); - } else if ( - auto orderByNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, orderByNode); - } else if ( - auto windowNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, windowNode); - } else if ( - auto rowNumberNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, rowNumberNode); - } else if ( - auto topNRowNumberNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, topNRowNumberNode); - } else if ( - auto markDistinctNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, markDistinctNode); - } else if ( - auto mergeJoin = - std::dynamic_pointer_cast(planNode)) { - auto mergeJoinOp = std::make_unique(opId.fetch_add(1), ctx, mergeJoin); - ctx->task->createMergeJoinSource(ctx->splitGroupId, mergeJoin->id()); - return std::move(mergeJoinOp); - } else if ( - auto unnest = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, unnest); - } else if ( - auto enforceSingleRow = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, enforceSingleRow); - } else if ( - auto assignUniqueIdNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique( - opId.fetch_add(1), - ctx, - assignUniqueIdNode, - assignUniqueIdNode->taskUniqueId(), - assignUniqueIdNode->uniqueIdCounter()); - } else if ( - auto watermarkAssignerNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique( - opId.fetch_add(1), - ctx, - nullptr, - watermarkAssignerNode->project()); - } else if ( - auto joinNode = - std::dynamic_pointer_cast(planNode)) { - return nodeToOperator(joinNode->probe(), ctx); - } else if ( - auto windowAggNode = - std::dynamic_pointer_cast(planNode)) { - return nodeToOperator(windowAggNode->aggregation(), ctx); - } else if ( - auto windowAggNode = - std::dynamic_pointer_cast(planNode)) { - return nodeToOperator(windowAggNode->aggregation(), ctx); - } else if ( - auto aggsHandlerNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique(opId.fetch_add(1), ctx, aggsHandlerNode); - } else if ( - auto deduplicateNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique( - opId.fetch_add(1), - ctx, - deduplicateNode, - deduplicateNode->rowtimeIndex(), - deduplicateNode->minRetentionTime(), - deduplicateNode->generateUpdateBefore(), - deduplicateNode->generateInsert(), - deduplicateNode->keepLastRow()); - } else if ( - auto topNNode = - std::dynamic_pointer_cast(planNode)) { - auto op = nodeToOperator(topNNode->topN(), ctx); - std::unique_ptr sortKeySelector = - std::make_unique( - topNNode->sortKeySelectorSpec()->create(INT_MAX, true), - op->pool()); - return std::make_unique( - opId.fetch_add(1), - ctx, - topNNode, - std::move(op), - std::move(sortKeySelector), - topNNode->generateUpdateBefore(), - topNNode->outputRankNumber(), - topNNode->cacheSize()); - } else if ( - auto rankNode = - std::dynamic_pointer_cast(planNode)) { - return nodeToOperator(rankNode->ranker(), ctx); - } else if ( - auto groupAggNode = - std::dynamic_pointer_cast(planNode)) { - return nodeToOperator(groupAggNode->aggregation(), ctx); - } else if ( - auto aggsHandlerNode = - std::dynamic_pointer_cast(planNode)) { - return std::make_unique( - opId.fetch_add(1), - ctx, - aggsHandlerNode, - std::make_unique(), // TODO: not complete yet - aggsHandlerNode->generateUpdateBefore(), - aggsHandlerNode->needRetraction()); - } else { - std::unique_ptr extended; - extended = exec::Operator::fromPlanNode(ctx, opId.fetch_add(1), planNode); - VELOX_CHECK(extended, "Unsupported plan node: {}", planNode->toString()); - return extended; - } -} - } // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/StatefulPlanner.h b/velox/experimental/stateful/StatefulPlanner.h index 03eebb3b5b2a..9f72598e4368 100644 --- a/velox/experimental/stateful/StatefulPlanner.h +++ b/velox/experimental/stateful/StatefulPlanner.h @@ -41,15 +41,6 @@ class StatefulPlanner { exec::DriverCtx* ctx_ = nullptr; StateBackend* stateBackend_ = nullptr; - - static std::unique_ptr nodeToStatefulOperator( - const core::PlanNodePtr& planNode, - exec::DriverCtx* ctx, - StateBackend* stateBackend); - - static std::unique_ptr nodeToOperator( - const core::PlanNodePtr& planNode, - exec::DriverCtx* ctx); StatefulOperatorPtr transformStatefulOperators(const core::PlanNodePtr& planNode); std::vector transformStatefulOperators(const std::vector& targets); std::unique_ptr transformOperator(const core::PlanNodePtr& planNode); From 2f81a170a1a6406ae32b915e76c310b623c4f9e7 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 28 Jan 2026 19:42:43 +0800 Subject: [PATCH 4/7] update --- .../experimental/stateful/StatefulPlanner.cpp | 35 ++++++------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 45ba143a50bb..73f3303fe55b 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -15,24 +15,18 @@ */ #include "velox/core/PlanFragment.h" #include "velox/exec/AssignUniqueId.h" -#include "velox/exec/CallbackSink.h" #include "velox/exec/EnforceSingleRow.h" -#include "velox/exec/Exchange.h" #include "velox/exec/Expand.h" #include "velox/exec/FilterProject.h" #include "velox/exec/GroupId.h" #include "velox/exec/HashAggregation.h" -#include "velox/exec/HashBuild.h" #include "velox/exec/HashProbe.h" #include "velox/exec/IndexLookupJoin.h" #include "velox/exec/Limit.h" #include "velox/exec/MarkDistinct.h" -#include "velox/exec/Merge.h" #include "velox/exec/MergeJoin.h" -#include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/NestedLoopJoinProbe.h" #include "velox/exec/OrderBy.h" -#include "velox/exec/RoundRobinPartitionFunction.h" #include "velox/exec/RowNumber.h" #include "velox/exec/StreamingAggregation.h" #include "velox/exec/TableScan.h" @@ -65,9 +59,9 @@ namespace facebook::velox::stateful { -static std::atomic opId = 0; static int nextOperatorId() { + static std::atomic opId = 0; return opId.fetch_add(1); } @@ -80,8 +74,6 @@ StatefulOperatorPtr StatefulPlanner::plan( return planner.transformStatefulOperators(planFragment.planNode); } -#define CHECK_NODE_TYPE(TYPE, node) std::dynamic_pointer_cast(node->node()) != nullptr - StatefulOperatorPtr StatefulPlanner::transformStatefulOperators(const core::PlanNodePtr& planNode) { auto statefulNode = std::dynamic_pointer_cast(planNode); VELOX_CHECK(statefulNode, "Not stateful node: {}", planNode->toString()); @@ -272,7 +264,6 @@ StatefulOperatorPtr StatefulPlanner::transformGroupWindowAggregationOperator(con auto windowAggNode = std::dynamic_pointer_cast(planNode.node()); VELOX_CHECK(windowAggNode, "Failed to cast to GroupWindowAggregationNode"); - VELOX_MEM_LOG(ERROR)<< "transformGroupWindowAggregationOperator:" << planNode.toString(); auto op = transformOperator(windowAggNode->aggregation()); std::unique_ptr keySelector = @@ -343,6 +334,14 @@ StatefulOperatorPtr StatefulPlanner::transformGroupAggregationOperator(const Sta std::move(targets)); } +StatefulOperatorPtr StatefulPlanner::transformGenericOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + std::unique_ptr op = transformOperator(planNode.node()); + return std::make_unique( + std::move(op), + std::move(targets)); +} + std::unique_ptr StatefulPlanner::transformOperator(const core::PlanNodePtr& planNode) { if (auto filterNode = std::dynamic_pointer_cast(planNode)) { if (planNode->sources().size() == 1) { @@ -456,21 +455,9 @@ std::unique_ptr StatefulPlanner::transformOperator(const core::P topNNode->outputRankNumber(), topNNode->cacheSize()); } - std::unique_ptr extended; - extended = exec::Operator::fromPlanNode(ctx_, nextOperatorId(), planNode); - if (!extended) { - VELOX_MEM_LOG(ERROR)<< "Failed to create operator for plan node:" << process::StackTrace().toString(); - } - VELOX_CHECK(extended, "Unsupported plan node: {}", planNode->toString()); + std::unique_ptr extended = exec::Operator::fromPlanNode(ctx_, nextOperatorId(), planNode); + VELOX_CHECK(extended, "Unsupported plan node: {}\n{}", planNode->toString(), process::StackTrace().toString()); return extended; } -StatefulOperatorPtr StatefulPlanner::transformGenericOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); - std::unique_ptr op = transformOperator(planNode.node()); - return std::make_unique( - std::move(op), - std::move(targets)); -} - } // namespace facebook::velox::stateful From 9435733b6916ef1eb0b4e9e27be10455303054ad Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Fri, 30 Jan 2026 10:14:36 +0800 Subject: [PATCH 5/7] fix --- velox/experimental/stateful/StatefulPlanner.cpp | 6 +++--- velox/experimental/stateful/StatefulPlanner.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 73f3303fe55b..09b61b83ebe8 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -427,15 +427,15 @@ std::unique_ptr StatefulPlanner::transformOperator(const core::P ctx_, aggsHandlerNode, std::make_unique(), // TODO: not complete yet - 0, // stateRetentionTime: default to 0 - aggsHandlerNode->generateUpdateBefore()); + aggsHandlerNode->generateUpdateBefore(), + aggsHandlerNode->needRetraction()); } else if (auto deduplicateNode = std::dynamic_pointer_cast(planNode)) { return std::make_unique( nextOperatorId(), ctx_, deduplicateNode, - deduplicateNode->minRetentionTime(), deduplicateNode->rowtimeIndex(), + deduplicateNode->minRetentionTime(), deduplicateNode->generateUpdateBefore(), deduplicateNode->generateInsert(), deduplicateNode->keepLastRow()); diff --git a/velox/experimental/stateful/StatefulPlanner.h b/velox/experimental/stateful/StatefulPlanner.h index 9f72598e4368..fbdf9932a644 100644 --- a/velox/experimental/stateful/StatefulPlanner.h +++ b/velox/experimental/stateful/StatefulPlanner.h @@ -35,7 +35,7 @@ class StatefulPlanner { exec::DriverCtx* ctx, StateBackend* stateBackend); protected: - StatefulPlanner(exec::DriverCtx* ctx, StateBackend* stateBackend) : ctx_(ctx),stateBackend_(stateBackend) {} + StatefulPlanner(exec::DriverCtx* ctx, StateBackend* stateBackend) : ctx_(ctx), stateBackend_(stateBackend) {} private: exec::DriverCtx* ctx_ = nullptr; From 89303d933ec9155edefa5c8874a885a6caef6b81 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Fri, 30 Jan 2026 10:34:50 +0800 Subject: [PATCH 6/7] Refactor StatefulPlanner and fix operator constructor parameter order - Refactor nodeToStatefulOperator/nodeToOperator to transform* methods - Add support for GroupWindowAggsHandlerNode, GroupAggsHandlerNode, DeduplicateNode, StreamTopNNode - Fix RowTimeDeduplicateRanker parameter order: minRetentionTime should come before rowtimeIndex - Fix GroupAggregator parameter order: use stateRetentionTime=0 and generateUpdateBefore - Remove unused headers: CallbackSink, Exchange, HashBuild, Merge, NestedLoopJoinBuild, RoundRobinPartitionFunction - Add FIXME comment for stateRetentionTime handling in GroupAggregator --- .../experimental/stateful/StatefulPlanner.cpp | 800 ++++++++++-------- 1 file changed, 452 insertions(+), 348 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 09b61b83ebe8..ae83c83f64ca 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -13,6 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "velox/experimental/stateful/StatefulPlanner.h" +#include #include "velox/core/PlanFragment.h" #include "velox/exec/AssignUniqueId.h" #include "velox/exec/EnforceSingleRow.h" @@ -39,30 +41,27 @@ #include "velox/exec/Values.h" #include "velox/exec/Window.h" #include "velox/experimental/stateful/EmptyOperator.h" +#include "velox/experimental/stateful/GroupWindowAggregator.h" #include "velox/experimental/stateful/KeySelector.h" #include "velox/experimental/stateful/LocalWindowAggregator.h" -#include "velox/experimental/stateful/StatefulPlanner.h" #include "velox/experimental/stateful/StatefulPlanNode.h" -#include "velox/experimental/stateful/StreamPartition.h" #include "velox/experimental/stateful/StreamJoin.h" +#include "velox/experimental/stateful/StreamKeyedOperator.h" +#include "velox/experimental/stateful/StreamPartition.h" #include "velox/experimental/stateful/WatermarkAssigner.h" #include "velox/experimental/stateful/WindowAggregator.h" #include "velox/experimental/stateful/WindowJoin.h" -#include "velox/experimental/stateful/GroupWindowAggregator.h" -#include "velox/experimental/stateful/window/GroupWindowAggsHandler.h" -#include "velox/experimental/stateful/StreamKeyedOperator.h" -#include "velox/experimental/stateful/rank/RowTimeDeduplicateRanker.h" -#include "velox/experimental/stateful/rank/AppendOnlyTopNRanker.h" #include "velox/experimental/stateful/agg/AggsHandleFunction.h" #include "velox/experimental/stateful/agg/GroupAggregator.h" -#include +#include "velox/experimental/stateful/rank/AppendOnlyTopNRanker.h" +#include "velox/experimental/stateful/rank/RowTimeDeduplicateRanker.h" +#include "velox/experimental/stateful/window/GroupWindowAggsHandler.h" namespace facebook::velox::stateful { - static int nextOperatorId() { - static std::atomic opId = 0; - return opId.fetch_add(1); + static std::atomic opId = 0; + return opId.fetch_add(1); } // static @@ -74,390 +73,495 @@ StatefulOperatorPtr StatefulPlanner::plan( return planner.transformStatefulOperators(planFragment.planNode); } -StatefulOperatorPtr StatefulPlanner::transformStatefulOperators(const core::PlanNodePtr& planNode) { - auto statefulNode = std::dynamic_pointer_cast(planNode); - VELOX_CHECK(statefulNode, "Not stateful node: {}", planNode->toString()); - StatefulOperatorPtr result; - if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformWatermarkAssignerOperator(*statefulNode); - } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformStreamPartitionOperator(*statefulNode); - } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformStreamJoinOperator(*statefulNode); - } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformStreamWindowJoinOperator(*statefulNode); - } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformStreamWindowAggregationOperator(*statefulNode); - } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformGroupWindowAggregationOperator(*statefulNode); - } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformStreamRankOperator(*statefulNode); - } else if (std::dynamic_pointer_cast(statefulNode->node()) != nullptr) { - result = transformGroupAggregationOperator(*statefulNode); - } else { - result = transformGenericOperator(*statefulNode); - } - VELOX_CHECK(result, "Failed to build operator for node: {}", planNode->toString()); - return result; +StatefulOperatorPtr StatefulPlanner::transformStatefulOperators( + const core::PlanNodePtr& planNode) { + auto statefulNode = + std::dynamic_pointer_cast(planNode); + VELOX_CHECK(statefulNode, "Not stateful node: {}", planNode->toString()); + StatefulOperatorPtr result; + if (std::dynamic_pointer_cast( + statefulNode->node()) != nullptr) { + result = transformWatermarkAssignerOperator(*statefulNode); + } else if ( + std::dynamic_pointer_cast( + statefulNode->node()) != nullptr) { + result = transformStreamPartitionOperator(*statefulNode); + } else if ( + std::dynamic_pointer_cast(statefulNode->node()) != + nullptr) { + result = transformStreamJoinOperator(*statefulNode); + } else if ( + std::dynamic_pointer_cast( + statefulNode->node()) != nullptr) { + result = transformStreamWindowJoinOperator(*statefulNode); + } else if ( + std::dynamic_pointer_cast( + statefulNode->node()) != nullptr) { + result = transformStreamWindowAggregationOperator(*statefulNode); + } else if ( + std::dynamic_pointer_cast( + statefulNode->node()) != nullptr) { + result = transformGroupWindowAggregationOperator(*statefulNode); + } else if ( + std::dynamic_pointer_cast(statefulNode->node()) != + nullptr) { + result = transformStreamRankOperator(*statefulNode); + } else if ( + std::dynamic_pointer_cast( + statefulNode->node()) != nullptr) { + result = transformGroupAggregationOperator(*statefulNode); + } else { + result = transformGenericOperator(*statefulNode); + } + VELOX_CHECK( + result, "Failed to build operator for node: {}", planNode->toString()); + return result; } -std::vector StatefulPlanner::transformStatefulOperators(const std::vector& targets) { - std::vector operators; - operators.resize(targets.size()); - std::transform(targets.begin(), targets.end(), operators.begin(), [this](const core::PlanNodePtr& target) { +std::vector StatefulPlanner::transformStatefulOperators( + const std::vector& targets) { + std::vector operators; + operators.resize(targets.size()); + std::transform( + targets.begin(), + targets.end(), + operators.begin(), + [this](const core::PlanNodePtr& target) { return transformStatefulOperators(target); - }); - return operators; + }); + return operators; } -StatefulOperatorPtr StatefulPlanner::transformWatermarkAssignerOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); +StatefulOperatorPtr StatefulPlanner::transformWatermarkAssignerOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); - auto watermarkAssignerNode = std::dynamic_pointer_cast(planNode.node()); + auto watermarkAssignerNode = + std::dynamic_pointer_cast(planNode.node()); - auto op = std::make_unique( - nextOperatorId(), - ctx_, - nullptr, - watermarkAssignerNode->project()); + auto op = std::make_unique( + nextOperatorId(), ctx_, nullptr, watermarkAssignerNode->project()); - return std::make_unique( - std::move(op), - std::move(targets), - watermarkAssignerNode->idleTimeout(), - watermarkAssignerNode->rowtimeFieldIndex(), - watermarkAssignerNode->watermarkInterval()); + return std::make_unique( + std::move(op), + std::move(targets), + watermarkAssignerNode->idleTimeout(), + watermarkAssignerNode->rowtimeFieldIndex(), + watermarkAssignerNode->watermarkInterval()); } -StatefulOperatorPtr StatefulPlanner::transformStreamPartitionOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); - VELOX_CHECK(targets.empty(), "StreamPartitionNode should have no targets"); +StatefulOperatorPtr StatefulPlanner::transformStreamPartitionOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); + VELOX_CHECK(targets.empty(), "StreamPartitionNode should have no targets"); - auto partitionNode = std::dynamic_pointer_cast(planNode.node()); - VELOX_CHECK(partitionNode, "Failed to cast to StreamPartitionNode"); + auto partitionNode = + std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(partitionNode, "Failed to cast to StreamPartitionNode"); - auto op = std::make_unique( - nextOperatorId(), - ctx_, - partitionNode->partition()); + auto op = std::make_unique( + nextOperatorId(), ctx_, partitionNode->partition()); - return std::make_unique( - std::move(op), - partitionNode->partition()->partitionFunctionSpec(), - partitionNode->numPartitions()); + return std::make_unique( + std::move(op), + partitionNode->partition()->partitionFunctionSpec(), + partitionNode->numPartitions()); } -StatefulOperatorPtr StatefulPlanner::transformStreamJoinOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); - - auto joinNode = std::dynamic_pointer_cast(planNode.node()); - VELOX_CHECK(joinNode, "Failed to cast to StreamJoinNode"); - VELOX_CHECK(joinNode->sources().size() == 2, "StreamJoinNode should have 2 sources"); - - std::unique_ptr left = transformOperator(joinNode->sources()[0]); - std::unique_ptr right = transformOperator(joinNode->sources()[1]); - std::unique_ptr probe = transformOperator(joinNode->probe()); - - std::unique_ptr leftKeySelector = - std::make_unique( - joinNode->leftPartFuncSpec()->create(INT_MAX, false), - probe->pool(), - joinNode->numPartitions()); - std::unique_ptr rightKeySelector = - std::make_unique( - joinNode->rightPartFuncSpec()->create(INT_MAX, false), - probe->pool(), - joinNode->numPartitions()); - - return std::make_unique( - std::move(left), - std::move(right), - std::move(leftKeySelector), - std::move(rightKeySelector), - std::move(probe), - std::move(targets)); +StatefulOperatorPtr StatefulPlanner::transformStreamJoinOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); + + auto joinNode = + std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(joinNode, "Failed to cast to StreamJoinNode"); + VELOX_CHECK( + joinNode->sources().size() == 2, "StreamJoinNode should have 2 sources"); + + std::unique_ptr left = + transformOperator(joinNode->sources()[0]); + std::unique_ptr right = + transformOperator(joinNode->sources()[1]); + std::unique_ptr probe = transformOperator(joinNode->probe()); + + std::unique_ptr leftKeySelector = std::make_unique( + joinNode->leftPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + std::unique_ptr rightKeySelector = std::make_unique( + joinNode->rightPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + + return std::make_unique( + std::move(left), + std::move(right), + std::move(leftKeySelector), + std::move(rightKeySelector), + std::move(probe), + std::move(targets)); } -StatefulOperatorPtr StatefulPlanner::transformStreamWindowJoinOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); - - auto joinNode = std::dynamic_pointer_cast(planNode.node()); - VELOX_CHECK(joinNode, "Failed to cast to StreamWindowJoinNode"); - VELOX_CHECK(joinNode->sources().size() == 2, "StreamWindowJoinNode should have 2 sources"); - - std::unique_ptr left = transformOperator(joinNode->sources()[0]); - std::unique_ptr right = transformOperator(joinNode->sources()[1]); - std::unique_ptr probe = transformOperator(joinNode->probe()); - - std::unique_ptr leftKeySelector = - std::make_unique( - joinNode->leftPartFuncSpec()->create(INT_MAX, false), - probe->pool(), - joinNode->numPartitions()); - std::unique_ptr rightKeySelector = - std::make_unique( - joinNode->rightPartFuncSpec()->create(INT_MAX, false), - probe->pool(), - joinNode->numPartitions()); - - return std::make_unique( - std::move(left), - std::move(right), - std::move(leftKeySelector), - std::move(rightKeySelector), - std::move(probe), - std::move(targets), - joinNode->leftWindowEndIndex(), - joinNode->rightWindowEndIndex()); +StatefulOperatorPtr StatefulPlanner::transformStreamWindowJoinOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); + + auto joinNode = + std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(joinNode, "Failed to cast to StreamWindowJoinNode"); + VELOX_CHECK( + joinNode->sources().size() == 2, + "StreamWindowJoinNode should have 2 sources"); + + std::unique_ptr left = + transformOperator(joinNode->sources()[0]); + std::unique_ptr right = + transformOperator(joinNode->sources()[1]); + std::unique_ptr probe = transformOperator(joinNode->probe()); + + std::unique_ptr leftKeySelector = std::make_unique( + joinNode->leftPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + std::unique_ptr rightKeySelector = std::make_unique( + joinNode->rightPartFuncSpec()->create(INT_MAX, false), + probe->pool(), + joinNode->numPartitions()); + + return std::make_unique( + std::move(left), + std::move(right), + std::move(leftKeySelector), + std::move(rightKeySelector), + std::move(probe), + std::move(targets), + joinNode->leftWindowEndIndex(), + joinNode->rightWindowEndIndex()); } -StatefulOperatorPtr StatefulPlanner::transformStreamWindowAggregationOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); +StatefulOperatorPtr StatefulPlanner::transformStreamWindowAggregationOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); - auto windowAggNode = std::dynamic_pointer_cast(planNode.node()); - VELOX_CHECK(windowAggNode, "Failed to cast to StreamWindowAggregationNode"); + auto windowAggNode = + std::dynamic_pointer_cast( + planNode.node()); + VELOX_CHECK(windowAggNode, "Failed to cast to StreamWindowAggregationNode"); - auto op = transformOperator(windowAggNode->aggregation()); + auto op = transformOperator(windowAggNode->aggregation()); - std::unique_ptr keySelector = - std::make_unique( - windowAggNode->keySelectorSpec()->create(INT_MAX, true), - op->pool()); - std::unique_ptr sliceAssigner = - std::make_unique( - windowAggNode->sliceAssignerSpec()->create(INT_MAX, true), - op->pool()); + std::unique_ptr keySelector = std::make_unique( + windowAggNode->keySelectorSpec()->create(INT_MAX, true), op->pool()); + std::unique_ptr sliceAssigner = std::make_unique( + windowAggNode->sliceAssignerSpec()->create(INT_MAX, true), op->pool()); - if (windowAggNode->isLocalAgg()) { - return std::make_unique( - std::move(op), - std::move(targets), - std::move(keySelector), - std::move(sliceAssigner), - windowAggNode->windowInterval(), - windowAggNode->useDayLightSaving(), - windowAggNode->outputType()); - } else { - auto localAggregator = transformOperator(windowAggNode->localAgg()); - std::unique_ptr globalSliceAssigner = - std::make_unique( - std::move(sliceAssigner), - windowAggNode->size(), - windowAggNode->step(), - windowAggNode->offset(), - windowAggNode->windowType(), - windowAggNode->rowtimeIndex()); - return std::make_unique( - std::move(localAggregator), - std::move(op), - std::move(targets), - std::move(keySelector), - std::move(globalSliceAssigner), - windowAggNode->windowInterval(), - windowAggNode->useDayLightSaving()); - } -} - -StatefulOperatorPtr StatefulPlanner::transformGroupWindowAggregationOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); - - auto windowAggNode = std::dynamic_pointer_cast(planNode.node()); - VELOX_CHECK(windowAggNode, "Failed to cast to GroupWindowAggregationNode"); - - auto op = transformOperator(windowAggNode->aggregation()); - - std::unique_ptr keySelector = - std::make_unique( - windowAggNode->keySelectorSpec()->create(INT_MAX, true), - op->pool()); - std::unique_ptr sliceAssigner = - std::make_unique( - windowAggNode->sliceAssignerSpec()->create(INT_MAX, true), - op->pool()); - std::unique_ptr windowAssigner = + if (windowAggNode->isLocalAgg()) { + return std::make_unique( + std::move(op), + std::move(targets), + std::move(keySelector), + std::move(sliceAssigner), + windowAggNode->windowInterval(), + windowAggNode->useDayLightSaving(), + windowAggNode->outputType()); + } else { + auto localAggregator = transformOperator(windowAggNode->localAgg()); + std::unique_ptr globalSliceAssigner = std::make_unique( std::move(sliceAssigner), - 0, - 0, - 0, + windowAggNode->size(), + windowAggNode->step(), + windowAggNode->offset(), windowAggNode->windowType(), windowAggNode->rowtimeIndex()); - - return std::make_unique( - std::unique_ptr(dynamic_cast(op.release())), - // TODO: support window parameters - std::make_unique(10, windowAggNode->isEventTime()), + return std::make_unique( + std::move(localAggregator), + std::move(op), std::move(targets), std::move(keySelector), - std::move(windowAssigner), - windowAggNode->allowedLateness(), - windowAggNode->produceUpdates(), - windowAggNode->rowtimeIndex(), - windowAggNode->isEventTime()); + std::move(globalSliceAssigner), + windowAggNode->windowInterval(), + windowAggNode->useDayLightSaving()); + } } -StatefulOperatorPtr StatefulPlanner::transformStreamRankOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); +StatefulOperatorPtr StatefulPlanner::transformGroupWindowAggregationOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); + + auto windowAggNode = + std::dynamic_pointer_cast( + planNode.node()); + VELOX_CHECK(windowAggNode, "Failed to cast to GroupWindowAggregationNode"); + + auto op = transformOperator(windowAggNode->aggregation()); + + std::unique_ptr keySelector = std::make_unique( + windowAggNode->keySelectorSpec()->create(INT_MAX, true), op->pool()); + std::unique_ptr sliceAssigner = std::make_unique( + windowAggNode->sliceAssignerSpec()->create(INT_MAX, true), op->pool()); + std::unique_ptr windowAssigner = + std::make_unique( + std::move(sliceAssigner), + 0, + 0, + 0, + windowAggNode->windowType(), + windowAggNode->rowtimeIndex()); + + return std::make_unique( + std::unique_ptr( + dynamic_cast(op.release())), + // TODO: support window parameters + std::make_unique(10, windowAggNode->isEventTime()), + std::move(targets), + std::move(keySelector), + std::move(windowAssigner), + windowAggNode->allowedLateness(), + windowAggNode->produceUpdates(), + windowAggNode->rowtimeIndex(), + windowAggNode->isEventTime()); +} - auto rankNode = std::dynamic_pointer_cast(planNode.node()); - VELOX_CHECK(rankNode, "Failed to cast to StreamRankNode"); +StatefulOperatorPtr StatefulPlanner::transformStreamRankOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); - auto op = transformOperator(rankNode->ranker()); + auto rankNode = + std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(rankNode, "Failed to cast to StreamRankNode"); - std::unique_ptr keySelector = - std::make_unique( - rankNode->keySelectorSpec()->create(INT_MAX, true), - op->pool()); + auto op = transformOperator(rankNode->ranker()); - return std::make_unique( - std::move(op), - std::move(keySelector), - std::move(targets)); + std::unique_ptr keySelector = std::make_unique( + rankNode->keySelectorSpec()->create(INT_MAX, true), op->pool()); + + return std::make_unique( + std::move(op), std::move(keySelector), std::move(targets)); } -StatefulOperatorPtr StatefulPlanner::transformGroupAggregationOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); +StatefulOperatorPtr StatefulPlanner::transformGroupAggregationOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); - auto aggNode = std::dynamic_pointer_cast(planNode.node()); - VELOX_CHECK(aggNode, "Failed to cast to GroupAggregationNode"); + auto aggNode = + std::dynamic_pointer_cast(planNode.node()); + VELOX_CHECK(aggNode, "Failed to cast to GroupAggregationNode"); - auto op = transformOperator(aggNode->aggregation()); + auto op = transformOperator(aggNode->aggregation()); - std::unique_ptr keySelector = - std::make_unique( - aggNode->keySelectorSpec()->create(INT_MAX, true), - op->pool()); + std::unique_ptr keySelector = std::make_unique( + aggNode->keySelectorSpec()->create(INT_MAX, true), op->pool()); - return std::make_unique( - std::move(op), - std::move(keySelector), - std::move(targets)); + return std::make_unique( + std::move(op), std::move(keySelector), std::move(targets)); } -StatefulOperatorPtr StatefulPlanner::transformGenericOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); - std::unique_ptr op = transformOperator(planNode.node()); - return std::make_unique( - std::move(op), - std::move(targets)); +StatefulOperatorPtr StatefulPlanner::transformGenericOperator( + const StatefulPlanNode& planNode) { + std::vector targets = + transformStatefulOperators(planNode.targets()); + std::unique_ptr op = transformOperator(planNode.node()); + return std::make_unique(std::move(op), std::move(targets)); } -std::unique_ptr StatefulPlanner::transformOperator(const core::PlanNodePtr& planNode) { - if (auto filterNode = std::dynamic_pointer_cast(planNode)) { - if (planNode->sources().size() == 1) { - auto next = planNode->sources()[0]; - if (auto projectNode = std::dynamic_pointer_cast(next)) { - return std::make_unique( - nextOperatorId(), - ctx_, - filterNode, - projectNode); - } - } +std::unique_ptr StatefulPlanner::transformOperator( + const core::PlanNodePtr& planNode) { + if (auto filterNode = + std::dynamic_pointer_cast(planNode)) { + if (planNode->sources().size() == 1) { + auto next = planNode->sources()[0]; + if (auto projectNode = + std::dynamic_pointer_cast(next)) { return std::make_unique( - nextOperatorId(), - ctx_, - filterNode, - nullptr); - } else if (auto projectNode = std::dynamic_pointer_cast(planNode)) { - std::shared_ptr filterNode = nullptr; - const std::vector& sources = projectNode->sources(); - if (sources.size() == 1) { - filterNode = std::dynamic_pointer_cast(sources[0]); - } - return std::make_unique(nextOperatorId(), ctx_, filterNode, projectNode); - } else if (auto valuesNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, valuesNode); - } else if (auto tableScanNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, tableScanNode); - } else if (auto tableWriteNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, tableWriteNode); - } else if (auto tableWriteMergeNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, tableWriteMergeNode); - } else if (auto joinNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, joinNode); - } else if (auto joinNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, joinNode); - } else if (auto joinNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, joinNode); - } else if (auto aggregationNode = std::dynamic_pointer_cast(planNode)) { - if (aggregationNode->isPreGrouped()) { - return std::make_unique(nextOperatorId(), ctx_, aggregationNode); - } else { - return std::make_unique(nextOperatorId(), ctx_, aggregationNode); - } - } else if (auto expandNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, expandNode); - } else if (auto groupIdNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, groupIdNode); - } else if (auto topNNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, topNNode); - } else if (auto limitNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, limitNode); - } else if (auto orderByNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, orderByNode); - } else if (auto windowNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, windowNode); - } else if (auto rowNumberNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, rowNumberNode); - } else if (auto topNRowNumberNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, topNRowNumberNode); - } else if (auto markDistinctNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, markDistinctNode); - } else if (auto mergeJoin = std::dynamic_pointer_cast(planNode)) { - auto mergeJoinOp = std::make_unique(nextOperatorId(), ctx_, mergeJoin); - ctx_->task->createMergeJoinSource(ctx_->splitGroupId, mergeJoin->id()); - return std::move(mergeJoinOp); - } else if (auto unnest = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, unnest); - } else if (auto enforceSingleRow = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, enforceSingleRow); - } else if (auto assignUniqueIdNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique( - nextOperatorId(), - ctx_, - assignUniqueIdNode, - assignUniqueIdNode->taskUniqueId(), - assignUniqueIdNode->uniqueIdCounter()); - } else if (auto aggsHandlerNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique(nextOperatorId(), ctx_, aggsHandlerNode); - } else if (auto aggsHandlerNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique( - nextOperatorId(), - ctx_, - aggsHandlerNode, - std::make_unique(), // TODO: not complete yet - aggsHandlerNode->generateUpdateBefore(), - aggsHandlerNode->needRetraction()); - } else if (auto deduplicateNode = std::dynamic_pointer_cast(planNode)) { - return std::make_unique( - nextOperatorId(), - ctx_, - deduplicateNode, - deduplicateNode->rowtimeIndex(), - deduplicateNode->minRetentionTime(), - deduplicateNode->generateUpdateBefore(), - deduplicateNode->generateInsert(), - deduplicateNode->keepLastRow()); - } else if (auto topNNode = std::dynamic_pointer_cast(planNode)) { - auto op = transformOperator(topNNode->topN()); - std::shared_ptr sortKeySelector = - std::make_shared( - topNNode->sortKeySelectorSpec()->create(INT_MAX, true), - op->pool()); - return std::make_unique( - nextOperatorId(), - ctx_, - topNNode, - std::move(op), - sortKeySelector, - topNNode->generateUpdateBefore(), - topNNode->outputRankNumber(), - topNNode->cacheSize()); + nextOperatorId(), ctx_, filterNode, projectNode); + } } - std::unique_ptr extended = exec::Operator::fromPlanNode(ctx_, nextOperatorId(), planNode); - VELOX_CHECK(extended, "Unsupported plan node: {}\n{}", planNode->toString(), process::StackTrace().toString()); - return extended; + return std::make_unique( + nextOperatorId(), ctx_, filterNode, nullptr); + } else if ( + auto projectNode = + std::dynamic_pointer_cast(planNode)) { + std::shared_ptr filterNode = nullptr; + const std::vector& sources = projectNode->sources(); + if (sources.size() == 1) { + filterNode = + std::dynamic_pointer_cast(sources[0]); + } + return std::make_unique( + nextOperatorId(), ctx_, filterNode, projectNode); + } else if ( + auto valuesNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, valuesNode); + } else if ( + auto tableScanNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, tableScanNode); + } else if ( + auto tableWriteNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, tableWriteNode); + } else if ( + auto tableWriteMergeNode = + std::dynamic_pointer_cast( + planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, tableWriteMergeNode); + } else if ( + auto joinNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, joinNode); + } else if ( + auto joinNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, joinNode); + } else if ( + auto joinNode = + std::dynamic_pointer_cast( + planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, joinNode); + } else if ( + auto aggregationNode = + std::dynamic_pointer_cast(planNode)) { + if (aggregationNode->isPreGrouped()) { + return std::make_unique( + nextOperatorId(), ctx_, aggregationNode); + } else { + return std::make_unique( + nextOperatorId(), ctx_, aggregationNode); + } + } else if ( + auto expandNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, expandNode); + } else if ( + auto groupIdNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, groupIdNode); + } else if ( + auto topNNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, topNNode); + } else if ( + auto limitNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, limitNode); + } else if ( + auto orderByNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, orderByNode); + } else if ( + auto windowNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, windowNode); + } else if ( + auto rowNumberNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, rowNumberNode); + } else if ( + auto topNRowNumberNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, topNRowNumberNode); + } else if ( + auto markDistinctNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, markDistinctNode); + } else if ( + auto mergeJoin = + std::dynamic_pointer_cast(planNode)) { + auto mergeJoinOp = + std::make_unique(nextOperatorId(), ctx_, mergeJoin); + ctx_->task->createMergeJoinSource(ctx_->splitGroupId, mergeJoin->id()); + return std::move(mergeJoinOp); + } else if ( + auto unnest = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique(nextOperatorId(), ctx_, unnest); + } else if ( + auto enforceSingleRow = + std::dynamic_pointer_cast( + planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, enforceSingleRow); + } else if ( + auto assignUniqueIdNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), + ctx_, + assignUniqueIdNode, + assignUniqueIdNode->taskUniqueId(), + assignUniqueIdNode->uniqueIdCounter()); + } else if ( + auto aggsHandlerNode = + std::dynamic_pointer_cast( + planNode)) { + return std::make_unique( + nextOperatorId(), ctx_, aggsHandlerNode); + } else if ( + auto aggsHandlerNode = + std::dynamic_pointer_cast(planNode)) { + // FIXME: stateRetentionTime is not handled yet + return std::make_unique( + nextOperatorId(), + ctx_, + aggsHandlerNode, + std::make_unique(), // TODO: not complete yet + 0, + aggsHandlerNode->generateUpdateBefore()); + } else if ( + auto deduplicateNode = + std::dynamic_pointer_cast(planNode)) { + return std::make_unique( + nextOperatorId(), + ctx_, + deduplicateNode, + deduplicateNode->minRetentionTime(), + deduplicateNode->rowtimeIndex(), + deduplicateNode->generateUpdateBefore(), + deduplicateNode->generateInsert(), + deduplicateNode->keepLastRow()); + } else if ( + auto topNNode = + std::dynamic_pointer_cast(planNode)) { + auto op = transformOperator(topNNode->topN()); + std::shared_ptr sortKeySelector = + std::make_shared( + topNNode->sortKeySelectorSpec()->create(INT_MAX, true), op->pool()); + return std::make_unique( + nextOperatorId(), + ctx_, + topNNode, + std::move(op), + sortKeySelector, + topNNode->generateUpdateBefore(), + topNNode->outputRankNumber(), + topNNode->cacheSize()); + } + std::unique_ptr extended = + exec::Operator::fromPlanNode(ctx_, nextOperatorId(), planNode); + VELOX_CHECK( + extended, + "Unsupported plan node: {}\n{}", + planNode->toString(), + process::StackTrace().toString()); + return extended; } } // namespace facebook::velox::stateful From a52e618fb42b6c075854901ab00f5328808cdc85 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Fri, 30 Jan 2026 10:40:53 +0800 Subject: [PATCH 7/7] Format StatefulPlanner.h to match Velox code style --- velox/experimental/stateful/StatefulPlanner.h | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.h b/velox/experimental/stateful/StatefulPlanner.h index fbdf9932a644..afebee685452 100644 --- a/velox/experimental/stateful/StatefulPlanner.h +++ b/velox/experimental/stateful/StatefulPlanner.h @@ -27,32 +27,44 @@ struct PlanFragment; namespace facebook::velox::stateful { class StatefulPlanner { - public: // Create stateful operator chain according to plan. static StatefulOperatorPtr plan( const core::PlanFragment& planFragment, exec::DriverCtx* ctx, StateBackend* stateBackend); + protected: - StatefulPlanner(exec::DriverCtx* ctx, StateBackend* stateBackend) : ctx_(ctx), stateBackend_(stateBackend) {} - + StatefulPlanner(exec::DriverCtx* ctx, StateBackend* stateBackend) + : ctx_(ctx), stateBackend_(stateBackend) {} + private: exec::DriverCtx* ctx_ = nullptr; StateBackend* stateBackend_ = nullptr; - StatefulOperatorPtr transformStatefulOperators(const core::PlanNodePtr& planNode); - std::vector transformStatefulOperators(const std::vector& targets); - std::unique_ptr transformOperator(const core::PlanNodePtr& planNode); - StatefulOperatorPtr transformWatermarkAssignerOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformStreamPartitionOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformStreamJoinOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformStreamWindowJoinOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformStreamWindowAggregationOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformGroupWindowAggregationOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformStreamRankOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformGroupAggregationOperator(const StatefulPlanNode& planNode); - StatefulOperatorPtr transformGenericOperator(const StatefulPlanNode& planNode); - + StatefulOperatorPtr transformStatefulOperators( + const core::PlanNodePtr& planNode); + std::vector transformStatefulOperators( + const std::vector& targets); + std::unique_ptr transformOperator( + const core::PlanNodePtr& planNode); + StatefulOperatorPtr transformWatermarkAssignerOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamPartitionOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamJoinOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamWindowJoinOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamWindowAggregationOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformGroupWindowAggregationOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformStreamRankOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformGroupAggregationOperator( + const StatefulPlanNode& planNode); + StatefulOperatorPtr transformGenericOperator( + const StatefulPlanNode& planNode); }; } // namespace facebook::velox::stateful