Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNu

// cudf
const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
const bool kCudfEnabledDefault = "true";
constexpr bool kCudfEnabledDefault = true;
const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
const bool kDebugCudfDefault = "false";
const std::string kDebugCudfDefault = "false";

std::unordered_map<std::string, std::string>
parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength);
Expand Down
16 changes: 11 additions & 5 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "utils/qat/QatCodec.h"
#endif
#ifdef GLUTEN_ENABLE_GPU
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#endif
Expand Down Expand Up @@ -166,11 +167,16 @@ void VeloxBackend::init(

#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
FLAGS_velox_cudf_debug = backendConf_->get<bool>(kDebugCudf, kDebugCudfDefault);
FLAGS_velox_cudf_memory_resource = backendConf_->get<std::string>(kCudfMemoryResource, kCudfMemoryResourceDefault);
auto& options = velox::cudf_velox::CudfOptions::getInstance();
options.memoryPercent = backendConf_->get<int32_t>(kCudfMemoryPercent, kCudfMemoryPercentDefault);
velox::cudf_velox::registerCudf(options);
std::unordered_map<std::string, std::string> options = {
{velox::cudf_velox::CudfConfig::kCudfEnabled, "true"},
{velox::cudf_velox::CudfConfig::kCudfDebugEnabled, backendConf_->get(kDebugCudf, kDebugCudfDefault)},
{velox::cudf_velox::CudfConfig::kCudfMemoryResource,
backendConf_->get(kCudfMemoryResource, kCudfMemoryResourceDefault)},
{velox::cudf_velox::CudfConfig::kCudfMemoryPercent,
backendConf_->get(kCudfMemoryPercent, kCudfMemoryPercentDefault)}};
auto& cudfConfig = velox::cudf_velox::CudfConfig::getInstance();
cudfConfig.initialize(std::move(options));
velox::cudf_velox::registerCudf();
}
#endif

Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#ifdef GLUTEN_ENABLE_GPU
#include <cudf/io/types.hpp>
#include <mutex>
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#endif
Expand Down Expand Up @@ -661,7 +662,7 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
std::to_string(veloxCfg_->get<bool>(kSparkJsonIgnoreNullFields, true));

#ifdef GLUTEN_ENABLE_GPU
configs[cudf_velox::kCudfEnabled] = std::to_string(veloxCfg_->get<bool>(kCudfEnabled, false));
configs[velox::cudf_velox::CudfConfig::kCudfEnabled] = std::to_string(veloxCfg_->get<bool>(kCudfEnabled, false));
#endif

const auto setIfExists = [&](const std::string& glutenKey, const std::string& veloxKey) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ const std::string kCudfMemoryResourceDefault =

// Initial percent of GPU memory to allocate for memory resource for one thread
const std::string kCudfMemoryPercent = "spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent";
const int32_t kCudfMemoryPercentDefault = 50;
const std::string kCudfMemoryPercentDefault = "50";

/// Preferred size of batches in bytes to be returned by operators.
const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes";
Expand Down
21 changes: 5 additions & 16 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,30 +243,19 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJ

auto repeatedBatch = ObjectStore::retrieve<ColumnarBatch>(repeatedBatchHandle);
auto nonRepeatedBatch = ObjectStore::retrieve<ColumnarBatch>(nonRepeatedBatchHandle);
GLUTEN_CHECK(rowNums == nonRepeatedBatch->numRows(),
"Row numbers after repeated do not match the expected size");
GLUTEN_CHECK(rowNums == nonRepeatedBatch->numRows(), "Row numbers after repeated do not match the expected size");

