cpp/include/kvikio/bounce_buffer.hpp (248 additions, 0 deletions)
@@ -43,6 +43,9 @@ class PageAlignedAllocator {
* transferred to/from GPU device memory. The allocation is only guaranteed to be aligned to "at
* least 256 bytes". It is NOT guaranteed to be page aligned.
*
* @note Allocations use CU_MEMHOSTALLOC_PORTABLE, making them accessible from all CUDA contexts,
* not just the one that performed the allocation. This allows the singleton BounceBufferPool to
* safely serve buffers across multiple contexts and devices.
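*
* A minimal sketch of the underlying allocation (assuming the CUDA driver API and kvikio's
* CUDA_DRIVER_TRY error-checking macro; `nbytes` is illustrative):
* @code
* void* ptr{nullptr};
* CUDA_DRIVER_TRY(cuMemHostAlloc(&ptr, nbytes, CU_MEMHOSTALLOC_PORTABLE));
* @endcode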
* @note Do NOT use with Direct I/O; allocations lack a page-alignment guarantee.
*/
class CudaPinnedAllocator {
@@ -71,6 +74,9 @@ class CudaPinnedAllocator {
* (for efficient host-device transfers). Uses std::aligned_alloc followed by
* cuMemHostRegister to achieve both properties.
*
* @note Registration uses CU_MEMHOSTREGISTER_PORTABLE, making buffers accessible from all CUDA
* contexts, not just the one active during allocation. This allows the singleton BounceBufferPool
* to safely serve buffers across multiple contexts and devices.
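*
* A minimal sketch of the two-step scheme (assuming the driver API; `page_size` and `nbytes` are
* illustrative, and std::aligned_alloc requires nbytes to be a multiple of page_size):
* @code
* void* ptr = std::aligned_alloc(page_size, nbytes);  // page-aligned host memory
* CUDA_DRIVER_TRY(cuMemHostRegister(ptr, nbytes, CU_MEMHOSTREGISTER_PORTABLE));  // pin + portable
* @endcode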
* @note This is the required allocator for Direct I/O with device memory. Requires a valid CUDA
* context when allocating.
*/
@@ -111,6 +117,9 @@ class CudaPageAlignedPinnedAllocator {
* - CudaPinnedAllocator: For device I/O without Direct I/O
* - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O
*
* @note The singleton pool is safe for use across multiple CUDA contexts. CUDA-pinned allocators
* use CU_MEMHOSTALLOC_PORTABLE, ensuring buffers allocated in one context can be used from any
* other context.
* @note The destructor intentionally leaks allocations to avoid CUDA cleanup issues when static
* destructors run after CUDA context destruction.
*/
@@ -268,4 +277,243 @@ using CudaPinnedBounceBufferPool = BounceBufferPool<CudaPinnedAllocator>;
* Provides both page alignment (for Direct I/O) and CUDA registration (for efficient transfers)
*/
using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool<CudaPageAlignedPinnedAllocator>;

/**
* @brief K-way bounce buffer ring for overlapping I/O with host-device transfers.
*
* Manages a ring of k bounce buffers to enable pipelining between file I/O and CUDA memory
* transfers. By rotating through multiple buffers, the ring allows async H2D copies to proceed
* while the next buffer is being filled.
*
* Synchronization occurs automatically when the ring wraps around to prevent overwriting buffers
* with in-flight transfers.
*
* @tparam Allocator The allocator policy for bounce buffers:
* - CudaPinnedAllocator: For device I/O without Direct I/O
* - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O
*
* @note This class is NOT thread-safe. Use one ring per thread or per operation.
* @note The internal bounce buffers use CU_MEMHOSTALLOC_PORTABLE and are accessible from any CUDA
* context. However, each transfer session (from first enqueue/submit through synchronize/reset)
* must use a stream and device pointers from a single, consistent CUDA context.
* @note In batch mode, H2D copies are deferred until wrap-around or synchronize(), trading overlap
* for reduced API call overhead.
* @note H2D and D2H operations should not be interleaved within a single transfer session. Call
* reset() between direction changes to ensure correct synchronization behavior.
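*
* Illustrative read pipeline (a sketch; `fd`, `file_size`, `stream`, and a byte-typed
* `device_ptr` are assumed to exist):
* @code
* BounceBufferRing<CudaPinnedAllocator> ring(4);  // 4-way overlap
* for (std::size_t off = 0; off < file_size; off += ring.buffer_size()) {
*   ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), off);
*   ring.submit_h2d(device_ptr + off, n, stream);  // async H2D copy, then advance
* }
* ring.synchronize(stream);  // all submitted data now visible on device
* @endcode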
*/
template <typename Allocator = CudaPinnedAllocator>
class BounceBufferRing {
public:
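/**
* @brief Bookkeeping for batch mode: parallel arrays of pending copy sources, destinations, and
* sizes, issued together at wrap-around or synchronize().
*/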
struct BatchTransferContext {
std::vector<void*> srcs;
std::vector<void*> dsts;
std::vector<std::size_t> sizes;

void add_entry(void* dst, void* src, std::size_t size);
void clear();
};

private:
std::vector<typename BounceBufferPool<Allocator>::Buffer> _buffers;  // the k ring buffers
BatchTransferContext _batch_transfer_ctx;  // deferred copies pending in batch mode
std::size_t _num_buffers;  // ring size k
std::size_t _cur_buf_idx{0};  // index of the buffer currently being filled
std::size_t _initial_buf_idx{0};  // index where the current transfer session started
std::size_t _cur_buffer_offset{0};  // bytes accumulated in the current buffer
bool _batch_copy;  // defer H2D copies until wrap-around or synchronize()

public:
/**
* @brief Construct a bounce buffer ring.
*
* @param num_buffers Number of bounce buffers (k) for k-way overlap. Must be >= 1. Higher values
* allow more overlap but consume more memory.
* @param batch_copy If true, defer H2D copies and issue them in batches. Useful for many small
* transfers where API overhead dominates. If false (default), issue H2D copies immediately for
* better overlap.
*/
explicit BounceBufferRing(std::size_t num_buffers = 1, bool batch_copy = false);

~BounceBufferRing() noexcept = default;

// Non-copyable, non-movable
BounceBufferRing(BounceBufferRing const&) = delete;
BounceBufferRing& operator=(BounceBufferRing const&) = delete;
BounceBufferRing(BounceBufferRing&&) = delete;
BounceBufferRing& operator=(BounceBufferRing&&) = delete;

/**
* @brief Get pointer to the current bounce buffer.
*
* Use this to fill the buffer directly (e.g., via pread), then call submit_h2d() to transfer the
* data to device.
*
* @return Pointer to the start of the current buffer.
*/
[[nodiscard]] void* cur_buffer() const noexcept;

/**
* @brief Get pointer to the current bounce buffer at a specific offset.
*
* Useful for partial buffer fills or when accumulating data incrementally.
*
* @param offset Byte offset from the start of the current buffer.
* @return Pointer to cur_buffer() + offset.
*/
[[nodiscard]] void* cur_buffer(std::ptrdiff_t offset) const noexcept;

/**
* @brief Get the size of each bounce buffer in the ring.
*
* All buffers in the ring have the same size, determined by defaults::bounce_buffer_size() at
* ring construction time.
*
* @return Size in bytes of each buffer.
*/
[[nodiscard]] std::size_t buffer_size() const noexcept;

/**
* @brief Get the number of buffers in the ring (k for k-way overlap).
*
* @return Number of bounce buffers.
*/
[[nodiscard]] std::size_t num_buffers() const noexcept;

/**
* @brief Get the current fill level of the active buffer.
*
* Indicates how many bytes have been accumulated in the current buffer via
* accumulate_and_submit_h2d(). Reset to 0 after each advance().
*
* @return Number of bytes currently in the buffer.
*/
[[nodiscard]] std::size_t cur_buffer_offset() const noexcept;

/**
* @brief Get the number of bytes remaining in the current buffer for accumulation.
*/
[[nodiscard]] std::size_t cur_buffer_remaining_capacity() const noexcept;

/**
* @brief Advance to next buffer in the ring.
*
* Resets current buffer offset and moves to next buffer index. If wrapping around to the oldest
* in-flight buffer, synchronizes the stream first to ensure that buffer's transfer is complete.
*
* @param stream CUDA stream to synchronize on wrap-around.
*/
void advance(CUstream stream);

/**
* @brief Queue async copy from current bounce buffer to device memory.
*
* In non-batch mode, issues cuMemcpyHtoDAsync immediately. In batch mode, defers the copy until
* wrap-around or synchronize().
*
* @param device_dst Device memory destination.
* @param size Bytes to copy from cur_buffer().
* @param stream CUDA stream for the async transfer.
*
* @note Does NOT advance to next buffer. Call submit_h2d() for copy + advance.
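*
* A sketch of the manual fill-enqueue-advance pattern (assumes `fd`, `offset`, `device_dst`, and
* `stream` exist; equivalent in spirit to submit_h2d()):
* @code
* ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), offset);
* ring.enqueue_h2d(device_dst, n, stream);  // copy queued; buffer index unchanged
* ring.advance(stream);                     // explicitly move to the next buffer
* @endcode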
*/
void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream);

/**
* @brief Async copy from device memory to current bounce buffer.
*
* @param device_src Device memory source.
* @param size Bytes to copy.
* @param stream CUDA stream for the async transfer.
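*
* Illustrative device-to-host staging (a sketch; `fd`, `offset`, `device_src`, and `stream` are
* assumed):
* @code
* ring.enqueue_d2h(device_src, ring.buffer_size(), stream);
* ring.synchronize(stream);  // data now visible in cur_buffer()
* ssize_t n = pwrite(fd, ring.cur_buffer(), ring.buffer_size(), offset);
* @endcode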
*/
void enqueue_d2h(void* device_src, std::size_t size, CUstream stream);

/**
* @brief Accumulate data into bounce buffer, auto-submit when full.
*
* Copies host data into the internal buffer. When the buffer fills, issues an async H2D copy and
* advances to the next buffer. Handles data larger than buffer_size() by splitting across
* multiple buffers.
*
* Typical usage for streaming host data to device:
* @code
* while (has_more_data()) {
* auto submitted = ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream);
* device_ptr += submitted;
* }
* auto flushed = ring.flush_h2d(device_ptr, stream);
* device_ptr += flushed;
* ring.synchronize(stream);
* @endcode
*
* @param device_dst Device memory destination; the caller must advance it across calls (see the example above).
* @param host_src Source data in host memory.
* @param size Bytes to copy.
* @param stream CUDA stream for async H2D transfers.
* @return Number of bytes submitted to the device; always a multiple of buffer_size().
*
* @note Partial buffer contents remain until flush_h2d() is called.
* @note Final data visibility requires flush_h2d() + synchronize().
*/
std::size_t accumulate_and_submit_h2d(void* device_dst,
void const* host_src,
std::size_t size,
CUstream stream);

/**
* @brief Submit current buffer contents to device and advance to next buffer.
*
* Typical usage pattern for direct-fill (e.g., pread into buffer):
* @code
* ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), offset);
* ring.submit_h2d(device_ptr, n, stream);
* device_ptr += n;
* @endcode
*
* @param device_dst Device memory destination.
* @param size Bytes actually written to cur_buffer().
* @param stream CUDA stream for async H2D transfer.
*
* @note Synchronization may occur if this causes a wrap-around.
* @note Final data visibility requires calling synchronize() after all submits.
*/
void submit_h2d(void* device_dst, std::size_t size, CUstream stream);

/**
* @brief Flush any partially accumulated data to device.
*
* Call after accumulate_and_submit_h2d() to submit remaining data that didn't fill a complete
* buffer.
*
* @param device_dst Device memory destination for the partial buffer.
* @param stream CUDA stream for async H2D transfer.
* @return Number of bytes flushed (0 if buffer was empty).
*
* @note Still requires synchronize() for data visibility guarantee.
*/
std::size_t flush_h2d(void* device_dst, CUstream stream);

/**
* @brief Ensure all queued H2D transfers are complete.
*
* In batch mode, issues any pending batch copies first. Then synchronizes the stream to guarantee
* all data is visible in device memory.
*
* @param stream CUDA stream to synchronize.
*
* @note Must be called before reading transferred data on device.
* @note After synchronize(), the ring can be reused for new transfers.
*/
void synchronize(CUstream stream);

/**
* @brief Synchronize pending transfers and reset ring state for a new transfer session.
*
* Ensures all in-flight transfers complete, then resets the ring to its initial state. After
* reset(), the ring can be safely reused for either H2D or D2H operations.
*
* @param stream CUDA stream to synchronize.
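*
* Illustrative direction change within one ring (a sketch; `device_dst`, `device_src`, `nbytes`,
* and `stream` are assumed):
* @code
* ring.submit_h2d(device_dst, ring.buffer_size(), stream);  // H2D session
* ring.reset(stream);                            // complete in-flight work, rewind state
* ring.enqueue_d2h(device_src, nbytes, stream);  // safely start a D2H session
* @endcode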
*/
void reset(CUstream stream);
};
} // namespace kvikio
cpp/include/kvikio/remote_handle.hpp (6 additions, 2 deletions)
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
@@ -11,6 +11,7 @@
#include <optional>
#include <string>

+#include <kvikio/bounce_buffer.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/error.hpp>
#include <kvikio/threadpool_wrapper.hpp>
@@ -441,7 +442,10 @@ class RemoteHandle {
* @param file_offset File offset in bytes.
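+ * @param bounce_buffer_ring Optional bounce buffer ring used to stage reads into device memory;
+ * defaults to nullptr.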
* @return Number of bytes read, which is always `size`.
*/
-std::size_t read(void* buf, std::size_t size, std::size_t file_offset = 0);
+std::size_t read(void* buf,
+                 std::size_t size,
+                 std::size_t file_offset = 0,
+                 BounceBufferRing<CudaPinnedAllocator>* bounce_buffer_ring = nullptr);

/**
* @brief Read from remote source into buffer (host or device memory) in parallel.