Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
getConf(ENABLE_ENHANCED_FEATURES)

def veloxPreferredBatchBytes: Long = getConf(COLUMNAR_VELOX_PREFERRED_BATCH_BYTES)

def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN)
}

object VeloxConfig {
Expand Down Expand Up @@ -609,6 +611,12 @@ object VeloxConfig {
.intConf
.createWithDefault(50)

// Static boolean conf, disabled by default, that turns on the cudf table scan path.
// When it is true, CudfNodeValidationRule tags every WholeStageTransformer with
// CudfTag = true without first checking whether the stage contains a leaf transformer.
// The native side defines the same key as kCudfEnableTableScan.
val CUDF_ENABLE_TABLE_SCAN =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan")
.doc("Enable cudf table scan")
.booleanConf
.createWithDefault(false)

val MEMORY_DUMP_ON_EXIT =
buildConf("spark.gluten.monitor.memoryDumpOnExit")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, WholeStageTransformer}

import org.apache.spark.sql.catalyst.rules.Rule
Expand All @@ -31,12 +31,16 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends Rule[SparkPl
}
plan.transformUp {
case transformer: WholeStageTransformer =>
// Spark3.2 does not have exists
val hasLeaf = transformer.find {
case _: LeafTransformSupport => true
case _ => false
}.isDefined
transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
if (!VeloxConfig.get.cudfEnableTableScan) {
// Spark3.2 does not have exists
val hasLeaf = transformer.find {
case _: LeafTransformSupport => true
case _ => false
}.isDefined
transformer.setTagValue(CudfTag.CudfTag, !hasLeaf)
} else {
transformer.setTagValue(CudfTag.CudfTag, true)
}
transformer
}
}
Expand Down
2 changes: 0 additions & 2 deletions cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,10 @@ const std::string kSparkLegacyStatisticalAggregate = "spark.sql.legacy.statistic
const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNullFields";

// cudf
#ifdef GLUTEN_ENABLE_GPU
// Keys and default values for the cuDF (GPU) switches.
// The defaults must be bool literals: initializing a bool from a string literal such as
// "false" converts the non-null pointer to bool and therefore always yields true.
const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
const bool kCudfEnabledDefault = true;
const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
const bool kDebugCudfDefault = false;
#endif

std::unordered_map<std::string, std::string>
parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength);
Expand Down
22 changes: 20 additions & 2 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,27 @@ if(ENABLE_GPU)
import_library(
facebook::velox::velox_cudf_vector
${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a)
import_library(
facebook::velox::velox_cudf_hive_connector
${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a
)
# Private header search paths for the velox target when building with GPU support:
# the cudf, rmm, kvikio, nvtx3, nvcomp and rapids_logger sources that live in the
# _deps directory under VELOX_BUILD_PATH, plus the CUDA toolkit headers.
target_include_directories(
velox
PRIVATE ${VELOX_BUILD_PATH}/_deps/cudf-src/cpp/include
${VELOX_BUILD_PATH}/_deps/rmm-src/cpp/include
${VELOX_BUILD_PATH}/_deps/kvikio-src/cpp/include
${VELOX_BUILD_PATH}/_deps/nvtx3-src/c/include
${VELOX_BUILD_PATH}/_deps/nvcomp_proprietary_binary-src/include
${VELOX_BUILD_PATH}/_deps/rapids_logger-src/include
# NOTE(review): the CUDA include path is hard-coded; presumably this only works when
# the toolkit is installed under /usr/local/cuda. Consider deriving it from the CUDA
# toolkit discovery instead. TODO confirm.
/usr/local/cuda/include)

target_compile_definitions(
velox PRIVATE LIBCUDACXX_ENABLE_EXPERIMENTAL_MEMORY_RESOURCE)

target_link_libraries(velox PUBLIC facebook::velox::velox_cudf_exec
facebook::velox::velox_cudf_vector)
target_link_libraries(
velox
PUBLIC facebook::velox::velox_cudf_exec facebook::velox::velox_cudf_vector
facebook::velox::velox_cudf_hive_connector)
target_link_libraries(velox
PRIVATE ${VELOX_BUILD_PATH}/_deps/cudf-build/libcudf.so)
endif()
Expand Down
9 changes: 9 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "utils/qat/QatCodec.h"
#endif
#ifdef GLUTEN_ENABLE_GPU
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#endif

Expand Down Expand Up @@ -306,6 +307,14 @@ void VeloxBackend::initConnector(const std::shared_ptr<velox::config::ConfigBase
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));
#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory;
auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get());
facebook::velox::connector::registerConnector(hiveConnector);
}
#endif
}

