From ebadb51e32d79619ecce5668654b777bb66c870d Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 29 Jan 2026 22:37:37 -0500 Subject: [PATCH 01/37] Unify bounce buffer management --- cpp/include/kvikio/bounce_buffer.hpp | 102 +++++++++++++++++++ cpp/src/bounce_buffer.cpp | 140 +++++++++++++++++++++++++++ 2 files changed, 242 insertions(+) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 4ebc914d10..527a50a5a9 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -268,4 +268,106 @@ using CudaPinnedBounceBufferPool = BounceBufferPool; * Provides both page alignment (for Direct I/O) and CUDA registration (for efficient transfers) */ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool; + +/** + * @brief K-way bounce buffer ring for overlapping I/O with host-device transfers. + * + * @tparam Allocator The allocator policy for bounce buffers. + */ +template +class BounceBufferRing { + private: + std::vector::Buffer> _buffers; + std::size_t _num_buffers; + std::size_t _cur_buf_idx{0}; + std::size_t _initial_buf_idx{0}; + std::size_t _cur_buf_offset{0}; + + /** + * @brief Async copy from current bounce buffer to device memory. + * + * @param device_dst Device memory destination. + * @param size Bytes to copy. + * @param stream CUDA stream for the async transfer. + */ + void copy_to_device(void* device_dst, std::size_t size, CUstream stream); + + /** + * @brief Async copy from device memory to current bounce buffer. + * + * @param device_src Device memory source. + * @param size Bytes to copy. + * @param stream CUDA stream for the async transfer. + */ + void copy_from_device(void const* device_src, std::size_t size, CUstream stream); + + /** + * @brief Advance to next buffer, sync stream if wrapping around. + * + * @param stream CUDA stream to synchronize on wrap-around. 
+ */ + void advance(CUstream stream); + + /** + * @brief Get remaining number of bytes in current buffer for accumulation. + */ + [[nodiscard]] std::size_t remaining_bytes() const noexcept; + + public: + /** + * @brief Construct a bounce buffer ring. + * + * @param num_buffers Number of bounce buffers for k-way overlap (must be >= 1). + */ + explicit BounceBufferRing(std::size_t num_buffers = 1); + + ~BounceBufferRing() noexcept = default; + + // Non-copyable, non-movable + BounceBufferRing(BounceBufferRing const&) = delete; + BounceBufferRing& operator=(BounceBufferRing const&) = delete; + BounceBufferRing(BounceBufferRing&&) = delete; + BounceBufferRing& operator=(BounceBufferRing&&) = delete; + + // Accessors + [[nodiscard]] void* buffer() const noexcept; + [[nodiscard]] void* buffer(std::ptrdiff_t offset) const noexcept; + [[nodiscard]] std::size_t buffer_size() const noexcept; + [[nodiscard]] std::size_t num_buffers() const noexcept; + [[nodiscard]] std::size_t offset() const noexcept; + + /** + * @brief Accumulate data into bounce buffer, auto-flush when full. + * + * Copies src to internal buffer. When buffer fills, issues async H2D copy to device_dst and + * advances to next buffer. + * + * @param device_dst Current device destination pointer. + * @param host_src Source data in host memory. + * @param size Bytes to write. + * @param stream CUDA stream for async H2D transfers. + */ + void accumulate_and_submit_h2d(void* device_dst, + void const* host_src, + std::size_t size, + CUstream stream); + + void submit_h2d(void* device_dst, std::size_t size, CUstream stream); + + /** + * @brief Flush any accumulated data to device. + * + * @param device_dst Device memory destination. + * @param stream CUDA stream for async H2D transfer. + * @return Bytes flushed. + */ + std::size_t flush_h2d(void* device_dst, CUstream stream); + + /** + * @brief Synchronize the given CUDA stream. + * + * @param stream CUDA stream to synchronize. 
+ */ + static void synchronize(CUstream stream); +}; } // namespace kvikio diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 62b08a6883..bef35756a4 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -216,4 +216,144 @@ BounceBufferPool& BounceBufferPool::instance() template class BounceBufferPool; template class BounceBufferPool; template class BounceBufferPool; + +template +BounceBufferRing::BounceBufferRing(std::size_t num_buffers) : _num_buffers{num_buffers} +{ + KVIKIO_NVTX_FUNC_RANGE(); + KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer"); + + _buffers.reserve(_num_buffers); + for (std::size_t i = 0; i < _num_buffers; ++i) { + _buffers.emplace_back(BounceBufferPool::instance().get()); + } +} + +template +void BounceBufferRing::copy_to_device(void* device_dst, + std::size_t size, + CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(device_dst), buffer(), size, stream)); +} + +template +void BounceBufferRing::copy_from_device(void const* device_src, + std::size_t size, + CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( + buffer(), convert_void2deviceptr(device_src), size, stream)); +} + +template +void BounceBufferRing::advance(CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + _cur_buf_offset = 0; + ++_cur_buf_idx; + if (_cur_buf_idx >= _num_buffers) { _cur_buf_idx = 0; } + if (_cur_buf_idx == _initial_buf_idx) { + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); + _initial_buf_idx = _cur_buf_idx; + } +} + +template +void* BounceBufferRing::buffer() const noexcept +{ + return _buffers[_cur_buf_idx].get(); +} + +template +void* BounceBufferRing::buffer(std::ptrdiff_t off) const noexcept +{ + return _buffers[_cur_buf_idx].get(off); +} + +template +std::size_t BounceBufferRing::buffer_size() const noexcept +{ + return _buffers[_cur_buf_idx].size(); 
+} + +template +std::size_t BounceBufferRing::num_buffers() const noexcept +{ + return _num_buffers; +} + +template +std::size_t BounceBufferRing::offset() const noexcept +{ + return _cur_buf_offset; +} + +template +std::size_t BounceBufferRing::remaining_bytes() const noexcept +{ + return buffer_size() - _cur_buf_offset; +} + +template +void BounceBufferRing::accumulate_and_submit_h2d(void* device_dst, + void const* host_src, + std::size_t size, + CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + auto const* host_str_ptr = static_cast(host_src); + auto* device_dst_ptr = static_cast(device_dst); + + while (size > 0) { + auto const cur_remaining_bytes = remaining_bytes(); + auto const num_bytes_to_copy = std::min(size, cur_remaining_bytes); + + // Copy from host to bounce buffer + std::memcpy(buffer(_cur_buf_offset), host_str_ptr, num_bytes_to_copy); + _cur_buf_offset += num_bytes_to_copy; + host_str_ptr += num_bytes_to_copy; + size -= num_bytes_to_copy; + + if (_cur_buf_offset >= buffer_size()) { + copy_to_device(device_dst_ptr, _cur_buf_offset, stream); + device_dst_ptr += _cur_buf_offset; + advance(stream); + } + } +} + +template +void BounceBufferRing::submit_h2d(void* device_dst, std::size_t size, CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + copy_to_device(device_dst, size, stream); + advance(stream); +} + +template +std::size_t BounceBufferRing::flush_h2d(void* device_dst, CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (_cur_buf_offset == 0) { return 0; } + auto const flushed = _cur_buf_offset; + copy_to_device(device_dst, _cur_buf_offset, stream); + advance(stream); + return flushed; +} + +template +void BounceBufferRing::synchronize(CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); +} + +// Explicit instantiations +template class BounceBufferRing; +template class BounceBufferRing; + } // namespace kvikio From 9ba980ecd8a06fc9ca9caffcc6df7e3f679faad9 Mon Sep 17 00:00:00 2001 From: 
Tianyu Liu Date: Fri, 30 Jan 2026 11:23:20 -0500 Subject: [PATCH 02/37] Improve implementation and function naming --- cpp/include/kvikio/bounce_buffer.hpp | 205 +++++++++++++++++++++------ cpp/src/bounce_buffer.cpp | 142 ++++++++++++++----- 2 files changed, 272 insertions(+), 75 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 527a50a5a9..a5fcd97275 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -272,102 +272,221 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool class BounceBufferRing { + public: + struct BatchTransferContext { + std::vector srcs; + std::vector dsts; + std::vector sizes; + + void add_entry(void* dst, void* src, std::size_t size); + void clear(); + }; + private: std::vector::Buffer> _buffers; + BatchTransferContext _batch_transfer_ctx; std::size_t _num_buffers; std::size_t _cur_buf_idx{0}; std::size_t _initial_buf_idx{0}; - std::size_t _cur_buf_offset{0}; + std::size_t _cur_buffer_offset{0}; + bool _batch_copy; + public: /** - * @brief Async copy from current bounce buffer to device memory. + * @brief Construct a bounce buffer ring. * - * @param device_dst Device memory destination. - * @param size Bytes to copy. - * @param stream CUDA stream for the async transfer. + * @param num_buffers Number of bounce buffers (k) for k-way overlap. Must be >= 1. Higher values + * allow more overlap but consume more memory. + * @param batch_copy If true, defer H2D copies and issue them in batches. Useful for many small + * transfers where API overhead dominates. If false (default), issue H2D copies immediately for + * better overlap. 
*/ - void copy_to_device(void* device_dst, std::size_t size, CUstream stream); + explicit BounceBufferRing(std::size_t num_buffers = 1, bool batch_copy = false); + + ~BounceBufferRing() noexcept = default; + + // Non-copyable, non-movable + BounceBufferRing(BounceBufferRing const&) = delete; + BounceBufferRing& operator=(BounceBufferRing const&) = delete; + BounceBufferRing(BounceBufferRing&&) = delete; + BounceBufferRing& operator=(BounceBufferRing&&) = delete; /** - * @brief Async copy from device memory to current bounce buffer. + * @brief Get pointer to the current bounce buffer. * - * @param device_src Device memory source. - * @param size Bytes to copy. - * @param stream CUDA stream for the async transfer. + * Use this to fill the buffer directly (e.g., via pread), then call submit_h2d() to transfer the + * data to device. + * + * @return Pointer to the start of the current buffer. */ - void copy_from_device(void const* device_src, std::size_t size, CUstream stream); + [[nodiscard]] void* cur_buffer() const noexcept; /** - * @brief Advance to next buffer, sync stream if wrapping around. + * @brief Get pointer to the current bounce buffer at a specific offset. * - * @param stream CUDA stream to synchronize on wrap-around. + * Useful for partial buffer fills or when accumulating data incrementally. + * + * @param offset Byte offset from the start of the current buffer. + * @return Pointer to cur_buffer() + offset. */ - void advance(CUstream stream); + [[nodiscard]] void* cur_buffer(std::ptrdiff_t offset) const noexcept; /** - * @brief Get remaining number of bytes in current buffer for accumulation. + * @brief Get the size of each bounce buffer in the ring. + * + * All buffers in the ring have the same size, determined by defaults::bounce_buffer_size() at + * ring construction time. + * + * @return Size in bytes of each buffer. 
*/ - [[nodiscard]] std::size_t remaining_bytes() const noexcept; + [[nodiscard]] std::size_t buffer_size() const noexcept; - public: /** - * @brief Construct a bounce buffer ring. + * @brief Get the number of buffers in the ring (k for k-way overlap). * - * @param num_buffers Number of bounce buffers for k-way overlap (must be >= 1). + * @return Number of bounce buffers. */ - explicit BounceBufferRing(std::size_t num_buffers = 1); + [[nodiscard]] std::size_t num_buffers() const noexcept; - ~BounceBufferRing() noexcept = default; + /** + * @brief Get the current fill level of the active buffer. + * + * Indicates how many bytes have been accumulated in the current buffer via + * accumulate_and_submit_h2d(). Reset to 0 after each advance(). + * + * @return Number of bytes currently in the buffer. + */ + [[nodiscard]] std::size_t cur_buffer_offset() const noexcept; - // Non-copyable, non-movable - BounceBufferRing(BounceBufferRing const&) = delete; - BounceBufferRing& operator=(BounceBufferRing const&) = delete; - BounceBufferRing(BounceBufferRing&&) = delete; - BounceBufferRing& operator=(BounceBufferRing&&) = delete; + /** + * @brief Get remaining number of bytes in current buffer for accumulation. + */ + [[nodiscard]] std::size_t cur_buffer_remaining_capacity() const noexcept; - // Accessors - [[nodiscard]] void* buffer() const noexcept; - [[nodiscard]] void* buffer(std::ptrdiff_t offset) const noexcept; - [[nodiscard]] std::size_t buffer_size() const noexcept; - [[nodiscard]] std::size_t num_buffers() const noexcept; - [[nodiscard]] std::size_t offset() const noexcept; + /** + * @brief Advance to next buffer in the ring. + * + * Resets current buffer offset and moves to next buffer index. If wrapping around to the oldest + * in-flight buffer, synchronizes the stream first to ensure that buffer's transfer is complete. + * + * @param stream CUDA stream to synchronize on wrap-around. 
+ */ + void advance(CUstream stream); /** - * @brief Accumulate data into bounce buffer, auto-flush when full. + * @brief Queue async copy from current bounce buffer to device memory. + * + * In non-batch mode, issues cuMemcpyHtoDAsync immediately. In batch mode, defers the copy until + * wrap-around or synchronize(). + * + * @param device_dst Device memory destination. + * @param size Bytes to copy from cur_buffer(). + * @param stream CUDA stream for the async transfer. * - * Copies src to internal buffer. When buffer fills, issues async H2D copy to device_dst and - * advances to next buffer. + * @note Does NOT advance to next buffer. Call submit_h2d() for copy + advance. + */ + void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream); + + /** + * @brief Async copy from device memory to current bounce buffer. * - * @param device_dst Current device destination pointer. + * @param device_src Device memory source. + * @param size Bytes to copy. + * @param stream CUDA stream for the async transfer. + */ + void enqueue_d2h(void* device_src, std::size_t size, CUstream stream); + + /** + * @brief Accumulate data into bounce buffer, auto-submit when full. + * + * Copies host data into the internal buffer. When the buffer fills, issues an async H2D copy and + * advances to the next buffer. Handles data larger than buffer_size() by splitting across + * multiple buffers. + * + * Typical usage for streaming host data to device: + * @code + * while (has_more_data()) { + * ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream); + * device_ptr += chunk_size; // Note: only advance by submitted amount + * } + * ring.flush_h2d(device_ptr, stream); + * ring.synchronize(stream); + * @endcode + * + * @param device_dst Device memory destination (should track cumulative offset externally). * @param host_src Source data in host memory. - * @param size Bytes to write. + * @param size Bytes to copy. * @param stream CUDA stream for async H2D transfers. 
+ * + * @note Partial buffer contents remain until flush_h2d() is called. + * @note Final data visibility requires flush_h2d() + synchronize(). */ void accumulate_and_submit_h2d(void* device_dst, void const* host_src, std::size_t size, CUstream stream); + /** + * @brief Submit current buffer contents to device and advance to next buffer. + * + * Typical usage pattern for direct-fill (e.g., pread into buffer): + * @code + * ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), offset); + * ring.submit_h2d(device_ptr, n, stream); + * device_ptr += n; + * @endcode + * + * @param device_dst Device memory destination. + * @param size Bytes actually written to cur_buffer(). + * @param stream CUDA stream for async H2D transfer. + * + * @note Synchronization may occur if this causes a wrap-around. + * @note Final data visibility requires calling synchronize() after all submits. + */ void submit_h2d(void* device_dst, std::size_t size, CUstream stream); /** - * @brief Flush any accumulated data to device. + * @brief Flush any partially accumulated data to device. * - * @param device_dst Device memory destination. + * Call after accumulate_and_submit_h2d() to submit remaining data that didn't fill a complete + * buffer. + * + * @param device_dst Device memory destination for the partial buffer. * @param stream CUDA stream for async H2D transfer. - * @return Bytes flushed. + * @return Number of bytes flushed (0 if buffer was empty). + * + * @note Still requires synchronize() for data visibility guarantee. */ std::size_t flush_h2d(void* device_dst, CUstream stream); /** - * @brief Synchronize the given CUDA stream. + * @brief Ensure all queued H2D transfers are complete. + * + * In batch mode, issues any pending batch copies first. Then synchronizes the stream to guarantee + * all data is visible in device memory. * * @param stream CUDA stream to synchronize. + * + * @note Must be called before reading transferred data on device. 
+ * @note After synchronize(), the ring can be reused for new transfers. */ - static void synchronize(CUstream stream); + void synchronize(CUstream stream); }; } // namespace kvikio diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index bef35756a4..0ecf9961f7 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -217,8 +217,73 @@ template class BounceBufferPool; template class BounceBufferPool; template class BounceBufferPool; +namespace { +void separate_copy(std::span dsts, + std::span srcs, + std::span sizes, + CUstream stream) +{ + // Fall back to the conventional H2D copy if the batch copy API is not available. + for (std::size_t i = 0; i < srcs.size(); ++i) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(dsts[i]), reinterpret_cast(srcs[i]), sizes[i], stream)); + } +} + +void batch_copy(std::span dsts, + std::span srcs, + std::span sizes, + CUstream stream) +{ + if (srcs.size() == 0) return; + +#if CUDA_VERSION >= 12080 + if (cudaAPI::instance().MemcpyBatchAsync) { + CUmemcpyAttributes attrs{}; + std::size_t attrs_idxs[] = {0}; + attrs.srcAccessOrder = CUmemcpySrcAccessOrder_enum::CU_MEMCPY_SRC_ACCESS_ORDER_STREAM; + CUDA_DRIVER_TRY( + cudaAPI::instance().MemcpyBatchAsync(dsts.data(), + srcs.data(), + sizes.data(), + srcs.size(), + &attrs, + attrs_idxs, + static_cast(1) /* num_attrs */, +#if CUDA_VERSION < 13000 + static_cast(nullptr), +#endif + stream)); + } else { + separate_copy(dsts, srcs, sizes, stream); + } +#else + separate_copy(dsts, srcs, sizes, stream); +#endif +} +} // namespace + +template +void BounceBufferRing::BatchTransferContext::add_entry(void* dst, + void* src, + std::size_t size) +{ + srcs.push_back(src); + dsts.push_back(dst); + sizes.push_back(size); +} + template -BounceBufferRing::BounceBufferRing(std::size_t num_buffers) : _num_buffers{num_buffers} +void BounceBufferRing::BatchTransferContext::clear() +{ + srcs.clear(); + dsts.clear(); + sizes.clear(); +} + 
+template +BounceBufferRing::BounceBufferRing(std::size_t num_buffers, bool batch_copy) + : _num_buffers{num_buffers}, _batch_copy{batch_copy} { KVIKIO_NVTX_FUNC_RANGE(); KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer"); @@ -230,46 +295,55 @@ BounceBufferRing::BounceBufferRing(std::size_t num_buffers) : _num_bu } template -void BounceBufferRing::copy_to_device(void* device_dst, - std::size_t size, - CUstream stream) +void BounceBufferRing::enqueue_h2d(void* device_dst, std::size_t size, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( - convert_void2deviceptr(device_dst), buffer(), size, stream)); + if (_batch_copy) { + _batch_transfer_ctx.add_entry(device_dst, cur_buffer(), size); + } else { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(device_dst), cur_buffer(), size, stream)); + } } template -void BounceBufferRing::copy_from_device(void const* device_src, - std::size_t size, - CUstream stream) +void BounceBufferRing::enqueue_d2h(void* device_src, std::size_t size, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( - buffer(), convert_void2deviceptr(device_src), size, stream)); + if (_batch_copy) { + _batch_transfer_ctx.add_entry(cur_buffer(), device_src, size); + } else { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( + cur_buffer(), convert_void2deviceptr(device_src), size, stream)); + } } template void BounceBufferRing::advance(CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - _cur_buf_offset = 0; + _cur_buffer_offset = 0; ++_cur_buf_idx; if (_cur_buf_idx >= _num_buffers) { _cur_buf_idx = 0; } if (_cur_buf_idx == _initial_buf_idx) { + if (_batch_copy) { + batch_copy( + _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream); + _batch_transfer_ctx.clear(); + } CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); _initial_buf_idx = _cur_buf_idx; } } 
template -void* BounceBufferRing::buffer() const noexcept +void* BounceBufferRing::cur_buffer() const noexcept { return _buffers[_cur_buf_idx].get(); } template -void* BounceBufferRing::buffer(std::ptrdiff_t off) const noexcept +void* BounceBufferRing::cur_buffer(std::ptrdiff_t off) const noexcept { return _buffers[_cur_buf_idx].get(off); } @@ -287,15 +361,15 @@ std::size_t BounceBufferRing::num_buffers() const noexcept } template -std::size_t BounceBufferRing::offset() const noexcept +std::size_t BounceBufferRing::cur_buffer_offset() const noexcept { - return _cur_buf_offset; + return _cur_buffer_offset; } template -std::size_t BounceBufferRing::remaining_bytes() const noexcept +std::size_t BounceBufferRing::cur_buffer_remaining_capacity() const noexcept { - return buffer_size() - _cur_buf_offset; + return buffer_size() - _cur_buffer_offset; } template @@ -305,23 +379,22 @@ void BounceBufferRing::accumulate_and_submit_h2d(void* device_dst, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - auto const* host_str_ptr = static_cast(host_src); + auto const* host_src_ptr = static_cast(host_src); auto* device_dst_ptr = static_cast(device_dst); while (size > 0) { - auto const cur_remaining_bytes = remaining_bytes(); - auto const num_bytes_to_copy = std::min(size, cur_remaining_bytes); + auto const remaining_bytes = cur_buffer_remaining_capacity(); + auto const num_bytes_to_copy = std::min(size, remaining_bytes); // Copy from host to bounce buffer - std::memcpy(buffer(_cur_buf_offset), host_str_ptr, num_bytes_to_copy); - _cur_buf_offset += num_bytes_to_copy; - host_str_ptr += num_bytes_to_copy; + std::memcpy(cur_buffer(_cur_buffer_offset), host_src_ptr, num_bytes_to_copy); + _cur_buffer_offset += num_bytes_to_copy; + host_src_ptr += num_bytes_to_copy; size -= num_bytes_to_copy; - if (_cur_buf_offset >= buffer_size()) { - copy_to_device(device_dst_ptr, _cur_buf_offset, stream); - device_dst_ptr += _cur_buf_offset; - advance(stream); + if (_cur_buffer_offset >= buffer_size()) { 
+ submit_h2d(device_dst_ptr, buffer_size(), stream); + device_dst_ptr += buffer_size(); } } } @@ -330,7 +403,7 @@ template void BounceBufferRing::submit_h2d(void* device_dst, std::size_t size, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - copy_to_device(device_dst, size, stream); + enqueue_h2d(device_dst, size, stream); advance(stream); } @@ -338,9 +411,9 @@ template std::size_t BounceBufferRing::flush_h2d(void* device_dst, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - if (_cur_buf_offset == 0) { return 0; } - auto const flushed = _cur_buf_offset; - copy_to_device(device_dst, _cur_buf_offset, stream); + if (_cur_buffer_offset == 0) { return 0; } + auto const flushed = _cur_buffer_offset; + enqueue_h2d(device_dst, _cur_buffer_offset, stream); advance(stream); return flushed; } @@ -349,6 +422,11 @@ template void BounceBufferRing::synchronize(CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); + if (_batch_copy) { + batch_copy( + _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream); + _batch_transfer_ctx.clear(); + } CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } From d90a8018603a407473f54d9ce280a7aeac20a3fe Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 30 Jan 2026 11:26:36 -0500 Subject: [PATCH 03/37] Modify naming --- cpp/src/bounce_buffer.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 0ecf9961f7..2b26c0f155 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -218,10 +218,10 @@ template class BounceBufferPool; template class BounceBufferPool; namespace { -void separate_copy(std::span dsts, - std::span srcs, - std::span sizes, - CUstream stream) +void unbatched_copy(std::span dsts, + std::span srcs, + std::span sizes, + CUstream stream) { // Fall back to the conventional H2D copy if the batch copy API is not available. 
for (std::size_t i = 0; i < srcs.size(); ++i) { @@ -255,10 +255,10 @@ void batch_copy(std::span dsts, #endif stream)); } else { - separate_copy(dsts, srcs, sizes, stream); + unbatched_copy(dsts, srcs, sizes, stream); } #else - separate_copy(dsts, srcs, sizes, stream); + unbatched_copy(dsts, srcs, sizes, stream); #endif } } // namespace From 95e57c80425adcf64055665ab475d1f6a0f74f77 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 30 Jan 2026 11:28:36 -0500 Subject: [PATCH 04/37] Update --- cpp/src/bounce_buffer.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 2b26c0f155..1cb0a14c36 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -218,18 +218,27 @@ template class BounceBufferPool; template class BounceBufferPool; namespace { +/** + * @brief Issue individual async H2D copies for each transfer. + * + * Fallback when cuMemcpyBatchAsync is unavailable. + */ void unbatched_copy(std::span dsts, std::span srcs, std::span sizes, CUstream stream) { - // Fall back to the conventional H2D copy if the batch copy API is not available. for (std::size_t i = 0; i < srcs.size(); ++i) { CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( convert_void2deviceptr(dsts[i]), reinterpret_cast(srcs[i]), sizes[i], stream)); } } +/** + * @brief Issue H2D copies using batch API if available, otherwise fall back to individual copies. + * + * Uses cuMemcpyBatchAsync (CUDA 12.8+) to reduce API overhead for multiple small transfers. 
+ */ void batch_copy(std::span dsts, std::span srcs, std::span sizes, From b13a2101ffab60476e087bc25cc4471ce547b612 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 30 Jan 2026 11:40:19 -0500 Subject: [PATCH 05/37] Update --- cpp/src/bounce_buffer.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 1cb0a14c36..73a369faac 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -39,6 +39,7 @@ void* CudaPinnedAllocator::allocate(std::size_t size) return buffer; } + void CudaPinnedAllocator::deallocate(void* buffer, std::size_t /*size*/) { CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(buffer)); @@ -116,7 +117,7 @@ template void* BounceBufferPool::Buffer::get(std::ptrdiff_t offset) const noexcept { KVIKIO_NVTX_FUNC_RANGE(); - return static_cast(_buffer) + offset; + return static_cast(_buffer) + offset; } template @@ -388,9 +389,10 @@ void BounceBufferRing::accumulate_and_submit_h2d(void* device_dst, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - auto const* host_src_ptr = static_cast(host_src); - auto* device_dst_ptr = static_cast(device_dst); + auto const* host_src_ptr = static_cast(host_src); + auto* device_dst_ptr = static_cast(device_dst); + // The data is split across multiple buffers if its size is greater than buffer_size() while (size > 0) { auto const remaining_bytes = cur_buffer_remaining_capacity(); auto const num_bytes_to_copy = std::min(size, remaining_bytes); From a4eae689aefd1ef4ba2d42377ee100ccf53a21f2 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 30 Jan 2026 12:09:33 -0500 Subject: [PATCH 06/37] Add return value for accumulate_and_submit_h2d --- cpp/include/kvikio/bounce_buffer.hpp | 16 +++++++++------- cpp/src/bounce_buffer.cpp | 10 ++++++---- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index a5fcd97275..ba075a5e90 100644 --- 
a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -423,10 +423,11 @@ class BounceBufferRing { * Typical usage for streaming host data to device: * @code * while (has_more_data()) { - * ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream); - * device_ptr += chunk_size; // Note: only advance by submitted amount + * auto submitted = ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream); + * device_ptr += submitted; * } - * ring.flush_h2d(device_ptr, stream); + * auto flushed = ring.flush_h2d(device_ptr, stream); + * device_ptr += flushed; * ring.synchronize(stream); * @endcode * @@ -434,14 +435,15 @@ class BounceBufferRing { * @param host_src Source data in host memory. * @param size Bytes to copy. * @param stream CUDA stream for async H2D transfers. + * @return Number of bytes (in buffer sizes) that have actually been submitted for copy. * * @note Partial buffer contents remain until flush_h2d() is called. * @note Final data visibility requires flush_h2d() + synchronize(). */ - void accumulate_and_submit_h2d(void* device_dst, - void const* host_src, - std::size_t size, - CUstream stream); + std::size_t accumulate_and_submit_h2d(void* device_dst, + void const* host_src, + std::size_t size, + CUstream stream); /** * @brief Submit current buffer contents to device and advance to next buffer. 
diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 73a369faac..a79aae1c50 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -383,10 +383,10 @@ std::size_t BounceBufferRing::cur_buffer_remaining_capacity() const n } template -void BounceBufferRing::accumulate_and_submit_h2d(void* device_dst, - void const* host_src, - std::size_t size, - CUstream stream) +std::size_t BounceBufferRing::accumulate_and_submit_h2d(void* device_dst, + void const* host_src, + std::size_t size, + CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); auto const* host_src_ptr = static_cast(host_src); @@ -408,6 +408,8 @@ void BounceBufferRing::accumulate_and_submit_h2d(void* device_dst, device_dst_ptr += buffer_size(); } } + + return static_cast(device_dst_ptr - static_cast(device_dst)); } template From 00338b33f1e49a253a71315196a2167d8f66bfc0 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 30 Jan 2026 13:24:54 -0500 Subject: [PATCH 07/37] Add reset --- cpp/include/kvikio/bounce_buffer.hpp | 15 ++++++++++++++- cpp/src/bounce_buffer.cpp | 11 +++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index ba075a5e90..f9016680b2 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -286,6 +286,8 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool class BounceBufferRing { @@ -435,7 +437,8 @@ class BounceBufferRing { * @param host_src Source data in host memory. * @param size Bytes to copy. * @param stream CUDA stream for async H2D transfers. - * @return Number of bytes (in buffer sizes) that have actually been submitted for copy. + * @return Number of bytes submitted to device (always a multiple of buffer_size(); partial buffer + * contents remain until flush_h2d()). * * @note Partial buffer contents remain until flush_h2d() is called. * @note Final data visibility requires flush_h2d() + synchronize(). 
@@ -490,5 +493,15 @@ class BounceBufferRing { * @note After synchronize(), the ring can be reused for new transfers. */ void synchronize(CUstream stream); + + /** + * @brief Synchronize pending transfers and reset ring state for a new transfer session. + * + * Ensures all in-flight transfers complete, then resets the ring to its initial state. After + * reset(), the ring can be safely reused for either H2D or D2H operations. + * + * @param stream CUDA stream to synchronize. + */ + void reset(CUstream stream); }; } // namespace kvikio diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index a79aae1c50..922828102e 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -109,14 +109,12 @@ BounceBufferPool::Buffer& BounceBufferPool::Buffer::operat template void* BounceBufferPool::Buffer::get() const noexcept { - KVIKIO_NVTX_FUNC_RANGE(); return _buffer; } template void* BounceBufferPool::Buffer::get(std::ptrdiff_t offset) const noexcept { - KVIKIO_NVTX_FUNC_RANGE(); return static_cast(_buffer) + offset; } @@ -443,6 +441,15 @@ void BounceBufferRing::synchronize(CUstream stream) CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } +template +void BounceBufferRing::reset(CUstream stream) +{ + synchronize(stream); + _cur_buf_idx = 0; + _initial_buf_idx = 0; + _cur_buffer_offset = 0; +} + // Explicit instantiations template class BounceBufferRing; template class BounceBufferRing; From ee33ad85933b9476fd8f2fb866e06ec05e416801 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 30 Jan 2026 17:45:53 -0500 Subject: [PATCH 08/37] Add clarifying comments on bounce buffer's potential multicontext use --- cpp/include/kvikio/bounce_buffer.hpp | 9 +++++++++ cpp/src/bounce_buffer.cpp | 10 ++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index f9016680b2..49492f7a55 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ 
b/cpp/include/kvikio/bounce_buffer.hpp @@ -43,6 +43,9 @@ class PageAlignedAllocator { * transferred to/from GPU device memory. The allocation is only guaranteed to be aligned to "at * least 256 bytes". It is NOT guaranteed to be page aligned. * + * @note Allocations use CU_MEMHOSTALLOC_PORTABLE, making them accessible from all CUDA contexts, + * not just the one that performed the allocation. This allows the singleton BounceBufferPool to + * safely serve buffers across multiple contexts and devices. * @note Do NOT use with Direct I/O - lacks page alignment guarantee */ class CudaPinnedAllocator { @@ -71,6 +74,9 @@ class CudaPinnedAllocator { * (for efficient host-device transfers). Uses std::aligned_alloc followed by * cudaMemHostRegister to achieve both properties. * + * @note Registration uses CU_MEMHOSTALLOC_PORTABLE, making buffers accessible from all CUDA + * contexts, not just the one active during allocation. This allows the singleton BounceBufferPool + * to safely serve buffers across multiple contexts and devices. * @note This is the required allocator for Direct I/O with device memory. Requires a valid CUDA * context when allocating. */ @@ -111,6 +117,9 @@ class CudaPageAlignedPinnedAllocator { * - CudaPinnedAllocator: For device I/O without Direct I/O * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O * + * @note The singleton pool is safe for use across multiple CUDA contexts. CUDA-pinned allocators + * use CU_MEMHOSTALLOC_PORTABLE, ensuring buffers allocated in one context can be used from any + * other context. 
* @note The destructor intentionally leaks allocations to avoid CUDA cleanup issues when static * destructors run after CUDA context destruction */ diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 922828102e..400ca7f122 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -31,10 +31,9 @@ void* CudaPinnedAllocator::allocate(std::size_t size) { void* buffer{}; - // If no available allocation, allocate and register a new one - // Allocate page-locked host memory - // Under unified addressing, host memory allocated this way is automatically portable and - // mapped. + // Allocate page-locked (pinned) host memory with CU_MEMHOSTALLOC_PORTABLE. The PORTABLE flag + // ensures this memory is accessible from all CUDA contexts, which is essential for the singleton + // BounceBufferPool that may serve multiple contexts and devices. CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&buffer, size, CU_MEMHOSTALLOC_PORTABLE)); return buffer; @@ -52,6 +51,9 @@ void* CudaPageAlignedPinnedAllocator::allocate(std::size_t size) auto const aligned_size = detail::align_up(size, page_size); buffer = std::aligned_alloc(page_size, aligned_size); KVIKIO_EXPECT(buffer != nullptr, "Aligned allocation failed"); + // Register the page-aligned allocation as pinned memory with CU_MEMHOSTALLOC_PORTABLE. The + // PORTABLE flag ensures this memory is accessible from all CUDA contexts, which is essential for + // the singleton BounceBufferPool that may serve multiple contexts and devices. 
CUDA_DRIVER_TRY( cudaAPI::instance().MemHostRegister(buffer, aligned_size, CU_MEMHOSTALLOC_PORTABLE)); return buffer; From 91a6bee72f15390cbcd98804b31c8376bfa2a8b5 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 30 Jan 2026 17:52:08 -0500 Subject: [PATCH 09/37] Clarify the multicontext issue for bounce buffer ring --- cpp/include/kvikio/bounce_buffer.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 49492f7a55..5d2428ba84 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -293,6 +293,9 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool Date: Sun, 1 Feb 2026 16:40:58 -0500 Subject: [PATCH 10/37] Implement event pool --- cpp/CMakeLists.txt | 4 ++- cpp/include/kvikio/detail/event.hpp | 10 ++++++ cpp/include/kvikio/detail/posix_io.hpp | 37 ++-------------------- cpp/include/kvikio/detail/stream.hpp | 43 ++++++++++++++++++++++++++ cpp/src/detail/event.cpp | 9 ++++++ cpp/src/detail/posix_io.cpp | 33 +------------------- cpp/src/detail/stream.cpp | 39 +++++++++++++++++++++++ cpp/src/mmap.cpp | 4 +-- cpp/src/remote_handle.cpp | 4 +-- 9 files changed, 112 insertions(+), 71 deletions(-) create mode 100644 cpp/include/kvikio/detail/event.hpp create mode 100644 cpp/include/kvikio/detail/stream.hpp create mode 100644 cpp/src/detail/event.cpp create mode 100644 cpp/src/detail/stream.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a8897a27e2..a2b22031e7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -1,6 +1,6 @@ # ============================================================================= # cmake-format: off -# SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION. 
# SPDX-License-Identifier: Apache-2.0 # cmake-format: on # ============================================================================= @@ -153,8 +153,10 @@ set(SOURCES "src/file_utils.cpp" "src/mmap.cpp" "src/detail/env.cpp" + "src/detail/event.cpp" "src/detail/nvtx.cpp" "src/detail/posix_io.cpp" + "src/detail/stream.cpp" "src/shim/cuda.cpp" "src/shim/cufile.cpp" "src/shim/utils.cpp" diff --git a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp new file mode 100644 index 0000000000..7ecbdd9787 --- /dev/null +++ b/cpp/include/kvikio/detail/event.hpp @@ -0,0 +1,10 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include + +namespace kvikio::detail { +} diff --git a/cpp/include/kvikio/detail/posix_io.hpp b/cpp/include/kvikio/detail/posix_io.hpp index 2997337906..5332892635 100644 --- a/cpp/include/kvikio/detail/posix_io.hpp +++ b/cpp/include/kvikio/detail/posix_io.hpp @@ -1,18 +1,18 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once #include + #include #include -#include -#include #include #include #include +#include #include #include #include @@ -36,37 +36,6 @@ enum class PartialIO : uint8_t { NO, ///< POSIX read/write is called repeatedly until all requested bytes are processed. }; -/** - * @brief Singleton class to retrieve a CUDA stream for device-host copying - * - * Call `StreamsByThread::get` to get the CUDA stream assigned to the current - * CUDA context and thread. - */ -class StreamsByThread { - private: - std::map, CUstream> _streams; - - public: - StreamsByThread() = default; - - // Here we intentionally do not destroy in the destructor the CUDA resources - // (e.g. CUstream) with static storage duration, but instead let them leak - // on program termination. 
This is to prevent undefined behavior in CUDA. See - // - // This also prevents crash (segmentation fault) if clients call - // cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination. - ~StreamsByThread() = default; - - KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id); - - static CUstream get(); - - StreamsByThread(StreamsByThread const&) = delete; - StreamsByThread& operator=(StreamsByThread const&) = delete; - StreamsByThread(StreamsByThread&& o) = delete; - StreamsByThread& operator=(StreamsByThread&& o) = delete; -}; - /** * @brief Read or write host memory to or from disk using POSIX with opportunistic Direct I/O * diff --git a/cpp/include/kvikio/detail/stream.hpp b/cpp/include/kvikio/detail/stream.hpp new file mode 100644 index 0000000000..74b24ca5fa --- /dev/null +++ b/cpp/include/kvikio/detail/stream.hpp @@ -0,0 +1,43 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include + +#include + +namespace kvikio::detail { +/** + * @brief Singleton class to retrieve a CUDA stream for device-host copying + * + * Call `StreamsByThread::get` to get the CUDA stream assigned to the current + * CUDA context and thread. + */ +class StreamsByThread { + private: + std::map, CUstream> _streams; + + public: + StreamsByThread() = default; + + // Here we intentionally do not destroy in the destructor the CUDA resources + // (e.g. CUstream) with static storage duration, but instead let them leak + // on program termination. This is to prevent undefined behavior in CUDA. See + // + // This also prevents crash (segmentation fault) if clients call + // cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination. 
+ ~StreamsByThread() = default; + + KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id); + + static CUstream get(); + + StreamsByThread(StreamsByThread const&) = delete; + StreamsByThread& operator=(StreamsByThread const&) = delete; + StreamsByThread(StreamsByThread&& o) = delete; + StreamsByThread& operator=(StreamsByThread&& o) = delete; +}; +} // namespace kvikio::detail diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp new file mode 100644 index 0000000000..37c6796a2a --- /dev/null +++ b/cpp/src/detail/event.cpp @@ -0,0 +1,9 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +namespace kvikio::detail { +} diff --git a/cpp/src/detail/posix_io.cpp b/cpp/src/detail/posix_io.cpp index 1ddafa4d3e..42091c55c8 100644 --- a/cpp/src/detail/posix_io.cpp +++ b/cpp/src/detail/posix_io.cpp @@ -1,13 +1,11 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #include #include #include -#include -#include #include #include @@ -17,35 +15,6 @@ #include namespace kvikio::detail { - -CUstream StreamsByThread::get(CUcontext ctx, std::thread::id thd_id) -{ - KVIKIO_NVTX_FUNC_RANGE(); - static StreamsByThread _instance; - - // If no current context, we return the null/default stream - if (ctx == nullptr) { return nullptr; } - auto key = std::make_pair(ctx, thd_id); - - // Create a new stream if `ctx` doesn't have one. 
- if (auto search = _instance._streams.find(key); search == _instance._streams.end()) { - CUstream stream{}; - CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); - _instance._streams[key] = stream; - return stream; - } else { - return search->second; - } -} - -CUstream StreamsByThread::get() -{ - KVIKIO_NVTX_FUNC_RANGE(); - CUcontext ctx{nullptr}; - CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); - return get(ctx, std::this_thread::get_id()); -} - std::size_t posix_device_read(int fd_direct_off, void const* devPtr_base, std::size_t size, diff --git a/cpp/src/detail/stream.cpp b/cpp/src/detail/stream.cpp new file mode 100644 index 0000000000..e79cc9122b --- /dev/null +++ b/cpp/src/detail/stream.cpp @@ -0,0 +1,39 @@ + +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +namespace kvikio::detail { + +CUstream StreamsByThread::get(CUcontext ctx, std::thread::id thd_id) +{ + KVIKIO_NVTX_FUNC_RANGE(); + static StreamsByThread _instance; + + // If no current context, we return the null/default stream + if (ctx == nullptr) { return nullptr; } + auto key = std::make_pair(ctx, thd_id); + + // Create a new stream if `ctx` doesn't have one. 
+ if (auto search = _instance._streams.find(key); search == _instance._streams.end()) { + CUstream stream{}; + CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); + _instance._streams[key] = stream; + return stream; + } else { + return search->second; + } +} + +CUstream StreamsByThread::get() +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUcontext ctx{nullptr}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); + return get(ctx, std::this_thread::get_id()); +} +} // namespace kvikio::detail diff --git a/cpp/src/mmap.cpp b/cpp/src/mmap.cpp index ff579cfa4e..d6bbd91b6d 100644 --- a/cpp/src/mmap.cpp +++ b/cpp/src/mmap.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #include @@ -16,7 +16,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 7c917c9a0b..cb7a8226ea 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. 
* SPDX-License-Identifier: Apache-2.0 */ @@ -19,8 +19,8 @@ #include #include #include -#include #include +#include #include #include #include From 3948400a01934f41bc29809893d8f9e9230aff82 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 2 Feb 2026 00:33:22 -0500 Subject: [PATCH 11/37] Fix stream race condition --- cpp/CMakeLists.txt | 1 - cpp/include/kvikio/detail/event.hpp | 10 ----- cpp/include/kvikio/detail/posix_io.hpp | 2 +- cpp/include/kvikio/detail/stream.hpp | 57 ++++++++++++++++++-------- cpp/src/detail/event.cpp | 9 ---- cpp/src/detail/stream.cpp | 12 ++++-- cpp/src/mmap.cpp | 2 +- cpp/src/remote_handle.cpp | 2 +- 8 files changed, 51 insertions(+), 44 deletions(-) delete mode 100644 cpp/include/kvikio/detail/event.hpp delete mode 100644 cpp/src/detail/event.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a2b22031e7..eacb406732 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -153,7 +153,6 @@ set(SOURCES "src/file_utils.cpp" "src/mmap.cpp" "src/detail/env.cpp" - "src/detail/event.cpp" "src/detail/nvtx.cpp" "src/detail/posix_io.cpp" "src/detail/stream.cpp" diff --git a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp deleted file mode 100644 index 7ecbdd9787..0000000000 --- a/cpp/include/kvikio/detail/event.hpp +++ /dev/null @@ -1,10 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. 
- * SPDX-License-Identifier: Apache-2.0 - */ -#pragma once - -#include - -namespace kvikio::detail { -} diff --git a/cpp/include/kvikio/detail/posix_io.hpp b/cpp/include/kvikio/detail/posix_io.hpp index 5332892635..89aba27819 100644 --- a/cpp/include/kvikio/detail/posix_io.hpp +++ b/cpp/include/kvikio/detail/posix_io.hpp @@ -207,7 +207,7 @@ std::size_t posix_device_io(int fd_direct_off, off_t const chunk_size2 = convert_size2off(bounce_buffer.size()); // Get a stream for the current CUDA context and thread - CUstream stream = StreamsByThread::get(); + CUstream stream = StreamCachePerThreadAndContext::get(); while (bytes_remaining > 0) { off_t const nbytes_requested = std::min(chunk_size2, bytes_remaining); diff --git a/cpp/include/kvikio/detail/stream.hpp b/cpp/include/kvikio/detail/stream.hpp index 74b24ca5fa..49563b6e10 100644 --- a/cpp/include/kvikio/detail/stream.hpp +++ b/cpp/include/kvikio/detail/stream.hpp @@ -5,39 +5,62 @@ #pragma once #include +#include #include #include namespace kvikio::detail { /** - * @brief Singleton class to retrieve a CUDA stream for device-host copying + * @brief Singleton cache that provides one CUDA stream per (context, thread) pair. * - * Call `StreamsByThread::get` to get the CUDA stream assigned to the current - * CUDA context and thread. + * This class manages CUDA streams used for host-device memory transfers. Each unique combination of + * CUDA context and calling thread is assigned a dedicated stream, which is created lazily on first + * access and reused for subsequent calls. + * + * The cache is thread-safe and handles concurrent access from multiple threads. + * + * @note CUDA streams are intentionally leaked on program termination rather than destroyed in the + * destructor. This avoids undefined behavior that can occur when destroying CUDA resources during + * static destruction, and prevents crashes (segmentation faults) if clients call + * cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination. 
See: + * https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization */ -class StreamsByThread { +class StreamCachePerThreadAndContext { private: std::map, CUstream> _streams; + std::mutex mutable _mutex; public: - StreamsByThread() = default; - - // Here we intentionally do not destroy in the destructor the CUDA resources - // (e.g. CUstream) with static storage duration, but instead let them leak - // on program termination. This is to prevent undefined behavior in CUDA. See - // - // This also prevents crash (segmentation fault) if clients call - // cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination. - ~StreamsByThread() = default; + StreamCachePerThreadAndContext() = default; + ~StreamCachePerThreadAndContext() = default; + /** + * @brief Get or create a CUDA stream for the specified context and thread. + * + * If a stream already exists for the given (context, thread) pair, it is returned. Otherwise, a + * new stream is created, cached, and returned. + * + * @param ctx The CUDA context. If null, the null stream is returned. + * @param thd_id The thread identifier. + * @return The CUDA stream associated with this (context, thread) pair, or nullptr if @p ctx is + * null. + */ KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id); + /** + * @brief Get or create a CUDA stream for the current context and thread. + * + * Convenience overload that uses the current CUDA context and calling thread's ID. + * + * @return The CUDA stream associated with the current (context, thread) pair, or nullptr if no + * CUDA context is current. 
+ */ static CUstream get(); - StreamsByThread(StreamsByThread const&) = delete; - StreamsByThread& operator=(StreamsByThread const&) = delete; - StreamsByThread(StreamsByThread&& o) = delete; - StreamsByThread& operator=(StreamsByThread&& o) = delete; + StreamCachePerThreadAndContext(StreamCachePerThreadAndContext const&) = delete; + StreamCachePerThreadAndContext& operator=(StreamCachePerThreadAndContext const&) = delete; + StreamCachePerThreadAndContext(StreamCachePerThreadAndContext&& o) = delete; + StreamCachePerThreadAndContext& operator=(StreamCachePerThreadAndContext&& o) = delete; }; } // namespace kvikio::detail diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp deleted file mode 100644 index 37c6796a2a..0000000000 --- a/cpp/src/detail/event.cpp +++ /dev/null @@ -1,9 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. - * SPDX-License-Identifier: Apache-2.0 - */ - -#include - -namespace kvikio::detail { -} diff --git a/cpp/src/detail/stream.cpp b/cpp/src/detail/stream.cpp index e79cc9122b..a0907330c8 100644 --- a/cpp/src/detail/stream.cpp +++ b/cpp/src/detail/stream.cpp @@ -4,21 +4,25 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include + #include #include namespace kvikio::detail { -CUstream StreamsByThread::get(CUcontext ctx, std::thread::id thd_id) +CUstream StreamCachePerThreadAndContext::get(CUcontext ctx, std::thread::id thd_id) { KVIKIO_NVTX_FUNC_RANGE(); - static StreamsByThread _instance; + static StreamCachePerThreadAndContext _instance; // If no current context, we return the null/default stream if (ctx == nullptr) { return nullptr; } + + std::lock_guard const lock(_instance._mutex); auto key = std::make_pair(ctx, thd_id); - // Create a new stream if `ctx` doesn't have one. + // Create a new stream if this (context, thread) pair doesn't have one. 
if (auto search = _instance._streams.find(key); search == _instance._streams.end()) { CUstream stream{}; CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); @@ -29,7 +33,7 @@ CUstream StreamsByThread::get(CUcontext ctx, std::thread::id thd_id) } } -CUstream StreamsByThread::get() +CUstream StreamCachePerThreadAndContext::get() { KVIKIO_NVTX_FUNC_RANGE(); CUcontext ctx{nullptr}; diff --git a/cpp/src/mmap.cpp b/cpp/src/mmap.cpp index d6bbd91b6d..9689f08aa8 100644 --- a/cpp/src/mmap.cpp +++ b/cpp/src/mmap.cpp @@ -191,7 +191,7 @@ void read_impl(void* dst_buf, // - Copy from the bounce buffer to the device buffer PushAndPopContext c(ctx); - CUstream stream = detail::StreamsByThread::get(); + CUstream stream = detail::StreamCachePerThreadAndContext::get(); auto h2d_batch_cpy_sync = [](CUdeviceptr dst_devptr, CUdeviceptr src_devptr, std::size_t size, CUstream stream) { diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index cb7a8226ea..210ee9a31c 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -791,7 +791,7 @@ std::size_t RemoteHandle::read(void* buf, std::size_t size, std::size_t file_off PushAndPopContext c(get_context_from_pointer(buf)); // We use a bounce buffer to avoid many small memory copies to device. Libcurl has a // maximum chunk size of 16kb (`CURL_MAX_WRITE_SIZE`) but chunks are often much smaller. - BounceBufferH2D bounce_buffer(detail::StreamsByThread::get(), buf); + BounceBufferH2D bounce_buffer(detail::StreamCachePerThreadAndContext::get(), buf); ctx.bounce_buffer = &bounce_buffer; curl.perform(); } From 43a38d9369b56b6800aa64a6e2eb1c3895b80dc5 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 2 Feb 2026 09:56:32 -0500 Subject: [PATCH 12/37] Make ctor dtor private. 
Remove inner NVTX --- cpp/include/kvikio/detail/stream.hpp | 3 ++- cpp/src/detail/stream.cpp | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/include/kvikio/detail/stream.hpp b/cpp/include/kvikio/detail/stream.hpp index 49563b6e10..a2f155fca7 100644 --- a/cpp/include/kvikio/detail/stream.hpp +++ b/cpp/include/kvikio/detail/stream.hpp @@ -31,10 +31,11 @@ class StreamCachePerThreadAndContext { std::map, CUstream> _streams; std::mutex mutable _mutex; - public: + private: StreamCachePerThreadAndContext() = default; ~StreamCachePerThreadAndContext() = default; + public: /** * @brief Get or create a CUDA stream for the specified context and thread. * diff --git a/cpp/src/detail/stream.cpp b/cpp/src/detail/stream.cpp index a0907330c8..9f517d393c 100644 --- a/cpp/src/detail/stream.cpp +++ b/cpp/src/detail/stream.cpp @@ -13,7 +13,6 @@ namespace kvikio::detail { CUstream StreamCachePerThreadAndContext::get(CUcontext ctx, std::thread::id thd_id) { - KVIKIO_NVTX_FUNC_RANGE(); static StreamCachePerThreadAndContext _instance; // If no current context, we return the null/default stream From bf207449e89b1329663bd954e96cfba4247d8786 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 2 Feb 2026 14:54:36 -0500 Subject: [PATCH 13/37] Initial impl of event pool --- cpp/CMakeLists.txt | 1 + cpp/include/kvikio/detail/event.hpp | 68 ++++++++++++++++ cpp/include/kvikio/shim/cuda.hpp | 4 + cpp/src/detail/event.cpp | 120 ++++++++++++++++++++++++++++ cpp/src/shim/cuda.cpp | 4 + 5 files changed, 197 insertions(+) create mode 100644 cpp/include/kvikio/detail/event.hpp create mode 100644 cpp/src/detail/event.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index eacb406732..a2b22031e7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -153,6 +153,7 @@ set(SOURCES "src/file_utils.cpp" "src/mmap.cpp" "src/detail/env.cpp" + "src/detail/event.cpp" "src/detail/nvtx.cpp" "src/detail/posix_io.cpp" "src/detail/stream.cpp" diff --git 
a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp new file mode 100644 index 0000000000..fac7170f41 --- /dev/null +++ b/cpp/include/kvikio/detail/event.hpp @@ -0,0 +1,68 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include +#include + +#include + +namespace kvikio::detail { +class EventPool { + public: + class Event { + private: + CUevent _event{nullptr}; + CUcontext _context{nullptr}; + + public: + explicit Event(CUevent event, CUcontext context) noexcept; + ~Event() noexcept; + + // Move-only + Event(Event const&) = delete; + Event& operator=(Event const&) = delete; + Event(Event&& other) noexcept; + Event& operator=(Event&& other) noexcept; + + [[nodiscard]] CUevent get() const noexcept; + + [[nodiscard]] CUcontext context() const noexcept; + + void record(CUstream stream); + + void synchronize(); + }; + + private: + std::mutex mutable _mutex; + std::unordered_map> _pools; + + EventPool() = default; + + // Intentionally leak events during static destruction. 
\sa BounceBufferPool + ~EventPool() noexcept = default; + + public: + // Non-copyable, non-movable singleton + EventPool(EventPool const&) = delete; + EventPool& operator=(EventPool const&) = delete; + EventPool(EventPool&&) = delete; + EventPool& operator=(EventPool&&) = delete; + + [[nodiscard]] Event get(); + + [[nodiscard]] Event get(CUcontext context); + + void put(CUevent event, CUcontext context) noexcept; + + [[nodiscard]] std::size_t num_free_events(CUcontext context) const; + + [[nodiscard]] std::size_t total_free_events() const; + + static EventPool& instance(); +}; +} // namespace kvikio::detail diff --git a/cpp/include/kvikio/shim/cuda.hpp b/cpp/include/kvikio/shim/cuda.hpp index 22c8276817..95a30ccb07 100644 --- a/cpp/include/kvikio/shim/cuda.hpp +++ b/cpp/include/kvikio/shim/cuda.hpp @@ -107,6 +107,10 @@ class cudaAPI { decltype(cuStreamCreate)* StreamCreate{nullptr}; decltype(cuStreamDestroy)* StreamDestroy{nullptr}; decltype(cuDriverGetVersion)* DriverGetVersion{nullptr}; + decltype(cuEventSynchronize)* EventSynchronize{nullptr}; + decltype(cuEventCreate)* EventCreate{nullptr}; + decltype(cuEventDestroy)* EventDestroy{nullptr}; + decltype(cuEventRecord)* EventRecord{nullptr}; private: cudaAPI(); diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp new file mode 100644 index 0000000000..7ea38fdc8b --- /dev/null +++ b/cpp/src/detail/event.cpp @@ -0,0 +1,120 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include + +namespace kvikio::detail { + +EventPool::Event::Event(CUevent event, CUcontext context) noexcept + : _event(event), _context(context) +{ +} + +EventPool::Event::~Event() noexcept +{ + if (_event != nullptr) { EventPool::instance().put(_event, _context); } +} + +EventPool::Event::Event(Event&& other) noexcept + : _event(std::exchange(other._event, nullptr)), _context(std::exchange(other._context, nullptr)) +{ +} + +EventPool::Event& EventPool::Event::operator=(Event&& other) noexcept +{ + if (this != &other) { + if (_event != nullptr) { EventPool::instance().put(_event, _context); } + _event = std::exchange(other._event, nullptr); + _context = std::exchange(other._context, nullptr); + } + return *this; +} + +CUevent EventPool::Event::get() const noexcept { return _event; } + +CUcontext EventPool::Event::context() const noexcept { return _context; } + +void EventPool::Event::record(CUstream stream) +{ + CUDA_DRIVER_TRY(cudaAPI::instance().EventRecord(_event, stream)); +} + +void EventPool::Event::synchronize() +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUDA_DRIVER_TRY(cudaAPI::instance().EventSynchronize(_event)); +} + +EventPool::Event EventPool::get() +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUcontext ctx{}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); + KVIKIO_EXPECT(ctx != nullptr, "No CUDA context is current"); + return get(ctx); +} + +EventPool::Event EventPool::get(CUcontext context) +{ + KVIKIO_EXPECT(context != nullptr, "No CUDA context is current"); + CUevent event{}; + { + std::lock_guard const lock(_mutex); + auto it = _pools.find(context); + if (it != _pools.end() && !it->second.empty()) { + event = it->second.back(); + it->second.pop_back(); + } + } + + // Create the event outside the lock + if (event == nullptr) { + CUDA_DRIVER_TRY(cudaAPI::instance().EventCreate(&event, CU_EVENT_DISABLE_TIMING)); + } + + return Event(event, context); +} + +void 
EventPool::put(CUevent event, CUcontext context) noexcept
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  if (event == nullptr) { return; }
+
+  try {
+    std::lock_guard const lock(_mutex);
+    _pools[context].push_back(event);
+  } catch (...) {
+    // If returning to pool fails, destroy the event
+    cudaAPI::instance().EventDestroy(event);
+  }
+}
+
+std::size_t EventPool::num_free_events(CUcontext context) const
+{
+  std::lock_guard const lock(_mutex);
+  auto it = _pools.find(context);
+  return (it != _pools.end()) ? it->second.size() : 0;
+}
+
+std::size_t EventPool::total_free_events() const
+{
+  std::lock_guard const lock(_mutex);
+  std::size_t total{0};
+  for (auto const& [ctx, events] : _pools) {
+    total += events.size();
+  }
+  return total;
+}
+
+EventPool& EventPool::instance()
+{
+  static EventPool pool;
+  return pool;
+}
+
+} // namespace kvikio::detail
diff --git a/cpp/src/shim/cuda.cpp b/cpp/src/shim/cuda.cpp
index 693dd1bd2a..cf5b9e9239 100644
--- a/cpp/src/shim/cuda.cpp
+++ b/cpp/src/shim/cuda.cpp
@@ -41,6 +41,10 @@ cudaAPI::cudaAPI()
   get_symbol(StreamCreate, lib, KVIKIO_STRINGIFY(cuStreamCreate));
   get_symbol(StreamDestroy, lib, KVIKIO_STRINGIFY(cuStreamDestroy));
   get_symbol(DriverGetVersion, lib, KVIKIO_STRINGIFY(cuDriverGetVersion));
+  get_symbol(EventSynchronize, lib, KVIKIO_STRINGIFY(cuEventSynchronize));
+  get_symbol(EventCreate, lib, KVIKIO_STRINGIFY(cuEventCreate));
+  get_symbol(EventDestroy, lib, KVIKIO_STRINGIFY(cuEventDestroy));
+  get_symbol(EventRecord, lib, KVIKIO_STRINGIFY(cuEventRecord));

   CUDA_DRIVER_TRY(DriverGetVersion(&driver_version));

From 7e0c71f0b5e35217c27d87565baef1d00b5604ef Mon Sep 17 00:00:00 2001
From: Tianyu Liu
Date: Mon, 2 Feb 2026 15:07:18 -0500
Subject: [PATCH 14/37] Set a get() overload to private

---
 cpp/include/kvikio/detail/stream.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/include/kvikio/detail/stream.hpp b/cpp/include/kvikio/detail/stream.hpp
index a2f155fca7..bcd19ad676 100644
---
a/cpp/include/kvikio/detail/stream.hpp +++ b/cpp/include/kvikio/detail/stream.hpp @@ -35,7 +35,6 @@ class StreamCachePerThreadAndContext { StreamCachePerThreadAndContext() = default; ~StreamCachePerThreadAndContext() = default; - public: /** * @brief Get or create a CUDA stream for the specified context and thread. * @@ -49,6 +48,7 @@ class StreamCachePerThreadAndContext { */ KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id); + public: /** * @brief Get or create a CUDA stream for the current context and thread. * From 74e6740f46360ee35b9991ae55f7c98f3cf661f4 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 2 Feb 2026 15:58:43 -0500 Subject: [PATCH 15/37] Add Doxygen comments --- cpp/include/kvikio/detail/event.hpp | 92 +++++++++++++++++++++++++++-- cpp/src/detail/event.cpp | 15 ++--- 2 files changed, 94 insertions(+), 13 deletions(-) diff --git a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp index fac7170f41..672cfb4139 100644 --- a/cpp/include/kvikio/detail/event.hpp +++ b/cpp/include/kvikio/detail/event.hpp @@ -11,15 +11,39 @@ #include namespace kvikio::detail { +/** + * @brief Thread-safe singleton pool for reusable CUDA events + * + * Manages a pool of CUDA events organized by CUDA context. Events are retained and reused across + * calls to minimize allocation overhead. Each context maintains its own separate pool of events + * since CUDA events are context-specific resources. + * + * All events are created with `CU_EVENT_DISABLE_TIMING` for minimal overhead. + * + * Call `EventPool::instance().get()` to acquire an event that will be automatically returned to the + * pool when it goes out of scope (RAII). + * + * @note The destructor intentionally leaks events to avoid CUDA cleanup issues when static + * destructors run after CUDA context destruction. 
@sa BounceBufferPool + */ class EventPool { public: class Event { + friend class EventPool; + private: CUevent _event{nullptr}; CUcontext _context{nullptr}; - public: + /** + * @brief Construct an Event wrapping a CUDA event handle + * + * @param event The CUDA event handle to wrap + * @param context The CUDA context associated with this event + */ explicit Event(CUevent event, CUcontext context) noexcept; + + public: ~Event() noexcept; // Move-only @@ -28,22 +52,49 @@ class EventPool { Event(Event&& other) noexcept; Event& operator=(Event&& other) noexcept; + /** + * @brief Get the underlying CUDA event handle + * + * @return The CUevent handle wrapped by this object + */ [[nodiscard]] CUevent get() const noexcept; + /** + * @brief Get the CUDA context associated with this event + * + * @return The CUcontext this event belongs to + */ [[nodiscard]] CUcontext context() const noexcept; + /** + * @brief Record the event on a CUDA stream + * + * Records the event to capture the current state of the stream. The event will be signaled when + * all preceding operations on the stream have completed. + * + * @param stream The CUDA stream to record the event on (must belong to the same context as this + * event) + */ void record(CUstream stream); + /** + * @brief Block the calling thread until the event has been signaled + * + * Waits for all work captured by a preceding record() call to complete. + * + * @exception CudaError if the synchronize operation fails + */ void synchronize(); }; private: std::mutex mutable _mutex; + // Per-context pools of free events std::unordered_map> _pools; EventPool() = default; - // Intentionally leak events during static destruction. \sa BounceBufferPool + // Intentionally leak events during static destruction. 
@sa BounceBufferPool ~EventPool() noexcept = default; public: @@ -53,16 +104,49 @@ class EventPool { EventPool(EventPool&&) = delete; EventPool& operator=(EventPool&&) = delete; + /** + * @brief Acquire a CUDA event from the pool + * + * Returns a cached event for the current CUDA context if available, otherwise creates a new one. + * The returned Event object will automatically return the event to the pool when it goes out of + * scope. + * + * @return RAII Event object wrapping the acquired CUDA event + * @exception CudaError if no CUDA context is current or event creation fails + */ [[nodiscard]] Event get(); - [[nodiscard]] Event get(CUcontext context); - + /** + * @brief Return an event to the pool for reuse + * + * Typically called automatically by Event's destructor. Adds the event to the pool associated + * with its context for future reuse. + * + * @param event The CUDA event handle to return + * @param context The CUDA context associated with the event + */ void put(CUevent event, CUcontext context) noexcept; + /** + * @brief Get the number of free events for a specific context + * + * @param context The CUDA context to query + * @return The number of events available for reuse in that context's pool + */ [[nodiscard]] std::size_t num_free_events(CUcontext context) const; + /** + * @brief Get the total number of free events across all contexts + * + * @return The total count of events available for reuse + */ [[nodiscard]] std::size_t total_free_events() const; + /** + * @brief Get the singleton instance of the event pool + * + * @return Reference to the singleton EventPool instance + */ static EventPool& instance(); }; } // namespace kvikio::detail diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp index 7ea38fdc8b..c26319ca63 100644 --- a/cpp/src/detail/event.cpp +++ b/cpp/src/detail/event.cpp @@ -56,28 +56,25 @@ EventPool::Event EventPool::get() CUcontext ctx{}; CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); KVIKIO_EXPECT(ctx 
!= nullptr, "No CUDA context is current"); - return get(ctx); -} -EventPool::Event EventPool::get(CUcontext context) -{ - KVIKIO_EXPECT(context != nullptr, "No CUDA context is current"); CUevent event{}; { std::lock_guard const lock(_mutex); - auto it = _pools.find(context); - if (it != _pools.end() && !it->second.empty()) { + // If the key (`ctx`) is found from the pool, assign the search result to `event` + if (auto it = _pools.find(ctx); it != _pools.end() && !it->second.empty()) { event = it->second.back(); it->second.pop_back(); } } - // Create the event outside the lock if (event == nullptr) { + // Create an event outside the lock to improve performance. + // The pool is not updated here; the returned Event object will automatically return the event + // to the pool when it goes out of scope CUDA_DRIVER_TRY(cudaAPI::instance().EventCreate(&event, CU_EVENT_DISABLE_TIMING)); } - return Event(event, context); + return Event(event, ctx); } void EventPool::put(CUevent event, CUcontext context) noexcept From bc7064bc65db85767677454d1f96ec5ff36184fc Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 2 Feb 2026 16:49:38 -0500 Subject: [PATCH 16/37] Update --- cpp/include/kvikio/bounce_buffer.hpp | 30 ++++++++++++++++++++++++ cpp/include/kvikio/defaults.hpp | 7 +++++- cpp/src/bounce_buffer.cpp | 34 ++++++++++++++++++++++++++++ cpp/src/defaults.cpp | 18 ++++++++++++++- 4 files changed, 87 insertions(+), 2 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 5d2428ba84..5e58c6a865 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -4,7 +4,9 @@ */ #pragma once +#include #include +#include #include @@ -516,4 +518,32 @@ class BounceBufferRing { */ void reset(CUstream stream); }; + +template +class BounceBufferRingCachePerThreadAndContext { + public: + using Ring = BounceBufferRing; + + private: + std::map, std::unique_ptr> _rings; + std::mutex mutable _mutex; + + 
BounceBufferRingCachePerThreadAndContext() = default; + ~BounceBufferRingCachePerThreadAndContext() = default; + + public: + // Non-copyable, non-movable singleton + BounceBufferRingCachePerThreadAndContext(BounceBufferRingCachePerThreadAndContext const&) = + delete; + BounceBufferRingCachePerThreadAndContext& operator=( + BounceBufferRingCachePerThreadAndContext const&) = delete; + BounceBufferRingCachePerThreadAndContext(BounceBufferRingCachePerThreadAndContext&&) = delete; + BounceBufferRingCachePerThreadAndContext& operator=(BounceBufferRingCachePerThreadAndContext&&) = + delete; + + Ring& ring(); + + static BounceBufferRingCachePerThreadAndContext& instance(); +}; + } // namespace kvikio diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 190909c2cc..06e13b0997 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -116,6 +116,7 @@ class defaults { std::size_t _task_size; std::size_t _gds_threshold; std::size_t _bounce_buffer_size; + std::size_t _bounce_buffer_count; std::size_t _http_max_attempts; long _http_timeout; std::vector _http_status_codes; @@ -302,6 +303,10 @@ class defaults { */ static void set_bounce_buffer_size(std::size_t nbytes); + [[nodiscard]] static std::size_t bounce_buffer_count(); + + static void set_bounce_buffer_count(std::size_t count); + /** * @brief Get the maximum number of attempts per remote IO read. 
* diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index e0fa465ae7..412c98ef69 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -457,4 +457,38 @@ void BounceBufferRing::reset(CUstream stream) template class BounceBufferRing; template class BounceBufferRing; +template +BounceBufferRingCachePerThreadAndContext::Ring& +BounceBufferRingCachePerThreadAndContext::ring() +{ + KVIKIO_NVTX_FUNC_RANGE(); + + CUcontext ctx{nullptr}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); + KVIKIO_EXPECT(ctx != nullptr, "No CUDA context is current"); + auto key = std::make_pair(ctx, std::this_thread::get_id()); + + std::lock_guard const lock(_mutex); + + bool use_batch_copy = getenv_or("KVIKIO_USE_BATCH_COPY", false); + auto it = _rings.find(key); + if (it == _rings.end()) { + auto ring = std::make_unique(defaults::bounce_buffer_count(), use_batch_copy); + it = _rings.emplace(key, std::move(ring)).first; + } + return *it->second; +} + +template +BounceBufferRingCachePerThreadAndContext& +BounceBufferRingCachePerThreadAndContext::instance() +{ + static BounceBufferRingCachePerThreadAndContext instance; + return instance; +} + +// Explicit instantiations +template class BounceBufferRingCachePerThreadAndContext; +template class BounceBufferRingCachePerThreadAndContext; + } // namespace kvikio diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index 841e7314d3..2ec65bab8e 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. 
* SPDX-License-Identifier: Apache-2.0 */ @@ -114,6 +114,13 @@ defaults::defaults() env > 0, "KVIKIO_BOUNCE_BUFFER_SIZE has to be a positive integer", std::invalid_argument); _bounce_buffer_size = env; } + // Determine the default value of `bounce_buffer_count` + { + ssize_t const env = getenv_or("KVIKIO_BOUNCE_BUFFER_COUNT", 4); + KVIKIO_EXPECT( + env > 0, "KVIKIO_BOUNCE_BUFFER_COUNT has to be a positive integer", std::invalid_argument); + _bounce_buffer_count = env; + } // Determine the default value of `http_max_attempts` { ssize_t const env = getenv_or("KVIKIO_HTTP_MAX_ATTEMPTS", 3); @@ -213,6 +220,15 @@ void defaults::set_bounce_buffer_size(std::size_t nbytes) instance()->_bounce_buffer_size = nbytes; } +std::size_t defaults::bounce_buffer_count() { return instance()->_bounce_buffer_count; } + +void defaults::set_bounce_buffer_count(std::size_t count) +{ + KVIKIO_EXPECT( + count > 0, "Number of the bounce buffers must be a positive integer", std::invalid_argument); + instance()->_bounce_buffer_count = count; +} + std::size_t defaults::http_max_attempts() { return instance()->_http_max_attempts; } void defaults::set_http_max_attempts(std::size_t attempts) From 812c8a9afea290c6194430e58c0ccb34a4a38f54 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 09:42:54 -0500 Subject: [PATCH 17/37] Remove get(ctx, tid) and move its content to get() --- cpp/include/kvikio/detail/stream.hpp | 18 +++--------------- cpp/src/detail/stream.cpp | 20 ++++++++------------ 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/cpp/include/kvikio/detail/stream.hpp b/cpp/include/kvikio/detail/stream.hpp index bcd19ad676..f2a03fb0c5 100644 --- a/cpp/include/kvikio/detail/stream.hpp +++ b/cpp/include/kvikio/detail/stream.hpp @@ -35,29 +35,17 @@ class StreamCachePerThreadAndContext { StreamCachePerThreadAndContext() = default; ~StreamCachePerThreadAndContext() = default; - /** - * @brief Get or create a CUDA stream for the specified context and thread. 
- * - * If a stream already exists for the given (context, thread) pair, it is returned. Otherwise, a - * new stream is created, cached, and returned. - * - * @param ctx The CUDA context. If null, the null stream is returned. - * @param thd_id The thread identifier. - * @return The CUDA stream associated with this (context, thread) pair, or nullptr if @p ctx is - * null. - */ - KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id); - public: /** * @brief Get or create a CUDA stream for the current context and thread. * - * Convenience overload that uses the current CUDA context and calling thread's ID. + * If a stream already exists for the current (context, thread) pair, it is returned. Otherwise, a + * new stream is created, cached, and returned. * * @return The CUDA stream associated with the current (context, thread) pair, or nullptr if no * CUDA context is current. */ - static CUstream get(); + KVIKIO_EXPORT static CUstream get(); StreamCachePerThreadAndContext(StreamCachePerThreadAndContext const&) = delete; StreamCachePerThreadAndContext& operator=(StreamCachePerThreadAndContext const&) = delete; diff --git a/cpp/src/detail/stream.cpp b/cpp/src/detail/stream.cpp index 9f517d393c..51e1966378 100644 --- a/cpp/src/detail/stream.cpp +++ b/cpp/src/detail/stream.cpp @@ -11,17 +11,21 @@ namespace kvikio::detail { -CUstream StreamCachePerThreadAndContext::get(CUcontext ctx, std::thread::id thd_id) +CUstream StreamCachePerThreadAndContext::get() { - static StreamCachePerThreadAndContext _instance; + KVIKIO_NVTX_FUNC_RANGE(); + CUcontext ctx{nullptr}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); // If no current context, we return the null/default stream if (ctx == nullptr) { return nullptr; } + static StreamCachePerThreadAndContext _instance; + auto key = std::make_pair(ctx, std::this_thread::get_id()); + std::lock_guard const lock(_instance._mutex); - auto key = std::make_pair(ctx, thd_id); - // Create a new stream if this (context, 
thread) pair doesn't have one. + // Create a new stream if the (context, thread) pair doesn't have one. if (auto search = _instance._streams.find(key); search == _instance._streams.end()) { CUstream stream{}; CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); @@ -31,12 +35,4 @@ CUstream StreamCachePerThreadAndContext::get(CUcontext ctx, std::thread::id thd_ return search->second; } } - -CUstream StreamCachePerThreadAndContext::get() -{ - KVIKIO_NVTX_FUNC_RANGE(); - CUcontext ctx{nullptr}; - CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); - return get(ctx, std::this_thread::get_id()); -} } // namespace kvikio::detail From d878f796c7c662cf462c036441167f1161421280 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 10:17:09 -0500 Subject: [PATCH 18/37] Log exception msg --- cpp/src/detail/event.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp index c26319ca63..10eb9b463a 100644 --- a/cpp/src/detail/event.cpp +++ b/cpp/src/detail/event.cpp @@ -85,9 +85,14 @@ void EventPool::put(CUevent event, CUcontext context) noexcept try { std::lock_guard const lock(_mutex); _pools[context].push_back(event); - } catch (...) 
{ - // If returning to pool fails, destroy the event - cudaAPI::instance().EventDestroy(event); + } catch (std::exception const& e) { + KVIKIO_LOG_ERROR(e.what()); + try { + // If returning to pool fails, destroy the event + CUDA_DRIVER_TRY(cudaAPI::instance().EventDestroy(event)); + } catch (std::exception const& e) { + KVIKIO_LOG_ERROR(e.what()); + } } } From df8a4a8c9b5524eab6b913f4c35767ed935f6791 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 10:18:12 -0500 Subject: [PATCH 19/37] Update cpp/src/detail/event.cpp Co-authored-by: Lawrence Mitchell --- cpp/src/detail/event.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp index 10eb9b463a..56dfbb33b9 100644 --- a/cpp/src/detail/event.cpp +++ b/cpp/src/detail/event.cpp @@ -107,7 +107,7 @@ std::size_t EventPool::total_free_events() const { std::lock_guard const lock(_mutex); std::size_t total{0}; - for (auto const& [ctx, events] : _pools) { + for (auto const& [_, events] : _pools) { total += events.size(); } return total; From 9d73727307b1ec06950c0e312134ac875d685b96 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 13:33:20 -0500 Subject: [PATCH 20/37] Update name for clarity --- cpp/include/kvikio/detail/event.hpp | 6 +++--- cpp/src/detail/event.cpp | 25 +++++++++++++------------ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp index 672cfb4139..607f25329f 100644 --- a/cpp/include/kvikio/detail/event.hpp +++ b/cpp/include/kvikio/detail/event.hpp @@ -33,7 +33,7 @@ class EventPool { private: CUevent _event{nullptr}; - CUcontext _context{nullptr}; + CUcontext _cuda_context{nullptr}; /** * @brief Construct an Event wrapping a CUDA event handle @@ -64,7 +64,7 @@ class EventPool { * * @return The CUcontext this event belongs to */ - [[nodiscard]] CUcontext context() const noexcept; + [[nodiscard]] CUcontext cuda_context() const 
noexcept; /** * @brief Record the event on a CUDA stream @@ -105,7 +105,7 @@ class EventPool { EventPool& operator=(EventPool&&) = delete; /** - * @brief Acquire a CUDA event from the pool + * @brief Acquire a CUDA event from the pool for the current CUDA context. * * Returns a cached event for the current CUDA context if available, otherwise creates a new one. * The returned Event object will automatically return the event to the pool when it goes out of diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp index 56dfbb33b9..4d07985713 100644 --- a/cpp/src/detail/event.cpp +++ b/cpp/src/detail/event.cpp @@ -10,34 +10,35 @@ namespace kvikio::detail { -EventPool::Event::Event(CUevent event, CUcontext context) noexcept - : _event(event), _context(context) +EventPool::Event::Event(CUevent event, CUcontext cuda_context) noexcept + : _event(event), _cuda_context(cuda_context) { } EventPool::Event::~Event() noexcept { - if (_event != nullptr) { EventPool::instance().put(_event, _context); } + if (_event != nullptr) { EventPool::instance().put(_event, _cuda_context); } } EventPool::Event::Event(Event&& other) noexcept - : _event(std::exchange(other._event, nullptr)), _context(std::exchange(other._context, nullptr)) + : _event(std::exchange(other._event, nullptr)), + _cuda_context(std::exchange(other._cuda_context, nullptr)) { } EventPool::Event& EventPool::Event::operator=(Event&& other) noexcept { if (this != &other) { - if (_event != nullptr) { EventPool::instance().put(_event, _context); } - _event = std::exchange(other._event, nullptr); - _context = std::exchange(other._context, nullptr); + if (_event != nullptr) { EventPool::instance().put(_event, _cuda_context); } + _event = std::exchange(other._event, nullptr); + _cuda_context = std::exchange(other._cuda_context, nullptr); } return *this; } CUevent EventPool::Event::get() const noexcept { return _event; } -CUcontext EventPool::Event::context() const noexcept { return _context; } +CUcontext 
EventPool::Event::cuda_context() const noexcept { return _cuda_context; } void EventPool::Event::record(CUstream stream) { @@ -77,14 +78,14 @@ EventPool::Event EventPool::get() return Event(event, ctx); } -void EventPool::put(CUevent event, CUcontext context) noexcept +void EventPool::put(CUevent event, CUcontext cuda_context) noexcept { KVIKIO_NVTX_FUNC_RANGE(); if (event == nullptr) { return; } try { std::lock_guard const lock(_mutex); - _pools[context].push_back(event); + _pools[cuda_context].push_back(event); } catch (std::exception const& e) { KVIKIO_LOG_ERROR(e.what()); try { @@ -96,10 +97,10 @@ void EventPool::put(CUevent event, CUcontext context) noexcept } } -std::size_t EventPool::num_free_events(CUcontext context) const +std::size_t EventPool::num_free_events(CUcontext cuda_context) const { std::lock_guard const lock(_mutex); - auto it = _pools.find(context); + auto it = _pools.find(cuda_context); return (it != _pools.end()) ? it->second.size() : 0; } From 1626605bac01e5e89c7a7fe6278507150d33222c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 14:17:18 -0500 Subject: [PATCH 21/37] Update --- cpp/CMakeLists.txt | 1 + cpp/include/kvikio/detail/context.hpp | 38 ++++++++++++++++++++++ cpp/src/detail/context.cpp | 45 +++++++++++++++++++++++++++ cpp/src/file_handle.cpp | 19 ++++++----- 4 files changed, 96 insertions(+), 7 deletions(-) create mode 100644 cpp/include/kvikio/detail/context.hpp create mode 100644 cpp/src/detail/context.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a2b22031e7..121a13a312 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -152,6 +152,7 @@ set(SOURCES "src/file_handle.cpp" "src/file_utils.cpp" "src/mmap.cpp" + "src/detail/context.cpp" "src/detail/env.cpp" "src/detail/event.cpp" "src/detail/nvtx.cpp" diff --git a/cpp/include/kvikio/detail/context.hpp b/cpp/include/kvikio/detail/context.hpp new file mode 100644 index 0000000000..be08e7b480 --- /dev/null +++ 
b/cpp/include/kvikio/detail/context.hpp @@ -0,0 +1,38 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include +#include + +#include +#include + +namespace kvikio::detail { +class IoContext { + private: + CUcontext _cuda_context; + std::mutex mutable _mutex; + std::map _thread_events; + + public: + explicit IoContext(CUcontext cuda_context) noexcept; + + ~IoContext() = default; + + // Non-copyable, non-movable (shared via shared_ptr) + IoContext(IoContext const&) = delete; + IoContext& operator=(IoContext const&) = delete; + IoContext(IoContext&&) = delete; + IoContext& operator=(IoContext&&) = delete; + + [[nodiscard]] CUcontext cuda_context() const noexcept; + + void record_event(CUstream stream); + + void sync_all_events(); +}; +} // namespace kvikio::detail diff --git a/cpp/src/detail/context.cpp b/cpp/src/detail/context.cpp new file mode 100644 index 0000000000..f9a9037365 --- /dev/null +++ b/cpp/src/detail/context.cpp @@ -0,0 +1,45 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include + +namespace kvikio::detail { + +[[nodiscard]] CUcontext IoContext::cuda_context() const noexcept { return _cuda_context; } + +void IoContext::record_event(CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + auto const tid = std::this_thread::get_id(); + + std::lock_guard const lock(_mutex); + + // If not found, acquire an event from the pool + // If found, retrieve the event + auto [it, _] = _thread_events.try_emplace(tid, EventPool::instance().get()); + + // If the event has been used on a previous I/O task on the same thread, overwrite previous + // captured state + it->second.record(stream); +} + +void IoContext::sync_all_events() +{ + KVIKIO_NVTX_FUNC_RANGE(); + // No lock needed. All I/O tasks are done, no concurrent access. 
+ + for (auto& [_, event] : _thread_events) { + event.synchronize(); + } + + // Clear the map. Event destructors return events to the pool + _thread_events.clear(); +} + +} // namespace kvikio::detail diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index b978b49dc3..ede569cff4 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -14,8 +14,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -246,7 +248,8 @@ std::future FileHandle::pread(void* buf, op, buf, size, file_offset, task_size, 0, actual_thread_pool, call_idx, nvtx_color); } - CUcontext ctx = get_context_from_pointer(buf); + CUcontext ctx = get_context_from_pointer(buf); + auto io_context = std::make_shared(ctx); // Shortcut that circumvent the threadpool and use the POSIX backend directly. 
if (size < gds_threshold) { @@ -265,11 +268,13 @@ std::future FileHandle::pread(void* buf, } // Regular case that use the threadpool and run the tasks in parallel - auto task = [this, ctx](void* devPtr_base, - std::size_t size, - std::size_t file_offset, - std::size_t devPtr_offset) -> std::size_t { - PushAndPopContext c(ctx); + auto task = [this, io_context = io_context](void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset) -> std::size_t { + PushAndPopContext c(io_context->cuda_context()); + auto& bounce_ring = + BounceBufferRingCachePerThreadAndContext::instance().ring(); return read(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false); }; auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx); From b71c85755f8528c1bf461e7c0eebd8d83892d9a6 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 14:43:02 -0500 Subject: [PATCH 22/37] Update --- cpp/include/kvikio/detail/event.hpp | 24 ++++++++++++++++++------ cpp/src/detail/event.cpp | 26 ++++++++++++++------------ 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp index 607f25329f..d45e3610ec 100644 --- a/cpp/include/kvikio/detail/event.hpp +++ b/cpp/include/kvikio/detail/event.hpp @@ -28,20 +28,30 @@ namespace kvikio::detail { */ class EventPool { public: + /** + * @brief RAII wrapper for a pooled CUDA event + * + * Automatically returns the event to the pool when destroyed. Provides access to the underlying + * CUevent handle and common event operations (record, synchronize). 
+ * + * @note Non-copyable but movable to allow transfer of ownership while maintaining RAII + */ class Event { friend class EventPool; private: - CUevent _event{nullptr}; - CUcontext _cuda_context{nullptr}; + EventPool* _pool{}; + CUevent _event{}; + CUcontext _cuda_context{}; /** * @brief Construct an Event wrapping a CUDA event handle * + * @param pool The owning EventPool to return this event to on destruction * @param event The CUDA event handle to wrap * @param context The CUDA context associated with this event */ - explicit Event(CUevent event, CUcontext context) noexcept; + explicit Event(EventPool* pool, CUevent event, CUcontext context) noexcept; public: ~Event() noexcept; @@ -49,8 +59,8 @@ class EventPool { // Move-only Event(Event const&) = delete; Event& operator=(Event const&) = delete; - Event(Event&& other) noexcept; - Event& operator=(Event&& other) noexcept; + Event(Event&& o) noexcept; + Event& operator=(Event&& o) noexcept; /** * @brief Get the underlying CUDA event handle @@ -74,6 +84,8 @@ class EventPool { * * @param stream The CUDA stream to record the event on (must belong to the same context as this * event) + * + * @exception CUfileException if the record operation fails */ void record(CUstream stream); @@ -82,7 +94,7 @@ class EventPool { * * Waits for all work captured by a preceding record() call to complete. 
* - * @exception CudaError if the synchronize operation fails + * @exception CUfileException if the synchronize operation fails */ void synchronize(); }; diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp index 4d07985713..74b88f09b5 100644 --- a/cpp/src/detail/event.cpp +++ b/cpp/src/detail/event.cpp @@ -10,28 +10,30 @@ namespace kvikio::detail { -EventPool::Event::Event(CUevent event, CUcontext cuda_context) noexcept - : _event(event), _cuda_context(cuda_context) +EventPool::Event::Event(EventPool* pool, CUevent event, CUcontext cuda_context) noexcept + : _pool(pool), _event(event), _cuda_context(cuda_context) { } EventPool::Event::~Event() noexcept { - if (_event != nullptr) { EventPool::instance().put(_event, _cuda_context); } + if (_event != nullptr) { _pool->put(_event, _cuda_context); } } -EventPool::Event::Event(Event&& other) noexcept - : _event(std::exchange(other._event, nullptr)), - _cuda_context(std::exchange(other._cuda_context, nullptr)) +EventPool::Event::Event(Event&& o) noexcept + : _pool(std::exchange(o._pool, nullptr)), + _event(std::exchange(o._event, nullptr)), + _cuda_context(std::exchange(o._cuda_context, nullptr)) { } -EventPool::Event& EventPool::Event::operator=(Event&& other) noexcept +EventPool::Event& EventPool::Event::operator=(Event&& o) noexcept { - if (this != &other) { - if (_event != nullptr) { EventPool::instance().put(_event, _cuda_context); } - _event = std::exchange(other._event, nullptr); - _cuda_context = std::exchange(other._cuda_context, nullptr); + if (this != &o) { + if (_event != nullptr) { _pool->put(_event, _cuda_context); } + _pool = std::exchange(o._pool, nullptr); + _event = std::exchange(o._event, nullptr); + _cuda_context = std::exchange(o._cuda_context, nullptr); } return *this; } @@ -75,7 +77,7 @@ EventPool::Event EventPool::get() CUDA_DRIVER_TRY(cudaAPI::instance().EventCreate(&event, CU_EVENT_DISABLE_TIMING)); } - return Event(event, ctx); + return Event(this, event, ctx); } void 
EventPool::put(CUevent event, CUcontext cuda_context) noexcept From 7b2c32a509f56900ae16f3f43f1642d8bca11605 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 14:51:15 -0500 Subject: [PATCH 23/37] Set buffer ctor to private --- cpp/include/kvikio/bounce_buffer.hpp | 5 ++++- cpp/tests/test_bounce_buffer.cpp | 8 +++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 5e58c6a865..6fddb97a7d 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -145,13 +145,16 @@ class BounceBufferPool { * @note Non-copyable but movable to allow transfer of ownership while maintaining RAII */ class Buffer { + friend BounceBufferPool; + private: BounceBufferPool* _pool; void* _buffer; std::size_t _size; - public: Buffer(BounceBufferPool* pool, void* buffer, std::size_t size); + + public: Buffer(Buffer const&) = delete; Buffer& operator=(Buffer const&) = delete; Buffer(Buffer&& o) noexcept; diff --git a/cpp/tests/test_bounce_buffer.cpp b/cpp/tests/test_bounce_buffer.cpp index c1959a2e0c..50e8b4fefc 100644 --- a/cpp/tests/test_bounce_buffer.cpp +++ b/cpp/tests/test_bounce_buffer.cpp @@ -59,12 +59,10 @@ TEST_F(BounceBufferTest, move_construction_and_move_assignment) { // Buffer reused auto buf1 = pool.get(); + auto buf2 = pool.get(); - // Manually create a new buffer outside the pool - kvikio::CudaPinnedAllocator allocator; - auto* buffer = allocator.allocate(pool.buffer_size()); - kvikio::CudaPinnedBounceBufferPool::Buffer buf2(&pool, buffer, pool.buffer_size()); - // Move assignment that adds the previous buffer to the pool and then transfers the ownership + // Move assignment that returns the previous buffer in buf2 to the pool and then transfers the + // ownership from buf1 to buf2 buf2 = std::move(buf1); } From c0d57d7d1ca313d557987c05fcd7aba80c8da412 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 14:58:59 -0500 
Subject: [PATCH 24/37] Update --- cpp/include/kvikio/bounce_buffer.hpp | 31 ++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 6fddb97a7d..df2e91d262 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -522,6 +522,22 @@ class BounceBufferRing { void reset(CUstream stream); }; +/** + * @brief Thread-safe singleton cache for per-thread, per-context bounce buffer rings. + * + * Manages a collection of BounceBufferRing instances, each uniquely associated with a (CUDA + * context, thread ID) pair. This ensures that each thread operating within a specific CUDA context + * gets its own dedicated ring, avoiding synchronization overhead during I/O operations while + * maintaining correct behavior across multi-context applications. + * + * @tparam Allocator The allocator policy for the underlying bounce buffer rings: + * - CudaPinnedAllocator: For device I/O without Direct I/O + * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O + * + * @note Rings are created lazily on first access and persist for the lifetime of the program. + * @note The cache itself is thread-safe; individual rings are not (by design, since each ring is + * accessed by only one thread within one context). + */ template class BounceBufferRingCachePerThreadAndContext { public: @@ -544,8 +560,23 @@ class BounceBufferRingCachePerThreadAndContext { BounceBufferRingCachePerThreadAndContext& operator=(BounceBufferRingCachePerThreadAndContext&&) = delete; + /** + * @brief Get the bounce buffer ring for the current thread and CUDA context. + * + * Returns the cached ring for the calling thread's current CUDA context, creating one if it + * doesn't exist. The ring is configured with `defaults::bounce_buffer_count()` buffers and batch + * copy mode controlled by the `KVIKIO_USE_BATCH_COPY` environment variable. 
+ * + * @return Reference to the ring associated with (current context, current thread). + * @exception kvikio::CUfileException if no CUDA context is current. + */ Ring& ring(); + /** + * @brief Get the singleton instance of the cache. + * + * @return Reference to the singleton cache instance. + */ static BounceBufferRingCachePerThreadAndContext& instance(); }; From dab118e53a2272af69bb2dd11a08e5c184502b3f Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 3 Feb 2026 15:01:24 -0500 Subject: [PATCH 25/37] Update --- cpp/include/kvikio/detail/event.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp index d45e3610ec..34831ce0f8 100644 --- a/cpp/include/kvikio/detail/event.hpp +++ b/cpp/include/kvikio/detail/event.hpp @@ -85,7 +85,7 @@ class EventPool { * @param stream The CUDA stream to record the event on (must belong to the same context as this * event) * - * @exception CUfileException if the record operation fails + * @exception kvikio::CUfileException if the record operation fails */ void record(CUstream stream); @@ -94,7 +94,7 @@ class EventPool { * * Waits for all work captured by a preceding record() call to complete. * - * @exception CUfileException if the synchronize operation fails + * @exception kvikio::CUfileException if the synchronize operation fails */ void synchronize(); }; @@ -124,7 +124,7 @@ class EventPool { * scope. 
* * @return RAII Event object wrapping the acquired CUDA event - * @exception CudaError if no CUDA context is current or event creation fails + * @exception kvikio::CUfileException if no CUDA context is current or event creation fails */ [[nodiscard]] Event get(); From 6f6c41e0db3b08d971fd681cef2a729724484124 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 4 Feb 2026 10:29:44 -0500 Subject: [PATCH 26/37] Update --- cpp/include/kvikio/detail/posix_io.hpp | 37 ++++++++++++ cpp/include/kvikio/file_handle.hpp | 14 ++++- cpp/src/detail/context.cpp | 2 + cpp/src/file_handle.cpp | 79 +++++++++++++++++++++----- 4 files changed, 117 insertions(+), 15 deletions(-) diff --git a/cpp/include/kvikio/detail/posix_io.hpp b/cpp/include/kvikio/detail/posix_io.hpp index 89aba27819..ced137a52e 100644 --- a/cpp/include/kvikio/detail/posix_io.hpp +++ b/cpp/include/kvikio/detail/posix_io.hpp @@ -333,4 +333,41 @@ std::size_t posix_device_write(int fd_direct_off, std::size_t devPtr_offset, int fd_direct_on = -1); +template +std::size_t posix_device_read_with_bounce_buffer_ring_impl(int fd_direct_off, + void const* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset, + int fd_direct_on = -1) +{ + // Direct I/O requires page-aligned bounce buffers. CudaPinnedAllocator uses + // cudaMemHostAlloc which does not guarantee page alignment. + if (std::is_same_v) { + KVIKIO_EXPECT(fd_direct_on == -1, + "Direct I/O requires page-aligned bounce buffers. CudaPinnedAllocator does not " + "guarantee page alignment. 
Use CudaPageAlignedPinnedAllocator instead."); + } + + auto devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset; + auto cur_file_offset = convert_size2off(file_offset); + auto bytes_remaining = convert_size2off(size); + auto& ring = BounceBufferRingCachePerThreadAndContext::instance().ring(); + auto const ring_buffer_size = convert_size2off(ring.buffer_size()); + + // Get a stream for the current CUDA context and thread + CUstream stream = StreamCachePerThreadAndContext::get(); + + while (bytes_remaining > 0) { + off_t const nbytes_requested = std::min(ring_buffer_size, bytes_remaining); + auto nbytes_got = nbytes_requested; + nbytes_got = posix_host_io( + fd_direct_off, ring.cur_buffer(), nbytes_requested, cur_file_offset, fd_direct_on); + ring.enqueue_h2d(reinterpret_cast(devPtr), nbytes_got, stream); + cur_file_offset += nbytes_got; + devPtr += nbytes_got; + bytes_remaining -= nbytes_got; + } + return size; +} } // namespace kvikio::detail diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index 0bf4328b9f..d8deb321f7 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -25,6 +25,11 @@ namespace kvikio { +// Forward declaration +namespace detail { +class IoContext; +} + /** * @brief Handle of an open file registered with cufile. * @@ -168,6 +173,13 @@ class FileHandle { std::size_t devPtr_offset, bool sync_default_stream = true); + std::size_t read_with_bounce_buffer_ring(detail::IoContext* io_context, + void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset, + bool sync_default_stream = true); + /** * @brief Writes specified bytes from the device memory into the file. 
* diff --git a/cpp/src/detail/context.cpp b/cpp/src/detail/context.cpp index f9a9037365..dead72f11d 100644 --- a/cpp/src/detail/context.cpp +++ b/cpp/src/detail/context.cpp @@ -11,6 +11,8 @@ namespace kvikio::detail { +IoContext::IoContext(CUcontext cuda_context) noexcept : _cuda_context(cuda_context) {} + [[nodiscard]] CUcontext IoContext::cuda_context() const noexcept { return _cuda_context; } void IoContext::record_event(CUstream stream) diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index ede569cff4..e88e25efaf 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -21,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -191,6 +193,41 @@ std::size_t FileHandle::read(void* devPtr_base, return ret; } +std::size_t FileHandle::read_with_bounce_buffer_ring(detail::IoContext* io_context, + void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset, + bool sync_default_stream) +{ + KVIKIO_NVTX_FUNC_RANGE(size); + if (get_compat_mode_manager().is_compat_mode_preferred()) { + auto posix_device_read = [=, this]() -> std::size_t { + auto num_bytes_read = detail::posix_device_read_with_bounce_buffer_ring_impl( + _file_direct_off.fd(), devPtr_base, size, file_offset, devPtr_offset, _file_direct_on.fd()); + io_context->record_event(detail::StreamCachePerThreadAndContext::get()); + return num_bytes_read; + }; + + // If Direct I/O is supported and requested + if (_file_direct_on.fd() != -1 && defaults::auto_direct_io_read()) { + return posix_device_read.operator()(); + } else { + return posix_device_read.operator()(); + } + } + + if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); } + + ssize_t ret = cuFileAPI::instance().Read(_cufile_handle.handle(), + devPtr_base, + size, + convert_size2off(file_offset), + convert_size2off(devPtr_offset)); + CUFILE_CHECK_BYTES_DONE(ret); + 
return ret; +} + std::size_t FileHandle::write(void const* devPtr_base, std::size_t size, std::size_t file_offset, @@ -248,12 +285,11 @@ std::future FileHandle::pread(void* buf, op, buf, size, file_offset, task_size, 0, actual_thread_pool, call_idx, nvtx_color); } - CUcontext ctx = get_context_from_pointer(buf); - auto io_context = std::make_shared(ctx); + auto io_context = std::make_shared(get_context_from_pointer(buf)); // Shortcut that circumvent the threadpool and use the POSIX backend directly. if (size < gds_threshold) { - PushAndPopContext c(ctx); + PushAndPopContext c(io_context->cuda_context()); auto bytes_read = detail::posix_device_read( _file_direct_off.fd(), buf, size, file_offset, 0, _file_direct_on.fd()); // Maintain API consistency while making this trivial case synchronous. @@ -263,7 +299,7 @@ std::future FileHandle::pread(void* buf, // Let's synchronize once instead of in each task. if (sync_default_stream && !get_compat_mode_manager().is_compat_mode_preferred()) { - PushAndPopContext c(ctx); + PushAndPopContext c(io_context->cuda_context()); CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); } @@ -275,18 +311,33 @@ std::future FileHandle::pread(void* buf, PushAndPopContext c(io_context->cuda_context()); auto& bounce_ring = BounceBufferRingCachePerThreadAndContext::instance().ring(); - return read(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false); + return read_with_bounce_buffer_ring(io_context.get(), + devPtr_base, + size, + file_offset, + devPtr_offset, + false /* sync_default_stream*/); }; + + auto ctx{io_context->cuda_context()}; auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx); - return parallel_io(task, - devPtr_base, - size, - file_offset, - task_size, - devPtr_offset, - actual_thread_pool, - call_idx, - nvtx_color); + + auto fut = parallel_io(task, + devPtr_base, + size, + file_offset, + task_size, + devPtr_offset, + actual_thread_pool, + call_idx, + nvtx_color); + + 
return std::async(std::launch::deferred, + [fut = std::move(fut), io_context = io_context]() mutable { + auto num_bytes_read = fut.get(); + io_context->sync_all_events(); + return num_bytes_read; + }); } std::future FileHandle::pwrite(void const* buf, From 282c60c3fdc5dcd6a592c908870670c128a0ba2d Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 4 Feb 2026 11:07:39 -0500 Subject: [PATCH 27/37] Bug fixes --- cpp/include/kvikio/detail/posix_io.hpp | 2 +- cpp/src/file_handle.cpp | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/include/kvikio/detail/posix_io.hpp b/cpp/include/kvikio/detail/posix_io.hpp index ced137a52e..baefb5c31e 100644 --- a/cpp/include/kvikio/detail/posix_io.hpp +++ b/cpp/include/kvikio/detail/posix_io.hpp @@ -363,7 +363,7 @@ std::size_t posix_device_read_with_bounce_buffer_ring_impl(int fd_direct_off, auto nbytes_got = nbytes_requested; nbytes_got = posix_host_io( fd_direct_off, ring.cur_buffer(), nbytes_requested, cur_file_offset, fd_direct_on); - ring.enqueue_h2d(reinterpret_cast(devPtr), nbytes_got, stream); + ring.submit_h2d(reinterpret_cast(devPtr), nbytes_got, stream); cur_file_offset += nbytes_got; devPtr += nbytes_got; bytes_remaining -= nbytes_got; diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index e88e25efaf..9aa9bb0d0f 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -309,8 +309,6 @@ std::future FileHandle::pread(void* buf, std::size_t file_offset, std::size_t devPtr_offset) -> std::size_t { PushAndPopContext c(io_context->cuda_context()); - auto& bounce_ring = - BounceBufferRingCachePerThreadAndContext::instance().ring(); return read_with_bounce_buffer_ring(io_context.get(), devPtr_base, size, From 3103863a2bc80a5b435cfcc7edafa946fceaf9b8 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 4 Feb 2026 11:08:15 -0500 Subject: [PATCH 28/37] Silly bug fixes --- cpp/src/shim/cuda.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/shim/cuda.cpp 
b/cpp/src/shim/cuda.cpp index cf5b9e9239..65c6aab5e3 100644 --- a/cpp/src/shim/cuda.cpp +++ b/cpp/src/shim/cuda.cpp @@ -41,7 +41,7 @@ cudaAPI::cudaAPI() get_symbol(StreamCreate, lib, KVIKIO_STRINGIFY(cuStreamCreate)); get_symbol(StreamDestroy, lib, KVIKIO_STRINGIFY(cuStreamDestroy)); get_symbol(DriverGetVersion, lib, KVIKIO_STRINGIFY(cuDriverGetVersion)); - get_symbol(EventSynchronize, lib, KVIKIO_STRINGIFY(cuSEventSynchronize)); + get_symbol(EventSynchronize, lib, KVIKIO_STRINGIFY(cuEventSynchronize)); get_symbol(EventCreate, lib, KVIKIO_STRINGIFY(cuEventCreate)); get_symbol(EventDestroy, lib, KVIKIO_STRINGIFY(cuEventDestroy)); get_symbol(EventRecord, lib, KVIKIO_STRINGIFY(cuEventRecord)); From 93e05c98480d1b8645b275941460d22e97528e7b Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 4 Feb 2026 11:36:45 -0500 Subject: [PATCH 29/37] Bug fixes --- cpp/src/detail/posix_io.cpp | 4 ++-- cpp/src/file_handle.cpp | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cpp/src/detail/posix_io.cpp b/cpp/src/detail/posix_io.cpp index 42091c55c8..6c059e78ce 100644 --- a/cpp/src/detail/posix_io.cpp +++ b/cpp/src/detail/posix_io.cpp @@ -23,7 +23,7 @@ std::size_t posix_device_read(int fd_direct_off, int fd_direct_on) { KVIKIO_NVTX_FUNC_RANGE(size); - // If Direct I/O is supported and requested + // If Direct I/O is available and requested if (fd_direct_on != -1 && defaults::auto_direct_io_read()) { return detail::posix_device_io( fd_direct_off, devPtr_base, size, file_offset, devPtr_offset, fd_direct_on); @@ -41,7 +41,7 @@ std::size_t posix_device_write(int fd_direct_off, int fd_direct_on) { KVIKIO_NVTX_FUNC_RANGE(size); - // If Direct I/O is supported and requested + // If Direct I/O is available and requested if (fd_direct_on != -1 && defaults::auto_direct_io_write()) { return detail::posix_device_io( fd_direct_off, devPtr_base, size, file_offset, devPtr_offset, fd_direct_on); diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index 
9aa9bb0d0f..761a96501a 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -202,18 +202,20 @@ std::size_t FileHandle::read_with_bounce_buffer_ring(detail::IoContext* io_conte { KVIKIO_NVTX_FUNC_RANGE(size); if (get_compat_mode_manager().is_compat_mode_preferred()) { - auto posix_device_read = [=, this]() -> std::size_t { + auto posix_device_read = [=, this](int fd_direct_off, + int fd_direct_on = -1) -> std::size_t { auto num_bytes_read = detail::posix_device_read_with_bounce_buffer_ring_impl( - _file_direct_off.fd(), devPtr_base, size, file_offset, devPtr_offset, _file_direct_on.fd()); + fd_direct_off, devPtr_base, size, file_offset, devPtr_offset, fd_direct_on); io_context->record_event(detail::StreamCachePerThreadAndContext::get()); return num_bytes_read; }; // If Direct I/O is supported and requested if (_file_direct_on.fd() != -1 && defaults::auto_direct_io_read()) { - return posix_device_read.operator()(); + return posix_device_read.operator()(_file_direct_off.fd(), + _file_direct_on.fd()); } else { - return posix_device_read.operator()(); + return posix_device_read.operator()(_file_direct_off.fd()); } } From ab00a347b39edcd752186bebbedea8b1f9436f5b Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 4 Feb 2026 12:03:37 -0500 Subject: [PATCH 30/37] Leak intentionnaly --- cpp/include/kvikio/bounce_buffer.hpp | 2 +- cpp/src/bounce_buffer.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index df2e91d262..2d5e1189db 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -544,7 +544,7 @@ class BounceBufferRingCachePerThreadAndContext { using Ring = BounceBufferRing; private: - std::map, std::unique_ptr> _rings; + std::map, Ring*> _rings; std::mutex mutable _mutex; BounceBufferRingCachePerThreadAndContext() = default; diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 
412c98ef69..948193fe0e 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -473,8 +473,8 @@ BounceBufferRingCachePerThreadAndContext::ring() bool use_batch_copy = getenv_or("KVIKIO_USE_BATCH_COPY", false); auto it = _rings.find(key); if (it == _rings.end()) { - auto ring = std::make_unique(defaults::bounce_buffer_count(), use_batch_copy); - it = _rings.emplace(key, std::move(ring)).first; + auto ring = new Ring(defaults::bounce_buffer_count(), use_batch_copy); + it = _rings.emplace(key, ring).first; } return *it->second; } From 4208e5c86bb6ada5c7d86faca8cec18eea1791e7 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 4 Feb 2026 13:14:00 -0500 Subject: [PATCH 31/37] Update --- cpp/include/kvikio/shim/cuda.hpp | 2 +- cpp/src/shim/cuda.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/include/kvikio/shim/cuda.hpp b/cpp/include/kvikio/shim/cuda.hpp index 95a30ccb07..d5a098d67e 100644 --- a/cpp/include/kvikio/shim/cuda.hpp +++ b/cpp/include/kvikio/shim/cuda.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once diff --git a/cpp/src/shim/cuda.cpp b/cpp/src/shim/cuda.cpp index 65c6aab5e3..004297a1cc 100644 --- a/cpp/src/shim/cuda.cpp +++ b/cpp/src/shim/cuda.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. 
* SPDX-License-Identifier: Apache-2.0 */ From b36f967b29df55bd8a0248ea750fc5027480c5dc Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 4 Feb 2026 13:32:33 -0500 Subject: [PATCH 32/37] Update doxygen doc --- cpp/include/kvikio/bounce_buffer.hpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 2d5e1189db..bc4e97cf10 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -537,6 +537,20 @@ class BounceBufferRing { * @note Rings are created lazily on first access and persist for the lifetime of the program. * @note The cache itself is thread-safe; individual rings are not (by design, since each ring is * accessed by only one thread within one context). + * @note This class intentionally leaks its Ring allocations to avoid undefined behavior during + * process exit. Without this, the following destruction sequence causes crashes. By using raw + * pointers and never deleting them, we avoid calling into CUDA during static destruction. The OS + * reclaims all process memory at exit regardless. 
+ * - `BounceBufferRingCachePerThreadAndContext` singleton is destroyed + * - Its map of `Ring*` entries would be deleted + * - Each `Ring` destructor destroys its `std::vector` + * - Each `Buffer` destructor calls `BounceBufferPool::put()` to return memory + * - `put()` may call `CudaPinnedAllocator::deallocate()` which includes CUDA API call + * - CUDA driver is already shut down, which leads to heap corruption / crash (sample error message: + * "free(): corrupted unsorted chunks") + * + * @sa BounceBufferPool, EventPool (same intentional leak pattern) + * @sa https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization */ template class BounceBufferRingCachePerThreadAndContext { From b1ba9ddde2e46ed3ff7507e482813275edbd8a0a Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 9 Feb 2026 08:36:58 -0500 Subject: [PATCH 33/37] Remove batch copy as it is half baked --- cpp/include/kvikio/bounce_buffer.hpp | 30 +------- cpp/src/bounce_buffer.cpp | 107 ++------------------------- 2 files changed, 11 insertions(+), 126 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index bc4e97cf10..3935ceea43 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -5,8 +5,10 @@ #pragma once #include +#include #include #include +#include #include @@ -301,31 +303,17 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool class BounceBufferRing { - public: - struct BatchTransferContext { - std::vector srcs; - std::vector dsts; - std::vector sizes; - - void add_entry(void* dst, void* src, std::size_t size); - void clear(); - }; - private: std::vector::Buffer> _buffers; - BatchTransferContext _batch_transfer_ctx; std::size_t _num_buffers; std::size_t _cur_buf_idx{0}; std::size_t _initial_buf_idx{0}; std::size_t _cur_buffer_offset{0}; - bool _batch_copy; public: /** @@ -333,11 +321,8 @@ class BounceBufferRing { * * @param num_buffers Number of bounce buffers (k) for 
k-way overlap. Must be >= 1. Higher values * allow more overlap but consume more memory. - * @param batch_copy If true, defer H2D copies and issue them in batches. Useful for many small - * transfers where API overhead dominates. If false (default), issue H2D copies immediately for - * better overlap. */ - explicit BounceBufferRing(std::size_t num_buffers = 1, bool batch_copy = false); + explicit BounceBufferRing(std::size_t num_buffers = 1); ~BounceBufferRing() noexcept = default; @@ -412,9 +397,6 @@ class BounceBufferRing { /** * @brief Queue async copy from current bounce buffer to device memory. * - * In non-batch mode, issues cuMemcpyHtoDAsync immediately. In batch mode, defers the copy until - * wrap-around or synchronize(). - * * @param device_dst Device memory destination. * @param size Bytes to copy from cur_buffer(). * @param stream CUDA stream for the async transfer. @@ -501,9 +483,6 @@ class BounceBufferRing { /** * @brief Ensure all queued H2D transfers are complete. * - * In batch mode, issues any pending batch copies first. Then synchronizes the stream to guarantee - * all data is visible in device memory. - * * @param stream CUDA stream to synchronize. * * @note Must be called before reading transferred data on device. @@ -578,8 +557,7 @@ class BounceBufferRingCachePerThreadAndContext { * @brief Get the bounce buffer ring for the current thread and CUDA context. * * Returns the cached ring for the calling thread's current CUDA context, creating one if it - * doesn't exist. The ring is configured with `defaults::bounce_buffer_count()` buffers and batch - * copy mode controlled by the `KVIKIO_USE_BATCH_COPY` environment variable. + * doesn't exist. The ring is configured with `defaults::bounce_buffer_count()` buffers. * * @return Reference to the ring associated with (current context, current thread). * @exception kvikio::CUfileException if no CUDA context is current. 
diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 948193fe0e..ecedaf1f45 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -219,82 +219,8 @@ template class BounceBufferPool; template class BounceBufferPool; template class BounceBufferPool; -namespace { -/** - * @brief Issue individual async H2D copies for each transfer. - * - * Fallback when cuMemcpyBatchAsync is unavailable. - */ -void unbatched_copy(std::span dsts, - std::span srcs, - std::span sizes, - CUstream stream) -{ - for (std::size_t i = 0; i < srcs.size(); ++i) { - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( - convert_void2deviceptr(dsts[i]), reinterpret_cast(srcs[i]), sizes[i], stream)); - } -} - -/** - * @brief Issue H2D copies using batch API if available, otherwise fall back to individual copies. - * - * Uses cuMemcpyBatchAsync (CUDA 12.8+) to reduce API overhead for multiple small transfers. - */ -void batch_copy(std::span dsts, - std::span srcs, - std::span sizes, - CUstream stream) -{ - if (srcs.size() == 0) return; - -#if CUDA_VERSION >= 12080 - if (cudaAPI::instance().MemcpyBatchAsync) { - CUmemcpyAttributes attrs{}; - std::size_t attrs_idxs[] = {0}; - attrs.srcAccessOrder = CUmemcpySrcAccessOrder_enum::CU_MEMCPY_SRC_ACCESS_ORDER_STREAM; - CUDA_DRIVER_TRY( - cudaAPI::instance().MemcpyBatchAsync(dsts.data(), - srcs.data(), - sizes.data(), - srcs.size(), - &attrs, - attrs_idxs, - static_cast(1) /* num_attrs */, -#if CUDA_VERSION < 13000 - static_cast(nullptr), -#endif - stream)); - } else { - unbatched_copy(dsts, srcs, sizes, stream); - } -#else - unbatched_copy(dsts, srcs, sizes, stream); -#endif -} -} // namespace - template -void BounceBufferRing::BatchTransferContext::add_entry(void* dst, - void* src, - std::size_t size) -{ - srcs.push_back(src); - dsts.push_back(dst); - sizes.push_back(size); -} - -template -void BounceBufferRing::BatchTransferContext::clear() -{ - srcs.clear(); - dsts.clear(); - sizes.clear(); -} - -template 
-BounceBufferRing::BounceBufferRing(std::size_t num_buffers, bool batch_copy) - : _num_buffers{num_buffers}, _batch_copy{batch_copy} +BounceBufferRing::BounceBufferRing(std::size_t num_buffers) : _num_buffers{num_buffers} { KVIKIO_NVTX_FUNC_RANGE(); KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer"); @@ -309,24 +235,16 @@ template void BounceBufferRing::enqueue_h2d(void* device_dst, std::size_t size, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - if (_batch_copy) { - _batch_transfer_ctx.add_entry(device_dst, cur_buffer(), size); - } else { - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( - convert_void2deviceptr(device_dst), cur_buffer(), size, stream)); - } + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(device_dst), cur_buffer(), size, stream)); } template void BounceBufferRing::enqueue_d2h(void* device_src, std::size_t size, CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - if (_batch_copy) { - _batch_transfer_ctx.add_entry(cur_buffer(), device_src, size); - } else { - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( - cur_buffer(), convert_void2deviceptr(device_src), size, stream)); - } + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( + cur_buffer(), convert_void2deviceptr(device_src), size, stream)); } template @@ -337,11 +255,6 @@ void BounceBufferRing::advance(CUstream stream) ++_cur_buf_idx; if (_cur_buf_idx >= _num_buffers) { _cur_buf_idx = 0; } if (_cur_buf_idx == _initial_buf_idx) { - if (_batch_copy) { - batch_copy( - _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream); - _batch_transfer_ctx.clear(); - } CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); _initial_buf_idx = _cur_buf_idx; } @@ -436,11 +349,6 @@ template void BounceBufferRing::synchronize(CUstream stream) { KVIKIO_NVTX_FUNC_RANGE(); - if (_batch_copy) { - batch_copy( - _batch_transfer_ctx.dsts, _batch_transfer_ctx.srcs, _batch_transfer_ctx.sizes, stream); - 
_batch_transfer_ctx.clear(); - } CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } @@ -470,10 +378,9 @@ BounceBufferRingCachePerThreadAndContext::ring() std::lock_guard const lock(_mutex); - bool use_batch_copy = getenv_or("KVIKIO_USE_BATCH_COPY", false); - auto it = _rings.find(key); + auto it = _rings.find(key); if (it == _rings.end()) { - auto ring = new Ring(defaults::bounce_buffer_count(), use_batch_copy); + auto ring = new Ring(defaults::bounce_buffer_count()); it = _rings.emplace(key, ring).first; } return *it->second; From 64ee4253a846aeda1baf562efd0335887b00aa9c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 9 Feb 2026 09:08:38 -0500 Subject: [PATCH 34/37] Add event query --- cpp/include/kvikio/shim/cuda.hpp | 1 + cpp/src/shim/cuda.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/cpp/include/kvikio/shim/cuda.hpp b/cpp/include/kvikio/shim/cuda.hpp index d5a098d67e..1773842417 100644 --- a/cpp/include/kvikio/shim/cuda.hpp +++ b/cpp/include/kvikio/shim/cuda.hpp @@ -111,6 +111,7 @@ class cudaAPI { decltype(cuEventCreate)* EventCreate{nullptr}; decltype(cuEventDestroy)* EventDestroy{nullptr}; decltype(cuEventRecord)* EventRecord{nullptr}; + decltype(cuEventQuery)* EventQuery{nullptr}; private: cudaAPI(); diff --git a/cpp/src/shim/cuda.cpp b/cpp/src/shim/cuda.cpp index 004297a1cc..00538d6d19 100644 --- a/cpp/src/shim/cuda.cpp +++ b/cpp/src/shim/cuda.cpp @@ -45,6 +45,7 @@ cudaAPI::cudaAPI() get_symbol(EventCreate, lib, KVIKIO_STRINGIFY(cuEventCreate)); get_symbol(EventDestroy, lib, KVIKIO_STRINGIFY(cuEventDestroy)); get_symbol(EventRecord, lib, KVIKIO_STRINGIFY(cuEventRecord)); + get_symbol(EventQuery, lib, KVIKIO_STRINGIFY(cuEventQuery)); CUDA_DRIVER_TRY(DriverGetVersion(&driver_version)); From 9645424e9266d7741ac5291789af7e2c8835873f Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 9 Feb 2026 10:33:45 -0500 Subject: [PATCH 35/37] Improve impl by using the event based approach --- cpp/include/kvikio/bounce_buffer.hpp | 60 
+++++++++++----------------- cpp/src/bounce_buffer.cpp | 37 +++++++++-------- 2 files changed, 44 insertions(+), 53 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 3935ceea43..55eec1d4c4 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -11,6 +11,7 @@ #include #include +#include namespace kvikio { @@ -78,7 +79,7 @@ class CudaPinnedAllocator { * (for efficient host-device transfers). Uses std::aligned_alloc followed by * cudaMemHostRegister to achieve both properties. * - * @note Registration uses CU_MEMHOSTALLOC_PORTABLE, making buffers accessible from all CUDA + * @note Registration uses CU_MEMHOSTREGISTER_PORTABLE, making buffers accessible from all CUDA * contexts, not just the one active during allocation. This allows the singleton BounceBufferPool * to safely serve buffers across multiple contexts and devices. * @note This is the required allocator for Direct I/O with device memory. Requires a valid CUDA @@ -303,17 +304,33 @@ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool class BounceBufferRing { private: std::vector::Buffer> _buffers; - std::size_t _num_buffers; std::size_t _cur_buf_idx{0}; - std::size_t _initial_buf_idx{0}; std::size_t _cur_buffer_offset{0}; + std::vector _events; + + /** + * @brief Advance to next buffer in the ring. + * + * Resets current buffer offset and moves to next buffer index. Waits on the target buffer's event + * if a prior async transfer is still in flight, ensuring the buffer is safe for CPU writes. + */ + void advance(); + + /** + * @brief Queue async copy from current bounce buffer to device memory. + * + * @param device_dst Device memory destination. + * @param size Bytes to copy from cur_buffer(). + * @param stream CUDA stream for the async transfer. + * + * @note Does NOT advance to next buffer. Call submit_h2d() for copy + advance. 
+ */ + void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream); public: /** @@ -384,36 +401,6 @@ class BounceBufferRing { */ [[nodiscard]] std::size_t cur_buffer_remaining_capacity() const noexcept; - /** - * @brief Advance to next buffer in the ring. - * - * Resets current buffer offset and moves to next buffer index. If wrapping around to the oldest - * in-flight buffer, synchronizes the stream first to ensure that buffer's transfer is complete. - * - * @param stream CUDA stream to synchronize on wrap-around. - */ - void advance(CUstream stream); - - /** - * @brief Queue async copy from current bounce buffer to device memory. - * - * @param device_dst Device memory destination. - * @param size Bytes to copy from cur_buffer(). - * @param stream CUDA stream for the async transfer. - * - * @note Does NOT advance to next buffer. Call submit_h2d() for copy + advance. - */ - void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream); - - /** - * @brief Async copy from device memory to current bounce buffer. - * - * @param device_src Device memory source. - * @param size Bytes to copy. - * @param stream CUDA stream for the async transfer. - */ - void enqueue_d2h(void* device_src, std::size_t size, CUstream stream); - /** * @brief Accumulate data into bounce buffer, auto-submit when full. * @@ -493,8 +480,7 @@ class BounceBufferRing { /** * @brief Synchronize pending transfers and reset ring state for a new transfer session. * - * Ensures all in-flight transfers complete, then resets the ring to its initial state. After - * reset(), the ring can be safely reused for either H2D or D2H operations. + * Ensures all in-flight transfers complete, then resets the ring to its initial state. * * @param stream CUDA stream to synchronize. 
*/ diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index ecedaf1f45..2d37923487 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -52,11 +53,11 @@ void* CudaPageAlignedPinnedAllocator::allocate(std::size_t size) auto const aligned_size = detail::align_up(size, page_size); buffer = std::aligned_alloc(page_size, aligned_size); KVIKIO_EXPECT(buffer != nullptr, "Aligned allocation failed"); - // Register the page-aligned allocation as pinned memory with CU_MEMHOSTALLOC_PORTABLE. The + // Register the page-aligned allocation as pinned memory with CU_MEMHOSTREGISTER_PORTABLE. The // PORTABLE flag ensures this memory is accessible from all CUDA contexts, which is essential for // the singleton BounceBufferPool that may serve multiple contexts and devices. CUDA_DRIVER_TRY( - cudaAPI::instance().MemHostRegister(buffer, aligned_size, CU_MEMHOSTALLOC_PORTABLE)); + cudaAPI::instance().MemHostRegister(buffer, aligned_size, CU_MEMHOSTREGISTER_PORTABLE)); return buffer; } @@ -220,21 +221,22 @@ template class BounceBufferPool; template class BounceBufferPool; template -BounceBufferRing::BounceBufferRing(std::size_t num_buffers) : _num_buffers{num_buffers} +BounceBufferRing::BounceBufferRing(std::size_t num_buffers) { KVIKIO_NVTX_FUNC_RANGE(); KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer"); - _buffers.reserve(_num_buffers); - for (std::size_t i = 0; i < _num_buffers; ++i) { + _buffers.reserve(num_buffers); + _events.reserve(num_buffers); + for (std::size_t i = 0; i < num_buffers; ++i) { _buffers.emplace_back(BounceBufferPool::instance().get()); + _events.emplace_back(detail::EventPool::instance().get()); } } template void BounceBufferRing::enqueue_h2d(void* device_dst, std::size_t size, CUstream stream) { - KVIKIO_NVTX_FUNC_RANGE(); CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( convert_void2deviceptr(device_dst), cur_buffer(), 
size, stream)); } @@ -242,7 +244,6 @@ void BounceBufferRing::enqueue_h2d(void* device_dst, std::size_t size template void BounceBufferRing::enqueue_d2h(void* device_src, std::size_t size, CUstream stream) { - KVIKIO_NVTX_FUNC_RANGE(); CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( cur_buffer(), convert_void2deviceptr(device_src), size, stream)); } @@ -253,10 +254,15 @@ void BounceBufferRing::advance(CUstream stream) KVIKIO_NVTX_FUNC_RANGE(); _cur_buffer_offset = 0; ++_cur_buf_idx; - if (_cur_buf_idx >= _num_buffers) { _cur_buf_idx = 0; } - if (_cur_buf_idx == _initial_buf_idx) { - CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); - _initial_buf_idx = _cur_buf_idx; + if (_cur_buf_idx >= _buffers.size()) { _cur_buf_idx = 0; } + + // Ensure the buffer we are advancing into is not still being read by a prior H2D + auto event = _events[_cur_buf_idx].get(); + auto event_status = cudaAPI::instance().EventQuery(event); + if (event_status == CUDA_ERROR_NOT_READY) { + CUDA_DRIVER_TRY(cudaAPI::instance().EventSynchronize(event)); + } else if (event_status != CUDA_SUCCESS) { + CUDA_DRIVER_TRY(event_status); } } @@ -281,7 +287,7 @@ std::size_t BounceBufferRing::buffer_size() const noexcept template std::size_t BounceBufferRing::num_buffers() const noexcept { - return _num_buffers; + return _buffers.size(); } template @@ -331,7 +337,8 @@ void BounceBufferRing::submit_h2d(void* device_dst, std::size_t size, { KVIKIO_NVTX_FUNC_RANGE(); enqueue_h2d(device_dst, size, stream); - advance(stream); + CUDA_DRIVER_TRY(cudaAPI::instance().EventRecord(_events[_cur_buf_idx].get(), stream)); + advance(); } template @@ -340,8 +347,7 @@ std::size_t BounceBufferRing::flush_h2d(void* device_dst, CUstream st KVIKIO_NVTX_FUNC_RANGE(); if (_cur_buffer_offset == 0) { return 0; } auto const flushed = _cur_buffer_offset; - enqueue_h2d(device_dst, _cur_buffer_offset, stream); - advance(stream); + submit_h2d(device_dst, _cur_buffer_offset, stream); return flushed; } @@ -357,7 +363,6 
@@ void BounceBufferRing::reset(CUstream stream) { synchronize(stream); _cur_buf_idx = 0; - _initial_buf_idx = 0; _cur_buffer_offset = 0; } From db169e6d9d3a0025ce9d860fd4ed198a6ff2d83a Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 9 Feb 2026 10:39:35 -0500 Subject: [PATCH 36/37] Fix error --- cpp/src/bounce_buffer.cpp | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 2d37923487..1ae0ed0730 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -242,14 +242,7 @@ void BounceBufferRing::enqueue_h2d(void* device_dst, std::size_t size } template -void BounceBufferRing::enqueue_d2h(void* device_src, std::size_t size, CUstream stream) -{ - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync( - cur_buffer(), convert_void2deviceptr(device_src), size, stream)); -} - -template -void BounceBufferRing::advance(CUstream stream) +void BounceBufferRing::advance() { KVIKIO_NVTX_FUNC_RANGE(); _cur_buffer_offset = 0; From 67c2825e6892510a447284d1453e75824e171ce1 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 9 Feb 2026 11:11:31 -0500 Subject: [PATCH 37/37] Update --- cpp/include/kvikio/detail/posix_io.hpp | 39 ++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/cpp/include/kvikio/detail/posix_io.hpp b/cpp/include/kvikio/detail/posix_io.hpp index baefb5c31e..4e50effae3 100644 --- a/cpp/include/kvikio/detail/posix_io.hpp +++ b/cpp/include/kvikio/detail/posix_io.hpp @@ -193,7 +193,7 @@ std::size_t posix_device_io(int fd_direct_off, { // Direct I/O requires page-aligned bounce buffers. CudaPinnedBounceBufferPool uses // cudaMemHostAlloc which does not guarantee page alignment. - if (std::is_same_v) { + if constexpr (std::is_same_v) { KVIKIO_EXPECT( fd_direct_on == -1, "Direct I/O requires page-aligned bounce buffers. 
CudaPinnedBounceBufferPool does not " @@ -333,6 +333,39 @@ std::size_t posix_device_write(int fd_direct_off, std::size_t devPtr_offset, int fd_direct_on = -1); +/** + * @brief Read from file into device memory using bounce buffer ring for I/O-H2D overlap. + * + * Performs the same logical operation as read(), but stages data through a per-(context, thread) + * bounce buffer ring to overlap POSIX file reads with asynchronous host-to-device transfers. The + * ring is obtained from BounceBufferRingCachePerThreadAndContext and the H2D stream from + * StreamCachePerThreadAndContext; both are keyed by the current CUDA context and calling thread. + * + * After all file data has been submitted to the ring, an event is recorded on the H2D stream via @p + * io_context so that the caller can defer synchronization until all parallel tasks have completed. + * The caller must invoke `io_context->sync_all_events()` before accessing the device buffer. + * + * When GDS is available and compatibility mode is not preferred, the call falls through to + * cuFileRead directly (identical to read()). + * + * @param io_context Shared I/O context that collects CUDA events from each parallel task. The + * caller is responsible for calling `sync_all_events()` after all tasks finish to guarantee data + * visibility on the device. + * @param devPtr_base Base address of buffer in device memory. For registered buffers, must remain + * set to the base address used in the buffer_register call. + * @param size Size in bytes to read. + * @param file_offset Offset in the file to read from. + * @param devPtr_offset Offset relative to @p devPtr_base to read into. This parameter should be + * used only with registered buffers. + * @param sync_default_stream Synchronize the CUDA default (null) stream prior to calling cuFile. + * Ignored when in compatibility mode (POSIX path is already default-stream-ordered). + * @return Size of bytes that were successfully read (always equal to @p size, or throws). 
+ * + * @exception kvikio::CUfileException on I/O or CUDA errors. + * + * @note The H2D transfers are asynchronous and may still be in flight when this function returns. + * Device data is only safe to consume after `io_context->sync_all_events()`. + */ template std::size_t posix_device_read_with_bounce_buffer_ring_impl(int fd_direct_off, void const* devPtr_base, @@ -341,9 +374,11 @@ std::size_t posix_device_read_with_bounce_buffer_ring_impl(int fd_direct_off, std::size_t devPtr_offset, int fd_direct_on = -1) { + KVIKIO_NVTX_FUNC_RANGE(size); + // Direct I/O requires page-aligned bounce buffers. CudaPinnedAllocator uses // cudaMemHostAlloc which does not guarantee page alignment. - if (std::is_same_v) { + if constexpr (std::is_same_v) { KVIKIO_EXPECT(fd_direct_on == -1, "Direct I/O requires page-aligned bounce buffers. CudaPinnedAllocator does not " "guarantee page alignment. Use CudaPageAlignedPinnedAllocator instead.");