Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ void veloxRuntimeReleaser(Runtime* runtime) {
void VeloxBackend::init(
std::unique_ptr<AllocationListener> listener,
const std::unordered_map<std::string, std::string>& conf) {
backendConf_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(conf));
backendConf_ = std::make_shared<facebook::velox::config::ConfigBase>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutable true make copy the configs when iterating the configs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we generate the HiveConfig by veloxCfg_ in WholeStageResultIterator every time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, here veloxCfg_ contains only session level configurations, it does not cover the configurations defined in spark.conf

std::unordered_map<std::string, std::string>(conf), true /*mutable*/);

globalMemoryManager_ = std::make_unique<VeloxMemoryManager>(kVeloxBackendKind, std::move(listener), *backendConf_);

Expand All @@ -104,7 +104,7 @@ void VeloxBackend::init(
Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, veloxRuntimeReleaser);

if (backendConf_->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigs());
LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigsCopy());
}

// Init glog and log level.
Expand Down Expand Up @@ -181,7 +181,6 @@ void VeloxBackend::init(
#endif

initJolFilesystem();
initConnector(hiveConf);

velox::dwio::common::registerFileSinks();
velox::parquet::registerParquetReaderFactory();
Expand Down Expand Up @@ -303,15 +302,23 @@ void VeloxBackend::initCache() {
}

void VeloxBackend::initConnector(const std::shared_ptr<velox::config::ConfigBase>& hiveConf) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move the function to WholeStageResultIterator?

for (const auto& [key, value] : hiveConf->rawConfigs()) {
// always update to use new session level conf
backendConf_->set(key, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the initConnector is only called from WholeStageResultIterator, is it possible to remove the filesystem related configurations from the backend conf?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users may pass/modify the filesystem related configurations at session level, by setting the keys here it will allow us to pick the latest values

}
auto newConf = getHiveConfig(backendConf_);

auto ioThreads = backendConf_->get<int32_t>(kVeloxIOThreads, kVeloxIOThreadsDefault);
GLUTEN_CHECK(
ioThreads >= 0,
kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen.");
if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));
auto hiveConnector =
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, newConf, ioExecutor_.get());
velox::connector::unregisterConnector(kHiveConnectorId);
velox::connector::registerConnector(hiveConnector);
#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
Expand Down
6 changes: 5 additions & 1 deletion cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class VeloxBackend {

void tearDown();

void initConnector(const std::shared_ptr<facebook::velox::config::ConfigBase>& hiveConf);
std::mutex registerMutex;
std::atomic<bool> alreadyRegistered{false};
std::shared_ptr<facebook::velox::config::ConfigBase> lastSessionConf;

private:
explicit VeloxBackend(
std::unique_ptr<AllocationListener> listener,
Expand All @@ -67,7 +72,6 @@ class VeloxBackend {

void init(std::unique_ptr<AllocationListener> listener, const std::unordered_map<std::string, std::string>& conf);
void initCache();
void initConnector(const std::shared_ptr<facebook::velox::config::ConfigBase>& hiveConf);
void initUdf();
std::unique_ptr<facebook::velox::cache::SsdCache> initSsdCache(uint64_t ssdSize);

Expand Down
24 changes: 24 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ WholeStageResultIterator::WholeStageResultIterator(
}
getOrderedNodeIds(veloxPlan_, orderedNodeIds_);

// register the hive connectors
doRegister(std::make_shared<facebook::velox::config::ConfigBase>(veloxCfg_->rawConfigsCopy()));

auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr);
GLUTEN_CHECK(fileSystem != nullptr, "File System for spilling is null!");
fileSystem->mkdir(spillDir);
Expand All @@ -100,6 +103,7 @@ WholeStageResultIterator::WholeStageResultIterator(
std::unordered_set<velox::core::PlanNodeId> emptySet;
velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();

task_ = velox::exec::Task::create(
fmt::format(
"Gluten_Stage_{}_TID_{}_VTID_{}",
Expand Down Expand Up @@ -689,4 +693,24 @@ std::shared_ptr<velox::config::ConfigBase> WholeStageResultIterator::createConne
return std::make_shared<velox::config::ConfigBase>(std::move(configs));
}

void WholeStageResultIterator::doRegister(const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg) {
std::lock_guard<std::mutex> l(gluten::VeloxBackend::get()->registerMutex);
if (gluten::VeloxBackend::get()->lastSessionConf == nullptr) {
if (!gluten::VeloxBackend::get()->alreadyRegistered) {
gluten::VeloxBackend::get()->initConnector(veloxCfg);
gluten::VeloxBackend::get()->lastSessionConf = veloxCfg;
gluten::VeloxBackend::get()->alreadyRegistered = true;
}
return;
}
if (gluten::VeloxBackend::get()->lastSessionConf != nullptr &&
gluten::VeloxBackend::get()->lastSessionConf->rawConfigs() != veloxCfg->rawConfigs()) {
if (!gluten::VeloxBackend::get()->alreadyRegistered) {
gluten::VeloxBackend::get()->initConnector(veloxCfg);
gluten::VeloxBackend::get()->lastSessionConf = veloxCfg;
gluten::VeloxBackend::get()->alreadyRegistered = true;
}
}
}

} // namespace gluten
7 changes: 7 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#pragma once

#include "VeloxBackend.h"
#include "compute/Runtime.h"
#include "iceberg/IcebergPlanConverter.h"
#include "memory/ColumnarBatchIterator.h"
Expand Down Expand Up @@ -50,6 +51,10 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
// calling .wait() may take no effect in single thread execution mode
task_->requestCancel().wait();
}
{
std::lock_guard<std::mutex> l(gluten::VeloxBackend::get()->registerMutex);
gluten::VeloxBackend::get()->alreadyRegistered = false;
}
#ifdef GLUTEN_ENABLE_GPU
if (enableCudf_) {
unlockGpu();
Expand Down Expand Up @@ -128,6 +133,8 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
/// Metrics
std::unique_ptr<Metrics> metrics_{};

void doRegister(const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg);

/// All the children plan node ids with postorder traversal.
std::vector<facebook::velox::core::PlanNodeId> orderedNodeIds_;

Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
auto backendConf = VeloxBackend::get()->getBackendConf()->rawConfigs();
auto backendConf = VeloxBackend::get()->getBackendConf()->rawConfigsCopy();
auto sparkConf = ctx->getConfMap();
sparkConf.merge(backendConf);
auto safeArray = gluten::getByteArrayElementsSafe(env, partition);
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void getS3HiveConfig(
};

// Convert all Spark bucket configs to Velox bucket configs.
for (const auto& [key, value] : conf->rawConfigs()) {
for (const auto& [key, value] : conf->rawConfigsCopy()) {
if (key.find(kSparkHadoopS3BucketPrefix) == 0) {
std::string_view skey = key;
auto remaining = skey.substr(kSparkHadoopS3BucketPrefix.size());
Expand Down Expand Up @@ -207,7 +207,7 @@ void getAbfsHiveConfig(
#ifdef ENABLE_ABFS
std::string_view kSparkHadoopPrefix = "spark.hadoop.";
std::string_view kSparkHadoopAbfsPrefix = "spark.hadoop.fs.azure.";
for (const auto& [key, value] : conf->rawConfigs()) {
for (const auto& [key, value] : conf->rawConfigsCopy()) {
if (key.find(kSparkHadoopAbfsPrefix) == 0) {
// Remove the SparkHadoopPrefix
hiveConfMap[key.substr(kSparkHadoopPrefix.size())] = value;
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/utils/VeloxWholeStageDumper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ VeloxWholeStageDumper::VeloxWholeStageDumper(
: taskInfo_(taskInfo), saveDir_(saveDir), batchSize_(batchSize), pool_(aggregatePool) {}

void VeloxWholeStageDumper::dumpConf(const std::unordered_map<std::string, std::string>& confMap) {
const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigs();
const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigsCopy();
auto allConfMap = backendConfMap;

for (const auto& pair : confMap) {
Expand Down
Loading