From ebadb51e32d79619ecce5668654b777bb66c870d Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Thu, 29 Jan 2026 22:37:37 -0500
Subject: [PATCH 01/10] Unify bounce buffer management

---
 cpp/include/kvikio/bounce_buffer.hpp | 102 +++++++++++++++++++
 cpp/src/bounce_buffer.cpp            | 140 +++++++++++++++++++++++++++
 2 files changed, 242 insertions(+)

diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp
index 4ebc914d10..527a50a5a9 100644
--- a/cpp/include/kvikio/bounce_buffer.hpp
+++ b/cpp/include/kvikio/bounce_buffer.hpp
@@ -268,4 +268,106 @@ using CudaPinnedBounceBufferPool = BounceBufferPool<CudaPinnedAllocator>;
  * Provides both page alignment (for Direct I/O) and CUDA registration (for efficient transfers)
  */
 using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool<CudaPageAlignedPinnedAllocator>;
+
+/**
+ * @brief K-way bounce buffer ring for overlapping I/O with host-device transfers.
+ *
+ * @tparam Allocator The allocator policy for bounce buffers.
+ */
+template <typename Allocator>
+class BounceBufferRing {
+ private:
+  std::vector<typename BounceBufferPool<Allocator>::Buffer> _buffers;
+  std::size_t _num_buffers;
+  std::size_t _cur_buf_idx{0};
+  std::size_t _initial_buf_idx{0};
+  std::size_t _cur_buf_offset{0};
+
+  /**
+   * @brief Async copy from current bounce buffer to device memory.
+   *
+   * @param device_dst Device memory destination.
+   * @param size Bytes to copy.
+   * @param stream CUDA stream for the async transfer.
+   */
+  void copy_to_device(void* device_dst, std::size_t size, CUstream stream);
+
+  /**
+   * @brief Async copy from device memory to current bounce buffer.
+   *
+   * @param device_src Device memory source.
+   * @param size Bytes to copy.
+   * @param stream CUDA stream for the async transfer.
+   */
+  void copy_from_device(void const* device_src, std::size_t size, CUstream stream);
+
+  /**
+   * @brief Advance to next buffer, sync stream if wrapping around.
+   *
+   * @param stream CUDA stream to synchronize on wrap-around.
+   */
+  void advance(CUstream stream);
+
+  /**
+   * @brief Get remaining number of bytes in current buffer for accumulation.
+   */
+  [[nodiscard]] std::size_t remaining_bytes() const noexcept;
+
+ public:
+  /**
+   * @brief Construct a bounce buffer ring.
+   *
+   * @param num_buffers Number of bounce buffers for k-way overlap (must be >= 1).
+   */
+  explicit BounceBufferRing(std::size_t num_buffers = 1);
+
+  ~BounceBufferRing() noexcept = default;
+
+  // Non-copyable, non-movable
+  BounceBufferRing(BounceBufferRing const&) = delete;
+  BounceBufferRing& operator=(BounceBufferRing const&) = delete;
+  BounceBufferRing(BounceBufferRing&&) = delete;
+  BounceBufferRing& operator=(BounceBufferRing&&) = delete;
+
+  // Accessors
+  [[nodiscard]] void* buffer() const noexcept;
+  [[nodiscard]] void* buffer(std::ptrdiff_t offset) const noexcept;
+  [[nodiscard]] std::size_t buffer_size() const noexcept;
+  [[nodiscard]] std::size_t num_buffers() const noexcept;
+  [[nodiscard]] std::size_t offset() const noexcept;
+
+  /**
+   * @brief Accumulate data into bounce buffer, auto-flush when full.
+   *
+   * Copies src to internal buffer. When buffer fills, issues async H2D copy to device_dst and
+   * advances to next buffer.
+   *
+   * @param device_dst Current device destination pointer.
+   * @param host_src Source data in host memory.
+   * @param size Bytes to write.
+   * @param stream CUDA stream for async H2D transfers.
+   */
+  void accumulate_and_submit_h2d(void* device_dst,
+                                 void const* host_src,
+                                 std::size_t size,
+                                 CUstream stream);
+
+  void submit_h2d(void* device_dst, std::size_t size, CUstream stream);
+
+  /**
+   * @brief Flush any accumulated data to device.
+   *
+   * @param device_dst Device memory destination.
+   * @param stream CUDA stream for async H2D transfer.
+   * @return Bytes flushed.
+   */
+  std::size_t flush_h2d(void* device_dst, CUstream stream);
+
+  /**
+   * @brief Synchronize the given CUDA stream.
+   *
+   * @param stream CUDA stream to synchronize.
+   */
+  static void synchronize(CUstream stream);
+};
 }  // namespace kvikio

diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index 62b08a6883..bef35756a4 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -216,4 +216,144 @@ BounceBufferPool<Allocator>& BounceBufferPool<Allocator>::instance()
 template class BounceBufferPool<PageAlignedAllocator>;
 template class BounceBufferPool<CudaPinnedAllocator>;
 template class BounceBufferPool<CudaPageAlignedPinnedAllocator>;
+
+template <typename Allocator>
+BounceBufferRing<Allocator>::BounceBufferRing(std::size_t num_buffers) : _num_buffers{num_buffers}
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer");
+
+  _buffers.reserve(_num_buffers);
+  for (std::size_t i = 0; i < _num_buffers; ++i) {
+    _buffers.emplace_back(BounceBufferPool<Allocator>::instance().get());
+  }
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::copy_to_device(void* device_dst,
+                                                 std::size_t size,
+                                                 CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
+    convert_void2deviceptr(device_dst), buffer(), size, stream));
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::copy_from_device(void const* device_src,
+                                                   std::size_t size,
+                                                   CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync(
+    buffer(), convert_void2deviceptr(device_src), size, stream));
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::advance(CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  _cur_buf_offset = 0;
+  ++_cur_buf_idx;
+  if (_cur_buf_idx >= _num_buffers) { _cur_buf_idx = 0; }
+  if (_cur_buf_idx == _initial_buf_idx) {
+    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
+    _initial_buf_idx = _cur_buf_idx;
+  }
+}
+
+template <typename Allocator>
+void* BounceBufferRing<Allocator>::buffer() const noexcept
+{
+  return _buffers[_cur_buf_idx].get();
+}
+
+template <typename Allocator>
+void* BounceBufferRing<Allocator>::buffer(std::ptrdiff_t off) const noexcept
+{
+  return _buffers[_cur_buf_idx].get(off);
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::buffer_size() const noexcept
+{
+  return _buffers[_cur_buf_idx].size();
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::num_buffers() const noexcept
+{
+  return _num_buffers;
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::offset() const noexcept
+{
+  return _cur_buf_offset;
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::remaining_bytes() const noexcept
+{
+  return buffer_size() - _cur_buf_offset;
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::accumulate_and_submit_h2d(void* device_dst,
+                                                            void const* host_src,
+                                                            std::size_t size,
+                                                            CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  auto const* host_str_ptr = static_cast<char const*>(host_src);
+  auto* device_dst_ptr = static_cast<char*>(device_dst);
+
+  while (size > 0) {
+    auto const cur_remaining_bytes = remaining_bytes();
+    auto const num_bytes_to_copy = std::min(size, cur_remaining_bytes);
+
+    // Copy from host to bounce buffer
+    std::memcpy(buffer(_cur_buf_offset), host_str_ptr, num_bytes_to_copy);
+    _cur_buf_offset += num_bytes_to_copy;
+    host_str_ptr += num_bytes_to_copy;
+    size -= num_bytes_to_copy;
+
+    if (_cur_buf_offset >= buffer_size()) {
+      copy_to_device(device_dst_ptr, _cur_buf_offset, stream);
+      device_dst_ptr += _cur_buf_offset;
+      advance(stream);
+    }
+  }
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::submit_h2d(void* device_dst, std::size_t size, CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  copy_to_device(device_dst, size, stream);
+  advance(stream);
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::flush_h2d(void* device_dst, CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  if (_cur_buf_offset == 0) { return 0; }
+  auto const flushed = _cur_buf_offset;
+  copy_to_device(device_dst, _cur_buf_offset, stream);
+  advance(stream);
+  return flushed;
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::synchronize(CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
+}
+
+// Explicit instantiations
+template class BounceBufferRing<CudaPinnedAllocator>;
+template class BounceBufferRing<CudaPageAlignedPinnedAllocator>;
+
 }  // namespace kvikio
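
PATCH 01 is meant to be driven by a simple fill-submit loop. A minimal sketch of the
direct-fill pattern, assuming a POSIX file descriptor `fd`, a device allocation
`dev_base` of at least `file_size` bytes, and a `CUstream stream` (all hypothetical
names; error handling and short-read handling omitted):

    // Stream a file into device memory through a 4-deep ring of pinned buffers.
    kvikio::BounceBufferRing<kvikio::CudaPinnedAllocator> ring{4};
    std::size_t done = 0;
    while (done < file_size) {
      // Fill the current pinned buffer straight from the file...
      ssize_t const n = ::pread(fd, ring.buffer(), ring.buffer_size(), static_cast<off_t>(done));
      // ...then queue the async H2D copy and rotate to the next buffer. The ring only
      // synchronizes when it wraps back onto a buffer whose copy may still be in flight.
      ring.submit_h2d(static_cast<char*>(dev_base) + done, static_cast<std::size_t>(n), stream);
      done += static_cast<std::size_t>(n);
    }
    // At this revision synchronize() is static; it simply waits on the stream.
    kvikio::BounceBufferRing<kvikio::CudaPinnedAllocator>::synchronize(stream);

The k-way design lets up to num_buffers - 1 copies overlap with the next pread; the
wrap-around synchronization in advance() is what prevents a buffer from being refilled
while its copy is still pending.
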
From 9ba980ecd8a06fc9ca9caffcc6df7e3f679faad9 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 11:23:20 -0500
Subject: [PATCH 02/10] Improve implementation and function naming

---
 cpp/include/kvikio/bounce_buffer.hpp | 205 +++++++++++++++++++++------
 cpp/src/bounce_buffer.cpp            | 142 ++++++++++++++-----
 2 files changed, 272 insertions(+), 75 deletions(-)

diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp
index 527a50a5a9..a5fcd97275 100644
--- a/cpp/include/kvikio/bounce_buffer.hpp
+++ b/cpp/include/kvikio/bounce_buffer.hpp
@@ -272,102 +272,221 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool<CudaPageAlignedPi
 template <typename Allocator>
 class BounceBufferRing {
+ public:
+  struct BatchTransferContext {
+    std::vector<void*> srcs;
+    std::vector<void*> dsts;
+    std::vector<std::size_t> sizes;
+
+    void add_entry(void* dst, void* src, std::size_t size);
+    void clear();
+  };
+
  private:
   std::vector<typename BounceBufferPool<Allocator>::Buffer> _buffers;
+  BatchTransferContext _batch_transfer_ctx;
   std::size_t _num_buffers;
   std::size_t _cur_buf_idx{0};
   std::size_t _initial_buf_idx{0};
-  std::size_t _cur_buf_offset{0};
+  std::size_t _cur_buffer_offset{0};
+  bool _batch_copy;
 
+ public:
   /**
-   * @brief Async copy from current bounce buffer to device memory.
+   * @brief Construct a bounce buffer ring.
    *
-   * @param device_dst Device memory destination.
-   * @param size Bytes to copy.
-   * @param stream CUDA stream for the async transfer.
+   * @param num_buffers Number of bounce buffers (k) for k-way overlap. Must be >= 1. Higher values
+   * allow more overlap but consume more memory.
+   * @param batch_copy If true, defer H2D copies and issue them in batches. Useful for many small
+   * transfers where API overhead dominates. If false (default), issue H2D copies immediately for
+   * better overlap.
    */
-  void copy_to_device(void* device_dst, std::size_t size, CUstream stream);
+  explicit BounceBufferRing(std::size_t num_buffers = 1, bool batch_copy = false);
+
+  ~BounceBufferRing() noexcept = default;
+
+  // Non-copyable, non-movable
+  BounceBufferRing(BounceBufferRing const&) = delete;
+  BounceBufferRing& operator=(BounceBufferRing const&) = delete;
+  BounceBufferRing(BounceBufferRing&&) = delete;
+  BounceBufferRing& operator=(BounceBufferRing&&) = delete;
 
   /**
-   * @brief Async copy from device memory to current bounce buffer.
+   * @brief Get pointer to the current bounce buffer.
    *
-   * @param device_src Device memory source.
-   * @param size Bytes to copy.
-   * @param stream CUDA stream for the async transfer.
+   * Use this to fill the buffer directly (e.g., via pread), then call submit_h2d() to transfer the
+   * data to device.
+   *
+   * @return Pointer to the start of the current buffer.
    */
-  void copy_from_device(void const* device_src, std::size_t size, CUstream stream);
+  [[nodiscard]] void* cur_buffer() const noexcept;
 
   /**
-   * @brief Advance to next buffer, sync stream if wrapping around.
+   * @brief Get pointer to the current bounce buffer at a specific offset.
    *
-   * @param stream CUDA stream to synchronize on wrap-around.
+   * Useful for partial buffer fills or when accumulating data incrementally.
+   *
+   * @param offset Byte offset from the start of the current buffer.
+   * @return Pointer to cur_buffer() + offset.
    */
-  void advance(CUstream stream);
+  [[nodiscard]] void* cur_buffer(std::ptrdiff_t offset) const noexcept;
 
   /**
-   * @brief Get remaining number of bytes in current buffer for accumulation.
+   * @brief Get the size of each bounce buffer in the ring.
+   *
+   * All buffers in the ring have the same size, determined by defaults::bounce_buffer_size() at
+   * ring construction time.
+   *
+   * @return Size in bytes of each buffer.
    */
-  [[nodiscard]] std::size_t remaining_bytes() const noexcept;
+  [[nodiscard]] std::size_t buffer_size() const noexcept;
 
- public:
   /**
-   * @brief Construct a bounce buffer ring.
+   * @brief Get the number of buffers in the ring (k for k-way overlap).
    *
-   * @param num_buffers Number of bounce buffers for k-way overlap (must be >= 1).
+   * @return Number of bounce buffers.
    */
-  explicit BounceBufferRing(std::size_t num_buffers = 1);
+  [[nodiscard]] std::size_t num_buffers() const noexcept;
 
-  ~BounceBufferRing() noexcept = default;
+  /**
+   * @brief Get the current fill level of the active buffer.
+   *
+   * Indicates how many bytes have been accumulated in the current buffer via
+   * accumulate_and_submit_h2d(). Reset to 0 after each advance().
+   *
+   * @return Number of bytes currently in the buffer.
+   */
+  [[nodiscard]] std::size_t cur_buffer_offset() const noexcept;
 
-  // Non-copyable, non-movable
-  BounceBufferRing(BounceBufferRing const&) = delete;
-  BounceBufferRing& operator=(BounceBufferRing const&) = delete;
-  BounceBufferRing(BounceBufferRing&&) = delete;
-  BounceBufferRing& operator=(BounceBufferRing&&) = delete;
+  /**
+   * @brief Get remaining number of bytes in current buffer for accumulation.
+   */
+  [[nodiscard]] std::size_t cur_buffer_remaining_capacity() const noexcept;
 
-  // Accessors
-  [[nodiscard]] void* buffer() const noexcept;
-  [[nodiscard]] void* buffer(std::ptrdiff_t offset) const noexcept;
-  [[nodiscard]] std::size_t buffer_size() const noexcept;
-  [[nodiscard]] std::size_t num_buffers() const noexcept;
-  [[nodiscard]] std::size_t offset() const noexcept;
+  /**
+   * @brief Advance to next buffer in the ring.
+   *
+   * Resets current buffer offset and moves to next buffer index. If wrapping around to the oldest
+   * in-flight buffer, synchronizes the stream first to ensure that buffer's transfer is complete.
+   *
+   * @param stream CUDA stream to synchronize on wrap-around.
+   */
+  void advance(CUstream stream);
 
   /**
-   * @brief Accumulate data into bounce buffer, auto-flush when full.
+   * @brief Queue async copy from current bounce buffer to device memory.
+   *
+   * In non-batch mode, issues cuMemcpyHtoDAsync immediately. In batch mode, defers the copy until
+   * wrap-around or synchronize().
+   *
+   * @param device_dst Device memory destination.
+   * @param size Bytes to copy from cur_buffer().
+   * @param stream CUDA stream for the async transfer.
    *
-   * Copies src to internal buffer. When buffer fills, issues async H2D copy to device_dst and
-   * advances to next buffer.
+   * @note Does NOT advance to next buffer. Call submit_h2d() for copy + advance.
+   */
+  void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream);
+
+  /**
+   * @brief Async copy from device memory to current bounce buffer.
    *
-   * @param device_dst Current device destination pointer.
+   * @param device_src Device memory source.
+   * @param size Bytes to copy.
+   * @param stream CUDA stream for the async transfer.
+   */
+  void enqueue_d2h(void* device_src, std::size_t size, CUstream stream);
+
+  /**
+   * @brief Accumulate data into bounce buffer, auto-submit when full.
+   *
+   * Copies host data into the internal buffer. When the buffer fills, issues an async H2D copy and
+   * advances to the next buffer. Handles data larger than buffer_size() by splitting across
+   * multiple buffers.
+   *
+   * Typical usage for streaming host data to device:
+   * @code
+   * while (has_more_data()) {
+   *   ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream);
+   *   device_ptr += chunk_size;  // Note: only advance by submitted amount
+   * }
+   * ring.flush_h2d(device_ptr, stream);
+   * ring.synchronize(stream);
+   * @endcode
+   *
+   * @param device_dst Device memory destination (should track cumulative offset externally).
    * @param host_src Source data in host memory.
-   * @param size Bytes to write.
+   * @param size Bytes to copy.
    * @param stream CUDA stream for async H2D transfers.
+   *
+   * @note Partial buffer contents remain until flush_h2d() is called.
+   * @note Final data visibility requires flush_h2d() + synchronize().
    */
   void accumulate_and_submit_h2d(void* device_dst,
                                  void const* host_src,
                                  std::size_t size,
                                  CUstream stream);
 
+  /**
+   * @brief Submit current buffer contents to device and advance to next buffer.
+   *
+   * Typical usage pattern for direct-fill (e.g., pread into buffer):
+   * @code
+   * ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), offset);
+   * ring.submit_h2d(device_ptr, n, stream);
+   * device_ptr += n;
+   * @endcode
+   *
+   * @param device_dst Device memory destination.
+   * @param size Bytes actually written to cur_buffer().
+   * @param stream CUDA stream for async H2D transfer.
+   *
+   * @note Synchronization may occur if this causes a wrap-around.
+   * @note Final data visibility requires calling synchronize() after all submits.
+   */
   void submit_h2d(void* device_dst, std::size_t size, CUstream stream);
 
   /**
-   * @brief Flush any accumulated data to device.
+   * @brief Flush any partially accumulated data to device.
    *
-   * @param device_dst Device memory destination.
+   * Call after accumulate_and_submit_h2d() to submit remaining data that didn't fill a complete
+   * buffer.
+   *
+   * @param device_dst Device memory destination for the partial buffer.
    * @param stream CUDA stream for async H2D transfer.
-   * @return Bytes flushed.
+   * @return Number of bytes flushed (0 if buffer was empty).
+   *
+   * @note Still requires synchronize() for data visibility guarantee.
    */
   std::size_t flush_h2d(void* device_dst, CUstream stream);
 
   /**
-   * @brief Synchronize the given CUDA stream.
+   * @brief Ensure all queued H2D transfers are complete.
+   *
+   * In batch mode, issues any pending batch copies first. Then synchronizes the stream to guarantee
+   * all data is visible in device memory.
    *
    * @param stream CUDA stream to synchronize.
+   *
+   * @note Must be called before reading transferred data on device.
+   * @note After synchronize(), the ring can be reused for new transfers.
    */
-  static void synchronize(CUstream stream);
+  void synchronize(CUstream stream);
 };
 }  // namespace kvikio

diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index bef35756a4..0ecf9961f7 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -217,8 +217,73 @@ template class BounceBufferPool<PageAlignedAllocator>;
 template class BounceBufferPool<CudaPinnedAllocator>;
 template class BounceBufferPool<CudaPageAlignedPinnedAllocator>;
 
+namespace {
+void separate_copy(std::span<void*> dsts,
+                   std::span<void*> srcs,
+                   std::span<std::size_t> sizes,
+                   CUstream stream)
+{
+  // Fall back to the conventional H2D copy if the batch copy API is not available.
+  for (std::size_t i = 0; i < srcs.size(); ++i) {
+    CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
+      convert_void2deviceptr(dsts[i]), reinterpret_cast<void const*>(srcs[i]), sizes[i], stream));
+  }
+}
+
+void batch_copy(std::span<void*> dsts,
+                std::span<void*> srcs,
+                std::span<std::size_t> sizes,
+                CUstream stream)
+{
+  if (srcs.size() == 0) return;
+
+#if CUDA_VERSION >= 12080
+  if (cudaAPI::instance().MemcpyBatchAsync) {
+    CUmemcpyAttributes attrs{};
+    std::size_t attrs_idxs[] = {0};
+    attrs.srcAccessOrder = CUmemcpySrcAccessOrder_enum::CU_MEMCPY_SRC_ACCESS_ORDER_STREAM;
+    CUDA_DRIVER_TRY(
+      cudaAPI::instance().MemcpyBatchAsync(dsts.data(),
+                                           srcs.data(),
+                                           sizes.data(),
+                                           srcs.size(),
+                                           &attrs,
+                                           attrs_idxs,
+                                           static_cast<std::size_t>(1) /* num_attrs */,
+#if CUDA_VERSION < 13000
+                                           static_cast<std::size_t*>(nullptr),
+#endif
+                                           stream));
+  } else {
+    separate_copy(dsts, srcs, sizes, stream);
+  }
+#else
+  separate_copy(dsts, srcs, sizes, stream);
+#endif
+}
+}  // namespace
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::BatchTransferContext::add_entry(void* dst,
+                                                                  void* src,
+                                                                  std::size_t size)
+{
+  srcs.push_back(src);
+  dsts.push_back(dst);
+  sizes.push_back(size);
+}
+
 template <typename Allocator>
-BounceBufferRing<Allocator>::BounceBufferRing(std::size_t num_buffers) : _num_buffers{num_buffers}
+void BounceBufferRing<Allocator>::BatchTransferContext::clear()
+{
+  srcs.clear();
+  dsts.clear();
+  sizes.clear();
+}
+
+template <typename Allocator>
+BounceBufferRing<Allocator>::BounceBufferRing(std::size_t num_buffers, bool batch_copy)
+  : _num_buffers{num_buffers}, _batch_copy{batch_copy}
 {
   KVIKIO_NVTX_FUNC_RANGE();
   KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer");
@@ -230,46 +295,55 @@
 }
 
 template <typename Allocator>
-void BounceBufferRing<Allocator>::copy_to_device(void* device_dst,
-                                                 std::size_t size,
-                                                 CUstream stream)
+void BounceBufferRing<Allocator>::enqueue_h2d(void* device_dst, std::size_t size, CUstream stream)
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
-    convert_void2deviceptr(device_dst), buffer(), size, stream));
+  if (_batch_copy) {
+    _batch_transfer_ctx.add_entry(device_dst, cur_buffer(), size);
+  } else {
+    CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
+      convert_void2deviceptr(device_dst), cur_buffer(), size, stream));
+  }
 }
 
 template <typename Allocator>
-void BounceBufferRing<Allocator>::copy_from_device(void const* device_src,
-                                                   std::size_t size,
-                                                   CUstream stream)
+void BounceBufferRing<Allocator>::enqueue_d2h(void* device_src, std::size_t size, CUstream stream)
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync(
-    buffer(), convert_void2deviceptr(device_src), size, stream));
+  if (_batch_copy) {
+    _batch_transfer_ctx.add_entry(cur_buffer(), device_src, size);
+  } else {
+    CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync(
+      cur_buffer(), convert_void2deviceptr(device_src), size, stream));
+  }
 }
 
 template <typename Allocator>
 void BounceBufferRing<Allocator>::advance(CUstream stream)
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  _cur_buf_offset = 0;
+  _cur_buffer_offset = 0;
   ++_cur_buf_idx;
   if (_cur_buf_idx >= _num_buffers) { _cur_buf_idx = 0; }
   if (_cur_buf_idx == _initial_buf_idx) {
+    if (_batch_copy) {
+      batch_copy(
+        _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream);
+      _batch_transfer_ctx.clear();
+    }
     CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
     _initial_buf_idx = _cur_buf_idx;
   }
 }
 
 template <typename Allocator>
-void* BounceBufferRing<Allocator>::buffer() const noexcept
+void* BounceBufferRing<Allocator>::cur_buffer() const noexcept
 {
   return _buffers[_cur_buf_idx].get();
 }
 
 template <typename Allocator>
-void* BounceBufferRing<Allocator>::buffer(std::ptrdiff_t off) const noexcept
+void* BounceBufferRing<Allocator>::cur_buffer(std::ptrdiff_t off) const noexcept
 {
   return _buffers[_cur_buf_idx].get(off);
 }
@@ -287,15 +361,15 @@ std::size_t BounceBufferRing<Allocator>::num_buffers() const noexcept
 }
 
 template <typename Allocator>
-std::size_t BounceBufferRing<Allocator>::offset() const noexcept
+std::size_t BounceBufferRing<Allocator>::cur_buffer_offset() const noexcept
 {
-  return _cur_buf_offset;
+  return _cur_buffer_offset;
 }
 
 template <typename Allocator>
-std::size_t BounceBufferRing<Allocator>::remaining_bytes() const noexcept
+std::size_t BounceBufferRing<Allocator>::cur_buffer_remaining_capacity() const noexcept
 {
-  return buffer_size() - _cur_buf_offset;
+  return buffer_size() - _cur_buffer_offset;
 }
 
 template <typename Allocator>
@@ -305,23 +379,22 @@ void BounceBufferRing<Allocator>::accumulate_and_submit_h2d(void* device_dst,
                                                             CUstream stream)
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  auto const* host_str_ptr = static_cast<char const*>(host_src);
+  auto const* host_src_ptr = static_cast<char const*>(host_src);
   auto* device_dst_ptr = static_cast<char*>(device_dst);
 
   while (size > 0) {
-    auto const cur_remaining_bytes = remaining_bytes();
-    auto const num_bytes_to_copy = std::min(size, cur_remaining_bytes);
+    auto const remaining_bytes = cur_buffer_remaining_capacity();
+    auto const num_bytes_to_copy = std::min(size, remaining_bytes);
 
     // Copy from host to bounce buffer
-    std::memcpy(buffer(_cur_buf_offset), host_str_ptr, num_bytes_to_copy);
-    _cur_buf_offset += num_bytes_to_copy;
-    host_str_ptr += num_bytes_to_copy;
+    std::memcpy(cur_buffer(_cur_buffer_offset), host_src_ptr, num_bytes_to_copy);
+    _cur_buffer_offset += num_bytes_to_copy;
+    host_src_ptr += num_bytes_to_copy;
     size -= num_bytes_to_copy;
 
-    if (_cur_buf_offset >= buffer_size()) {
-      copy_to_device(device_dst_ptr, _cur_buf_offset, stream);
-      device_dst_ptr += _cur_buf_offset;
-      advance(stream);
+    if (_cur_buffer_offset >= buffer_size()) {
+      submit_h2d(device_dst_ptr, buffer_size(), stream);
+      device_dst_ptr += buffer_size();
     }
   }
 }
@@ -330,7 +403,7 @@ void BounceBufferRing<Allocator>::submit_h2d(void* device_dst, std::size_t size
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  copy_to_device(device_dst, size, stream);
+  enqueue_h2d(device_dst, size, stream);
   advance(stream);
 }
 
@@ -338,9 +411,9 @@ std::size_t BounceBufferRing<Allocator>::flush_h2d(void* device_dst, CUstream s
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  if (_cur_buf_offset == 0) { return 0; }
-  auto const flushed = _cur_buf_offset;
-  copy_to_device(device_dst, _cur_buf_offset, stream);
+  if (_cur_buffer_offset == 0) { return 0; }
+  auto const flushed = _cur_buffer_offset;
+  enqueue_h2d(device_dst, _cur_buffer_offset, stream);
   advance(stream);
   return flushed;
 }
 
@@ -349,6 +422,11 @@ void BounceBufferRing<Allocator>::synchronize(CUstream stream)
 {
   KVIKIO_NVTX_FUNC_RANGE();
+  if (_batch_copy) {
+    batch_copy(
+      _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream);
+    _batch_transfer_ctx.clear();
+  }
   CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
 }
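
PATCH 02's batch mode defers per-buffer copies and issues them together via
cuMemcpyBatchAsync where available. A sketch of the many-small-transfers case it
targets — assuming a hypothetical nonempty `writes` vector of `{void* dev_dst;
void const* host_src; std::size_t size;}` entries, each no larger than
ring.buffer_size(), a `CUstream stream`, and the CUDA 12.8+ batch path (error
checking omitted):

    // One buffer per entry, so no buffer is recycled before the deferred batch is issued.
    kvikio::BounceBufferRing<kvikio::CudaPinnedAllocator> ring{writes.size(), /*batch_copy=*/true};
    for (auto const& w : writes) {
      std::memcpy(ring.cur_buffer(), w.host_src, w.size);  // stage in pinned memory
      ring.enqueue_h2d(w.dev_dst, w.size, stream);         // recorded, not issued yet
      ring.advance(stream);                                // rotate; flushes the batch on wrap-around
    }
    ring.synchronize(stream);  // issues any still-pending batch, then waits on the stream

With batch_copy=false the same loop degenerates to one cuMemcpyHtoDAsync per entry,
which overlaps better for large transfers but pays one driver call per buffer.
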
From d90a8018603a407473f54d9ce280a7aeac20a3fe Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 11:26:36 -0500
Subject: [PATCH 03/10] Modify naming

---
 cpp/src/bounce_buffer.cpp | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index 0ecf9961f7..2b26c0f155 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -218,10 +218,10 @@ template class BounceBufferPool<PageAlignedAllocator>;
 template class BounceBufferPool<CudaPinnedAllocator>;
 template class BounceBufferPool<CudaPageAlignedPinnedAllocator>;
 
 namespace {
-void separate_copy(std::span<void*> dsts,
-                   std::span<void*> srcs,
-                   std::span<std::size_t> sizes,
-                   CUstream stream)
+void unbatched_copy(std::span<void*> dsts,
+                    std::span<void*> srcs,
+                    std::span<std::size_t> sizes,
+                    CUstream stream)
 {
   // Fall back to the conventional H2D copy if the batch copy API is not available.
   for (std::size_t i = 0; i < srcs.size(); ++i) {
     CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
       convert_void2deviceptr(dsts[i]), reinterpret_cast<void const*>(srcs[i]), sizes[i], stream));
   }
 }
@@ -255,10 +255,10 @@ void batch_copy(std::span<void*> dsts,
 #endif
       stream));
   } else {
-    separate_copy(dsts, srcs, sizes, stream);
+    unbatched_copy(dsts, srcs, sizes, stream);
   }
 #else
-  separate_copy(dsts, srcs, sizes, stream);
+  unbatched_copy(dsts, srcs, sizes, stream);
 #endif
 }
 }  // namespace

From 95e57c80425adcf64055665ab475d1f6a0f74f77 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 11:28:36 -0500
Subject: [PATCH 04/10] Update

---
 cpp/src/bounce_buffer.cpp | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index 2b26c0f155..1cb0a14c36 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -218,18 +218,27 @@ template class BounceBufferPool<PageAlignedAllocator>;
 template class BounceBufferPool<CudaPinnedAllocator>;
 template class BounceBufferPool<CudaPageAlignedPinnedAllocator>;
 
 namespace {
+/**
+ * @brief Issue individual async H2D copies for each transfer.
+ *
+ * Fallback when cuMemcpyBatchAsync is unavailable.
+ */
 void unbatched_copy(std::span<void*> dsts,
                     std::span<void*> srcs,
                     std::span<std::size_t> sizes,
                     CUstream stream)
 {
-  // Fall back to the conventional H2D copy if the batch copy API is not available.
   for (std::size_t i = 0; i < srcs.size(); ++i) {
     CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
       convert_void2deviceptr(dsts[i]), reinterpret_cast<void const*>(srcs[i]), sizes[i], stream));
   }
 }
 
+/**
+ * @brief Issue H2D copies using batch API if available, otherwise fall back to individual copies.
+ *
+ * Uses cuMemcpyBatchAsync (CUDA 12.8+) to reduce API overhead for multiple small transfers.
+ */
 void batch_copy(std::span<void*> dsts,
                 std::span<void*> srcs,
                 std::span<std::size_t> sizes,

From b13a2101ffab60476e087bc25cc4471ce547b612 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 11:40:19 -0500
Subject: [PATCH 05/10] Update

---
 cpp/src/bounce_buffer.cpp | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index 1cb0a14c36..73a369faac 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -39,6 +39,7 @@ void* CudaPinnedAllocator::allocate(std::size_t size)
 
   return buffer;
 }
+
 void CudaPinnedAllocator::deallocate(void* buffer, std::size_t /*size*/)
 {
   CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(buffer));
@@ -116,7 +117,7 @@ template <typename Allocator>
 void* BounceBufferPool<Allocator>::Buffer::get(std::ptrdiff_t offset) const noexcept
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  return static_cast<char*>(_buffer) + offset;
+  return static_cast<std::byte*>(_buffer) + offset;
 }
@@ -388,9 +389,10 @@ void BounceBufferRing<Allocator>::accumulate_and_submit_h2d(void* device_dst,
                                                             CUstream stream)
 {
   KVIKIO_NVTX_FUNC_RANGE();
-  auto const* host_src_ptr = static_cast<char const*>(host_src);
-  auto* device_dst_ptr = static_cast<char*>(device_dst);
+  auto const* host_src_ptr = static_cast<std::byte const*>(host_src);
+  auto* device_dst_ptr     = static_cast<std::byte*>(device_dst);
 
+  // The data is split across multiple buffers if its size is greater than buffer_size()
   while (size > 0) {
     auto const remaining_bytes = cur_buffer_remaining_capacity();
     auto const num_bytes_to_copy = std::min(size, remaining_bytes);

From a4eae689aefd9712a7e316054e4868227c6c21f2 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 12:09:33 -0500
Subject: [PATCH 06/10] Add return value for accumulate_and_submit_h2d

---
 cpp/include/kvikio/bounce_buffer.hpp | 16 +++++++++-------
 cpp/src/bounce_buffer.cpp            | 10 ++++++----
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp
index a5fcd97275..ba075a5e90 100644
--- a/cpp/include/kvikio/bounce_buffer.hpp
+++ b/cpp/include/kvikio/bounce_buffer.hpp
@@ -423,10 +423,11 @@ class BounceBufferRing {
    * Typical usage for streaming host data to device:
    * @code
    * while (has_more_data()) {
-   *   ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream);
-   *   device_ptr += chunk_size;  // Note: only advance by submitted amount
+   *   auto submitted = ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream);
+   *   device_ptr += submitted;
    * }
-   * ring.flush_h2d(device_ptr, stream);
+   * auto flushed = ring.flush_h2d(device_ptr, stream);
+   * device_ptr += flushed;
    * ring.synchronize(stream);
    * @endcode
    *
@@ -434,14 +435,15 @@ class BounceBufferRing {
    * @param host_src Source data in host memory.
    * @param size Bytes to copy.
    * @param stream CUDA stream for async H2D transfers.
+   * @return Number of bytes (in buffer sizes) that have actually been submitted for copy.
    *
    * @note Partial buffer contents remain until flush_h2d() is called.
    * @note Final data visibility requires flush_h2d() + synchronize().
    */
-  void accumulate_and_submit_h2d(void* device_dst,
-                                 void const* host_src,
-                                 std::size_t size,
-                                 CUstream stream);
+  std::size_t accumulate_and_submit_h2d(void* device_dst,
+                                        void const* host_src,
+                                        std::size_t size,
+                                        CUstream stream);
 
   /**
    * @brief Submit current buffer contents to device and advance to next buffer.
diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index 73a369faac..a79aae1c50 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -383,10 +383,10 @@ std::size_t BounceBufferRing<Allocator>::cur_buffer_remaining_capacity() const n
 }
 
 template <typename Allocator>
-void BounceBufferRing<Allocator>::accumulate_and_submit_h2d(void* device_dst,
-                                                            void const* host_src,
-                                                            std::size_t size,
-                                                            CUstream stream)
+std::size_t BounceBufferRing<Allocator>::accumulate_and_submit_h2d(void* device_dst,
+                                                                   void const* host_src,
+                                                                   std::size_t size,
+                                                                   CUstream stream)
 {
   KVIKIO_NVTX_FUNC_RANGE();
   auto const* host_src_ptr = static_cast<std::byte const*>(host_src);
@@ -408,6 +408,8 @@ std::size_t BounceBufferRing<Allocator>::accumulate_and_submit_h2d(void* device
     }
   }
+
+  return static_cast<std::size_t>(device_dst_ptr - static_cast<std::byte*>(device_dst));
 }

From 00338b33f1e49a253a71315196a2167d8f66bfc0 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 13:24:54 -0500
Subject: [PATCH 07/10] Add reset

---
 cpp/include/kvikio/bounce_buffer.hpp | 15 ++++++++++++++-
 cpp/src/bounce_buffer.cpp            | 11 +++++++++--
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp
index ba075a5e90..f9016680b2 100644
--- a/cpp/include/kvikio/bounce_buffer.hpp
+++ b/cpp/include/kvikio/bounce_buffer.hpp
@@ -286,6 +286,8 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool<CudaPageAlignedPi
 
 /**
  * @brief K-way bounce buffer ring for overlapping I/O with host-device transfers.
  *
+ * The ring can be synchronized and rewound via reset() and reused for a new transfer session.
+ *
  * @tparam Allocator The allocator policy for bounce buffers.
  */
 template <typename Allocator>
 class BounceBufferRing {
@@ -435,7 +437,8 @@ class BounceBufferRing {
    * @param host_src Source data in host memory.
    * @param size Bytes to copy.
    * @param stream CUDA stream for async H2D transfers.
-   * @return Number of bytes (in buffer sizes) that have actually been submitted for copy.
+   * @return Number of bytes submitted to device (always a multiple of buffer_size(); partial buffer
+   * contents remain until flush_h2d()).
    *
    * @note Partial buffer contents remain until flush_h2d() is called.
    * @note Final data visibility requires flush_h2d() + synchronize().
@@ -490,5 +493,15 @@ class BounceBufferRing {
    * @note After synchronize(), the ring can be reused for new transfers.
    */
   void synchronize(CUstream stream);
+
+  /**
+   * @brief Synchronize pending transfers and reset ring state for a new transfer session.
+   *
+   * Ensures all in-flight transfers complete, then resets the ring to its initial state. After
+   * reset(), the ring can be safely reused for either H2D or D2H operations.
+   *
+   * @param stream CUDA stream to synchronize.
+   */
+  void reset(CUstream stream);
 };
 }  // namespace kvikio

diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index a79aae1c50..922828102e 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -109,14 +109,12 @@ BounceBufferPool<Allocator>::Buffer& BounceBufferPool<Allocator>::Buffer::operat
 template <typename Allocator>
 void* BounceBufferPool<Allocator>::Buffer::get() const noexcept
 {
-  KVIKIO_NVTX_FUNC_RANGE();
   return _buffer;
 }
 
 template <typename Allocator>
 void* BounceBufferPool<Allocator>::Buffer::get(std::ptrdiff_t offset) const noexcept
 {
-  KVIKIO_NVTX_FUNC_RANGE();
   return static_cast<std::byte*>(_buffer) + offset;
 }
 
@@ -443,6 +441,15 @@ void BounceBufferRing<Allocator>::synchronize(CUstream stream)
   CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
 }
 
+template <typename Allocator>
+void BounceBufferRing<Allocator>::reset(CUstream stream)
+{
+  synchronize(stream);
+  _cur_buf_idx = 0;
+  _initial_buf_idx = 0;
+  _cur_buffer_offset = 0;
+}
+
 // Explicit instantiations
 template class BounceBufferRing<CudaPinnedAllocator>;
 template class BounceBufferRing<CudaPageAlignedPinnedAllocator>;
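
With PATCH 07 a ring no longer has to be rebuilt between transfer sessions. A short
sketch using the post-PATCH-06 return values — `dev_a`/`host_a`/`bytes_a`,
`dev_b`/`host_b`/`bytes_b`, and `stream` are assumed caller-provided names:

    kvikio::BounceBufferRing<kvikio::CudaPinnedAllocator> ring{4};
    auto submitted = ring.accumulate_and_submit_h2d(dev_a, host_a, bytes_a, stream);
    // Push out whatever is still sitting in the partially filled buffer.
    submitted += ring.flush_h2d(static_cast<char*>(dev_a) + submitted, stream);
    ring.reset(stream);  // waits for in-flight copies, rewinds indices and offset
    // The ring now behaves as freshly constructed for an unrelated session.
    ring.accumulate_and_submit_h2d(dev_b, host_b, bytes_b, stream);
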
From a1394abafd9712a7e316054e4868227c6c21f224 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 14:36:01 -0500
Subject: [PATCH 08/10] Optimize easy-handle remote I/O using bounce buffer ring

---
 cpp/include/kvikio/remote_handle.hpp |  8 +++--
 cpp/src/remote_handle.cpp            | 46 +++++++++++++++++++---------
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp
index 0b0808c45e..31d0353c9a 100644
--- a/cpp/include/kvikio/remote_handle.hpp
+++ b/cpp/include/kvikio/remote_handle.hpp
@@ -1,5 +1,5 @@
 /*
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
  * SPDX-License-Identifier: Apache-2.0
  */
 #pragma once
@@ -11,6 +11,7 @@
 #include <...>
 #include <...>
+#include <kvikio/bounce_buffer.hpp>
 #include <...>
 #include <...>
 #include <...>
@@ -441,7 +442,10 @@ class RemoteHandle {
    * @param file_offset File offset in bytes.
    * @return Number of bytes read, which is always `size`.
    */
-  std::size_t read(void* buf, std::size_t size, std::size_t file_offset = 0);
+  std::size_t read(void* buf,
+                   std::size_t size,
+                   std::size_t file_offset = 0,
+                   BounceBufferRing<CudaPinnedAllocator>* bounce_buffer_ring = nullptr);
 
   /**
    * @brief Read from remote source into buffer (host or device memory) in parallel.

diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp
index 7c917c9a0b..d2c5470c17 100644
--- a/cpp/src/remote_handle.cpp
+++ b/cpp/src/remote_handle.cpp
@@ -1,5 +1,5 @@
 /*
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
  * SPDX-License-Identifier: Apache-2.0
  */
 
@@ -702,11 +702,19 @@ struct CallbackContext {
   std::size_t size;       // Total number of bytes to read.
   std::ptrdiff_t offset;  // Offset into `buf` to start reading.
   bool overflow_error;    // Flag to indicate overflow.
-  CallbackContext(void* buf, std::size_t size)
-    : buf{static_cast<char*>(buf)}, size{size}, offset{0}, overflow_error{0}
+  BounceBufferRing<CudaPinnedAllocator>* bounce_buffer_ring{
+    nullptr};  // Only used by callback_device_memory
+
+  CallbackContext(void* a_buf,
+                  std::size_t a_size,
+                  BounceBufferRing<CudaPinnedAllocator>* a_bounce_buffer_ring)
+    : buf{static_cast<char*>(a_buf)},
+      size{a_size},
+      offset{0},
+      overflow_error{false},
+      bounce_buffer_ring{a_bounce_buffer_ring}
   {
   }
-  BounceBufferH2D* bounce_buffer{nullptr};  // Only used by callback_device_memory
 };
@@ -755,13 +763,17 @@ std::size_t callback_device_memory(char* data, std::size_t size, std::size_t nme
   }
 
   KVIKIO_NVTX_FUNC_RANGE(nbytes);
-  ctx->bounce_buffer->write(data, nbytes);
+  ctx->bounce_buffer_ring->accumulate_and_submit_h2d(
+    ctx->buf, data, nbytes, detail::StreamsByThread::get());
   ctx->offset += nbytes;
   return nbytes;
 }
 
 }  // namespace
 
-std::size_t RemoteHandle::read(void* buf, std::size_t size, std::size_t file_offset)
+std::size_t RemoteHandle::read(void* buf,
+                               std::size_t size,
+                               std::size_t file_offset,
+                               BounceBufferRing<CudaPinnedAllocator>* bounce_buffer_ring)
 {
   KVIKIO_NVTX_FUNC_RANGE(size);
@@ -781,7 +793,7 @@ std::size_t RemoteHandle::read(void* buf,
   } else {
     curl.setopt(CURLOPT_WRITEFUNCTION, callback_device_memory);
   }
-  CallbackContext ctx{buf, size};
+  CallbackContext ctx{buf, size, bounce_buffer_ring};
   curl.setopt(CURLOPT_WRITEDATA, &ctx);
 
   try {
@@ -791,8 +803,11 @@ std::size_t RemoteHandle::read(void* buf,
       PushAndPopContext c(get_context_from_pointer(buf));
       // We use a bounce buffer to avoid many small memory copies to device. Libcurl has a
       // maximum chunk size of 16kb (`CURL_MAX_WRITE_SIZE`) but chunks are often much smaller.
-      BounceBufferH2D bounce_buffer(detail::StreamsByThread::get(), buf);
-      ctx.bounce_buffer = &bounce_buffer;
+      std::unique_ptr<BounceBufferRing<CudaPinnedAllocator>> a_bounce_buffer_ring;
+      if (bounce_buffer_ring == nullptr) {
+        a_bounce_buffer_ring = std::make_unique<BounceBufferRing<CudaPinnedAllocator>>();
+        ctx.bounce_buffer_ring = a_bounce_buffer_ring.get();
+      }
       curl.perform();
     }
   } catch (std::runtime_error const& e) {
@@ -815,11 +830,14 @@ std::future<std::size_t> RemoteHandle::pread(void* buf,
   KVIKIO_EXPECT(thread_pool != nullptr, "The thread pool must not be nullptr");
   auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx();
   KVIKIO_NVTX_FUNC_RANGE(size);
-  auto task = [this](void* devPtr_base,
-                     std::size_t size,
-                     std::size_t file_offset,
-                     std::size_t devPtr_offset) -> std::size_t {
-    return read(static_cast<char*>(devPtr_base) + devPtr_offset, size, file_offset);
+  auto bounce_buffer_ring = std::make_shared<BounceBufferRing<CudaPinnedAllocator>>();
+  auto task               = [this, bounce_buffer_ring = bounce_buffer_ring](
+                void* devPtr_base,
+                std::size_t size,
+                std::size_t file_offset,
+                std::size_t devPtr_offset) -> std::size_t {
+    return read(
+      static_cast<char*>(devPtr_base) + devPtr_offset, size, file_offset, bounce_buffer_ring.get());
   };
   return parallel_io(task, buf, size, file_offset, task_size, 0, thread_pool, call_idx, nvtx_color);
 }
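
PATCH 08 lets callers amortize ring construction across reads; pread() does exactly
that internally by sharing one ring among its tasks. A sketch of the caller-provided
form — assuming `handle` is an open RemoteHandle, `dev_ptr` is a device allocation of
at least `n_chunks * chunk` bytes, and that read() flushes the partial buffer and
synchronizes the ring before returning (those calls fall outside the hunks shown
above):

    kvikio::BounceBufferRing<kvikio::CudaPinnedAllocator> ring{4};
    for (std::size_t i = 0; i < n_chunks; ++i) {
      // Sequential chunked download reusing the same pinned ring for every call.
      handle.read(static_cast<char*>(dev_ptr) + i * chunk, chunk, i * chunk, &ring);
    }

Note that accumulate_and_submit_h2d() expects the device destination to account for
bytes still pending in the ring, so the callback and ring state must stay consistent
across libcurl's sub-16 KiB chunks.
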
From ee33ad85933b9476fd8f2fb866e06ec05e416801 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 17:45:53 -0500
Subject: [PATCH 09/10] Add clarifying comments on bounce buffer's potential multicontext use

---
 cpp/include/kvikio/bounce_buffer.hpp |  9 +++++++++
 cpp/src/bounce_buffer.cpp            | 10 ++++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp
index f9016680b2..49492f7a55 100644
--- a/cpp/include/kvikio/bounce_buffer.hpp
+++ b/cpp/include/kvikio/bounce_buffer.hpp
@@ -43,6 +43,9 @@ class PageAlignedAllocator {
  * transferred to/from GPU device memory. The allocation is only guaranteed to be aligned to "at
  * least 256 bytes". It is NOT guaranteed to be page aligned.
  *
+ * @note Allocations use CU_MEMHOSTALLOC_PORTABLE, making them accessible from all CUDA contexts,
+ * not just the one that performed the allocation. This allows the singleton BounceBufferPool to
+ * safely serve buffers across multiple contexts and devices.
  * @note Do NOT use with Direct I/O - lacks page alignment guarantee
  */
 class CudaPinnedAllocator {
@@ -71,6 +74,9 @@ class CudaPinnedAllocator {
  * (for efficient host-device transfers). Uses std::aligned_alloc followed by
  * cudaMemHostRegister to achieve both properties.
  *
+ * @note Registration uses CU_MEMHOSTALLOC_PORTABLE, making buffers accessible from all CUDA
+ * contexts, not just the one active during allocation. This allows the singleton BounceBufferPool
+ * to safely serve buffers across multiple contexts and devices.
  * @note This is the required allocator for Direct I/O with device memory. Requires a valid CUDA
  * context when allocating.
  */
 class CudaPageAlignedPinnedAllocator {
@@ -111,6 +117,9 @@ class CudaPageAlignedPinnedAllocator {
  * - CudaPinnedAllocator: For device I/O without Direct I/O
  * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O
  *
+ * @note The singleton pool is safe for use across multiple CUDA contexts. CUDA-pinned allocators
+ * use CU_MEMHOSTALLOC_PORTABLE, ensuring buffers allocated in one context can be used from any
+ * other context.
  * @note The destructor intentionally leaks allocations to avoid CUDA cleanup issues when static
  * destructors run after CUDA context destruction
  */

diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index 922828102e..400ca7f122 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -31,10 +31,9 @@ void* CudaPinnedAllocator::allocate(std::size_t size)
 {
   void* buffer{};
 
-  // If no available allocation, allocate and register a new one
-  // Allocate page-locked host memory
-  // Under unified addressing, host memory allocated this way is automatically portable and
-  // mapped.
+  // Allocate page-locked (pinned) host memory with CU_MEMHOSTALLOC_PORTABLE. The PORTABLE flag
+  // ensures this memory is accessible from all CUDA contexts, which is essential for the singleton
+  // BounceBufferPool that may serve multiple contexts and devices.
   CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&buffer, size, CU_MEMHOSTALLOC_PORTABLE));
 
   return buffer;
@@ -52,6 +51,9 @@ void* CudaPageAlignedPinnedAllocator::allocate(std::size_t size)
   auto const aligned_size = detail::align_up(size, page_size);
   buffer = std::aligned_alloc(page_size, aligned_size);
   KVIKIO_EXPECT(buffer != nullptr, "Aligned allocation failed");
+  // Register the page-aligned allocation as pinned memory with CU_MEMHOSTALLOC_PORTABLE. The
+  // PORTABLE flag ensures this memory is accessible from all CUDA contexts, which is essential for
+  // the singleton BounceBufferPool that may serve multiple contexts and devices.
   CUDA_DRIVER_TRY(
     cudaAPI::instance().MemHostRegister(buffer, aligned_size, CU_MEMHOSTALLOC_PORTABLE));
   return buffer;
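
The PORTABLE flag is what makes the comments above hold. A tiny illustration of the
property using raw CUDA driver calls — `ctx_a`/`ctx_b`, `dev_ptr_b` (a CUdeviceptr
belonging to ctx_b), and `stream_b` (created under ctx_b) are assumed to exist, and
error checking is omitted:

    void* staging{};
    // Pin 1 MiB while ctx_a is current; PORTABLE makes the pinning visible to all contexts.
    cuCtxSetCurrent(ctx_a);
    cuMemHostAlloc(&staging, 1 << 20, CU_MEMHOSTALLOC_PORTABLE);
    // Later, under a different context (e.g., another device), the same host buffer
    // still counts as pinned, so the async copy stays truly asynchronous.
    cuCtxSetCurrent(ctx_b);
    cuMemcpyHtoDAsync(dev_ptr_b, staging, 1 << 20, stream_b);

Without the flag, the second copy would treat `staging` as pageable memory and fall
back to a synchronous staging path.
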
From 91a6bee72f15390cbcd98804b31c8376bfa2a8b5 Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Fri, 30 Jan 2026 17:52:08 -0500
Subject: [PATCH 10/10] Clarify the multicontext issue for bounce buffer ring

---
 cpp/include/kvikio/bounce_buffer.hpp | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp
index 49492f7a55..5d2428ba84 100644
--- a/cpp/include/kvikio/bounce_buffer.hpp
+++ b/cpp/include/kvikio/bounce_buffer.hpp
@@ -293,6 +293,9 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool<CudaPageAlignedPi