Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;

auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;
splitInfo->paths.reserve(fileList.size());
splitInfo->starts.reserve(fileList.size());
splitInfo->lengths.reserve(fileList.size());
Expand Down
11 changes: 8 additions & 3 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,16 @@ void VeloxRuntime::getInfoAndIds(
// 1. Streams follow "iterator:<idx>" in the substrait plan;
// 2. Files follow the traversal order in the plan node tree.
// FIXME: Why we didn't have a unified design?
if (splitInfo->isStream) {
switch (splitInfo->leafType) {
case SplitInfo::LeafType::SPLIT_AWARE_STREAM:
streamIds.emplace_back(ValueStreamConnectorFactory::nodeIdOf(streamIdx++));
} else {
scanInfos.emplace_back(splitInfo);
break;
case SplitInfo::LeafType::TABLE_SCAN:
scanInfos.emplace_back(splitInfo);
scanIds.emplace_back(leafPlanNodeId);
break;
case SplitInfo::LeafType::TRIVIAL_LEAF:
break;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ void WholeStageResultIterator::constructPartitionColumns(
}

void WholeStageResultIterator::addIteratorSplits(const std::vector<std::shared_ptr<ResultIterator>>& inputIterators) {
GLUTEN_CHECK(!allSplitsAdded, "Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
// Create IteratorConnectorSplit for each iterator
for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) {
if (inputIterators[i] == nullptr) {
Expand All @@ -372,7 +372,7 @@ void WholeStageResultIterator::addIteratorSplits(const std::vector<std::shared_p
}

void WholeStageResultIterator::noMoreSplits() {
if (allSplitsAdded) {
if (allSplitsAdded_) {
return;
}
// Mark no more splits for all scan nodes
Expand All @@ -390,7 +390,7 @@ void WholeStageResultIterator::noMoreSplits() {
for (const auto& streamId : streamIds_) {
task_->noMoreSplits(streamId);
}
allSplitsAdded = true;
allSplitsAdded_ = true;
}

void WholeStageResultIterator::collectMetrics() {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/compute/WholeStageResultIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator {
std::vector<std::shared_ptr<SplitInfo>> scanInfos_;
std::vector<facebook::velox::core::PlanNodeId> streamIds_;
std::vector<std::vector<facebook::velox::exec::Split>> splits_;
bool allSplitsAdded = false;
bool allSplitsAdded_ = false;

int64_t loadLazyVectorTime_ = 0;
};
Expand Down
7 changes: 4 additions & 3 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(

// Mark this as a stream-based split
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfo->leafType = SplitInfo::LeafType::SPLIT_AWARE_STREAM;
splitInfoMap_[tableScanNode->id()] = splitInfo;

return tableScanNode;
Expand Down Expand Up @@ -1360,7 +1360,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructCudfValueStreamNode(
auto node = std::make_shared<CudfValueStreamNode>(nextPlanNodeId(), outputType, std::move(iterator));

auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF;
splitInfoMap_[node->id()] = splitInfo;
return node;
}
Expand All @@ -1381,7 +1381,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode(
}
auto node = std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(), std::move(rowVectors));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF;
splitInfoMap_[node->id()] = splitInfo;
return node;
}
Expand Down Expand Up @@ -1412,6 +1412,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::

// Otherwise, will create TableScan node for ReadRel.
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;
if (!validationMode_) {
VELOX_CHECK_LT(splitInfoIdx_, splitInfos_.size(), "Plan must have readRel and related split info.");
splitInfo = splitInfos_[splitInfoIdx_++];
Expand Down
13 changes: 11 additions & 2 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ namespace gluten {
class ResultIterator;

struct SplitInfo {
/// Whether the split comes from arrow array stream node.
bool isStream = false;
enum class LeafType {
/// A streaming node that accepts iterator splits.
SPLIT_AWARE_STREAM = 0,
/// A table scan node that accepts scan splits.
TABLE_SCAN = 1,
/// A leaf node that doesn't rely on splits.
TRIVIAL_LEAF = 2
};

/// The type of the associated Velox leaf query plan node.
LeafType leafType = LeafType::TRIVIAL_LEAF;

/// The Partition index.
u_int32_t partitionIndex;
Expand Down