void VeloxBackend::initUdf() {
Expand Down
21 changes: 18 additions & 3 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#include "velox/exec/PlanNodeStats.h"
#ifdef GLUTEN_ENABLE_GPU
#include <mutex>
#include <cudf/io/types.hpp>
#include "velox/experimental/cudf/exec/ToCudf.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h"
#endif

using namespace facebook;
Expand Down Expand Up @@ -131,14 +133,19 @@ WholeStageResultIterator::WholeStageResultIterator(
const auto& format = scanInfo->format;
const auto& partitionColumns = scanInfo->partitionColumns;
const auto& metadataColumns = scanInfo->metadataColumns;
// Precondition: all split infos have the same partition columns and format.
const auto canUseCudfConnector = scanInfo->canUseCudfConnector();

std::vector<std::shared_ptr<velox::connector::ConnectorSplit>> connectorSplits;
connectorSplits.reserve(paths.size());
for (int idx = 0; idx < paths.size(); idx++) {
auto partitionColumn = partitionColumns[idx];
auto metadataColumn = metadataColumns[idx];
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
constructPartitionColumns(partitionKeys, partitionColumn);
if (!partitionColumns.empty()) {
auto partitionColumn = partitionColumns[idx];
constructPartitionColumns(partitionKeys, partitionColumn);
}

std::shared_ptr<velox::connector::ConnectorSplit> split;
if (auto icebergSplitInfo = std::dynamic_pointer_cast<IcebergSplitInfo>(scanInfo)) {
// Set Iceberg split.
Expand All @@ -159,8 +166,16 @@ WholeStageResultIterator::WholeStageResultIterator(
std::unordered_map<std::string, std::string>(),
properties[idx]);
} else {
auto connectorId = kHiveConnectorId;
#ifdef GLUTEN_ENABLE_GPU
if (canUseCudfConnector) {
connectorId = kCudfHiveConnectorId;
VELOX_CHECK_EQ(starts[idx], 0, "Not support split file");
VELOX_CHECK_EQ(lengths[idx], scanInfo->properties[idx]->fileSize, "Not support split file");
}
#endif
split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
kHiveConnectorId,
connectorId,
paths[idx],
format,
starts[idx],
Expand Down
5 changes: 5 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,9 @@ const int32_t kCudfMemoryPercentDefault = 50;
/// Preferred size of batches in bytes to be returned by operators.
const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes";

/// cudf table scan.
/// kCudfEnableTableScan is the config key that enables the cudf table scan and
/// kCudfEnableTableScanDefault is its default (disabled).
/// kCudfHiveConnectorId is the connector id under which the cudf hive connector is
/// registered and which cudf-eligible splits and table handles refer to.
const std::string kCudfEnableTableScan = "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan";
const bool kCudfEnableTableScanDefault = false;
const std::string kCudfHiveConnectorId = "cudf-hive";

} // namespace gluten
142 changes: 111 additions & 31 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,29 @@
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"

#ifdef GLUTEN_ENABLE_GPU
#include "velox/experimental/cudf/connectors/hive/CudfHiveDataSink.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveTableHandle.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"

using namespace cudf_velox::connector::hive;
#endif

namespace gluten {
namespace {

/// Decides whether the cudf table handle should be used for the given splits.
///
/// Only the first split info is consulted; an empty list never qualifies.
/// Builds without GLUTEN_ENABLE_GPU always return false.
bool useCudfTableHandle(const std::vector<std::shared_ptr<SplitInfo>>& splitInfos) {
#ifdef GLUTEN_ENABLE_GPU
  return !splitInfos.empty() && splitInfos.front()->canUseCudfConnector();
#else
  return false;
#endif
}

core::SortOrder toSortOrder(const ::substrait::SortField& sortField) {
switch (sortField.direction()) {
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
Expand Down Expand Up @@ -82,7 +102,7 @@ EmitInfo getEmitInfo(const ::substrait::RelCommon& relCommon, const core::PlanNo
RowTypePtr getJoinInputType(const core::PlanNodePtr& leftNode, const core::PlanNodePtr& rightNode) {
auto outputSize = leftNode->outputType()->size() + rightNode->outputType()->size();
std::vector<std::string> outputNames;
std::vector<std::shared_ptr<const Type>> outputTypes;
std::vector<TypePtr> outputTypes;
outputNames.reserve(outputSize);
outputTypes.reserve(outputSize);
for (const auto& node : {leftNode, rightNode}) {
Expand Down Expand Up @@ -119,7 +139,7 @@ RowTypePtr getJoinOutputType(
if (outputMayIncludeLeftColumns) {
if (core::isLeftSemiProjectJoin(joinType)) {
std::vector<std::string> outputNames = leftNode->outputType()->names();
std::vector<std::shared_ptr<const Type>> outputTypes = leftNode->outputType()->children();
std::vector<TypePtr> outputTypes = leftNode->outputType()->children();
outputNames.emplace_back("exists");
outputTypes.emplace_back(BOOLEAN());
return std::make_shared<const RowType>(std::move(outputNames), std::move(outputTypes));
Expand All @@ -131,7 +151,7 @@ RowTypePtr getJoinOutputType(
if (outputMayIncludeRightColumns) {
if (core::isRightSemiProjectJoin(joinType)) {
std::vector<std::string> outputNames = rightNode->outputType()->names();
std::vector<std::shared_ptr<const Type>> outputTypes = rightNode->outputType()->children();
std::vector<TypePtr> outputTypes = rightNode->outputType()->children();
outputNames.emplace_back("exists");
outputTypes.emplace_back(BOOLEAN());
return std::make_shared<const RowType>(std::move(outputNames), std::move(outputTypes));
Expand All @@ -144,6 +164,23 @@ RowTypePtr getJoinOutputType(

} // namespace

/// Reports whether this split info can be read through the cudf connector.
///
/// That is the case when no partition values are present (the partition column
/// list is empty or every map in it is empty) and the file format is Parquet.
bool SplitInfo::canUseCudfConnector() {
  // A single non-empty partition map is enough to rule the cudf connector out.
  for (const auto& partitionMap : partitionColumns) {
    if (!partitionMap.empty()) {
      return false;
    }
  }
  return format == dwio::common::FileFormat::PARQUET;
}

core::PlanNodePtr SubstraitToVeloxPlanConverter::processEmit(
const ::substrait::RelCommon& relCommon,
const core::PlanNodePtr& noEmitNode) {
Expand Down Expand Up @@ -559,17 +596,19 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
}
if (std::find(partitionedBy.cbegin(), partitionedBy.cend(), tableColumnNames.at(i)) != partitionedBy.cend()) {
++numPartitionColumns;
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
columnHandles.emplace_back(
std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
} else {
columnHandles.emplace_back(std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
columnHandles.emplace_back(
std::make_shared<connector::hive::HiveColumnHandle>(
tableColumnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
tableColumnTypes.at(i),
tableColumnTypes.at(i)));
}
}
VELOX_CHECK_EQ(numPartitionColumns, partitionedBy.size());
Expand All @@ -585,6 +624,29 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
writerOptions);
}

#ifdef GLUTEN_ENABLE_GPU
std::shared_ptr<CudfHiveInsertTableHandle> makeCudfHiveInsertTableHandle(
const std::vector<std::string>& tableColumnNames,
const std::vector<TypePtr>& tableColumnTypes,
std::shared_ptr<cudf_velox::connector::hive::LocationHandle> locationHandle,
const std::optional<common::CompressionKind> compressionKind,
const std::unordered_map<std::string, std::string>& serdeParameters,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions) {
std::vector<std::shared_ptr<const CudfHiveColumnHandle>> columnHandles;

for (int i = 0; i < tableColumnNames.size(); ++i) {
columnHandles.push_back(
std::make_shared<CudfHiveColumnHandle>(
tableColumnNames.at(i),
tableColumnTypes.at(i),
cudf::data_type{cudf_velox::veloxToCudfTypeId(tableColumnTypes.at(i))}));
}

return std::make_shared<CudfHiveInsertTableHandle>(
columnHandles, locationHandle, compressionKind, serdeParameters, writerOptions);
}
#endif

core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) {
core::PlanNodePtr childNode;
if (writeRel.has_input()) {
Expand Down Expand Up @@ -678,23 +740,40 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// Spark's default compression code is snappy.
const auto& compressionKind =
writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);

std::shared_ptr<core::InsertTableHandle> tableHandle;
if (useCudfTableHandle(splitInfos_) && veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
#ifdef GLUTEN_ENABLE_GPU
tableHandle = std::make_shared<core::InsertTableHandle>(
kCudfHiveConnectorId,
makeCudfHiveInsertTableHandle(
tableColumnNames, /*inputType->names() clolumn name is different*/
inputType->children(),
std::make_shared<cudf_velox::connector::hive::LocationHandle>(
writePath, cudf_velox::connector::hive::LocationHandle::TableType::kNew, fileName),
compressionKind,
{},
writerOptions));
#endif
} else {
tableHandle = std::make_shared<core::InsertTableHandle>(
kHiveConnectorId,
makeHiveInsertTableHandle(
tableColumnNames, /*inputType->names() clolumn name is different*/
inputType->children(),
partitionedKey,
bucketProperty,
makeLocationHandle(writePath, fileName, fileFormat, compressionKind, bucketProperty != nullptr),
writerOptions,
fileFormat,
compressionKind));
}
return std::make_shared<core::TableWriteNode>(
nextPlanNodeId(),
inputType,
tableColumnNames,
std::nullopt, /*columnStatsSpec*/
std::make_shared<core::InsertTableHandle>(
kHiveConnectorId,
makeHiveInsertTableHandle(
tableColumnNames, /*inputType->names() clolumn name is different*/
inputType->children(),
partitionedKey,
bucketProperty,
makeLocationHandle(writePath, fileName, fileFormat, compressionKind, bucketProperty != nullptr),
writerOptions,
fileFormat,
compressionKind)),
tableHandle,
(!partitionedKey.empty()),
exec::TableWriteTraits::outputType(std::nullopt),
connector::CommitStrategy::kNoCommit,
Expand Down Expand Up @@ -1277,14 +1356,15 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
auto names = colNameList;
auto types = veloxTypeList;
auto dataColumns = ROW(std::move(names), std::move(types));
std::shared_ptr<connector::hive::HiveTableHandle> tableHandle;
if (!readRel.has_filter()) {
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
kHiveConnectorId, "hive_table", filterPushdownEnabled, common::SubfieldFilters{}, nullptr, dataColumns);
connector::ConnectorTableHandlePtr tableHandle;
auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), dataColumns) : nullptr;
if (useCudfTableHandle(splitInfos_)) {
#ifdef GLUTEN_ENABLE_GPU
tableHandle = std::make_shared<CudfHiveTableHandle>(
kCudfHiveConnectorId, "cudf_hive_table", filterPushdownEnabled, nullptr, remainingFilter, dataColumns);
#endif
} else {
common::SubfieldFilters subfieldFilters;
auto remainingFilter = exprConverter_->toVeloxExpr(readRel.filter(), dataColumns);

tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
kHiveConnectorId,
"hive_table",
Expand Down
Loading