1010#include < memory>
1111#include < utility>
1212
13+ #include " runtime-common/core/runtime-core.h"
14+ #include " runtime-light/coroutine/event.h"
1315#include " runtime-light/k2-platform/k2-api.h"
16+ #include " runtime-light/stdlib/diagnostics/logs.h"
1417#include " runtime-light/streams/stream.h"
15- #include " runtime-light/streams/watcher.h"
1618
1719namespace kphp ::component {
1820
1921class connection {
22+ struct shared_state : refcountable_php_classes<shared_state> {
23+ std::optional<kphp::coro::event> m_unwatch_event;
24+
25+ shared_state () noexcept = default ;
26+ shared_state (shared_state&&) noexcept = default ;
27+ shared_state& operator =(shared_state&&) noexcept = default ;
28+ ~shared_state () = default ;
29+
30+ shared_state (const shared_state&) = delete ;
31+ shared_state operator =(const shared_state&) = delete ;
32+ };
33+
34+ class_instance<shared_state> m_shared_state;
2035 kphp::component::stream m_stream;
21- kphp::component::watcher m_watcher;
2236 uint32_t m_ignore_abort_level{};
2337
24- connection (kphp::component::stream&& stream, kphp::component::watcher&& watcher) noexcept
25- : m_stream(std::move(stream)),
26- m_watcher (std::move(watcher)) {}
38+ explicit connection (kphp::component::stream&& stream) noexcept ;
2739
2840public:
2941 connection () = delete ;
3042 connection (const connection&) = delete ;
3143 auto operator =(const connection&) = delete ;
3244
3345 connection (connection&&) noexcept = default ;
34- auto operator =(connection&&) noexcept -> connection& = default ;
35- ~connection () = default ;
46+ auto operator =(connection&& other) noexcept -> connection&;
47+
48+ ~connection () {
49+ unregister_abort_handler ();
50+ }
3651
3752 static auto from_stream (kphp::component::stream&& stream) noexcept -> std::expected<connection, int32_t>;
3853
@@ -41,7 +56,6 @@ class connection {
4156 auto increase_ignore_abort_level () noexcept -> void;
4257 auto decrease_ignore_abort_level () noexcept -> void;
4358 auto get_ignore_abort_level () const noexcept -> uint32_t;
44-
4559 auto is_aborted () const noexcept -> bool;
4660
4761 template <std::invocable on_abort_handler_type>
@@ -51,20 +65,26 @@ class connection {
5165
5266// ================================================================================================
5367
54- inline auto connection::from_stream (kphp::component::stream&& stream) noexcept -> std::expected<connection, int32_t> {
55- k2::StreamStatus stream_status{};
56- k2::stream_status (stream.descriptor (), std::addressof (stream_status));
57- if (stream_status.libc_errno != k2::errno_ok || stream_status.write_status == k2::IOStatus::IOClosed) [[unlikely]] {
58- return std::unexpected{stream_status.libc_errno != k2::errno_ok ? stream_status.libc_errno : k2::errno_eshutdown};
59- }
68+ inline connection::connection (kphp::component::stream&& stream) noexcept
69+ : m_stream(std::move(stream)) {
70+ kphp::log::assertion (!m_shared_state.alloc ().is_null ());
71+ }
6072
61- auto expected_watcher{kphp::component::watcher::create (stream.descriptor ())};
62- if (!expected_watcher) [[unlikely]] {
63- return std::unexpected{expected_watcher.error ()};
73+ inline auto connection::operator =(connection&& other) noexcept -> connection& {
74+ if (this != std::addressof (other)) {
75+ unregister_abort_handler ();
76+ m_shared_state = std::move (other.m_shared_state );
77+ m_stream = std::move (other.m_stream );
78+ m_ignore_abort_level = other.m_ignore_abort_level ;
6479 }
80+ return *this ;
81+ }
6582
66- auto watcher{*std::move (expected_watcher)};
67- return connection{std::move (stream), std::move (watcher)};
83+ inline auto connection::from_stream (kphp::component::stream&& stream) noexcept -> std::expected<connection, int32_t> {
84+ if (const auto status{stream.status ()}; status.libc_errno != k2::errno_ok) [[unlikely]] {
85+ return std::unexpected{status.libc_errno };
86+ }
87+ return connection{std::move (stream)};
6888}
6989
7090inline auto connection::get_stream () noexcept -> kphp::component::stream& {
@@ -91,11 +111,67 @@ inline auto connection::is_aborted() const noexcept -> bool {
91111
92112template <std::invocable on_abort_handler_type>
93113auto connection::register_abort_handler (on_abort_handler_type&& h) noexcept -> std::expected<void, int32_t> {
94- return m_watcher.watch (std::forward<on_abort_handler_type>(h));
114+ if (m_shared_state.is_null ()) [[unlikely]] {
115+ return std::unexpected{k2::errno_eshutdown};
116+ }
117+ if (m_shared_state.get ()->m_unwatch_event .has_value ()) [[unlikely]] { // already registered
118+ return std::unexpected{k2::errno_ealready};
119+ }
120+
121+ if (const auto status{m_stream.status ()}; status.libc_errno != k2::errno_ok || status.write_status == k2::IOStatus::IOClosed) [[unlikely]] {
122+ return std::unexpected{status.libc_errno != k2::errno_ok ? status.libc_errno : k2::errno_ecanceled};
123+ }
124+
125+ static constexpr auto watcher{[](k2::descriptor descriptor, class_instance<shared_state> state, on_abort_handler_type h) noexcept -> kphp::coro::task<> {
126+ static constexpr auto unwatch_awaiter{[](class_instance<shared_state> state) noexcept -> kphp::coro::task<> {
127+ kphp::log::assertion (state.get ()->m_unwatch_event .has_value ());
128+ co_await *state.get ()->m_unwatch_event ;
129+ }};
130+
131+ static constexpr auto descriptor_awaiter{[](k2::descriptor descriptor) noexcept -> kphp::coro::task<std::monostate> {
132+ k2::StreamStatus stream_status{};
133+ auto & io_scheduler{kphp::coro::io_scheduler::get ()};
134+ for (;;) { // FIXME it should actually use scheduler.poll
135+ k2::stream_status (descriptor, std::addressof (stream_status));
136+ if (stream_status.write_status == k2::IOStatus::IOClosed) {
137+ co_return std::monostate{};
138+ }
139+
140+ using namespace std ::chrono_literals;
141+ co_await io_scheduler.schedule (150ms);
142+ }
143+ }};
144+
145+ kphp::log::assertion (!state.is_null ());
146+ if (!state.get ()->m_unwatch_event .has_value ()) { // already unregistered
147+ co_return ;
148+ }
149+
150+ const auto finalizer{vk::finally ([state] noexcept { state.get ()->m_unwatch_event .reset (); })};
151+ const auto v{co_await kphp::coro::when_any (unwatch_awaiter (std::move (state)), descriptor_awaiter (descriptor))};
152+ if (std::holds_alternative<std::monostate>(v)) {
153+ if constexpr (kphp::coro::is_async_function_v<on_abort_handler_type>) {
154+ co_await std::invoke (std::move (h));
155+ } else {
156+ std::invoke (std::move (h));
157+ }
158+ }
159+ }};
160+
161+ m_shared_state.get ()->m_unwatch_event .emplace ();
162+ if (!kphp::coro::io_scheduler::get ().spawn (watcher (m_stream.descriptor (), m_shared_state, std::forward<on_abort_handler_type>(h)))) [[unlikely]] {
163+ m_shared_state.get ()->m_unwatch_event .reset ();
164+ return std::unexpected{k2::errno_ebusy};
165+ }
166+ return {};
95167}
96168
97169inline auto connection::unregister_abort_handler () noexcept -> void {
98- m_watcher.unwatch ();
170+ if (m_shared_state.is_null () || !m_shared_state.get ()->m_unwatch_event .has_value ()) {
171+ return ;
172+ }
173+ m_shared_state.get ()->m_unwatch_event ->set ();
174+ m_shared_state.get ()->m_unwatch_event .reset ();
99175}
100176
101177} // namespace kphp::component
0 commit comments