Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,12 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val resIter: ColumnarBatchOutIterator =
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
columnarNativeIterators.asJava,
if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null,
if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray else null,
partitionIndex,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
)
resIter.noMoreSplits()
val itrMetrics = IteratorMetricsJniWrapper.create()

Iterators
Expand Down Expand Up @@ -261,12 +262,12 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val nativeResultIterator =
transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf.toByteArray,
// Final iterator does not contain scan split, so pass empty split info to native here.
new Array[Array[Byte]](0),
columnarNativeIterator.asJava,
null,
if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray else null,
partitionIndex,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
)
nativeResultIterator.noMoreSplits()
val itrMetrics = IteratorMetricsJniWrapper.create()

Iterators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
assert(wholeStageTransformers.size == 3)
val nativePlanString = wholeStageTransformers.head.nativePlanString()
assert(nativePlanString.contains("Aggregation[1][SINGLE"))
assert(nativePlanString.matches("[\\s\\S]*Aggregation\\[\\d+]\\[SINGLE[\\s\\S]*"))
assert(nativePlanString.contains("ValueStream"))
assert(wholeStageTransformers(1).nativePlanString().contains("ValueStream"))
assert(wholeStageTransformers.last.nativePlanString().contains("TableScan"))
Expand Down
5 changes: 5 additions & 0 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "compute/ResultIterator.h"
#include "memory/ColumnarBatch.h"
#include "memory/MemoryManager.h"
#include "memory/SplitAwareColumnarBatchIterator.h"
#include "operators/c2r/ColumnarToRow.h"
#include "operators/r2c/RowToColumnar.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
Expand Down Expand Up @@ -105,6 +106,10 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
throw GlutenException("Not implemented");
}

virtual void noMoreSplits(ResultIterator* iter) {
throw GlutenException("Not implemented");
}

virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) {
throw GlutenException("Not implemented");
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class SafeNativeArray {
public:
virtual ~SafeNativeArray() {
PrimitiveArray::release(env_, javaArray_, nativeArray_);
env_->DeleteLocalRef(javaArray_);
}

SafeNativeArray(const SafeNativeArray&) = delete;
Expand All @@ -255,7 +256,7 @@ class SafeNativeArray {

private:
SafeNativeArray(JNIEnv* env, JavaArrayType javaArray, JniNativeArrayType nativeArray)
: env_(env), javaArray_(javaArray), nativeArray_(nativeArray){};
: env_(env), javaArray_(static_cast<JavaArrayType>(env_->NewLocalRef(javaArray))), nativeArray_(nativeArray){};

JNIEnv* env_;
JavaArrayType javaArray_;
Expand Down
93 changes: 79 additions & 14 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <optional>
#include <string>
#include "memory/AllocationListener.h"
#include "memory/SplitAwareColumnarBatchIterator.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/Partitioning.h"
Expand Down Expand Up @@ -455,7 +456,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
jobject wrapper,
jbyteArray planArr,
jobjectArray splitInfosArr,
jobjectArray iterArr,
jobjectArray batchItrArray,
jint stageId,
jint partitionId,
jlong taskId,
Expand All @@ -478,24 +479,28 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith

ctx->parsePlan(safePlanArray.elems(), planSize);

for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i < splitInfoArraySize; i++) {
jbyteArray splitInfoArray = static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
auto splitInfoData = safeSplitArray.elems();
if (splitInfosArr != nullptr) {
for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i < splitInfoArraySize; i++) {
jbyteArray splitInfoArray = static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
auto splitInfoData = safeSplitArray.elems();

ctx->parseSplitInfo(splitInfoData, splitInfoSize, i);
ctx->parseSplitInfo(splitInfoData, splitInfoSize, i);
}
}

// Handle the Java iters
jsize itersLen = env->GetArrayLength(iterArr);
std::vector<std::shared_ptr<ResultIterator>> inputIters;
inputIters.reserve(itersLen);
for (int idx = 0; idx < itersLen; idx++) {
jobject iter = env->GetObjectArrayElement(iterArr, idx);
auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter, ctx, idx);
auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
inputIters.push_back(std::move(resultIter));
if (batchItrArray != nullptr) {
jsize itersLen = env->GetArrayLength(batchItrArray);
inputIters.reserve(itersLen);
for (int idx = 0; idx < itersLen; idx++) {
jobject iter = env->GetObjectArrayElement(batchItrArray, idx);
auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter, ctx, idx);
auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
inputIters.push_back(std::move(resultIter));
}
}

return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters));
Expand Down Expand Up @@ -630,6 +635,66 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato
JNI_METHOD_END()
}

JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeAddIteratorSplits( // NOLINT
JNIEnv* env,
jobject wrapper,
jlong iterHandle,
jobjectArray batchItrArray) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
auto outIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
if (outIter == nullptr) {
throw GlutenException("Invalid iterator handle for addSplits");
}

// Get the underlying split-aware iterator
auto* splitAwareIter = dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(outIter->getInputIter());
if (splitAwareIter == nullptr) {
throw GlutenException("Iterator does not support split management");
}

GLUTEN_CHECK(batchItrArray != nullptr, "FATAL: Splits to add cannot be null");

// Convert Java ColumnarBatchInIterator[] to native iterators and add as splits
jsize numIterators = env->GetArrayLength(batchItrArray);
std::vector<std::shared_ptr<ResultIterator>> inputIterators;
inputIterators.reserve(numIterators);

for (jsize idx = 0; idx < numIterators; idx++) {
jobject iter = env->GetObjectArrayElement(batchItrArray, idx);
if (iter == nullptr) {
inputIterators.push_back(nullptr);
} else {
auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter, ctx, idx);
auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
inputIterators.push_back(std::move(resultIter));
}
env->DeleteLocalRef(iter);
}

