diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 4764fad382c4..627bd396b7df 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -46,6 +46,7 @@ std::shared_ptr parseScanSplitInfo( using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; auto splitInfo = std::make_shared(); + splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN; splitInfo->paths.reserve(fileList.size()); splitInfo->starts.reserve(fileList.size()); splitInfo->lengths.reserve(fileList.size()); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 69d21b4a57c9..e88cb43a91b6 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -137,11 +137,16 @@ void VeloxRuntime::getInfoAndIds( // 1. Streams follow "iterator:" in the substrait plan; // 2. Files follow the traversal order in the plan node tree. // FIXME: Why we didn't have a unified design? - if (splitInfo->isStream) { + switch (splitInfo->leafType) { + case SplitInfo::LeafType::SPLIT_AWARE_STREAM: streamIds.emplace_back(ValueStreamConnectorFactory::nodeIdOf(streamIdx++)); - } else { - scanInfos.emplace_back(splitInfo); +break; + case SplitInfo::LeafType::TABLE_SCAN: + scanInfos.emplace_back(splitInfo); scanIds.emplace_back(leafPlanNodeId); +break; + case SplitInfo::LeafType::TRIVIAL_LEAF: +break; } } } diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index ac5773e439ee..9a40b8b7dcfd 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -358,7 +358,7 @@ void WholeStageResultIterator::constructPartitionColumns( } void WholeStageResultIterator::addIteratorSplits(const std::vector>& inputIterators) { - GLUTEN_CHECK(!allSplitsAdded, "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); + GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); // Create IteratorConnectorSplit for each iterator for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) { if (inputIterators[i] == nullptr) { @@ -372,7 +372,7 @@ void WholeStageResultIterator::addIteratorSplits(const std::vectornoMoreSplits(streamId); } - allSplitsAdded = true; + allSplitsAdded_ = true; } void WholeStageResultIterator::collectMetrics() { diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 401cff06dafd..85cb18fd251a 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -139,7 +139,7 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { std::vector> scanInfos_; std::vector streamIds_; std::vector> splits_; - bool allSplitsAdded = false; + bool allSplitsAdded_ = false; int64_t loadLazyVectorTime_ = 0; }; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 0085c553ba85..b543dfa8ba89 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1323,7 +1323,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( // Mark this as a stream-based split auto splitInfo = std::make_shared(); - splitInfo->isStream = true; + splitInfo->leafType = SplitInfo::LeafType::SPLIT_AWARE_STREAM; splitInfoMap_[tableScanNode->id()] = splitInfo; return tableScanNode; @@ -1360,7 +1360,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructCudfValueStreamNode( auto node = std::make_shared(nextPlanNodeId(), outputType, std::move(iterator)); auto splitInfo = std::make_shared(); - splitInfo->isStream = true; + splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF; splitInfoMap_[node->id()] = splitInfo; return node; } @@ -1381,7 +1381,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode( } auto node = std::make_shared(nextPlanNodeId(), std::move(rowVectors)); auto splitInfo = std::make_shared(); - splitInfo->isStream = true; + splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF; splitInfoMap_[node->id()] = splitInfo; return node; } @@ -1412,6 +1412,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // Otherwise, will create TableScan node for ReadRel. auto splitInfo = std::make_shared(); + splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN; if (!validationMode_) { VELOX_CHECK_LT(splitInfoIdx_, splitInfos_.size(), "Plan must have readRel and related split info."); splitInfo = splitInfos_[splitInfoIdx_++]; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 0e00764a66e2..47bf3a0525b1 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -28,8 +28,17 @@ namespace gluten { class ResultIterator; struct SplitInfo { - /// Whether the split comes from arrow array stream node. - bool isStream = false; + enum class LeafType { + /// A streaming node that accepts iterator splits. + SPLIT_AWARE_STREAM = 0, + /// A table scan node that accepts scan splits. + TABLE_SCAN = 1, + /// A leaf node that doesn't rely on splits. + TRIVIAL_LEAF = 2 + }; + + /// The type of the associated Velox leaf query plan node. + LeafType leafType = LeafType::TRIVIAL_LEAF; /// The Partition index. u_int32_t partitionIndex;