diff --git a/builtin-functions/kphp-light/stdlib/rpc.txt b/builtin-functions/kphp-light/stdlib/rpc.txt index 90211d0d7f..af710c332a 100644 --- a/builtin-functions/kphp-light/stdlib/rpc.txt +++ b/builtin-functions/kphp-light/stdlib/rpc.txt @@ -107,7 +107,7 @@ function set_fail_rpc_on_int32_overflow ($fail_rpc ::: bool): bool; */ function extract_kphp_rpc_response_extra_info ($resumable_id ::: int) ::: ?tuple(int, float); -function rpc_queue_create ($request_ids ::: array = TODO) ::: int; +function rpc_queue_create ($request_ids ::: mixed = TODO) ::: int; function rpc_queue_push ($queue_id ::: int, $request_id ::: int) ::: void; diff --git a/runtime-light/coroutine/await-set.h b/runtime-light/coroutine/await-set.h index 0d4274483c..3d3bdd76f7 100644 --- a/runtime-light/coroutine/await-set.h +++ b/runtime-light/coroutine/await-set.h @@ -8,8 +8,6 @@ #include #include -#include "runtime-common/core/allocator/script-allocator.h" -#include "runtime-common/core/std/containers.h" #include "runtime-light/coroutine/async-stack.h" #include "runtime-light/coroutine/concepts.h" #include "runtime-light/coroutine/coroutine-state.h" diff --git a/runtime-light/coroutine/shared-task.h b/runtime-light/coroutine/shared-task.h index 5eb4ec59e5..76957148d9 100644 --- a/runtime-light/coroutine/shared-task.h +++ b/runtime-light/coroutine/shared-task.h @@ -21,10 +21,10 @@ namespace kphp::coro { namespace shared_task_impl { -struct shared_task_waiter final { +struct shared_task_awaiter final { std::coroutine_handle<> m_continuation; - shared_task_waiter* m_next{}; - shared_task_waiter* m_prev{}; + shared_task_awaiter* m_next{}; + shared_task_awaiter* m_prev{}; }; template @@ -42,21 +42,21 @@ struct promise_base : kphp::coro::async_stack_element { auto await_suspend(std::coroutine_handle coro) const noexcept -> std::coroutine_handle<> { promise_base& promise{coro.promise()}; // mark promise as ready - auto* waiter{static_cast(std::exchange(promise.m_waiters, std::addressof(promise)))}; - if (waiter == STARTED_NO_WAITERS_VAL) { // no waiters, so just finish this coroutine + auto* awaiter{static_cast(std::exchange(promise.m_awaiters, std::addressof(promise)))}; + if (awaiter == STARTED_NO_WAITERS_VAL) { // no awaiters, so just finish this coroutine return std::noop_coroutine(); } - while (waiter->m_next != nullptr) { + while (awaiter->m_next != nullptr) { // read the m_next pointer before resuming the coroutine // since resuming the coroutine may destroy the shared_task_waiter value - auto* next{waiter->m_next}; + auto* next{awaiter->m_next}; auto& async_stack_root{*promise.get_async_stack_frame().async_stack_root}; - kphp::coro::resume(waiter->m_continuation, async_stack_root); - waiter = next; + kphp::coro::resume(awaiter->m_continuation, async_stack_root); + awaiter = next; } - // return last waiter's coroutine_handle to allow it to potentially be compiled as a tail-call - return waiter->m_continuation; + // return last awaiter's coroutine_handle to allow it to potentially be compiled as a tail-call + return awaiter->m_continuation; } constexpr auto await_resume() const noexcept -> void {} @@ -69,35 +69,35 @@ struct promise_base : kphp::coro::async_stack_element { } auto done() const noexcept -> bool { - return m_waiters == this; + return m_awaiters == this; } auto add_ref() noexcept -> void { ++m_refcnt; } - // try to enqueue a waiter to the list of waiters. + // try to enqueue a awaiter to the list of awaiters. // - // return true if the waiter was successfully queued, in which case - // waiter->coroutine will be resumed when the task completes. + // return true if the awaiter was successfully queued, in which case + // awaiter->coroutine will be resumed when the task completes. // false if the coroutine was already completed and the awaiting // coroutine can continue without suspending. - auto suspend_awaiter(shared_task_impl::shared_task_waiter& waiter) noexcept -> bool { - const void* const NOT_STARTED_VAL{std::addressof(this->m_waiters)}; + auto suspend_awaiter(shared_task_impl::shared_task_awaiter& awaiter) noexcept -> bool { + const void* const NOT_STARTED_VAL{std::addressof(this->m_awaiters)}; - // NOTE: If the coroutine is not yet started then the first waiter + // NOTE: If the coroutine is not yet started then the first awaiter // will start the coroutine before enqueuing itself up to the list - // of suspended waiters waiting for completion. We split this into + // of suspended awaiters waiting for completion. We split this into // two steps to allow the first awaiter to return without suspending. - // This avoids recursively resuming the first waiter inside the call to + // This avoids recursively resuming the first awaiter inside the call to // coroutine.resume() in the case that the coroutine completes // synchronously, which could otherwise lead to stack-overflow if // the awaiting coroutine awaited many synchronously-completing // tasks in a row. // start the coroutine if not yet started - if (m_waiters == NOT_STARTED_VAL) { - m_waiters = STARTED_NO_WAITERS_VAL; + if (m_awaiters == NOT_STARTED_VAL) { + m_awaiters = STARTED_NO_WAITERS_VAL; const auto& handle{std::coroutine_handle::from_promise(*static_cast(this))}; auto& async_stack_root{*get_async_stack_frame().async_stack_root}; kphp::coro::resume(handle, async_stack_root); @@ -107,13 +107,13 @@ struct promise_base : kphp::coro::async_stack_element { return false; } - waiter.m_prev = nullptr; - waiter.m_next = static_cast(m_waiters); + awaiter.m_prev = nullptr; + awaiter.m_next = static_cast(m_awaiters); // at this point 'm_waiters' can only be 'STARTED_NO_WAITERS_VAL' or 'other' - if (m_waiters != STARTED_NO_WAITERS_VAL) { - static_cast(m_waiters)->m_prev = std::addressof(waiter); + if (m_awaiters != STARTED_NO_WAITERS_VAL) { + static_cast(m_awaiters)->m_prev = std::addressof(awaiter); } - m_waiters = static_cast(std::addressof(waiter)); + m_awaiters = static_cast(std::addressof(awaiter)); return true; } @@ -124,20 +124,20 @@ struct promise_base : kphp::coro::async_stack_element { return m_refcnt-- != 1; } - auto cancel_awaiter(const shared_task_impl::shared_task_waiter& waiter) noexcept -> void { - const void* const NOT_STARTED_VAL{std::addressof(this->m_waiters)}; - if (m_waiters == NOT_STARTED_VAL || m_waiters == STARTED_NO_WAITERS_VAL) [[unlikely]] { + auto cancel_awaiter(const shared_task_impl::shared_task_awaiter& awaiter) noexcept -> void { + const void* const NOT_STARTED_VAL{std::addressof(this->m_awaiters)}; + if (m_awaiters == NOT_STARTED_VAL || m_awaiters == STARTED_NO_WAITERS_VAL) [[unlikely]] { return; } - const auto* waiter_ptr{std::addressof(waiter)}; - if (m_waiters == waiter_ptr) { // waiter is the head of the list - m_waiters = waiter_ptr->m_next; - } else if (waiter_ptr->m_next == nullptr) { // waiter is the last in the list - waiter_ptr->m_prev->m_next = nullptr; - } else { // waiter is somewhere in the middle of the list - waiter_ptr->m_next->m_prev = waiter_ptr->m_prev; - waiter_ptr->m_prev->m_next = waiter_ptr->m_next; + const auto* awaiter_ptr{std::addressof(awaiter)}; + if (m_awaiters == awaiter_ptr) { // awaiter is the head of the list + m_awaiters = awaiter_ptr->m_next; + } else if (awaiter_ptr->m_next == nullptr) { // awaiter is the last in the list + awaiter_ptr->m_prev->m_next = nullptr; + } else { // awaiter is somewhere in the middle of the list + awaiter_ptr->m_next->m_prev = awaiter_ptr->m_prev; + awaiter_ptr->m_prev->m_next = awaiter_ptr->m_next; } } @@ -155,13 +155,13 @@ struct promise_base : kphp::coro::async_stack_element { uint32_t m_refcnt{1}; // Value is either - // - nullptr - indicates started, no waiters - // - &this->w_waiters - indicates the coroutine is not yet started - // - this - indicates value is ready - // - other - pointer to head item in linked-list of waiters. - // values are of type 'shared_task_impl_::shared_task_waiter_t'. - // indicates that the coroutine has been started. - void* m_waiters{std::addressof(m_waiters)}; + // - nullptr - indicates started, no awaiters + // - &this->m_awaiters - indicates the coroutine is not yet started + // - this - indicates value is ready + // - other - pointer to head item in linked-list of awaiters. + // values are of type 'shared_task_impl_::shared_task_waiter_t'. + // indicates that the coroutine has been started. + void* m_awaiters{std::addressof(m_awaiters)}; }; template @@ -190,7 +190,7 @@ class awaiter_base { protected: std::coroutine_handle m_coro; - shared_task_impl::shared_task_waiter m_waiter{}; + shared_task_impl::shared_task_awaiter m_awaiter{}; public: explicit awaiter_base(std::coroutine_handle coro) noexcept @@ -199,7 +199,7 @@ class awaiter_base { awaiter_base(awaiter_base&& other) noexcept : m_suspended(std::exchange(other.m_suspended, false)), m_coro(std::exchange(other.m_coro, {})), - m_waiter(std::exchange(other.m_waiter, {})) {} + m_awaiter(std::exchange(other.m_awaiter, {})) {} awaiter_base(const awaiter_base& other) = delete; awaiter_base& operator=(const awaiter_base& other) = delete; @@ -207,7 +207,7 @@ class awaiter_base { ~awaiter_base() { if (m_suspended) { - m_coro.promise().cancel_awaiter(m_waiter); + m_coro.promise().cancel_awaiter(m_awaiter); } } @@ -218,8 +218,8 @@ class awaiter_base { template caller_promise_type> [[clang::noinline]] auto await_suspend(std::coroutine_handle awaiting_coroutine) noexcept -> bool { set_async_top_frame(awaiting_coroutine.promise().get_async_stack_frame(), STACK_RETURN_ADDRESS); - m_waiter.m_continuation = awaiting_coroutine; - m_suspended = m_coro.promise().suspend_awaiter(m_waiter); + m_awaiter.m_continuation = awaiting_coroutine; + m_suspended = m_coro.promise().suspend_awaiter(m_awaiter); reset_async_top_frame(awaiting_coroutine.promise().get_async_stack_frame()); return m_suspended; } diff --git a/runtime-light/coroutine/task.h b/runtime-light/coroutine/task.h index 97b8b22635..7ba3fb2041 100644 --- a/runtime-light/coroutine/task.h +++ b/runtime-light/coroutine/task.h @@ -154,24 +154,24 @@ struct task { task() noexcept = default; - explicit task(std::coroutine_handle<> coro) noexcept - : m_haddress(coro.address()) {} + explicit task(std::coroutine_handle coro) noexcept + : m_coro(coro) {} task(const task& other) noexcept = delete; task(task&& other) noexcept - : m_haddress(std::exchange(other.m_haddress, nullptr)) {} + : m_coro(std::exchange(other.m_coro, {})) {} task& operator=(const task& other) noexcept = delete; task& operator=(task&& other) noexcept { - std::swap(m_haddress, other.m_haddress); + std::swap(m_coro, other.m_coro); return *this; } ~task() { - if (m_haddress) { - std::coroutine_handle::from_address(m_haddress).destroy(); + if (m_coro) { + m_coro.destroy(); } } @@ -217,28 +217,28 @@ struct task { return awaiter_base::m_coro.promise().result(); } }; - return awaiter{std::coroutine_handle::from_address(m_haddress)}; + return awaiter{m_coro}; } auto get_handle() noexcept -> std::coroutine_handle { - return std::coroutine_handle::from_address(m_haddress); + return m_coro; } // conversion functions // // erase type explicit operator task<>() && noexcept { - return task<>{std::coroutine_handle<>::from_address(std::exchange(m_haddress, nullptr))}; + return task<>{std::exchange(m_coro, {})}; } // restore erased type template requires(std::same_as) explicit operator task() && noexcept { - return task{std::coroutine_handle<>::from_address(std::exchange(m_haddress, nullptr))}; + return task{std::exchange(m_coro, {})}; } private: - void* m_haddress{}; + std::coroutine_handle m_coro{}; }; } // namespace kphp::coro diff --git a/runtime-light/state/instance-state.h b/runtime-light/state/instance-state.h index 7060a499af..68473d69e7 100644 --- a/runtime-light/state/instance-state.h +++ b/runtime-light/state/instance-state.h @@ -34,6 +34,7 @@ #include "runtime-light/stdlib/math/random-state.h" #include "runtime-light/stdlib/output/output-state.h" #include "runtime-light/stdlib/rpc/rpc-client-state.h" +#include "runtime-light/stdlib/rpc/rpc-queue-state.h" #include "runtime-light/stdlib/serialization/serialization-state.h" #include "runtime-light/stdlib/string/regex-state.h" #include "runtime-light/stdlib/string/string-state.h" @@ -94,6 +95,7 @@ struct InstanceState final : vk::not_copyable { CoroutineInstanceState coroutine_instance_state; ForkInstanceState fork_instance_state; WaitQueueInstanceState wait_queue_instance_state; + RpcQueueInstanceState rpc_queue_instance_state; PhpScriptMutableGlobals php_script_mutable_globals_singleton; RuntimeContext runtime_context; @@ -133,5 +135,5 @@ struct InstanceState final : vk::not_copyable { enum image_kind image_kind_ { image_kind::invalid }; enum instance_kind instance_kind_ { instance_kind::invalid }; - static constexpr auto INIT_INSTANCE_ALLOCATOR_SIZE = static_cast(16U * 1024U * 1024U); + static constexpr auto INIT_INSTANCE_ALLOCATOR_SIZE = static_cast(256U * 1024U * 1024U); }; diff --git a/runtime-light/stdlib/fork/fork-state.h b/runtime-light/stdlib/fork/fork-state.h index 62b219c617..30a737eaa5 100644 --- a/runtime-light/stdlib/fork/fork-state.h +++ b/runtime-light/stdlib/fork/fork-state.h @@ -49,13 +49,13 @@ struct ForkInstanceState final : private vk::not_copyable { template std::pair> create_fork(kphp::coro::task task) noexcept { + static constexpr auto fork_coroutine{[](kphp::coro::task task, int64_t fork_id) noexcept -> kphp::coro::shared_task { + ForkInstanceState::get().current_id = fork_id; + co_return co_await std::move(task); + }}; + const int64_t fork_id{next_fork_id++}; - auto fork_task{std::invoke( - [](kphp::coro::task task, int64_t fork_id) noexcept -> kphp::coro::shared_task { - ForkInstanceState::get().current_id = fork_id; - co_return co_await std::move(task); - }, - std::move(task), fork_id)}; + auto fork_task{std::invoke(fork_coroutine, std::move(task), fork_id)}; forks.emplace(fork_id, fork_info{.awaited = {}, .thrown_exception = {}, .opt_handle = static_cast>(fork_task)}); return std::make_pair(fork_id, std::move(fork_task)); } diff --git a/runtime-light/stdlib/rpc/rpc-api.cpp b/runtime-light/stdlib/rpc/rpc-api.cpp index 1c2aa3d60f..2e3720f107 100644 --- a/runtime-light/stdlib/rpc/rpc-api.cpp +++ b/runtime-light/stdlib/rpc/rpc-api.cpp @@ -9,21 +9,19 @@ #include #include #include -#include -#include #include #include +#include #include +#include #include #include "common/containers/final_action.h" #include "common/rpc-error-codes.h" -#include "runtime-common/core/allocator/script-allocator.h" -#include "runtime-common/core/allocator/script-malloc-interface.h" #include "runtime-common/core/runtime-core.h" -#include "runtime-common/core/std/containers.h" #include "runtime-light/allocator/allocator.h" #include "runtime-light/coroutine/io-scheduler.h" +#include "runtime-light/coroutine/shared-task.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/k2-platform/k2-api.h" #include "runtime-light/server/rpc/rpc-server-state.h" @@ -31,7 +29,6 @@ #include "runtime-light/stdlib/diagnostics/exception-functions.h" #include "runtime-light/stdlib/diagnostics/logs.h" #include "runtime-light/stdlib/fork/fork-functions.h" -#include "runtime-light/stdlib/fork/fork-state.h" #include "runtime-light/stdlib/rpc/rpc-client-state.h" #include "runtime-light/stdlib/rpc/rpc-constants.h" #include "runtime-light/stdlib/rpc/rpc-extra-headers.h" @@ -180,21 +177,22 @@ kphp::coro::task> rpc_tl_query_result_one_impl(int64_t query_id) no auto& rpc_client_instance_st{RpcClientInstanceState::get()}; class_instance rpc_query{}; - int64_t response_waiter_fork_id{kphp::forks::INVALID_ID}; + std::optional>> opt_awaiter_task{}; { - const auto it_query{rpc_client_instance_st.response_fetcher_instances.find(query_id)}; - const auto it_fork_id{rpc_client_instance_st.response_waiter_forks.find(query_id)}; - const vk::final_action finalizer{[&rpc_client_instance_st, it_query, it_fork_id] noexcept { - rpc_client_instance_st.response_fetcher_instances.erase(it_query); - rpc_client_instance_st.response_waiter_forks.erase(it_fork_id); + const auto it_response_fetcher{rpc_client_instance_st.response_fetcher_instances.find(query_id)}; + const auto it_fork_task{rpc_client_instance_st.response_awaiter_tasks.find(query_id)}; + const vk::final_action finalizer{[&rpc_client_instance_st, it_response_fetcher, it_fork_task] noexcept { + rpc_client_instance_st.response_fetcher_instances.erase(it_response_fetcher); + rpc_client_instance_st.response_awaiter_tasks.erase(it_fork_task); }}; - if (it_query == rpc_client_instance_st.response_fetcher_instances.end() || it_fork_id == rpc_client_instance_st.response_waiter_forks.end()) [[unlikely]] { + if (it_response_fetcher == rpc_client_instance_st.response_fetcher_instances.end() || it_fork_task == rpc_client_instance_st.response_awaiter_tasks.end()) + [[unlikely]] { co_return TlRpcError::make_error(TL_ERROR_INTERNAL, string{"unexpectedly could not find query in pending queries"}); } - rpc_query = std::move(it_query->second); - response_waiter_fork_id = it_fork_id->second; + rpc_query = std::move(it_response_fetcher->second); + opt_awaiter_task.emplace(std::move(it_fork_task->second)); } if (rpc_query.is_null()) [[unlikely]] { @@ -207,17 +205,16 @@ kphp::coro::task> rpc_tl_query_result_one_impl(int64_t query_id) no co_return TlRpcError::make_error(TL_ERROR_INTERNAL, string{"can't get untyped result from typed TL query. Use consistent API for that"}); } - auto opt_response{co_await kphp::forks::wait>(response_waiter_fork_id, MAX_TIMEOUT_NS)}; + kphp::log::assertion(opt_awaiter_task.has_value()); + auto opt_response{co_await kphp::forks::id_managed(*std::exchange(opt_awaiter_task, std::nullopt))}; if (!opt_response) [[unlikely]] { - co_return TlRpcError::make_error(TL_ERROR_INTERNAL, string{"can't find waiter fork"}); - } - auto response{*std::move(opt_response)}; - if (response.empty()) [[unlikely]] { co_return TlRpcError::make_error(TL_ERROR_QUERY_TIMEOUT, string{"rpc response timeout"}); } + auto response{*std::move(opt_response)}; // don't check response's emptyness; will throw if it's empty, indicating a fetch error + f$rpc_clean(); - RpcServerInstanceState::get().tl_fetcher = tl::fetcher{response}; + RpcServerInstanceState::get().tl_fetcher = tl::fetcher{{reinterpret_cast(response.c_str()), response.size()}}; auto res{fetch_function_untyped(rpc_query)}; // THROWING // handle exceptions that could arise during fetch_function_untyped if (auto err{TlRpcError::transform_exception_into_error_if_possible()}; !err.empty()) [[unlikely]] { @@ -233,21 +230,22 @@ kphp::coro::task> typed_rpc_tl_query_result_ auto& rpc_client_instance_st{RpcClientInstanceState::get()}; class_instance rpc_query{}; - int64_t response_waiter_fork_id{kphp::forks::INVALID_ID}; + std::optional>> opt_awaiter_task{}; { - const auto it_query{rpc_client_instance_st.response_fetcher_instances.find(query_id)}; - const auto it_fork_id{rpc_client_instance_st.response_waiter_forks.find(query_id)}; - const vk::final_action finalizer{[&rpc_client_instance_st, it_query, it_fork_id] noexcept { - rpc_client_instance_st.response_fetcher_instances.erase(it_query); - rpc_client_instance_st.response_waiter_forks.erase(it_fork_id); + const auto it_response_fetcher{rpc_client_instance_st.response_fetcher_instances.find(query_id)}; + const auto it_fork_task{rpc_client_instance_st.response_awaiter_tasks.find(query_id)}; + const vk::final_action finalizer{[&rpc_client_instance_st, it_response_fetcher, it_fork_task] noexcept { + rpc_client_instance_st.response_fetcher_instances.erase(it_response_fetcher); + rpc_client_instance_st.response_awaiter_tasks.erase(it_fork_task); }}; - if (it_query == rpc_client_instance_st.response_fetcher_instances.end() || it_fork_id == rpc_client_instance_st.response_waiter_forks.end()) [[unlikely]] { + if (it_response_fetcher == rpc_client_instance_st.response_fetcher_instances.end() || it_fork_task == rpc_client_instance_st.response_awaiter_tasks.end()) + [[unlikely]] { co_return error_factory.make_error(TL_ERROR_INTERNAL, string{"unexpectedly could not find query in pending queries"}); } - rpc_query = std::move(it_query->second); - response_waiter_fork_id = it_fork_id->second; + rpc_query = std::move(it_response_fetcher->second); + opt_awaiter_task.emplace(std::move(it_fork_task->second)); } if (rpc_query.is_null()) [[unlikely]] { @@ -260,17 +258,16 @@ kphp::coro::task> typed_rpc_tl_query_result_ co_return error_factory.make_error(TL_ERROR_INTERNAL, string{"can't get typed result from untyped TL query. Use consistent API for that"}); } - auto opt_response{co_await kphp::forks::wait>(response_waiter_fork_id, MAX_TIMEOUT_NS)}; + kphp::log::assertion(opt_awaiter_task.has_value()); + auto opt_response{co_await kphp::forks::id_managed(*std::exchange(opt_awaiter_task, std::nullopt))}; if (!opt_response) [[unlikely]] { - co_return error_factory.make_error(TL_ERROR_INTERNAL, string{"can't find waiter fork"}); - } - auto response{*std::move(opt_response)}; - if (response.empty()) [[unlikely]] { co_return error_factory.make_error(TL_ERROR_QUERY_TIMEOUT, string{"rpc response timeout"}); } + auto response{*std::move(opt_response)}; // don't check response's emptyness; will throw if it's empty, indicating a fetch error + f$rpc_clean(); - RpcServerInstanceState::get().tl_fetcher = tl::fetcher{response}; + RpcServerInstanceState::get().tl_fetcher = tl::fetcher{{reinterpret_cast(response.c_str()), response.size()}}; auto res{fetch_function_typed(rpc_query, error_factory)}; // THROWING // handle exceptions that could arise during fetch_function_typed if (auto err{error_factory.transform_exception_into_error_if_possible()}; !err.is_null()) [[unlikely]] { @@ -285,51 +282,55 @@ kphp::coro::task send_request(std::string_view actor, std bool collect_responses_extra_info) noexcept { auto& rpc_client_instance_st{RpcClientInstanceState::get()}; auto& rpc_server_instance_st{RpcServerInstanceState::get()}; + const size_t request_size{rpc_server_instance_st.tl_storer.view().size_bytes()}; + const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; - // prepare RPC request - size_t request_size{rpc_server_instance_st.tl_storer.view().size()}; - std::unique_ptr request{nullptr, kphp::memory::script::free}; - // 'request_buf' will look like this: - // [ RpcExtraHeaders (optional) ] [ payload ] - if (const auto& [opt_new_extra_header, cur_extra_header_size]{kphp::rpc::regularize_extra_headers(rpc_server_instance_st.tl_storer.view(), ignore_answer)}; - opt_new_extra_header.has_value()) { - const auto new_extra_header{*opt_new_extra_header}; - const auto new_extra_header_size{sizeof(std::remove_cvref_t)}; - request_size = request_size - cur_extra_header_size + new_extra_header_size; - - request.reset(reinterpret_cast(kphp::memory::script::alloc(request_size))); - kphp::log::assertion(request != nullptr); - std::memcpy(request.get(), std::addressof(new_extra_header), new_extra_header_size); - std::memcpy(std::next(request.get(), new_extra_header_size), std::next(rpc_server_instance_st.tl_storer.view().data(), cur_extra_header_size), - rpc_server_instance_st.tl_storer.view().size() - cur_extra_header_size); - } else { - request.reset(reinterpret_cast(kphp::memory::script::alloc(request_size))); - kphp::log::assertion(request != nullptr); - std::memcpy(request.get(), rpc_server_instance_st.tl_storer.view().data(), request_size); + auto expected_stream{kphp::component::stream::open(actor, k2::stream_kind::component)}; + if (!expected_stream) [[unlikely]] { + co_return kphp::rpc::query_info{ + .id = kphp::rpc::INVALID_QUERY_ID, .request_size = rpc_server_instance_st.tl_storer.view().size_bytes(), .timestamp = timestamp}; } - // send RPC request - const auto query_id{rpc_client_instance_st.current_query_id++}; - const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; + auto stream{*std::move(expected_stream)}; + { + auto tl_storer{std::exchange(rpc_server_instance_st.tl_storer, tl::storer{0})}; + const vk::final_action finalizer{[&tl_storer, &rpc_server_instance_st] noexcept { + if (tl_storer.capacity() > rpc_server_instance_st.tl_storer.capacity()) { + std::swap(tl_storer, rpc_server_instance_st.tl_storer); + } + }}; - auto expected_stream{kphp::component::stream::open(actor, k2::stream_kind::component)}; - if (!expected_stream || !co_await kphp::component::send_request(*expected_stream, {request.get(), request_size})) [[unlikely]] { - co_return kphp::rpc::query_info{.id = kphp::rpc::INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; + // prepare and send RPC request + // 'request_buf' will look like this: + // [ RpcExtraHeaders (optional) ] [ payload ] + if (const auto& [opt_new_extra_header, cur_extra_header_size]{kphp::rpc::regularize_extra_headers(tl_storer.view(), ignore_answer)}; opt_new_extra_header) { + std::span request_body{tl_storer.view().subspan(cur_extra_header_size)}; + std::span new_header{reinterpret_cast(std::addressof(*opt_new_extra_header)), + sizeof(std::remove_cvref_t)}; + + if (!co_await stream.write_all(new_header) || !co_await kphp::component::send_request(stream, request_body)) [[unlikely]] { + co_return kphp::rpc::query_info{.id = kphp::rpc::INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; + } + } else if (!co_await kphp::component::send_request(stream, tl_storer.view())) [[unlikely]] { + co_return kphp::rpc::query_info{.id = kphp::rpc::INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; + } } + const auto query_id{rpc_client_instance_st.current_query_id++}; + // create response extra info if (collect_responses_extra_info) { rpc_client_instance_st.rpc_responses_extra_info.emplace(query_id, std::make_pair(response_extra_info_status::not_ready, response_extra_info{0, timestamp})); } - // create fork to wait for RPC response. we need to do it even if 'ignore_answer' is 'true' to make sure + // create a task to wait for RPC response. we need to do it even if 'ignore_answer' is 'true' to make sure // that the stream will not be closed too early. otherwise, platform may even not send RPC request - auto waiter{[](int64_t query_id, kphp::component::stream stream, std::chrono::nanoseconds timeout, - bool collect_responses_extra_info) noexcept -> kphp::coro::task> { - kphp::stl::vector response{}; - auto fetch_task{kphp::component::fetch_response(stream, kphp::component::read_ext::append(response))}; + static constexpr auto awaiter_coroutine{[](int64_t query_id, kphp::component::stream stream, std::chrono::nanoseconds timeout, + bool collect_responses_extra_info) noexcept -> kphp::coro::shared_task> { + std::optional opt_response{std::in_place}; + auto fetch_task{kphp::component::fetch_response(stream, kphp::component::read_ext::append(*opt_response))}; if (auto expected{co_await kphp::coro::io_scheduler::get().schedule(std::move(fetch_task), timeout)}; !expected) [[unlikely]] { - response.clear(); + opt_response = std::nullopt; } // update response extra info if needed @@ -337,26 +338,28 @@ kphp::coro::task send_request(std::string_view actor, std auto& extra_info_map{RpcClientInstanceState::get().rpc_responses_extra_info}; if (const auto it_extra_info{extra_info_map.find(query_id)}; it_extra_info != extra_info_map.end()) [[likely]] { const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; - it_extra_info->second.second = std::make_tuple(response.size(), timestamp - std::get<1>(it_extra_info->second.second)); + const auto response_size{opt_response.transform([](const string& response) noexcept { return static_cast(response.size()); }).value_or(0)}; + it_extra_info->second.second = std::make_tuple(response_size, timestamp - std::get<1>(it_extra_info->second.second)); it_extra_info->second.first = response_extra_info_status::ready; } else { kphp::log::warning("can't find extra info for RPC query {}", query_id); } } - co_return std::move(response); + co_return std::move(opt_response); }}; + // normalize timeout const auto normalized_timeout{kphp::rpc::detail::normalize_timeout( std::chrono::duration_cast(std::chrono::duration{opt_timeout.value_or(kphp::rpc::detail::DEFAULT_TIMEOUT)}))}; - // start waiter fork - const auto waiter_fork_id{ - kphp::forks::start(std::invoke(std::move(waiter), query_id, *std::move(expected_stream), normalized_timeout, collect_responses_extra_info))}; + // start awaiter task + auto awaiter_task{awaiter_coroutine(query_id, std::move(stream), normalized_timeout, collect_responses_extra_info)}; + kphp::log::assertion(kphp::coro::io_scheduler::get().start(awaiter_task)); if (ignore_answer) { co_return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; } - rpc_client_instance_st.response_waiter_forks.emplace(query_id, waiter_fork_id); + rpc_client_instance_st.response_awaiter_tasks.emplace(query_id, std::move(awaiter_task)); co_return kphp::rpc::query_info{.id = query_id, .request_size = request_size, .timestamp = timestamp}; } diff --git a/runtime-light/stdlib/rpc/rpc-api.h b/runtime-light/stdlib/rpc/rpc-api.h index f7b5913ce3..710c3653c5 100644 --- a/runtime-light/stdlib/rpc/rpc-api.h +++ b/runtime-light/stdlib/rpc/rpc-api.h @@ -221,7 +221,7 @@ inline kphp::coro::task<> f$rpc_server_store_response(class_instance>> f$rpc_fetch_responses_synchronously(array< // === client typed =============================================================================== -template rpc_function_t, std::same_as rpc_request_t = KphpRpcRequest> -kphp::coro::task> f$rpc_send_typed_query_requests(string actor, array> query_functions, Optional timeout, - bool ignore_answer, class_instance requests_extra_info, - bool need_responses_extra_info) noexcept { +template rpc_function_type, std::same_as rpc_request_type = KphpRpcRequest> +kphp::coro::task> +f$rpc_send_typed_query_requests(string actor, array> query_functions, Optional timeout, bool ignore_answer, + class_instance requests_extra_info, bool need_responses_extra_info) noexcept { if (ignore_answer && need_responses_extra_info) [[unlikely]] { kphp::log::warning("both $ignore_answer and $need_responses_extra_info are 'true'. Metrics won't be collected"); } @@ -290,7 +290,7 @@ kphp::coro::task> f$rpc_send_typed_query_requests(string actor, a for (const auto& it : std::as_const(query_functions)) { const auto query_info{co_await kphp::forks::id_managed(kphp::rpc::detail::typed_rpc_tl_query_one_impl( - {actor.c_str(), actor.size()}, rpc_request_t{it.get_value()}, opt_timeout, collect_resp_extra_info, ignore_answer))}; + {actor.c_str(), actor.size()}, rpc_request_type{it.get_value()}, opt_timeout, collect_resp_extra_info, ignore_answer))}; query_ids.set_value(it.get_key(), query_info.id); req_extra_info_arr.set_value(it.get_key(), kphp::rpc::request_extra_info{query_info.request_size}); } @@ -301,12 +301,12 @@ kphp::coro::task> f$rpc_send_typed_query_requests(string actor, a co_return std::move(query_ids); } -template query_id_t = int64_t, std::same_as error_factory_t = RpcResponseErrorFactory> -requires std::default_initializable -kphp::coro::task>> f$rpc_fetch_typed_responses(array query_ids) noexcept { +template query_id_type = int64_t, std::same_as error_factory_type = RpcResponseErrorFactory> +requires std::default_initializable +kphp::coro::task>> f$rpc_fetch_typed_responses(array query_ids) noexcept { array> res{query_ids.size()}; for (const auto& it : std::as_const(query_ids)) { - res.set_value(it.get_key(), co_await kphp::forks::id_managed(kphp::rpc::detail::typed_rpc_tl_query_result_one_impl(it.get_value(), error_factory_t{}))); + res.set_value(it.get_key(), co_await kphp::forks::id_managed(kphp::rpc::detail::typed_rpc_tl_query_result_one_impl(it.get_value(), error_factory_type{}))); } co_return std::move(res); } diff --git a/runtime-light/stdlib/rpc/rpc-client-state.h b/runtime-light/stdlib/rpc/rpc-client-state.h index 5bef5d3247..e5fdc5c9b1 100644 --- a/runtime-light/stdlib/rpc/rpc-client-state.h +++ b/runtime-light/stdlib/rpc/rpc-client-state.h @@ -5,31 +5,27 @@ #pragma once #include +#include #include #include "common/mixin/not_copyable.h" #include "runtime-common/core/allocator/script-allocator.h" #include "runtime-common/core/runtime-core.h" #include "runtime-common/core/std/containers.h" +#include "runtime-light/coroutine/shared-task.h" #include "runtime-light/stdlib/rpc/rpc-constants.h" #include "runtime-light/stdlib/rpc/rpc-extra-info.h" #include "runtime-light/stdlib/rpc/rpc-tl-defs.h" #include "runtime-light/stdlib/rpc/rpc-tl-query.h" struct RpcClientInstanceState final : private vk::not_copyable { - template - using unordered_map = kphp::stl::unordered_map; - CurrentTlQuery current_client_query{}; int64_t current_query_id{kphp::rpc::VALID_QUERY_ID_RANGE_START}; - unordered_map response_waiter_forks; - unordered_map> response_fetcher_instances; - unordered_map> rpc_responses_extra_info; - - // An analogue of the response_waiter_forks mapping, - // which stores mappings from fork_id to response_id for responses awaited in the rpc queue. - unordered_map awaiter_forks_to_response; + kphp::stl::unordered_map>, kphp::memory::script_allocator> response_awaiter_tasks; + kphp::stl::unordered_map, kphp::memory::script_allocator> response_fetcher_instances; + kphp::stl::unordered_map, kphp::memory::script_allocator> + rpc_responses_extra_info; RpcClientInstanceState() noexcept = default; diff --git a/runtime-light/stdlib/rpc/rpc-queue-functions.h b/runtime-light/stdlib/rpc/rpc-queue-functions.h index 2a38b4c6ba..1690705727 100644 --- a/runtime-light/stdlib/rpc/rpc-queue-functions.h +++ b/runtime-light/stdlib/rpc/rpc-queue-functions.h @@ -7,60 +7,108 @@ #include #include #include +#include +#include +#include "runtime-common/core/runtime-core.h" +#include "runtime-light/coroutine/io-scheduler.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/stdlib/diagnostics/logs.h" #include "runtime-light/stdlib/fork/fork-functions.h" -#include "runtime-light/stdlib/fork/wait-queue-functions.h" #include "runtime-light/stdlib/rpc/rpc-client-state.h" +#include "runtime-light/stdlib/rpc/rpc-queue-state.h" namespace kphp::rpc { inline void rpc_queue_push(int64_t queue_id, int64_t request_id) noexcept { + static constexpr auto rpc_queue_wrapper_task{[](kphp::coro::shared_task<> awaiter_task, int64_t request_id) noexcept -> kphp::coro::task { + co_await std::move(awaiter_task).when_ready(); + co_return request_id; + }}; + auto& rpc_client_instance_st{RpcClientInstanceState::get()}; - const auto it_fork_id{rpc_client_instance_st.response_waiter_forks.find(request_id)}; - if (it_fork_id == rpc_client_instance_st.response_waiter_forks.end()) [[unlikely]] { + const auto it_awaiter_task{rpc_client_instance_st.response_awaiter_tasks.find(request_id)}; + if (it_awaiter_task == rpc_client_instance_st.response_awaiter_tasks.end()) [[unlikely]] { kphp::log::warning("could not find rpc query with id {} in pending queries", queue_id); return; } - const int64_t response_waiter_fork_id{it_fork_id->second}; - rpc_client_instance_st.awaiter_forks_to_response.emplace(response_waiter_fork_id, request_id); - kphp::forks::wait_queue_push(queue_id, response_waiter_fork_id); + auto& rpc_queue_instance_st{RpcQueueInstanceState::get()}; + auto opt_await_set{rpc_queue_instance_st.get_queue(queue_id)}; + if (!opt_await_set) [[unlikely]] { + kphp::log::warning("future with id {} isn't associated with rpc queue", queue_id); + return; + } + + auto& await_set{(*opt_await_set).get()}; + await_set.push(rpc_queue_wrapper_task(static_cast>(it_awaiter_task->second), request_id)); +} + +inline int64_t rpc_queue_create(std::span request_ids) noexcept { + const int64_t queue_id{RpcQueueInstanceState::get().create_queue()}; + for (int64_t request_id : request_ids) { + rpc_queue_push(queue_id, request_id); + } + + return queue_id; } inline kphp::coro::task> rpc_queue_next(int64_t queue_id, std::chrono::nanoseconds timeout) noexcept { - auto wait_result{co_await kphp::forks::wait_queue_next(queue_id, timeout)}; - if (!wait_result.has_value()) { + static constexpr auto rpc_queue_next_task{ + [](auto await_set_awaitable) noexcept -> kphp::coro::task> { co_return co_await std::move(await_set_awaitable); }}; + + auto& rpc_queue_instance_st{RpcQueueInstanceState::get()}; + auto opt_await_set{rpc_queue_instance_st.get_queue(queue_id)}; + + if (!opt_await_set) [[unlikely]] { + kphp::log::warning("future with id {} isn't associated with rpc queue", queue_id); co_return std::nullopt; } - const int64_t ready_fork_id{*wait_result}; - auto& rpc_client_instance_st{RpcClientInstanceState::get()}; - const auto it_request_id{rpc_client_instance_st.awaiter_forks_to_response.find(ready_fork_id)}; - if (it_request_id == rpc_client_instance_st.awaiter_forks_to_response.end()) [[unlikely]] { - kphp::log::warning("awaiter fork {} in the rpc queue isn't associated with any response", ready_fork_id); + auto& await_set{(*opt_await_set).get()}; + const auto expected_next{co_await kphp::coro::io_scheduler::get().schedule(rpc_queue_next_task(await_set.next()), timeout)}; + if (!expected_next) { co_return std::nullopt; } - const int64_t ready_response_id{it_request_id->second}; - rpc_client_instance_st.awaiter_forks_to_response.erase(it_request_id); - co_return ready_response_id; + const auto opt_request_id{*expected_next}; + if (!opt_request_id) [[unlikely]] { + kphp::log::warning("failed to get RPC response from rpc queue {}", queue_id); + co_return kphp::rpc::INVALID_QUERY_ID; + } + co_return *opt_request_id; +} + +inline bool rpc_queue_empty(int64_t queue_id) noexcept { + auto& rpc_queue_instance_st{RpcQueueInstanceState::get()}; + auto opt_await_set{rpc_queue_instance_st.get_queue(queue_id)}; + if (!opt_await_set) [[unlikely]] { + kphp::log::warning("future with id {} isn't associated with rpc queue", queue_id); + return true; + } + const auto& await_set{(*opt_await_set).get()}; + return await_set.empty(); } + } // namespace kphp::rpc inline int64_t f$rpc_queue_create() noexcept { - return f$wait_queue_create(); + return kphp::rpc::rpc_queue_create({}); } -inline int64_t f$rpc_queue_create(const array& request_ids) noexcept { - int64_t future{f$wait_queue_create()}; - for (auto request_id : request_ids) { - kphp::rpc::rpc_queue_push(future, request_id.get_value()); +inline int64_t f$rpc_queue_create(const mixed& request_ids) noexcept { + const int64_t queue_id{kphp::rpc::rpc_queue_create({})}; + if (!request_ids.is_array()) { + return queue_id; + } + for (auto request_id : request_ids.as_array()) { + if (request_id.get_value().is_int()) { + kphp::rpc::rpc_queue_push(queue_id, request_id.get_value().as_int()); + } } - return future; + return queue_id; } inline void f$rpc_queue_push(int64_t queue_id, int64_t request_id) noexcept { @@ -68,7 +116,7 @@ inline void f$rpc_queue_push(int64_t queue_id, int64_t request_id) noexcept { } inline bool f$rpc_queue_empty(int64_t queue_id) noexcept { - return f$wait_queue_empty(queue_id); + return kphp::rpc::rpc_queue_empty(queue_id); } inline kphp::coro::task> f$rpc_queue_next(int64_t queue_id, double timeout = -1) noexcept { diff --git a/runtime-light/stdlib/rpc/rpc-queue-state.cpp b/runtime-light/stdlib/rpc/rpc-queue-state.cpp new file mode 100644 index 0000000000..a346863a01 --- /dev/null +++ b/runtime-light/stdlib/rpc/rpc-queue-state.cpp @@ -0,0 +1,11 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2025 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/rpc/rpc-queue-state.h" + +#include "runtime-light/state/instance-state.h" + +RpcQueueInstanceState& RpcQueueInstanceState::get() noexcept { + return InstanceState::get().rpc_queue_instance_state; +} diff --git a/runtime-light/stdlib/rpc/rpc-queue-state.h b/runtime-light/stdlib/rpc/rpc-queue-state.h new file mode 100644 index 0000000000..7c307dd27c --- /dev/null +++ b/runtime-light/stdlib/rpc/rpc-queue-state.h @@ -0,0 +1,38 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2025 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include + +#include "common/mixin/not_copyable.h" +#include "runtime-common/core/allocator/script-allocator.h" +#include "runtime-common/core/std/containers.h" +#include "runtime-light/coroutine/await-set.h" + +class RpcQueueInstanceState final : private vk::not_copyable { + static constexpr int64_t RPC_QUEUE_ID_INIT = 0; + int64_t m_rpc_wait_queue_id{RPC_QUEUE_ID_INIT}; + kphp::stl::unordered_map, kphp::memory::script_allocator> m_queues; + +public: + RpcQueueInstanceState() noexcept = default; + + static RpcQueueInstanceState& get() noexcept; + + [[nodiscard]] int64_t create_queue() noexcept { + const int64_t wait_queue_id{m_rpc_wait_queue_id++}; + m_queues.emplace(wait_queue_id, kphp::coro::await_set{}); + return wait_queue_id; + } + + std::optional>> get_queue(int64_t queue_id) noexcept { + if (auto it{m_queues.find(queue_id)}; it != m_queues.end()) { + return it->second; + } + return std::nullopt; + } +}; diff --git a/runtime-light/stdlib/rpc/rpc-tl-kphp-request.h b/runtime-light/stdlib/rpc/rpc-tl-kphp-request.h index 4bc9a2c157..bace1d8a3f 100644 --- a/runtime-light/stdlib/rpc/rpc-tl-kphp-request.h +++ b/runtime-light/stdlib/rpc/rpc-tl-kphp-request.h @@ -25,31 +25,32 @@ void f$VK$TL$RpcFunctionFetcher$$typedStore(class_instance&& wrapped) - : wrapped_(std::move(wrapped)) {} + explicit tl_func_base_simple_wrapper(class_instance&& wrapped) noexcept + : m_wrapped(std::move(wrapped)) {} - virtual mixed fetch() { + mixed fetch() noexcept override { php_critical_error("this function should never be called for typed RPC function."); return mixed{}; } - virtual class_instance typed_fetch() { - return f$VK$TL$RpcFunctionFetcher$$typedFetch(wrapped_); + class_instance typed_fetch() noexcept override { + return f$VK$TL$RpcFunctionFetcher$$typedFetch(m_wrapped); } - virtual void rpc_server_typed_store(const class_instance& result) { - return f$VK$TL$RpcFunctionFetcher$$typedStore(wrapped_, result); + void rpc_server_typed_store(const class_instance& result) noexcept override { + return f$VK$TL$RpcFunctionFetcher$$typedStore(m_wrapped, result); } private: - class_instance wrapped_; + class_instance m_wrapped; }; -inline std::unique_ptr make_tl_func_base_simple_wrapper(class_instance&& wrapped) { +inline std::unique_ptr make_tl_func_base_simple_wrapper(class_instance&& wrapped) noexcept { return std::make_unique(std::move(wrapped)); } namespace kphp::rpc::rpc_impl { + // use template, because t_ReqResult_ is unknown on runtime compilation template class t_ReqResult_> class KphpRpcRequestResult final : public RpcRequestResult { @@ -66,7 +67,7 @@ class KphpRpcRequestResult final : public RpcRequestResult { } std::unique_ptr extract_untyped_fetcher() noexcept final { - kphp::log::error("Forbidden to call for typed rpc requests"); + kphp::log::error("forbidden to call for typed rpc requests"); } }; @@ -82,18 +83,19 @@ class KphpRpcRequest final : public RpcRequest { cur_query.set_current_tl_function(tl_function_name()); const vk::final_action finalizer{[&cur_query] noexcept { cur_query.reset(); }}; std::unique_ptr stored_fetcher; - auto custom_fetcher = f$VK$TL$RpcFunction$$typedStore(storing_function); + auto custom_fetcher{f$VK$TL$RpcFunction$$typedStore(storing_function)}; if (custom_fetcher.is_null()) { stored_fetcher = storing_function.get()->store(); } else { stored_fetcher = make_tl_func_base_simple_wrapper(std::move(custom_fetcher)); - auto magic = f$VK$TL$RpcFunction$$getTLFunctionMagic(storing_function); - CurrentTlQuery::get().set_last_stored_tl_function_magic(magic); + auto magic{f$VK$TL$RpcFunction$$getTLFunctionMagic(storing_function)}; + cur_query.set_last_stored_tl_function_magic(magic); } CHECK_EXCEPTION(return {}); return make_unique_on_script_memory>(std::move(stored_fetcher)); } }; + } // namespace kphp::rpc::rpc_impl template diff --git a/runtime-light/stdlib/stdlib.cmake b/runtime-light/stdlib/stdlib.cmake index 484175c20d..13942567f7 100644 --- a/runtime-light/stdlib/stdlib.cmake +++ b/runtime-light/stdlib/stdlib.cmake @@ -25,6 +25,7 @@ prepend( rpc/rpc-client-state.cpp rpc/rpc-extra-headers.cpp rpc/rpc-extra-info.cpp + rpc/rpc-queue-state.cpp rpc/rpc-tl-builtins.cpp rpc/rpc-tl-error.cpp rpc/rpc-tl-query.cpp diff --git a/runtime-light/streams/stream.h b/runtime-light/streams/stream.h index 5655cd8e7c..0d8cdfd8a6 100644 --- a/runtime-light/streams/stream.h +++ b/runtime-light/streams/stream.h @@ -45,7 +45,9 @@ class stream { } ~stream() { - reset(k2::INVALID_PLATFORM_DESCRIPTOR); + if (m_descriptor != k2::INVALID_PLATFORM_DESCRIPTOR) { + reset(k2::INVALID_PLATFORM_DESCRIPTOR); + } } stream(const stream&) = delete; diff --git a/runtime-light/tl/tl-core.h b/runtime-light/tl/tl-core.h index 9c9d378d28..eb34dad4df 100644 --- a/runtime-light/tl/tl-core.h +++ b/runtime-light/tl/tl-core.h @@ -62,6 +62,10 @@ class storer { m_buffer.reserve(capacity); } + size_t capacity() const noexcept { + return m_buffer.capacity(); + } + void store_bytes(std::span bytes) noexcept { m_buffer.append_range(bytes); }