diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index a99cac823b0f..75fd84b104bb 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -28,21 +28,23 @@ #include "utils/qat/QatCodec.h" #endif #ifdef GLUTEN_ENABLE_GPU +#include "operators/plannodes/CudfVectorStream.h" #include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h" #include "velox/experimental/cudf/exec/ToCudf.h" -#include "operators/plannodes/CudfVectorStream.h" #endif #include "compute/VeloxRuntime.h" #include "config/VeloxConfig.h" #include "jni/JniFileSystem.h" +#include "memory/GlutenBufferedInputBuilder.h" #include "operators/functions/SparkExprToSubfieldFilterParser.h" #include "shuffle/ArrowShuffleDictionaryWriter.h" #include "udf/UdfLoader.h" #include "utils/Exception.h" #include "velox/common/caching/SsdCache.h" #include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/BufferedInputBuilder.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveDataSource.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual @@ -190,6 +192,7 @@ void VeloxBackend::init( velox::parquet::registerParquetWriterFactory(); velox::orc::registerOrcReaderFactory(); velox::exec::ExprToSubfieldFilterParser::registerParser(std::make_unique()); + velox::connector::hive::BufferedInputBuilder::registerBuilder(std::make_shared()); // Register Velox functions registerAllFunctions(); diff --git a/cpp/velox/memory/GlutenBufferedInputBuilder.h b/cpp/velox/memory/GlutenBufferedInputBuilder.h new file mode 100644 index 000000000000..86116ff1e88e --- /dev/null +++ b/cpp/velox/memory/GlutenBufferedInputBuilder.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "GlutenDirectBufferedInput.h" +#include "velox/connectors/hive/BufferedInputBuilder.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/dwio/common/CachedBufferedInput.h" + +namespace gluten { + +/** Gluten's implementation of the Hive connector BufferedInputBuilder hook: picks which buffered-input object backs a file read, depending on whether the query context exposes a cache. */ class GlutenBufferedInputBuilder : public facebook::velox::connector::hive::BufferedInputBuilder { + public: + /** Builds the buffered input for one file. @param fileHandle supplies the file, its uuid and its groupId. @param readerOpts is forwarded as-is and also provides loadQuantum() for the tracker lookup. @param connectorQueryCtx must be non-null; it is dereferenced unconditionally for cache() and scanId(). @param ioStats, fsStats are taken by value and moved into the created object. @param executor is forwarded unchanged. @param fileReadOps is forwarded unchanged. @return the cache-aware input when connectorQueryCtx->cache() is non-null, otherwise the non-cached input (presumably CachedBufferedInput and GlutenDirectBufferedInput respectively, per the includes above - confirm). */ std::unique_ptr create( + const facebook::velox::FileHandle& fileHandle, + const facebook::velox::dwio::common::ReaderOptions& readerOpts, + const facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats, + std::shared_ptr fsStats, + folly::Executor* executor, + const folly::F14FastMap& fileReadOps = {}) override { + /* A cache is available: build the variant whose constructor additionally receives the cache pointer. */ if (connectorQueryCtx->cache()) { + return std::make_unique( + fileHandle.file, + /* Fully qualified so this header does not depend on a using-directive or alias supplied by whichever header happens to be included first. */ facebook::velox::dwio::common::MetricsLog::voidLog(), + fileHandle.uuid, + connectorQueryCtx->cache(), + facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId, + /* ioStats is not used again on this path, so it is moved exactly as in the non-cache path below. */ std::move(ioStats), + std::move(fsStats), + executor, + readerOpts, + fileReadOps); + } + /* No cache: same arguments minus the cache pointer. */ return std::make_unique( + fileHandle.file, + facebook::velox::dwio::common::MetricsLog::voidLog(), + fileHandle.uuid, + facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()),
fileHandle.groupId, + std::move(ioStats), + std::move(fsStats), + executor, + readerOpts, + fileReadOps); + } +}; + +} // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.h b/cpp/velox/memory/GlutenDirectBufferedInput.h new file mode 100644 index 000000000000..edaff5c603d8 --- /dev/null +++ b/cpp/velox/memory/GlutenDirectBufferedInput.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#pragma once + +#include "velox/dwio/common/DirectBufferedInput.h" + +namespace gluten { + +/** DirectBufferedInput subclass that adds no new state or methods; its only addition is a destructor that waits for coalesced loads still in the kLoading state before cancelling them. */ class GlutenDirectBufferedInput : public facebook::velox::dwio::common::DirectBufferedInput { + public: + /** Forwards every argument to the DirectBufferedInput constructor. By-value arguments are moved; metricsLog, executor and readerOptions are passed through unchanged. */ GlutenDirectBufferedInput( + std::shared_ptr readFile, + const facebook::velox::dwio::common::MetricsLogPtr& metricsLog, + facebook::velox::StringIdLease fileNum, + std::shared_ptr tracker, + facebook::velox::StringIdLease groupId, + std::shared_ptr ioStats, + std::shared_ptr fsStats, + folly::Executor* executor, + const facebook::velox::io::ReaderOptions& readerOptions, + folly::F14FastMap fileReadOps = {}) + : DirectBufferedInput( + std::move(readFile), + metricsLog, + std::move(fileNum), + std::move(tracker), + std::move(groupId), + std::move(ioStats), + std::move(fsStats), + executor, + readerOptions, + std::move(fileReadOps)) {} + + /** Clears requests_, then visits every entry of coalescedLoads_: a load observed in kLoading is waited on, every load is cancelled, and finally the list is cleared. NOTE(review): presumably the wait keeps a load that is still running elsewhere from outliving this object - confirm against DirectBufferedInput. This destructor can block the calling thread. */ ~GlutenDirectBufferedInput() override { + requests_.clear(); + for (auto& load : coalescedLoads_) { + /* Only loads currently reported as kLoading are waited on; all other states go straight to cancel(). */ if (load->state() == facebook::velox::cache::CoalescedLoad::State::kLoading) { + folly::SemiFuture waitFuture(false); + /* A false return is treated as "not finished yet, waitFuture will signal completion" - semantics are defined by Velox CoalescedLoad::loadOrFuture, confirm there. */ if (!load->loadOrFuture(&waitFuture)) { + auto& exec = folly::QueuedImmediateExecutor::instance(); + /* Block here until the future completes. */ std::move(waitFuture).via(&exec).wait(); + } + } + /* cancel() is called on every load, whether or not it was waited on above. */ load->cancel(); + } + coalescedLoads_.clear(); + } +}; + +} // namespace gluten