// wrap repeatedBatch's rowVector in dictionary vector.
auto vb = std::dynamic_pointer_cast<VeloxColumnarBatch>(repeatedBatch);
auto rowVector = vb->getRowVector();
std::vector<VectorPtr> outputs(rowVector->childrenSize());
for (int i = 0; i < outputs.size(); i++) {
outputs[i] = BaseVector::wrapInDictionary(
nullptr /*nulls*/,
repeatedIndices,
rowNums,
rowVector->childAt(i));
outputs[i] = BaseVector::wrapInDictionary(nullptr /*nulls*/, repeatedIndices, rowNums, rowVector->childAt(i));
}
auto newRowVector = std::make_shared<RowVector>(
veloxPool.get(),
rowVector->type(),
BufferPtr(nullptr),
rowNums,
std::move(outputs));
auto newRowVector =
std::make_shared<RowVector>(veloxPool.get(), rowVector->type(), BufferPtr(nullptr), rowNums, std::move(outputs));
repeatedBatch = std::make_shared<VeloxColumnarBatch>(std::move(newRowVector));
auto newBatch = VeloxColumnarBatch::compose(
veloxPool.get(),
{std::move(repeatedBatch), std::move(nonRepeatedBatch)});
auto newBatch = VeloxColumnarBatch::compose(veloxPool.get(), {std::move(repeatedBatch), std::move(nonRepeatedBatch)});
return ctx->saveObject(newBatch);
JNI_METHOD_END(kInvalidObjectHandle)
}
Expand Down
74 changes: 33 additions & 41 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ bool SplitInfo::canUseCudfConnector() {
bool isEmpty = partitionColumns.empty();

if (!isEmpty) {
// Check if all maps are empty
bool allMapsEmpty = true;
for (const auto& m : partitionColumns) {
if (!m.empty()) {
allMapsEmpty = false;
break;
}
// Check if all maps are empty
bool allMapsEmpty = true;
for (const auto& m : partitionColumns) {
if (!m.empty()) {
allMapsEmpty = false;
break;
}
isEmpty = allMapsEmpty;
}
isEmpty = allMapsEmpty;
}
return isEmpty && format == dwio::common::FileFormat::PARQUET;
}
Expand Down Expand Up @@ -596,19 +596,17 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
}
if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) {
++numPartitionColumns;
columnHandles.emplace_back(
std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
} else {
columnHandles.emplace_back(
std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
}
}
VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size());
Expand All @@ -635,11 +633,10 @@ std::shared_ptr<CudfHiveInsertTableHandle> makeCudfHiveInsertTableHandle(
std::vector<std::shared_ptr<const CudfHiveColumnHandle>> columnHandles;

for (int i = 0; i < tableColumnNames.size(); ++i) {
columnHandles.push_back(
std::make_shared<CudfHiveColumnHandle>(
tableColumnNames.at(i),
tableColumnTypes.at(i),
cudf::data_type{cudf_velox::veloxToCudfTypeId(tableColumnTypes.at(i))}));
columnHandles.push_back(std::make_shared<CudfHiveColumnHandle>(
tableColumnNames.at(i),
tableColumnTypes.at(i),
cudf::data_type{cudf_velox::veloxToCudfTypeId(tableColumnTypes.at(i))}));
}

return std::make_shared<CudfHiveInsertTableHandle>(
Expand Down Expand Up @@ -741,16 +738,16 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
const auto& compressionKind =
writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);
std::shared_ptr<core::InsertTableHandle> tableHandle = std::make_shared<core::InsertTableHandle>(
kHiveConnectorId,
makeHiveInsertTableHandle(
tableColumnNames, /*inputType->names() clolumn name is different*/
inputType->children(),
partitionedKey,
bucketProperty,
makeLocationHandle(writePath, fileName, fileFormat, compressionKind, bucketProperty != nullptr),
writerOptions,
fileFormat,
compressionKind));
kHiveConnectorId,
makeHiveInsertTableHandle(
tableColumnNames, /*inputType->names() clolumn name is different*/
inputType->children(),
partitionedKey,
bucketProperty,
makeLocationHandle(writePath, fileName, fileFormat, compressionKind, bucketProperty != nullptr),
writerOptions,
fileFormat,
compressionKind));
return std::make_shared<core::TableWriteNode>(
nextPlanNodeId(),
inputType,
Expand Down Expand Up @@ -1350,12 +1347,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
common::SubfieldFilters subfieldFilters;
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
connectorId,
"hive_table",
filterPushdownEnabled,
std::move(subfieldFilters),
remainingFilter,
dataColumns);
connectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter, dataColumns);

// Get assignments and out names.
std::vector<std::string> outNames;
Expand Down
4 changes: 2 additions & 2 deletions ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ set -exu

CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2025_10_06
VELOX_BRANCH=2025_10_08
VELOX_HOME=""
RUN_SETUP_SCRIPT=ON
VELOX_ENHANCED_REPO=https://github.com/IBM/velox.git
VELOX_ENHANCED_BRANCH=ibm-2025_10_06
VELOX_ENHANCED_BRANCH=ibm-2025_10_08
ENABLE_ENHANCED_FEATURES=OFF

# Developer use only for testing Velox PR.
Expand Down