From 04aff83d403da20be9751f89ef839b20375dbf0c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 12 Jan 2026 11:35:45 -0500 Subject: [PATCH 01/34] Add a new remote I/O client based on libcurl poll-based multi API --- cpp/CMakeLists.txt | 14 ++++++++--- .../detail/remote_handle_poll_based.hpp | 23 +++++++++++++++++++ cpp/src/detail/remote_handle_poll_based.cpp | 15 ++++++++++++ cpp/src/remote_handle.cpp | 13 ++++++++++- cpp/tests/test_remote_handle.cpp | 23 ++++++++++++++++++- 5 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 cpp/include/kvikio/detail/remote_handle_poll_based.hpp create mode 100644 cpp/src/detail/remote_handle_poll_based.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a8897a27e2..a171cb6518 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -1,6 +1,6 @@ # ============================================================================= # cmake-format: off -# SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 # cmake-format: on # ============================================================================= @@ -163,8 +163,16 @@ set(SOURCES ) if(KvikIO_REMOTE_SUPPORT) - list(APPEND SOURCES "src/hdfs.cpp" "src/remote_handle.cpp" "src/detail/remote_handle.cpp" - "src/detail/tls.cpp" "src/detail/url.cpp" "src/shim/libcurl.cpp" + list( + APPEND + SOURCES + "src/hdfs.cpp" + "src/remote_handle.cpp" + "src/detail/remote_handle.cpp" + "src/detail/remote_handle_poll_based.cpp" + "src/detail/tls.cpp" + "src/detail/url.cpp" + "src/shim/libcurl.cpp" ) endif() diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp new file mode 100644 index 0000000000..8802e15589 --- /dev/null +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -0,0 +1,23 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include + +#include + +namespace kvikio::detail { + +class RemoteHandlePollBased { + public: + RemoteHandlePollBased(std::string const& url); + + std::size_t pread(void* buf, std::size_t size, std::size_t file_offset = 0); + + private: + CURLM* _multi; + std::string _url; +}; +} // namespace kvikio::detail diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp new file mode 100644 index 0000000000..d5865c6377 --- /dev/null +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -0,0 +1,15 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +namespace kvikio::detail { +RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url) {} + +std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_t file_offset) +{ + return 123; +} +} // namespace kvikio::detail diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 7c917c9a0b..ae8251f67f 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -815,6 +816,16 @@ std::future RemoteHandle::pread(void* buf, KVIKIO_EXPECT(thread_pool != nullptr, "The thread pool must not be nullptr"); auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx(); KVIKIO_NVTX_FUNC_RANGE(size); + + auto const remote_backend = + kvikio::getenv_or("KVIKIO_REMOTE_BACKEND", "LIBCURL_EASY"); + if (remote_backend == "LIBCURL_POLL_BASED") { + return std::async(std::launch::async, [&, this]() -> std::size_t { + detail::RemoteHandlePollBased poll_handle(_endpoint->str()); + return poll_handle.pread(buf, size, file_offset); + }); + } + auto task = [this](void* devPtr_base, std::size_t size, std::size_t file_offset, diff --git a/cpp/tests/test_remote_handle.cpp b/cpp/tests/test_remote_handle.cpp index 41d975fd00..908854b6b4 100644 --- a/cpp/tests/test_remote_handle.cpp +++ b/cpp/tests/test_remote_handle.cpp @@ -1,9 +1,10 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #include +#include #include #include #include @@ -15,6 +16,7 @@ #include #include "utils/env.hpp" +#include "utils/utils.hpp" using ::testing::HasSubstr; using ::testing::ThrowsMessage; @@ -278,3 +280,22 @@ TEST_F(RemoteHandleTest, test_open) } } } + +TEST_F(RemoteHandleTest, poll_based) +{ + auto const url = kvikio::getenv_or("KVIKIO_REMOTE_URL", ""); + + auto endpoint = std::make_unique(url); + // auto endpoint = std::make_unique(url); + + kvikio::RemoteHandle remote_handle(std::move(endpoint)); + + kvikio::test::DevBuffer dev_buf(remote_handle.nbytes() / sizeof(double)); + auto fut = remote_handle.pread(dev_buf.ptr, remote_handle.nbytes()); + auto num_bytes_read = fut.get(); + + EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); + auto host_buf = dev_buf.to_vector(); + std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() + << "\n"; +} From 8d976b235c3fd07c0f041321772bc32c780db34a Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 12 Jan 2026 16:00:44 -0500 Subject: [PATCH 02/34] Update --- .../kvikio/detail/remote_handle_poll_based.hpp | 15 +++++++++++++++ cpp/src/detail/remote_handle_poll_based.cpp | 17 ++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 8802e15589..0a45cb5c59 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -8,16 +8,31 @@ #include +#define KVIKIO_CHECK_CURL_MULTI(err_code) \ + kvikio::detail::check_curl_multi(err_code, __FILE__, __LINE__) + namespace kvikio::detail { +inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_number) +{ + if (err_code == CURLMcode::CURLM_OK) { return; } + std::stringstream ss; + ss << "libcurl error: " << curl_multi_strerror(err_code) << " at: " << filename << ":" + << line_number << "\n"; + throw std::runtime_error(ss.str()); +} + class RemoteHandlePollBased { public: RemoteHandlePollBased(std::string const& url); + ~RemoteHandlePollBased(); + std::size_t pread(void* buf, std::size_t size, std::size_t file_offset = 0); private: CURLM* _multi; std::string _url; + std::size_t _num_conns{8}; }; } // namespace kvikio::detail diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index d5865c6377..d29f666848 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -4,9 +4,24 @@ */ #include +#include namespace kvikio::detail { -RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url) {} + +RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url) : _url{url} +{ + _multi = curl_multi_init(); + KVIKIO_EXPECT(_multi != nullptr, "Failed to initialize libcurl multi API"); +} + +RemoteHandlePollBased::~RemoteHandlePollBased() +{ + try { + KVIKIO_CHECK_CURL_MULTI(curl_multi_cleanup(_multi)); + } catch (std::exception const& e) { + KVIKIO_LOG_ERROR(e.what()); + } +} std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_t file_offset) { From 1715f038d00b71a839010466efe3ae6c6720b4aa Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 13 Jan 2026 10:52:37 -0500 Subject: [PATCH 03/34] Update --- cpp/include/kvikio/detail/remote_handle_poll_based.hpp | 4 ++-- cpp/src/detail/remote_handle_poll_based.cpp | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 0a45cb5c59..0233c968b8 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -24,7 +24,7 @@ inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_ class RemoteHandlePollBased { public: - RemoteHandlePollBased(std::string const& url); + RemoteHandlePollBased(std::string const& url, std::size_t num_conns = 8); ~RemoteHandlePollBased(); @@ -33,6 +33,6 @@ class RemoteHandlePollBased { private: CURLM* _multi; std::string _url; - std::size_t _num_conns{8}; + std::size_t _num_conns; }; } // namespace kvikio::detail diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index d29f666848..0fec001090 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -8,7 +8,8 @@ namespace kvikio::detail { -RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url) : _url{url} +RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url, std::size_t num_conns) + : _url{url}, _num_conns{num_conns} { _multi = curl_multi_init(); KVIKIO_EXPECT(_multi != nullptr, "Failed to initialize libcurl multi API"); From 1863d63eb82bd2e1dc5a6860bb37ab08e088f8b0 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 13 Jan 2026 11:23:52 -0500 Subject: [PATCH 04/34] Make the bounce buffer move constructible and move assignable --- cpp/include/kvikio/bounce_buffer.hpp | 16 ++++++------ cpp/src/bounce_buffer.cpp | 38 ++++++++++++++++++++++++---- 2 files changed, 41 insertions(+), 13 deletions(-) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 8b7b45c2e9..f32015df00 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -117,7 +117,7 @@ class CudaPageAlignedPinnedAllocator { template class BounceBufferPool { private: - std::mutex _mutex{}; + mutable std::mutex _mutex{}; // Stack of free allocations (LIFO for cache locality) std::stack _free_buffers{}; // The size of each allocation in `_free_buffers` @@ -137,18 +137,18 @@ class BounceBufferPool { private: BounceBufferPool* _pool; void* _buffer; - std::size_t const _size; + std::size_t _size; public: Buffer(BounceBufferPool* pool, void* buffer, std::size_t size); Buffer(Buffer const&) = delete; Buffer& operator=(Buffer const&) = delete; - Buffer(Buffer&& o) = delete; - Buffer& operator=(Buffer&& o) = delete; + Buffer(Buffer&& o) noexcept; + Buffer& operator=(Buffer&& o) noexcept; ~Buffer() noexcept; - void* get() noexcept; - void* get(std::ptrdiff_t offset) noexcept; - std::size_t size() noexcept; + void* get() const noexcept; + void* get(std::ptrdiff_t offset) const noexcept; + std::size_t size() const noexcept; }; BounceBufferPool() = default; diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index d2f2c92b9c..88879ae1e2 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -73,25 +73,53 @@ template BounceBufferPool::Buffer::~Buffer() noexcept { KVIKIO_NVTX_FUNC_RANGE(); - _pool->put(_buffer, _size); + if (_buffer) { + // Only return to the pool if not moved-from + _pool->put(_buffer, _size); + } +} + +template +BounceBufferPool::Buffer::Buffer(Buffer&& o) noexcept + : _pool(std::exchange(o._pool, nullptr)), + _buffer(std::exchange(o._buffer, nullptr)), + _size(std::exchange(o._size, 0)) +{ +} + +template +BounceBufferPool::Buffer& BounceBufferPool::Buffer::operator=( + Buffer&& o) noexcept +{ + if (this != &o) { + if (_buffer) { + // Return current buffer to the pool + _pool->put(_buffer, _size); + } + _pool = std::exchange(o._pool, nullptr); + _buffer = std::exchange(o._buffer, nullptr); + _size = std::exchange(o._size, 0); + } + + return *this; } template -void* BounceBufferPool::Buffer::get() noexcept +void* BounceBufferPool::Buffer::get() const noexcept { KVIKIO_NVTX_FUNC_RANGE(); return _buffer; } template -void* BounceBufferPool::Buffer::get(std::ptrdiff_t offset) noexcept +void* BounceBufferPool::Buffer::get(std::ptrdiff_t offset) const noexcept { KVIKIO_NVTX_FUNC_RANGE(); return static_cast(_buffer) + offset; } template -std::size_t BounceBufferPool::Buffer::size() noexcept +std::size_t BounceBufferPool::Buffer::size() const noexcept { return _size; } From 65299f62b647221c2ba23d48f13c883b3f016e48 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 13 Jan 2026 11:52:14 -0500 Subject: [PATCH 05/34] Update --- cpp/include/kvikio/bounce_buffer.hpp | 4 ++++ cpp/src/bounce_buffer.cpp | 14 ++++++++++++++ cpp/tests/CMakeLists.txt | 4 +++- cpp/tests/test_bounce_buffer.cpp | 4 ++++ 4 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 cpp/tests/test_bounce_buffer.cpp diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index f32015df00..2deb4fafb1 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -212,6 +212,10 @@ class BounceBufferPool { */ std::size_t clear(); + std::size_t num_free_buffers() const; + + std::size_t buffer_size() const; + /** * @brief Get the singleton instance of the pool * diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 88879ae1e2..85536bf294 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -189,6 +189,20 @@ std::size_t BounceBufferPool::clear() return _clear(); } +template +std::size_t BounceBufferPool::num_free_buffers() const +{ + std::lock_guard const lock(_mutex); + return _free_buffers.size(); +} + +template +std::size_t BounceBufferPool::buffer_size() const +{ + std::lock_guard const lock(_mutex); + return _buffer_size; +} + template BounceBufferPool& BounceBufferPool::instance() { diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index a44898f451..d267f6bc55 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -1,6 +1,6 @@ # ============================================================================= # cmake-format: off -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 # cmake-format: on # ============================================================================= @@ -64,6 +64,8 @@ endfunction() kvikio_add_test(NAME BASIC_IO_TEST SOURCES test_basic_io.cpp utils/env.cpp) +kvikio_add_test(NAME BOUNCE_BUFFER_TEST SOURCES test_bounce_buffer.cpp) + kvikio_add_test(NAME DEFAULTS_TEST SOURCES test_defaults.cpp utils/env.cpp) kvikio_add_test(NAME ERROR_TEST SOURCES test_error.cpp) diff --git a/cpp/tests/test_bounce_buffer.cpp b/cpp/tests/test_bounce_buffer.cpp new file mode 100644 index 0000000000..11798920ee --- /dev/null +++ b/cpp/tests/test_bounce_buffer.cpp @@ -0,0 +1,4 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ From 67d28a92153972c7c014a8e51d0dd097e4e61b7c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Tue, 13 Jan 2026 13:55:12 -0500 Subject: [PATCH 06/34] Add detailed unit test --- cpp/include/kvikio/bounce_buffer.hpp | 15 ++++ cpp/tests/test_bounce_buffer.cpp | 118 +++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 2deb4fafb1..a7facf812a 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -212,8 +212,23 @@ class BounceBufferPool { */ std::size_t clear(); + /** + * @brief Get the number of free buffers currently available in the pool + * + * Returns the count of buffers that have been returned to the pool and are ready for reuse. + * + * @return The number of buffers available for reuse + */ std::size_t num_free_buffers() const; + /** + * @brief Get the current buffer size used by the pool + * + * Returns the size of buffers currently managed by the pool. This reflects the value of + * `defaults::bounce_buffer_size()` as of the last pool operation. + * + * @return The size in bytes of each buffer in the pool + */ std::size_t buffer_size() const; /** diff --git a/cpp/tests/test_bounce_buffer.cpp b/cpp/tests/test_bounce_buffer.cpp index 11798920ee..c1959a2e0c 100644 --- a/cpp/tests/test_bounce_buffer.cpp +++ b/cpp/tests/test_bounce_buffer.cpp @@ -2,3 +2,121 @@ * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ + +#include + +#include +#include + +#include "utils/utils.hpp" + +class BounceBufferTest : public testing::Test { + protected: + void SetUp() override { KVIKIO_CHECK_CUDA(cudaSetDevice(0)); } + void TearDown() override {} +}; + +TEST_F(BounceBufferTest, buffers_returned_to_pool) +{ + auto& pool = kvikio::CudaPinnedBounceBufferPool::instance(); + pool.clear(); + + EXPECT_EQ(pool.num_free_buffers(), 0); + + // Buffers created + { + auto buf1 = pool.get(); + auto buf2 = pool.get(); + // All created buffers are in use + EXPECT_EQ(pool.num_free_buffers(), 0); + } // buf1 and buf2 returned to the pool + EXPECT_EQ(pool.num_free_buffers(), 2); + + // Buffers reused + { + auto buf1 = pool.get(); + auto buf2 = pool.get(); + EXPECT_EQ(pool.num_free_buffers(), 0); + } // buf1 and buf2 returned to the pool + EXPECT_EQ(pool.num_free_buffers(), 2); +} + +TEST_F(BounceBufferTest, move_construction_and_move_assignment) +{ + auto& pool = kvikio::CudaPinnedBounceBufferPool::instance(); + pool.clear(); + + { + auto buf1 = pool.get(); + + // Move constructor that transfers the ownership + auto buf2 = std::move(buf1); + } + + // Only one return, not two + EXPECT_EQ(pool.num_free_buffers(), 1); + + { + // Buffer reused + auto buf1 = pool.get(); + + // Manually create a new buffer outside the pool + kvikio::CudaPinnedAllocator allocator; + auto* buffer = allocator.allocate(pool.buffer_size()); + kvikio::CudaPinnedBounceBufferPool::Buffer buf2(&pool, buffer, pool.buffer_size()); + // Move assignment that adds the previous buffer to the pool and then transfers the ownership + buf2 = std::move(buf1); + } + + EXPECT_EQ(pool.num_free_buffers(), 2); +} + +TEST_F(BounceBufferTest, buffer_size_changes_clears_pool) +{ + auto& pool = kvikio::CudaPinnedBounceBufferPool::instance(); + pool.clear(); + + auto original_size = kvikio::defaults::bounce_buffer_size(); + + // Populate pool with buffers at current size + { + auto buf1 = pool.get(); + auto buf2 = pool.get(); + } + EXPECT_EQ(pool.num_free_buffers(), 2); + EXPECT_EQ(pool.buffer_size(), original_size); + + // Change buffer size + auto new_size{original_size * 2}; + kvikio::defaults::set_bounce_buffer_size(new_size); + + // Next get() triggers _ensure_buffer_size(), clearing old buffers + { + auto buf = pool.get(); + EXPECT_EQ(buf.size(), new_size); + } + EXPECT_EQ(pool.num_free_buffers(), 1); // Only the new buffer + EXPECT_EQ(pool.buffer_size(), new_size); + + kvikio::defaults::set_bounce_buffer_size(original_size); +} + +TEST_F(BounceBufferTest, old_size_buffer_deallocated_not_returned) +{ + auto& pool = kvikio::CudaPinnedBounceBufferPool::instance(); + pool.clear(); + + auto original_size = kvikio::defaults::bounce_buffer_size(); + + { + auto buf = pool.get(); // Buffer at original size + + // Change size while buffer is outstanding + kvikio::defaults::set_bounce_buffer_size(original_size * 2); + } // buf destructor will call put() with mismatched size + + // Old buffer should have been deallocated, not returned to pool + EXPECT_EQ(pool.num_free_buffers(), 0); + + kvikio::defaults::set_bounce_buffer_size(original_size); +} From 3d47c769bdea3a115863df56bd47a6bc8999a09a Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 00:31:36 -0500 Subject: [PATCH 07/34] Update --- .../detail/remote_handle_poll_based.hpp | 20 ++++++++- cpp/src/detail/remote_handle_poll_based.cpp | 44 ++++++++++++++++++- cpp/src/remote_handle.cpp | 4 +- 3 files changed, 63 insertions(+), 5 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 0233c968b8..85d508dd73 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -4,8 +4,11 @@ */ #pragma once +#include #include +#include +#include #include #define KVIKIO_CHECK_CURL_MULTI(err_code) \ @@ -22,9 +25,21 @@ inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_ throw std::runtime_error(ss.str()); } +struct TransferContext { + bool overflow_error; + char* buf{}; + std::size_t chunk_size{}; + std::size_t bytes_transferred{}; + CudaPinnedBounceBufferPool::Buffer bounce_buffer; + + TransferContext(); +}; + class RemoteHandlePollBased { public: - RemoteHandlePollBased(std::string const& url, std::size_t num_conns = 8); + RemoteHandlePollBased(std::string const& url, + RemoteEndpoint* endpoint, + std::size_t num_conns = 8); ~RemoteHandlePollBased(); @@ -34,5 +49,8 @@ class RemoteHandlePollBased { CURLM* _multi; std::string _url; std::size_t _num_conns; + std::vector> _curl_easy_handles; + std::vector _transfer_ctxs; + RemoteEndpoint* _endpoint; }; } // namespace kvikio::detail diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 0fec001090..5637aeee8f 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -3,16 +3,50 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include +#include #include #include +#include +#include "kvikio/bounce_buffer.hpp" namespace kvikio::detail { +namespace { +std::size_t callback_host_memory(char* buffer, std::size_t size, std::size_t nmemb, void* userdata) +{ + KVIKIO_NVTX_FUNC_RANGE(); + auto* ctx = reinterpret_cast(userdata); + std::size_t const nbytes = size * nmemb; + + if (ctx->chunk_size < ctx->bytes_transferred + nbytes) { + ctx->overflow_error = true; + return CURL_WRITEFUNC_ERROR; + } + KVIKIO_NVTX_FUNC_RANGE(nbytes); + std::memcpy(ctx->buf + ctx->bytes_transferred, buffer, nbytes); + ctx->bytes_transferred += nbytes; + return nbytes; +} +} // namespace + +TransferContext::TransferContext() : bounce_buffer{CudaPinnedBounceBufferPool::instance().get()} {} -RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url, std::size_t num_conns) - : _url{url}, _num_conns{num_conns} +RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url, + RemoteEndpoint* endpoint, + std::size_t num_conns) + : _url{url}, _endpoint{endpoint}, _num_conns{num_conns}, _transfer_ctxs(_num_conns) { _multi = curl_multi_init(); KVIKIO_EXPECT(_multi != nullptr, "Failed to initialize libcurl multi API"); + + for (std::size_t i = 0; i < _num_conns; ++i) { + _curl_easy_handles.emplace_back( + std::make_unique(kvikio::LibCurl::instance().get_handle(), + kvikio::detail::fix_conda_file_path_hack(__FILE__), + KVIKIO_STRINGIFY(__LINE__))); + _endpoint->setopt(*_curl_easy_handles.back()); + _transfer_ctxs.emplace_back(); + } } RemoteHandlePollBased::~RemoteHandlePollBased() @@ -26,6 +60,12 @@ RemoteHandlePollBased::~RemoteHandlePollBased() std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_t file_offset) { + if (size == 0) return 0; + + std::size_t const chunk_size = defaults::task_size(); + std::size_t num_chunks = (size + chunk_size - 1) / chunk_size; + std::size_t actual_num_conns = std::min(_num_conns, num_chunks); + return 123; } } // namespace kvikio::detail diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index ae8251f67f..7c67229549 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -819,9 +819,9 @@ std::future RemoteHandle::pread(void* buf, auto const remote_backend = kvikio::getenv_or("KVIKIO_REMOTE_BACKEND", "LIBCURL_EASY"); - if (remote_backend == "LIBCURL_POLL_BASED") { + if (remote_backend == "LIBCURL_POLL_BASED" && is_host_memory(buf)) { return std::async(std::launch::async, [&, this]() -> std::size_t { - detail::RemoteHandlePollBased poll_handle(_endpoint->str()); + detail::RemoteHandlePollBased poll_handle(_endpoint->str(), _endpoint.get()); return poll_handle.pread(buf, size, file_offset); }); } From 345821fc8c52c90b00dbdbb034e1ca9deb423f86 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 01:02:24 -0500 Subject: [PATCH 08/34] Update --- cpp/include/kvikio/detail/remote_handle.hpp | 13 ++++++++- .../detail/remote_handle_poll_based.hpp | 3 +- cpp/src/detail/remote_handle.cpp | 9 +++++- cpp/src/detail/remote_handle_poll_based.cpp | 29 +++++++++++++++++-- cpp/src/remote_handle.cpp | 22 +++----------- 5 files changed, 52 insertions(+), 24 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle.hpp b/cpp/include/kvikio/detail/remote_handle.hpp index 2e6613aeef..1fbd509c9d 100644 --- a/cpp/include/kvikio/detail/remote_handle.hpp +++ b/cpp/include/kvikio/detail/remote_handle.hpp @@ -1,11 +1,13 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once #include +#include + namespace kvikio::detail { /** * @brief Callback for `CURLOPT_WRITEFUNCTION` that copies received data into a `std::string`. @@ -20,4 +22,13 @@ std::size_t callback_get_string_response(char* data, std::size_t size, std::size_t num_bytes, void* userdata); + +/** + * @brief Set up the range request for libcurl. Use this method when HTTP range request is supposed. + * + * @param curl A curl handle + * @param file_offset File offset + * @param size read size + */ +void setup_range_request_impl(CurlHandle& curl, std::size_t file_offset, std::size_t size); } // namespace kvikio::detail diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 85d508dd73..bcb1a4a228 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -26,7 +26,8 @@ inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_ } struct TransferContext { - bool overflow_error; + bool overflow_error{}; + bool is_host_mem{}; char* buf{}; std::size_t chunk_size{}; std::size_t bytes_transferred{}; diff --git a/cpp/src/detail/remote_handle.cpp b/cpp/src/detail/remote_handle.cpp index 87d1ed5ab5..85a3ced0f3 100644 --- a/cpp/src/detail/remote_handle.cpp +++ b/cpp/src/detail/remote_handle.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -18,4 +18,11 @@ std::size_t callback_get_string_response(char* data, response->append(data, new_data_size); return new_data_size; } + +void setup_range_request_impl(CurlHandle& curl, std::size_t file_offset, std::size_t size) +{ + std::string const byte_range = + std::to_string(file_offset) + "-" + std::to_string(file_offset + size - 1); + curl.setopt(CURLOPT_RANGE, byte_range.c_str()); +} } // namespace kvikio::detail diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 5637aeee8f..136957a439 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -3,16 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include #include +#include #include #include #include -#include "kvikio/bounce_buffer.hpp" namespace kvikio::detail { namespace { -std::size_t callback_host_memory(char* buffer, std::size_t size, std::size_t nmemb, void* userdata) +std::size_t callback_memory(char* buffer, std::size_t size, std::size_t nmemb, void* userdata) { KVIKIO_NVTX_FUNC_RANGE(); auto* ctx = reinterpret_cast(userdata); @@ -23,10 +24,32 @@ std::size_t callback_host_memory(char* buffer, std::size_t size, std::size_t nme return CURL_WRITEFUNC_ERROR; } KVIKIO_NVTX_FUNC_RANGE(nbytes); - std::memcpy(ctx->buf + ctx->bytes_transferred, buffer, nbytes); + void* dst = ctx->is_host_mem ? ctx->buf : ctx->bounce_buffer.get(); + std::memcpy(static_cast(dst) + ctx->bytes_transferred, buffer, nbytes); ctx->bytes_transferred += nbytes; return nbytes; } + +void reconfig_easy_handle(CurlHandle& curl_easy_handle, + TransferContext* ctx, + void* buf, + bool is_host_mem, + std::size_t chunk_idx, + std::size_t chunk_size, + std::size_t size, + std::size_t offset) +{ + auto const local_offset = chunk_idx * chunk_size; + auto const actual_chunk_size = std::min(chunk_size, size - local_offset); + + ctx->overflow_error = false; + ctx->is_host_mem = is_host_mem; + ctx->buf = static_cast(buf) + local_offset; + ctx->chunk_size = actual_chunk_size; + ctx->bytes_transferred = 0; + + detail::setup_range_request_impl(curl_easy_handle, offset + local_offset, actual_chunk_size); +}; } // namespace TransferContext::TransferContext() : bounce_buffer{CudaPinnedBounceBufferPool::instance().get()} {} diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 7c67229549..07f26d0d1d 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -158,20 +158,6 @@ std::size_t get_file_size_using_head_impl(RemoteEndpoint& endpoint, std::string return static_cast(cl); } -/** - * @brief Set up the range request for libcurl. Use this method when HTTP range request is supposed. - * - * @param curl A curl handle - * @param file_offset File offset - * @param size read size - */ -void setup_range_request_impl(CurlHandle& curl, std::size_t file_offset, std::size_t size) -{ - std::string const byte_range = - std::to_string(file_offset) + "-" + std::to_string(file_offset + size - 1); - curl.setopt(CURLOPT_RANGE, byte_range.c_str()); -} - /** * @brief Whether the given URL is compatible with the S3 endpoint (including the credential-based * access and presigned URL) which uses HTTP/HTTPS. @@ -254,7 +240,7 @@ std::size_t HttpEndpoint::get_file_size() void HttpEndpoint::setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) { - setup_range_request_impl(curl, file_offset, size); + detail::setup_range_request_impl(curl, file_offset, size); } bool HttpEndpoint::is_url_valid(std::string const& url) noexcept @@ -410,7 +396,7 @@ std::size_t S3Endpoint::get_file_size() void S3Endpoint::setup_range_request(CurlHandle& curl, std::size_t file_offset, std::size_t size) { KVIKIO_NVTX_FUNC_RANGE(); - setup_range_request_impl(curl, file_offset, size); + detail::setup_range_request_impl(curl, file_offset, size); } bool S3Endpoint::is_url_valid(std::string const& url) noexcept @@ -457,7 +443,7 @@ void S3PublicEndpoint::setup_range_request(CurlHandle& curl, std::size_t size) { KVIKIO_NVTX_FUNC_RANGE(); - setup_range_request_impl(curl, file_offset, size); + detail::setup_range_request_impl(curl, file_offset, size); } bool S3PublicEndpoint::is_url_valid(std::string const& url) noexcept @@ -553,7 +539,7 @@ void S3EndpointWithPresignedUrl::setup_range_request(CurlHandle& curl, std::size_t size) { KVIKIO_NVTX_FUNC_RANGE(); - setup_range_request_impl(curl, file_offset, size); + detail::setup_range_request_impl(curl, file_offset, size); } bool S3EndpointWithPresignedUrl::is_url_valid(std::string const& url) noexcept From 8305ee4af9d4b9ccbbcc5c352eda8337a52c0c7c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 01:19:41 -0500 Subject: [PATCH 09/34] Update --- cpp/src/detail/remote_handle_poll_based.cpp | 45 ++++++++++++++------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 136957a439..14e76cfba5 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "kvikio/utils.hpp" namespace kvikio::detail { namespace { @@ -31,24 +32,24 @@ std::size_t callback_memory(char* buffer, std::size_t size, std::size_t nmemb, v } void reconfig_easy_handle(CurlHandle& curl_easy_handle, - TransferContext* ctx, + TransferContext& ctx, void* buf, bool is_host_mem, - std::size_t chunk_idx, + std::size_t current_chunk_idx, std::size_t chunk_size, std::size_t size, - std::size_t offset) + std::size_t file_offset) { - auto const local_offset = chunk_idx * chunk_size; + auto const local_offset = current_chunk_idx * chunk_size; auto const actual_chunk_size = std::min(chunk_size, size - local_offset); - ctx->overflow_error = false; - ctx->is_host_mem = is_host_mem; - ctx->buf = static_cast(buf) + local_offset; - ctx->chunk_size = actual_chunk_size; - ctx->bytes_transferred = 0; + ctx.overflow_error = false; + ctx.is_host_mem = is_host_mem; + ctx.buf = static_cast(buf) + local_offset; + ctx.chunk_size = actual_chunk_size; + ctx.bytes_transferred = 0; - detail::setup_range_request_impl(curl_easy_handle, offset + local_offset, actual_chunk_size); + detail::setup_range_request_impl(curl_easy_handle, file_offset + local_offset, actual_chunk_size); }; } // namespace @@ -85,10 +86,26 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ { if (size == 0) return 0; - std::size_t const chunk_size = defaults::task_size(); - std::size_t num_chunks = (size + chunk_size - 1) / chunk_size; - std::size_t actual_num_conns = std::min(_num_conns, num_chunks); + bool const is_host_mem = is_host_memory(buf); - return 123; + auto const chunk_size = defaults::task_size(); + auto const num_chunks = (size + chunk_size - 1) / chunk_size; + auto const actual_num_conns = std::min(_num_conns, num_chunks); + + std::size_t num_byte_transferred{0}; + std::size_t current_chunk_idx{0}; + for (std::size_t i = 0; i < actual_num_conns; ++i) { + reconfig_easy_handle(*_curl_easy_handles[i], + _transfer_ctxs[i], + buf, + is_host_mem, + current_chunk_idx++, + chunk_size, + size, + file_offset); + KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, _curl_easy_handles[i].get())); + } + + return num_byte_transferred; } } // namespace kvikio::detail From cf1341d853f93c6fef704b0a5e797d4d0b92b34b Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 09:43:37 -0500 Subject: [PATCH 10/34] Update --- .../detail/remote_handle_poll_based.hpp | 17 +++- cpp/src/detail/remote_handle_poll_based.cpp | 84 +++++++++++++++---- cpp/src/remote_handle.cpp | 2 +- 3 files changed, 81 insertions(+), 22 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index bcb1a4a228..c1f85984fe 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -11,11 +11,23 @@ #include #include +#define KVIKIO_CHECK_CURL_EASY(err_code) \ + kvikio::detail::check_curl_easy(err_code, __FILE__, __LINE__) + #define KVIKIO_CHECK_CURL_MULTI(err_code) \ kvikio::detail::check_curl_multi(err_code, __FILE__, __LINE__) namespace kvikio::detail { +inline void check_curl_easy(CURLcode err_code, char const* filename, int line_number) +{ + if (err_code == CURLcode::CURLE_OK) { return; } + std::stringstream ss; + ss << "libcurl error: " << curl_easy_strerror(err_code) << " at: " << filename << ":" + << line_number << "\n"; + throw std::runtime_error(ss.str()); +} + inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_number) { if (err_code == CURLMcode::CURLM_OK) { return; } @@ -38,9 +50,7 @@ struct TransferContext { class RemoteHandlePollBased { public: - RemoteHandlePollBased(std::string const& url, - RemoteEndpoint* endpoint, - std::size_t num_conns = 8); + RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t num_conns = 8); ~RemoteHandlePollBased(); @@ -48,7 +58,6 @@ class RemoteHandlePollBased { private: CURLM* _multi; - std::string _url; std::size_t _num_conns; std::vector> _curl_easy_handles; std::vector _transfer_ctxs; diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 14e76cfba5..636b74a369 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -14,7 +14,7 @@ namespace kvikio::detail { namespace { -std::size_t callback_memory(char* buffer, std::size_t size, std::size_t nmemb, void* userdata) +std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, void* userdata) { KVIKIO_NVTX_FUNC_RANGE(); auto* ctx = reinterpret_cast(userdata); @@ -31,8 +31,8 @@ std::size_t callback_memory(char* buffer, std::size_t size, std::size_t nmemb, v return nbytes; } -void reconfig_easy_handle(CurlHandle& curl_easy_handle, - TransferContext& ctx, +void reconfig_easy_handle(CURL* curl_easy_handle, + TransferContext* ctx, void* buf, bool is_host_mem, std::size_t current_chunk_idx, @@ -43,39 +43,50 @@ void reconfig_easy_handle(CurlHandle& curl_easy_handle, auto const local_offset = current_chunk_idx * chunk_size; auto const actual_chunk_size = std::min(chunk_size, size - local_offset); - ctx.overflow_error = false; - ctx.is_host_mem = is_host_mem; - ctx.buf = static_cast(buf) + local_offset; - ctx.chunk_size = actual_chunk_size; - ctx.bytes_transferred = 0; + ctx->overflow_error = false; + ctx->is_host_mem = is_host_mem; + ctx->buf = static_cast(buf) + local_offset; + ctx->chunk_size = actual_chunk_size; + ctx->bytes_transferred = 0; - detail::setup_range_request_impl(curl_easy_handle, file_offset + local_offset, actual_chunk_size); + std::size_t const remote_start = file_offset + local_offset; + std::size_t const remote_end = remote_start + actual_chunk_size - 1; + std::string const byte_range = std::to_string(remote_start) + "-" + std::to_string(remote_end); + KVIKIO_CHECK_CURL_EASY(curl_easy_setopt(curl_easy_handle, CURLOPT_RANGE, byte_range.c_str())); }; } // namespace TransferContext::TransferContext() : bounce_buffer{CudaPinnedBounceBufferPool::instance().get()} {} -RemoteHandlePollBased::RemoteHandlePollBased(std::string const& url, - RemoteEndpoint* endpoint, - std::size_t num_conns) - : _url{url}, _endpoint{endpoint}, _num_conns{num_conns}, _transfer_ctxs(_num_conns) +RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t num_conns) + : _endpoint{endpoint}, _num_conns{num_conns}, _transfer_ctxs(_num_conns) { _multi = curl_multi_init(); KVIKIO_EXPECT(_multi != nullptr, "Failed to initialize libcurl multi API"); + _curl_easy_handles.reserve(_num_conns); for (std::size_t i = 0; i < _num_conns; ++i) { _curl_easy_handles.emplace_back( std::make_unique(kvikio::LibCurl::instance().get_handle(), kvikio::detail::fix_conda_file_path_hack(__FILE__), KVIKIO_STRINGIFY(__LINE__))); + + // Initialize easy handle, associate it with transfer context _endpoint->setopt(*_curl_easy_handles.back()); - _transfer_ctxs.emplace_back(); + _curl_easy_handles.back()->setopt(CURLOPT_WRITEFUNCTION, write_callback); + _curl_easy_handles.back()->setopt(CURLOPT_WRITEDATA, &_transfer_ctxs[i]); + _curl_easy_handles.back()->setopt(CURLOPT_PRIVATE, &_transfer_ctxs[i]); } } RemoteHandlePollBased::~RemoteHandlePollBased() { try { + // Remove any lingering handles before cleanup + for (auto& handle : _curl_easy_handles) { + // Ignore errors + KVIKIO_CHECK_CURL_MULTI(curl_multi_remove_handle(_multi, handle->handle())); + } KVIKIO_CHECK_CURL_MULTI(curl_multi_cleanup(_multi)); } catch (std::exception const& e) { KVIKIO_LOG_ERROR(e.what()); @@ -92,20 +103,59 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ auto const num_chunks = (size + chunk_size - 1) / chunk_size; auto const actual_num_conns = std::min(_num_conns, num_chunks); + // Prepare for the run std::size_t num_byte_transferred{0}; std::size_t current_chunk_idx{0}; for (std::size_t i = 0; i < actual_num_conns; ++i) { - reconfig_easy_handle(*_curl_easy_handles[i], - _transfer_ctxs[i], + reconfig_easy_handle(_curl_easy_handles[i]->handle(), + &_transfer_ctxs[i], buf, is_host_mem, current_chunk_idx++, chunk_size, size, file_offset); - KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, _curl_easy_handles[i].get())); + KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, _curl_easy_handles[i]->handle())); } + // Start the run + int still_running{0}; + do { + KVIKIO_CHECK_CURL_MULTI(curl_multi_perform(_multi, &still_running)); + + CURLMsg* msg; + int msgs_left; + + while ((msg = curl_multi_info_read(_multi, &msgs_left))) { + if (msg->msg != CURLMSG_DONE) continue; + + TransferContext* ctx{nullptr}; + KVIKIO_CHECK_CURL_EASY(curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &ctx)); + + KVIKIO_EXPECT(msg->data.result == CURLE_OK, + "Chunked transfer failed in poll-based multi API"); + num_byte_transferred += ctx->bytes_transferred; + KVIKIO_CHECK_CURL_MULTI(curl_multi_remove_handle(_multi, msg->easy_handle)); + + if (current_chunk_idx < num_chunks) { + reconfig_easy_handle(msg->easy_handle, + ctx, + buf, + is_host_mem, + current_chunk_idx++, + chunk_size, + size, + file_offset); + KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, msg->easy_handle)); + } + } + + if (still_running > 0) { + KVIKIO_CHECK_CURL_MULTI(curl_multi_poll(_multi, nullptr, 0, 1000, nullptr)); + } + + } while (still_running > 0); + return num_byte_transferred; } } // namespace kvikio::detail diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 07f26d0d1d..0dd5da3af9 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -807,7 +807,7 @@ std::future RemoteHandle::pread(void* buf, kvikio::getenv_or("KVIKIO_REMOTE_BACKEND", "LIBCURL_EASY"); if (remote_backend == "LIBCURL_POLL_BASED" && is_host_memory(buf)) { return std::async(std::launch::async, [&, this]() -> std::size_t { - detail::RemoteHandlePollBased poll_handle(_endpoint->str(), _endpoint.get()); + detail::RemoteHandlePollBased poll_handle(_endpoint.get()); return poll_handle.pread(buf, size, file_offset); }); } From 68d828496bc6aad83e1a8897fca45a1ccfad067d Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 10:27:32 -0500 Subject: [PATCH 11/34] Update --- cpp/include/kvikio/defaults.hpp | 16 ++++- .../detail/remote_handle_poll_based.hpp | 5 +- cpp/include/kvikio/remote_backend_type.hpp | 16 +++++ cpp/src/defaults.cpp | 58 +++++++++++++++---- cpp/src/detail/remote_handle_poll_based.cpp | 18 +++--- cpp/src/remote_handle.cpp | 5 +- 6 files changed, 92 insertions(+), 26 deletions(-) create mode 100644 cpp/include/kvikio/remote_backend_type.hpp diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 190909c2cc..44594b0294 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,9 @@ bool getenv_or(std::string_view env_var_name, bool default_val); template <> CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val); +template <> +RemoteBackendType getenv_or(std::string_view env_var_name, RemoteBackendType default_val); + template <> std::vector getenv_or(std::string_view env_var_name, std::vector default_val); @@ -122,6 +126,8 @@ class defaults { bool _auto_direct_io_read; bool _auto_direct_io_write; bool _thread_pool_per_block_device; + RemoteBackendType _remote_backend; + std::size_t _remote_max_connections; static unsigned int get_num_threads_from_env(); @@ -417,6 +423,14 @@ class defaults { * thread pool for all I/O operations. */ static void set_thread_pool_per_block_device(bool flag); + + [[nodiscard]] static RemoteBackendType remote_backend(); + + static void set_remote_backend(RemoteBackendType remote_backend); + + [[nodiscard]] static std::size_t remote_max_connections(); + + static void set_remote_max_connections(std::size_t remote_max_connections); }; } // namespace kvikio diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index c1f85984fe..632719068d 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -5,7 +5,6 @@ #pragma once #include -#include #include #include @@ -50,7 +49,7 @@ struct TransferContext { class RemoteHandlePollBased { public: - RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t num_conns = 8); + RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections = 8); ~RemoteHandlePollBased(); @@ -58,7 +57,7 @@ class RemoteHandlePollBased { private: CURLM* _multi; - std::size_t _num_conns; + std::size_t _max_connections; std::vector> _curl_easy_handles; std::vector _transfer_ctxs; RemoteEndpoint* _endpoint; diff --git a/cpp/include/kvikio/remote_backend_type.hpp b/cpp/include/kvikio/remote_backend_type.hpp new file mode 100644 index 0000000000..0d50174455 --- /dev/null +++ b/cpp/include/kvikio/remote_backend_type.hpp @@ -0,0 +1,16 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include + +namespace kvikio { + +enum class RemoteBackendType : uint8_t { + LIBCURL_EASY, + LIBCURL_MULTI_POLL, +}; + +} // namespace kvikio diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index 841e7314d3..f1c9a59b33 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -18,6 +18,7 @@ #include #include #include +#include "kvikio/remote_handle.hpp" namespace kvikio { template <> @@ -63,6 +64,26 @@ CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val) return detail::parse_compat_mode_str(env_val); } +template <> +RemoteBackendType getenv_or(std::string_view env_var_name, RemoteBackendType default_val) +{ + KVIKIO_NVTX_FUNC_RANGE(); + auto* env_val = std::getenv(env_var_name.data()); + if (env_val == nullptr) { return default_val; } + std::string str{env_val}; + std::transform( + str.begin(), str.end(), str.begin(), [](unsigned char c) { return std::tolower(c); }); + std::stringstream trimmer; + trimmer << str; + str.clear(); + trimmer >> str; + if (str == "libcurl_easy") { return RemoteBackendType::LIBCURL_EASY; } + if (str == "libcurl_multi_poll") { return RemoteBackendType::LIBCURL_MULTI_POLL; } + KVIKIO_FAIL("unknown config value " + std::string{env_var_name} + "=" + std::string{env_val}, + std::invalid_argument); + return {}; +} + template <> std::vector getenv_or(std::string_view env_var_name, std::vector default_val) { @@ -131,20 +152,23 @@ defaults::defaults() } // Determine the default value of `http_status_codes` - { - _http_status_codes = - getenv_or("KVIKIO_HTTP_STATUS_CODES", std::vector{429, 500, 502, 503, 504}); - } + _http_status_codes = + getenv_or("KVIKIO_HTTP_STATUS_CODES", std::vector{429, 500, 502, 503, 504}); // Determine the default value of `auto_direct_io_read` and `auto_direct_io_write` - { - _auto_direct_io_read = getenv_or("KVIKIO_AUTO_DIRECT_IO_READ", false); - _auto_direct_io_write = getenv_or("KVIKIO_AUTO_DIRECT_IO_WRITE", true); - } + _auto_direct_io_read = getenv_or("KVIKIO_AUTO_DIRECT_IO_READ", false); + _auto_direct_io_write = getenv_or("KVIKIO_AUTO_DIRECT_IO_WRITE", true); // Determine the default value of `thread_pool_per_block_device` + _thread_pool_per_block_device = getenv_or("KVIKIO_THREAD_POOL_PER_BLOCK_DEVICE", false); + + _remote_backend = getenv_or("KVIKIO_REMOTE_BACKEND", RemoteBackendType::LIBCURL_EASY); + { - _thread_pool_per_block_device = getenv_or("KVIKIO_THREAD_POOL_PER_BLOCK_DEVICE", false); + auto const env = getenv_or("KVIKIO_REMOTE_MAX_CONNECTIONS", 8); + KVIKIO_EXPECT( + env > 0, "KVIKIO_REMOTE_MAX_CONNECTIONS has to be a positive integer", std::invalid_argument); + _remote_max_connections = env; } } @@ -250,4 +274,18 @@ void defaults::set_thread_pool_per_block_device(bool flag) { instance()->_thread_pool_per_block_device = flag; } + +RemoteBackendType defaults::remote_backend() { return instance()->_remote_backend; } + +void defaults::set_remote_backend(RemoteBackendType remote_backend) +{ + instance()->_remote_backend = remote_backend; +} + +std::size_t defaults::remote_max_connections() { return instance()->_remote_max_connections; } + +void defaults::set_remote_max_connections(std::size_t remote_max_connections) +{ + instance()->_remote_max_connections = remote_max_connections; +} } // namespace kvikio diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 636b74a369..6f117bb2e4 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -10,7 +10,7 @@ #include #include #include -#include "kvikio/utils.hpp" +#include namespace kvikio::detail { namespace { @@ -58,14 +58,14 @@ void reconfig_easy_handle(CURL* curl_easy_handle, TransferContext::TransferContext() : bounce_buffer{CudaPinnedBounceBufferPool::instance().get()} {} -RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t num_conns) - : _endpoint{endpoint}, _num_conns{num_conns}, _transfer_ctxs(_num_conns) +RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections) + : _endpoint{endpoint}, _max_connections{max_connections}, _transfer_ctxs(_max_connections) { _multi = curl_multi_init(); KVIKIO_EXPECT(_multi != nullptr, "Failed to initialize libcurl multi API"); - _curl_easy_handles.reserve(_num_conns); - for (std::size_t i = 0; i < _num_conns; ++i) { + _curl_easy_handles.reserve(_max_connections); + for (std::size_t i = 0; i < _max_connections; ++i) { _curl_easy_handles.emplace_back( std::make_unique(kvikio::LibCurl::instance().get_handle(), kvikio::detail::fix_conda_file_path_hack(__FILE__), @@ -99,14 +99,14 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ bool const is_host_mem = is_host_memory(buf); - auto const chunk_size = defaults::task_size(); - auto const num_chunks = (size + chunk_size - 1) / chunk_size; - auto const actual_num_conns = std::min(_num_conns, num_chunks); + auto const chunk_size = defaults::task_size(); + auto const num_chunks = (size + chunk_size - 1) / chunk_size; + auto const actual_max_connections = std::min(_max_connections, num_chunks); // Prepare for the run std::size_t num_byte_transferred{0}; std::size_t current_chunk_idx{0}; - for (std::size_t i = 0; i < actual_num_conns; ++i) { + for (std::size_t i = 0; i < actual_max_connections; ++i) { reconfig_easy_handle(_curl_easy_handles[i]->handle(), &_transfer_ctxs[i], buf, diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 0dd5da3af9..e5f523a2fa 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -28,6 +28,7 @@ #include #include #include +#include "kvikio/remote_backend_type.hpp" namespace kvikio { @@ -803,9 +804,7 @@ std::future RemoteHandle::pread(void* buf, auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx(); KVIKIO_NVTX_FUNC_RANGE(size); - auto const remote_backend = - kvikio::getenv_or("KVIKIO_REMOTE_BACKEND", "LIBCURL_EASY"); - if (remote_backend == "LIBCURL_POLL_BASED" && is_host_memory(buf)) { + if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL && is_host_memory(buf)) { return std::async(std::launch::async, [&, this]() -> std::size_t { detail::RemoteHandlePollBased poll_handle(_endpoint.get()); return poll_handle.pread(buf, size, file_offset); From ca8bbc894ce3bcd1ad5e068f66e66972123f669c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 11:08:29 -0500 Subject: [PATCH 12/34] Update --- .../kvikio/detail/remote_handle_poll_based.hpp | 4 +--- cpp/src/detail/remote_handle_poll_based.cpp | 10 ++++++---- cpp/src/remote_handle.cpp | 3 ++- cpp/tests/test_remote_handle.cpp | 14 +++++++++++--- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 632719068d..cd351042cf 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -42,9 +42,7 @@ struct TransferContext { char* buf{}; std::size_t chunk_size{}; std::size_t bytes_transferred{}; - CudaPinnedBounceBufferPool::Buffer bounce_buffer; - - TransferContext(); + std::optional bounce_buffer; }; class RemoteHandlePollBased { diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 6f117bb2e4..7a1c52a715 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -2,7 +2,7 @@ * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ - +#include #include #include #include @@ -25,7 +25,7 @@ std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, vo return CURL_WRITEFUNC_ERROR; } KVIKIO_NVTX_FUNC_RANGE(nbytes); - void* dst = ctx->is_host_mem ? ctx->buf : ctx->bounce_buffer.get(); + void* dst = ctx->is_host_mem ? ctx->buf : ctx->bounce_buffer.value().get(); std::memcpy(static_cast(dst) + ctx->bytes_transferred, buffer, nbytes); ctx->bytes_transferred += nbytes; return nbytes; @@ -49,6 +49,10 @@ void reconfig_easy_handle(CURL* curl_easy_handle, ctx->chunk_size = actual_chunk_size; ctx->bytes_transferred = 0; + if (!is_host_mem && !ctx->bounce_buffer.has_value()) { + ctx->bounce_buffer.emplace(CudaPinnedBounceBufferPool::instance().get()); + } + std::size_t const remote_start = file_offset + local_offset; std::size_t const remote_end = remote_start + actual_chunk_size - 1; std::string const byte_range = std::to_string(remote_start) + "-" + std::to_string(remote_end); @@ -56,8 +60,6 @@ void reconfig_easy_handle(CURL* curl_easy_handle, }; } // namespace -TransferContext::TransferContext() : bounce_buffer{CudaPinnedBounceBufferPool::instance().get()} {} - RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections) : _endpoint{endpoint}, _max_connections{max_connections}, _transfer_ctxs(_max_connections) { diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index e5f523a2fa..3d06609af6 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -805,7 +805,8 @@ std::future RemoteHandle::pread(void* buf, KVIKIO_NVTX_FUNC_RANGE(size); if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL && is_host_memory(buf)) { - return std::async(std::launch::async, [&, this]() -> std::size_t { + return thread_pool->submit_task([=, this] { + KVIKIO_NVTX_SCOPED_RANGE("task (multi poll)", size, nvtx_color); detail::RemoteHandlePollBased poll_handle(_endpoint.get()); return poll_handle.pread(buf, size, file_offset); }); diff --git a/cpp/tests/test_remote_handle.cpp b/cpp/tests/test_remote_handle.cpp index 908854b6b4..b0238a4813 100644 --- a/cpp/tests/test_remote_handle.cpp +++ b/cpp/tests/test_remote_handle.cpp @@ -290,12 +290,20 @@ TEST_F(RemoteHandleTest, poll_based) kvikio::RemoteHandle remote_handle(std::move(endpoint)); - kvikio::test::DevBuffer dev_buf(remote_handle.nbytes() / sizeof(double)); - auto fut = remote_handle.pread(dev_buf.ptr, remote_handle.nbytes()); + // kvikio::test::DevBuffer dev_buf(remote_handle.nbytes() / sizeof(double)); + // auto fut = remote_handle.pread(dev_buf.ptr, remote_handle.nbytes()); + // auto num_bytes_read = fut.get(); + + // EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); + // auto host_buf = dev_buf.to_vector(); + // std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() + // << "\n"; + + std::vector host_buf(remote_handle.nbytes() / sizeof(double)); + auto fut = remote_handle.pread(host_buf.data(), remote_handle.nbytes()); auto num_bytes_read = fut.get(); EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); - auto host_buf = dev_buf.to_vector(); std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() << "\n"; } From 87c0b9f5cef00f59bc2ca7a431ce108aa0cf5108 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 11:12:32 -0500 Subject: [PATCH 13/34] Update --- cpp/include/kvikio/detail/remote_handle_poll_based.hpp | 2 +- cpp/src/remote_handle.cpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index cd351042cf..7376e4133a 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -47,7 +47,7 @@ struct TransferContext { class RemoteHandlePollBased { public: - RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections = 8); + RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections); ~RemoteHandlePollBased(); diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 3d06609af6..45dd25bc2b 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -807,7 +807,8 @@ std::future RemoteHandle::pread(void* buf, if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL && is_host_memory(buf)) { return thread_pool->submit_task([=, this] { KVIKIO_NVTX_SCOPED_RANGE("task (multi poll)", size, nvtx_color); - detail::RemoteHandlePollBased poll_handle(_endpoint.get()); + detail::RemoteHandlePollBased poll_handle(_endpoint.get(), + defaults::remote_max_connections()); return poll_handle.pread(buf, size, file_offset); }); } From 7e7f6c8a3cd12692cb8205a9ffbd37f1700524c5 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 14:42:44 -0500 Subject: [PATCH 14/34] Update --- cpp/src/detail/remote_handle_poll_based.cpp | 28 ++++++++++++++-- cpp/tests/test_remote_handle.cpp | 36 ++++++++++++--------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 7a1c52a715..95a4ab00f5 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -2,13 +2,15 @@ * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ -#include + #include #include #include +#include #include #include #include +#include #include #include @@ -63,6 +65,9 @@ void reconfig_easy_handle(CURL* curl_easy_handle, RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections) : _endpoint{endpoint}, _max_connections{max_connections}, _transfer_ctxs(_max_connections) { + KVIKIO_EXPECT(defaults::task_size() <= defaults::bounce_buffer_size(), + "bounce buffer size cannot be less than task size."); + _multi = curl_multi_init(); KVIKIO_EXPECT(_multi != nullptr, "Failed to initialize libcurl multi API"); @@ -99,12 +104,18 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ { if (size == 0) return 0; - bool const is_host_mem = is_host_memory(buf); - + bool const is_host_mem = is_host_memory(buf); auto const chunk_size = defaults::task_size(); auto const num_chunks = (size + chunk_size - 1) / chunk_size; auto const actual_max_connections = std::min(_max_connections, num_chunks); + std::optional cuda_ctx; + CUstream stream{}; + if (!is_host_mem) { + cuda_ctx.emplace(get_context_from_pointer(buf)); + stream = detail::StreamsByThread::get(); + } + // Prepare for the run std::size_t num_byte_transferred{0}; std::size_t current_chunk_idx{0}; @@ -128,14 +139,25 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ CURLMsg* msg; int msgs_left; + // Handle the completed messages while ((msg = curl_multi_info_read(_multi, &msgs_left))) { if (msg->msg != CURLMSG_DONE) continue; TransferContext* ctx{nullptr}; KVIKIO_CHECK_CURL_EASY(curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &ctx)); + KVIKIO_EXPECT(ctx != nullptr, "Failed to retrieve transfer context"); KVIKIO_EXPECT(msg->data.result == CURLE_OK, "Chunked transfer failed in poll-based multi API"); + + if (!is_host_mem) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(convert_void2deviceptr(ctx->buf), + ctx->bounce_buffer.value().get(), + ctx->bytes_transferred, + stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); + } + num_byte_transferred += ctx->bytes_transferred; KVIKIO_CHECK_CURL_MULTI(curl_multi_remove_handle(_multi, msg->easy_handle)); diff --git a/cpp/tests/test_remote_handle.cpp b/cpp/tests/test_remote_handle.cpp index b0238a4813..f7586c04ec 100644 --- a/cpp/tests/test_remote_handle.cpp +++ b/cpp/tests/test_remote_handle.cpp @@ -290,20 +290,24 @@ TEST_F(RemoteHandleTest, poll_based) kvikio::RemoteHandle remote_handle(std::move(endpoint)); - // kvikio::test::DevBuffer dev_buf(remote_handle.nbytes() / sizeof(double)); - // auto fut = remote_handle.pread(dev_buf.ptr, remote_handle.nbytes()); - // auto num_bytes_read = fut.get(); - - // EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); - // auto host_buf = dev_buf.to_vector(); - // std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() - // << "\n"; - - std::vector host_buf(remote_handle.nbytes() / sizeof(double)); - auto fut = remote_handle.pread(host_buf.data(), remote_handle.nbytes()); - auto num_bytes_read = fut.get(); - - EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); - std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() - << "\n"; + { + kvikio::test::DevBuffer dev_buf(remote_handle.nbytes() / sizeof(double)); + auto fut = remote_handle.pread(dev_buf.ptr, remote_handle.nbytes()); + auto num_bytes_read = fut.get(); + + EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); + auto host_buf = dev_buf.to_vector(); + std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() + << "\n"; + } + + { + std::vector host_buf(remote_handle.nbytes() / sizeof(double)); + auto fut = remote_handle.pread(host_buf.data(), remote_handle.nbytes()); + auto num_bytes_read = fut.get(); + + EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); + std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() + << "\n"; + } } From d3ba6f4a74155080bad5db0a986511625503853b Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 16:14:15 -0500 Subject: [PATCH 15/34] Update --- cpp/include/kvikio/remote_handle.hpp | 16 +++++++++++++--- cpp/src/remote_handle.cpp | 25 +++++++++++++++++++------ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index 0b0808c45e..cf0ed89f8f 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -291,6 +292,11 @@ class S3EndpointWithPresignedUrl : public RemoteEndpoint { static bool is_url_valid(std::string const& url) noexcept; }; +// Forward declaration +namespace detail { +class RemoteHandlePollBased; +} + /** * @brief Handle of remote file. */ @@ -298,6 +304,9 @@ class RemoteHandle { private: std::unique_ptr _endpoint; std::size_t _nbytes; + std::unique_ptr _poll_handle; + std::once_flag _poll_handle_init_flag; + mutable std::mutex _poll_mutex; public: /** @@ -400,8 +409,9 @@ class RemoteHandle { RemoteHandle(std::unique_ptr endpoint); // A remote handle is moveable but not copyable. - RemoteHandle(RemoteHandle&& o) = default; - RemoteHandle& operator=(RemoteHandle&& o) = default; + ~RemoteHandle(); + RemoteHandle(RemoteHandle&& o); + RemoteHandle& operator=(RemoteHandle&& o); RemoteHandle(RemoteHandle const&) = delete; RemoteHandle& operator=(RemoteHandle const&) = delete; diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 45dd25bc2b..87fe14404c 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -25,10 +26,10 @@ #include #include #include +#include #include #include #include -#include "kvikio/remote_backend_type.hpp" namespace kvikio { @@ -671,6 +672,12 @@ RemoteHandle::RemoteHandle(std::unique_ptr endpoint) _endpoint = std::move(endpoint); } +RemoteHandle::~RemoteHandle() = default; + +RemoteHandle::RemoteHandle(RemoteHandle&& o) = default; + +RemoteHandle& RemoteHandle::operator=(RemoteHandle&& o) = default; + RemoteEndpointType RemoteHandle::remote_endpoint_type() const noexcept { return _endpoint->remote_endpoint_type(); @@ -804,12 +811,18 @@ std::future RemoteHandle::pread(void* buf, auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx(); KVIKIO_NVTX_FUNC_RANGE(size); - if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL && is_host_memory(buf)) { + if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { + if (_poll_handle == nullptr) { + std::call_once(_poll_handle_init_flag, [this] { + _poll_handle = std::make_unique( + _endpoint.get(), defaults::remote_max_connections()); + }); + } + return thread_pool->submit_task([=, this] { - KVIKIO_NVTX_SCOPED_RANGE("task (multi poll)", size, nvtx_color); - detail::RemoteHandlePollBased poll_handle(_endpoint.get(), - defaults::remote_max_connections()); - return poll_handle.pread(buf, size, file_offset); + std::lock_guard const lock{_poll_mutex}; + KVIKIO_NVTX_SCOPED_RANGE("task_remote_multi_poll", size, nvtx_color); + return _poll_handle->pread(buf, size, file_offset); }); } From 2e65bafbb2037c2c892fc23c589a95873b6295ab Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 16:15:16 -0500 Subject: [PATCH 16/34] Update --- cpp/src/remote_handle.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 87fe14404c..51a1607ea4 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -812,12 +812,10 @@ std::future RemoteHandle::pread(void* buf, KVIKIO_NVTX_FUNC_RANGE(size); if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { - if (_poll_handle == nullptr) { - std::call_once(_poll_handle_init_flag, [this] { - _poll_handle = std::make_unique( - _endpoint.get(), defaults::remote_max_connections()); - }); - } + std::call_once(_poll_handle_init_flag, [this] { + _poll_handle = std::make_unique( + _endpoint.get(), defaults::remote_max_connections()); + }); return thread_pool->submit_task([=, this] { std::lock_guard const lock{_poll_mutex}; From 019c472fb3dbdba24a558b985ae6075c3bec232a Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 14 Jan 2026 16:42:59 -0500 Subject: [PATCH 17/34] Update --- .../detail/remote_handle_poll_based.hpp | 15 ++++++----- cpp/include/kvikio/remote_handle.hpp | 3 --- cpp/src/detail/remote_handle_poll_based.cpp | 2 ++ cpp/src/remote_handle.cpp | 26 ++++++++++++------- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 7376e4133a..278c719383 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -46,18 +46,19 @@ struct TransferContext { }; class RemoteHandlePollBased { - public: - RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections); - - ~RemoteHandlePollBased(); - - std::size_t pread(void* buf, std::size_t size, std::size_t file_offset = 0); - private: CURLM* _multi; std::size_t _max_connections; std::vector> _curl_easy_handles; std::vector _transfer_ctxs; RemoteEndpoint* _endpoint; + mutable std::mutex _mutex; + + public: + RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections); + + ~RemoteHandlePollBased(); + + std::size_t pread(void* buf, std::size_t size, std::size_t file_offset = 0); }; } // namespace kvikio::detail diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index cf0ed89f8f..fe9b4e80f1 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include @@ -305,8 +304,6 @@ class RemoteHandle { std::unique_ptr _endpoint; std::size_t _nbytes; std::unique_ptr _poll_handle; - std::once_flag _poll_handle_init_flag; - mutable std::mutex _poll_mutex; public: /** diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 95a4ab00f5..0f1a80b232 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -104,6 +104,8 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ { if (size == 0) return 0; + std::lock_guard lock{_mutex}; + bool const is_host_mem = is_host_memory(buf); auto const chunk_size = defaults::task_size(); auto const num_chunks = (size + chunk_size - 1) / chunk_size; diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index 51a1607ea4..de753c09d9 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -663,6 +663,10 @@ RemoteHandle::RemoteHandle(std::unique_ptr endpoint, std::size_t : _endpoint{std::move(endpoint)}, _nbytes{nbytes} { KVIKIO_NVTX_FUNC_RANGE(); + if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { + _poll_handle = std::make_unique( + _endpoint.get(), defaults::remote_max_connections()); + } } RemoteHandle::RemoteHandle(std::unique_ptr endpoint) @@ -670,12 +674,17 @@ RemoteHandle::RemoteHandle(std::unique_ptr endpoint) KVIKIO_NVTX_FUNC_RANGE(); _nbytes = endpoint->get_file_size(); _endpoint = std::move(endpoint); + if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { + _poll_handle = std::make_unique( + _endpoint.get(), defaults::remote_max_connections()); + } } -RemoteHandle::~RemoteHandle() = default; - -RemoteHandle::RemoteHandle(RemoteHandle&& o) = default; - +// Destructor and move operations must be defined in the .cpp file (not defaulted in the header) +// because RemoteHandle uses std::unique_ptr with a forward-declared type. +// The unique_ptr's deleter requires the complete type definition, which is only available here. +RemoteHandle::~RemoteHandle() = default; +RemoteHandle::RemoteHandle(RemoteHandle&& o) = default; RemoteHandle& RemoteHandle::operator=(RemoteHandle&& o) = default; RemoteEndpointType RemoteHandle::remote_endpoint_type() const noexcept @@ -812,13 +821,10 @@ std::future RemoteHandle::pread(void* buf, KVIKIO_NVTX_FUNC_RANGE(size); if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { - std::call_once(_poll_handle_init_flag, [this] { - _poll_handle = std::make_unique( - _endpoint.get(), defaults::remote_max_connections()); - }); - + KVIKIO_EXPECT(_poll_handle != nullptr, + "Remote backend changed to LIBCURL_MULTI_POLL after RemoteHandle construction. " + "The backend setting at construction time and pread call must match."); return thread_pool->submit_task([=, this] { - std::lock_guard const lock{_poll_mutex}; KVIKIO_NVTX_SCOPED_RANGE("task_remote_multi_poll", size, nvtx_color); return _poll_handle->pread(buf, size, file_offset); }); From cf1dc29416d2546b67c6d7f9d0b538c10534b9b3 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 15 Jan 2026 12:01:49 -0500 Subject: [PATCH 18/34] Impl batch copy --- cpp/src/detail/remote_handle_poll_based.cpp | 85 +++++++++++++++++++-- 1 file changed, 79 insertions(+), 6 deletions(-) diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 0f1a80b232..58d4274952 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -59,6 +59,77 @@ void reconfig_easy_handle(CURL* curl_easy_handle, std::size_t const remote_end = remote_start + actual_chunk_size - 1; std::string const byte_range = std::to_string(remote_start) + "-" + std::to_string(remote_end); KVIKIO_CHECK_CURL_EASY(curl_easy_setopt(curl_easy_handle, CURLOPT_RANGE, byte_range.c_str())); +} + +class MemcpyHelper { + private: + std::size_t _max_connections; + std::vector _srcs; + std::vector _dsts; + std::vector _sizes; + std::size_t _num_entries; + CUstream _stream; + + void batch_copy_emulated() + { + for (std::size_t i = 0; i < _num_entries; ++i) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + _dsts[i], reinterpret_cast(_srcs[i]), _sizes[i], _stream)); + } + } + + public: + MemcpyHelper(std::size_t max_connections, CUstream stream) + : _max_connections{max_connections}, + _srcs(max_connections), + _dsts(max_connections), + _sizes(max_connections), + _num_entries{0}, + _stream{stream} + { + } + + void reset() { _num_entries = 0; } + + void register_region(void* dst, void* src, std::size_t size) + { + _dsts[_num_entries] = convert_void2deviceptr(dst); + _srcs[_num_entries] = convert_void2deviceptr(src); + _sizes[_num_entries] = size; + ++_num_entries; + } + + void batch_copy() + { + if (_num_entries == 0) return; + +#if CUDA_VERSION >= 12080 + if (cudaAPI::instance().MemcpyBatchAsync) { + CUmemcpyAttributes attrs{}; + std::size_t attrs_idxs[] = {0}; + attrs.srcAccessOrder = CUmemcpySrcAccessOrder_enum::CU_MEMCPY_SRC_ACCESS_ORDER_STREAM; + CUDA_DRIVER_TRY( + cudaAPI::instance().MemcpyBatchAsync(_dsts.data(), + _srcs.data(), + _sizes.data(), + _num_entries, + &attrs, + attrs_idxs, + static_cast(1) /* num_attrs */, +#if CUDA_VERSION < 13000 + static_cast(nullptr), +#endif + _stream)); + } else { + // Fall back to the conventional H2D copy if the batch copy API is not available. + batch_copy_emulated(); + } +#else + batch_copy_emulated(); +#endif + + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); + } }; } // namespace @@ -106,6 +177,7 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ std::lock_guard lock{_mutex}; + // Prepare for the run bool const is_host_mem = is_host_memory(buf); auto const chunk_size = defaults::task_size(); auto const num_chunks = (size + chunk_size - 1) / chunk_size; @@ -118,7 +190,6 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ stream = detail::StreamsByThread::get(); } - // Prepare for the run std::size_t num_byte_transferred{0}; std::size_t current_chunk_idx{0}; for (std::size_t i = 0; i < actual_max_connections; ++i) { @@ -133,6 +204,8 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, _curl_easy_handles[i]->handle())); } + MemcpyHelper memcpy_helper(actual_max_connections, stream); + // Start the run int still_running{0}; do { @@ -140,6 +213,7 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ CURLMsg* msg; int msgs_left; + if (!is_host_mem) { memcpy_helper.reset(); } // Handle the completed messages while ((msg = curl_multi_info_read(_multi, &msgs_left))) { @@ -153,11 +227,8 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ "Chunked transfer failed in poll-based multi API"); if (!is_host_mem) { - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(convert_void2deviceptr(ctx->buf), - ctx->bounce_buffer.value().get(), - ctx->bytes_transferred, - stream)); - CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); + memcpy_helper.register_region( + ctx->buf, ctx->bounce_buffer.value().get(), ctx->bytes_transferred); } num_byte_transferred += ctx->bytes_transferred; @@ -176,6 +247,8 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ } } + if (!is_host_mem) { memcpy_helper.batch_copy(); } + if (still_running > 0) { KVIKIO_CHECK_CURL_MULTI(curl_multi_poll(_multi, nullptr, 0, 1000, nullptr)); } From 4e7eed3136701359674534d6a5368e61396aa537 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 15 Jan 2026 15:15:06 -0500 Subject: [PATCH 19/34] Update --- .../detail/remote_handle_poll_based.hpp | 16 ++- cpp/src/detail/remote_handle_poll_based.cpp | 104 +++++------------- 2 files changed, 42 insertions(+), 78 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 278c719383..97c7f69480 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -36,13 +36,27 @@ inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_ throw std::runtime_error(ss.str()); } +class BounceBufferManager { + public: + BounceBufferManager(std::size_t num_bounce_buffers = 2); + + void* data() const noexcept; + + void copy(void* dst, std::size_t size, CUstream stream); + + private: + std::size_t _bounce_buffer_idx{}; + std::size_t _num_bounce_buffers{}; + std::vector _bounce_buffers; +}; + struct TransferContext { bool overflow_error{}; bool is_host_mem{}; char* buf{}; std::size_t chunk_size{}; std::size_t bytes_transferred{}; - std::optional bounce_buffer; + std::optional _bounce_buffer_manager; }; class RemoteHandlePollBased { diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 58d4274952..baa0cd1e4d 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -27,7 +27,7 @@ std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, vo return CURL_WRITEFUNC_ERROR; } KVIKIO_NVTX_FUNC_RANGE(nbytes); - void* dst = ctx->is_host_mem ? ctx->buf : ctx->bounce_buffer.value().get(); + void* dst = ctx->is_host_mem ? ctx->buf : ctx->_bounce_buffer_manager->data(); std::memcpy(static_cast(dst) + ctx->bytes_transferred, buffer, nbytes); ctx->bytes_transferred += nbytes; return nbytes; @@ -51,8 +51,8 @@ void reconfig_easy_handle(CURL* curl_easy_handle, ctx->chunk_size = actual_chunk_size; ctx->bytes_transferred = 0; - if (!is_host_mem && !ctx->bounce_buffer.has_value()) { - ctx->bounce_buffer.emplace(CudaPinnedBounceBufferPool::instance().get()); + if (!is_host_mem && !ctx->_bounce_buffer_manager.has_value()) { + ctx->_bounce_buffer_manager.emplace(); } std::size_t const remote_start = file_offset + local_offset; @@ -60,78 +60,31 @@ void reconfig_easy_handle(CURL* curl_easy_handle, std::string const byte_range = std::to_string(remote_start) + "-" + std::to_string(remote_end); KVIKIO_CHECK_CURL_EASY(curl_easy_setopt(curl_easy_handle, CURLOPT_RANGE, byte_range.c_str())); } +} // namespace -class MemcpyHelper { - private: - std::size_t _max_connections; - std::vector _srcs; - std::vector _dsts; - std::vector _sizes; - std::size_t _num_entries; - CUstream _stream; - - void batch_copy_emulated() - { - for (std::size_t i = 0; i < _num_entries; ++i) { - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( - _dsts[i], reinterpret_cast(_srcs[i]), _sizes[i], _stream)); - } - } - - public: - MemcpyHelper(std::size_t max_connections, CUstream stream) - : _max_connections{max_connections}, - _srcs(max_connections), - _dsts(max_connections), - _sizes(max_connections), - _num_entries{0}, - _stream{stream} - { - } - - void reset() { _num_entries = 0; } - - void register_region(void* dst, void* src, std::size_t size) - { - _dsts[_num_entries] = convert_void2deviceptr(dst); - _srcs[_num_entries] = convert_void2deviceptr(src); - _sizes[_num_entries] = size; - ++_num_entries; +BounceBufferManager::BounceBufferManager(std::size_t num_bounce_buffers) + : _num_bounce_buffers{num_bounce_buffers} +{ + for (std::size_t i = 0; i < _num_bounce_buffers; ++i) { + _bounce_buffers.emplace_back(CudaPinnedBounceBufferPool::instance().get()); } +} - void batch_copy() - { - if (_num_entries == 0) return; - -#if CUDA_VERSION >= 12080 - if (cudaAPI::instance().MemcpyBatchAsync) { - CUmemcpyAttributes attrs{}; - std::size_t attrs_idxs[] = {0}; - attrs.srcAccessOrder = CUmemcpySrcAccessOrder_enum::CU_MEMCPY_SRC_ACCESS_ORDER_STREAM; - CUDA_DRIVER_TRY( - cudaAPI::instance().MemcpyBatchAsync(_dsts.data(), - _srcs.data(), - _sizes.data(), - _num_entries, - &attrs, - attrs_idxs, - static_cast(1) /* num_attrs */, -#if CUDA_VERSION < 13000 - static_cast(nullptr), -#endif - _stream)); - } else { - // Fall back to the conventional H2D copy if the batch copy API is not available. - batch_copy_emulated(); - } -#else - batch_copy_emulated(); -#endif +void* BounceBufferManager::data() const noexcept +{ + return _bounce_buffers[_bounce_buffer_idx].get(); +} - CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); +void BounceBufferManager::copy(void* dst, std::size_t size, CUstream stream) +{ + CUDA_DRIVER_TRY( + cudaAPI::instance().MemcpyHtoDAsync(convert_void2deviceptr(dst), data(), size, stream)); + ++_bounce_buffer_idx; + if (_bounce_buffer_idx == _bounce_buffers.size()) { + _bounce_buffer_idx = 0; + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } -}; -} // namespace +} RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections) : _endpoint{endpoint}, _max_connections{max_connections}, _transfer_ctxs(_max_connections) @@ -204,8 +157,6 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, _curl_easy_handles[i]->handle())); } - MemcpyHelper memcpy_helper(actual_max_connections, stream); - // Start the run int still_running{0}; do { @@ -213,7 +164,6 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ CURLMsg* msg; int msgs_left; - if (!is_host_mem) { memcpy_helper.reset(); } // Handle the completed messages while ((msg = curl_multi_info_read(_multi, &msgs_left))) { @@ -227,8 +177,7 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ "Chunked transfer failed in poll-based multi API"); if (!is_host_mem) { - memcpy_helper.register_region( - ctx->buf, ctx->bounce_buffer.value().get(), ctx->bytes_transferred); + ctx->_bounce_buffer_manager->copy(ctx->buf, ctx->bytes_transferred, stream); } num_byte_transferred += ctx->bytes_transferred; @@ -247,14 +196,15 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ } } - if (!is_host_mem) { memcpy_helper.batch_copy(); } - if (still_running > 0) { KVIKIO_CHECK_CURL_MULTI(curl_multi_poll(_multi, nullptr, 0, 1000, nullptr)); } } while (still_running > 0); + // Ensure all H2D transfers complete before returning + if (!is_host_mem) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } + return num_byte_transferred; } } // namespace kvikio::detail From 9b860fed27bbca92f0e13aeb293920499d709686 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 15 Jan 2026 22:33:55 -0500 Subject: [PATCH 20/34] Make num_bounce_buffer configurable --- cpp/include/kvikio/defaults.hpp | 5 +++++ .../kvikio/detail/remote_handle_poll_based.hpp | 2 +- cpp/src/defaults.cpp | 14 ++++++++++++++ cpp/src/detail/remote_handle_poll_based.cpp | 2 +- 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 44594b0294..e5eeab49df 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -128,6 +128,7 @@ class defaults { bool _thread_pool_per_block_device; RemoteBackendType _remote_backend; std::size_t _remote_max_connections; + std::size_t _num_bounce_buffers; static unsigned int get_num_threads_from_env(); @@ -431,6 +432,10 @@ class defaults { [[nodiscard]] static std::size_t remote_max_connections(); static void set_remote_max_connections(std::size_t remote_max_connections); + + [[nodiscard]] static std::size_t num_bounce_buffers(); + + static void set_num_bounce_buffers(std::size_t num_bounce_buffers); }; } // namespace kvikio diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 97c7f69480..5a2c618222 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -38,7 +38,7 @@ inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_ class BounceBufferManager { public: - BounceBufferManager(std::size_t num_bounce_buffers = 2); + BounceBufferManager(std::size_t num_bounce_buffers); void* data() const noexcept; diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index f1c9a59b33..ee16331f90 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -170,6 +170,13 @@ defaults::defaults() env > 0, "KVIKIO_REMOTE_MAX_CONNECTIONS has to be a positive integer", std::invalid_argument); _remote_max_connections = env; } + + { + auto const env = getenv_or("KVIKIO_NUM_BOUNCE_BUFFERS", 2); + KVIKIO_EXPECT( + env > 0, "KVIKIO_NUM_BOUNCE_BUFFERS has to be a positive integer", std::invalid_argument); + _num_bounce_buffers = env; + } } defaults* defaults::instance() @@ -288,4 +295,11 @@ void defaults::set_remote_max_connections(std::size_t remote_max_connections) { instance()->_remote_max_connections = remote_max_connections; } + +std::size_t defaults::num_bounce_buffers() { return instance()->_num_bounce_buffers; } + +void defaults::set_num_bounce_buffers(std::size_t num_bounce_buffers) +{ + instance()->_num_bounce_buffers = num_bounce_buffers; +} } // namespace kvikio diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index baa0cd1e4d..3bf4d60ffe 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -52,7 +52,7 @@ void reconfig_easy_handle(CURL* curl_easy_handle, ctx->bytes_transferred = 0; if (!is_host_mem && !ctx->_bounce_buffer_manager.has_value()) { - ctx->_bounce_buffer_manager.emplace(); + ctx->_bounce_buffer_manager.emplace(defaults::num_bounce_buffers()); } std::size_t const remote_start = file_offset + local_offset; From db8f49bb3e29aa7a7cb4220d818048c2de84d3ce Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 15 Jan 2026 23:36:34 -0500 Subject: [PATCH 21/34] Remove temporary C++ unit test --- cpp/tests/test_remote_handle.cpp | 35 +------------------------------- 1 file changed, 1 insertion(+), 34 deletions(-) diff --git a/cpp/tests/test_remote_handle.cpp b/cpp/tests/test_remote_handle.cpp index f7586c04ec..41d975fd00 100644 --- a/cpp/tests/test_remote_handle.cpp +++ b/cpp/tests/test_remote_handle.cpp @@ -1,10 +1,9 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #include -#include #include #include #include @@ -16,7 +15,6 @@ #include #include "utils/env.hpp" -#include "utils/utils.hpp" using ::testing::HasSubstr; using ::testing::ThrowsMessage; @@ -280,34 +278,3 @@ TEST_F(RemoteHandleTest, test_open) } } } - -TEST_F(RemoteHandleTest, poll_based) -{ - auto const url = kvikio::getenv_or("KVIKIO_REMOTE_URL", ""); - - auto endpoint = std::make_unique(url); - // auto endpoint = std::make_unique(url); - - kvikio::RemoteHandle remote_handle(std::move(endpoint)); - - { - kvikio::test::DevBuffer dev_buf(remote_handle.nbytes() / sizeof(double)); - auto fut = remote_handle.pread(dev_buf.ptr, remote_handle.nbytes()); - auto num_bytes_read = fut.get(); - - EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); - auto host_buf = dev_buf.to_vector(); - std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() - << "\n"; - } - - { - std::vector host_buf(remote_handle.nbytes() / sizeof(double)); - auto fut = remote_handle.pread(host_buf.data(), remote_handle.nbytes()); - auto num_bytes_read = fut.get(); - - EXPECT_EQ(num_bytes_read, remote_handle.nbytes()); - std::cout << std::fixed << "d[0]: " << host_buf.front() << ", d[n-1]: " << host_buf.back() - << "\n"; - } -} From edb7e3be6ae1d3915f4e546e753ae81de45f47b1 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 15 Jan 2026 23:54:58 -0500 Subject: [PATCH 22/34] Add Python API --- python/kvikio/kvikio/__init__.py | 2 +- python/kvikio/kvikio/_lib/defaults.pyx | 57 ++++++++++++++++++++++++-- python/kvikio/kvikio/defaults.py | 11 ++++- 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py index ac13c2ce74..e8cccf7854 100644 --- a/python/kvikio/kvikio/__init__.py +++ b/python/kvikio/kvikio/__init__.py @@ -12,7 +12,7 @@ del libkvikio -from kvikio._lib.defaults import CompatMode # noqa: F401 +from kvikio._lib.defaults import CompatMode, RemoteBackend # noqa: F401 from kvikio._version import __git_commit__, __version__ from kvikio.cufile import CuFile, clear_page_cache, get_page_cache_info from kvikio.mmap import Mmap diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx index e5bfbca713..129015be1a 100644 --- a/python/kvikio/kvikio/_lib/defaults.pyx +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # distutils: language = c++ @@ -14,6 +14,9 @@ cdef extern from "" namespace "kvikio" nogil: OFF = 0 ON = 1 AUTO = 2 + cpdef enum class RemoteBackendType(uint8_t): + LIBCURL_EASY = 0 + LIBCURL_MULTI_POLL = 1 bool cpp_is_compat_mode_preferred \ "kvikio::defaults::is_compat_mode_preferred"() except + CompatMode cpp_compat_mode "kvikio::defaults::compat_mode"() except + @@ -42,10 +45,19 @@ cdef extern from "" namespace "kvikio" nogil: "kvikio::defaults::set_http_timeout"(long timeout_seconds) except + bool cpp_auto_direct_io_read "kvikio::defaults::auto_direct_io_read"() except + void cpp_set_auto_direct_io_read \ - "kvikio::defaults::set_auto_direct_io_read"(size_t flag) except + + "kvikio::defaults::set_auto_direct_io_read"(bool flag) except + bool cpp_auto_direct_io_write "kvikio::defaults::auto_direct_io_write"() except + void cpp_set_auto_direct_io_write \ - "kvikio::defaults::set_auto_direct_io_write"(size_t flag) except + + "kvikio::defaults::set_auto_direct_io_write"(bool flag) except + + RemoteBackendType cpp_remote_backend "kvikio::defaults::remote_backend"() except + + void cpp_set_remote_backend \ + "kvikio::defaults::set_remote_backend"(RemoteBackendType remote_backend) except + + size_t cpp_remote_max_connections "kvikio::defaults::remote_max_connections"() except + + void cpp_set_remote_max_connections \ + "kvikio::defaults::set_remote_max_connections"(size_t remote_max_connections) except + + size_t cpp_num_bounce_buffers "kvikio::defaults::num_bounce_buffers"() except + + void cpp_set_num_bounce_buffers \ + "kvikio::defaults::set_num_bounce_buffers"(size_t num_bounce_buffers) except + def is_compat_mode_preferred() -> bool: @@ -179,3 +191,42 @@ def set_auto_direct_io_write(flag: bool) -> None: cdef bool cpp_flag = flag with nogil: cpp_set_auto_direct_io_write(cpp_flag) + + +def remote_backend() -> RemoteBackendType: + cdef RemoteBackendType result + with nogil: + result = cpp_remote_backend() + return result + + +def set_remote_backend(remote_backend: RemoteBackendType) -> None: + cdef RemoteBackendType cpp_remote_backend = remote_backend + with nogil: + cpp_set_remote_backend(cpp_remote_backend) + + +def remote_max_connections() -> int: + cdef size_t result + with nogil: + result = cpp_remote_max_connections() + return result + + +def set_remote_max_connections(attempts: int) -> None: + cdef size_t cpp_attempts = attempts + with nogil: + cpp_set_remote_max_connections(cpp_attempts) + + +def num_bounce_buffers() -> int: + cdef size_t result + with nogil: + result = cpp_num_bounce_buffers() + return result + + +def set_num_bounce_buffers(attempts: int) -> None: + cdef size_t cpp_attempts = attempts + with nogil: + cpp_set_num_bounce_buffers(cpp_attempts) diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py index 3af9bd0929..ae8c96a58f 100644 --- a/python/kvikio/kvikio/defaults.py +++ b/python/kvikio/kvikio/defaults.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION. All rights reserved. # SPDX-License-Identifier: Apache-2.0 @@ -58,6 +58,9 @@ def _property_getter_and_setter(self) -> tuple[dict[str, Any], dict[str, Any]]: "http_timeout", "auto_direct_io_read", "auto_direct_io_write", + "remote_backend", + "remote_max_connections", + "num_bounce_buffers", ] property_getters = {} @@ -127,6 +130,9 @@ def set(*config) -> ConfigContextManager: - ``"http_timeout"`` - ``"auto_direct_io_read"`` - ``"auto_direct_io_write"`` + - ``"remote_backend"`` + - ``"remote_max_connections"`` + - ``"num_bounce_buffers"`` Returns ------- @@ -172,6 +178,9 @@ def get(config_name: str) -> Any: - ``"http_timeout"`` - ``"auto_direct_io_read"`` - ``"auto_direct_io_write"`` + - ``"remote_backend"`` + - ``"remote_max_connections"`` + - ``"num_bounce_buffers"`` Returns ------- From f93c8338cc533b45c41d1f4df251728c9a32a361 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 00:07:47 -0500 Subject: [PATCH 23/34] Fix a couple of minor issues --- cpp/src/detail/remote_handle_poll_based.cpp | 8 ++++++-- python/kvikio/kvikio/_lib/defaults.pyx | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 3bf4d60ffe..82b3bdf104 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -18,7 +18,6 @@ namespace kvikio::detail { namespace { std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, void* userdata) { - KVIKIO_NVTX_FUNC_RANGE(); auto* ctx = reinterpret_cast(userdata); std::size_t const nbytes = size * nmemb; @@ -77,6 +76,8 @@ void* BounceBufferManager::data() const noexcept void BounceBufferManager::copy(void* dst, std::size_t size, CUstream stream) { + KVIKIO_EXPECT(size <= defaults::bounce_buffer_size(), + "Host-to-device copy size exceeds bounce buffer capacity"); CUDA_DRIVER_TRY( cudaAPI::instance().MemcpyHtoDAsync(convert_void2deviceptr(dst), data(), size, stream)); ++_bounce_buffer_idx; @@ -87,7 +88,7 @@ void BounceBufferManager::copy(void* dst, std::size_t size, CUstream stream) } RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections) - : _endpoint{endpoint}, _max_connections{max_connections}, _transfer_ctxs(_max_connections) + : _max_connections{max_connections}, _transfer_ctxs(_max_connections), _endpoint{endpoint} { KVIKIO_EXPECT(defaults::task_size() <= defaults::bounce_buffer_size(), "bounce buffer size cannot be less than task size."); @@ -173,6 +174,9 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ KVIKIO_CHECK_CURL_EASY(curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &ctx)); KVIKIO_EXPECT(ctx != nullptr, "Failed to retrieve transfer context"); + KVIKIO_EXPECT(!ctx->overflow_error, + "Overflow detected. Maybe the server doesn't support file ranges", + std::overflow_error); KVIKIO_EXPECT(msg->data.result == CURLE_OK, "Chunked transfer failed in poll-based multi API"); diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx index 129015be1a..9adedfe24e 100644 --- a/python/kvikio/kvikio/_lib/defaults.pyx +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -213,10 +213,10 @@ def remote_max_connections() -> int: return result -def set_remote_max_connections(attempts: int) -> None: - cdef size_t cpp_attempts = attempts +def set_remote_max_connections(remote_max_connections: int) -> None: + cdef size_t cpp_remote_max_connections = remote_max_connections with nogil: - cpp_set_remote_max_connections(cpp_attempts) + cpp_set_remote_max_connections(cpp_remote_max_connections) def num_bounce_buffers() -> int: From c024738c683717029289cc51bd08d2c3dbe53999 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 00:10:38 -0500 Subject: [PATCH 24/34] Update --- python/kvikio/kvikio/__init__.py | 2 +- python/kvikio/kvikio/_lib/defaults.pyx | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py index e8cccf7854..5faef22cbf 100644 --- a/python/kvikio/kvikio/__init__.py +++ b/python/kvikio/kvikio/__init__.py @@ -12,7 +12,7 @@ del libkvikio -from kvikio._lib.defaults import CompatMode, RemoteBackend # noqa: F401 +from kvikio._lib.defaults import CompatMode, RemoteBackendType # noqa: F401 from kvikio._version import __git_commit__, __version__ from kvikio.cufile import CuFile, clear_page_cache, get_page_cache_info from kvikio.mmap import Mmap diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx index 9adedfe24e..2752356dbe 100644 --- a/python/kvikio/kvikio/_lib/defaults.pyx +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -226,7 +226,7 @@ def num_bounce_buffers() -> int: return result -def set_num_bounce_buffers(attempts: int) -> None: - cdef size_t cpp_attempts = attempts +def set_num_bounce_buffers(num_bounce_buffers: int) -> None: + cdef size_t cpp_num_bounce_buffers = num_bounce_buffers with nogil: - cpp_set_num_bounce_buffers(cpp_attempts) + cpp_set_num_bounce_buffers(cpp_num_bounce_buffers) From a1f1f9b7b16548cca37de93eb6755351d3e4f644 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 00:37:26 -0500 Subject: [PATCH 25/34] Add more Doxygen C++ comments --- cpp/include/kvikio/defaults.hpp | 40 +++++++ .../detail/remote_handle_poll_based.hpp | 106 ++++++++++++++++++ cpp/include/kvikio/remote_backend_type.hpp | 16 ++- cpp/include/kvikio/remote_handle.hpp | 2 + cpp/src/detail/remote_handle_poll_based.cpp | 30 +++++ cpp/src/remote_handle.cpp | 12 +- 6 files changed, 198 insertions(+), 8 deletions(-) diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index e5eeab49df..43b3d62e55 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -425,16 +425,56 @@ class defaults { */ static void set_thread_pool_per_block_device(bool flag); + /** + * @brief Get the current remote I/O backend type. + * + * @return The currently configured RemoteBackendType. + */ [[nodiscard]] static RemoteBackendType remote_backend(); + /** + * @brief Set the remote I/O backend type. + * + * Note: Changing this after creating a RemoteHandle has no effect on existing handles. The + * backend is determined at RemoteHandle construction time. + * + * @param remote_backend The backend type to use for new RemoteHandle instances. + */ static void set_remote_backend(RemoteBackendType remote_backend); + /** + * @brief Get the maximum number of concurrent connections for poll-based remote I/O. + * + * Only applies when using RemoteBackendType::LIBCURL_MULTI_POLL. + * + * @return Maximum number of concurrent connections. + */ [[nodiscard]] static std::size_t remote_max_connections(); + /** + * @brief Set the maximum number of concurrent connections for poll-based remote I/O. + * + * Only applies when using RemoteBackendType::LIBCURL_MULTI_POLL. + * + * @param remote_max_connections Maximum concurrent connections (must be positive). + */ static void set_remote_max_connections(std::size_t remote_max_connections); + /** + * @brief Get the number of bounce buffers used per connection for poll-based remote I/O. + * + * Controls k-way buffering: higher values allow more overlap between network I/O and H2D + * transfers but consume more pinned memory. + * + * @return Number of bounce buffers per connection. + */ [[nodiscard]] static std::size_t num_bounce_buffers(); + /** + * @brief Set the number of bounce buffers used per connection for poll-based remote I/O. + * + * @param num_bounce_buffers Number of bounce buffers per connection (must be positive). + */ static void set_num_bounce_buffers(std::size_t num_bounce_buffers); }; diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 5a2c618222..204a78db8e 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -10,14 +10,34 @@ #include #include +/** + * @brief Check a libcurl easy interface return code and throw on error. + * + * @param err_code The CURLcode to check. + * @exception std::runtime_error if err_code is not CURLE_OK. + */ #define KVIKIO_CHECK_CURL_EASY(err_code) \ kvikio::detail::check_curl_easy(err_code, __FILE__, __LINE__) +/** + * @brief Check a libcurl multi interface return code and throw on error. + * + * @param err_code The CURLMcode to check. + * @exception std::runtime_error if err_code is not CURLM_OK. + */ #define KVIKIO_CHECK_CURL_MULTI(err_code) \ kvikio::detail::check_curl_multi(err_code, __FILE__, __LINE__) namespace kvikio::detail { +/** + * @brief Check a libcurl easy interface return code and throw on error. + * + * @param err_code The CURLcode to check. + * @param filename Source filename for error reporting. + * @param line_number Source line number for error reporting. + * @exception std::runtime_error if err_code is not CURLE_OK. + */ inline void check_curl_easy(CURLcode err_code, char const* filename, int line_number) { if (err_code == CURLcode::CURLE_OK) { return; } @@ -27,6 +47,14 @@ inline void check_curl_easy(CURLcode err_code, char const* filename, int line_nu throw std::runtime_error(ss.str()); } +/** + * @brief Check a libcurl multi interface return code and throw on error. + * + * @param err_code The CURLMcode to check. + * @param filename Source filename for error reporting. + * @param line_number Source line number for error reporting. + * @exception std::runtime_error if err_code is not CURLM_OK. + */ inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_number) { if (err_code == CURLMcode::CURLM_OK) { return; } @@ -36,12 +64,41 @@ inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_ throw std::runtime_error(ss.str()); } +/** + * @brief Manages a rotating set of bounce buffers for overlapping network I/O with H2D transfers. + * + * This class implements k-way buffering, rotating through buffers circularly: while one buffer + * receives data from the network, previously filled buffers can be asynchronously copied to device + * memory. When all buffers have been used, the class synchronizes the CUDA stream before reusing + * buffers. + */ class BounceBufferManager { public: + /** + * @brief Construct a BounceBufferManager with the specified number of bounce buffers. + * + * @param num_bounce_buffers Number of bounce buffers to allocate from the pool. + */ BounceBufferManager(std::size_t num_bounce_buffers); + /** + * @brief Get a pointer to the current bounce buffer's data. + * + * @return Pointer to the current buffer's memory. + */ void* data() const noexcept; + /** + * @brief Copy data from the current bounce buffer to device memory and rotate to the next buffer. + * + * Issues an asynchronous H2D copy and advances to the next buffer. When wrapping around to buffer + * 0, synchronizes the stream to ensure all previous copies have completed before reuse. + * + * @param dst Device memory destination pointer. + * @param size Number of bytes to copy. + * @param stream CUDA stream for the asynchronous copy. + * @exception kvikio::CUfileException if size exceeds bounce buffer capacity. + */ void copy(void* dst, std::size_t size, CUstream stream); private: @@ -50,6 +107,12 @@ class BounceBufferManager { std::vector _bounce_buffers; }; +/** + * @brief Context for tracking the state of a single chunked transfer. + * + * Each concurrent connection has an associated TransferContext that tracks the destination buffer, + * transfer progress, and manages optional bounce buffers for GPU destinations. + */ struct TransferContext { bool overflow_error{}; bool is_host_mem{}; @@ -59,6 +122,17 @@ struct TransferContext { std::optional _bounce_buffer_manager; }; +/** + * @brief Poll-based remote file handle using libcurl's multi interface. + * + * This class provides an alternative to the thread-pool-based remote I/O by using libcurl's multi + * interface with curl_multi_poll() for managing concurrent connections. It implements chunked + * parallel downloads with k-way buffering to overlap network transfers with host-to-device memory + * copies. + * + * @note Thread safety: The pread() method is protected by a mutex, making it safe to call from + * multiple threads, though calls will be serialized. + */ class RemoteHandlePollBased { private: CURLM* _multi; @@ -69,10 +143,42 @@ class RemoteHandlePollBased { mutable std::mutex _mutex; public: + /** + * @brief Construct a poll-based remote handle. + * + * Initializes the libcurl multi handle and creates the specified number of easy handles for + * concurrent transfers. + * + * @param endpoint Non-owning pointer to the remote endpoint. Must outlive this object. + * @param max_connections Maximum number of concurrent connections to use. + * @exception kvikio::CUfileException if task_size exceeds bounce_buffer_size. + * @exception kvikio::CUfileException if libcurl multi initialization fails. + */ RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size_t max_connections); + /** + * @brief Destructor that cleans up libcurl multi resources. + * + * Removes all easy handles from the multi handle and performs cleanup. Errors during cleanup are + * logged but do not throw. + */ ~RemoteHandlePollBased(); + /** + * @brief Read data from the remote file into a buffer. + * + * Performs a parallel chunked read using multiple concurrent HTTP range requests. For device + * memory destinations, uses bounce buffers with k-way buffering to overlap network I/O with H2D + * transfers. + * + * @param buf Destination buffer (host or device memory). + * @param size Number of bytes to read. + * @param file_offset Offset in the remote file to start reading from. + * @return Number of bytes actually read. + * @exception std::overflow_error if the server returns more data than expected (may indicate the + * server doesn't support range requests). + * @exception std::runtime_error on libcurl errors. + */ std::size_t pread(void* buf, std::size_t size, std::size_t file_offset = 0); }; } // namespace kvikio::detail diff --git a/cpp/include/kvikio/remote_backend_type.hpp b/cpp/include/kvikio/remote_backend_type.hpp index 0d50174455..0ddf3fac33 100644 --- a/cpp/include/kvikio/remote_backend_type.hpp +++ b/cpp/include/kvikio/remote_backend_type.hpp @@ -8,9 +8,21 @@ namespace kvikio { +/** + * @brief Enum representing the backend implementation for remote file I/O operations. + * + * KvikIO supports multiple libcurl-based backends for fetching data from remote endpoints (S3, + * HTTP, etc.). Each backend has different performance characteristics. + */ enum class RemoteBackendType : uint8_t { - LIBCURL_EASY, - LIBCURL_MULTI_POLL, + LIBCURL_EASY, ///< Use libcurl's easy interface with a thread pool for parallelism. Each chunk is + ///< fetched by a separate thread using blocking curl_easy_perform() calls. This is + ///< the default backend. + LIBCURL_MULTI_POLL, ///< Use libcurl's multi interface with poll-based concurrent transfers. A + ///< single call manages multiple concurrent connections using + ///< curl_multi_poll(), with k-way buffering to overlap network I/O with + ///< host-to-device transfers. This can reduce thread overhead for + ///< high-connection-count scenarios. }; } // namespace kvikio diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index fe9b4e80f1..677fe37e44 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -15,6 +15,7 @@ #include #include #include +#include "kvikio/remote_backend_type.hpp" struct curl_slist; @@ -304,6 +305,7 @@ class RemoteHandle { std::unique_ptr _endpoint; std::size_t _nbytes; std::unique_ptr _poll_handle; + RemoteBackendType _remote_backend_type; public: /** diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 82b3bdf104..509635e4d9 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -16,6 +16,19 @@ namespace kvikio::detail { namespace { +/** + * @brief Callback function for libcurl's CURLOPT_WRITEFUNCTION. + * + * Called by libcurl when data is received from the remote server. Copies the received data either + * directly to host memory or to a bounce buffer (for device memory destinations). + * + * @param buffer Pointer to the received data. + * @param size Size of each data element (always 1). + * @param nmemb Number of data elements received. + * @param userdata Pointer to the TransferContext for this transfer. + * @return Number of bytes processed, or CURL_WRITEFUNC_ERROR if the received data would overflow + * the expected chunk size. + */ std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, void* userdata) { auto* ctx = reinterpret_cast(userdata); @@ -32,6 +45,23 @@ std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, vo return nbytes; } +/** + * @brief Reconfigure a libcurl easy handle for a new chunk transfer. + * + * Resets the transfer context and configures the easy handle to fetch the next chunk using an HTTP + * range request. For device memory destinations, lazily initializes the bounce buffer manager on + * first use. + * + * @param curl_easy_handle The libcurl easy handle to reconfigure. + * @param ctx Transfer context to reset and associate with this chunk. + * @param buf Base destination buffer pointer (host or device memory). + * @param is_host_mem True if buf points to host memory, false for device memory. + * @param current_chunk_idx Zero-based index of the chunk to fetch. + * @param chunk_size Size of each chunk (from defaults::task_size()). + * @param size Total size of the read operation. + * @param file_offset Starting offset in the remote file for the overall read. + * @exception std::runtime_error if setting the CURLOPT_RANGE option fails. + */ void reconfig_easy_handle(CURL* curl_easy_handle, TransferContext* ctx, void* buf, diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index de753c09d9..19882800d6 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -660,21 +660,24 @@ RemoteHandle RemoteHandle::open(std::string url, } RemoteHandle::RemoteHandle(std::unique_ptr endpoint, std::size_t nbytes) - : _endpoint{std::move(endpoint)}, _nbytes{nbytes} + : _endpoint{std::move(endpoint)}, + _nbytes{nbytes}, + _remote_backend_type{defaults::remote_backend()} { KVIKIO_NVTX_FUNC_RANGE(); - if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { + if (_remote_backend_type == RemoteBackendType::LIBCURL_MULTI_POLL) { _poll_handle = std::make_unique( _endpoint.get(), defaults::remote_max_connections()); } } RemoteHandle::RemoteHandle(std::unique_ptr endpoint) + : _remote_backend_type{defaults::remote_backend()} { KVIKIO_NVTX_FUNC_RANGE(); _nbytes = endpoint->get_file_size(); _endpoint = std::move(endpoint); - if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { + if (_remote_backend_type == RemoteBackendType::LIBCURL_MULTI_POLL) { _poll_handle = std::make_unique( _endpoint.get(), defaults::remote_max_connections()); } @@ -821,9 +824,6 @@ std::future RemoteHandle::pread(void* buf, KVIKIO_NVTX_FUNC_RANGE(size); if (defaults::remote_backend() == RemoteBackendType::LIBCURL_MULTI_POLL) { - KVIKIO_EXPECT(_poll_handle != nullptr, - "Remote backend changed to LIBCURL_MULTI_POLL after RemoteHandle construction. " - "The backend setting at construction time and pread call must match."); return thread_pool->submit_task([=, this] { KVIKIO_NVTX_SCOPED_RANGE("task_remote_multi_poll", size, nvtx_color); return _poll_handle->pread(buf, size, file_offset); From e799a240f3382c8c76aaabf510faf43024f8d0c9 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 00:43:44 -0500 Subject: [PATCH 26/34] Update Python doc --- docs/source/runtime_settings.rst | 44 ++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/docs/source/runtime_settings.rst b/docs/source/runtime_settings.rst index 5bbed7f063..6e4a0533a7 100644 --- a/docs/source/runtime_settings.rst +++ b/docs/source/runtime_settings.rst @@ -121,3 +121,47 @@ Example: # Enable Direct I/O for reads, and disable it for writes kvikio.defaults.set({"auto_direct_io_read": True, "auto_direct_io_write": False}) + +Remote Backend ``KVIKIO_REMOTE_BACKEND`` +---------------------------------------- + +KvikIO supports multiple libcurl-based backends for fetching data from remote endpoints (S3, HTTP, etc.). Set the environment variable ``KVIKIO_REMOTE_BACKEND`` to one of the following options (case-insensitive): + + * ``LIBCURL_EASY``: Use libcurl's easy interface with a thread pool for parallelism. Each chunk is fetched by a separate thread using blocking ``curl_easy_perform()`` calls. This is the default backend. + * ``LIBCURL_MULTI_POLL``: Use libcurl's multi interface with poll-based concurrent transfers. A single call manages multiple concurrent connections using ``curl_multi_poll()``, with k-way buffering to overlap network I/O with host-to-device transfers. This can reduce thread overhead for high-connection-count scenarios. + +If not set, the default value is ``LIBCURL_EASY``. + +.. note:: + + Changing this setting after creating a ``RemoteHandle`` has no effect on existing handles. The backend is determined at ``RemoteHandle`` construction time. + +This setting can be queried (:py:func:`kvikio.defaults.get`) and modified (:py:func:`kvikio.defaults.set`) at runtime using the property name ``remote_backend``. + +Remote Max Connections ``KVIKIO_REMOTE_MAX_CONNECTIONS`` +-------------------------------------------------------- + +When using the ``LIBCURL_MULTI_POLL`` backend, this setting controls the maximum number of concurrent HTTP connections used for parallel chunk downloads. Set the environment variable ``KVIKIO_REMOTE_MAX_CONNECTIONS`` to a positive integer. + +If not set, the default value is 8. + +.. note:: + + This setting only applies when using ``RemoteBackendType.LIBCURL_MULTI_POLL``. It has no effect on the ``LIBCURL_EASY`` backend, which uses the thread pool size (``KVIKIO_NTHREADS``) to control parallelism. + +This setting can be queried (:py:func:`kvikio.defaults.get`) and modified (:py:func:`kvikio.defaults.set`) at runtime using the property name ``remote_max_connections``. + +Number of Bounce Buffers ``KVIKIO_NUM_BOUNCE_BUFFERS`` +------------------------------------------------------ + +When using the ``LIBCURL_MULTI_POLL`` backend with device memory destinations, KvikIO uses k-way buffering to overlap network I/O with host-to-device memory transfers. This setting controls the number of bounce buffers allocated per connection. + +Set the environment variable ``KVIKIO_NUM_BOUNCE_BUFFERS`` to a positive integer. Higher values allow more overlap between network I/O and H2D transfers but consume more pinned memory. The total pinned memory usage is ``remote_max_connections * num_bounce_buffers * bounce_buffer_size``. + +If not set, the default value is 2. + +.. note:: + + This setting only applies when using ``RemoteBackendType.LIBCURL_MULTI_POLL`` with device memory destinations. For host memory destinations or the ``LIBCURL_EASY`` backend, bounce buffers are not used. + +This setting can be queried (:py:func:`kvikio.defaults.get`) and modified (:py:func:`kvikio.defaults.set`) at runtime using the property name ``num_bounce_buffers``. From d2a376e8a6270a93e8c7a0bce7d17be71a9e647c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 00:57:50 -0500 Subject: [PATCH 27/34] Update the existing Python S3 unit test to include the new backend --- python/kvikio/tests/test_s3_io.py | 39 ++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/python/kvikio/tests/test_s3_io.py b/python/kvikio/tests/test_s3_io.py index d8610c73bc..0cfd9f63ec 100644 --- a/python/kvikio/tests/test_s3_io.py +++ b/python/kvikio/tests/test_s3_io.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. All rights reserved. # SPDX-License-Identifier: Apache-2.0 import multiprocessing as mp @@ -112,7 +112,14 @@ def test_read_access(s3_base): @pytest.mark.parametrize("nthreads", [1, 3]) @pytest.mark.parametrize("tasksize", [99, 999]) @pytest.mark.parametrize("buffer_size", [101, 1001]) -def test_read(s3_base, xp, size, nthreads, tasksize, buffer_size): +@pytest.mark.parametrize( + "remote_backend", + [ + kvikio.RemoteBackendType.LIBCURL_EASY, + kvikio.RemoteBackendType.LIBCURL_MULTI_POLL, + ], +) +def test_read(s3_base, xp, size, nthreads, tasksize, buffer_size, remote_backend): bucket_name = "test_read" object_name = "Aa1" a = xp.arange(size) @@ -124,8 +131,16 @@ def test_read(s3_base, xp, size, nthreads, tasksize, buffer_size): "num_threads": nthreads, "task_size": tasksize, "bounce_buffer_size": buffer_size, + "remote_backend": remote_backend, } ): + if ( + remote_backend == kvikio.RemoteBackendType.LIBCURL_MULTI_POLL + and tasksize > buffer_size + ): + pytest.skip( + "When the remote backend is LIBCURL_MULTI_POLL, task size must not be greater than the buffer size" + ) with kvikio.RemoteFile.open_s3_url( f"{server_address}/{bucket_name}/{object_name}" ) as f: @@ -144,18 +159,26 @@ def test_read(s3_base, xp, size, nthreads, tasksize, buffer_size): (42, int(2**20)), ], ) -def test_read_with_file_offset(s3_base, xp, start, end): +@pytest.mark.parametrize( + "remote_backend", + [ + kvikio.RemoteBackendType.LIBCURL_EASY, + kvikio.RemoteBackendType.LIBCURL_MULTI_POLL, + ], +) +def test_read_with_file_offset(s3_base, xp, start, end, remote_backend): bucket_name = "test_read_with_file_offset" object_name = "Aa1" a = xp.arange(end, dtype=xp.int64) with s3_context( s3_base=s3_base, bucket=bucket_name, files={object_name: bytes(a)} ) as server_address: - url = f"{server_address}/{bucket_name}/{object_name}" - with kvikio.RemoteFile.open_s3_url(url) as f: - b = xp.zeros(shape=(end - start,), dtype=xp.int64) - assert f.read(b, file_offset=start * a.itemsize) == b.nbytes - xp.testing.assert_array_equal(a[start:end], b) + with kvikio.defaults.set({"remote_backend": remote_backend}): + url = f"{server_address}/{bucket_name}/{object_name}" + with kvikio.RemoteFile.open_s3_url(url) as f: + b = xp.zeros(shape=(end - start,), dtype=xp.int64) + assert f.read(b, file_offset=start * a.itemsize) == b.nbytes + xp.testing.assert_array_equal(a[start:end], b) @pytest.mark.parametrize("scheme", ["S3"]) From 1fd38e39a8b6b303eb26f84776733c043a5d9808 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 01:02:21 -0500 Subject: [PATCH 28/34] Fix header inclusion trivial issues --- cpp/include/kvikio/remote_handle.hpp | 2 +- cpp/src/defaults.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index 677fe37e44..5c4a78029e 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -13,9 +13,9 @@ #include #include +#include #include #include -#include "kvikio/remote_backend_type.hpp" struct curl_slist; diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index ee16331f90..d8ff218957 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -16,9 +17,8 @@ #include #include #include +#include #include -#include -#include "kvikio/remote_handle.hpp" namespace kvikio { template <> From fde0601987888b6f6dda4343b40438c8ba1bc732 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 01:14:36 -0500 Subject: [PATCH 29/34] Update --- cpp/include/kvikio/remote_backend_type.hpp | 3 +-- docs/source/runtime_settings.rst | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/include/kvikio/remote_backend_type.hpp b/cpp/include/kvikio/remote_backend_type.hpp index 0ddf3fac33..95bba088e2 100644 --- a/cpp/include/kvikio/remote_backend_type.hpp +++ b/cpp/include/kvikio/remote_backend_type.hpp @@ -21,8 +21,7 @@ enum class RemoteBackendType : uint8_t { LIBCURL_MULTI_POLL, ///< Use libcurl's multi interface with poll-based concurrent transfers. A ///< single call manages multiple concurrent connections using ///< curl_multi_poll(), with k-way buffering to overlap network I/O with - ///< host-to-device transfers. This can reduce thread overhead for - ///< high-connection-count scenarios. + ///< host-to-device transfers. }; } // namespace kvikio diff --git a/docs/source/runtime_settings.rst b/docs/source/runtime_settings.rst index 6e4a0533a7..b196064bb3 100644 --- a/docs/source/runtime_settings.rst +++ b/docs/source/runtime_settings.rst @@ -128,7 +128,7 @@ Remote Backend ``KVIKIO_REMOTE_BACKEND`` KvikIO supports multiple libcurl-based backends for fetching data from remote endpoints (S3, HTTP, etc.). Set the environment variable ``KVIKIO_REMOTE_BACKEND`` to one of the following options (case-insensitive): * ``LIBCURL_EASY``: Use libcurl's easy interface with a thread pool for parallelism. Each chunk is fetched by a separate thread using blocking ``curl_easy_perform()`` calls. This is the default backend. - * ``LIBCURL_MULTI_POLL``: Use libcurl's multi interface with poll-based concurrent transfers. A single call manages multiple concurrent connections using ``curl_multi_poll()``, with k-way buffering to overlap network I/O with host-to-device transfers. This can reduce thread overhead for high-connection-count scenarios. + * ``LIBCURL_MULTI_POLL``: Use libcurl's multi interface with poll-based concurrent transfers. A single call manages multiple concurrent connections using ``curl_multi_poll()``, with k-way buffering to overlap network I/O with host-to-device transfers. If not set, the default value is ``LIBCURL_EASY``. From 9d75bf83c10fe97ceb0b1c4215591905ad59ebe7 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 10:48:53 -0500 Subject: [PATCH 30/34] Use addressoff to handle special cases --- cpp/src/bounce_buffer.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp index 85536bf294..3f315046e5 100644 --- a/cpp/src/bounce_buffer.cpp +++ b/cpp/src/bounce_buffer.cpp @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include #include @@ -91,7 +92,7 @@ template BounceBufferPool::Buffer& BounceBufferPool::Buffer::operator=( Buffer&& o) noexcept { - if (this != &o) { + if (this != std::addressof(o)) { if (_buffer) { // Return current buffer to the pool _pool->put(_buffer, _size); From 267ee0c00d72fbe2b30f62b4bfb0d5b490876bc8 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 14:26:17 -0500 Subject: [PATCH 31/34] Enable endpoint polymorphism to support more endpoint types --- .../kvikio/detail/remote_handle_poll_based.hpp | 2 ++ cpp/src/detail/remote_handle_poll_based.cpp | 16 +++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 204a78db8e..2ffc733417 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -9,6 +9,7 @@ #include #include #include +#include "kvikio/detail/url.hpp" /** * @brief Check a libcurl easy interface return code and throw on error. @@ -117,6 +118,7 @@ struct TransferContext { bool overflow_error{}; bool is_host_mem{}; char* buf{}; + CurlHandle* curl_easy_handle{}; std::size_t chunk_size{}; std::size_t bytes_transferred{}; std::optional _bounce_buffer_manager; diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 509635e4d9..4bc0b55a40 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -53,6 +53,7 @@ std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, vo * first use. * * @param curl_easy_handle The libcurl easy handle to reconfigure. + * @param endpoint Non-owning pointer to the remote endpoint. Must outlive this object. * @param ctx Transfer context to reset and associate with this chunk. * @param buf Base destination buffer pointer (host or device memory). * @param is_host_mem True if buf points to host memory, false for device memory. @@ -62,7 +63,8 @@ std::size_t write_callback(char* buffer, std::size_t size, std::size_t nmemb, vo * @param file_offset Starting offset in the remote file for the overall read. * @exception std::runtime_error if setting the CURLOPT_RANGE option fails. */ -void reconfig_easy_handle(CURL* curl_easy_handle, +void reconfig_easy_handle(CurlHandle& curl_easy_handle, + RemoteEndpoint* endpoint, TransferContext* ctx, void* buf, bool is_host_mem, @@ -77,6 +79,7 @@ void reconfig_easy_handle(CURL* curl_easy_handle, ctx->overflow_error = false; ctx->is_host_mem = is_host_mem; ctx->buf = static_cast(buf) + local_offset; + ctx->curl_easy_handle = &curl_easy_handle; ctx->chunk_size = actual_chunk_size; ctx->bytes_transferred = 0; @@ -84,10 +87,7 @@ void reconfig_easy_handle(CURL* curl_easy_handle, ctx->_bounce_buffer_manager.emplace(defaults::num_bounce_buffers()); } - std::size_t const remote_start = file_offset + local_offset; - std::size_t const remote_end = remote_start + actual_chunk_size - 1; - std::string const byte_range = std::to_string(remote_start) + "-" + std::to_string(remote_end); - KVIKIO_CHECK_CURL_EASY(curl_easy_setopt(curl_easy_handle, CURLOPT_RANGE, byte_range.c_str())); + endpoint->setup_range_request(curl_easy_handle, file_offset + local_offset, actual_chunk_size); } } // namespace @@ -177,7 +177,8 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ std::size_t num_byte_transferred{0}; std::size_t current_chunk_idx{0}; for (std::size_t i = 0; i < actual_max_connections; ++i) { - reconfig_easy_handle(_curl_easy_handles[i]->handle(), + reconfig_easy_handle(*_curl_easy_handles[i], + _endpoint, &_transfer_ctxs[i], buf, is_host_mem, @@ -218,7 +219,8 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ KVIKIO_CHECK_CURL_MULTI(curl_multi_remove_handle(_multi, msg->easy_handle)); if (current_chunk_idx < num_chunks) { - reconfig_easy_handle(msg->easy_handle, + reconfig_easy_handle(*ctx->curl_easy_handle, + _endpoint, ctx, buf, is_host_mem, From 5a5abab3258ec481ddfeed56b4c79cefea9a1891 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 16 Jan 2026 17:08:02 -0500 Subject: [PATCH 32/34] Bug fixes --- cpp/src/detail/remote_handle_poll_based.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 4bc0b55a40..8d8664695d 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -182,16 +182,18 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ &_transfer_ctxs[i], buf, is_host_mem, - current_chunk_idx++, + current_chunk_idx, chunk_size, size, file_offset); KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, _curl_easy_handles[i]->handle())); + ++current_chunk_idx; } // Start the run + std::size_t in_flight{actual_max_connections}; int still_running{0}; - do { + while (in_flight > 0) { KVIKIO_CHECK_CURL_MULTI(curl_multi_perform(_multi, &still_running)); CURLMsg* msg; @@ -216,6 +218,7 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ } num_byte_transferred += ctx->bytes_transferred; + --in_flight; KVIKIO_CHECK_CURL_MULTI(curl_multi_remove_handle(_multi, msg->easy_handle)); if (current_chunk_idx < num_chunks) { @@ -224,19 +227,21 @@ std::size_t RemoteHandlePollBased::pread(void* buf, std::size_t size, std::size_ ctx, buf, is_host_mem, - current_chunk_idx++, + current_chunk_idx, chunk_size, size, file_offset); KVIKIO_CHECK_CURL_MULTI(curl_multi_add_handle(_multi, msg->easy_handle)); + ++current_chunk_idx; + ++in_flight; } } - if (still_running > 0) { - KVIKIO_CHECK_CURL_MULTI(curl_multi_poll(_multi, nullptr, 0, 1000, nullptr)); + if (in_flight > 0) { + constexpr int timeout_ms{1000}; + KVIKIO_CHECK_CURL_MULTI(curl_multi_poll(_multi, nullptr, 0, timeout_ms, nullptr)); } - - } while (still_running > 0); + } // Ensure all H2D transfers complete before returning if (!is_host_mem) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } From b6652a03413376c0899ad734c7426311cef96288 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 21 Jan 2026 09:31:22 -0500 Subject: [PATCH 33/34] Move utility functions from hpp to cpp --- cpp/include/kvikio/detail/remote_handle.hpp | 38 +++++++++++++ .../detail/remote_handle_poll_based.hpp | 53 ------------------- cpp/src/detail/remote_handle.cpp | 18 +++++++ 3 files changed, 56 insertions(+), 53 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle.hpp b/cpp/include/kvikio/detail/remote_handle.hpp index 1fbd509c9d..fc4da9b575 100644 --- a/cpp/include/kvikio/detail/remote_handle.hpp +++ b/cpp/include/kvikio/detail/remote_handle.hpp @@ -9,6 +9,44 @@ #include namespace kvikio::detail { +/** + * @brief Check a libcurl easy interface return code and throw on error. + * + * @param err_code The CURLcode to check. + * @exception std::runtime_error if err_code is not CURLE_OK. + */ +#define KVIKIO_CHECK_CURL_EASY(err_code) \ + kvikio::detail::check_curl_easy(err_code, __FILE__, __LINE__) + +/** + * @brief Check a libcurl multi interface return code and throw on error. + * + * @param err_code The CURLMcode to check. + * @exception std::runtime_error if err_code is not CURLM_OK. + */ +#define KVIKIO_CHECK_CURL_MULTI(err_code) \ + kvikio::detail::check_curl_multi(err_code, __FILE__, __LINE__) + +/** + * @brief Check a libcurl easy interface return code and throw on error. + * + * @param err_code The CURLcode to check. + * @param filename Source filename for error reporting. + * @param line_number Source line number for error reporting. + * @exception std::runtime_error if err_code is not CURLE_OK. + */ +void check_curl_easy(CURLcode err_code, char const* filename, int line_number); + +/** + * @brief Check a libcurl multi interface return code and throw on error. + * + * @param err_code The CURLMcode to check. + * @param filename Source filename for error reporting. + * @param line_number Source line number for error reporting. + * @exception std::runtime_error if err_code is not CURLM_OK. + */ +void check_curl_multi(CURLMcode err_code, char const* filename, int line_number); + /** * @brief Callback for `CURLOPT_WRITEFUNCTION` that copies received data into a `std::string`. * diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 2ffc733417..5f8f7b5f44 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -9,62 +9,9 @@ #include #include #include -#include "kvikio/detail/url.hpp" - -/** - * @brief Check a libcurl easy interface return code and throw on error. - * - * @param err_code The CURLcode to check. - * @exception std::runtime_error if err_code is not CURLE_OK. - */ -#define KVIKIO_CHECK_CURL_EASY(err_code) \ - kvikio::detail::check_curl_easy(err_code, __FILE__, __LINE__) - -/** - * @brief Check a libcurl multi interface return code and throw on error. - * - * @param err_code The CURLMcode to check. - * @exception std::runtime_error if err_code is not CURLM_OK. - */ -#define KVIKIO_CHECK_CURL_MULTI(err_code) \ - kvikio::detail::check_curl_multi(err_code, __FILE__, __LINE__) namespace kvikio::detail { -/** - * @brief Check a libcurl easy interface return code and throw on error. - * - * @param err_code The CURLcode to check. - * @param filename Source filename for error reporting. - * @param line_number Source line number for error reporting. - * @exception std::runtime_error if err_code is not CURLE_OK. - */ -inline void check_curl_easy(CURLcode err_code, char const* filename, int line_number) -{ - if (err_code == CURLcode::CURLE_OK) { return; } - std::stringstream ss; - ss << "libcurl error: " << curl_easy_strerror(err_code) << " at: " << filename << ":" - << line_number << "\n"; - throw std::runtime_error(ss.str()); -} - -/** - * @brief Check a libcurl multi interface return code and throw on error. - * - * @param err_code The CURLMcode to check. - * @param filename Source filename for error reporting. - * @param line_number Source line number for error reporting. - * @exception std::runtime_error if err_code is not CURLM_OK. - */ -inline void check_curl_multi(CURLMcode err_code, char const* filename, int line_number) -{ - if (err_code == CURLMcode::CURLM_OK) { return; } - std::stringstream ss; - ss << "libcurl error: " << curl_multi_strerror(err_code) << " at: " << filename << ":" - << line_number << "\n"; - throw std::runtime_error(ss.str()); -} - /** * @brief Manages a rotating set of bounce buffers for overlapping network I/O with H2D transfers. * diff --git a/cpp/src/detail/remote_handle.cpp b/cpp/src/detail/remote_handle.cpp index 85a3ced0f3..aba672458b 100644 --- a/cpp/src/detail/remote_handle.cpp +++ b/cpp/src/detail/remote_handle.cpp @@ -8,6 +8,24 @@ #include namespace kvikio::detail { +void check_curl_easy(CURLcode err_code, char const* filename, int line_number) +{ + if (err_code == CURLcode::CURLE_OK) { return; } + std::stringstream ss; + ss << "libcurl error: " << curl_easy_strerror(err_code) << " at: " << filename << ":" + << line_number << "\n"; + throw std::runtime_error(ss.str()); +} + +void check_curl_multi(CURLMcode err_code, char const* filename, int line_number) +{ + if (err_code == CURLMcode::CURLM_OK) { return; } + std::stringstream ss; + ss << "libcurl error: " << curl_multi_strerror(err_code) << " at: " << filename << ":" + << line_number << "\n"; + throw std::runtime_error(ss.str()); +} + std::size_t callback_get_string_response(char* data, std::size_t size, std::size_t num_bytes, From 993706399efcf3c2bdbda82d288c690c705e909c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 28 Jan 2026 10:13:00 -0500 Subject: [PATCH 34/34] Make dtor noexcept --- cpp/include/kvikio/detail/remote_handle_poll_based.hpp | 2 +- cpp/src/detail/remote_handle_poll_based.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp index 5f8f7b5f44..e26677dbaa 100644 --- a/cpp/include/kvikio/detail/remote_handle_poll_based.hpp +++ b/cpp/include/kvikio/detail/remote_handle_poll_based.hpp @@ -111,7 +111,7 @@ class RemoteHandlePollBased { * Removes all easy handles from the multi handle and performs cleanup. Errors during cleanup are * logged but do not throw. */ - ~RemoteHandlePollBased(); + ~RemoteHandlePollBased() noexcept; /** * @brief Read data from the remote file into a buffer. diff --git a/cpp/src/detail/remote_handle_poll_based.cpp b/cpp/src/detail/remote_handle_poll_based.cpp index 8d8664695d..4dce3f5301 100644 --- a/cpp/src/detail/remote_handle_poll_based.cpp +++ b/cpp/src/detail/remote_handle_poll_based.cpp @@ -141,7 +141,7 @@ RemoteHandlePollBased::RemoteHandlePollBased(RemoteEndpoint* endpoint, std::size } } -RemoteHandlePollBased::~RemoteHandlePollBased() +RemoteHandlePollBased::~RemoteHandlePollBased() noexcept { try { // Remove any lingering handles before cleanup