From 14820c7a01b13dc48794e1f035fdea824263132a Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 28 Jan 2026 19:42:43 +0800 Subject: [PATCH 1/2] update --- .../experimental/stateful/StatefulPlanner.cpp | 33 ++++++------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 45ba143a50bb..a4835fff5ba4 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -15,24 +15,18 @@ */ #include "velox/core/PlanFragment.h" #include "velox/exec/AssignUniqueId.h" -#include "velox/exec/CallbackSink.h" #include "velox/exec/EnforceSingleRow.h" -#include "velox/exec/Exchange.h" #include "velox/exec/Expand.h" #include "velox/exec/FilterProject.h" #include "velox/exec/GroupId.h" #include "velox/exec/HashAggregation.h" -#include "velox/exec/HashBuild.h" #include "velox/exec/HashProbe.h" #include "velox/exec/IndexLookupJoin.h" #include "velox/exec/Limit.h" #include "velox/exec/MarkDistinct.h" -#include "velox/exec/Merge.h" #include "velox/exec/MergeJoin.h" -#include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/NestedLoopJoinProbe.h" #include "velox/exec/OrderBy.h" -#include "velox/exec/RoundRobinPartitionFunction.h" #include "velox/exec/RowNumber.h" #include "velox/exec/StreamingAggregation.h" #include "velox/exec/TableScan.h" @@ -80,8 +74,6 @@ StatefulOperatorPtr StatefulPlanner::plan( return planner.transformStatefulOperators(planFragment.planNode); } -#define CHECK_NODE_TYPE(TYPE, node) std::dynamic_pointer_cast(node->node()) != nullptr - StatefulOperatorPtr StatefulPlanner::transformStatefulOperators(const core::PlanNodePtr& planNode) { auto statefulNode = std::dynamic_pointer_cast(planNode); VELOX_CHECK(statefulNode, "Not stateful node: {}", planNode->toString()); @@ -272,7 +264,6 @@ StatefulOperatorPtr StatefulPlanner::transformGroupWindowAggregationOperator(con auto windowAggNode = std::dynamic_pointer_cast(planNode.node()); VELOX_CHECK(windowAggNode, "Failed to cast to GroupWindowAggregationNode"); - VELOX_MEM_LOG(ERROR)<< "transformGroupWindowAggregationOperator:" << planNode.toString(); auto op = transformOperator(windowAggNode->aggregation()); std::unique_ptr keySelector = @@ -343,6 +334,14 @@ StatefulOperatorPtr StatefulPlanner::transformGroupAggregationOperator(const Sta std::move(targets)); } +StatefulOperatorPtr StatefulPlanner::transformGenericOperator(const StatefulPlanNode& planNode) { + std::vector targets = transformStatefulOperators(planNode.targets()); + std::unique_ptr op = transformOperator(planNode.node()); + return std::make_unique( + std::move(op), + std::move(targets)); +} + std::unique_ptr StatefulPlanner::transformOperator(const core::PlanNodePtr& planNode) { if (auto filterNode = std::dynamic_pointer_cast(planNode)) { if (planNode->sources().size() == 1) { @@ -456,21 +455,9 @@ std::unique_ptr StatefulPlanner::transformOperator(const core::P topNNode->outputRankNumber(), topNNode->cacheSize()); } - std::unique_ptr extended; - extended = exec::Operator::fromPlanNode(ctx_, nextOperatorId(), planNode); - if (!extended) { - VELOX_MEM_LOG(ERROR)<< "Failed to create operator for plan node:" << process::StackTrace().toString(); - } - VELOX_CHECK(extended, "Unsupported plan node: {}", planNode->toString()); + std::unique_ptr extended = exec::Operator::fromPlanNode(ctx_, nextOperatorId(), planNode); + VELOX_CHECK(extended, "Unsupported plan node: {}\n{}", planNode->toString(), process::StackTrace().toString()); return extended; } -StatefulOperatorPtr StatefulPlanner::transformGenericOperator(const StatefulPlanNode& planNode) { - std::vector targets = transformStatefulOperators(planNode.targets()); - std::unique_ptr op = transformOperator(planNode.node()); - return std::make_unique( - std::move(op), - std::move(targets)); -} - } // namespace facebook::velox::stateful From b84604b74696b291906879d24b38c276fea386ca Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 30 Jan 2026 02:03:32 +0000 Subject: [PATCH 2/2] Initial plan