1010#include < memory>
1111#include < utility>
1212
13+ #include " runtime-common/core/runtime-core.h"
14+ #include " runtime-light/coroutine/event.h"
1315#include " runtime-light/k2-platform/k2-api.h"
1416#include " runtime-light/streams/stream.h"
15- #include " runtime-light/streams/watcher.h"
1617
1718namespace kphp ::component {
1819
1920class connection {
21+ struct shared_state : refcountable_php_classes<shared_state> {
22+ std::optional<kphp::coro::event> m_unwatch_event;
23+
24+ shared_state () noexcept = default ;
25+ shared_state (shared_state&&) noexcept = default ;
26+ shared_state& operator =(shared_state&&) noexcept = default ;
27+ ~shared_state () = default ;
28+
29+ shared_state (const shared_state&) = delete ;
30+ shared_state operator =(const shared_state&) = delete ;
31+ };
32+
33+ class_instance<shared_state> m_shared_state;
2034 kphp::component::stream m_stream;
21- kphp::component::watcher m_watcher;
2235 uint32_t m_ignore_abort_level{};
2336
24- connection (kphp::component::stream&& stream, kphp::component::watcher&& watcher) noexcept
25- : m_stream(std::move(stream)),
26- m_watcher (std::move(watcher)) {}
37+ explicit connection (kphp::component::stream&& stream) noexcept ;
2738
2839public:
2940 connection () = delete ;
3041 connection (const connection&) = delete ;
3142 auto operator =(const connection&) = delete ;
3243
3344 connection (connection&&) noexcept = default ;
34- auto operator =(connection&&) noexcept -> connection& = default ;
35- ~connection () = default ;
45+ auto operator =(connection&& other) noexcept -> connection&;
46+
47+ ~connection () {
48+ unregister_abort_handler ();
49+ }
3650
3751 static auto from_stream (kphp::component::stream&& stream) noexcept -> std::expected<connection, int32_t>;
3852
@@ -41,7 +55,6 @@ class connection {
4155 auto increase_ignore_abort_level () noexcept -> void;
4256 auto decrease_ignore_abort_level () noexcept -> void;
4357 auto get_ignore_abort_level () const noexcept -> uint32_t;
44-
4558 auto is_aborted () const noexcept -> bool;
4659
4760 template <std::invocable on_abort_handler_type>
@@ -51,20 +64,26 @@ class connection {
5164
5265// ================================================================================================
5366
54- inline auto connection::from_stream (kphp::component::stream&& stream) noexcept -> std::expected<connection, int32_t> {
55- k2::StreamStatus stream_status{};
56- k2::stream_status (stream.descriptor (), std::addressof (stream_status));
57- if (stream_status.libc_errno != k2::errno_ok || stream_status.write_status == k2::IOStatus::IOClosed) [[unlikely]] {
58- return std::unexpected{stream_status.libc_errno != k2::errno_ok ? stream_status.libc_errno : k2::errno_eshutdown};
59- }
67+ inline connection::connection (kphp::component::stream&& stream) noexcept
68+ : m_stream(std::move(stream)) {
69+ kphp::log::assertion (!m_shared_state.alloc ().is_null ());
70+ }
6071
61- auto expected_watcher{kphp::component::watcher::create (stream.descriptor ())};
62- if (!expected_watcher) [[unlikely]] {
63- return std::unexpected{expected_watcher.error ()};
72+ inline auto connection::operator =(connection&& other) noexcept -> connection& {
73+ if (this != std::addressof (other)) {
74+ unregister_abort_handler ();
75+ m_shared_state = std::move (other.m_shared_state );
76+ m_stream = std::move (other.m_stream );
77+ m_ignore_abort_level = other.m_ignore_abort_level ;
6478 }
79+ return *this ;
80+ }
6581
66- auto watcher{*std::move (expected_watcher)};
67- return connection{std::move (stream), std::move (watcher)};
82+ inline auto connection::from_stream (kphp::component::stream&& stream) noexcept -> std::expected<connection, int32_t> {
83+ if (const auto status{stream.status ()}; status.libc_errno != k2::errno_ok) [[unlikely]] {
84+ return std::unexpected{status.libc_errno };
85+ }
86+ return connection{std::move (stream)};
6887}
6988
7089inline auto connection::get_stream () noexcept -> kphp::component::stream& {
@@ -91,11 +110,61 @@ inline auto connection::is_aborted() const noexcept -> bool {
91110
92111template <std::invocable on_abort_handler_type>
93112auto connection::register_abort_handler (on_abort_handler_type&& h) noexcept -> std::expected<void, int32_t> {
94- return m_watcher.watch (std::forward<on_abort_handler_type>(h));
113+ if (m_shared_state.is_null ()) [[unlikely]] {
114+ return std::unexpected{k2::errno_eshutdown};
115+ }
116+ if (m_shared_state.get ()->m_unwatch_event .has_value ()) [[unlikely]] { // already registered
117+ return std::unexpected{k2::errno_ealready};
118+ }
119+
120+ if (const auto status{m_stream.status ()}; status.libc_errno != k2::errno_ok || status.write_status == k2::IOStatus::IOClosed) [[unlikely]] {
121+ return std::unexpected{status.libc_errno != k2::errno_ok ? status.libc_errno : k2::errno_ecanceled};
122+ }
123+
124+ static constexpr auto watcher{[](k2::descriptor descriptor, class_instance<shared_state> state, on_abort_handler_type h) noexcept -> kphp::coro::task<> {
125+ kphp::log::assertion (!state.is_null () && state.get ()->m_unwatch_event .has_value ());
126+ const auto finalizer{vk::finally ([state] noexcept { state.get ()->m_unwatch_event .reset (); })};
127+
128+ static constexpr auto unwatch_awaiter{[](class_instance<shared_state> state) noexcept -> kphp::coro::task<> { co_await *state.get ()->m_unwatch_event ; }};
129+
130+ static constexpr auto descriptor_awaiter{[](k2::descriptor descriptor) noexcept -> kphp::coro::task<std::monostate> {
131+ k2::StreamStatus stream_status{};
132+ auto & io_scheduler{kphp::coro::io_scheduler::get ()};
133+ for (;;) { // FIXME it should actually use scheduler.poll
134+ k2::stream_status (descriptor, std::addressof (stream_status));
135+ if (stream_status.write_status == k2::IOStatus::IOClosed) {
136+ co_return std::monostate{};
137+ }
138+
139+ using namespace std ::chrono_literals;
140+ co_await io_scheduler.schedule (150ms);
141+ }
142+ }};
143+
144+ const auto v{co_await kphp::coro::when_any (descriptor_awaiter (descriptor), unwatch_awaiter (std::move (state)))};
145+ if (std::holds_alternative<std::monostate>(v)) {
146+ if constexpr (kphp::coro::is_async_function_v<on_abort_handler_type>) {
147+ co_await std::invoke (std::move (h));
148+ } else {
149+ std::invoke (std::move (h));
150+ }
151+ }
152+ }};
153+
154+ m_shared_state.get ()->m_unwatch_event .emplace ();
155+ if (!kphp::coro::io_scheduler::get ().spawn (watcher (m_stream.descriptor (), m_shared_state, std::forward<on_abort_handler_type>(h)))) [[unlikely]] {
156+ m_shared_state.get ()->m_unwatch_event .reset ();
157+ return std::unexpected{k2::errno_ebusy};
158+ }
159+ return {};
95160}
96161
97162inline auto connection::unregister_abort_handler () noexcept -> void {
98- m_watcher.unwatch ();
163+ if (m_shared_state.is_null () || !m_shared_state.get ()->m_unwatch_event .has_value ()) {
164+ return ;
165+ }
166+ m_shared_state.get ()->m_unwatch_event ->set ();
167+ m_shared_state.get ()->m_unwatch_event .reset ();
99168}
100169
101170} // namespace kphp::component
0 commit comments