diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 4ebc914d10..5d2428ba84 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -43,6 +43,9 @@ class PageAlignedAllocator { * transferred to/from GPU device memory. The allocation is only guaranteed to be aligned to "at * least 256 bytes". It is NOT guaranteed to be page aligned. * + * @note Allocations use CU_MEMHOSTALLOC_PORTABLE, making them accessible from all CUDA contexts, + * not just the one that performed the allocation. This allows the singleton BounceBufferPool to + * safely serve buffers across multiple contexts and devices. * @note Do NOT use with Direct I/O - lacks page alignment guarantee */ class CudaPinnedAllocator { @@ -71,6 +74,9 @@ class CudaPinnedAllocator { * (for efficient host-device transfers). Uses std::aligned_alloc followed by * cudaMemHostRegister to achieve both properties. * + * @note Registration uses CU_MEMHOSTALLOC_PORTABLE, making buffers accessible from all CUDA + * contexts, not just the one active during allocation. This allows the singleton BounceBufferPool + * to safely serve buffers across multiple contexts and devices. * @note This is the required allocator for Direct I/O with device memory. Requires a valid CUDA * context when allocating. */ @@ -111,6 +117,9 @@ class CudaPageAlignedPinnedAllocator { * - CudaPinnedAllocator: For device I/O without Direct I/O * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O * + * @note The singleton pool is safe for use across multiple CUDA contexts. CUDA-pinned allocators + * use CU_MEMHOSTALLOC_PORTABLE, ensuring buffers allocated in one context can be used from any + * other context. 
* @note The destructor intentionally leaks allocations to avoid CUDA cleanup issues when static * destructors run after CUDA context destruction */ @@ -268,4 +277,243 @@ using CudaPinnedBounceBufferPool = BounceBufferPool; * Provides both page alignment (for Direct I/O) and CUDA registration (for efficient transfers) */ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool; + +/** + * @brief K-way bounce buffer ring for overlapping I/O with host-device transfers. + * + * Manages a ring of k bounce buffers to enable pipelining between file I/O and CUDA memory + * transfers. By rotating through multiple buffers, the ring allows async H2D copies to proceed + * while the next buffer is being filled. + * + * Synchronization occurs automatically when the ring wraps around to prevent overwriting buffers + * with in-flight transfers. + * + * @tparam Allocator The allocator policy for bounce buffers: + * - CudaPinnedAllocator: For device I/O without Direct I/O + * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O + * + * @note This class is NOT thread-safe. Use one ring per thread or per operation. + * @note The internal bounce buffers use CU_MEMHOSTALLOC_PORTABLE and are accessible from any CUDA + * context. However, each transfer session (from first enqueue/submit through synchronize/reset) + * must use a stream and device pointers from a single, consistent CUDA context. + * @note In batch mode, H2D copies are deferred until wrap-around or synchronize(), trading overlap + * for reduced API call overhead. + * @note H2D and D2H operations should not be interleaved within a single transfer session. Call + * reset() between direction changes to ensure correct synchronization behavior. 
+ */ +template +class BounceBufferRing { + public: + struct BatchTransferContext { + std::vector srcs; + std::vector dsts; + std::vector sizes; + + void add_entry(void* dst, void* src, std::size_t size); + void clear(); + }; + + private: + std::vector::Buffer> _buffers; + BatchTransferContext _batch_transfer_ctx; + std::size_t _num_buffers; + std::size_t _cur_buf_idx{0}; + std::size_t _initial_buf_idx{0}; + std::size_t _cur_buffer_offset{0}; + bool _batch_copy; + + public: + /** + * @brief Construct a bounce buffer ring. + * + * @param num_buffers Number of bounce buffers (k) for k-way overlap. Must be >= 1. Higher values + * allow more overlap but consume more memory. + * @param batch_copy If true, defer H2D copies and issue them in batches. Useful for many small + * transfers where API overhead dominates. If false (default), issue H2D copies immediately for + * better overlap. + */ + explicit BounceBufferRing(std::size_t num_buffers = 1, bool batch_copy = false); + + ~BounceBufferRing() noexcept = default; + + // Non-copyable, non-movable + BounceBufferRing(BounceBufferRing const&) = delete; + BounceBufferRing& operator=(BounceBufferRing const&) = delete; + BounceBufferRing(BounceBufferRing&&) = delete; + BounceBufferRing& operator=(BounceBufferRing&&) = delete; + + /** + * @brief Get pointer to the current bounce buffer. + * + * Use this to fill the buffer directly (e.g., via pread), then call submit_h2d() to transfer the + * data to device. + * + * @return Pointer to the start of the current buffer. + */ + [[nodiscard]] void* cur_buffer() const noexcept; + + /** + * @brief Get pointer to the current bounce buffer at a specific offset. + * + * Useful for partial buffer fills or when accumulating data incrementally. + * + * @param offset Byte offset from the start of the current buffer. + * @return Pointer to cur_buffer() + offset. 
+ */ + [[nodiscard]] void* cur_buffer(std::ptrdiff_t offset) const noexcept; + + /** + * @brief Get the size of each bounce buffer in the ring. + * + * All buffers in the ring have the same size, determined by defaults::bounce_buffer_size() at + * ring construction time. + * + * @return Size in bytes of each buffer. + */ + [[nodiscard]] std::size_t buffer_size() const noexcept; + + /** + * @brief Get the number of buffers in the ring (k for k-way overlap). + * + * @return Number of bounce buffers. + */ + [[nodiscard]] std::size_t num_buffers() const noexcept; + + /** + * @brief Get the current fill level of the active buffer. + * + * Indicates how many bytes have been accumulated in the current buffer via + * accumulate_and_submit_h2d(). Reset to 0 after each advance(). + * + * @return Number of bytes currently in the buffer. + */ + [[nodiscard]] std::size_t cur_buffer_offset() const noexcept; + + /** + * @brief Get remaining number of bytes in current buffer for accumulation. + */ + [[nodiscard]] std::size_t cur_buffer_remaining_capacity() const noexcept; + + /** + * @brief Advance to next buffer in the ring. + * + * Resets current buffer offset and moves to next buffer index. If wrapping around to the oldest + * in-flight buffer, synchronizes the stream first to ensure that buffer's transfer is complete. + * + * @param stream CUDA stream to synchronize on wrap-around. + */ + void advance(CUstream stream); + + /** + * @brief Queue async copy from current bounce buffer to device memory. + * + * In non-batch mode, issues cuMemcpyHtoDAsync immediately. In batch mode, defers the copy until + * wrap-around or synchronize(). + * + * @param device_dst Device memory destination. + * @param size Bytes to copy from cur_buffer(). + * @param stream CUDA stream for the async transfer. + * + * @note Does NOT advance to next buffer. Call submit_h2d() for copy + advance. 
+ */ + void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream); + + /** + * @brief Async copy from device memory to current bounce buffer. + * + * @param device_src Device memory source. + * @param size Bytes to copy. + * @param stream CUDA stream for the async transfer. + */ + void enqueue_d2h(void* device_src, std::size_t size, CUstream stream); + + /** + * @brief Accumulate data into bounce buffer, auto-submit when full. + * + * Copies host data into the internal buffer. When the buffer fills, issues an async H2D copy and + * advances to the next buffer. Handles data larger than buffer_size() by splitting across + * multiple buffers. + * + * Typical usage for streaming host data to device: + * @code + * while (has_more_data()) { + * auto submitted = ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream); + * device_ptr += submitted; + * } + * auto flushed = ring.flush_h2d(device_ptr, stream); + * device_ptr += flushed; + * ring.synchronize(stream); + * @endcode + * + * @param device_dst Device memory destination (should track cumulative offset externally). + * @param host_src Source data in host memory. + * @param size Bytes to copy. + * @param stream CUDA stream for async H2D transfers. + * @return Number of bytes submitted to device (always a multiple of buffer_size(); partial buffer + * contents remain until flush_h2d()). + * + * @note Partial buffer contents remain until flush_h2d() is called. + * @note Final data visibility requires flush_h2d() + synchronize(). + */ + std::size_t accumulate_and_submit_h2d(void* device_dst, + void const* host_src, + std::size_t size, + CUstream stream); + + /** + * @brief Submit current buffer contents to device and advance to next buffer. 
+ * + * Typical usage pattern for direct-fill (e.g., pread into buffer): + * @code + * ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), offset); + * ring.submit_h2d(device_ptr, n, stream); + * device_ptr += n; + * @endcode + * + * @param device_dst Device memory destination. + * @param size Bytes actually written to cur_buffer(). + * @param stream CUDA stream for async H2D transfer. + * + * @note Synchronization may occur if this causes a wrap-around. + * @note Final data visibility requires calling synchronize() after all submits. + */ + void submit_h2d(void* device_dst, std::size_t size, CUstream stream); + + /** + * @brief Flush any partially accumulated data to device. + * + * Call after accumulate_and_submit_h2d() to submit remaining data that didn't fill a complete + * buffer. + * + * @param device_dst Device memory destination for the partial buffer. + * @param stream CUDA stream for async H2D transfer. + * @return Number of bytes flushed (0 if buffer was empty). + * + * @note Still requires synchronize() for data visibility guarantee. + */ + std::size_t flush_h2d(void* device_dst, CUstream stream); + + /** + * @brief Ensure all queued H2D transfers are complete. + * + * In batch mode, issues any pending batch copies first. Then synchronizes the stream to guarantee + * all data is visible in device memory. + * + * @param stream CUDA stream to synchronize. + * + * @note Must be called before reading transferred data on device. + * @note After synchronize(), the ring can be reused for new transfers. + */ + void synchronize(CUstream stream); + + /** + * @brief Synchronize pending transfers and reset ring state for a new transfer session. + * + * Ensures all in-flight transfers complete, then resets the ring to its initial state. After + * reset(), the ring can be safely reused for either H2D or D2H operations. + * + * @param stream CUDA stream to synchronize. 
+ */ + void reset(CUstream stream); +}; } // namespace kvikio diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index 0b0808c45e..31d0353c9a 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -441,7 +442,10 @@ class RemoteHandle { * @param file_offset File offset in bytes. * @return Number of bytes read, which is always `size`. */ - std::size_t read(void* buf, std::size_t size, std::size_t file_offset = 0); + std::size_t read(void* buf, + std::size_t size, + std::size_t file_offset = 0, + BounceBufferRing* bounce_buffer_ring = nullptr); /** * @brief Read from remote source into buffer (host or device memory) in parallel. diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 629c26550e..e0fa465ae7 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -32,14 +32,14 @@ void* CudaPinnedAllocator::allocate(std::size_t size) { void* buffer{}; - // If no available allocation, allocate and register a new one - // Allocate page-locked host memory - // Under unified addressing, host memory allocated this way is automatically portable and - // mapped. + // Allocate page-locked (pinned) host memory with CU_MEMHOSTALLOC_PORTABLE. The PORTABLE flag + // ensures this memory is accessible from all CUDA contexts, which is essential for the singleton + // BounceBufferPool that may serve multiple contexts and devices. 
CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&buffer, size, CU_MEMHOSTALLOC_PORTABLE)); return buffer; } + void CudaPinnedAllocator::deallocate(void* buffer, std::size_t /*size*/) { CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(buffer)); @@ -52,6 +52,9 @@ void* CudaPageAlignedPinnedAllocator::allocate(std::size_t size) auto const aligned_size = detail::align_up(size, page_size); buffer = std::aligned_alloc(page_size, aligned_size); KVIKIO_EXPECT(buffer != nullptr, "Aligned allocation failed"); + // Register the page-aligned allocation as pinned memory with CU_MEMHOSTALLOC_PORTABLE. The + // PORTABLE flag ensures this memory is accessible from all CUDA contexts, which is essential for + // the singleton BounceBufferPool that may serve multiple contexts and devices. CUDA_DRIVER_TRY( cudaAPI::instance().MemHostRegister(buffer, aligned_size, CU_MEMHOSTALLOC_PORTABLE)); return buffer; @@ -109,15 +112,13 @@ BounceBufferPool::Buffer& BounceBufferPool::Buffer::operat template void* BounceBufferPool::Buffer::get() const noexcept { - KVIKIO_NVTX_FUNC_RANGE(); return _buffer; } template void* BounceBufferPool::Buffer::get(std::ptrdiff_t offset) const noexcept { - KVIKIO_NVTX_FUNC_RANGE(); - return static_cast(_buffer) + offset; + return static_cast(_buffer) + offset; } template @@ -217,4 +218,243 @@ BounceBufferPool& BounceBufferPool::instance() template class BounceBufferPool; template class BounceBufferPool; template class BounceBufferPool; + +namespace { +/** + * @brief Issue individual async H2D copies for each transfer. + * + * Fallback when cuMemcpyBatchAsync is unavailable. + * + * NOTE(review): this fallback always issues MemcpyHtoDAsync, but in batch mode enqueue_d2h() + * queues entries into the same BatchTransferContext; if any such D2H entry reaches this fallback + * (CUDA < 12.8, or MemcpyBatchAsync not resolved at runtime) it is copied in the wrong direction. + * Confirm batched D2H is unsupported, or dispatch per-entry by transfer direction. 
+ */ +void unbatched_copy(std::span dsts, + std::span srcs, + std::span sizes, + CUstream stream) +{ + for (std::size_t i = 0; i < srcs.size(); ++i) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(dsts[i]), reinterpret_cast(srcs[i]), sizes[i], stream)); + } +} + +/** + * @brief Issue H2D copies using batch API if available, otherwise fall back to individual copies. + * + * Uses cuMemcpyBatchAsync (CUDA 12.8+) to reduce API overhead for multiple small transfers. + */ +void batch_copy(std::span dsts, + std::span srcs, + std::span sizes, + CUstream stream) +{ + if (srcs.size() == 0) return; + +#if CUDA_VERSION >= 12080 + if (cudaAPI::instance().MemcpyBatchAsync) { + CUmemcpyAttributes attrs{}; + std::size_t attrs_idxs[] = {0}; + attrs.srcAccessOrder = CUmemcpySrcAccessOrder_enum::CU_MEMCPY_SRC_ACCESS_ORDER_STREAM; + CUDA_DRIVER_TRY( + cudaAPI::instance().MemcpyBatchAsync(dsts.data(), + srcs.data(), + sizes.data(), + srcs.size(), + &attrs, + attrs_idxs, + static_cast(1) /* num_attrs */, +#if CUDA_VERSION < 13000 + static_cast(nullptr), +#endif + stream)); + } else { + unbatched_copy(dsts, srcs, sizes, stream); + } +#else + unbatched_copy(dsts, srcs, sizes, stream); +#endif +} +} // namespace + +template +void BounceBufferRing::BatchTransferContext::add_entry(void* dst, + void* src, + std::size_t size) +{ + srcs.push_back(src); + dsts.push_back(dst); + sizes.push_back(size); +} + +template +void BounceBufferRing::BatchTransferContext::clear() +{ + srcs.clear(); + dsts.clear(); + sizes.clear(); +} + +template +BounceBufferRing::BounceBufferRing(std::size_t num_buffers, bool batch_copy) + : _num_buffers{num_buffers}, _batch_copy{batch_copy} +{ + KVIKIO_NVTX_FUNC_RANGE(); + KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer"); + + _buffers.reserve(_num_buffers); + for (std::size_t i = 0; i < _num_buffers; ++i) { + _buffers.emplace_back(BounceBufferPool::instance().get()); + } +} + +template +void 
BounceBufferRing::enqueue_h2d(void* device_dst, std::size_t size, CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (_batch_copy) { + _batch_transfer_ctx.add_entry(device_dst, cur_buffer(), size); + } else { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(device_dst), cur_buffer(), size, stream)); + } +} + +template +void BounceBufferRing::enqueue_d2h(void* device_src, std::size_t size, CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (_batch_copy) { + _batch_transfer_ctx.add_entry(cur_buffer(), device_src, size); + } else { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( + cur_buffer(), convert_void2deviceptr(device_src), size, stream)); + } +} + +template +void BounceBufferRing::advance(CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + _cur_buffer_offset = 0; + ++_cur_buf_idx; + if (_cur_buf_idx >= _num_buffers) { _cur_buf_idx = 0; } + if (_cur_buf_idx == _initial_buf_idx) { + if (_batch_copy) { + batch_copy( + _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream); + _batch_transfer_ctx.clear(); + } + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); + _initial_buf_idx = _cur_buf_idx; + } +} + +template +void* BounceBufferRing::cur_buffer() const noexcept +{ + return _buffers[_cur_buf_idx].get(); +} + +template +void* BounceBufferRing::cur_buffer(std::ptrdiff_t off) const noexcept +{ + return _buffers[_cur_buf_idx].get(off); +} + +template +std::size_t BounceBufferRing::buffer_size() const noexcept +{ + return _buffers[_cur_buf_idx].size(); +} + +template +std::size_t BounceBufferRing::num_buffers() const noexcept +{ + return _num_buffers; +} + +template +std::size_t BounceBufferRing::cur_buffer_offset() const noexcept +{ + return _cur_buffer_offset; +} + +template +std::size_t BounceBufferRing::cur_buffer_remaining_capacity() const noexcept +{ + return buffer_size() - _cur_buffer_offset; +} + +template +std::size_t BounceBufferRing::accumulate_and_submit_h2d(void* 
device_dst, + void const* host_src, + std::size_t size, + CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + auto const* host_src_ptr = static_cast(host_src); + auto* device_dst_ptr = static_cast(device_dst); + + // The data is split across multiple buffers if its size is greater than buffer_size() + while (size > 0) { + auto const remaining_bytes = cur_buffer_remaining_capacity(); + auto const num_bytes_to_copy = std::min(size, remaining_bytes); + + // Copy from host to bounce buffer + std::memcpy(cur_buffer(_cur_buffer_offset), host_src_ptr, num_bytes_to_copy); + _cur_buffer_offset += num_bytes_to_copy; + host_src_ptr += num_bytes_to_copy; + size -= num_bytes_to_copy; + + if (_cur_buffer_offset >= buffer_size()) { + submit_h2d(device_dst_ptr, buffer_size(), stream); + device_dst_ptr += buffer_size(); + } + } + + return static_cast(device_dst_ptr - static_cast(device_dst)); +} + +template +void BounceBufferRing::submit_h2d(void* device_dst, std::size_t size, CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + enqueue_h2d(device_dst, size, stream); + advance(stream); +} + +template +std::size_t BounceBufferRing::flush_h2d(void* device_dst, CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (_cur_buffer_offset == 0) { return 0; } + auto const flushed = _cur_buffer_offset; + enqueue_h2d(device_dst, _cur_buffer_offset, stream); + advance(stream); + return flushed; +} + +template +void BounceBufferRing::synchronize(CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (_batch_copy) { + batch_copy( + _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream); + _batch_transfer_ctx.clear(); + } + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); +} + +template +void BounceBufferRing::reset(CUstream stream) +{ + synchronize(stream); + _cur_buf_idx = 0; + _initial_buf_idx = 0; + _cur_buffer_offset = 0; +} + +// Explicit instantiations +template class BounceBufferRing; +template class BounceBufferRing; + } // namespace kvikio diff 
--git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 7c917c9a0b..d2c5470c17 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -702,11 +702,19 @@ struct CallbackContext { std::size_t size; // Total number of bytes to read. std::ptrdiff_t offset; // Offset into `buf` to start reading. bool overflow_error; // Flag to indicate overflow. - CallbackContext(void* buf, std::size_t size) - : buf{static_cast(buf)}, size{size}, offset{0}, overflow_error{0} + BounceBufferRing* bounce_buffer_ring{ + nullptr}; // Only used by callback_device_memory + + CallbackContext(void* a_buf, + std::size_t a_size, + BounceBufferRing* a_bounce_buffer_ring) + : buf{static_cast(a_buf)}, + size{a_size}, + offset{0}, + overflow_error{false}, + bounce_buffer_ring{a_bounce_buffer_ring} { } - BounceBufferH2D* bounce_buffer{nullptr}; // Only used by callback_device_memory }; /** @@ -755,13 +763,17 @@ std::size_t callback_device_memory(char* data, std::size_t size, std::size_t nme } KVIKIO_NVTX_FUNC_RANGE(nbytes); - ctx->bounce_buffer->write(data, nbytes); + ctx->bounce_buffer_ring->accumulate_and_submit_h2d( + ctx->buf + ctx->offset - ctx->bounce_buffer_ring->cur_buffer_offset(), /* NOTE(review): was `ctx->buf`; accumulate_and_submit_h2d() requires the caller to advance the device destination past bytes already submitted, otherwise every filled bounce buffer is written to device offset 0 for reads larger than one buffer */ data, nbytes, detail::StreamsByThread::get()); + ctx->offset += nbytes; return nbytes; } } // namespace -std::size_t RemoteHandle::read(void* buf, std::size_t size, std::size_t file_offset) +std::size_t RemoteHandle::read(void* buf, + std::size_t size, + std::size_t file_offset, + BounceBufferRing* bounce_buffer_ring) { KVIKIO_NVTX_FUNC_RANGE(size); @@ -781,7 +793,7 @@ std::size_t RemoteHandle::read(void* buf, std::size_t size, std::size_t file_off } else { curl.setopt(CURLOPT_WRITEFUNCTION, callback_device_memory); } - CallbackContext ctx{buf, size}; + CallbackContext ctx{buf, size, bounce_buffer_ring}; curl.setopt(CURLOPT_WRITEDATA, &ctx); try 
{ @@ -791,8 +803,11 @@ std::size_t RemoteHandle::read(void* buf, std::size_t size, std::size_t file_off PushAndPopContext c(get_context_from_pointer(buf)); // We use a bounce buffer to avoid many small memory copies to device. Libcurl has a // maximum chunk size of 16kb (`CURL_MAX_WRITE_SIZE`) but chunks are often much smaller. - BounceBufferH2D bounce_buffer(detail::StreamsByThread::get(), buf); - ctx.bounce_buffer = &bounce_buffer; + std::unique_ptr> a_bounce_buffer_ring; + if (bounce_buffer_ring == nullptr) { + a_bounce_buffer_ring = std::make_unique>(); + ctx.bounce_buffer_ring = a_bounce_buffer_ring.get(); + } /* NOTE(review): after curl.perform() the ring still holds a partial buffer and possibly in-flight async copies; read() returns without calling flush_h2d()/synchronize(), so the tail of the transfer (< buffer_size bytes) never reaches the device, and the locally-owned ring may be destroyed while copies are in flight — flush and synchronize before leaving this scope */ curl.perform(); } } catch (std::runtime_error const& e) { @@ -815,11 +830,14 @@ std::future RemoteHandle::pread(void* buf, KVIKIO_EXPECT(thread_pool != nullptr, "The thread pool must not be nullptr"); auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx(); KVIKIO_NVTX_FUNC_RANGE(size); - auto task = [this](void* devPtr_base, - std::size_t size, - std::size_t file_offset, - std::size_t devPtr_offset) -> std::size_t { - return read(static_cast(devPtr_base) + devPtr_offset, size, file_offset); + auto bounce_buffer_ring = std::make_shared>(); /* NOTE(review): a single ring is captured by every parallel_io task, but tasks run concurrently on the thread pool and BounceBufferRing is documented as NOT thread-safe — concurrent reads race on the ring's index/offset state; allocate one ring per task or per pool thread */ + auto task = [this, bounce_buffer_ring = bounce_buffer_ring]( + void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset) -> std::size_t { + return read( + static_cast(devPtr_base) + devPtr_offset, size, file_offset, bounce_buffer_ring.get()); }; return parallel_io(task, buf, size, file_offset, task_size, 0, thread_pool, call_idx, nvtx_color); }