From 1d0f37399f98b06ddb74f14e6669e3b6e07602aa Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 4 Sep 2025 16:08:44 +0000 Subject: [PATCH 01/15] [GLUTEN-10621][VL] feat: Support cudf parquet connector --- .../apache/gluten/config/VeloxConfig.scala | 6 + cpp/velox/CMakeLists.txt | 18 ++- cpp/velox/compute/VeloxBackend.cc | 16 +++ cpp/velox/compute/VeloxPlanConverter.cc | 4 +- cpp/velox/compute/WholeStageResultIterator.cc | 39 ++++++- cpp/velox/config/VeloxConfig.h | 5 + cpp/velox/substrait/SubstraitToVeloxPlan.cc | 109 +++++++++++++++--- cpp/velox/substrait/SubstraitToVeloxPlan.h | 2 + 8 files changed, 181 insertions(+), 18 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index e87b18e07884..86a220046d50 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -609,6 +609,12 @@ object VeloxConfig { .intConf .createWithDefault(50) + val CUDF_ENABLE_TABLE_SCAN = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan") + .doc("Enable cudf table scan") + .booleanConf + .createWithDefault(true) + val MEMORY_DUMP_ON_EXIT = buildConf("spark.gluten.monitor.memoryDumpOnExit") .internal() diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index dd3cd60d9cb3..036023260d73 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -414,9 +414,25 @@ if(ENABLE_GPU) import_library( facebook::velox::velox_cudf_vector ${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a) + import_library( + facebook::velox::velox_cudf_parquet_connector + ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/parquet/libvelox_cudf_parquet_connector.a) + target_include_directories( + velox + PRIVATE + ${VELOX_BUILD_PATH}/_deps/cudf-src/cpp/include + ${VELOX_BUILD_PATH}/_deps/rmm-src/cpp/include + ${VELOX_BUILD_PATH}/_deps/kvikio-src/cpp/include + ${VELOX_BUILD_PATH}/_deps/nvtx3-src/c/include + ${VELOX_BUILD_PATH}/_deps/nvcomp_proprietary_binary-src/include + ${VELOX_BUILD_PATH}/_deps/rapids_logger-src/include + /usr/local/cuda/include) + + target_compile_definitions(velox PRIVATE LIBCUDACXX_ENABLE_EXPERIMENTAL_MEMORY_RESOURCE) target_link_libraries(velox PUBLIC facebook::velox::velox_cudf_exec - facebook::velox::velox_cudf_vector) + facebook::velox::velox_cudf_vector + facebook::velox::velox_cudf_parquet_connector) target_link_libraries(velox PRIVATE ${VELOX_BUILD_PATH}/_deps/cudf-build/libcudf.so) endif() diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index e26cd852b61a..431a8603a309 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -29,6 +29,7 @@ #endif #ifdef GLUTEN_ENABLE_GPU #include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/connectors/parquet/ParquetConnector.h" #endif #include "compute/VeloxRuntime.h" @@ -167,6 +168,7 @@ void VeloxBackend::init( if (backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { FLAGS_velox_cudf_debug = backendConf_->get(kDebugCudf, kDebugCudfDefault); FLAGS_velox_cudf_memory_resource = backendConf_->get(kCudfMemoryResource, kCudfMemoryResourceDefault); + FLAGS_velox_cudf_table_scan = backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault); auto& options = velox::cudf_velox::CudfOptions::getInstance(); options.memoryPercent = backendConf_->get(kCudfMemoryPercent, kCudfMemoryPercentDefault); velox::cudf_velox::registerCudf(options); @@ -306,6 +308,20 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); +#ifdef GLUTEN_ENABLE_GPU + if (FLAGS_velox_cudf_table_scan) { + facebook::velox::connector::registerConnectorFactory( + std::make_shared()); + auto parquetConnector = + facebook::velox::connector::getConnectorFactory( + cudf_velox::connector::parquet::ParquetConnectorFactory::kParquetConnectorName) + ->newConnector( + kCudfHiveConnectorId, + hiveConf, + ioExecutor_.get()); + facebook::velox::connector::registerConnector(parquetConnector); + } +#endif } void VeloxBackend::initUdf() { diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 7b58584c344d..6ef6a9b0992f 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -60,7 +60,9 @@ std::shared_ptr parseScanSplitInfo( for (const auto& partitionColumn : file.partition_columns()) { partitionColumnMap[partitionColumn.key()] = partitionColumn.value(); } - splitInfo->partitionColumns.emplace_back(partitionColumnMap); + if (!partitionColumnMap.empty()) { + splitInfo->partitionColumns.emplace_back(partitionColumnMap); + } std::unordered_map metadataColumnMap; for (const auto& metadataColumn : file.metadata_columns()) { diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index d2b130f62982..aee2422305c3 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -24,6 +24,7 @@ #ifdef GLUTEN_ENABLE_GPU #include #include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/connectors/parquet/ParquetConnectorSplit.h" #endif using namespace facebook; @@ -131,14 +132,19 @@ WholeStageResultIterator::WholeStageResultIterator( const auto& format = scanInfo->format; const auto& partitionColumns = scanInfo->partitionColumns; const auto& metadataColumns = scanInfo->metadataColumns; + // In the pre-condition all the spli infos has same partition column and format. + const auto canUseCudfConnector = scanInfo->canUseCudfConnector(); std::vector> connectorSplits; connectorSplits.reserve(paths.size()); for (int idx = 0; idx < paths.size(); idx++) { - auto partitionColumn = partitionColumns[idx]; auto metadataColumn = metadataColumns[idx]; std::unordered_map> partitionKeys; - constructPartitionColumns(partitionKeys, partitionColumn); + if (!partitionColumns.empty()) { + auto partitionColumn = partitionColumns[idx]; + constructPartitionColumns(partitionKeys, partitionColumn); + } + std::shared_ptr split; if (auto icebergSplitInfo = std::dynamic_pointer_cast(scanInfo)) { // Set Iceberg split. @@ -159,6 +165,34 @@ WholeStageResultIterator::WholeStageResultIterator( std::unordered_map(), properties[idx]); } else { +#ifdef GLUTEN_ENABLE_GPU + if (canUseCudfConnector) { + split = std::make_shared( + kCudfHiveConnectorId, + paths[idx], + starts[idx], + lengths[idx], + 0 /*splitWeight*/, + metadataColumn); + } else { + split = std::make_shared( + kHiveConnectorId, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt /*tableBucketName*/, + std::unordered_map(), + nullptr, + std::unordered_map(), + 0, + true, + metadataColumn, + properties[idx]); + } + +#else split = std::make_shared( kHiveConnectorId, paths[idx], @@ -174,6 +208,7 @@ WholeStageResultIterator::WholeStageResultIterator( true, metadataColumn, properties[idx]); +#endif } connectorSplits.emplace_back(split); } diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index e37c99987e1c..a62e16c82afa 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -180,4 +180,9 @@ const int32_t kCudfMemoryPercentDefault = 50; /// Preferred size of batches in bytes to be returned by operators. const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes"; +const std::string kCudfEnableTableScan = "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"; +const bool kCudfEnableTableScanDefault = false; + +const std::string kCudfHiveConnectorId = "cudf-hive"; + } // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index ceaabb5ad34f..40132371571f 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -31,9 +31,29 @@ #include "config/GlutenConfig.h" #include "config/VeloxConfig.h" +#ifdef GLUTEN_ENABLE_GPU +#include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" +#include "velox/experimental/cudf/connectors/parquet/ParquetDataSink.h" +#include "velox/experimental/cudf/connectors/parquet/ParquetTableHandle.h" +#endif + namespace gluten { namespace { + bool useCudfTableHandle(const std::vector>& splitInfos) { +#ifdef GLUTEN_ENABLE_GPU + if (splitInfos.empty()) { + return false; + } + + return splitInfos[0]->canUseCudfConnector(); + +#else + return false; +#endif +} + core::SortOrder toSortOrder(const ::substrait::SortField& sortField) { switch (sortField.direction()) { case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST: @@ -144,6 +164,14 @@ RowTypePtr getJoinOutputType( } // namespace +bool SplitInfo::canUseCudfConnector() { + #ifdef GLUTEN_ENABLE_GPU + return partitionColumns.empty() && format == dwio::common::FileFormat::PARQUET && FLAGS_velox_cudf_table_scan; + #else + return false; + #endif +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit( const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& noEmitNode) { @@ -585,6 +613,36 @@ std::shared_ptr makeHiveInsertTableHandl writerOptions); } +#ifdef GLUTEN_ENABLE_GPU + +std::shared_ptr +makeCudfParquetInsertTableHandle( + const std::vector& tableColumnNames, + const std::vector& tableColumnTypes, + std::shared_ptr locationHandle, + const std::optional compressionKind, + const std::unordered_map& serdeParameters, + const std::shared_ptr& writerOptions) { + std::vector> + columnHandles; + + for (int i = 0; i < tableColumnNames.size(); ++i) { + columnHandles.push_back( + std::make_shared( + tableColumnNames.at(i), + tableColumnTypes.at(i), + cudf::data_type{cudf_velox::veloxToCudfTypeId(tableColumnTypes.at(i))})); + } + + return std::make_shared( + columnHandles, + locationHandle, + compressionKind, + serdeParameters, + writerOptions); +} +#endif + core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) { core::PlanNodePtr childNode; if (writeRel.has_input()) { @@ -678,13 +736,22 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // Spark's default compression code is snappy. const auto& compressionKind = writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY); - - return std::make_shared( - nextPlanNodeId(), - inputType, - tableColumnNames, - std::nullopt, /*columnStatsSpec*/ - std::make_shared( + std::shared_ptr tableHandle; + if (useCudfTableHandle(splitInfos_)) { +#ifdef GLUTEN_ENABLE_GPU + tableHandle = std::make_shared( + kCudfHiveConnectorId, + makeCudfParquetInsertTableHandle( + tableColumnNames, /*inputType->names() clolumn name is different*/ + inputType->children(), + std::make_shared( + writePath, cudf_velox::connector::parquet::LocationHandle::TableType::kNew, fileName), + compressionKind, + {}, + writerOptions)); +#endif + } else { + tableHandle = std::make_shared( kHiveConnectorId, makeHiveInsertTableHandle( tableColumnNames, /*inputType->names() clolumn name is different*/ @@ -694,7 +761,14 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: makeLocationHandle(writePath, fileName, fileFormat, compressionKind, bucketProperty != nullptr), writerOptions, fileFormat, - compressionKind)), + compressionKind)); + } + return std::make_shared( + nextPlanNodeId(), + inputType, + tableColumnNames, + std::nullopt, /*columnStatsSpec*/ + tableHandle, (!partitionedKey.empty()), exec::TableWriteTraits::outputType(std::nullopt), connector::CommitStrategy::kNoCommit, @@ -1277,14 +1351,21 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: auto names = colNameList; auto types = veloxTypeList; auto dataColumns = ROW(std::move(names), std::move(types)); - std::shared_ptr tableHandle; - if (!readRel.has_filter()) { - tableHandle = std::make_shared( - kHiveConnectorId, "hive_table", filterPushdownEnabled, common::SubfieldFilters{}, nullptr, dataColumns); + connector::ConnectorTableHandlePtr tableHandle; + auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), dataColumns) : nullptr; + if (useCudfTableHandle(splitInfos_)) { +#ifdef GLUTEN_ENABLE_GPU + LOG(INFO) << "use gpu read ParquetTableHandle"; + tableHandle = std::make_shared( + kCudfHiveConnectorId, + "hive_table", + filterPushdownEnabled, + nullptr, + remainingFilter, + dataColumns); +#endif } else { common::SubfieldFilters subfieldFilters; - auto remainingFilter = exprConverter_->toVeloxExpr(readRel.filter(), dataColumns); - tableHandle = std::make_shared( kHiveConnectorId, "hive_table", diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 1d816da612f6..b00c3447f712 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -57,6 +57,8 @@ struct SplitInfo { /// Make SplitInfo polymorphic virtual ~SplitInfo() = default; + + bool canUseCudfConnector(); }; /// This class is used to convert the Substrait plan into Velox plan. From 0084c5965b2ebd99f76d7589df9a99f30ce30ffc Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 24 Sep 2025 11:14:02 +0100 Subject: [PATCH 02/15] use new interface --- .../apache/gluten/config/VeloxConfig.scala | 2 +- cpp/velox/compute/VeloxBackend.cc | 16 +++------ cpp/velox/compute/WholeStageResultIterator.cc | 36 +++++-------------- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 32 +++++++++-------- 4 files changed, 31 insertions(+), 55 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 86a220046d50..ab43af282f0a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -613,7 +613,7 @@ object VeloxConfig { buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan") .doc("Enable cudf table scan") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val MEMORY_DUMP_ON_EXIT = buildConf("spark.gluten.monitor.memoryDumpOnExit") diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 431a8603a309..dda57d1f8898 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -29,7 +29,7 @@ #endif #ifdef GLUTEN_ENABLE_GPU #include "velox/experimental/cudf/exec/ToCudf.h" -#include "velox/experimental/cudf/connectors/parquet/ParquetConnector.h" +#include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h" #endif #include "compute/VeloxRuntime.h" @@ -310,16 +310,10 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); #ifdef GLUTEN_ENABLE_GPU if (FLAGS_velox_cudf_table_scan) { - facebook::velox::connector::registerConnectorFactory( - std::make_shared()); - auto parquetConnector = - facebook::velox::connector::getConnectorFactory( - cudf_velox::connector::parquet::ParquetConnectorFactory::kParquetConnectorName) - ->newConnector( - kCudfHiveConnectorId, - hiveConf, - ioExecutor_.get()); - facebook::velox::connector::registerConnector(parquetConnector); + facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; + auto hiveConnector = + factory.newConnector(kCudfHiveConnectorId, config, ioExecutor_.get()); + facebook::velox::connector::registerConnector(hiveConnector); } #endif } diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index aee2422305c3..364f436caf1e 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -24,7 +24,7 @@ #ifdef GLUTEN_ENABLE_GPU #include #include "velox/experimental/cudf/exec/ToCudf.h" -#include "velox/experimental/cudf/connectors/parquet/ParquetConnectorSplit.h" +#include "velox/experimental/cudf/connectors/hive/HiveConnectorSplit.h" #endif using namespace facebook; @@ -132,7 +132,7 @@ WholeStageResultIterator::WholeStageResultIterator( const auto& format = scanInfo->format; const auto& partitionColumns = scanInfo->partitionColumns; const auto& metadataColumns = scanInfo->metadataColumns; - // In the pre-condition all the spli infos has same partition column and format. + // Under the pre-condition that all the split infos has same partition column and format. const auto canUseCudfConnector = scanInfo->canUseCudfConnector(); std::vector> connectorSplits; @@ -165,34 +165,14 @@ WholeStageResultIterator::WholeStageResultIterator( std::unordered_map(), properties[idx]); } else { + auto connectorId = kHiveConnectorId; #ifdef GLUTEN_ENABLE_GPU if (canUseCudfConnector) { - split = std::make_shared( - kCudfHiveConnectorId, - paths[idx], - starts[idx], - lengths[idx], - 0 /*splitWeight*/, - metadataColumn); - } else { - split = std::make_shared( - kHiveConnectorId, - paths[idx], - format, - starts[idx], - lengths[idx], - partitionKeys, - std::nullopt /*tableBucketName*/, - std::unordered_map(), - nullptr, - std::unordered_map(), - 0, - true, - metadataColumn, - properties[idx]); + connector = kCudfHiveConnectorId; + VELOX_CHECK_EQ(starts[idx], 0, "Not support split file"); + VELOX_CHECK_EQ(lengths[idx], scanInfo->properties[idx]->fileSize, "Not support split file"); } - -#else +#endif split = std::make_shared( kHiveConnectorId, paths[idx], @@ -208,7 +188,7 @@ WholeStageResultIterator::WholeStageResultIterator( true, metadataColumn, properties[idx]); -#endif + } connectorSplits.emplace_back(split); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 40132371571f..131ac44b5401 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -34,8 +34,12 @@ #ifdef GLUTEN_ENABLE_GPU #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" -#include "velox/experimental/cudf/connectors/parquet/ParquetDataSink.h" -#include "velox/experimental/cudf/connectors/parquet/ParquetTableHandle.h" +#include "velox/experimental/cudf/connectors/hive/CudfHiveDataSink.h" +#include "velox/experimental/cudf/connectors/hive/CudfHiveTableHandle.h" +#endif + +#ifdef GLUTEN_ENABLE_GPU +using namespace cudf_velox::connector::hive; #endif namespace gluten { @@ -615,26 +619,25 @@ std::shared_ptr makeHiveInsertTableHandl #ifdef GLUTEN_ENABLE_GPU -std::shared_ptr -makeCudfParquetInsertTableHandle( +std::shared_ptr +makeCudfHiveInsertTableHandle( const std::vector& tableColumnNames, const std::vector& tableColumnTypes, - std::shared_ptr locationHandle, + std::shared_ptr locationHandle, const std::optional compressionKind, const std::unordered_map& serdeParameters, const std::shared_ptr& writerOptions) { - std::vector> - columnHandles; + std::vector> columnHandles; for (int i = 0; i < tableColumnNames.size(); ++i) { columnHandles.push_back( - std::make_shared( + std::make_shared( tableColumnNames.at(i), tableColumnTypes.at(i), cudf::data_type{cudf_velox::veloxToCudfTypeId(tableColumnTypes.at(i))})); } - return std::make_shared( + return std::make_shared( columnHandles, locationHandle, compressionKind, @@ -741,11 +744,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: #ifdef GLUTEN_ENABLE_GPU tableHandle = std::make_shared( kCudfHiveConnectorId, - makeCudfParquetInsertTableHandle( + makeCudfHiveInsertTableHandle( tableColumnNames, /*inputType->names() clolumn name is different*/ inputType->children(), - std::make_shared( - writePath, cudf_velox::connector::parquet::LocationHandle::TableType::kNew, fileName), + std::make_shared( + writePath, cudf_velox::connector::hive::LocationHandle::TableType::kNew, fileName), compressionKind, {}, writerOptions)); @@ -1355,10 +1358,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), dataColumns) : nullptr; if (useCudfTableHandle(splitInfos_)) { #ifdef GLUTEN_ENABLE_GPU - LOG(INFO) << "use gpu read ParquetTableHandle"; - tableHandle = std::make_shared( + tableHandle = std::make_shared( kCudfHiveConnectorId, - "hive_table", + "cudf_hive_table", filterPushdownEnabled, nullptr, remainingFilter, From 995f73d3a5fdb9a908b784e31a5e65677d31a622 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 24 Sep 2025 11:16:33 +0100 Subject: [PATCH 03/15] fix compile --- cpp/velox/compute/WholeStageResultIterator.cc | 5 ++--- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 6 +----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 364f436caf1e..9b2e9f00d505 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -168,13 +168,13 @@ WholeStageResultIterator::WholeStageResultIterator( auto connectorId = kHiveConnectorId; #ifdef GLUTEN_ENABLE_GPU if (canUseCudfConnector) { - connector = kCudfHiveConnectorId; + connectorId = kCudfHiveConnectorId; VELOX_CHECK_EQ(starts[idx], 0, "Not support split file"); VELOX_CHECK_EQ(lengths[idx], scanInfo->properties[idx]->fileSize, "Not support split file"); } #endif split = std::make_shared( - kHiveConnectorId, + connectorId, paths[idx], format, starts[idx], @@ -188,7 +188,6 @@ WholeStageResultIterator::WholeStageResultIterator( true, metadataColumn, properties[idx]); - } connectorSplits.emplace_back(split); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 131ac44b5401..f8005b72d697 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -36,9 +36,7 @@ #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveDataSink.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveTableHandle.h" -#endif -#ifdef GLUTEN_ENABLE_GPU using namespace cudf_velox::connector::hive; #endif @@ -618,9 +616,7 @@ std::shared_ptr makeHiveInsertTableHandl } #ifdef GLUTEN_ENABLE_GPU - -std::shared_ptr -makeCudfHiveInsertTableHandle( +std::shared_ptr makeCudfHiveInsertTableHandle( const std::vector& tableColumnNames, const std::vector& tableColumnTypes, std::shared_ptr locationHandle, From cc58e7c39a3bbbb22d9e5cb840e23dd1791c0f30 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 24 Sep 2025 13:00:04 +0100 Subject: [PATCH 04/15] fix compile --- cpp/velox/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 036023260d73..7992011582ae 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -416,7 +416,7 @@ if(ENABLE_GPU) ${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a) import_library( facebook::velox::velox_cudf_parquet_connector - ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/parquet/libvelox_cudf_parquet_connector.a) + ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a) target_include_directories( velox PRIVATE From 4d4f9af5eaab65a3d550609da5db49ba94c17d35 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 24 Sep 2025 13:24:12 +0100 Subject: [PATCH 05/15] fix compile --- cpp/velox/compute/VeloxBackend.cc | 13 ++- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 94 ++++++++++----------- 2 files changed, 50 insertions(+), 57 deletions(-) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index dda57d1f8898..b6c089c50bcf 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -28,8 +28,8 @@ #include "utils/qat/QatCodec.h" #endif #ifdef GLUTEN_ENABLE_GPU -#include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h" +#include "velox/experimental/cudf/exec/ToCudf.h" #endif #include "compute/VeloxRuntime.h" @@ -168,7 +168,6 @@ void VeloxBackend::init( if (backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { FLAGS_velox_cudf_debug = backendConf_->get(kDebugCudf, kDebugCudfDefault); FLAGS_velox_cudf_memory_resource = backendConf_->get(kCudfMemoryResource, kCudfMemoryResourceDefault); - FLAGS_velox_cudf_table_scan = backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault); auto& options = velox::cudf_velox::CudfOptions::getInstance(); options.memoryPercent = backendConf_->get(kCudfMemoryPercent, kCudfMemoryPercentDefault); velox::cudf_velox::registerCudf(options); @@ -309,11 +308,11 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); #ifdef GLUTEN_ENABLE_GPU - if (FLAGS_velox_cudf_table_scan) { - facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; - auto hiveConnector = - factory.newConnector(kCudfHiveConnectorId, config, ioExecutor_.get()); - facebook::velox::connector::registerConnector(hiveConnector); + if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && + backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { + facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; + auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, config, ioExecutor_.get()); + facebook::velox::connector::registerConnector(hiveConnector); } #endif } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index f8005b72d697..9d96bcae51f4 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -32,10 +32,10 @@ #include "config/VeloxConfig.h" #ifdef GLUTEN_ENABLE_GPU -#include "velox/experimental/cudf/exec/ToCudf.h" -#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveDataSink.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveTableHandle.h" +#include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" using namespace cudf_velox::connector::hive; #endif @@ -43,7 +43,7 @@ using namespace cudf_velox::connector::hive; namespace gluten { namespace { - bool useCudfTableHandle(const std::vector>& splitInfos) { +bool useCudfTableHandle(const std::vector>& splitInfos) { #ifdef GLUTEN_ENABLE_GPU if (splitInfos.empty()) { return false; @@ -53,7 +53,7 @@ namespace { #else return false; -#endif +#endif } core::SortOrder toSortOrder(const ::substrait::SortField& sortField) { @@ -167,11 +167,11 @@ RowTypePtr getJoinOutputType( } // namespace bool SplitInfo::canUseCudfConnector() { - #ifdef GLUTEN_ENABLE_GPU - return partitionColumns.empty() && format == dwio::common::FileFormat::PARQUET && FLAGS_velox_cudf_table_scan; - #else +#ifdef GLUTEN_ENABLE_GPU + return partitionColumns.empty() && format == dwio::common::FileFormat::PARQUET; +#else return false; - #endif +#endif } core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit( @@ -589,17 +589,19 @@ std::shared_ptr makeHiveInsertTableHandl } if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) { ++numPartitionColumns; - columnHandles.emplace_back(std::make_shared( - tableColumnNames.at(i), - connector::hive::HiveColumnHandle::ColumnType::kPartitionKey, - tableColumnTypes.at(i), - tableColumnTypes.at(i))); + columnHandles.emplace_back( + std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kPartitionKey, + tableColumnTypes.at(i), + tableColumnTypes.at(i))); } else { - columnHandles.emplace_back(std::make_shared( - tableColumnNames.at(i), - connector::hive::HiveColumnHandle::ColumnType::kRegular, - tableColumnTypes.at(i), - tableColumnTypes.at(i))); + columnHandles.emplace_back( + std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kRegular, + tableColumnTypes.at(i), + tableColumnTypes.at(i))); } } VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size()); @@ -634,11 +636,7 @@ std::shared_ptr makeCudfHiveInsertTableHandle( } return std::make_shared( - columnHandles, - locationHandle, - compressionKind, - serdeParameters, - writerOptions); + columnHandles, locationHandle, compressionKind, serdeParameters, writerOptions); } #endif @@ -736,31 +734,32 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: const auto& compressionKind = writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY); std::shared_ptr tableHandle; - if (useCudfTableHandle(splitInfos_)) { + if (useCudfTableHandle(splitInfos_) && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && + veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { #ifdef GLUTEN_ENABLE_GPU - tableHandle = std::make_shared( - kCudfHiveConnectorId, - makeCudfHiveInsertTableHandle( - tableColumnNames, /*inputType->names() clolumn name is different*/ - inputType->children(), - std::make_shared( - writePath, cudf_velox::connector::hive::LocationHandle::TableType::kNew, fileName), - compressionKind, - {}, - writerOptions)); + tableHandle = std::make_shared( + kCudfHiveConnectorId, + makeCudfHiveInsertTableHandle( + tableColumnNames, /*inputType->names() clolumn name is different*/ + inputType->children(), + std::make_shared( + writePath, cudf_velox::connector::hive::LocationHandle::TableType::kNew, fileName), + compressionKind, + {}, + writerOptions)); #endif } else { tableHandle = std::make_shared( - kHiveConnectorId, - makeHiveInsertTableHandle( - tableColumnNames, /*inputType->names() clolumn name is different*/ - inputType->children(), - partitionedKey, - bucketProperty, - makeLocationHandle(writePath, fileName, fileFormat, compressionKind, bucketProperty != nullptr), - writerOptions, - fileFormat, - compressionKind)); + kHiveConnectorId, + makeHiveInsertTableHandle( + tableColumnNames, /*inputType->names() clolumn name is different*/ + inputType->children(), + partitionedKey, + bucketProperty, + makeLocationHandle(writePath, fileName, fileFormat, compressionKind, bucketProperty != nullptr), + writerOptions, + fileFormat, + compressionKind)); } return std::make_shared( nextPlanNodeId(), @@ -1355,12 +1354,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: if (useCudfTableHandle(splitInfos_)) { #ifdef GLUTEN_ENABLE_GPU tableHandle = std::make_shared( - kCudfHiveConnectorId, - "cudf_hive_table", - filterPushdownEnabled, - nullptr, - remainingFilter, - dataColumns); + kCudfHiveConnectorId, "cudf_hive_table", filterPushdownEnabled, nullptr, remainingFilter, dataColumns); #endif } else { common::SubfieldFilters subfieldFilters; From a3eca4ca40e27b2cdf366a4732b79d19427f419f Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 24 Sep 2025 13:33:42 +0100 Subject: [PATCH 06/15] fix compile --- cpp/velox/compute/VeloxBackend.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index b6c089c50bcf..54cb08bf572a 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -311,7 +311,7 @@ void VeloxBackend::initConnector(const std::shared_ptrget(kCudfEnableTableScan, kCudfEnableTableScanDefault) && backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; - auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, config, ioExecutor_.get()); + auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get()); facebook::velox::connector::registerConnector(hiveConnector); } #endif From 3f37690e56639ea1eb67e8c70c6557c83fe63d1d Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 25 Sep 2025 14:20:50 +0100 Subject: [PATCH 07/15] fix --- .../apache/gluten/config/VeloxConfig.scala | 2 ++ .../extension/CudfNodeValidationRule.scala | 20 +++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index ab43af282f0a..cd26117c28dc 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -80,6 +80,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { getConf(ENABLE_ENHANCED_FEATURES) def veloxPreferredBatchBytes: Long = getConf(COLUMNAR_VELOX_PREFERRED_BATCH_BYTES) + + def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN) } object VeloxConfig { diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala index 9b63fb2c5b7e..aaac41850584 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala @@ -16,9 +16,8 @@ */ package org.apache.gluten.extension -import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, WholeStageTransformer} - import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan @@ -31,12 +30,17 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends Rule[SparkPl } plan.transformUp { case transformer: WholeStageTransformer => - // Spark3.2 does not have exists - val hasLeaf = transformer.find { - case _: LeafTransformSupport => true - case _ => false - }.isDefined - transformer.setTagValue(CudfTag.CudfTag, !hasLeaf) + + if (!VeloxConfig.get.cudfEnableTableScan) { + // Spark3.2 does not have exists + val hasLeaf = transformer.find { + case _: LeafTransformSupport => true + case _ => false + }.isDefined + transformer.setTagValue(CudfTag.CudfTag, !hasLeaf) + } else { + transformer.setTagValue(CudfTag.CudfTag, true) + } transformer } } From b7fb13917f6079a6c94fb04a9ab9c8aa8e967135 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 25 Sep 2025 14:30:55 +0100 Subject: [PATCH 08/15] fix build --- cpp/core/config/GlutenConfig.h | 2 -- cpp/velox/config/VeloxConfig.h | 3 ++- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 14 ++------------ 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 0384622e61af..2f5992b9c0df 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -94,12 +94,10 @@ const std::string kSparkLegacyStatisticalAggregate = "spark.sql.legacy.statistic const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNullFields"; // cudf -#ifdef GLUTEN_ENABLE_GPU const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf"; const bool kCudfEnabledDefault = "true"; const std::string kDebugCudf = "spark.gluten.sql.debug.cudf"; const bool kDebugCudfDefault = "false"; -#endif std::unordered_map parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength); diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index a62e16c82afa..9362147ce5c5 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -180,9 +180,10 @@ const int32_t kCudfMemoryPercentDefault = 50; /// Preferred size of batches in bytes to be returned by operators. const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes"; +/// cudf const std::string kCudfEnableTableScan = "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"; const bool kCudfEnableTableScanDefault = false; - const std::string kCudfHiveConnectorId = "cudf-hive"; + } // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 9d96bcae51f4..e7fb5b3cda99 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -48,9 +48,7 @@ bool useCudfTableHandle(const std::vector>& splitInfo if (splitInfos.empty()) { return false; } - - return splitInfos[0]->canUseCudfConnector(); - + return splitInfos[0]->partitionColumns.empty() && splitInfos[0]->format == dwio::common::FileFormat::PARQUET; #else return false; #endif @@ -166,14 +164,6 @@ RowTypePtr getJoinOutputType( } // namespace -bool SplitInfo::canUseCudfConnector() { -#ifdef GLUTEN_ENABLE_GPU - return partitionColumns.empty() && format == dwio::common::FileFormat::PARQUET; -#else - return false; -#endif -} - core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit( const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& noEmitNode) { @@ -736,7 +726,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: std::shared_ptr tableHandle; if (useCudfTableHandle(splitInfos_) && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { -#ifdef GLUTEN_ENABLE_GPU + #ifdef GLUTEN_ENABLE_GPU tableHandle = std::make_shared( kCudfHiveConnectorId, makeCudfHiveInsertTableHandle( From a4a3edc0e0549ee69b7ddc9964d31cd85095289e Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 25 Sep 2025 14:39:37 +0100 Subject: [PATCH 09/15] fix code style --- .../extension/CudfNodeValidationRule.scala | 2 +- cpp/velox/CMakeLists.txt | 33 ++++++++++--------- cpp/velox/config/VeloxConfig.h | 1 - 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala index aaac41850584..20e819e21582 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala @@ -18,6 +18,7 @@ package org.apache.gluten.extension import org.apache.gluten.config.{GlutenConfig, VeloxConfig} import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, WholeStageTransformer} + import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan @@ -30,7 +31,6 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends Rule[SparkPl } plan.transformUp { case transformer: WholeStageTransformer => - if (!VeloxConfig.get.cudfEnableTableScan) { // Spark3.2 does not have exists val hasLeaf = transformer.find { diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 7992011582ae..e0907d400b45 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -416,23 +416,26 @@ if(ENABLE_GPU) ${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a) import_library( facebook::velox::velox_cudf_parquet_connector - ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a) + ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a + ) target_include_directories( velox - PRIVATE - ${VELOX_BUILD_PATH}/_deps/cudf-src/cpp/include - ${VELOX_BUILD_PATH}/_deps/rmm-src/cpp/include - ${VELOX_BUILD_PATH}/_deps/kvikio-src/cpp/include - ${VELOX_BUILD_PATH}/_deps/nvtx3-src/c/include - ${VELOX_BUILD_PATH}/_deps/nvcomp_proprietary_binary-src/include - ${VELOX_BUILD_PATH}/_deps/rapids_logger-src/include - /usr/local/cuda/include) - - target_compile_definitions(velox PRIVATE LIBCUDACXX_ENABLE_EXPERIMENTAL_MEMORY_RESOURCE) - - target_link_libraries(velox PUBLIC facebook::velox::velox_cudf_exec - facebook::velox::velox_cudf_vector - facebook::velox::velox_cudf_parquet_connector) + PRIVATE ${VELOX_BUILD_PATH}/_deps/cudf-src/cpp/include + ${VELOX_BUILD_PATH}/_deps/rmm-src/cpp/include + ${VELOX_BUILD_PATH}/_deps/kvikio-src/cpp/include + ${VELOX_BUILD_PATH}/_deps/nvtx3-src/c/include + ${VELOX_BUILD_PATH}/_deps/nvcomp_proprietary_binary-src/include + ${VELOX_BUILD_PATH}/_deps/rapids_logger-src/include + /usr/local/cuda/include) + + target_compile_definitions( + velox PRIVATE LIBCUDACXX_ENABLE_EXPERIMENTAL_MEMORY_RESOURCE) + + target_link_libraries( + velox + PUBLIC facebook::velox::velox_cudf_exec + facebook::velox::velox_cudf_vector + facebook::velox::velox_cudf_parquet_connector) target_link_libraries(velox PRIVATE ${VELOX_BUILD_PATH}/_deps/cudf-build/libcudf.so) endif() diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 9362147ce5c5..e74cc838862d 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -185,5 +185,4 @@ const std::string kCudfEnableTableScan = "spark.gluten.sql.columnar.backend.velo const bool kCudfEnableTableScanDefault = false; const std::string kCudfHiveConnectorId = "cudf-hive"; - } // namespace gluten From cf1335cad9afa02fada0ef8abc9c3821a659661d Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Sep 2025 09:50:10 +0100 Subject: [PATCH 10/15] fix compile --- cpp/velox/CMakeLists.txt | 3 +-- cpp/velox/compute/WholeStageResultIterator.cc | 2 +- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 2 +- cpp/velox/substrait/SubstraitToVeloxPlan.h | 4 +++- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index e0907d400b45..510707fda565 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -433,8 +433,7 @@ if(ENABLE_GPU) target_link_libraries( velox - PUBLIC facebook::velox::velox_cudf_exec - facebook::velox::velox_cudf_vector + PUBLIC facebook::velox::velox_cudf_exec facebook::velox::velox_cudf_vector facebook::velox::velox_cudf_parquet_connector) target_link_libraries(velox PRIVATE ${VELOX_BUILD_PATH}/_deps/cudf-build/libcudf.so) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 9b2e9f00d505..660d81961760 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -24,7 +24,7 @@ #ifdef GLUTEN_ENABLE_GPU #include #include "velox/experimental/cudf/exec/ToCudf.h" -#include "velox/experimental/cudf/connectors/hive/HiveConnectorSplit.h" +#include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h" #endif using namespace facebook; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index e7fb5b3cda99..6069640de7dd 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -48,7 +48,7 @@ bool useCudfTableHandle(const std::vector>& splitInfo if (splitInfos.empty()) { return false; } - return splitInfos[0]->partitionColumns.empty() && splitInfos[0]->format == dwio::common::FileFormat::PARQUET; + return splitInfos[0]->canUseCudfConnector(); #else return false; #endif diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index b00c3447f712..102083c6b103 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -58,7 +58,9 @@ struct SplitInfo { /// Make SplitInfo polymorphic virtual ~SplitInfo() = default; - bool canUseCudfConnector(); + bool canUseCudfConnector() { + return partitionColumns.empty() && format == dwio::common::FileFormat::PARQUET; + } }; /// This class is used to convert the Substrait plan into Velox plan. From d2e393290b75a1dfaf86068ee4bd563757a4e1e2 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Sep 2025 09:53:05 +0100 Subject: [PATCH 11/15] fix compile --- cpp/velox/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 510707fda565..741fa509b2e0 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -415,7 +415,7 @@ if(ENABLE_GPU) facebook::velox::velox_cudf_vector ${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a) import_library( - facebook::velox::velox_cudf_parquet_connector + facebook::velox::velox_cudf_hive_connector ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a ) target_include_directories( @@ -434,7 +434,7 @@ if(ENABLE_GPU) target_link_libraries( velox PUBLIC facebook::velox::velox_cudf_exec facebook::velox::velox_cudf_vector - facebook::velox::velox_cudf_parquet_connector) + facebook::velox::velox_cudf_hive_connector) target_link_libraries(velox PRIVATE ${VELOX_BUILD_PATH}/_deps/cudf-build/libcudf.so) endif() From df9405c0059035ed29dff2fec66f7a446e75ce9c Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Sep 2025 12:12:55 +0100 Subject: [PATCH 12/15] fix compile --- cpp/velox/compute/WholeStageResultIterator.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 660d81961760..45fe4895e005 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -23,6 +23,7 @@ #include "velox/exec/PlanNodeStats.h" #ifdef GLUTEN_ENABLE_GPU #include +#include #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h" #endif From 54ccb43a5e61cf2a8bd54dcaca2be8ec9bee0142 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Sep 2025 13:20:54 +0100 Subject: [PATCH 13/15] fix document --- docs/Configuration.md | 30 +++++++++++++++--------------- docs/velox-configuration.md | 1 + 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index b7e725278a35..9aacdb377743 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -75,21 +75,21 @@ nav_order: 15 | spark.gluten.sql.columnar.maxBatchSize | 4096 | | spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. | | spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | -| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | -| spark.gluten.sql.columnar.partial.generate | true | evaluates the non-offload-able HiveUDTF using vanilla Spark generator | -| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | -| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | -| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | -| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | -| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | -| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | -| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | -| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | -| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | -| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | -| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | -| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | -| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. | +| spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | +| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | +| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | +| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | +| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | +| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | +| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | +| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | +| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | +| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | +| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | +| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | +| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | +| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | +| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. | | spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | | spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | | spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index b5724f24e899..78123bde3fe7 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -22,6 +22,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. | | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | | spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. | +| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan | | spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. | | spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. | | spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | From 1080227f6940bd32b3f6e511bcaea2ff0a3abc69 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Sep 2025 14:10:09 +0100 Subject: [PATCH 14/15] fix compile --- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 6069640de7dd..2ba8333a50d9 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -102,7 +102,7 @@ EmitInfo getEmitInfo(const ::substrait::RelCommon& relCommon, const core::PlanNo RowTypePtr getJoinInputType(const core::PlanNodePtr& leftNode, const core::PlanNodePtr& rightNode) { auto outputSize = leftNode->outputType()->size() + rightNode->outputType()->size(); std::vector outputNames; - std::vector> outputTypes; + std::vector outputTypes; outputNames.reserve(outputSize); outputTypes.reserve(outputSize); for (const auto& node : {leftNode, rightNode}) { @@ -139,7 +139,7 @@ RowTypePtr getJoinOutputType( if (outputMayIncludeLeftColumns) { if (core::isLeftSemiProjectJoin(joinType)) { std::vector outputNames = leftNode->outputType()->names(); - std::vector> outputTypes = leftNode->outputType()->children(); + std::vector outputTypes = leftNode->outputType()->children(); outputNames.emplace_back("exists"); outputTypes.emplace_back(BOOLEAN()); return std::make_shared(std::move(outputNames), std::move(outputTypes)); @@ -151,7 +151,7 @@ RowTypePtr getJoinOutputType( if (outputMayIncludeRightColumns) { if (core::isRightSemiProjectJoin(joinType)) { std::vector outputNames = rightNode->outputType()->names(); - std::vector> outputTypes = rightNode->outputType()->children(); + std::vector outputTypes = rightNode->outputType()->children(); outputNames.emplace_back("exists"); outputTypes.emplace_back(BOOLEAN()); return std::make_shared(std::move(outputNames), std::move(outputTypes)); From 9a05c79106ef2375e05b79c551eda460352331f4 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Sep 2025 16:55:30 +0100 Subject: [PATCH 15/15] fix iceberg partitionValues must have value --- cpp/velox/compute/VeloxPlanConverter.cc | 4 +--- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 17 +++++++++++++++++ cpp/velox/substrait/SubstraitToVeloxPlan.h | 4 +--- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 6ef6a9b0992f..7b58584c344d 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -60,9 +60,7 @@ std::shared_ptr parseScanSplitInfo( for (const auto& partitionColumn : file.partition_columns()) { partitionColumnMap[partitionColumn.key()] = partitionColumn.value(); } - if (!partitionColumnMap.empty()) { - splitInfo->partitionColumns.emplace_back(partitionColumnMap); - } + splitInfo->partitionColumns.emplace_back(partitionColumnMap); std::unordered_map metadataColumnMap; for (const auto& metadataColumn : file.metadata_columns()) { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 2ba8333a50d9..6f32c5237ec9 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -164,6 +164,23 @@ RowTypePtr getJoinOutputType( } // namespace +bool SplitInfo::canUseCudfConnector() { + bool isEmpty = partitionColumns.empty(); + + if (!isEmpty) { + // Check if all maps are empty + bool allMapsEmpty = true; + for (const auto& m : partitionColumns) { + if (!m.empty()) { + allMapsEmpty = false; + break; + } + } + isEmpty = allMapsEmpty; + } + return isEmpty && format == dwio::common::FileFormat::PARQUET; +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit( const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& noEmitNode) { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 102083c6b103..b00c3447f712 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -58,9 +58,7 @@ struct SplitInfo { /// Make SplitInfo polymorphic virtual ~SplitInfo() = default; - bool canUseCudfConnector() { - return partitionColumns.empty() && format == dwio::common::FileFormat::PARQUET; - } + bool canUseCudfConnector(); }; /// This class is used to convert the Substrait plan into Velox plan.