From 5270350242e410121ef7e3f42377b1542646d699 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Mon, 19 Jan 2026 16:43:36 +0000 Subject: [PATCH 1/2] Add custom direct buffered input --- cpp/velox/CMakeLists.txt | 2 + cpp/velox/compute/VeloxBackend.cc | 5 +- cpp/velox/memory/GlutenBufferedInputBuilder.h | 65 +++++ cpp/velox/memory/GlutenDirectBufferedInput.cc | 228 ++++++++++++++++++ cpp/velox/memory/GlutenDirectBufferedInput.h | 204 ++++++++++++++++ cpp/velox/memory/GlutenDirectInputStream.cc | 211 ++++++++++++++++ cpp/velox/memory/GlutenDirectInputStream.h | 114 +++++++++ 7 files changed, 828 insertions(+), 1 deletion(-) create mode 100644 cpp/velox/memory/GlutenBufferedInputBuilder.h create mode 100644 cpp/velox/memory/GlutenDirectBufferedInput.cc create mode 100644 cpp/velox/memory/GlutenDirectBufferedInput.h create mode 100644 cpp/velox/memory/GlutenDirectInputStream.cc create mode 100644 cpp/velox/memory/GlutenDirectInputStream.h diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 56fab701ee07..e31ab80cf259 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -158,6 +158,8 @@ set(VELOX_SRCS jni/JniUdf.cc jni/VeloxJniWrapper.cc memory/BufferOutputStream.cc + memory/GlutenDirectBufferedInput.cc + memory/GlutenDirectInputStream.cc memory/VeloxColumnarBatch.cc memory/VeloxMemoryManager.cc operators/functions/RegistrationAllFunctions.cc diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index a99cac823b0f..75fd84b104bb 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -28,21 +28,23 @@ #include "utils/qat/QatCodec.h" #endif #ifdef GLUTEN_ENABLE_GPU +#include "operators/plannodes/CudfVectorStream.h" #include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h" #include "velox/experimental/cudf/exec/ToCudf.h" -#include "operators/plannodes/CudfVectorStream.h" #endif #include "compute/VeloxRuntime.h" #include "config/VeloxConfig.h" #include "jni/JniFileSystem.h" +#include "memory/GlutenBufferedInputBuilder.h" #include "operators/functions/SparkExprToSubfieldFilterParser.h" #include "shuffle/ArrowShuffleDictionaryWriter.h" #include "udf/UdfLoader.h" #include "utils/Exception.h" #include "velox/common/caching/SsdCache.h" #include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/BufferedInputBuilder.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveDataSource.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual @@ -190,6 +192,7 @@ void VeloxBackend::init( velox::parquet::registerParquetWriterFactory(); velox::orc::registerOrcReaderFactory(); velox::exec::ExprToSubfieldFilterParser::registerParser(std::make_unique()); + velox::connector::hive::BufferedInputBuilder::registerBuilder(std::make_shared()); // Register Velox functions registerAllFunctions(); diff --git a/cpp/velox/memory/GlutenBufferedInputBuilder.h b/cpp/velox/memory/GlutenBufferedInputBuilder.h new file mode 100644 index 000000000000..86116ff1e88e --- /dev/null +++ b/cpp/velox/memory/GlutenBufferedInputBuilder.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "GlutenDirectBufferedInput.h" +#include "velox/connectors/hive/BufferedInputBuilder.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/dwio/common/CachedBufferedInput.h" + +namespace gluten { + +class GlutenBufferedInputBuilder : public facebook::velox::connector::hive::BufferedInputBuilder { + public: + std::unique_ptr create( + const facebook::velox::FileHandle& fileHandle, + const facebook::velox::dwio::common::ReaderOptions& readerOpts, + const facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats, + std::shared_ptr fsStats, + folly::Executor* executor, + const folly::F14FastMap& fileReadOps = {}) override { + if (connectorQueryCtx->cache()) { + return std::make_unique( + fileHandle.file, + dwio::common::MetricsLog::voidLog(), + fileHandle.uuid, + connectorQueryCtx->cache(), + facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId, + ioStats, + std::move(fsStats), + executor, + readerOpts, + fileReadOps); + } + return std::make_unique( + fileHandle.file, + dwio::common::MetricsLog::voidLog(), + fileHandle.uuid, + facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId, + std::move(ioStats), + std::move(fsStats), + executor, + readerOpts, + fileReadOps); + } +}; + +} // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.cc b/cpp/velox/memory/GlutenDirectBufferedInput.cc new file mode 100644 index 000000000000..81fcfd283522 --- /dev/null +++ b/cpp/velox/memory/GlutenDirectBufferedInput.cc @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "GlutenDirectBufferedInput.h" +#include "GlutenDirectInputStream.h" +#include "velox/common/process/TraceContext.h" + +DECLARE_int32(cache_prefetch_min_pct); + +namespace gluten { + +using namespace facebook::velox::dwio::common; + +std::unique_ptr GlutenDirectBufferedInput::enqueue( + facebook::velox::common::Region region, + const StreamIdentifier* sid = nullptr) { + if (!coalescedLoads_.empty()) { + // Results of previous load are no more available here. + coalescedLoads_.clear(); + streamToCoalescedLoad_.wlock()->clear(); + } + if (region.length == 0) { + return std::make_unique(static_cast(nullptr), 0); + } + + facebook::velox::cache::TrackingId id; + if (sid != nullptr) { + id = facebook::velox::cache::TrackingId(sid->getId()); + } + VELOX_CHECK_LE(region.offset + region.length, fileSize_); + requests_.emplace_back(region, id); + if (tracker_) { + tracker_->recordReference(id, region.length, fileNum_.id(), groupId_.id()); + } + auto stream = std::make_unique( + this, ioStats_.get(), region, input_, fileNum_.id(), tracker_, id, groupId_.id(), options_.loadQuantum()); + requests_.back().stream = stream.get(); + return stream; +} + +bool GlutenDirectBufferedInput::isBuffered(uint64_t /*offset*/, uint64_t /*length*/) const { + return false; +} + +bool GlutenDirectBufferedInput::shouldPreload(int32_t numPages) { + return false; +} + +namespace { + +// True if the percentage is high enough to warrant prefetch. +bool isPrefetchablePct(int32_t pct) { + return pct >= FLAGS_cache_prefetch_min_pct; +} + +bool lessThan(const LoadRequest* left, const LoadRequest* right) { + return *left < *right; +} + +} // namespace + +void GlutenDirectBufferedInput::load(const LogType /*unused*/) { + // After load, new requests cannot be merged into pre-load ones. + auto requests = std::move(requests_); + std::vector storageLoad[2]; + for (auto& request : requests) { + facebook::velox::cache::TrackingData trackingData; + const bool prefetchAnyway = + request.trackingId.empty() || request.trackingId.id() == StreamIdentifier::sequentialFile().id_; + if (!prefetchAnyway && tracker_) { + trackingData = tracker_->trackingData(request.trackingId); + } + const int loadIndex = (prefetchAnyway || isPrefetchablePct(adjustedReadPct(trackingData))) ? 1 : 0; + storageLoad[loadIndex].push_back(&request); + } + std::sort(storageLoad[1].begin(), storageLoad[1].end(), lessThan); + std::sort(storageLoad[0].begin(), storageLoad[0].end(), lessThan); + std::vector groupEnds[2]; + groupEnds[1] = groupRequests(storageLoad[1], true); + moveCoalesced( + storageLoad[1], + groupEnds[1], + storageLoad[0], + [](auto* request) { return request->region.offset; }, + [](auto* request) { return request->region.offset + request->region.length; }); + groupEnds[0] = groupRequests(storageLoad[0], false); + readRegions(storageLoad[1], true, groupEnds[1]); + readRegions(storageLoad[0], false, groupEnds[0]); +} + +std::vector GlutenDirectBufferedInput::groupRequests(const std::vector& requests, bool prefetch) + const { + if (requests.empty() || (requests.size() < 2 && !prefetch)) { + // A single request has no other requests to coalesce with and is not + // eligible to prefetch. This will be loaded by itself on first use. + return {}; + } + const int32_t maxDistance = options_.maxCoalesceDistance(); + const auto loadQuantum = options_.loadQuantum(); + // If reading densely accessed, coalesce into large for best throughput, if + // for sparse, coalesce to quantum to reduce overread. Not all sparse access + // is correlated. + const auto maxCoalesceBytes = prefetch ? options_.maxCoalesceBytes() : loadQuantum; + + // Combine adjacent short reads. + int64_t coalescedBytes = 0; + std::vector ends; + ends.reserve(requests.size()); + std::vector ranges; + facebook::velox::coalesceIo( + requests, + maxDistance, + // Break batches up. Better load more short ones i parallel. + std::numeric_limits::max(), // limit coalesce by size, not count. + [&](int32_t index) { return requests[index]->region.offset; }, + [&](int32_t index) -> int32_t { + auto size = requests[index]->region.length; + if (size > loadQuantum) { + coalescedBytes += loadQuantum; + return loadQuantum; + } + coalescedBytes += size; + return size; + }, + [&](int32_t index) { + if (coalescedBytes > maxCoalesceBytes) { + coalescedBytes = 0; + return facebook::velox::kNoCoalesce; + } + return 1; + }, + [&](LoadRequest* /*request*/, std::vector& ranges) { + // ranges.size() is used in coalesceIo so we cannot leave it empty. + ranges.push_back(0); + }, + [&](int32_t /*gap*/, std::vector /*ranges*/) { /*no op*/ }, + [&](const std::vector& /*requests*/, + int32_t /*begin*/, + int32_t end, + uint64_t /*offset*/, + const std::vector& /*ranges*/) { ends.push_back(end); }); + return ends; +} + +void GlutenDirectBufferedInput::readRegion(const std::vector& requests, bool prefetch) { + if (requests.empty() || (requests.size() == 1 && !prefetch)) { + return; + } + auto load = std::make_shared( + input_, ioStats_, fsStats_, groupId_.id(), requests, pool_, options_.loadQuantum()); + coalescedLoads_.push_back(load); + streamToCoalescedLoad_.withWLock([&](auto& loads) { + for (auto& request : requests) { + loads[request->stream] = load; + } + }); +} + +void GlutenDirectBufferedInput::readRegions( + const std::vector& requests, + bool prefetch, + const std::vector& groupEnds) { + int i = 0; + std::vector group; + for (auto end : groupEnds) { + while (i < end) { + group.push_back(requests[i++]); + } + readRegion(group, prefetch); + group.clear(); + } + if (prefetch && executor_) { + for (auto i = 0; i < coalescedLoads_.size(); ++i) { + auto& load = coalescedLoads_[i]; + if (load->state() == facebook::velox::cache::CoalescedLoad::State::kPlanned) { + AsyncLoadHolder loadHolder{.load = load, .pool = pool_->shared_from_this()}; + executor_->add([asyncLoad = std::move(loadHolder)]() { + facebook::velox::process::TraceContext trace("Read Ahead"); + VELOX_CHECK_NOT_NULL(asyncLoad.load); + asyncLoad.load->loadOrFuture(nullptr); + }); + } + } + } +} + +std::shared_ptr GlutenDirectBufferedInput::coalescedLoad(const SeekableInputStream* stream) { + return streamToCoalescedLoad_.withWLock([&](auto& loads) -> std::shared_ptr { + auto it = loads.find(stream); + if (it == loads.cend()) { + return nullptr; + } + auto load = std::move(it->second); + loads.erase(it); + return load; + }); +} + +std::unique_ptr +GlutenDirectBufferedInput::read(uint64_t offset, uint64_t length, LogType /*logType*/) const { + VELOX_CHECK_LE(offset + length, fileSize_); + return std::make_unique( + const_cast(this), + ioStats_.get(), + facebook::velox::common::Region{offset, length}, + input_, + fileNum_.id(), + nullptr, + facebook::velox::cache::TrackingId(), + 0, + options_.loadQuantum()); +} + +} // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.h b/cpp/velox/memory/GlutenDirectBufferedInput.h new file mode 100644 index 000000000000..21ca8a1cbab5 --- /dev/null +++ b/cpp/velox/memory/GlutenDirectBufferedInput.h @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/common/DirectBufferedInput.h" + +namespace gluten { + +class GlutenDirectBufferedInput : public facebook::velox::dwio::common::BufferedInput { + public: + static constexpr int32_t kTinySize = 2'000; + + GlutenDirectBufferedInput( + std::shared_ptr readFile, + const facebook::velox::dwio::common::MetricsLogPtr& metricsLog, + facebook::velox::StringIdLease fileNum, + std::shared_ptr tracker, + facebook::velox::StringIdLease groupId, + std::shared_ptr ioStats, + std::shared_ptr fsStats, + folly::Executor* executor, + const facebook::velox::io::ReaderOptions& readerOptions, + folly::F14FastMap fileReadOps = {}) + : BufferedInput( + std::move(readFile), + readerOptions.memoryPool(), + metricsLog, + ioStats.get(), + fsStats.get(), + kMaxMergeDistance, + std::nullopt, + std::move(fileReadOps)), + fileNum_(std::move(fileNum)), + tracker_(std::move(tracker)), + groupId_(std::move(groupId)), + ioStats_(std::move(ioStats)), + fsStats_(std::move(fsStats)), + executor_(executor), + fileSize_(input_->getLength()), + options_(readerOptions) {} + + ~GlutenDirectBufferedInput() override { + requests_.clear(); + for (auto& load : coalescedLoads_) { + if (load->state() == facebook::velox::cache::CoalescedLoad::State::kLoading) { + folly::SemiFuture waitFuture(false); + if (!load->loadOrFuture(&waitFuture)) { + auto& exec = folly::QueuedImmediateExecutor::instance(); + std::move(waitFuture).via(&exec).wait(); + } + } + load->cancel(); + } + coalescedLoads_.clear(); + } + + std::unique_ptr enqueue( + facebook::velox::common::Region region, + const facebook::velox::dwio::common::StreamIdentifier* sid) override; + + bool supportSyncLoad() const override { + return false; + } + + void load(const facebook::velox::dwio::common::LogType /*unused*/) override; + + bool isBuffered(uint64_t offset, uint64_t length) const override; + + bool shouldPreload(int32_t numPages = 0) override; + + bool shouldPrefetchStripes() const override { + return false; + } + + void setNumStripes(int32_t numStripes) override { + auto* stats = tracker_->fileGroupStats(); + if (stats) { + stats->recordFile(fileNum_.id(), groupId_.id(), numStripes); + } + } + + virtual std::unique_ptr clone() const override { + return std::unique_ptr( + new GlutenDirectBufferedInput(input_, fileNum_, tracker_, groupId_, ioStats_, fsStats_, executor_, options_)); + } + + facebook::velox::memory::MemoryPool* pool() const { + return pool_; + } + + /// Returns the CoalescedLoad that contains the correlated loads for + /// 'stream' or nullptr if none. Returns nullptr on all but first + /// call for 'stream' since the load is to be triggered by the first + /// access. + std::shared_ptr coalescedLoad( + const facebook::velox::dwio::common::SeekableInputStream* stream); + + std::unique_ptr + read(uint64_t offset, uint64_t length, facebook::velox::dwio::common::LogType logType) const override; + + folly::Executor* executor() const override { + return executor_; + } + + uint64_t nextFetchSize() const override { + VELOX_NYI(); + } + + private: + /// Constructor used by clone(). + GlutenDirectBufferedInput( + std::shared_ptr input, + facebook::velox::StringIdLease fileNum, + std::shared_ptr tracker, + facebook::velox::StringIdLease groupId, + std::shared_ptr ioStats, + std::shared_ptr fsStats, + folly::Executor* executor, + const facebook::velox::io::ReaderOptions& readerOptions) + : BufferedInput(std::move(input), readerOptions.memoryPool()), + fileNum_(std::move(fileNum)), + tracker_(std::move(tracker)), + groupId_(std::move(groupId)), + ioStats_(std::move(ioStats)), + fsStats_(std::move(fsStats)), + executor_(executor), + fileSize_(input_->getLength()), + options_(readerOptions) {} + + std::vector groupRequests( + const std::vector& requests, + bool prefetch) const; + + // Makes a CoalescedLoad for 'requests' to be read together, coalescing IO if + // appropriate. If 'prefetch' is set, schedules the CoalescedLoad on + // 'executor_'. Links the CoalescedLoad to all DirectInputStreams that it + // covers. + void readRegion(const std::vector& requests, bool prefetch); + + // Read coalesced regions. Regions are grouped together using `groupEnds'. + // For example if there are 5 regions, 1 and 2 are coalesced together and 3, + // 4, 5 are coalesced together, we will have {2, 5} in `groupEnds'. + void readRegions( + const std::vector& requests, + bool prefetch, + const std::vector& groupEnds); + + // Holds the reference on the memory pool for async load in case of early task + // terminate. + struct AsyncLoadHolder { + std::shared_ptr load; + std::shared_ptr pool; + + ~AsyncLoadHolder() { + // Release the load reference before the memory pool reference. + // This is to make sure the memory pool is not destroyed before we free up + // the allocated buffers. + // This is to handle the case that the associated task has already + // destroyed before the async load is done. The async load holds + // the last reference to the memory pool in that case. + load.reset(); + pool.reset(); + } + }; + + const facebook::velox::StringIdLease fileNum_; + const std::shared_ptr tracker_; + const facebook::velox::StringIdLease groupId_; + const std::shared_ptr ioStats_; + const std::shared_ptr fsStats_; + folly::Executor* const executor_; + const uint64_t fileSize_; + + // Regions that are candidates for loading. + std::vector requests_; + + // Coalesced loads spanning multiple streams in one IO. + folly::Synchronized>> + streamToCoalescedLoad_; + + // Distinct coalesced loads in 'coalescedLoads_'. + std::vector> coalescedLoads_; + + facebook::velox::io::ReaderOptions options_; +}; + +} // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectInputStream.cc b/cpp/velox/memory/GlutenDirectInputStream.cc new file mode 100644 index 000000000000..b553a73cad87 --- /dev/null +++ b/cpp/velox/memory/GlutenDirectInputStream.cc @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "GlutenDirectInputStream.h" +#include "GlutenDirectBufferedInput.h" +#include "velox/common/process/TraceContext.h" + +namespace gluten { + +using facebook::velox::cache::ScanTracker; +using facebook::velox::cache::TrackingId; +using facebook::velox::common::Region; +using facebook::velox::memory::MemoryAllocator; + +using namespace facebook::velox::dwio::common; + +GlutenDirectInputStream::GlutenDirectInputStream( + GlutenDirectBufferedInput* bufferedInput, + IoStatistics* ioStats, + const Region& region, + std::shared_ptr input, + uint64_t fileNum, + std::shared_ptr tracker, + TrackingId trackingId, + uint64_t groupId, + int32_t loadQuantum) + : bufferedInput_(bufferedInput), + ioStats_(ioStats), + input_(std::move(input)), + region_(region), + fileNum_(fileNum), + tracker_(std::move(tracker)), + trackingId_(trackingId), + groupId_(groupId), + loadQuantum_(loadQuantum) {} + +bool GlutenDirectInputStream::Next(const void** buffer, int32_t* size) { + if (offsetInRegion_ >= region_.length) { + *size = 0; + return false; + } + loadPosition(); + + *buffer = reinterpret_cast(run_ + offsetInRun_); + *size = runSize_ - offsetInRun_; + if (offsetInRegion_ + *size > region_.length) { + *size = region_.length - offsetInRegion_; + } + offsetInRun_ += *size; + offsetInRegion_ += *size; + + if (tracker_) { + tracker_->recordRead(trackingId_, *size, fileNum_, groupId_); + } + return true; +} + +void GlutenDirectInputStream::BackUp(int32_t count) { + VELOX_CHECK_GE(count, 0, "can't backup negative distances"); + + const uint64_t unsignedCount = static_cast(count); + VELOX_CHECK_LE(unsignedCount, offsetInRun_, "Can't backup that much!"); + offsetInRegion_ -= unsignedCount; +} + +bool GlutenDirectInputStream::SkipInt64(int64_t count) { + if (count < 0) { + return false; + } + const uint64_t unsignedCount = static_cast(count); + if (unsignedCount + offsetInRegion_ <= region_.length) { + offsetInRegion_ += unsignedCount; + return true; + } + offsetInRegion_ = region_.length; + return false; +} + +int64_t GlutenDirectInputStream::ByteCount() const { + return static_cast(offsetInRegion_); +} + +void GlutenDirectInputStream::seekToPosition(PositionProvider& seekPosition) { + offsetInRegion_ = seekPosition.next(); + VELOX_CHECK_LE(offsetInRegion_, region_.length); +} + +std::string GlutenDirectInputStream::getName() const { + return fmt::format("GlutenDirectInputStream {} of {}", offsetInRegion_, region_.length); +} + +size_t GlutenDirectInputStream::positionSize() const { + // not compressed, so only need 1 position (uncompressed position) + return 1; +} + +namespace { +std::vector> +makeRanges(size_t size, facebook::velox::memory::Allocation& data, std::string& tinyData) { + std::vector> buffers; + if (data.numPages() > 0) { + buffers.reserve(data.numRuns()); + uint64_t offsetInRuns = 0; + for (int i = 0; i < data.numRuns(); ++i) { + auto run = data.runAt(i); + uint64_t bytes = facebook::velox::memory::AllocationTraits::pageBytes(run.numPages()); + uint64_t readSize = std::min(bytes, size - offsetInRuns); + buffers.push_back(folly::Range(run.data(), readSize)); + offsetInRuns += readSize; + } + } else { + buffers.push_back(folly::Range(tinyData.data(), size)); + } + return buffers; +} +} // namespace + +void GlutenDirectInputStream::loadSync() { + if (region_.length < GlutenDirectBufferedInput::kTinySize && data_.numPages() == 0) { + tinyData_.resize(region_.length); + } else { + const auto numPages = facebook::velox::memory::AllocationTraits::numPages(loadedRegion_.length); + if (numPages > data_.numPages()) { + bufferedInput_->pool()->allocateNonContiguous(numPages, data_); + } + } + + facebook::velox::process::TraceContext trace("DirectInputStream::loadSync"); + + ioStats_->incRawBytesRead(loadedRegion_.length); + auto ranges = makeRanges(loadedRegion_.length, data_, tinyData_); + uint64_t usecs = 0; + { + facebook::velox::MicrosecondTimer timer(&usecs); + input_->read(ranges, loadedRegion_.offset, LogType::FILE); + } + ioStats_->read().increment(loadedRegion_.length); + ioStats_->queryThreadIoLatency().increment(usecs); + ioStats_->incTotalScanTime(usecs * 1'000); +} + +void GlutenDirectInputStream::loadPosition() { + VELOX_CHECK_LT(offsetInRegion_, region_.length); + if (!loaded_) { + loaded_ = true; + auto load = bufferedInput_->coalescedLoad(this); + if (load != nullptr) { + folly::SemiFuture waitFuture(false); + uint64_t loadUs = 0; + { + facebook::velox::MicrosecondTimer timer(&loadUs); + if (!load->loadOrFuture(&waitFuture)) { + waitFuture.wait(); + } + loadedRegion_.offset = region_.offset; + loadedRegion_.length = load->getData(region_.offset, data_, tinyData_); + } + ioStats_->queryThreadIoLatency().increment(loadUs); + } else { + // Standalone stream, not part of coalesced load. + loadedRegion_.offset = 0; + loadedRegion_.length = 0; + } + } + + // Check if position outside of loaded bounds. + if (loadedRegion_.length == 0 || region_.offset + offsetInRegion_ < loadedRegion_.offset || + region_.offset + offsetInRegion_ >= loadedRegion_.offset + loadedRegion_.length) { + loadedRegion_.offset = region_.offset + offsetInRegion_; + loadedRegion_.length = + (offsetInRegion_ + loadQuantum_ <= region_.length) ? loadQuantum_ : (region_.length - offsetInRegion_); + + // Since the loadSync method updates the metric, but it is conditionally + // executed, we also need to update the metric in the loadData method. + loadSync(); + } + + const auto offsetInData = offsetInRegion_ - (loadedRegion_.offset - region_.offset); + if (data_.numPages() == 0) { + run_ = reinterpret_cast(tinyData_.data()); + runSize_ = tinyData_.size(); + offsetInRun_ = offsetInData; + offsetOfRun_ = 0; + } else { + data_.findRun(offsetInData, &runIndex_, &offsetInRun_); + offsetOfRun_ = offsetInData - offsetInRun_; + auto run = data_.runAt(runIndex_); + run_ = run.data(); + runSize_ = facebook::velox::memory::AllocationTraits::pageBytes(run.numPages()); + if (offsetOfRun_ + runSize_ > loadedRegion_.length) { + runSize_ = loadedRegion_.length - offsetOfRun_; + } + } + VELOX_CHECK_LT(offsetInRun_, runSize_); +} + +} // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectInputStream.h b/cpp/velox/memory/GlutenDirectInputStream.h new file mode 100644 index 000000000000..826fc3f9a5e6 --- /dev/null +++ b/cpp/velox/memory/GlutenDirectInputStream.h @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/caching/ScanTracker.h" +#include "velox/dwio/common/SeekableInputStream.h" + +namespace gluten { + +class GlutenDirectBufferedInput; + +/// An input stream over possibly coalesced loads. Created by +/// GlutenDirectBufferedInput. Similar to CacheInputStream but does not use cache. +class GlutenDirectInputStream : public facebook::velox::dwio::common::SeekableInputStream { + public: + GlutenDirectInputStream( + GlutenDirectBufferedInput* bufferedInput, + facebook::velox::io::IoStatistics* ioStats, + const facebook::velox::common::Region& region, + std::shared_ptr input, + uint64_t fileNum, + std::shared_ptr tracker, + facebook::velox::cache::TrackingId trackingId, + uint64_t groupId, + int32_t loadQuantum); + + bool Next(const void** data, int* size) override; + void BackUp(int count) override; + bool SkipInt64(int64_t count) override; + int64_t ByteCount() const override; + + void seekToPosition(facebook::velox::dwio::common::PositionProvider& position) override; + std::string getName() const override; + size_t positionSize() const override; + + /// Testing function to access loaded state. + void testingData( + facebook::velox::common::Region& loadedRegion, + facebook::velox::memory::Allocation*& data, + std::string*& tinyData) { + loadedRegion = loadedRegion_; + data = &data_; + tinyData = &tinyData_; + } + + private: + // Ensures that the current position is covered by 'data_'. + void loadPosition(); + + // Synchronously sets 'data_' to cover loadedRegion_'. + void loadSync(); + + GlutenDirectBufferedInput* const bufferedInput_; + facebook::velox::io::IoStatistics* const ioStats_; + const std::shared_ptr input_; + // The region of 'input' 'this' ranges over. + const facebook::velox::common::Region region_; + const uint64_t fileNum_; + std::shared_ptr tracker_; + const facebook::velox::cache::TrackingId trackingId_; + const uint64_t groupId_; + + // Maximum number of bytes read from 'input' at a time. + const int32_t loadQuantum_; + + // The part of 'region_' that is loaded into 'data_'/'tinyData_'. Relative to + // file start. + facebook::velox::common::Region loadedRegion_; + + // Allocation with loaded data. Has space for region.length or loadQuantum_ + // bytes, whichever is less. + facebook::velox::memory::Allocation data_; + + // Contains the data if the range is too small for Allocation. + std::string tinyData_; + + // Pointer to start of current run in 'entry->data()' or + // 'entry->tinyData()'. + uint8_t* run_{nullptr}; + + // Offset of current run from start of 'data_' + uint64_t offsetOfRun_; + + // Position of stream relative to 'run_'. + int offsetInRun_{0}; + + // Index of run in 'data_' + int runIndex_ = -1; + + // Number of valid bytes starting at 'run_' + uint32_t runSize_ = 0; + // Position relative to 'region_.offset'. + uint64_t offsetInRegion_ = 0; + + // Set to true when data is first loaded. + bool loaded_{false}; +}; + +} // namespace gluten From 57771e4edff2ccde3a38f196af761b5234dce072 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Thu, 29 Jan 2026 14:33:08 +0000 Subject: [PATCH 2/2] Override DirectBufferedInput --- cpp/velox/CMakeLists.txt | 2 - cpp/velox/memory/GlutenDirectBufferedInput.cc | 228 ------------------ cpp/velox/memory/GlutenDirectBufferedInput.h | 159 +----------- cpp/velox/memory/GlutenDirectInputStream.cc | 211 ---------------- cpp/velox/memory/GlutenDirectInputStream.h | 114 --------- 5 files changed, 10 insertions(+), 704 deletions(-) delete mode 100644 cpp/velox/memory/GlutenDirectBufferedInput.cc delete mode 100644 cpp/velox/memory/GlutenDirectInputStream.cc delete mode 100644 cpp/velox/memory/GlutenDirectInputStream.h diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index e31ab80cf259..56fab701ee07 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -158,8 +158,6 @@ set(VELOX_SRCS jni/JniUdf.cc jni/VeloxJniWrapper.cc memory/BufferOutputStream.cc - memory/GlutenDirectBufferedInput.cc - memory/GlutenDirectInputStream.cc memory/VeloxColumnarBatch.cc memory/VeloxMemoryManager.cc operators/functions/RegistrationAllFunctions.cc diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.cc b/cpp/velox/memory/GlutenDirectBufferedInput.cc deleted file mode 100644 index 81fcfd283522..000000000000 --- a/cpp/velox/memory/GlutenDirectBufferedInput.cc +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "GlutenDirectBufferedInput.h" -#include "GlutenDirectInputStream.h" -#include "velox/common/process/TraceContext.h" - -DECLARE_int32(cache_prefetch_min_pct); - -namespace gluten { - -using namespace facebook::velox::dwio::common; - -std::unique_ptr GlutenDirectBufferedInput::enqueue( - facebook::velox::common::Region region, - const StreamIdentifier* sid = nullptr) { - if (!coalescedLoads_.empty()) { - // Results of previous load are no more available here. - coalescedLoads_.clear(); - streamToCoalescedLoad_.wlock()->clear(); - } - if (region.length == 0) { - return std::make_unique(static_cast(nullptr), 0); - } - - facebook::velox::cache::TrackingId id; - if (sid != nullptr) { - id = facebook::velox::cache::TrackingId(sid->getId()); - } - VELOX_CHECK_LE(region.offset + region.length, fileSize_); - requests_.emplace_back(region, id); - if (tracker_) { - tracker_->recordReference(id, region.length, fileNum_.id(), groupId_.id()); - } - auto stream = std::make_unique( - this, ioStats_.get(), region, input_, fileNum_.id(), tracker_, id, groupId_.id(), options_.loadQuantum()); - requests_.back().stream = stream.get(); - return stream; -} - -bool GlutenDirectBufferedInput::isBuffered(uint64_t /*offset*/, uint64_t /*length*/) const { - return false; -} - -bool GlutenDirectBufferedInput::shouldPreload(int32_t numPages) { - return false; -} - -namespace { - -// True if the percentage is high enough to warrant prefetch. -bool isPrefetchablePct(int32_t pct) { - return pct >= FLAGS_cache_prefetch_min_pct; -} - -bool lessThan(const LoadRequest* left, const LoadRequest* right) { - return *left < *right; -} - -} // namespace - -void GlutenDirectBufferedInput::load(const LogType /*unused*/) { - // After load, new requests cannot be merged into pre-load ones. - auto requests = std::move(requests_); - std::vector storageLoad[2]; - for (auto& request : requests) { - facebook::velox::cache::TrackingData trackingData; - const bool prefetchAnyway = - request.trackingId.empty() || request.trackingId.id() == StreamIdentifier::sequentialFile().id_; - if (!prefetchAnyway && tracker_) { - trackingData = tracker_->trackingData(request.trackingId); - } - const int loadIndex = (prefetchAnyway || isPrefetchablePct(adjustedReadPct(trackingData))) ? 1 : 0; - storageLoad[loadIndex].push_back(&request); - } - std::sort(storageLoad[1].begin(), storageLoad[1].end(), lessThan); - std::sort(storageLoad[0].begin(), storageLoad[0].end(), lessThan); - std::vector groupEnds[2]; - groupEnds[1] = groupRequests(storageLoad[1], true); - moveCoalesced( - storageLoad[1], - groupEnds[1], - storageLoad[0], - [](auto* request) { return request->region.offset; }, - [](auto* request) { return request->region.offset + request->region.length; }); - groupEnds[0] = groupRequests(storageLoad[0], false); - readRegions(storageLoad[1], true, groupEnds[1]); - readRegions(storageLoad[0], false, groupEnds[0]); -} - -std::vector GlutenDirectBufferedInput::groupRequests(const std::vector& requests, bool prefetch) - const { - if (requests.empty() || (requests.size() < 2 && !prefetch)) { - // A single request has no other requests to coalesce with and is not - // eligible to prefetch. This will be loaded by itself on first use. - return {}; - } - const int32_t maxDistance = options_.maxCoalesceDistance(); - const auto loadQuantum = options_.loadQuantum(); - // If reading densely accessed, coalesce into large for best throughput, if - // for sparse, coalesce to quantum to reduce overread. Not all sparse access - // is correlated. - const auto maxCoalesceBytes = prefetch ? options_.maxCoalesceBytes() : loadQuantum; - - // Combine adjacent short reads. - int64_t coalescedBytes = 0; - std::vector ends; - ends.reserve(requests.size()); - std::vector ranges; - facebook::velox::coalesceIo( - requests, - maxDistance, - // Break batches up. Better load more short ones i parallel. - std::numeric_limits::max(), // limit coalesce by size, not count. - [&](int32_t index) { return requests[index]->region.offset; }, - [&](int32_t index) -> int32_t { - auto size = requests[index]->region.length; - if (size > loadQuantum) { - coalescedBytes += loadQuantum; - return loadQuantum; - } - coalescedBytes += size; - return size; - }, - [&](int32_t index) { - if (coalescedBytes > maxCoalesceBytes) { - coalescedBytes = 0; - return facebook::velox::kNoCoalesce; - } - return 1; - }, - [&](LoadRequest* /*request*/, std::vector& ranges) { - // ranges.size() is used in coalesceIo so we cannot leave it empty. - ranges.push_back(0); - }, - [&](int32_t /*gap*/, std::vector /*ranges*/) { /*no op*/ }, - [&](const std::vector& /*requests*/, - int32_t /*begin*/, - int32_t end, - uint64_t /*offset*/, - const std::vector& /*ranges*/) { ends.push_back(end); }); - return ends; -} - -void GlutenDirectBufferedInput::readRegion(const std::vector& requests, bool prefetch) { - if (requests.empty() || (requests.size() == 1 && !prefetch)) { - return; - } - auto load = std::make_shared( - input_, ioStats_, fsStats_, groupId_.id(), requests, pool_, options_.loadQuantum()); - coalescedLoads_.push_back(load); - streamToCoalescedLoad_.withWLock([&](auto& loads) { - for (auto& request : requests) { - loads[request->stream] = load; - } - }); -} - -void GlutenDirectBufferedInput::readRegions( - const std::vector& requests, - bool prefetch, - const std::vector& groupEnds) { - int i = 0; - std::vector group; - for (auto end : groupEnds) { - while (i < end) { - group.push_back(requests[i++]); - } - readRegion(group, prefetch); - group.clear(); - } - if (prefetch && executor_) { - for (auto i = 0; i < coalescedLoads_.size(); ++i) { - auto& load = coalescedLoads_[i]; - if (load->state() == facebook::velox::cache::CoalescedLoad::State::kPlanned) { - AsyncLoadHolder loadHolder{.load = load, .pool = pool_->shared_from_this()}; - executor_->add([asyncLoad = std::move(loadHolder)]() { - facebook::velox::process::TraceContext trace("Read Ahead"); - VELOX_CHECK_NOT_NULL(asyncLoad.load); - asyncLoad.load->loadOrFuture(nullptr); - }); - } - } - } -} - -std::shared_ptr GlutenDirectBufferedInput::coalescedLoad(const SeekableInputStream* stream) { - return streamToCoalescedLoad_.withWLock([&](auto& loads) -> std::shared_ptr { - auto it = loads.find(stream); - if (it == loads.cend()) { - return nullptr; - } - auto load = std::move(it->second); - loads.erase(it); - return load; - }); -} - -std::unique_ptr -GlutenDirectBufferedInput::read(uint64_t offset, uint64_t length, LogType /*logType*/) const { - VELOX_CHECK_LE(offset + length, fileSize_); - return std::make_unique( - const_cast(this), - ioStats_.get(), - facebook::velox::common::Region{offset, length}, - input_, - fileNum_.id(), - nullptr, - facebook::velox::cache::TrackingId(), - 0, - options_.loadQuantum()); -} - -} // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.h b/cpp/velox/memory/GlutenDirectBufferedInput.h index 21ca8a1cbab5..edaff5c603d8 100644 --- a/cpp/velox/memory/GlutenDirectBufferedInput.h +++ b/cpp/velox/memory/GlutenDirectBufferedInput.h @@ -21,10 +21,8 @@ namespace gluten { -class GlutenDirectBufferedInput : public facebook::velox::dwio::common::BufferedInput { +class GlutenDirectBufferedInput : public facebook::velox::dwio::common::DirectBufferedInput { public: - static constexpr int32_t kTinySize = 2'000; - GlutenDirectBufferedInput( std::shared_ptr readFile, const facebook::velox::dwio::common::MetricsLogPtr& metricsLog, @@ -36,23 +34,17 @@ class GlutenDirectBufferedInput : public facebook::velox::dwio::common::Buffered folly::Executor* executor, const facebook::velox::io::ReaderOptions& readerOptions, folly::F14FastMap fileReadOps = {}) - : BufferedInput( + : DirectBufferedInput( std::move(readFile), - readerOptions.memoryPool(), metricsLog, - ioStats.get(), - fsStats.get(), - kMaxMergeDistance, - std::nullopt, - std::move(fileReadOps)), - fileNum_(std::move(fileNum)), - tracker_(std::move(tracker)), - groupId_(std::move(groupId)), - ioStats_(std::move(ioStats)), - fsStats_(std::move(fsStats)), - executor_(executor), - fileSize_(input_->getLength()), - options_(readerOptions) {} + std::move(fileNum), + std::move(tracker), + std::move(groupId), + std::move(ioStats), + std::move(fsStats), + executor, + readerOptions, + std::move(fileReadOps)) {} ~GlutenDirectBufferedInput() override { requests_.clear(); @@ -68,137 +60,6 @@ class GlutenDirectBufferedInput : public facebook::velox::dwio::common::Buffered } coalescedLoads_.clear(); } - - std::unique_ptr enqueue( - facebook::velox::common::Region region, - const facebook::velox::dwio::common::StreamIdentifier* sid) override; - - bool supportSyncLoad() const override { - return false; - } - - void load(const facebook::velox::dwio::common::LogType /*unused*/) override; - - bool isBuffered(uint64_t offset, uint64_t length) const override; - - bool shouldPreload(int32_t numPages = 0) override; - - bool shouldPrefetchStripes() const override { - return false; - } - - void setNumStripes(int32_t numStripes) override { - auto* stats = tracker_->fileGroupStats(); - if (stats) { - stats->recordFile(fileNum_.id(), groupId_.id(), numStripes); - } - } - - virtual std::unique_ptr clone() const override { - return std::unique_ptr( - new GlutenDirectBufferedInput(input_, fileNum_, tracker_, groupId_, ioStats_, fsStats_, executor_, options_)); - } - - facebook::velox::memory::MemoryPool* pool() const { - return pool_; - } - - /// Returns the CoalescedLoad that contains the correlated loads for - /// 'stream' or nullptr if none. Returns nullptr on all but first - /// call for 'stream' since the load is to be triggered by the first - /// access. - std::shared_ptr coalescedLoad( - const facebook::velox::dwio::common::SeekableInputStream* stream); - - std::unique_ptr - read(uint64_t offset, uint64_t length, facebook::velox::dwio::common::LogType logType) const override; - - folly::Executor* executor() const override { - return executor_; - } - - uint64_t nextFetchSize() const override { - VELOX_NYI(); - } - - private: - /// Constructor used by clone(). - GlutenDirectBufferedInput( - std::shared_ptr input, - facebook::velox::StringIdLease fileNum, - std::shared_ptr tracker, - facebook::velox::StringIdLease groupId, - std::shared_ptr ioStats, - std::shared_ptr fsStats, - folly::Executor* executor, - const facebook::velox::io::ReaderOptions& readerOptions) - : BufferedInput(std::move(input), readerOptions.memoryPool()), - fileNum_(std::move(fileNum)), - tracker_(std::move(tracker)), - groupId_(std::move(groupId)), - ioStats_(std::move(ioStats)), - fsStats_(std::move(fsStats)), - executor_(executor), - fileSize_(input_->getLength()), - options_(readerOptions) {} - - std::vector groupRequests( - const std::vector& requests, - bool prefetch) const; - - // Makes a CoalescedLoad for 'requests' to be read together, coalescing IO if - // appropriate. If 'prefetch' is set, schedules the CoalescedLoad on - // 'executor_'. Links the CoalescedLoad to all DirectInputStreams that it - // covers. - void readRegion(const std::vector& requests, bool prefetch); - - // Read coalesced regions. Regions are grouped together using `groupEnds'. - // For example if there are 5 regions, 1 and 2 are coalesced together and 3, - // 4, 5 are coalesced together, we will have {2, 5} in `groupEnds'. - void readRegions( - const std::vector& requests, - bool prefetch, - const std::vector& groupEnds); - - // Holds the reference on the memory pool for async load in case of early task - // terminate. - struct AsyncLoadHolder { - std::shared_ptr load; - std::shared_ptr pool; - - ~AsyncLoadHolder() { - // Release the load reference before the memory pool reference. - // This is to make sure the memory pool is not destroyed before we free up - // the allocated buffers. - // This is to handle the case that the associated task has already - // destroyed before the async load is done. The async load holds - // the last reference to the memory pool in that case. - load.reset(); - pool.reset(); - } - }; - - const facebook::velox::StringIdLease fileNum_; - const std::shared_ptr tracker_; - const facebook::velox::StringIdLease groupId_; - const std::shared_ptr ioStats_; - const std::shared_ptr fsStats_; - folly::Executor* const executor_; - const uint64_t fileSize_; - - // Regions that are candidates for loading. - std::vector requests_; - - // Coalesced loads spanning multiple streams in one IO. - folly::Synchronized>> - streamToCoalescedLoad_; - - // Distinct coalesced loads in 'coalescedLoads_'. - std::vector> coalescedLoads_; - - facebook::velox::io::ReaderOptions options_; }; } // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectInputStream.cc b/cpp/velox/memory/GlutenDirectInputStream.cc deleted file mode 100644 index b553a73cad87..000000000000 --- a/cpp/velox/memory/GlutenDirectInputStream.cc +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "GlutenDirectInputStream.h" -#include "GlutenDirectBufferedInput.h" -#include "velox/common/process/TraceContext.h" - -namespace gluten { - -using facebook::velox::cache::ScanTracker; -using facebook::velox::cache::TrackingId; -using facebook::velox::common::Region; -using facebook::velox::memory::MemoryAllocator; - -using namespace facebook::velox::dwio::common; - -GlutenDirectInputStream::GlutenDirectInputStream( - GlutenDirectBufferedInput* bufferedInput, - IoStatistics* ioStats, - const Region& region, - std::shared_ptr input, - uint64_t fileNum, - std::shared_ptr tracker, - TrackingId trackingId, - uint64_t groupId, - int32_t loadQuantum) - : bufferedInput_(bufferedInput), - ioStats_(ioStats), - input_(std::move(input)), - region_(region), - fileNum_(fileNum), - tracker_(std::move(tracker)), - trackingId_(trackingId), - groupId_(groupId), - loadQuantum_(loadQuantum) {} - -bool GlutenDirectInputStream::Next(const void** buffer, int32_t* size) { - if (offsetInRegion_ >= region_.length) { - *size = 0; - return false; - } - loadPosition(); - - *buffer = reinterpret_cast(run_ + offsetInRun_); - *size = runSize_ - offsetInRun_; - if (offsetInRegion_ + *size > region_.length) { - *size = region_.length - offsetInRegion_; - } - offsetInRun_ += *size; - offsetInRegion_ += *size; - - if (tracker_) { - tracker_->recordRead(trackingId_, *size, fileNum_, groupId_); - } - return true; -} - -void GlutenDirectInputStream::BackUp(int32_t count) { - VELOX_CHECK_GE(count, 0, "can't backup negative distances"); - - const uint64_t unsignedCount = static_cast(count); - VELOX_CHECK_LE(unsignedCount, offsetInRun_, "Can't backup that much!"); - offsetInRegion_ -= unsignedCount; -} - -bool GlutenDirectInputStream::SkipInt64(int64_t count) { - if (count < 0) { - return false; - } - const uint64_t unsignedCount = static_cast(count); - if (unsignedCount + offsetInRegion_ <= region_.length) { - offsetInRegion_ += unsignedCount; - return true; - } - offsetInRegion_ = region_.length; - return false; -} - -int64_t GlutenDirectInputStream::ByteCount() const { - return static_cast(offsetInRegion_); -} - -void GlutenDirectInputStream::seekToPosition(PositionProvider& seekPosition) { - offsetInRegion_ = seekPosition.next(); - VELOX_CHECK_LE(offsetInRegion_, region_.length); -} - -std::string GlutenDirectInputStream::getName() const { - return fmt::format("GlutenDirectInputStream {} of {}", offsetInRegion_, region_.length); -} - -size_t GlutenDirectInputStream::positionSize() const { - // not compressed, so only need 1 position (uncompressed position) - return 1; -} - -namespace { -std::vector> -makeRanges(size_t size, facebook::velox::memory::Allocation& data, std::string& tinyData) { - std::vector> buffers; - if (data.numPages() > 0) { - buffers.reserve(data.numRuns()); - uint64_t offsetInRuns = 0; - for (int i = 0; i < data.numRuns(); ++i) { - auto run = data.runAt(i); - uint64_t bytes = facebook::velox::memory::AllocationTraits::pageBytes(run.numPages()); - uint64_t readSize = std::min(bytes, size - offsetInRuns); - buffers.push_back(folly::Range(run.data(), readSize)); - offsetInRuns += readSize; - } - } else { - buffers.push_back(folly::Range(tinyData.data(), size)); - } - return buffers; -} -} // namespace - -void GlutenDirectInputStream::loadSync() { - if (region_.length < GlutenDirectBufferedInput::kTinySize && data_.numPages() == 0) { - tinyData_.resize(region_.length); - } else { - const auto numPages = facebook::velox::memory::AllocationTraits::numPages(loadedRegion_.length); - if (numPages > data_.numPages()) { - bufferedInput_->pool()->allocateNonContiguous(numPages, data_); - } - } - - facebook::velox::process::TraceContext trace("DirectInputStream::loadSync"); - - ioStats_->incRawBytesRead(loadedRegion_.length); - auto ranges = makeRanges(loadedRegion_.length, data_, tinyData_); - uint64_t usecs = 0; - { - facebook::velox::MicrosecondTimer timer(&usecs); - input_->read(ranges, loadedRegion_.offset, LogType::FILE); - } - ioStats_->read().increment(loadedRegion_.length); - ioStats_->queryThreadIoLatency().increment(usecs); - ioStats_->incTotalScanTime(usecs * 1'000); -} - -void GlutenDirectInputStream::loadPosition() { - VELOX_CHECK_LT(offsetInRegion_, region_.length); - if (!loaded_) { - loaded_ = true; - auto load = bufferedInput_->coalescedLoad(this); - if (load != nullptr) { - folly::SemiFuture waitFuture(false); - uint64_t loadUs = 0; - { - facebook::velox::MicrosecondTimer timer(&loadUs); - if (!load->loadOrFuture(&waitFuture)) { - waitFuture.wait(); - } - loadedRegion_.offset = region_.offset; - loadedRegion_.length = load->getData(region_.offset, data_, tinyData_); - } - ioStats_->queryThreadIoLatency().increment(loadUs); - } else { - // Standalone stream, not part of coalesced load. - loadedRegion_.offset = 0; - loadedRegion_.length = 0; - } - } - - // Check if position outside of loaded bounds. - if (loadedRegion_.length == 0 || region_.offset + offsetInRegion_ < loadedRegion_.offset || - region_.offset + offsetInRegion_ >= loadedRegion_.offset + loadedRegion_.length) { - loadedRegion_.offset = region_.offset + offsetInRegion_; - loadedRegion_.length = - (offsetInRegion_ + loadQuantum_ <= region_.length) ? loadQuantum_ : (region_.length - offsetInRegion_); - - // Since the loadSync method updates the metric, but it is conditionally - // executed, we also need to update the metric in the loadData method. - loadSync(); - } - - const auto offsetInData = offsetInRegion_ - (loadedRegion_.offset - region_.offset); - if (data_.numPages() == 0) { - run_ = reinterpret_cast(tinyData_.data()); - runSize_ = tinyData_.size(); - offsetInRun_ = offsetInData; - offsetOfRun_ = 0; - } else { - data_.findRun(offsetInData, &runIndex_, &offsetInRun_); - offsetOfRun_ = offsetInData - offsetInRun_; - auto run = data_.runAt(runIndex_); - run_ = run.data(); - runSize_ = facebook::velox::memory::AllocationTraits::pageBytes(run.numPages()); - if (offsetOfRun_ + runSize_ > loadedRegion_.length) { - runSize_ = loadedRegion_.length - offsetOfRun_; - } - } - VELOX_CHECK_LT(offsetInRun_, runSize_); -} - -} // namespace gluten diff --git a/cpp/velox/memory/GlutenDirectInputStream.h b/cpp/velox/memory/GlutenDirectInputStream.h deleted file mode 100644 index 826fc3f9a5e6..000000000000 --- a/cpp/velox/memory/GlutenDirectInputStream.h +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "velox/common/caching/ScanTracker.h" -#include "velox/dwio/common/SeekableInputStream.h" - -namespace gluten { - -class GlutenDirectBufferedInput; - -/// An input stream over possibly coalesced loads. Created by -/// GlutenDirectBufferedInput. Similar to CacheInputStream but does not use cache. -class GlutenDirectInputStream : public facebook::velox::dwio::common::SeekableInputStream { - public: - GlutenDirectInputStream( - GlutenDirectBufferedInput* bufferedInput, - facebook::velox::io::IoStatistics* ioStats, - const facebook::velox::common::Region& region, - std::shared_ptr input, - uint64_t fileNum, - std::shared_ptr tracker, - facebook::velox::cache::TrackingId trackingId, - uint64_t groupId, - int32_t loadQuantum); - - bool Next(const void** data, int* size) override; - void BackUp(int count) override; - bool SkipInt64(int64_t count) override; - int64_t ByteCount() const override; - - void seekToPosition(facebook::velox::dwio::common::PositionProvider& position) override; - std::string getName() const override; - size_t positionSize() const override; - - /// Testing function to access loaded state. - void testingData( - facebook::velox::common::Region& loadedRegion, - facebook::velox::memory::Allocation*& data, - std::string*& tinyData) { - loadedRegion = loadedRegion_; - data = &data_; - tinyData = &tinyData_; - } - - private: - // Ensures that the current position is covered by 'data_'. - void loadPosition(); - - // Synchronously sets 'data_' to cover loadedRegion_'. - void loadSync(); - - GlutenDirectBufferedInput* const bufferedInput_; - facebook::velox::io::IoStatistics* const ioStats_; - const std::shared_ptr input_; - // The region of 'input' 'this' ranges over. - const facebook::velox::common::Region region_; - const uint64_t fileNum_; - std::shared_ptr tracker_; - const facebook::velox::cache::TrackingId trackingId_; - const uint64_t groupId_; - - // Maximum number of bytes read from 'input' at a time. - const int32_t loadQuantum_; - - // The part of 'region_' that is loaded into 'data_'/'tinyData_'. Relative to - // file start. - facebook::velox::common::Region loadedRegion_; - - // Allocation with loaded data. Has space for region.length or loadQuantum_ - // bytes, whichever is less. - facebook::velox::memory::Allocation data_; - - // Contains the data if the range is too small for Allocation. - std::string tinyData_; - - // Pointer to start of current run in 'entry->data()' or - // 'entry->tinyData()'. - uint8_t* run_{nullptr}; - - // Offset of current run from start of 'data_' - uint64_t offsetOfRun_; - - // Position of stream relative to 'run_'. - int offsetInRun_{0}; - - // Index of run in 'data_' - int runIndex_ = -1; - - // Number of valid bytes starting at 'run_' - uint32_t runSize_ = 0; - // Position relative to 'region_.offset'. - uint64_t offsetInRegion_ = 0; - - // Set to true when data is first loaded. - bool loaded_{false}; -}; - -} // namespace gluten