diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index c6ad7d50295b..668e60b20542 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -213,11 +213,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { val resIter: ColumnarBatchOutIterator = transKernel.createKernelWithBatchIterator( inputPartition.plan, - splitInfoByteArray, - columnarNativeIterators.asJava, + if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null, + if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray else null, partitionIndex, BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) ) + resIter.noMoreSplits() val itrMetrics = IteratorMetricsJniWrapper.create() Iterators @@ -261,12 +262,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { val nativeResultIterator = transKernel.createKernelWithBatchIterator( rootNode.toProtobuf.toByteArray, - // Final iterator does not contain scan split, so pass empty split info to native here. 
- new Array[Array[Byte]](0), - columnarNativeIterator.asJava, + null, + if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray else null, partitionIndex, BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) ) + nativeResultIterator.noMoreSplits() val itrMetrics = IteratorMetricsJniWrapper.create() Iterators diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index 0ea6bde3d827..e1a0fd98eeb5 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -869,7 +869,7 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } assert(wholeStageTransformers.size == 3) val nativePlanString = wholeStageTransformers.head.nativePlanString() - assert(nativePlanString.contains("Aggregation[1][SINGLE")) + assert(nativePlanString.matches("[\\s\\S]*Aggregation\\[\\d+]\\[SINGLE[\\s\\S]*")) assert(nativePlanString.contains("ValueStream")) assert(wholeStageTransformers(1).nativePlanString().contains("ValueStream")) assert(wholeStageTransformers.last.nativePlanString().contains("TableScan")) diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index c1f82a0c3489..a15cdc83d310 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -23,6 +23,7 @@ #include "compute/ResultIterator.h" #include "memory/ColumnarBatch.h" #include "memory/MemoryManager.h" +#include "memory/SplitAwareColumnarBatchIterator.h" #include "operators/c2r/ColumnarToRow.h" #include "operators/r2c/RowToColumnar.h" #include "operators/serializer/ColumnarBatchSerializer.h" @@ -105,6 +106,10 @@ class Runtime : public std::enable_shared_from_this { throw GlutenException("Not implemented"); } + virtual void noMoreSplits(ResultIterator* iter) { + throw GlutenException("Not 
implemented"); + } + virtual std::shared_ptr createOrGetEmptySchemaBatch(int32_t numRows) { throw GlutenException("Not implemented"); } diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index cb400e2db321..1044edb6d779 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -233,6 +233,7 @@ class SafeNativeArray { public: virtual ~SafeNativeArray() { PrimitiveArray::release(env_, javaArray_, nativeArray_); + env_->DeleteLocalRef(javaArray_); } SafeNativeArray(const SafeNativeArray&) = delete; @@ -255,7 +256,7 @@ class SafeNativeArray { private: SafeNativeArray(JNIEnv* env, JavaArrayType javaArray, JniNativeArrayType nativeArray) - : env_(env), javaArray_(javaArray), nativeArray_(nativeArray){}; + : env_(env), javaArray_(static_cast(env_->NewLocalRef(javaArray))), nativeArray_(nativeArray){}; JNIEnv* env_; JavaArrayType javaArray_; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index adada15f91df..e9d797e1dbfb 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -29,6 +29,7 @@ #include #include #include "memory/AllocationListener.h" +#include "memory/SplitAwareColumnarBatchIterator.h" #include "operators/serializer/ColumnarBatchSerializer.h" #include "shuffle/LocalPartitionWriter.h" #include "shuffle/Partitioning.h" @@ -455,7 +456,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith jobject wrapper, jbyteArray planArr, jobjectArray splitInfosArr, - jobjectArray iterArr, + jobjectArray batchItrArray, jint stageId, jint partitionId, jlong taskId, @@ -478,24 +479,28 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith ctx->parsePlan(safePlanArray.elems(), planSize); - for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i < splitInfoArraySize; i++) { - jbyteArray splitInfoArray = static_cast(env->GetObjectArrayElement(splitInfosArr, i)); - jsize splitInfoSize = env->GetArrayLength(splitInfoArray); - auto 
safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray); - auto splitInfoData = safeSplitArray.elems(); + if (splitInfosArr != nullptr) { + for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i < splitInfoArraySize; i++) { + jbyteArray splitInfoArray = static_cast(env->GetObjectArrayElement(splitInfosArr, i)); + jsize splitInfoSize = env->GetArrayLength(splitInfoArray); + auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray); + auto splitInfoData = safeSplitArray.elems(); - ctx->parseSplitInfo(splitInfoData, splitInfoSize, i); + ctx->parseSplitInfo(splitInfoData, splitInfoSize, i); + } } // Handle the Java iters - jsize itersLen = env->GetArrayLength(iterArr); std::vector> inputIters; - inputIters.reserve(itersLen); - for (int idx = 0; idx < itersLen; idx++) { - jobject iter = env->GetObjectArrayElement(iterArr, idx); - auto arrayIter = std::make_unique(env, iter, ctx, idx); - auto resultIter = std::make_shared(std::move(arrayIter)); - inputIters.push_back(std::move(resultIter)); + if (batchItrArray != nullptr) { + jsize itersLen = env->GetArrayLength(batchItrArray); + inputIters.reserve(itersLen); + for (int idx = 0; idx < itersLen; idx++) { + jobject iter = env->GetObjectArrayElement(batchItrArray, idx); + auto arrayIter = std::make_unique(env, iter, ctx, idx); + auto resultIter = std::make_shared(std::move(arrayIter)); + inputIters.push_back(std::move(resultIter)); + } } return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters)); @@ -630,6 +635,66 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato JNI_METHOD_END() } +JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeAddIteratorSplits( // NOLINT + JNIEnv* env, + jobject wrapper, + jlong iterHandle, + jobjectArray batchItrArray) { + JNI_METHOD_START + auto ctx = getRuntime(env, wrapper); + auto outIter = ObjectStore::retrieve(iterHandle); + if (outIter == nullptr) { + throw 
GlutenException("Invalid iterator handle for addSplits"); + } + + // Get the underlying split-aware iterator + auto* splitAwareIter = dynamic_cast(outIter->getInputIter()); + if (splitAwareIter == nullptr) { + throw GlutenException("Iterator does not support split management"); + } + + GLUTEN_CHECK(batchItrArray != nullptr, "FATAL: Splits to add cannot be null"); + + // Convert Java ColumnarBatchInIterator[] to native iterators and add as splits + jsize numIterators = env->GetArrayLength(batchItrArray); + std::vector> inputIterators; + inputIterators.reserve(numIterators); + + for (jsize idx = 0; idx < numIterators; idx++) { + jobject iter = env->GetObjectArrayElement(batchItrArray, idx); + if (iter == nullptr) { + inputIterators.push_back(nullptr); + } else { + auto arrayIter = std::make_unique(env, iter, ctx, idx); + auto resultIter = std::make_shared(std::move(arrayIter)); + inputIterators.push_back(std::move(resultIter)); + } + env->DeleteLocalRef(iter); + } + + // Add iterator splits via interface method + if (!inputIterators.empty()) { + splitAwareIter->addIteratorSplits(inputIterators); + } + + return true; + JNI_METHOD_END(false) +} + +JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeNoMoreSplits( // NOLINT + JNIEnv* env, + jobject wrapper, + jlong iterHandle) { + JNI_METHOD_START + auto ctx = getRuntime(env, wrapper); + auto iter = ObjectStore::retrieve(iterHandle); + if (iter == nullptr) { + throw GlutenException("Invalid iterator handle for noMoreSplits"); + } + ctx->noMoreSplits(iter.get()); + JNI_METHOD_END() +} + JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit( // NOLINT JNIEnv* env, diff --git a/cpp/core/memory/SplitAwareColumnarBatchIterator.h b/cpp/core/memory/SplitAwareColumnarBatchIterator.h new file mode 100644 index 000000000000..e12f38afadf5 --- /dev/null +++ b/cpp/core/memory/SplitAwareColumnarBatchIterator.h @@ -0,0 +1,49 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include "ColumnarBatchIterator.h" + +// Forward declarations +namespace substrait { +class ReadRel_LocalFiles; +} + +namespace gluten { + +// Forward declaration +class ResultIterator; + +/// Abstract base class for iterators that support dynamic split management. +/// Provides APIs for adding splits after iterator creation and signaling completion. +class SplitAwareColumnarBatchIterator : public ColumnarBatchIterator { + public: + SplitAwareColumnarBatchIterator() = default; + virtual ~SplitAwareColumnarBatchIterator() = default; + + /// Add iterator-based splits from input iterators. + virtual void addIteratorSplits(const std::vector>& inputIterators) = 0; + + /// Signal that no more splits will be added to this iterator. + /// This must be called after all splits have been added to ensure proper task completion. 
+ virtual void noMoreSplits() = 0; +}; + +} // namespace gluten diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 56fab701ee07..eb0e11e04b59 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -205,6 +205,7 @@ if(ENABLE_GPU) VELOX_SRCS cudf/CudfPlanValidator.cc cudf/GpuLock.cc + operators/plannodes/CudfVectorStream.cc shuffle/VeloxGpuShuffleReader.cc shuffle/VeloxGpuShuffleWriter.cc operators/serializer/VeloxGpuColumnarBatchSerializer.cc diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 212125413711..616ba9bcfbde 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -452,6 +452,7 @@ auto BM_Generic = [](::benchmark::State& state, } auto resultIter = runtime->createResultIterator(veloxSpillDir, std::move(inputIters)); + runtime->noMoreSplits(resultIter.get()); listenerPtr->setIterator(resultIter.get()); if (FLAGS_with_shuffle) { diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 75fd84b104bb..45a64908e199 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -47,6 +47,7 @@ #include "velox/connectors/hive/BufferedInputBuilder.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveDataSource.h" +#include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" @@ -204,7 +205,6 @@ void VeloxBackend::init( // RSS shuffle serde. 
facebook::velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde(); } - velox::exec::Operator::registerOperator(std::make_unique()); initUdf(); @@ -317,6 +317,10 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); + + // Register value-stream connector for runtime iterator-based inputs + velox::connector::registerConnector(std::make_shared(kIteratorConnectorId, hiveConf)); + #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 05e78eb1badc..4764fad382c4 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -18,27 +18,25 @@ #include "VeloxPlanConverter.h" #include -#include "compute/ResultIterator.h" #include "config/GlutenConfig.h" #include "iceberg/IcebergPlanConverter.h" -#include "velox/common/file/FileSystems.h" +#include "operators/plannodes/IteratorSplit.h" namespace gluten { using namespace facebook; VeloxPlanConverter::VeloxPlanConverter( - const std::vector>& inputIters, velox::memory::MemoryPool* veloxPool, const facebook::velox::config::ConfigBase* veloxCfg, + const std::vector>& rowVectors, const std::optional writeFilesTempPath, const std::optional writeFileName, bool validationMode) : validationMode_(validationMode), veloxCfg_(veloxCfg), - substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath, writeFileName, validationMode) { + substraitVeloxPlanConverter_(veloxPool, veloxCfg, rowVectors, writeFilesTempPath, writeFileName, validationMode) { VELOX_USER_CHECK_NOT_NULL(veloxCfg_); - substraitVeloxPlanConverter_.setInputIters(std::move(inputIters)); } namespace { @@ -136,7 +134,6 @@ void parseLocalFileNodes( splitInfos.reserve(localFiles.size()); for (const auto& localFile : localFiles) { const auto& fileList = 
localFile.items(); - splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList)); } diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 4678dccea74a..0b597a91f9ed 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -18,11 +18,11 @@ #pragma once #include -#include "compute/ResultIterator.h" -#include "memory/VeloxMemoryManager.h" +#include +#include + #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" -#include "velox/core/PlanNode.h" namespace gluten { @@ -30,9 +30,9 @@ namespace gluten { class VeloxPlanConverter { public: explicit VeloxPlanConverter( - const std::vector>& inputIters, facebook::velox::memory::MemoryPool* veloxPool, const facebook::velox::config::ConfigBase* veloxCfg, + const std::vector>& rowVectors, const std::optional writeFilesTempPath = std::nullopt, const std::optional writeFileName = std::nullopt, bool validationMode = false); @@ -45,6 +45,12 @@ class VeloxPlanConverter { return substraitVeloxPlanConverter_.splitInfos(); } + /// The input iterators not inlined to VeloxPlan. They should be then manually added to the Velox task + /// via WholeStageResultIterator#addIteratorSplits. Empty if no input iterators remaining. 
+ const std::vector>& remainingInputIterators() const { + return substraitVeloxPlanConverter_.remainingInputIterators(); + } + private: bool validationMode_; diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index af0fde37ce49..69d21b4a57c9 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -17,6 +17,8 @@ #include "VeloxRuntime.h" +#include + #include #include @@ -124,14 +126,19 @@ void VeloxRuntime::getInfoAndIds( std::vector>& scanInfos, std::vector& scanIds, std::vector& streamIds) { + int32_t streamIdx = 0; for (const auto& leafPlanNodeId : leafPlanNodeIds) { auto it = splitInfoMap.find(leafPlanNodeId); if (it == splitInfoMap.end()) { throw std::runtime_error("Could not find leafPlanNodeId."); } auto splitInfo = it->second; + // Based on the current code, indexing of streams and files follow different orders: + // 1. Streams follow "iterator:" in the substrait plan; + // 2. Files follow the traversal order in the plan node tree. + // FIXME: Why we didn't have a unified design? 
if (splitInfo->isStream) { - streamIds.emplace_back(leafPlanNodeId); + streamIds.emplace_back(ValueStreamConnectorFactory::nodeIdOf(streamIdx++)); } else { scanInfos.emplace_back(splitInfo); scanIds.emplace_back(leafPlanNodeId); @@ -140,10 +147,8 @@ void VeloxRuntime::getInfoAndIds( } std::string VeloxRuntime::planString(bool details, const std::unordered_map& sessionConf) { - std::vector> inputs; auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool(); - VeloxPlanConverter veloxPlanConverter( - inputs, veloxMemoryPool.get(), veloxCfg_.get(), std::nullopt, std::nullopt, true); + VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(), veloxCfg_.get(), {}, std::nullopt, std::nullopt, true); auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_); return veloxPlan->toString(details, true); } @@ -160,9 +165,9 @@ std::shared_ptr VeloxRuntime::createResultIterator( LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << printConfig(confMap_); VeloxPlanConverter veloxPlanConverter( - inputs, memoryManager()->getLeafMemoryPool().get(), veloxCfg_.get(), + inputs, *localWriteFilesTempPath(), *localWriteFileName()); veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_)); @@ -187,9 +192,24 @@ std::shared_ptr VeloxRuntime::createResultIterator( spillDir, veloxCfg_, taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{}); + + auto remainingInputIterators = veloxPlanConverter.remainingInputIterators(); + if (!remainingInputIterators.empty()) { + // Converts remaining input iterators to splits and add them to the task. 
+ wholeStageIter->addIteratorSplits(remainingInputIterators); + } + return std::make_shared(std::move(wholeStageIter), this); } +void VeloxRuntime::noMoreSplits(ResultIterator* iter){ + auto* splitAwareIter = dynamic_cast(iter->getInputIter()); + if (splitAwareIter == nullptr) { + throw GlutenException("Iterator does not support split management"); + } + splitAwareIter->noMoreSplits(); +} + std::shared_ptr VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) { auto veloxPool = memoryManager()->getLeafMemoryPool(); return std::make_shared(veloxPool, column2RowMemThreshold); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index b39733e83938..a3c3da0c5ac9 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -59,6 +59,8 @@ class VeloxRuntime final : public Runtime { const std::string& spillDir, const std::vector>& inputs = {}) override; + void noMoreSplits(ResultIterator* iter) override; + std::shared_ptr createColumnar2RowConverter(int64_t column2RowMemThreshold) override; std::shared_ptr createOrGetEmptySchemaBatch(int32_t numRows) override; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index e91e2ad69d62..ac5773e439ee 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -16,6 +16,7 @@ */ #include "WholeStageResultIterator.h" #include "VeloxBackend.h" +#include "VeloxPlanConverter.h" #include "VeloxRuntime.h" #include "config/VeloxConfig.h" #include "utils/ConfigExtractor.h" @@ -29,6 +30,8 @@ #include "velox/experimental/cudf/exec/ToCudf.h" #include "cudf/GpuLock.h" #endif +#include "operators/plannodes/RowVectorStream.h" + using namespace facebook; @@ -227,7 +230,6 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ } std::shared_ptr WholeStageResultIterator::next() { - tryAddSplitsToTask(); if (task_->isFinished()) { return nullptr; } @@ -355,17 
+357,40 @@ void WholeStageResultIterator::constructPartitionColumns( } } -void WholeStageResultIterator::tryAddSplitsToTask() { - if (noMoreSplits_) { +void WholeStageResultIterator::addIteratorSplits(const std::vector>& inputIterators) { + GLUTEN_CHECK(!allSplitsAdded, "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); + // Create IteratorConnectorSplit for each iterator + for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) { + if (inputIterators[i] == nullptr) { + continue; + } + auto connectorSplit = std::make_shared( + kIteratorConnectorId, inputIterators[i]); + exec::Split split(folly::copy(connectorSplit), -1); + task_->addSplit(streamIds_[i], std::move(split)); + } +} + +void WholeStageResultIterator::noMoreSplits() { + if (allSplitsAdded) { return; } + // Mark no more splits for all scan nodes for (int idx = 0; idx < scanNodeIds_.size(); idx++) { for (auto& split : splits_[idx]) { task_->addSplit(scanNodeIds_[idx], std::move(split)); } - task_->noMoreSplits(scanNodeIds_[idx]); } - noMoreSplits_ = true; + + for (const auto& scanNodeId : scanNodeIds_) { + task_->noMoreSplits(scanNodeId); + } + + // Mark no more splits for all stream nodes + for (const auto& streamId : streamIds_) { + task_->noMoreSplits(streamId); + } + allSplitsAdded = true; } void WholeStageResultIterator::collectMetrics() { diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 358b153c7ad6..401cff06dafd 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -18,7 +18,7 @@ #include "compute/Runtime.h" #include "iceberg/IcebergPlanConverter.h" -#include "memory/ColumnarBatchIterator.h" +#include "memory/SplitAwareColumnarBatchIterator.h" #include "memory/VeloxColumnarBatch.h" #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" @@ -33,7 +33,7 @@ namespace gluten { -class 
WholeStageResultIterator : public ColumnarBatchIterator { +class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { public: WholeStageResultIterator( VeloxMemoryManager* memoryManager, @@ -69,14 +69,22 @@ class WholeStageResultIterator : public ColumnarBatchIterator { return metrics_.get(); } - const facebook::velox::exec::Task* task() const { - return task_.get(); - } - const facebook::velox::core::PlanNode* veloxPlan() const { return veloxPlan_.get(); } + /// Get the underlying Velox task for direct manipulation + facebook::velox::exec::Task* task() { + return task_.get(); + } + + /// Add iterator-based splits from input iterators + void addIteratorSplits(const std::vector>& inputIterators) override; + + /// Signal that no more splits will be added. + /// This is required for proper task completion and enables future barrier support. + void noMoreSplits() override; + private: /// Get the Spark confs to Velox query context. std::unordered_map getQueryContextConf(); @@ -94,9 +102,6 @@ class WholeStageResultIterator : public ColumnarBatchIterator { std::unordered_map>&, const std::unordered_map&); - /// Add splits to task. Skip if already added. - void tryAddSplitsToTask(); - /// Collect Velox metrics. 
void collectMetrics(); @@ -134,7 +139,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator { std::vector> scanInfos_; std::vector streamIds_; std::vector> splits_; - bool noMoreSplits_ = false; + bool allSplitsAdded = false; int64_t loadLazyVectorTime_ = 0; }; diff --git a/cpp/velox/cudf/CudfPlanValidator.cc b/cpp/velox/cudf/CudfPlanValidator.cc index b2d6e82c22b1..a5d350e57950 100644 --- a/cpp/velox/cudf/CudfPlanValidator.cc +++ b/cpp/velox/cudf/CudfPlanValidator.cc @@ -46,8 +46,8 @@ bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) { std::vector> inputs; std::shared_ptr veloxCfg = std::make_shared( std::unordered_map{{kCudfEnabled, "true"}}); - VeloxPlanConverter veloxPlanConverter( - inputs, veloxMemoryPool.get(), veloxCfg.get(), std::nullopt, std::nullopt, true); + VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(), veloxCfg.get(), + inputs, std::nullopt, std::nullopt, true); auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles); std::unordered_set emptySet; velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; diff --git a/cpp/velox/jni/JniFileSystem.cc b/cpp/velox/jni/JniFileSystem.cc index 51a7821f0168..06d831a11769 100644 --- a/cpp/velox/jni/JniFileSystem.cc +++ b/cpp/velox/jni/JniFileSystem.cc @@ -352,6 +352,7 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem { jstring element = static_cast(env->GetObjectArrayElement(jarray, i)); std::string cElement = jStringToCString(env, element); out.push_back(cElement); + env->DeleteLocalRef(element); } return out; } diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 74adb1dff5fc..ad6f8947eb28 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -219,6 +219,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateExpressi auto id = sFmap.function_anchor(); auto name = sFmap.name(); 
functionMappings.emplace(id, name); + env->DeleteLocalRef(mapping); } auto pool = defaultLeafVeloxMemoryPool().get(); @@ -475,6 +476,7 @@ Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByR if (!velox::filesystems::isPathSupportedByRegisteredFileSystems(path)) { return false; } + env->DeleteLocalRef(string); } return true; JNI_METHOD_END(false) diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.cc b/cpp/velox/operators/plannodes/CudfVectorStream.cc new file mode 100644 index 000000000000..85cd2c021dba --- /dev/null +++ b/cpp/velox/operators/plannodes/CudfVectorStream.cc @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "CudfVectorStream.h" +#include "memory/VeloxColumnarBatch.h" +#include "velox/exec/Driver.h" +#include "velox/exec/Operator.h" +#include "velox/exec/Task.h" + +namespace { + +class SuspendedSection { + public: + explicit SuspendedSection(facebook::velox::exec::Driver* driver) : driver_(driver) { + if (driver_->task()->enterSuspended(driver->state()) != facebook::velox::exec::StopReason::kNone) { + VELOX_FAIL("Terminate detected when entering suspended section"); + } + } + + virtual ~SuspendedSection() { + if (driver_->task()->leaveSuspended(driver_->state()) != facebook::velox::exec::StopReason::kNone) { + LOG(WARNING) << "Terminate detected when leaving suspended section for driver " << driver_->driverCtx()->driverId + << " from task " << driver_->task()->taskId(); + } + } + + private: + facebook::velox::exec::Driver* const driver_; +}; + +} // namespace + +namespace gluten { +bool CudfVectorStreamBase::hasNext() { + if (finished_) { + return false; + } + VELOX_DCHECK_NOT_NULL(iterator_); + + bool hasNext; + { + // We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current + // driver to make the current task open to spilling. + // + // When a task is getting spilled, it should have been suspended so has zero running threads, otherwise there's + // possibility that this spill call hangs. See https://github.com/apache/incubator-gluten/issues/7243. + // As of now, non-zero running threads usually happens when: + // 1. Task A spills task B; + // 2. Task A tries to grow buffers created by task B, during which spill is requested on task A again. + SuspendedSection ss(driverCtx_->driver); + hasNext = iterator_->hasNext(); + } + if (!hasNext) { + finished_ = true; + } + return hasNext; +} + +std::shared_ptr CudfVectorStreamBase::nextInternal() { + if (finished_) { + return nullptr; + } + std::shared_ptr cb; + { + // We are leaving Velox task execution and are probably entering Spark code through JNI. 
Suspend the current + // driver to make the current task open to spilling. + SuspendedSection ss(driverCtx_->driver); + cb = iterator_->next(); + } + return cb; +} + +facebook::velox::RowVectorPtr CudfVectorStreamBase::next() { + auto cb = nextInternal(); + const std::shared_ptr& vb = VeloxColumnarBatch::from(pool_, cb); + auto vp = vb->getRowVector(); + VELOX_DCHECK(vp != nullptr); + return std::make_shared( + vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(), vp->children()); +} +} // namespace gluten \ No newline at end of file diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h b/cpp/velox/operators/plannodes/CudfVectorStream.h index 9758d1c35e28..7a663d252e4c 100644 --- a/cpp/velox/operators/plannodes/CudfVectorStream.h +++ b/cpp/velox/operators/plannodes/CudfVectorStream.h @@ -18,6 +18,11 @@ #pragma once #include "CudfVectorStream.h" +#include "compute/ResultIterator.h" +#include "memory/VeloxColumnarBatch.h" +#include "velox/exec/Driver.h" +#include "velox/exec/Operator.h" +#include "velox/exec/Task.h" #include "velox/experimental/cudf/exec/CudfOperator.h" #include "velox/experimental/cudf/exec/Utilities.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" @@ -25,14 +30,78 @@ namespace gluten { -class CudfVectorStream : public RowVectorStream { +class CudfVectorStreamBase { + public: + virtual ~CudfVectorStreamBase() = default; + + explicit CudfVectorStreamBase( + facebook::velox::exec::DriverCtx* driverCtx, + facebook::velox::memory::MemoryPool* pool, + ResultIterator* iterator, + const facebook::velox::RowTypePtr& outputType) + : driverCtx_(driverCtx), pool_(pool), outputType_(outputType), iterator_(iterator) {} + + bool hasNext(); + + // Convert arrow batch to row vector, construct the new Rowvector with new outputType. + virtual facebook::velox::RowVectorPtr next(); + + protected: + // Get the next batch from iterator_. 
+ std::shared_ptr nextInternal(); + + facebook::velox::exec::DriverCtx* driverCtx_; + facebook::velox::memory::MemoryPool* pool_; + const facebook::velox::RowTypePtr outputType_; + ResultIterator* iterator_; + + bool finished_{false}; +}; + +class ValueStreamNode final : public facebook::velox::core::PlanNode { + public: + ValueStreamNode( + const facebook::velox::core::PlanNodeId& id, + const facebook::velox::RowTypePtr& outputType, + std::shared_ptr iterator) + : facebook::velox::core::PlanNode(id), outputType_(outputType), iterator_(std::move(iterator)) {} + + const facebook::velox::RowTypePtr& outputType() const override { + return outputType_; + } + + const std::vector& sources() const override { + return kEmptySources_; + }; + + ResultIterator* iterator() const { + return iterator_.get(); + } + + std::string_view name() const override { + return "ValueStream"; + } + + folly::dynamic serialize() const override { + VELOX_UNSUPPORTED("ValueStream plan node is not serializable"); + } + + private: + void addDetails(std::stringstream& stream) const override{}; + + const facebook::velox::RowTypePtr outputType_; + std::shared_ptr iterator_; + const std::vector kEmptySources_; +}; + +class CudfVectorStream : public CudfVectorStreamBase { public: CudfVectorStream( facebook::velox::exec::DriverCtx* driverCtx, facebook::velox::memory::MemoryPool* pool, ResultIterator* iterator, const facebook::velox::RowTypePtr& outputType) - : RowVectorStream(driverCtx, pool, iterator, outputType) {} + : CudfVectorStreamBase(driverCtx, pool, iterator, outputType) {} // Convert arrow batch to row vector and use new output columns facebook::velox::RowVectorPtr next() override { @@ -133,7 +202,7 @@ class CudfValueStream : public facebook::velox::exec::SourceOperator, public fac private: bool finished_ = false; - std::unique_ptr rvStream_; + std::unique_ptr rvStream_; }; class CudfVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator { diff --git 
a/cpp/velox/operators/plannodes/IteratorSplit.h b/cpp/velox/operators/plannodes/IteratorSplit.h new file mode 100644 index 000000000000..6e42d10e6f3f --- /dev/null +++ b/cpp/velox/operators/plannodes/IteratorSplit.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "compute/ResultIterator.h" +#include "velox/connectors/Connector.h" + +namespace gluten { + +/// Custom connector ID for iterator-based splits +constexpr const char* kIteratorConnectorId = "value-stream"; + +/// A custom split type that wraps a ResultIterator +/// This allows iterators to be treated as splits and added dynamically to tasks +class IteratorConnectorSplit : public facebook::velox::connector::ConnectorSplit { + public: + explicit IteratorConnectorSplit(const std::string& connectorId, std::shared_ptr iterator) + : ConnectorSplit(connectorId), iterator_(std::move(iterator)) {} + + std::shared_ptr iterator() const { + return iterator_; + } + + std::string toString() const override { + return fmt::format("IteratorSplit[{}]", connectorId); + } + + private: + std::shared_ptr iterator_; +}; + +} // namespace gluten diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc b/cpp/velox/operators/plannodes/RowVectorStream.cc index 7121fe93ed1a..7c0b00979a74 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.cc +++ b/cpp/velox/operators/plannodes/RowVectorStream.cc @@ -20,6 +20,7 @@ #include "velox/exec/Driver.h" #include "velox/exec/Operator.h" #include "velox/exec/Task.h" +#include "velox/vector/arrow/Bridge.h" namespace { @@ -45,6 +46,7 @@ class SuspendedSection { } // namespace namespace gluten { + bool RowVectorStream::hasNext() { if (finished_) { return false; @@ -61,7 +63,13 @@ bool RowVectorStream::hasNext() { // As of now, non-zero running threads usually happens when: // 1. Task A spills task B; // 2. Task A tries to grow buffers created by task B, during which spill is requested on task A again. 
- SuspendedSection ss(driverCtx_->driver); + const facebook::velox::exec::DriverThreadContext* driverThreadCtx = + facebook::velox::exec::driverThreadContext(); + VELOX_CHECK_NOT_NULL( + driverThreadCtx, + "ExternalStreamDataSource::next() is not called " + "from a driver thread"); + SuspendedSection ss(driverThreadCtx->driverCtx()->driver); hasNext = iterator_->hasNext(); } if (!hasNext) { @@ -78,7 +86,13 @@ std::shared_ptr RowVectorStream::nextInternal() { { // We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current // driver to make the current task open to spilling. - SuspendedSection ss(driverCtx_->driver); + const facebook::velox::exec::DriverThreadContext* driverThreadCtx = + facebook::velox::exec::driverThreadContext(); + VELOX_CHECK_NOT_NULL( + driverThreadCtx, + "ExternalStreamDataSource::next() is not called " + "from a driver thread"); + SuspendedSection ss(driverThreadCtx->driverCtx()->driver); cb = iterator_->next(); } return cb; @@ -92,4 +106,67 @@ facebook::velox::RowVectorPtr RowVectorStream::next() { return std::make_shared( vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(), vp->children()); } -} // namespace gluten + +ValueStreamDataSource::ValueStreamDataSource( + const facebook::velox::RowTypePtr& outputType, + const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle, + const facebook::velox::connector::ColumnHandleMap& columnHandles, + facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx) + : outputType_(outputType), + pool_(connectorQueryCtx->memoryPool()) {} + +void ValueStreamDataSource::addSplit(std::shared_ptr split) { + // Cast to IteratorConnectorSplit to extract the iterator + auto iteratorSplit = std::dynamic_pointer_cast(split); + if (!iteratorSplit) { + throw std::runtime_error("Split is not an IteratorConnectorSplit"); + } + + auto iterator = iteratorSplit->iterator(); + if (!iterator) { + throw std::runtime_error("IteratorConnectorSplit 
contains null iterator"); + } + + // Create RowVectorStream wrapper and add to pending queue + auto rowVectorStream = std::make_shared(pool_, iterator, outputType_); + pendingIterators_.push_back(rowVectorStream); +} + +std::optional ValueStreamDataSource::next( + uint64_t size, + facebook::velox::ContinueFuture& future) { + // Try to get current iterator if we don't have one + while (!currentIterator_) { + if (pendingIterators_.empty()) { + // No more iterators to process + return nullptr; + } + + // Get next RowVectorStream from queue + currentIterator_ = pendingIterators_.front(); + pendingIterators_.erase(pendingIterators_.begin()); + } + + // Check if current stream has more data + if (!currentIterator_->hasNext()) { + // Current stream exhausted, try next one + currentIterator_ = nullptr; + return next(size, future); // Recursively try next stream + } + + // Get next batch from current stream (RowVectorStream handles conversion) + auto rowVector = currentIterator_->next(); + + if (!rowVector) { + currentIterator_ = nullptr; + return next(size, future); // Recursively try next stream + } + + // Update metrics + completedRows_ += rowVector->size(); + completedBytes_ += rowVector->estimateFlatSize(); + + return rowVector; +} + +} // namespace gluten \ No newline at end of file diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h index d6b8d37bcd0e..6e6ccd1527d5 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.h +++ b/cpp/velox/operators/plannodes/RowVectorStream.h @@ -19,6 +19,8 @@ #include "compute/ResultIterator.h" #include "memory/VeloxColumnarBatch.h" +#include "operators/plannodes/IteratorSplit.h" +#include "velox/connectors/Connector.h" #include "velox/exec/Driver.h" #include "velox/exec/Operator.h" #include "velox/exec/Task.h" @@ -30,11 +32,10 @@ class RowVectorStream { virtual ~RowVectorStream() = default; explicit RowVectorStream( - facebook::velox::exec::DriverCtx* driverCtx, 
facebook::velox::memory::MemoryPool* pool, - ResultIterator* iterator, + std::shared_ptr iterator, const facebook::velox::RowTypePtr& outputType) - : driverCtx_(driverCtx), pool_(pool), outputType_(outputType), iterator_(iterator) {} + : pool_(pool), outputType_(outputType), iterator_(iterator) {} bool hasNext(); @@ -45,100 +46,133 @@ class RowVectorStream { // Get the next batch from iterator_. std::shared_ptr nextInternal(); - facebook::velox::exec::DriverCtx* driverCtx_; facebook::velox::memory::MemoryPool* pool_; const facebook::velox::RowTypePtr outputType_; - ResultIterator* iterator_; + std::shared_ptr iterator_; bool finished_{false}; }; -class ValueStreamNode final : public facebook::velox::core::PlanNode { +/// DataSource implementation that reads from ResultIterator instances. +/// This allows iterator-based data to be consumed via Velox's standard +/// connector/split mechanism, enabling proper integration with Task::addSplit(). +class ValueStreamDataSource : public facebook::velox::connector::DataSource { public: - ValueStreamNode( - const facebook::velox::core::PlanNodeId& id, + ValueStreamDataSource( const facebook::velox::RowTypePtr& outputType, - std::shared_ptr iterator) - : facebook::velox::core::PlanNode(id), outputType_(outputType), iterator_(std::move(iterator)) {} + const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle, + const facebook::velox::connector::ColumnHandleMap& columnHandles, + facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx); - const facebook::velox::RowTypePtr& outputType() const override { - return outputType_; - } + void addSplit(std::shared_ptr split) override; - const std::vector& sources() const override { - return kEmptySources_; - }; + std::optional next(uint64_t size, facebook::velox::ContinueFuture& future) override; - ResultIterator* iterator() const { - return iterator_.get(); + void addDynamicFilter( + facebook::velox::column_index_t outputChannel, + const std::shared_ptr& filter) 
override { + // Iterator-based sources don't support dynamic filtering } - std::string_view name() const override { - return "ValueStream"; + uint64_t getCompletedBytes() override { + return completedBytes_; } - folly::dynamic serialize() const override { - VELOX_UNSUPPORTED("ValueStream plan node is not serializable"); + uint64_t getCompletedRows() override { + return completedRows_; } - private: - void addDetails(std::stringstream& stream) const override{}; + std::unordered_map getRuntimeStats() override { + return {}; + } + private: const facebook::velox::RowTypePtr outputType_; - std::shared_ptr iterator_; - const std::vector kEmptySources_; + facebook::velox::memory::MemoryPool* pool_; + + std::vector> pendingIterators_; + std::shared_ptr currentIterator_{nullptr}; + uint64_t completedBytes_{0}; + uint64_t completedRows_{0}; }; -class ValueStream : public facebook::velox::exec::SourceOperator { +/// Table handle for iterator-based scans +class ValueStreamTableHandle : public facebook::velox::connector::ConnectorTableHandle { public: - ValueStream( - int32_t operatorId, - facebook::velox::exec::DriverCtx* driverCtx, - std::shared_ptr valueStreamNode) - : facebook::velox::exec::SourceOperator( - driverCtx, - valueStreamNode->outputType(), - operatorId, - valueStreamNode->id(), - valueStreamNode->name().data()) { - ResultIterator* itr = valueStreamNode->iterator(); - rvStream_ = std::make_unique(driverCtx, pool(), itr, outputType_); + explicit ValueStreamTableHandle(std::string connectorId) : ConnectorTableHandle(connectorId) {} + + const std::string& name() const override { + static const std::string kName = "ValueStreamTableHandle"; + return kName; } - facebook::velox::RowVectorPtr getOutput() override { - if (finished_) { - return nullptr; - } - if (rvStream_->hasNext()) { - return rvStream_->next(); - } else { - finished_ = true; - return nullptr; - } + folly::dynamic serialize() const override { + VELOX_NYI(); } +}; + +/// Column handle for iterator-based 
scans +class ValueStreamColumnHandle : public facebook::velox::connector::ColumnHandle { + public: + ValueStreamColumnHandle(std::string name, facebook::velox::TypePtr type) + : name_(std::move(name)), type_(std::move(type)) {} - facebook::velox::exec::BlockingReason isBlocked(facebook::velox::ContinueFuture* /* unused */) override { - return facebook::velox::exec::BlockingReason::kNotBlocked; + const std::string& name() const { + return name_; } - bool isFinished() override { - return finished_; + const facebook::velox::TypePtr& type() const { + return type_; } private: - bool finished_ = false; - std::unique_ptr rvStream_; + std::string name_; + facebook::velox::TypePtr type_; }; -class RowVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator { - std::unique_ptr toOperator( - facebook::velox::exec::DriverCtx* ctx, - int32_t id, - const facebook::velox::core::PlanNodePtr& node) override { - if (auto valueStreamNode = std::dynamic_pointer_cast(node)) { - return std::make_unique(id, ctx, valueStreamNode); - } - return nullptr; +/// Connector implementation for iterator-based data sources +class ValueStreamConnector : public facebook::velox::connector::Connector { + public: + explicit ValueStreamConnector( + const std::string& id, + std::shared_ptr config) + : Connector(id, config) {} + + std::unique_ptr createDataSource( + const facebook::velox::RowTypePtr& outputType, + const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle, + const facebook::velox::connector::ColumnHandleMap& columnHandles, + facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx) override { + return std::make_unique(outputType, tableHandle, columnHandles, connectorQueryCtx); + } + + std::unique_ptr createDataSink( + facebook::velox::RowTypePtr inputType, + facebook::velox::connector::ConnectorInsertTableHandlePtr connectorInsertTableHandle, + facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx, + 
facebook::velox::connector::CommitStrategy commitStrategy) override { + VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks"); + } +}; + +/// Factory for creating ValueStreamConnector instances +class ValueStreamConnectorFactory : public facebook::velox::connector::ConnectorFactory { + public: + static constexpr const char* kValueStreamConnectorName = "value-stream"; + + static std::string nodeIdOf(int32_t streamIdx) { + return fmt::format("{}:{}", kValueStreamConnectorName, streamIdx); + } + + ValueStreamConnectorFactory() : ConnectorFactory(kValueStreamConnectorName) {} + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config); } }; + } // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index b20b4a3d0901..0085c553ba85 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -899,15 +899,15 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), "injectedProject="); if (injectedProject) { - // Child should be either ProjectNode or ValueStreamNode in case of project fallback. + // Child should be either ProjectNode or CudfValueStreamNode (GPU) in case of project fallback. 
VELOX_CHECK( (std::dynamic_pointer_cast(childNode) != nullptr || - std::dynamic_pointer_cast(childNode) != nullptr) + std::dynamic_pointer_cast(childNode) != nullptr #ifdef GLUTEN_ENABLE_GPU || std::dynamic_pointer_cast(childNode) != nullptr #endif - && childNode->outputType()->size() > requiredChildOutput.size(), - "injectedProject is true, but the ProjectNode or ValueStreamNode (in case of projection fallback)" + ) && childNode->outputType()->size() > requiredChildOutput.size(), + "injectedProject is true, but the ProjectNode or TableScanNode or CudfValueStreamNode (in case of projection fallback)" " is missing or does not have the corresponding projection field"); bool isStack = generateRel.has_advanced_extension() && @@ -1281,8 +1281,56 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: nextPlanNodeId(), sortingKeys, sortingOrders, static_cast(topNRel.n()), false /*isPartial*/, childNode); } -template core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( + const ::substrait::ReadRel& readRel, int32_t streamIdx) { + // Use TableScanNode with iterator connector for runtime iterator inputs + // Get output schema from ReadRel + uint64_t colNum = 0; + std::vector veloxTypeList; + if (readRel.has_base_schema()) { + const auto& baseSchema = readRel.base_schema(); + colNum = baseSchema.names().size(); + veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema); + } + + auto nodeId = ValueStreamConnectorFactory::nodeIdOf(streamIdx); + std::vector outNames; + outNames.reserve(colNum); + for (int idx = 0; idx < colNum; idx++) { + // TODO: We'd use the designated names in readRel rather than assigning new names. 
+ auto colName = fmt::format("node_{}_{}", nodeId, idx); + outNames.emplace_back(colName); + } + auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); + + // Create TableHandle + auto tableHandle = std::make_shared(kIteratorConnectorId); + + // Create column assignments + connector::ColumnHandleMap assignments; + for (int idx = 0; idx < outputType->size(); idx++) { + auto name = outputType->nameOf(idx); + auto type = outputType->childAt(idx); + assignments[name] = std::make_shared(name, type); + } + + // Create TableScanNode + auto tableScanNode = std::make_shared( + nodeId, + outputType, + tableHandle, + assignments); + + // Mark this as a stream-based split + auto splitInfo = std::make_shared(); + splitInfo->isStream = true; + splitInfoMap_[tableScanNode->id()] = splitInfo; + + return tableScanNode; +} + +#ifdef GLUTEN_ENABLE_GPU +core::PlanNodePtr SubstraitToVeloxPlanConverter::constructCudfValueStreamNode( const ::substrait::ReadRel& readRel, int32_t streamIdx) { // Get the input schema of this iterator. 
@@ -1307,28 +1355,31 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( std::shared_ptr iterator; if (!validationMode_) { VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx); - iterator = inputIters_[streamIdx]; + iterator = std::move(inputIters_[streamIdx]); } - auto node = std::make_shared(nextPlanNodeId(), outputType, std::move(iterator)); + auto node = std::make_shared(nextPlanNodeId(), outputType, std::move(iterator)); auto splitInfo = std::make_shared(); splitInfo->isStream = true; splitInfoMap_[node->id()] = splitInfo; return node; } +#endif core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode( const ::substrait::ReadRel& readRel, int32_t streamIdx) { - std::vector values; + // ValuesNode is only used for validation/benchmarking with query trace + // It loads all data from the iterator at plan construction time VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx); - const auto iterator = inputIters_[streamIdx]; - while (iterator->hasNext()) { - auto cb = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), iterator->next()); - values.emplace_back(cb->getRowVector()); - } - auto node = std::make_shared(nextPlanNodeId(), std::move(values)); - + const auto iter = std::move(inputIters_[streamIdx]); + std::vector rowVectors; + while (iter->hasNext()) { + auto batch = iter->next(); + auto veloxBatch = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), batch); + rowVectors.emplace_back(veloxBatch->getRowVector()); + } + auto node = std::make_shared(nextPlanNodeId(), std::move(rowVectors)); auto splitInfo = std::make_shared(); splitInfo->isStream = true; splitInfoMap_[node->id()] = splitInfo; @@ -1343,19 +1394,20 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: !readRel.common().has_emit(), "Emit not supported for ValuesNode and TableScanNode 
related Substrait plans."); } - // Check if the ReadRel specifies an input of stream. If yes, build ValueStreamNode as the data source. auto streamIdx = getStreamIndex(readRel); if (streamIdx >= 0) { + // Check if the ReadRel specifies an input of stream. If yes, build TableScanNode with iterator connector. + const bool isQueryTraceEnabled = veloxCfg_->get(kQueryTraceEnabled, false); + if (isQueryTraceEnabled) { + // Only used in benchmark enable query trace, replace ValueStreamNode to ValuesNode to support serialization. + return constructValuesNode(readRel, streamIdx); + } #ifdef GLUTEN_ENABLE_GPU if (veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { - return constructValueStreamNode(readRel, streamIdx); + return constructCudfValueStreamNode(readRel, streamIdx); } #endif - if (!veloxCfg_->get(kQueryTraceEnabled, false)) { - return constructValueStreamNode(readRel, streamIdx); - } - // Only used in benchmark enable query trace, replace ValueStreamNode to ValuesNode to support serialization. - return constructValuesNode(readRel, streamIdx); + return constructValueStreamNode(readRel, streamIdx); } // Otherwise, will create TableScan node for ReadRel. 
@@ -1651,14 +1703,4 @@ bool SubstraitToVeloxPlanConverter::checkTypeExtension(const ::substrait::Plan& return true; } -#ifdef GLUTEN_ENABLE_GPU -template core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( - const ::substrait::ReadRel& sRead, - int32_t streamIdx); -#endif - -template core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( - const ::substrait::ReadRel& sRead, - int32_t streamIdx); - } // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 48e5709ea88c..0e00764a66e2 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -70,11 +70,13 @@ class SubstraitToVeloxPlanConverter { explicit SubstraitToVeloxPlanConverter( memory::MemoryPool* pool, const facebook::velox::config::ConfigBase* veloxCfg, + const std::vector>& inputIters, const std::optional writeFilesTempPath = std::nullopt, const std::optional writeFileName = std::nullopt, bool validationMode = false) : pool_(pool), veloxCfg_(veloxCfg), + inputIters_(inputIters), writeFilesTempPath_(writeFilesTempPath), writeFileName_(writeFileName), validationMode_(validationMode) { @@ -133,9 +135,12 @@ class SubstraitToVeloxPlanConverter { /// FileProperties: the file sizes and modification times of the files to be scanned. core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead); - template + // Construct a table scan node accepting value streams as input. core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); + // Construct a cuDF value stream node. + core::PlanNodePtr constructCudfValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); + // This is only used in benchmark and enable query trace, which will load all the data to ValuesNode. 
core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); @@ -181,8 +186,10 @@ class SubstraitToVeloxPlanConverter { splitInfos_ = splitInfos; } - void setInputIters(std::vector> inputIters) { - inputIters_ = std::move(inputIters); + /// The input iterators not inlined to VeloxPlan. They should be then manually added to the Velox task + /// via WholeStageResultIterator#addIteratorSplits. Empty if no input iterators remaining. + const std::vector>& remainingInputIterators() const { + return inputIters_; } /// Used to check if ReadRel specifies an input of stream. @@ -271,8 +278,6 @@ class SubstraitToVeloxPlanConverter { /// The map storing the split stats for each PlanNode. std::unordered_map> splitInfoMap_; - std::vector> inputIters_; - /// The map storing the pre-built plan nodes which can be accessed through /// index. This map is only used when the computation of a Substrait plan /// depends on other input nodes. @@ -291,6 +296,9 @@ class SubstraitToVeloxPlanConverter { /// A map of custom configs. const facebook::velox::config::ConfigBase* veloxCfg_; + /// Input row-vectors for query trace mode (ValuesNode / cuDF ValueStream support) + std::vector> inputIters_; + /// The temporary path used to write files. 
std::optional writeFilesTempPath_; std::optional writeFileName_; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index 9005604f817f..8afc7c5bf8b2 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -33,8 +33,8 @@ class SubstraitToVeloxPlanValidator { std::unordered_map configs{ {velox::core::QueryConfig::kSparkPartitionId, "0"}, {velox::core::QueryConfig::kSessionTimezone, "GMT"}}; veloxCfg_ = std::make_shared(std::move(configs)); - planConverter_ = - std::make_unique(pool, veloxCfg_.get(), std::nullopt, std::nullopt, true); + planConverter_ = std::make_unique( + pool, veloxCfg_.get(), std::vector>{}, std::nullopt, std::nullopt, true); queryCtx_ = velox::core::QueryCtx::create(nullptr, velox::core::QueryConfig(veloxCfg_->rawConfigs())); // An execution context used for function validation. execCtx_ = std::make_unique(pool, queryCtx_.get()); diff --git a/cpp/velox/tests/FunctionTest.cc b/cpp/velox/tests/FunctionTest.cc index fc7892003f62..17645aa1a95a 100644 --- a/cpp/velox/tests/FunctionTest.cc +++ b/cpp/velox/tests/FunctionTest.cc @@ -44,7 +44,7 @@ class FunctionTest : public ::testing::Test, public test::VectorTestBase { std::shared_ptr veloxCfg_ = std::make_shared(std::unordered_map()); std::shared_ptr planConverter_ = - std::make_shared(pool(), veloxCfg_.get()); + std::make_shared(pool(), veloxCfg_.get(), std::vector>()); }; TEST_F(FunctionTest, makeNames) { diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index 8847fed685cb..5487e7fd2e4e 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -65,6 +65,11 @@ class DummyRuntime final : public Runtime { auto iter = std::make_shared(std::move(resIter)); return iter; } + + void noMoreSplits(ResultIterator* iter) override { + // Do nothing. 
+ } + MemoryManager* memoryManager() override { throw GlutenException("Not yet implemented"); } @@ -150,6 +155,7 @@ TEST(TestRuntime, GetResultIterator) { DummyMemoryManager mm(kDummyBackendKind); auto runtime = std::make_shared(kDummyBackendKind, &mm, std::unordered_map()); auto iter = runtime->createResultIterator("/tmp/test-spill", {}); + runtime->noMoreSplits(iter.get()); ASSERT_TRUE(iter->hasNext()); auto next = iter->next(); ASSERT_NE(next, nullptr); diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc index f1f6ef3865ad..8222f74caae6 100644 --- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc +++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc @@ -72,7 +72,7 @@ class Substrait2VeloxPlanConversionTest : public exec::test::HiveConnectorTestBa std::shared_ptr veloxCfg_ = std::make_shared(std::unordered_map()); std::shared_ptr planConverter_ = - std::make_shared(std::vector>(), pool(), veloxCfg_.get()); + std::make_shared(pool(), veloxCfg_.get(), std::vector>()); }; // This test will firstly generate mock TPC-H lineitem ORC file. 
Then, Velox's diff --git a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc index 0a2b40952621..d041bc359af2 100644 --- a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc +++ b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc @@ -43,7 +43,7 @@ TEST_F(Substrait2VeloxValuesNodeConversionTest, valuesNode) { JsonToProtoConverter::readFromFile(planPath, substraitPlan); auto veloxCfg = std::make_shared(std::unordered_map()); std::shared_ptr planConverter_ = - std::make_shared(pool_.get(), veloxCfg.get(), std::nullopt, std::nullopt, true); + std::make_shared(pool_.get(), veloxCfg.get(), std::vector>(), std::nullopt, std::nullopt, true); auto veloxPlan = planConverter_->toVeloxPlan(substraitPlan); RowVectorPtr expectedData = makeRowVector( diff --git a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc index 7b8e8336ea9c..ae150bb98ae3 100644 --- a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc +++ b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc @@ -72,7 +72,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase { auto veloxCfg = std::make_shared(std::unordered_map()); std::shared_ptr substraitConverter_ = - std::make_shared(pool_.get(), veloxCfg.get(), std::nullopt, std::nullopt, true); + std::make_shared(pool_.get(), veloxCfg.get(), std::vector>(), std::nullopt, std::nullopt, true); // Convert Substrait Plan to the same Velox Plan. auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan); @@ -94,7 +94,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase { std::make_shared(std::unordered_map()); std::shared_ptr substraitConverter_ = std::make_shared( - pool_.get(), veloxCfg.get(), std::nullopt, std::nullopt, true); + pool_.get(), veloxCfg.get(), std::vector>(), std::nullopt, std::nullopt, true); // Convert Substrait Plan to the same Velox Plan. 
auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan); diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index f4d2c8e7d1b1..ad200dd46c81 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -53,6 +53,11 @@ public long itrHandle() { private native void nativeClose(long iterHandle); + private native boolean nativeAddIteratorSplits( + long iterHandle, ColumnarBatchInIterator[] batchItr); + + private native void nativeNoMoreSplits(long iterHandle); + @Override public boolean hasNext0() throws IOException { return nativeHasNext(iterHandle); @@ -75,6 +80,34 @@ public long spill(long size) { } } + /** + * Add new iterator splits to the iterator as new inputs for processing. Note: File-based splits + * are not supported. + * + * @param batchItr Array of iterators to add as splits + * @return true if splits were added successfully, false otherwise + * @throws IllegalStateException if the iterator is closed + */ + public boolean addIteratorSplits(ColumnarBatchInIterator[] batchItr) { + if (closed.get()) { + throw new IllegalStateException("Cannot add splits to a closed iterator"); + } + return nativeAddIteratorSplits(iterHandle, batchItr); + } + + /** + * Signal that no more splits will be added to the iterator. This is required for proper task + * completion and is a prerequisite for barrier support. + * + * @throws IllegalStateException if the iterator is closed + */ + public void noMoreSplits() { + if (closed.get()) { + throw new IllegalStateException("Cannot call noMoreSplits on a closed iterator"); + } + nativeNoMoreSplits(iterHandle); + } + @Override public void close0() { // To make sure the outputted batches are still accessible after the iterator is closed. 
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java index 813578566baa..6d2c90896b2a 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -71,10 +70,11 @@ public static void injectWriteFilesTempPath(String path, String fileName) { // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. + // Supports both creation-time splits (splitInfo, iterList) and runtime splits (via addIteratorSplits()). public ColumnarBatchOutIterator createKernelWithBatchIterator( byte[] wsPlan, byte[][] splitInfo, - List iterList, + ColumnarBatchInIterator[] iterList, int partitionIndex, String spillDirPath) throws RuntimeException { @@ -82,7 +82,7 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator( jniWrapper.nativeCreateKernelWithIterator( wsPlan, splitInfo, - iterList.toArray(new ColumnarBatchInIterator[0]), + iterList, TaskContext.get().stageId(), partitionIndex, // TaskContext.getPartitionId(), TaskContext.get().taskAttemptId(), diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java index b8de4d63b569..a80829067987 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java @@ -62,8 +62,11 @@ public long rtHandle() { public native String nativePlanString(byte[] substraitPlan, Boolean details); /** - * Create a native compute kernel and 
return a columnar result iterator. + * Create a native compute kernel and return a columnar result iterator. Supports both + * creation-time splits (splitInfo, batchItr) and runtime splits (via addIteratorSplits). + * + * @param splitInfo optional file-based splits to add at creation time (can be null) + * @param batchItr optional iterator-based splits to add at creation time (can be null) + * @return iterator instance id */ public native long nativeCreateKernelWithIterator(