Skip to content

Commit 853e982

Browse files
authored
[k2] optimize RPC (#1448)
* reduce amount of memory allocation performed for each RPC call * get rid of forks for RPC response awaiters * increase amount of instance's initial script memory
1 parent 50fcd34 commit 853e982

16 files changed

Lines changed: 307 additions & 202 deletions

File tree

builtin-functions/kphp-light/stdlib/rpc.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ function set_fail_rpc_on_int32_overflow ($fail_rpc ::: bool): bool;
107107
*/
108108
function extract_kphp_rpc_response_extra_info ($resumable_id ::: int) ::: ?tuple(int, float);
109109

110-
function rpc_queue_create ($request_ids ::: array<int> = TODO) ::: int;
110+
function rpc_queue_create ($request_ids ::: mixed = TODO) ::: int;
111111

112112
function rpc_queue_push ($queue_id ::: int, $request_id ::: int) ::: void;
113113

runtime-light/coroutine/await-set.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
#include <memory>
99
#include <type_traits>
1010

11-
#include "runtime-common/core/allocator/script-allocator.h"
12-
#include "runtime-common/core/std/containers.h"
1311
#include "runtime-light/coroutine/async-stack.h"
1412
#include "runtime-light/coroutine/concepts.h"
1513
#include "runtime-light/coroutine/coroutine-state.h"

runtime-light/coroutine/shared-task.h

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ namespace kphp::coro {
2121

2222
namespace shared_task_impl {
2323

24-
struct shared_task_waiter final {
24+
struct shared_task_awaiter final {
2525
std::coroutine_handle<> m_continuation;
26-
shared_task_waiter* m_next{};
27-
shared_task_waiter* m_prev{};
26+
shared_task_awaiter* m_next{};
27+
shared_task_awaiter* m_prev{};
2828
};
2929

3030
template<typename promise_type>
@@ -42,21 +42,21 @@ struct promise_base : kphp::coro::async_stack_element {
4242
auto await_suspend(std::coroutine_handle<promise_type> coro) const noexcept -> std::coroutine_handle<> {
4343
promise_base& promise{coro.promise()};
4444
// mark promise as ready
45-
auto* waiter{static_cast<shared_task_impl::shared_task_waiter*>(std::exchange(promise.m_waiters, std::addressof(promise)))};
46-
if (waiter == STARTED_NO_WAITERS_VAL) { // no waiters, so just finish this coroutine
45+
auto* awaiter{static_cast<shared_task_impl::shared_task_awaiter*>(std::exchange(promise.m_awaiters, std::addressof(promise)))};
46+
if (awaiter == STARTED_NO_WAITERS_VAL) { // no awaiters, so just finish this coroutine
4747
return std::noop_coroutine();
4848
}
4949

50-
while (waiter->m_next != nullptr) {
50+
while (awaiter->m_next != nullptr) {
5151
// read the m_next pointer before resuming the coroutine
5252
// since resuming the coroutine may destroy the shared_task_waiter value
53-
auto* next{waiter->m_next};
53+
auto* next{awaiter->m_next};
5454
auto& async_stack_root{*promise.get_async_stack_frame().async_stack_root};
55-
kphp::coro::resume(waiter->m_continuation, async_stack_root);
56-
waiter = next;
55+
kphp::coro::resume(awaiter->m_continuation, async_stack_root);
56+
awaiter = next;
5757
}
58-
// return last waiter's coroutine_handle to allow it to potentially be compiled as a tail-call
59-
return waiter->m_continuation;
58+
// return last awaiter's coroutine_handle to allow it to potentially be compiled as a tail-call
59+
return awaiter->m_continuation;
6060
}
6161

6262
constexpr auto await_resume() const noexcept -> void {}
@@ -69,35 +69,35 @@ struct promise_base : kphp::coro::async_stack_element {
6969
}
7070

7171
auto done() const noexcept -> bool {
72-
return m_waiters == this;
72+
return m_awaiters == this;
7373
}
7474

7575
auto add_ref() noexcept -> void {
7676
++m_refcnt;
7777
}
7878

79-
// try to enqueue a waiter to the list of waiters.
79+
// try to enqueue a awaiter to the list of awaiters.
8080
//
81-
// return true if the waiter was successfully queued, in which case
82-
// waiter->coroutine will be resumed when the task completes.
81+
// return true if the awaiter was successfully queued, in which case
82+
// awaiter->coroutine will be resumed when the task completes.
8383
// false if the coroutine was already completed and the awaiting
8484
// coroutine can continue without suspending.
85-
auto suspend_awaiter(shared_task_impl::shared_task_waiter& waiter) noexcept -> bool {
86-
const void* const NOT_STARTED_VAL{std::addressof(this->m_waiters)};
85+
auto suspend_awaiter(shared_task_impl::shared_task_awaiter& awaiter) noexcept -> bool {
86+
const void* const NOT_STARTED_VAL{std::addressof(this->m_awaiters)};
8787

88-
// NOTE: If the coroutine is not yet started then the first waiter
88+
// NOTE: If the coroutine is not yet started then the first awaiter
8989
// will start the coroutine before enqueuing itself up to the list
90-
// of suspended waiters waiting for completion. We split this into
90+
// of suspended awaiters waiting for completion. We split this into
9191
// two steps to allow the first awaiter to return without suspending.
92-
// This avoids recursively resuming the first waiter inside the call to
92+
// This avoids recursively resuming the first awaiter inside the call to
9393
// coroutine.resume() in the case that the coroutine completes
9494
// synchronously, which could otherwise lead to stack-overflow if
9595
// the awaiting coroutine awaited many synchronously-completing
9696
// tasks in a row.
9797

9898
// start the coroutine if not yet started
99-
if (m_waiters == NOT_STARTED_VAL) {
100-
m_waiters = STARTED_NO_WAITERS_VAL;
99+
if (m_awaiters == NOT_STARTED_VAL) {
100+
m_awaiters = STARTED_NO_WAITERS_VAL;
101101
const auto& handle{std::coroutine_handle<promise_type>::from_promise(*static_cast<promise_type*>(this))};
102102
auto& async_stack_root{*get_async_stack_frame().async_stack_root};
103103
kphp::coro::resume(handle, async_stack_root);
@@ -107,13 +107,13 @@ struct promise_base : kphp::coro::async_stack_element {
107107
return false;
108108
}
109109

110-
waiter.m_prev = nullptr;
111-
waiter.m_next = static_cast<shared_task_impl::shared_task_waiter*>(m_waiters);
110+
awaiter.m_prev = nullptr;
111+
awaiter.m_next = static_cast<shared_task_impl::shared_task_awaiter*>(m_awaiters);
112112
// at this point 'm_waiters' can only be 'STARTED_NO_WAITERS_VAL' or 'other'
113-
if (m_waiters != STARTED_NO_WAITERS_VAL) {
114-
static_cast<shared_task_waiter*>(m_waiters)->m_prev = std::addressof(waiter);
113+
if (m_awaiters != STARTED_NO_WAITERS_VAL) {
114+
static_cast<shared_task_awaiter*>(m_awaiters)->m_prev = std::addressof(awaiter);
115115
}
116-
m_waiters = static_cast<void*>(std::addressof(waiter));
116+
m_awaiters = static_cast<void*>(std::addressof(awaiter));
117117
return true;
118118
}
119119

@@ -124,20 +124,20 @@ struct promise_base : kphp::coro::async_stack_element {
124124
return m_refcnt-- != 1;
125125
}
126126

127-
auto cancel_awaiter(const shared_task_impl::shared_task_waiter& waiter) noexcept -> void {
128-
const void* const NOT_STARTED_VAL{std::addressof(this->m_waiters)};
129-
if (m_waiters == NOT_STARTED_VAL || m_waiters == STARTED_NO_WAITERS_VAL) [[unlikely]] {
127+
auto cancel_awaiter(const shared_task_impl::shared_task_awaiter& awaiter) noexcept -> void {
128+
const void* const NOT_STARTED_VAL{std::addressof(this->m_awaiters)};
129+
if (m_awaiters == NOT_STARTED_VAL || m_awaiters == STARTED_NO_WAITERS_VAL) [[unlikely]] {
130130
return;
131131
}
132132

133-
const auto* waiter_ptr{std::addressof(waiter)};
134-
if (m_waiters == waiter_ptr) { // waiter is the head of the list
135-
m_waiters = waiter_ptr->m_next;
136-
} else if (waiter_ptr->m_next == nullptr) { // waiter is the last in the list
137-
waiter_ptr->m_prev->m_next = nullptr;
138-
} else { // waiter is somewhere in the middle of the list
139-
waiter_ptr->m_next->m_prev = waiter_ptr->m_prev;
140-
waiter_ptr->m_prev->m_next = waiter_ptr->m_next;
133+
const auto* awaiter_ptr{std::addressof(awaiter)};
134+
if (m_awaiters == awaiter_ptr) { // awaiter is the head of the list
135+
m_awaiters = awaiter_ptr->m_next;
136+
} else if (awaiter_ptr->m_next == nullptr) { // awaiter is the last in the list
137+
awaiter_ptr->m_prev->m_next = nullptr;
138+
} else { // awaiter is somewhere in the middle of the list
139+
awaiter_ptr->m_next->m_prev = awaiter_ptr->m_prev;
140+
awaiter_ptr->m_prev->m_next = awaiter_ptr->m_next;
141141
}
142142
}
143143

@@ -155,13 +155,13 @@ struct promise_base : kphp::coro::async_stack_element {
155155

156156
uint32_t m_refcnt{1};
157157
// Value is either
158-
// - nullptr - indicates started, no waiters
159-
// - &this->w_waiters - indicates the coroutine is not yet started
160-
// - this - indicates value is ready
161-
// - other - pointer to head item in linked-list of waiters.
162-
// values are of type 'shared_task_impl_::shared_task_waiter_t'.
163-
// indicates that the coroutine has been started.
164-
void* m_waiters{std::addressof(m_waiters)};
158+
// - nullptr - indicates started, no awaiters
159+
// - &this->m_awaiters - indicates the coroutine is not yet started
160+
// - this - indicates value is ready
161+
// - other - pointer to head item in linked-list of awaiters.
162+
// values are of type 'shared_task_impl_::shared_task_waiter_t'.
163+
// indicates that the coroutine has been started.
164+
void* m_awaiters{std::addressof(m_awaiters)};
165165
};
166166

167167
template<typename promise_type>
@@ -190,7 +190,7 @@ class awaiter_base {
190190

191191
protected:
192192
std::coroutine_handle<promise_type> m_coro;
193-
shared_task_impl::shared_task_waiter m_waiter{};
193+
shared_task_impl::shared_task_awaiter m_awaiter{};
194194

195195
public:
196196
explicit awaiter_base(std::coroutine_handle<promise_type> coro) noexcept
@@ -199,15 +199,15 @@ class awaiter_base {
199199
awaiter_base(awaiter_base&& other) noexcept
200200
: m_suspended(std::exchange(other.m_suspended, false)),
201201
m_coro(std::exchange(other.m_coro, {})),
202-
m_waiter(std::exchange(other.m_waiter, {})) {}
202+
m_awaiter(std::exchange(other.m_awaiter, {})) {}
203203

204204
awaiter_base(const awaiter_base& other) = delete;
205205
awaiter_base& operator=(const awaiter_base& other) = delete;
206206
awaiter_base& operator=(awaiter_base&& other) = delete;
207207

208208
~awaiter_base() {
209209
if (m_suspended) {
210-
m_coro.promise().cancel_awaiter(m_waiter);
210+
m_coro.promise().cancel_awaiter(m_awaiter);
211211
}
212212
}
213213

@@ -218,8 +218,8 @@ class awaiter_base {
218218
template<std::derived_from<kphp::coro::async_stack_element> caller_promise_type>
219219
[[clang::noinline]] auto await_suspend(std::coroutine_handle<caller_promise_type> awaiting_coroutine) noexcept -> bool {
220220
set_async_top_frame(awaiting_coroutine.promise().get_async_stack_frame(), STACK_RETURN_ADDRESS);
221-
m_waiter.m_continuation = awaiting_coroutine;
222-
m_suspended = m_coro.promise().suspend_awaiter(m_waiter);
221+
m_awaiter.m_continuation = awaiting_coroutine;
222+
m_suspended = m_coro.promise().suspend_awaiter(m_awaiter);
223223
reset_async_top_frame(awaiting_coroutine.promise().get_async_stack_frame());
224224
return m_suspended;
225225
}

runtime-light/coroutine/task.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -154,24 +154,24 @@ struct task {
154154

155155
task() noexcept = default;
156156

157-
explicit task(std::coroutine_handle<> coro) noexcept
158-
: m_haddress(coro.address()) {}
157+
explicit task(std::coroutine_handle<promise_type> coro) noexcept
158+
: m_coro(coro) {}
159159

160160
task(const task& other) noexcept = delete;
161161

162162
task(task&& other) noexcept
163-
: m_haddress(std::exchange(other.m_haddress, nullptr)) {}
163+
: m_coro(std::exchange(other.m_coro, {})) {}
164164

165165
task& operator=(const task& other) noexcept = delete;
166166

167167
task& operator=(task&& other) noexcept {
168-
std::swap(m_haddress, other.m_haddress);
168+
std::swap(m_coro, other.m_coro);
169169
return *this;
170170
}
171171

172172
~task() {
173-
if (m_haddress) {
174-
std::coroutine_handle<promise_type>::from_address(m_haddress).destroy();
173+
if (m_coro) {
174+
m_coro.destroy();
175175
}
176176
}
177177

@@ -217,28 +217,28 @@ struct task {
217217
return awaiter_base::m_coro.promise().result();
218218
}
219219
};
220-
return awaiter{std::coroutine_handle<promise_type>::from_address(m_haddress)};
220+
return awaiter{m_coro};
221221
}
222222

223223
auto get_handle() noexcept -> std::coroutine_handle<promise_type> {
224-
return std::coroutine_handle<promise_type>::from_address(m_haddress);
224+
return m_coro;
225225
}
226226

227227
// conversion functions
228228
//
229229
// erase type
230230
explicit operator task<>() && noexcept {
231-
return task<>{std::coroutine_handle<>::from_address(std::exchange(m_haddress, nullptr))};
231+
return task<>{std::exchange(m_coro, {})};
232232
}
233233
// restore erased type
234234
template<typename U>
235235
requires(std::same_as<void, T>)
236236
explicit operator task<U>() && noexcept {
237-
return task<U>{std::coroutine_handle<>::from_address(std::exchange(m_haddress, nullptr))};
237+
return task<U>{std::exchange(m_coro, {})};
238238
}
239239

240240
private:
241-
void* m_haddress{};
241+
std::coroutine_handle<promise_type> m_coro{};
242242
};
243243

244244
} // namespace kphp::coro

runtime-light/state/instance-state.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "runtime-light/stdlib/math/random-state.h"
3535
#include "runtime-light/stdlib/output/output-state.h"
3636
#include "runtime-light/stdlib/rpc/rpc-client-state.h"
37+
#include "runtime-light/stdlib/rpc/rpc-queue-state.h"
3738
#include "runtime-light/stdlib/serialization/serialization-state.h"
3839
#include "runtime-light/stdlib/string/regex-state.h"
3940
#include "runtime-light/stdlib/string/string-state.h"
@@ -94,6 +95,7 @@ struct InstanceState final : vk::not_copyable {
9495
CoroutineInstanceState coroutine_instance_state;
9596
ForkInstanceState fork_instance_state;
9697
WaitQueueInstanceState wait_queue_instance_state;
98+
RpcQueueInstanceState rpc_queue_instance_state;
9799
PhpScriptMutableGlobals php_script_mutable_globals_singleton;
98100

99101
RuntimeContext runtime_context;
@@ -133,5 +135,5 @@ struct InstanceState final : vk::not_copyable {
133135
enum image_kind image_kind_ { image_kind::invalid };
134136
enum instance_kind instance_kind_ { instance_kind::invalid };
135137

136-
static constexpr auto INIT_INSTANCE_ALLOCATOR_SIZE = static_cast<size_t>(16U * 1024U * 1024U);
138+
static constexpr auto INIT_INSTANCE_ALLOCATOR_SIZE = static_cast<size_t>(256U * 1024U * 1024U);
137139
};

runtime-light/stdlib/fork/fork-state.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ struct ForkInstanceState final : private vk::not_copyable {
4949

5050
template<typename return_type>
5151
std::pair<int64_t, kphp::coro::shared_task<return_type>> create_fork(kphp::coro::task<return_type> task) noexcept {
52+
static constexpr auto fork_coroutine{[](kphp::coro::task<return_type> task, int64_t fork_id) noexcept -> kphp::coro::shared_task<return_type> {
53+
ForkInstanceState::get().current_id = fork_id;
54+
co_return co_await std::move(task);
55+
}};
56+
5257
const int64_t fork_id{next_fork_id++};
53-
auto fork_task{std::invoke(
54-
[](kphp::coro::task<return_type> task, int64_t fork_id) noexcept -> kphp::coro::shared_task<return_type> {
55-
ForkInstanceState::get().current_id = fork_id;
56-
co_return co_await std::move(task);
57-
},
58-
std::move(task), fork_id)};
58+
auto fork_task{std::invoke(fork_coroutine, std::move(task), fork_id)};
5959
forks.emplace(fork_id, fork_info{.awaited = {}, .thrown_exception = {}, .opt_handle = static_cast<kphp::coro::shared_task<>>(fork_task)});
6060
return std::make_pair(fork_id, std::move(fork_task));
6161
}

0 commit comments

Comments
 (0)