diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index eacb406732..a2b22031e7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -153,6 +153,7 @@ set(SOURCES "src/file_utils.cpp" "src/mmap.cpp" "src/detail/env.cpp" + "src/detail/event.cpp" "src/detail/nvtx.cpp" "src/detail/posix_io.cpp" "src/detail/stream.cpp" diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 4ebc914d10..55eec1d4c4 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -4,9 +4,14 @@ */ #pragma once +#include +#include #include +#include +#include #include +#include namespace kvikio { @@ -43,6 +48,9 @@ class PageAlignedAllocator { * transferred to/from GPU device memory. The allocation is only guaranteed to be aligned to "at * least 256 bytes". It is NOT guaranteed to be page aligned. * + * @note Allocations use CU_MEMHOSTALLOC_PORTABLE, making them accessible from all CUDA contexts, + * not just the one that performed the allocation. This allows the singleton BounceBufferPool to + * safely serve buffers across multiple contexts and devices. * @note Do NOT use with Direct I/O - lacks page alignment guarantee */ class CudaPinnedAllocator { @@ -71,6 +79,9 @@ class CudaPinnedAllocator { * (for efficient host-device transfers). Uses std::aligned_alloc followed by * cudaMemHostRegister to achieve both properties. * + * @note Registration uses CU_MEMHOSTREGISTER_PORTABLE, making buffers accessible from all CUDA + * contexts, not just the one active during allocation. This allows the singleton BounceBufferPool + * to safely serve buffers across multiple contexts and devices. * @note This is the required allocator for Direct I/O with device memory. Requires a valid CUDA * context when allocating. 
*/ @@ -111,6 +122,9 @@ class CudaPageAlignedPinnedAllocator { * - CudaPinnedAllocator: For device I/O without Direct I/O * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O * + * @note The singleton pool is safe for use across multiple CUDA contexts. CUDA-pinned allocators + * use CU_MEMHOSTALLOC_PORTABLE, ensuring buffers allocated in one context can be used from any + * other context. * @note The destructor intentionally leaks allocations to avoid CUDA cleanup issues when static * destructors run after CUDA context destruction */ @@ -134,13 +148,16 @@ class BounceBufferPool { * @note Non-copyable but movable to allow transfer of ownership while maintaining RAII */ class Buffer { + friend BounceBufferPool; + private: BounceBufferPool* _pool; void* _buffer; std::size_t _size; - public: Buffer(BounceBufferPool* pool, void* buffer, std::size_t size); + + public: Buffer(Buffer const&) = delete; Buffer& operator=(Buffer const&) = delete; Buffer(Buffer&& o) noexcept; @@ -268,4 +285,277 @@ using CudaPinnedBounceBufferPool = BounceBufferPool; * Provides both page alignment (for Direct I/O) and CUDA registration (for efficient transfers) */ using CudaPageAlignedPinnedBounceBufferPool = BounceBufferPool; + +/** + * @brief K-way bounce buffer ring for overlapping I/O with host-device transfers. + * + * Manages a ring of k bounce buffers to enable pipelining between file I/O and CUDA memory + * transfers. By rotating through multiple buffers, the ring allows async H2D copies to proceed + * while the next buffer is being filled. + * + * Synchronization occurs automatically when the ring wraps around to prevent overwriting buffers + * with in-flight transfers. + * + * @tparam Allocator The allocator policy for bounce buffers: + * - CudaPinnedAllocator: For device I/O without Direct I/O + * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O + * + * @note This class is NOT thread-safe. Use one ring per thread or per operation. 
+ * @note The internal bounce buffers use CU_MEMHOSTALLOC_PORTABLE and are accessible from any CUDA
+ * context. However, each transfer session (from first enqueue/submit through synchronize/reset)
+ * must use a stream and device pointers from a single, consistent CUDA context.
+ */
+template <typename Allocator>
+class BounceBufferRing {
+ private:
+  std::vector<typename BounceBufferPool<Allocator>::Buffer> _buffers;
+  std::size_t _cur_buf_idx{0};
+  std::size_t _cur_buffer_offset{0};
+  std::vector<detail::EventPool::Event> _events;
+
+  /**
+   * @brief Advance to next buffer in the ring.
+   *
+   * Resets current buffer offset and moves to next buffer index. Waits on the target buffer's event
+   * if a prior async transfer is still in flight, ensuring the buffer is safe for CPU writes.
+   */
+  void advance();
+
+  /**
+   * @brief Queue async copy from current bounce buffer to device memory.
+   *
+   * @param device_dst Device memory destination.
+   * @param size Bytes to copy from cur_buffer().
+   * @param stream CUDA stream for the async transfer.
+   *
+   * @note Does NOT advance to next buffer. Call submit_h2d() for copy + advance.
+   */
+  void enqueue_h2d(void* device_dst, std::size_t size, CUstream stream);
+
+ public:
+  /**
+   * @brief Construct a bounce buffer ring.
+   *
+   * @param num_buffers Number of bounce buffers (k) for k-way overlap. Must be >= 1. Higher values
+   * allow more overlap but consume more memory.
+   */
+  explicit BounceBufferRing(std::size_t num_buffers = 1);
+
+  ~BounceBufferRing() noexcept = default;
+
+  // Non-copyable, non-movable
+  BounceBufferRing(BounceBufferRing const&) = delete;
+  BounceBufferRing& operator=(BounceBufferRing const&) = delete;
+  BounceBufferRing(BounceBufferRing&&) = delete;
+  BounceBufferRing& operator=(BounceBufferRing&&) = delete;
+
+  /**
+   * @brief Get pointer to the current bounce buffer.
+   *
+   * Use this to fill the buffer directly (e.g., via pread), then call submit_h2d() to transfer the
+   * data to device.
+   *
+   * @return Pointer to the start of the current buffer.
+ */ + [[nodiscard]] void* cur_buffer() const noexcept; + + /** + * @brief Get pointer to the current bounce buffer at a specific offset. + * + * Useful for partial buffer fills or when accumulating data incrementally. + * + * @param offset Byte offset from the start of the current buffer. + * @return Pointer to cur_buffer() + offset. + */ + [[nodiscard]] void* cur_buffer(std::ptrdiff_t offset) const noexcept; + + /** + * @brief Get the size of each bounce buffer in the ring. + * + * All buffers in the ring have the same size, determined by defaults::bounce_buffer_size() at + * ring construction time. + * + * @return Size in bytes of each buffer. + */ + [[nodiscard]] std::size_t buffer_size() const noexcept; + + /** + * @brief Get the number of buffers in the ring (k for k-way overlap). + * + * @return Number of bounce buffers. + */ + [[nodiscard]] std::size_t num_buffers() const noexcept; + + /** + * @brief Get the current fill level of the active buffer. + * + * Indicates how many bytes have been accumulated in the current buffer via + * accumulate_and_submit_h2d(). Reset to 0 after each advance(). + * + * @return Number of bytes currently in the buffer. + */ + [[nodiscard]] std::size_t cur_buffer_offset() const noexcept; + + /** + * @brief Get remaining number of bytes in current buffer for accumulation. + */ + [[nodiscard]] std::size_t cur_buffer_remaining_capacity() const noexcept; + + /** + * @brief Accumulate data into bounce buffer, auto-submit when full. + * + * Copies host data into the internal buffer. When the buffer fills, issues an async H2D copy and + * advances to the next buffer. Handles data larger than buffer_size() by splitting across + * multiple buffers. 
+ * + * Typical usage for streaming host data to device: + * @code + * while (has_more_data()) { + * auto submitted = ring.accumulate_and_submit_h2d(device_ptr, host_data, chunk_size, stream); + * device_ptr += submitted; + * } + * auto flushed = ring.flush_h2d(device_ptr, stream); + * device_ptr += flushed; + * ring.synchronize(stream); + * @endcode + * + * @param device_dst Device memory destination (should track cumulative offset externally). + * @param host_src Source data in host memory. + * @param size Bytes to copy. + * @param stream CUDA stream for async H2D transfers. + * @return Number of bytes submitted to device (always a multiple of buffer_size(); partial buffer + * contents remain until flush_h2d()). + * + * @note Partial buffer contents remain until flush_h2d() is called. + * @note Final data visibility requires flush_h2d() + synchronize(). + */ + std::size_t accumulate_and_submit_h2d(void* device_dst, + void const* host_src, + std::size_t size, + CUstream stream); + + /** + * @brief Submit current buffer contents to device and advance to next buffer. + * + * Typical usage pattern for direct-fill (e.g., pread into buffer): + * @code + * ssize_t n = pread(fd, ring.cur_buffer(), ring.buffer_size(), offset); + * ring.submit_h2d(device_ptr, n, stream); + * device_ptr += n; + * @endcode + * + * @param device_dst Device memory destination. + * @param size Bytes actually written to cur_buffer(). + * @param stream CUDA stream for async H2D transfer. + * + * @note Synchronization may occur if this causes a wrap-around. + * @note Final data visibility requires calling synchronize() after all submits. + */ + void submit_h2d(void* device_dst, std::size_t size, CUstream stream); + + /** + * @brief Flush any partially accumulated data to device. + * + * Call after accumulate_and_submit_h2d() to submit remaining data that didn't fill a complete + * buffer. + * + * @param device_dst Device memory destination for the partial buffer. 
+ * @param stream CUDA stream for async H2D transfer. + * @return Number of bytes flushed (0 if buffer was empty). + * + * @note Still requires synchronize() for data visibility guarantee. + */ + std::size_t flush_h2d(void* device_dst, CUstream stream); + + /** + * @brief Ensure all queued H2D transfers are complete. + * + * @param stream CUDA stream to synchronize. + * + * @note Must be called before reading transferred data on device. + * @note After synchronize(), the ring can be reused for new transfers. + */ + void synchronize(CUstream stream); + + /** + * @brief Synchronize pending transfers and reset ring state for a new transfer session. + * + * Ensures all in-flight transfers complete, then resets the ring to its initial state. + * + * @param stream CUDA stream to synchronize. + */ + void reset(CUstream stream); +}; + +/** + * @brief Thread-safe singleton cache for per-thread, per-context bounce buffer rings. + * + * Manages a collection of BounceBufferRing instances, each uniquely associated with a (CUDA + * context, thread ID) pair. This ensures that each thread operating within a specific CUDA context + * gets its own dedicated ring, avoiding synchronization overhead during I/O operations while + * maintaining correct behavior across multi-context applications. + * + * @tparam Allocator The allocator policy for the underlying bounce buffer rings: + * - CudaPinnedAllocator: For device I/O without Direct I/O + * - CudaPageAlignedPinnedAllocator: For device I/O with Direct I/O + * + * @note Rings are created lazily on first access and persist for the lifetime of the program. + * @note The cache itself is thread-safe; individual rings are not (by design, since each ring is + * accessed by only one thread within one context). + * @note This class intentionally leaks its Ring allocations to avoid undefined behavior during + * process exit. Without this, the following destruction sequence causes crashes. 
By using raw
+ * pointers and never deleting them, we avoid calling into CUDA during static destruction. The OS
+ * reclaims all process memory at exit regardless.
+ * - `BounceBufferRingCachePerThreadAndContext` singleton is destroyed
+ * - Its map of `Ring*` entries would be deleted
+ * - Each `Ring` destructor destroys its `std::vector` of buffers
+ * - Each `Buffer` destructor calls `BounceBufferPool::put()` to return memory
+ * - `put()` may call `CudaPinnedAllocator::deallocate()` which includes CUDA API call
+ * - CUDA driver is already shut down, which leads to heap corruption / crash (sample error message:
+ * "free(): corrupted unsorted chunks")
+ *
+ * @sa BounceBufferPool, EventPool (same intentional leak pattern)
+ * @sa https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization
+ */
+template <typename Allocator>
+class BounceBufferRingCachePerThreadAndContext {
+ public:
+  using Ring = BounceBufferRing<Allocator>;
+
+ private:
+  std::map<std::pair<CUcontext, std::thread::id>, Ring*> _rings;
+  std::mutex mutable _mutex;
+
+  BounceBufferRingCachePerThreadAndContext() = default;
+  ~BounceBufferRingCachePerThreadAndContext() = default;
+
+ public:
+  // Non-copyable, non-movable singleton
+  BounceBufferRingCachePerThreadAndContext(BounceBufferRingCachePerThreadAndContext const&) =
+    delete;
+  BounceBufferRingCachePerThreadAndContext& operator=(
+    BounceBufferRingCachePerThreadAndContext const&) = delete;
+  BounceBufferRingCachePerThreadAndContext(BounceBufferRingCachePerThreadAndContext&&) = delete;
+  BounceBufferRingCachePerThreadAndContext& operator=(BounceBufferRingCachePerThreadAndContext&&) =
+    delete;
+
+  /**
+   * @brief Get the bounce buffer ring for the current thread and CUDA context.
+   *
+   * Returns the cached ring for the calling thread's current CUDA context, creating one if it
+   * doesn't exist. The ring is configured with `defaults::bounce_buffer_count()` buffers.
+   *
+   * @return Reference to the ring associated with (current context, current thread).
+ * @exception kvikio::CUfileException if no CUDA context is current. + */ + Ring& ring(); + + /** + * @brief Get the singleton instance of the cache. + * + * @return Reference to the singleton cache instance. + */ + static BounceBufferRingCachePerThreadAndContext& instance(); +}; + } // namespace kvikio diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 190909c2cc..06e13b0997 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -116,6 +116,7 @@ class defaults { std::size_t _task_size; std::size_t _gds_threshold; std::size_t _bounce_buffer_size; + std::size_t _bounce_buffer_count; std::size_t _http_max_attempts; long _http_timeout; std::vector _http_status_codes; @@ -302,6 +303,10 @@ class defaults { */ static void set_bounce_buffer_size(std::size_t nbytes); + [[nodiscard]] static std::size_t bounce_buffer_count(); + + static void set_bounce_buffer_count(std::size_t count); + /** * @brief Get the maximum number of attempts per remote IO read. * diff --git a/cpp/include/kvikio/detail/event.hpp b/cpp/include/kvikio/detail/event.hpp new file mode 100644 index 0000000000..34831ce0f8 --- /dev/null +++ b/cpp/include/kvikio/detail/event.hpp @@ -0,0 +1,164 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include +#include + +#include + +namespace kvikio::detail { +/** + * @brief Thread-safe singleton pool for reusable CUDA events + * + * Manages a pool of CUDA events organized by CUDA context. Events are retained and reused across + * calls to minimize allocation overhead. Each context maintains its own separate pool of events + * since CUDA events are context-specific resources. 
+ * + * All events are created with `CU_EVENT_DISABLE_TIMING` for minimal overhead. + * + * Call `EventPool::instance().get()` to acquire an event that will be automatically returned to the + * pool when it goes out of scope (RAII). + * + * @note The destructor intentionally leaks events to avoid CUDA cleanup issues when static + * destructors run after CUDA context destruction. @sa BounceBufferPool + */ +class EventPool { + public: + /** + * @brief RAII wrapper for a pooled CUDA event + * + * Automatically returns the event to the pool when destroyed. Provides access to the underlying + * CUevent handle and common event operations (record, synchronize). + * + * @note Non-copyable but movable to allow transfer of ownership while maintaining RAII + */ + class Event { + friend class EventPool; + + private: + EventPool* _pool{}; + CUevent _event{}; + CUcontext _cuda_context{}; + + /** + * @brief Construct an Event wrapping a CUDA event handle + * + * @param pool The owning EventPool to return this event to on destruction + * @param event The CUDA event handle to wrap + * @param context The CUDA context associated with this event + */ + explicit Event(EventPool* pool, CUevent event, CUcontext context) noexcept; + + public: + ~Event() noexcept; + + // Move-only + Event(Event const&) = delete; + Event& operator=(Event const&) = delete; + Event(Event&& o) noexcept; + Event& operator=(Event&& o) noexcept; + + /** + * @brief Get the underlying CUDA event handle + * + * @return The CUevent handle wrapped by this object + */ + [[nodiscard]] CUevent get() const noexcept; + + /** + * @brief Get the CUDA context associated with this event + * + * @return The CUcontext this event belongs to + */ + [[nodiscard]] CUcontext cuda_context() const noexcept; + + /** + * @brief Record the event on a CUDA stream + * + * Records the event to capture the current state of the stream. The event will be signaled when + * all preceding operations on the stream have completed. 
+   * @param stream The CUDA stream to record the event on (must belong to the same context as this
+   * event)
+   *
+   * @exception kvikio::CUfileException if the record operation fails
+   */
+  void record(CUstream stream);
+
+  /**
+   * @brief Block the calling thread until the event has been signaled
+   *
+   * Waits for all work captured by a preceding record() call to complete.
+   *
+   * @exception kvikio::CUfileException if the synchronize operation fails
+   */
+  void synchronize();
+ };
+
+ private:
+  std::mutex mutable _mutex;
+  // Per-context pools of free events
+  std::unordered_map<CUcontext, std::vector<CUevent>> _pools;
+
+  EventPool() = default;
+
+  // Intentionally leak events during static destruction. @sa BounceBufferPool
+  ~EventPool() noexcept = default;
+
+ public:
+  // Non-copyable, non-movable singleton
+  EventPool(EventPool const&) = delete;
+  EventPool& operator=(EventPool const&) = delete;
+  EventPool(EventPool&&) = delete;
+  EventPool& operator=(EventPool&&) = delete;
+
+  /**
+   * @brief Acquire a CUDA event from the pool for the current CUDA context.
+   *
+   * Returns a cached event for the current CUDA context if available, otherwise creates a new one.
+   * The returned Event object will automatically return the event to the pool when it goes out of
+   * scope.
+   *
+   * @return RAII Event object wrapping the acquired CUDA event
+   * @exception kvikio::CUfileException if no CUDA context is current or event creation fails
+   */
+  [[nodiscard]] Event get();
+
+  /**
+   * @brief Return an event to the pool for reuse
+   *
+   * Typically called automatically by Event's destructor. Adds the event to the pool associated
+   * with its context for future reuse.
+ * + * @param event The CUDA event handle to return + * @param context The CUDA context associated with the event + */ + void put(CUevent event, CUcontext context) noexcept; + + /** + * @brief Get the number of free events for a specific context + * + * @param context The CUDA context to query + * @return The number of events available for reuse in that context's pool + */ + [[nodiscard]] std::size_t num_free_events(CUcontext context) const; + + /** + * @brief Get the total number of free events across all contexts + * + * @return The total count of events available for reuse + */ + [[nodiscard]] std::size_t total_free_events() const; + + /** + * @brief Get the singleton instance of the event pool + * + * @return Reference to the singleton EventPool instance + */ + static EventPool& instance(); +}; +} // namespace kvikio::detail diff --git a/cpp/include/kvikio/shim/cuda.hpp b/cpp/include/kvikio/shim/cuda.hpp index 22c8276817..1773842417 100644 --- a/cpp/include/kvikio/shim/cuda.hpp +++ b/cpp/include/kvikio/shim/cuda.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. 
* SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -107,6 +107,11 @@ class cudaAPI { decltype(cuStreamCreate)* StreamCreate{nullptr}; decltype(cuStreamDestroy)* StreamDestroy{nullptr}; decltype(cuDriverGetVersion)* DriverGetVersion{nullptr}; + decltype(cuEventSynchronize)* EventSynchronize{nullptr}; + decltype(cuEventCreate)* EventCreate{nullptr}; + decltype(cuEventDestroy)* EventDestroy{nullptr}; + decltype(cuEventRecord)* EventRecord{nullptr}; + decltype(cuEventQuery)* EventQuery{nullptr}; private: cudaAPI(); diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 629c26550e..1ae0ed0730 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -32,14 +33,14 @@ void* CudaPinnedAllocator::allocate(std::size_t size) { void* buffer{}; - // If no available allocation, allocate and register a new one - // Allocate page-locked host memory - // Under unified addressing, host memory allocated this way is automatically portable and - // mapped. + // Allocate page-locked (pinned) host memory with CU_MEMHOSTALLOC_PORTABLE. The PORTABLE flag + // ensures this memory is accessible from all CUDA contexts, which is essential for the singleton + // BounceBufferPool that may serve multiple contexts and devices. CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&buffer, size, CU_MEMHOSTALLOC_PORTABLE)); return buffer; } + void CudaPinnedAllocator::deallocate(void* buffer, std::size_t /*size*/) { CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(buffer)); @@ -52,8 +53,11 @@ void* CudaPageAlignedPinnedAllocator::allocate(std::size_t size) auto const aligned_size = detail::align_up(size, page_size); buffer = std::aligned_alloc(page_size, aligned_size); KVIKIO_EXPECT(buffer != nullptr, "Aligned allocation failed"); + // Register the page-aligned allocation as pinned memory with CU_MEMHOSTREGISTER_PORTABLE. 
The
+  // PORTABLE flag ensures this memory is accessible from all CUDA contexts, which is essential for
+  // the singleton BounceBufferPool that may serve multiple contexts and devices.
   CUDA_DRIVER_TRY(
-    cudaAPI::instance().MemHostRegister(buffer, aligned_size, CU_MEMHOSTALLOC_PORTABLE));
+    cudaAPI::instance().MemHostRegister(buffer, aligned_size, CU_MEMHOSTREGISTER_PORTABLE));
   return buffer;
 }
 
@@ -109,15 +113,13 @@ BounceBufferPool<Allocator>::Buffer& BounceBufferPool<Allocator>::Buffer::operat
 template <typename Allocator>
 void* BounceBufferPool<Allocator>::Buffer::get() const noexcept
 {
-  KVIKIO_NVTX_FUNC_RANGE();
   return _buffer;
 }
 
 template <typename Allocator>
 void* BounceBufferPool<Allocator>::Buffer::get(std::ptrdiff_t offset) const noexcept
 {
-  KVIKIO_NVTX_FUNC_RANGE();
-  return static_cast<char*>(_buffer) + offset;
+  return static_cast<std::byte*>(_buffer) + offset;
 }
 
 template <typename Allocator>
@@ -217,4 +219,181 @@ BounceBufferPool<Allocator>& BounceBufferPool<Allocator>::instance()
 template class BounceBufferPool<PageAlignedAllocator>;
 template class BounceBufferPool<CudaPinnedAllocator>;
 template class BounceBufferPool<CudaPageAlignedPinnedAllocator>;
+
+template <typename Allocator>
+BounceBufferRing<Allocator>::BounceBufferRing(std::size_t num_buffers)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  KVIKIO_EXPECT(num_buffers >= 1, "BounceBufferRing requires at least 1 buffer");
+
+  _buffers.reserve(num_buffers);
+  _events.reserve(num_buffers);
+  for (std::size_t i = 0; i < num_buffers; ++i) {
+    _buffers.emplace_back(BounceBufferPool<Allocator>::instance().get());
+    _events.emplace_back(detail::EventPool::instance().get());
+  }
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::enqueue_h2d(void* device_dst, std::size_t size, CUstream stream)
+{
+  CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
+    convert_void2deviceptr(device_dst), cur_buffer(), size, stream));
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::advance()
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  _cur_buffer_offset = 0;
+  ++_cur_buf_idx;
+  if (_cur_buf_idx >= _buffers.size()) { _cur_buf_idx = 0; }
+
+  // Ensure the buffer we are advancing into is not still being read by a prior H2D
+  auto event        = _events[_cur_buf_idx].get();
+  auto event_status = cudaAPI::instance().EventQuery(event);
+
if (event_status == CUDA_ERROR_NOT_READY) {
+    CUDA_DRIVER_TRY(cudaAPI::instance().EventSynchronize(event));
+  } else if (event_status != CUDA_SUCCESS) {
+    CUDA_DRIVER_TRY(event_status);
+  }
+}
+
+template <typename Allocator>
+void* BounceBufferRing<Allocator>::cur_buffer() const noexcept
+{
+  return _buffers[_cur_buf_idx].get();
+}
+
+template <typename Allocator>
+void* BounceBufferRing<Allocator>::cur_buffer(std::ptrdiff_t off) const noexcept
+{
+  return _buffers[_cur_buf_idx].get(off);
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::buffer_size() const noexcept
+{
+  return _buffers[_cur_buf_idx].size();
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::num_buffers() const noexcept
+{
+  return _buffers.size();
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::cur_buffer_offset() const noexcept
+{
+  return _cur_buffer_offset;
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::cur_buffer_remaining_capacity() const noexcept
+{
+  return buffer_size() - _cur_buffer_offset;
+}
+
+template <typename Allocator>
+std::size_t BounceBufferRing<Allocator>::accumulate_and_submit_h2d(void* device_dst,
+                                                                   void const* host_src,
+                                                                   std::size_t size,
+                                                                   CUstream stream)
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  auto const* host_src_ptr = static_cast<std::byte const*>(host_src);
+  auto* device_dst_ptr     = static_cast<std::byte*>(device_dst);
+
+  // The data is split across multiple buffers if its size is greater than buffer_size()
+  while (size > 0) {
+    auto const remaining_bytes   = cur_buffer_remaining_capacity();
+    auto const num_bytes_to_copy = std::min(size, remaining_bytes);
+
+    // Copy from host to bounce buffer
+    std::memcpy(cur_buffer(_cur_buffer_offset), host_src_ptr, num_bytes_to_copy);
+    _cur_buffer_offset += num_bytes_to_copy;
+    host_src_ptr += num_bytes_to_copy;
+    size -= num_bytes_to_copy;
+
+    if (_cur_buffer_offset >= buffer_size()) {
+      submit_h2d(device_dst_ptr, buffer_size(), stream);
+      device_dst_ptr += buffer_size();
+    }
+  }
+
+  return static_cast<std::size_t>(device_dst_ptr - static_cast<std::byte*>(device_dst));
+}
+
+template <typename Allocator>
+void BounceBufferRing<Allocator>::submit_h2d(void* device_dst, std::size_t size, CUstream stream)
+{
+
KVIKIO_NVTX_FUNC_RANGE(); + enqueue_h2d(device_dst, size, stream); + CUDA_DRIVER_TRY(cudaAPI::instance().EventRecord(_events[_cur_buf_idx].get(), stream)); + advance(); +} + +template +std::size_t BounceBufferRing::flush_h2d(void* device_dst, CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (_cur_buffer_offset == 0) { return 0; } + auto const flushed = _cur_buffer_offset; + submit_h2d(device_dst, _cur_buffer_offset, stream); + return flushed; +} + +template +void BounceBufferRing::synchronize(CUstream stream) +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); +} + +template +void BounceBufferRing::reset(CUstream stream) +{ + synchronize(stream); + _cur_buf_idx = 0; + _cur_buffer_offset = 0; +} + +// Explicit instantiations +template class BounceBufferRing; +template class BounceBufferRing; + +template +BounceBufferRingCachePerThreadAndContext::Ring& +BounceBufferRingCachePerThreadAndContext::ring() +{ + KVIKIO_NVTX_FUNC_RANGE(); + + CUcontext ctx{nullptr}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); + KVIKIO_EXPECT(ctx != nullptr, "No CUDA context is current"); + auto key = std::make_pair(ctx, std::this_thread::get_id()); + + std::lock_guard const lock(_mutex); + + auto it = _rings.find(key); + if (it == _rings.end()) { + auto ring = new Ring(defaults::bounce_buffer_count()); + it = _rings.emplace(key, ring).first; + } + return *it->second; +} + +template +BounceBufferRingCachePerThreadAndContext& +BounceBufferRingCachePerThreadAndContext::instance() +{ + static BounceBufferRingCachePerThreadAndContext instance; + return instance; +} + +// Explicit instantiations +template class BounceBufferRingCachePerThreadAndContext; +template class BounceBufferRingCachePerThreadAndContext; + } // namespace kvikio diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index 841e7314d3..2ec65bab8e 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright 
(c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -114,6 +114,13 @@ defaults::defaults() env > 0, "KVIKIO_BOUNCE_BUFFER_SIZE has to be a positive integer", std::invalid_argument); _bounce_buffer_size = env; } + // Determine the default value of `bounce_buffer_count` + { + ssize_t const env = getenv_or("KVIKIO_BOUNCE_BUFFER_COUNT", 4); + KVIKIO_EXPECT( + env > 0, "KVIKIO_BOUNCE_BUFFER_COUNT has to be a positive integer", std::invalid_argument); + _bounce_buffer_count = env; + } // Determine the default value of `http_max_attempts` { ssize_t const env = getenv_or("KVIKIO_HTTP_MAX_ATTEMPTS", 3); @@ -213,6 +220,15 @@ void defaults::set_bounce_buffer_size(std::size_t nbytes) instance()->_bounce_buffer_size = nbytes; } +std::size_t defaults::bounce_buffer_count() { return instance()->_bounce_buffer_count; } + +void defaults::set_bounce_buffer_count(std::size_t count) +{ + KVIKIO_EXPECT( + count > 0, "Number of the bounce buffers must be a positive integer", std::invalid_argument); + instance()->_bounce_buffer_count = count; +} + std::size_t defaults::http_max_attempts() { return instance()->_http_max_attempts; } void defaults::set_http_max_attempts(std::size_t attempts) diff --git a/cpp/src/detail/event.cpp b/cpp/src/detail/event.cpp new file mode 100644 index 0000000000..74b88f09b5 --- /dev/null +++ b/cpp/src/detail/event.cpp @@ -0,0 +1,125 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include + +namespace kvikio::detail { + +EventPool::Event::Event(EventPool* pool, CUevent event, CUcontext cuda_context) noexcept + : _pool(pool), _event(event), _cuda_context(cuda_context) +{ +} + +EventPool::Event::~Event() noexcept +{ + if (_event != nullptr) { _pool->put(_event, _cuda_context); } +} + +EventPool::Event::Event(Event&& o) noexcept + : _pool(std::exchange(o._pool, nullptr)), + _event(std::exchange(o._event, nullptr)), + _cuda_context(std::exchange(o._cuda_context, nullptr)) +{ +} + +EventPool::Event& EventPool::Event::operator=(Event&& o) noexcept +{ + if (this != &o) { + if (_event != nullptr) { _pool->put(_event, _cuda_context); } + _pool = std::exchange(o._pool, nullptr); + _event = std::exchange(o._event, nullptr); + _cuda_context = std::exchange(o._cuda_context, nullptr); + } + return *this; +} + +CUevent EventPool::Event::get() const noexcept { return _event; } + +CUcontext EventPool::Event::cuda_context() const noexcept { return _cuda_context; } + +void EventPool::Event::record(CUstream stream) +{ + CUDA_DRIVER_TRY(cudaAPI::instance().EventRecord(_event, stream)); +} + +void EventPool::Event::synchronize() +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUDA_DRIVER_TRY(cudaAPI::instance().EventSynchronize(_event)); +} + +EventPool::Event EventPool::get() +{ + KVIKIO_NVTX_FUNC_RANGE(); + CUcontext ctx{}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); + KVIKIO_EXPECT(ctx != nullptr, "No CUDA context is current"); + + CUevent event{}; + { + std::lock_guard const lock(_mutex); + // If the key (`ctx`) is found from the pool, assign the search result to `event` + if (auto it = _pools.find(ctx); it != _pools.end() && !it->second.empty()) { + event = it->second.back(); + it->second.pop_back(); + } + } + + if (event == nullptr) { + // Create an event outside the lock to improve performance. 
+ // The pool is not updated here; the returned Event object will automatically return the event + // to the pool when it goes out of scope + CUDA_DRIVER_TRY(cudaAPI::instance().EventCreate(&event, CU_EVENT_DISABLE_TIMING)); + } + + return Event(this, event, ctx); +} + +void EventPool::put(CUevent event, CUcontext cuda_context) noexcept +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (event == nullptr) { return; } + + try { + std::lock_guard const lock(_mutex); + _pools[cuda_context].push_back(event); + } catch (std::exception const& e) { + KVIKIO_LOG_ERROR(e.what()); + try { + // If returning to pool fails, destroy the event + CUDA_DRIVER_TRY(cudaAPI::instance().EventDestroy(event)); + } catch (std::exception const& e) { + KVIKIO_LOG_ERROR(e.what()); + } + } +} + +std::size_t EventPool::num_free_events(CUcontext cuda_context) const +{ + std::lock_guard const lock(_mutex); + auto it = _pools.find(cuda_context); + return (it != _pools.end()) ? it->second.size() : 0; +} + +std::size_t EventPool::total_free_events() const +{ + std::lock_guard const lock(_mutex); + std::size_t total{0}; + for (auto const& [_, events] : _pools) { + total += events.size(); + } + return total; +} + +EventPool& EventPool::instance() +{ + static EventPool pool; + return pool; +} + +} // namespace kvikio::detail diff --git a/cpp/src/shim/cuda.cpp b/cpp/src/shim/cuda.cpp index 693dd1bd2a..00538d6d19 100644 --- a/cpp/src/shim/cuda.cpp +++ b/cpp/src/shim/cuda.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. 
* SPDX-License-Identifier: Apache-2.0 */ @@ -41,6 +41,11 @@ cudaAPI::cudaAPI() get_symbol(StreamCreate, lib, KVIKIO_STRINGIFY(cuStreamCreate)); get_symbol(StreamDestroy, lib, KVIKIO_STRINGIFY(cuStreamDestroy)); get_symbol(DriverGetVersion, lib, KVIKIO_STRINGIFY(cuDriverGetVersion)); + get_symbol(EventSynchronize, lib, KVIKIO_STRINGIFY(cuEventSynchronize)); + get_symbol(EventCreate, lib, KVIKIO_STRINGIFY(cuEventCreate)); + get_symbol(EventDestroy, lib, KVIKIO_STRINGIFY(cuEventDestroy)); + get_symbol(EventRecord, lib, KVIKIO_STRINGIFY(cuEventRecord)); + get_symbol(EventQuery, lib, KVIKIO_STRINGIFY(cuEventQuery)); CUDA_DRIVER_TRY(DriverGetVersion(&driver_version)); diff --git a/cpp/tests/test_bounce_buffer.cpp b/cpp/tests/test_bounce_buffer.cpp index c1959a2e0c..50e8b4fefc 100644 --- a/cpp/tests/test_bounce_buffer.cpp +++ b/cpp/tests/test_bounce_buffer.cpp @@ -59,12 +59,10 @@ TEST_F(BounceBufferTest, move_construction_and_move_assignment) { // Buffer reused auto buf1 = pool.get(); + auto buf2 = pool.get(); - // Manually create a new buffer outside the pool - kvikio::CudaPinnedAllocator allocator; - auto* buffer = allocator.allocate(pool.buffer_size()); - kvikio::CudaPinnedBounceBufferPool::Buffer buf2(&pool, buffer, pool.buffer_size()); - // Move assignment that adds the previous buffer to the pool and then transfers the ownership + // Move assignment that returns the previous buffer in buf2 to the pool and then transfers the + // ownership from buf1 to buf2 buf2 = std::move(buf1); }