// Add iterator splits via interface method
if (!inputIterators.empty()) {
splitAwareIter->addIteratorSplits(inputIterators);
}

return true;
JNI_METHOD_END(false)
}

JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeNoMoreSplits( // NOLINT
JNIEnv* env,
jobject wrapper,
jlong iterHandle) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
if (iter == nullptr) {
throw GlutenException("Invalid iterator handle for noMoreSplits");
}
ctx->noMoreSplits(iter.get());
JNI_METHOD_END()
}

JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit( // NOLINT
JNIEnv* env,
Expand Down
49 changes: 49 additions & 0 deletions cpp/core/memory/SplitAwareColumnarBatchIterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <vector>
#include "ColumnarBatchIterator.h"

// Forward declarations
namespace substrait {
class ReadRel_LocalFiles;
}

namespace gluten {

// Forward declaration
class ResultIterator;

/// Abstract base class for iterators that support dynamic split management.
/// Provides APIs for adding splits after iterator creation and signaling completion.
class SplitAwareColumnarBatchIterator : public ColumnarBatchIterator {
public:
SplitAwareColumnarBatchIterator() = default;
virtual ~SplitAwareColumnarBatchIterator() = default;

/// Add iterator-based splits from input iterators.
virtual void addIteratorSplits(const std::vector<std::shared_ptr<ResultIterator>>& inputIterators) = 0;

/// Signal that no more splits will be added to this iterator.
/// This must be called after all splits have been added to ensure proper task completion.
virtual void noMoreSplits() = 0;
};

} // namespace gluten
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ if(ENABLE_GPU)
VELOX_SRCS
cudf/CudfPlanValidator.cc
cudf/GpuLock.cc
operators/plannodes/CudfVectorStream.cc
shuffle/VeloxGpuShuffleReader.cc
shuffle/VeloxGpuShuffleWriter.cc
operators/serializer/VeloxGpuColumnarBatchSerializer.cc
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ auto BM_Generic = [](::benchmark::State& state,
}

auto resultIter = runtime->createResultIterator(veloxSpillDir, std::move(inputIters));
runtime->noMoreSplits(resultIter.get());
listenerPtr->setIterator(resultIter.get());

if (FLAGS_with_shuffle) {
Expand Down
6 changes: 5 additions & 1 deletion cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "velox/connectors/hive/BufferedInputBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
Expand Down Expand Up @@ -204,7 +205,6 @@ void VeloxBackend::init(
// RSS shuffle serde.
facebook::velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
}
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());

initUdf();

Expand Down Expand Up @@ -317,6 +317,10 @@ void VeloxBackend::initConnector(const std::shared_ptr<velox::config::ConfigBase
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));

// Register value-stream connector for runtime iterator-based inputs
velox::connector::registerConnector(std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf));

#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
Expand Down
9 changes: 3 additions & 6 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,25 @@
#include "VeloxPlanConverter.h"
#include <filesystem>

#include "compute/ResultIterator.h"
#include "config/GlutenConfig.h"
#include "iceberg/IcebergPlanConverter.h"
#include "velox/common/file/FileSystems.h"
#include "operators/plannodes/IteratorSplit.h"

namespace gluten {

using namespace facebook;

VeloxPlanConverter::VeloxPlanConverter(
const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
velox::memory::MemoryPool* veloxPool,
const facebook::velox::config::ConfigBase* veloxCfg,
const std::vector<std::shared_ptr<ResultIterator>>& rowVectors,
const std::optional<std::string> writeFilesTempPath,
const std::optional<std::string> writeFileName,
bool validationMode)
: validationMode_(validationMode),
veloxCfg_(veloxCfg),
substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath, writeFileName, validationMode) {
substraitVeloxPlanConverter_(veloxPool, veloxCfg, rowVectors, writeFilesTempPath, writeFileName, validationMode) {
VELOX_USER_CHECK_NOT_NULL(veloxCfg_);
substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}

namespace {
Expand Down Expand Up @@ -136,7 +134,6 @@ void parseLocalFileNodes(
splitInfos.reserve(localFiles.size());
for (const auto& localFile : localFiles) {
const auto& fileList = localFile.items();

splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList));
}

Expand Down
14 changes: 10 additions & 4 deletions cpp/velox/compute/VeloxPlanConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
#pragma once

#include <velox/common/memory/MemoryPool.h>
#include "compute/ResultIterator.h"
#include "memory/VeloxMemoryManager.h"
#include <velox/core/PlanNode.h>
#include <velox/exec/Split.h>

#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/plan.pb.h"
#include "velox/core/PlanNode.h"

namespace gluten {

// This class is used to convert the Substrait plan into Velox plan.
class VeloxPlanConverter {
public:
explicit VeloxPlanConverter(
const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
facebook::velox::memory::MemoryPool* veloxPool,
const facebook::velox::config::ConfigBase* veloxCfg,
const std::vector<std::shared_ptr<ResultIterator>>& rowVectors,
const std::optional<std::string> writeFilesTempPath = std::nullopt,
const std::optional<std::string> writeFileName = std::nullopt,
bool validationMode = false);
Expand All @@ -45,6 +45,12 @@ class VeloxPlanConverter {
return substraitVeloxPlanConverter_.splitInfos();
}

/// The input iterators not inlined to VeloxPlan. They should be then manually added to the Velox task
/// via WholeStageResultIterator#addIteratorSplits. Empty if no input iterators remaining.
const std::vector<std::shared_ptr<ResultIterator>>& remainingInputIterators() const {
return substraitVeloxPlanConverter_.remainingInputIterators();
}

private:
bool validationMode_;

Expand Down
Loading