Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion builtin-functions/kphp-light/stdlib/rpc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ function set_fail_rpc_on_int32_overflow ($fail_rpc ::: bool): bool;
*/
function extract_kphp_rpc_response_extra_info ($resumable_id ::: int) ::: ?tuple(int, float);

function rpc_queue_create ($request_ids ::: array<int> = TODO) ::: int;
function rpc_queue_create ($request_ids ::: mixed = TODO) ::: int;

function rpc_queue_push ($queue_id ::: int, $request_id ::: int) ::: void;

Expand Down
2 changes: 0 additions & 2 deletions runtime-light/coroutine/await-set.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
#include <memory>
#include <type_traits>

#include "runtime-common/core/allocator/script-allocator.h"
#include "runtime-common/core/std/containers.h"
#include "runtime-light/coroutine/async-stack.h"
#include "runtime-light/coroutine/concepts.h"
#include "runtime-light/coroutine/coroutine-state.h"
Expand Down
100 changes: 50 additions & 50 deletions runtime-light/coroutine/shared-task.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ namespace kphp::coro {

namespace shared_task_impl {

struct shared_task_waiter final {
struct shared_task_awaiter final {
std::coroutine_handle<> m_continuation;
shared_task_waiter* m_next{};
shared_task_waiter* m_prev{};
shared_task_awaiter* m_next{};
shared_task_awaiter* m_prev{};
};

template<typename promise_type>
Expand All @@ -42,21 +42,21 @@ struct promise_base : kphp::coro::async_stack_element {
auto await_suspend(std::coroutine_handle<promise_type> coro) const noexcept -> std::coroutine_handle<> {
promise_base& promise{coro.promise()};
// mark promise as ready
auto* waiter{static_cast<shared_task_impl::shared_task_waiter*>(std::exchange(promise.m_waiters, std::addressof(promise)))};
if (waiter == STARTED_NO_WAITERS_VAL) { // no waiters, so just finish this coroutine
auto* awaiter{static_cast<shared_task_impl::shared_task_awaiter*>(std::exchange(promise.m_awaiters, std::addressof(promise)))};
if (awaiter == STARTED_NO_WAITERS_VAL) { // no awaiters, so just finish this coroutine
return std::noop_coroutine();
}

while (waiter->m_next != nullptr) {
while (awaiter->m_next != nullptr) {
// read the m_next pointer before resuming the coroutine
// since resuming the coroutine may destroy the shared_task_waiter value
auto* next{waiter->m_next};
auto* next{awaiter->m_next};
auto& async_stack_root{*promise.get_async_stack_frame().async_stack_root};
kphp::coro::resume(waiter->m_continuation, async_stack_root);
waiter = next;
kphp::coro::resume(awaiter->m_continuation, async_stack_root);
awaiter = next;
}
// return last waiter's coroutine_handle to allow it to potentially be compiled as a tail-call
return waiter->m_continuation;
// return last awaiter's coroutine_handle to allow it to potentially be compiled as a tail-call
return awaiter->m_continuation;
}

constexpr auto await_resume() const noexcept -> void {}
Expand All @@ -69,35 +69,35 @@ struct promise_base : kphp::coro::async_stack_element {
}

auto done() const noexcept -> bool {
return m_waiters == this;
return m_awaiters == this;
}

auto add_ref() noexcept -> void {
++m_refcnt;
}

// try to enqueue a waiter to the list of waiters.
// try to enqueue a awaiter to the list of awaiters.
//
// return true if the waiter was successfully queued, in which case
// waiter->coroutine will be resumed when the task completes.
// return true if the awaiter was successfully queued, in which case
// awaiter->coroutine will be resumed when the task completes.
// false if the coroutine was already completed and the awaiting
// coroutine can continue without suspending.
auto suspend_awaiter(shared_task_impl::shared_task_waiter& waiter) noexcept -> bool {
const void* const NOT_STARTED_VAL{std::addressof(this->m_waiters)};
auto suspend_awaiter(shared_task_impl::shared_task_awaiter& awaiter) noexcept -> bool {
const void* const NOT_STARTED_VAL{std::addressof(this->m_awaiters)};

// NOTE: If the coroutine is not yet started then the first waiter
// NOTE: If the coroutine is not yet started then the first awaiter
// will start the coroutine before enqueuing itself up to the list
// of suspended waiters waiting for completion. We split this into
// of suspended awaiters waiting for completion. We split this into
// two steps to allow the first awaiter to return without suspending.
// This avoids recursively resuming the first waiter inside the call to
// This avoids recursively resuming the first awaiter inside the call to
// coroutine.resume() in the case that the coroutine completes
// synchronously, which could otherwise lead to stack-overflow if
// the awaiting coroutine awaited many synchronously-completing
// tasks in a row.

// start the coroutine if not yet started
if (m_waiters == NOT_STARTED_VAL) {
m_waiters = STARTED_NO_WAITERS_VAL;
if (m_awaiters == NOT_STARTED_VAL) {
m_awaiters = STARTED_NO_WAITERS_VAL;
const auto& handle{std::coroutine_handle<promise_type>::from_promise(*static_cast<promise_type*>(this))};
auto& async_stack_root{*get_async_stack_frame().async_stack_root};
kphp::coro::resume(handle, async_stack_root);
Expand All @@ -107,13 +107,13 @@ struct promise_base : kphp::coro::async_stack_element {
return false;
}

waiter.m_prev = nullptr;
waiter.m_next = static_cast<shared_task_impl::shared_task_waiter*>(m_waiters);
awaiter.m_prev = nullptr;
awaiter.m_next = static_cast<shared_task_impl::shared_task_awaiter*>(m_awaiters);
// at this point 'm_waiters' can only be 'STARTED_NO_WAITERS_VAL' or 'other'
if (m_waiters != STARTED_NO_WAITERS_VAL) {
static_cast<shared_task_waiter*>(m_waiters)->m_prev = std::addressof(waiter);
if (m_awaiters != STARTED_NO_WAITERS_VAL) {
static_cast<shared_task_awaiter*>(m_awaiters)->m_prev = std::addressof(awaiter);
}
m_waiters = static_cast<void*>(std::addressof(waiter));
m_awaiters = static_cast<void*>(std::addressof(awaiter));
return true;
}

Expand All @@ -124,20 +124,20 @@ struct promise_base : kphp::coro::async_stack_element {
return m_refcnt-- != 1;
}

auto cancel_awaiter(const shared_task_impl::shared_task_waiter& waiter) noexcept -> void {
const void* const NOT_STARTED_VAL{std::addressof(this->m_waiters)};
if (m_waiters == NOT_STARTED_VAL || m_waiters == STARTED_NO_WAITERS_VAL) [[unlikely]] {
auto cancel_awaiter(const shared_task_impl::shared_task_awaiter& awaiter) noexcept -> void {
const void* const NOT_STARTED_VAL{std::addressof(this->m_awaiters)};
if (m_awaiters == NOT_STARTED_VAL || m_awaiters == STARTED_NO_WAITERS_VAL) [[unlikely]] {
return;
}

const auto* waiter_ptr{std::addressof(waiter)};
if (m_waiters == waiter_ptr) { // waiter is the head of the list
m_waiters = waiter_ptr->m_next;
} else if (waiter_ptr->m_next == nullptr) { // waiter is the last in the list
waiter_ptr->m_prev->m_next = nullptr;
} else { // waiter is somewhere in the middle of the list
waiter_ptr->m_next->m_prev = waiter_ptr->m_prev;
waiter_ptr->m_prev->m_next = waiter_ptr->m_next;
const auto* awaiter_ptr{std::addressof(awaiter)};
if (m_awaiters == awaiter_ptr) { // awaiter is the head of the list
m_awaiters = awaiter_ptr->m_next;
} else if (awaiter_ptr->m_next == nullptr) { // awaiter is the last in the list
awaiter_ptr->m_prev->m_next = nullptr;
} else { // awaiter is somewhere in the middle of the list
awaiter_ptr->m_next->m_prev = awaiter_ptr->m_prev;
awaiter_ptr->m_prev->m_next = awaiter_ptr->m_next;
}
}

Expand All @@ -155,13 +155,13 @@ struct promise_base : kphp::coro::async_stack_element {

uint32_t m_refcnt{1};
// Value is either
// - nullptr - indicates started, no waiters
// - &this->w_waiters - indicates the coroutine is not yet started
// - this - indicates value is ready
// - other - pointer to head item in linked-list of waiters.
// values are of type 'shared_task_impl_::shared_task_waiter_t'.
// indicates that the coroutine has been started.
void* m_waiters{std::addressof(m_waiters)};
// - nullptr - indicates started, no awaiters
// - &this->m_awaiters - indicates the coroutine is not yet started
// - this - indicates value is ready
// - other - pointer to head item in linked-list of awaiters.
// values are of type 'shared_task_impl_::shared_task_waiter_t'.
// indicates that the coroutine has been started.
void* m_awaiters{std::addressof(m_awaiters)};
};

template<typename promise_type>
Expand Down Expand Up @@ -190,7 +190,7 @@ class awaiter_base {

protected:
std::coroutine_handle<promise_type> m_coro;
shared_task_impl::shared_task_waiter m_waiter{};
shared_task_impl::shared_task_awaiter m_awaiter{};

public:
explicit awaiter_base(std::coroutine_handle<promise_type> coro) noexcept
Expand All @@ -199,15 +199,15 @@ class awaiter_base {
awaiter_base(awaiter_base&& other) noexcept
: m_suspended(std::exchange(other.m_suspended, false)),
m_coro(std::exchange(other.m_coro, {})),
m_waiter(std::exchange(other.m_waiter, {})) {}
m_awaiter(std::exchange(other.m_awaiter, {})) {}

awaiter_base(const awaiter_base& other) = delete;
awaiter_base& operator=(const awaiter_base& other) = delete;
awaiter_base& operator=(awaiter_base&& other) = delete;

~awaiter_base() {
if (m_suspended) {
m_coro.promise().cancel_awaiter(m_waiter);
m_coro.promise().cancel_awaiter(m_awaiter);
}
}

Expand All @@ -218,8 +218,8 @@ class awaiter_base {
template<std::derived_from<kphp::coro::async_stack_element> caller_promise_type>
[[clang::noinline]] auto await_suspend(std::coroutine_handle<caller_promise_type> awaiting_coroutine) noexcept -> bool {
set_async_top_frame(awaiting_coroutine.promise().get_async_stack_frame(), STACK_RETURN_ADDRESS);
m_waiter.m_continuation = awaiting_coroutine;
m_suspended = m_coro.promise().suspend_awaiter(m_waiter);
m_awaiter.m_continuation = awaiting_coroutine;
m_suspended = m_coro.promise().suspend_awaiter(m_awaiter);
reset_async_top_frame(awaiting_coroutine.promise().get_async_stack_frame());
return m_suspended;
}
Expand Down
22 changes: 11 additions & 11 deletions runtime-light/coroutine/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,24 @@ struct task {

task() noexcept = default;

explicit task(std::coroutine_handle<> coro) noexcept
: m_haddress(coro.address()) {}
explicit task(std::coroutine_handle<promise_type> coro) noexcept
: m_coro(coro) {}

task(const task& other) noexcept = delete;

task(task&& other) noexcept
: m_haddress(std::exchange(other.m_haddress, nullptr)) {}
: m_coro(std::exchange(other.m_coro, {})) {}

task& operator=(const task& other) noexcept = delete;

task& operator=(task&& other) noexcept {
std::swap(m_haddress, other.m_haddress);
std::swap(m_coro, other.m_coro);
return *this;
}

~task() {
if (m_haddress) {
std::coroutine_handle<promise_type>::from_address(m_haddress).destroy();
if (m_coro) {
m_coro.destroy();
}
}

Expand Down Expand Up @@ -217,28 +217,28 @@ struct task {
return awaiter_base::m_coro.promise().result();
}
};
return awaiter{std::coroutine_handle<promise_type>::from_address(m_haddress)};
return awaiter{m_coro};
}

auto get_handle() noexcept -> std::coroutine_handle<promise_type> {
return std::coroutine_handle<promise_type>::from_address(m_haddress);
return m_coro;
}

// conversion functions
//
// erase type
explicit operator task<>() && noexcept {
return task<>{std::coroutine_handle<>::from_address(std::exchange(m_haddress, nullptr))};
return task<>{std::exchange(m_coro, {})};
}
// restore erased type
template<typename U>
requires(std::same_as<void, T>)
explicit operator task<U>() && noexcept {
return task<U>{std::coroutine_handle<>::from_address(std::exchange(m_haddress, nullptr))};
return task<U>{std::exchange(m_coro, {})};
}

private:
void* m_haddress{};
std::coroutine_handle<promise_type> m_coro{};
};

} // namespace kphp::coro
4 changes: 3 additions & 1 deletion runtime-light/state/instance-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "runtime-light/stdlib/math/random-state.h"
#include "runtime-light/stdlib/output/output-state.h"
#include "runtime-light/stdlib/rpc/rpc-client-state.h"
#include "runtime-light/stdlib/rpc/rpc-queue-state.h"
#include "runtime-light/stdlib/serialization/serialization-state.h"
#include "runtime-light/stdlib/string/regex-state.h"
#include "runtime-light/stdlib/string/string-state.h"
Expand Down Expand Up @@ -94,6 +95,7 @@ struct InstanceState final : vk::not_copyable {
CoroutineInstanceState coroutine_instance_state;
ForkInstanceState fork_instance_state;
WaitQueueInstanceState wait_queue_instance_state;
RpcQueueInstanceState rpc_queue_instance_state;
PhpScriptMutableGlobals php_script_mutable_globals_singleton;

RuntimeContext runtime_context;
Expand Down Expand Up @@ -133,5 +135,5 @@ struct InstanceState final : vk::not_copyable {
enum image_kind image_kind_ { image_kind::invalid };
enum instance_kind instance_kind_ { instance_kind::invalid };

static constexpr auto INIT_INSTANCE_ALLOCATOR_SIZE = static_cast<size_t>(16U * 1024U * 1024U);
static constexpr auto INIT_INSTANCE_ALLOCATOR_SIZE = static_cast<size_t>(256U * 1024U * 1024U);
};
12 changes: 6 additions & 6 deletions runtime-light/stdlib/fork/fork-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ struct ForkInstanceState final : private vk::not_copyable {

template<typename return_type>
std::pair<int64_t, kphp::coro::shared_task<return_type>> create_fork(kphp::coro::task<return_type> task) noexcept {
static constexpr auto fork_coroutine{[](kphp::coro::task<return_type> task, int64_t fork_id) noexcept -> kphp::coro::shared_task<return_type> {
ForkInstanceState::get().current_id = fork_id;
co_return co_await std::move(task);
}};

const int64_t fork_id{next_fork_id++};
auto fork_task{std::invoke(
[](kphp::coro::task<return_type> task, int64_t fork_id) noexcept -> kphp::coro::shared_task<return_type> {
ForkInstanceState::get().current_id = fork_id;
co_return co_await std::move(task);
},
std::move(task), fork_id)};
auto fork_task{std::invoke(fork_coroutine, std::move(task), fork_id)};
forks.emplace(fork_id, fork_info{.awaited = {}, .thrown_exception = {}, .opt_handle = static_cast<kphp::coro::shared_task<>>(fork_task)});
return std::make_pair(fork_id, std::move(fork_task));
}
Expand Down
Loading
Loading