From 701e64c7a79589c4359461d4bca89f5c6f37533e Mon Sep 17 00:00:00 2001 From: Yuan Date: Fri, 29 Aug 2025 13:49:20 +0100 Subject: [PATCH] [VL] Refactor hive connector register to accept session level config Signed-off-by: Yuan remove duplicated call Signed-off-by: Yuan revert to use inmutable config map Signed-off-by: Yuan fix to use dynamic config Signed-off-by: Yuan fix return config copy Signed-off-by: Yuan unregister hive connector before register Signed-off-by: Yuan register io connector before visit filesystem Signed-off-by: Yuan refactor Signed-off-by: Yuan Revert "refactor" This reverts commit 007472ec479ec433c387ede6f8c012a20638f591. Revert "Revert "refactor"" This reverts commit 4d2acca16d05780659fce21c305a5b1edc995252. omit register if same session config detected Signed-off-by: Yuan --- cpp/velox/compute/VeloxBackend.cc | 19 ++++++++++----- cpp/velox/compute/VeloxBackend.h | 6 ++++- cpp/velox/compute/WholeStageResultIterator.cc | 24 +++++++++++++++++++ cpp/velox/compute/WholeStageResultIterator.h | 7 ++++++ cpp/velox/jni/VeloxJniWrapper.cc | 2 +- cpp/velox/utils/ConfigExtractor.cc | 4 ++-- cpp/velox/utils/VeloxWholeStageDumper.cc | 2 +- 7 files changed, 53 insertions(+), 11 deletions(-) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 8e54762fc01a..fd9c13530ea0 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -94,8 +94,8 @@ void veloxRuntimeReleaser(Runtime* runtime) { void VeloxBackend::init( std::unique_ptr listener, const std::unordered_map& conf) { - backendConf_ = - std::make_shared(std::unordered_map(conf)); + backendConf_ = std::make_shared( + std::unordered_map(conf), true /*mutable*/); globalMemoryManager_ = std::make_unique(kVeloxBackendKind, std::move(listener), *backendConf_); @@ -104,7 +104,7 @@ void VeloxBackend::init( Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, veloxRuntimeReleaser); if (backendConf_->get(kDebugModeEnabled, false)) { - LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigs()); + LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigsCopy()); } // Init glog and log level. @@ -181,7 +181,6 @@ void VeloxBackend::init( #endif initJolFilesystem(); - initConnector(hiveConf); velox::dwio::common::registerFileSinks(); velox::parquet::registerParquetReaderFactory(); @@ -303,6 +302,12 @@ void VeloxBackend::initCache() { } void VeloxBackend::initConnector(const std::shared_ptr& hiveConf) { + for (const auto& [key, value] : hiveConf->rawConfigs()) { + // always update to use new session level conf + backendConf_->set(key, value); + } + auto newConf = getHiveConfig(backendConf_); + auto ioThreads = backendConf_->get(kVeloxIOThreads, kVeloxIOThreadsDefault); GLUTEN_CHECK( ioThreads >= 0, @@ -310,8 +315,10 @@ void VeloxBackend::initConnector(const std::shared_ptr 0) { ioExecutor_ = std::make_unique(ioThreads); } - velox::connector::registerConnector( - std::make_shared(kHiveConnectorId, hiveConf, ioExecutor_.get())); + auto hiveConnector = + std::make_shared(kHiveConnectorId, newConf, ioExecutor_.get()); + velox::connector::unregisterConnector(kHiveConnectorId); + velox::connector::registerConnector(hiveConnector); #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index 94e7ec93fba0..9a62d59d44ba 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -58,6 +58,11 @@ class VeloxBackend { void tearDown(); + void initConnector(const std::shared_ptr& hiveConf); + std::mutex registerMutex; + std::atomic alreadyRegistered{false}; + std::shared_ptr lastSessionConf; + private: explicit VeloxBackend( std::unique_ptr listener, @@ -67,7 +72,6 @@ class VeloxBackend { void init(std::unique_ptr listener, const std::unordered_map& conf); void initCache(); - void initConnector(const std::shared_ptr& hiveConf); void initUdf(); std::unique_ptr initSsdCache(uint64_t ssdSize); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 16a865ac312f..65b68d5abcec 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -90,6 +90,9 @@ WholeStageResultIterator::WholeStageResultIterator( } getOrderedNodeIds(veloxPlan_, orderedNodeIds_); + // register the hive connectors + doRegister(std::make_shared(veloxCfg_->rawConfigsCopy())); + auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr); GLUTEN_CHECK(fileSystem != nullptr, "File System for spilling is null!"); fileSystem->mkdir(spillDir); @@ -100,6 +103,7 @@ WholeStageResultIterator::WholeStageResultIterator( std::unordered_set emptySet; velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; std::shared_ptr queryCtx = createNewVeloxQueryCtx(); + task_ = velox::exec::Task::create( fmt::format( "Gluten_Stage_{}_TID_{}_VTID_{}", @@ -689,4 +693,24 @@ std::shared_ptr WholeStageResultIterator::createConne return std::make_shared(std::move(configs)); } +void WholeStageResultIterator::doRegister(const std::shared_ptr& veloxCfg) { + std::lock_guard l(gluten::VeloxBackend::get()->registerMutex); + if (gluten::VeloxBackend::get()->lastSessionConf == nullptr) { + if (!gluten::VeloxBackend::get()->alreadyRegistered) { + gluten::VeloxBackend::get()->initConnector(veloxCfg); + gluten::VeloxBackend::get()->lastSessionConf = veloxCfg; + gluten::VeloxBackend::get()->alreadyRegistered = true; + } + return; + } + if (gluten::VeloxBackend::get()->lastSessionConf != nullptr && + gluten::VeloxBackend::get()->lastSessionConf->rawConfigs() != veloxCfg->rawConfigs()) { + if (!gluten::VeloxBackend::get()->alreadyRegistered) { + gluten::VeloxBackend::get()->initConnector(veloxCfg); + gluten::VeloxBackend::get()->lastSessionConf = veloxCfg; + gluten::VeloxBackend::get()->alreadyRegistered = true; + } +} +} + } // namespace gluten diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index bca377cf1980..9b6c31f92257 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -16,6 +16,7 @@ */ #pragma once +#include "VeloxBackend.h" #include "compute/Runtime.h" #include "iceberg/IcebergPlanConverter.h" #include "memory/ColumnarBatchIterator.h" @@ -50,6 +51,10 @@ class WholeStageResultIterator : public ColumnarBatchIterator { // calling .wait() may take no effect in single thread execution mode task_->requestCancel().wait(); } + { + std::lock_guard l(gluten::VeloxBackend::get()->registerMutex); + gluten::VeloxBackend::get()->alreadyRegistered = false; + } #ifdef GLUTEN_ENABLE_GPU if (enableCudf_) { unlockGpu(); @@ -128,6 +133,8 @@ class WholeStageResultIterator : public ColumnarBatchIterator { /// Metrics std::unique_ptr metrics_{}; + void doRegister(const std::shared_ptr& veloxCfg); + /// All the children plan node ids with postorder traversal. std::vector orderedNodeIds_; diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index a96946080839..9eff0e58ee9b 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -821,7 +821,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_ JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto runtime = dynamic_cast(ctx); - auto backendConf = VeloxBackend::get()->getBackendConf()->rawConfigs(); + auto backendConf = VeloxBackend::get()->getBackendConf()->rawConfigsCopy(); auto sparkConf = ctx->getConfMap(); sparkConf.merge(backendConf); auto safeArray = gluten::getByteArrayElementsSafe(env, partition); diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc index 1d4cd7f8597b..1f64c3e1c726 100644 --- a/cpp/velox/utils/ConfigExtractor.cc +++ b/cpp/velox/utils/ConfigExtractor.cc @@ -135,7 +135,7 @@ void getS3HiveConfig( }; // Convert all Spark bucket configs to Velox bucket configs. - for (const auto& [key, value] : conf->rawConfigs()) { + for (const auto& [key, value] : conf->rawConfigsCopy()) { if (key.find(kSparkHadoopS3BucketPrefix) == 0) { std::string_view skey = key; auto remaining = skey.substr(kSparkHadoopS3BucketPrefix.size()); @@ -207,7 +207,7 @@ void getAbfsHiveConfig( #ifdef ENABLE_ABFS std::string_view kSparkHadoopPrefix = "spark.hadoop."; std::string_view kSparkHadoopAbfsPrefix = "spark.hadoop.fs.azure."; - for (const auto& [key, value] : conf->rawConfigs()) { + for (const auto& [key, value] : conf->rawConfigsCopy()) { if (key.find(kSparkHadoopAbfsPrefix) == 0) { // Remove the SparkHadoopPrefix hiveConfMap[key.substr(kSparkHadoopPrefix.size())] = value; diff --git a/cpp/velox/utils/VeloxWholeStageDumper.cc b/cpp/velox/utils/VeloxWholeStageDumper.cc index d8a451dac738..08581fc61de1 100644 --- a/cpp/velox/utils/VeloxWholeStageDumper.cc +++ b/cpp/velox/utils/VeloxWholeStageDumper.cc @@ -62,7 +62,7 @@ VeloxWholeStageDumper::VeloxWholeStageDumper( : taskInfo_(taskInfo), saveDir_(saveDir), batchSize_(batchSize), pool_(aggregatePool) {} void VeloxWholeStageDumper::dumpConf(const std::unordered_map& confMap) { - const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigs(); + const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigsCopy(); auto allConfMap = backendConfMap; for (const auto& pair : confMap) {