From 79e9fc92f00c4e5d2c6173416e2b30ffe9f9c312 Mon Sep 17 00:00:00 2001 From: Dan Pollak Date: Sun, 23 Apr 2023 17:50:42 -0500 Subject: [PATCH] async > sync --- library.cpp | 266 +++++++++++++++++++++++++++++++--------------------- library.h | 2 +- 2 files changed, 162 insertions(+), 106 deletions(-) diff --git a/library.cpp b/library.cpp index 4f95939..e389501 100644 --- a/library.cpp +++ b/library.cpp @@ -19,7 +19,7 @@ using namespace std::chrono_literals; #define CERR(msg) TRACE(std::wcerr, msg) namespace /*anon*/ { - static on_connect_t s_on_connect_cb{nullptr}; + // static on_connect_t s_on_connect_cb{nullptr}; static on_fail_t s_on_fail_cb{nullptr}; static on_disconnect_t s_on_disconnect_cb{nullptr}; static on_data_t s_on_data_cb{nullptr}; @@ -116,7 +116,7 @@ namespace /*anon*/ { tcp::resolver resolver_{ws_.get_executor()}; beast::flat_buffer buffer_; - std::wstring host_, path_; // path part in url. For example: /v2/ws + std::wstring host_, port_, path_; // path part in url. For example: /v2/ws /// Print error related information in stderr /// \param ec instance that contains error related information @@ -140,6 +140,67 @@ namespace /*anon*/ { public: Session() = default; + short connect(wchar_t const* szServer) + { + VERBOSE(L"Connecting to the server: " << szServer); + + static boost::wregex const s_pat(LR"(^wss?://([\w\.]+):(\d+)(.*)$)"); + + boost::wcmatch matches; + if (!boost::regex_match(szServer, matches, s_pat)) { + COUT(L"Failed to parse host & port. Correct example: ws://localhost:8080/"); + return 0; + } + + std::wstring path(boost::trim_copy(matches[3].str())); + if (path.empty()) + path = L"/"; + + host_ = std::move(matches[1]); + port_ = std::move(matches[2]); + path_ = std::move(path); + + boost::system::error_code ec; + auto const results = resolver_.resolve(utf8_encode(host_), utf8_encode(port_), ec); + if (ec) + { + COUT(L"Failed to resolve IP address"); + return 0; + } + + VERBOSE(L"IP address: " << results->endpoint()); + + // Set the timeout for the operation + beast::get_lowest_layer(ws_).expires_after(30s); + + // Make the connection on the IP address we get from a lookup + beast::get_lowest_layer(ws_).connect(results); + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws_).expires_never(); + + // Set suggested timeout settings for the websocket + ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client)); + + // Set a decorator to change the User-Agent of the handshake + ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) { + req.set(http::field::user_agent, + std::string(BOOST_BEAST_VERSION_STRING) + " WsDll"); + })); + + // Perform the websocket handshake + + // Host HTTP header includes the port. See https://tools.ietf.org/html/rfc7230#section-5.4 + ws_.handshake(utf8_encode(host_) + ":" + utf8_encode(port_), utf8_encode(path_)); + + // Send the message + VERBOSE(L"Issue async_read in on_handshake"); + ws_.async_read(buffer_, beast::bind_front_handler(&Session::on_read, shared_from_this())); + + return 1; + } + /// Send message to remote websocket server /// \param data to be sent void send_message(std::wstring const& data) @@ -153,24 +214,30 @@ namespace /*anon*/ { /// registered callback function to deal with close event. void disconnect() { - post(ws_.get_executor(), std::bind(&Session::do_disconnect, shared_from_this())); + // post(ws_.get_executor(), std::bind(&Session::do_disconnect, shared_from_this())); + VERBOSE(L"Disconnecting"); + get_lowest_layer(ws_).cancel(); // cause all async operations to abort + + if (!Manager::Clear(shared_from_this())) { + // CERR(L"Could not remove active session"); // redundant message when Sessions::Install fails + } } /// Start the asynchronous operation /// \param host host to be connected /// \param port tcp port to be connected - void run(std::wstring host, std::wstring port, std::wstring path) - { - // Save these for later - host_ = std::move(host); - path_ = std::move(path); + // void run(std::wstring host, std::wstring port, std::wstring path) + // { + // // Save these for later + // host_ = std::move(host); + // path_ = std::move(path); - VERBOSE(L"Run host_: " << host_ << L", port: " << port << L", path_: " << path_); + // VERBOSE(L"Run host_: " << host_ << L", port: " << port << L", path_: " << path_); - // Look up the domain name - resolver_.async_resolve(utf8_encode(host_), utf8_encode(port), - beast::bind_front_handler(&Session::on_resolve, shared_from_this())); - } + // // Look up the domain name + // resolver_.async_resolve(utf8_encode(host_), utf8_encode(port), + // beast::bind_front_handler(&Session::on_resolve, shared_from_this())); + // } private: // all private (do_*/on_*) assumed on strand std::deque _outbox; // NOTE: reference stability of elements @@ -184,12 +251,12 @@ namespace /*anon*/ { do_write_loop(); } - void do_disconnect() - { - VERBOSE(L"Disconnecting"); - ws_.async_close(websocket::close_code::normal, - beast::bind_front_handler(&Session::on_close, shared_from_this())); - } + // void do_disconnect() + // { + // VERBOSE(L"Disconnecting"); + // ws_.async_close(websocket::close_code::normal, + // beast::bind_front_handler(&Session::on_close, shared_from_this())); + // } /// Callback function registered by async_resolve method. It is /// called after resolve operation is done. It will call @@ -197,59 +264,59 @@ namespace /*anon*/ { /// callback function /// \param ec /// \param results - void on_resolve(beast::error_code ec, tcp::resolver::results_type const& results) - { - VERBOSE(L"In on_resolve"); - if (ec) - return fail(ec, L"resolve"); - - // Set the timeout for the operation - beast::get_lowest_layer(ws_).expires_after(30s); - - // Make the connection on the IP address we get from a lookup - beast::get_lowest_layer(ws_).async_connect( - results, beast::bind_front_handler(&Session::on_connect, shared_from_this())); - } - - void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) - { - VERBOSE(L"In on_connect"); - if (ec) - return fail(ec, L"connect"); - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - beast::get_lowest_layer(ws_).expires_never(); - - // Set suggested timeout settings for the websocket - ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client)); - - // Set a decorator to change the User-Agent of the handshake - ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) { - req.set(http::field::user_agent, - std::string(BOOST_BEAST_VERSION_STRING) + " WsDll"); - })); - - // Perform the websocket handshake - - // Host HTTP header includes the port. See https://tools.ietf.org/html/rfc7230#section-5.4 - ws_.async_handshake(utf8_encode(host_) + ":" + std::to_string(ep.port()), utf8_encode(path_), - beast::bind_front_handler(&Session::on_handshake, shared_from_this())); - } - - void on_handshake(beast::error_code ec) - { - VERBOSE(L"In on_handshake"); - if (ec) - return fail(ec, L"handshake"); - - if (s_on_connect_cb) - s_on_connect_cb(); - - // Send the message - VERBOSE(L"Issue async_read in on_handshake"); - ws_.async_read(buffer_, beast::bind_front_handler(&Session::on_read, shared_from_this())); - } + // void on_resolve(beast::error_code ec, tcp::resolver::results_type const& results) + // { + // VERBOSE(L"In on_resolve"); + // if (ec) + // return fail(ec, L"resolve"); + + // // Set the timeout for the operation + // beast::get_lowest_layer(ws_).expires_after(30s); + + // // Make the connection on the IP address we get from a lookup + // beast::get_lowest_layer(ws_).async_connect( + // results, beast::bind_front_handler(&Session::on_connect, shared_from_this())); + // } + + // void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) + // { + // VERBOSE(L"In on_connect"); + // if (ec) + // return fail(ec, L"connect"); + + // // Turn off the timeout on the tcp_stream, because + // // the websocket stream has its own timeout system. + // beast::get_lowest_layer(ws_).expires_never(); + + // // Set suggested timeout settings for the websocket + // ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client)); + + // // Set a decorator to change the User-Agent of the handshake + // ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) { + // req.set(http::field::user_agent, + // std::string(BOOST_BEAST_VERSION_STRING) + " WsDll"); + // })); + + // // Perform the websocket handshake + + // // Host HTTP header includes the port. See https://tools.ietf.org/html/rfc7230#section-5.4 + // ws_.async_handshake(utf8_encode(host_) + ":" + std::to_string(ep.port()), utf8_encode(path_), + // beast::bind_front_handler(&Session::on_handshake, shared_from_this())); + // } + + // void on_handshake(beast::error_code ec) + // { + // VERBOSE(L"In on_handshake"); + // if (ec) + // return fail(ec, L"handshake"); + + // if (s_on_connect_cb) + // s_on_connect_cb(); + + // // Send the message + // VERBOSE(L"Issue async_read in on_handshake"); + // ws_.async_read(buffer_, beast::bind_front_handler(&Session::on_read, shared_from_this())); + // } void do_write_loop() { @@ -298,21 +365,21 @@ namespace /*anon*/ { /// Only called when client proactively closes connection by calling /// websocket_disconnect. /// \param ec instance of error code - void on_close(beast::error_code ec) - { - VERBOSE(L"In on_close"); - if (ec) - fail(ec, L"close"); + // void on_close(beast::error_code ec) + // { + // VERBOSE(L"In on_close"); + // if (ec) + // fail(ec, L"close"); - if (s_on_disconnect_cb) - s_on_disconnect_cb(); + // if (s_on_disconnect_cb) + // s_on_disconnect_cb(); - get_lowest_layer(ws_).cancel(); // cause all async operations to abort + // get_lowest_layer(ws_).cancel(); // cause all async operations to abort - if (!Manager::Clear(shared_from_this())) { - // CERR(L"Could not remove active session"); // redundant message when Sessions::Install fails - } - } + // if (!Manager::Clear(shared_from_this())) { + // // CERR(L"Could not remove active session"); // redundant message when Sessions::Install fails + // } + // } }; } @@ -331,23 +398,12 @@ WSDLLAPI size_t websocket_connect(wchar_t const* szServer) } assert(new_session == Manager::Active()); - VERBOSE(L"Connecting to the server: " << szServer); - - static boost::wregex const s_pat(LR"(^wss?://([\w\.]+):(\d+)(.*)$)"); - - boost::wcmatch matches; - if (!boost::regex_match(szServer, matches, s_pat)) { - COUT(L"Failed to parse host & port. Correct example: ws://localhost:8080/"); + if (!new_session->connect(szServer)) + { + COUT(L"Connect attempt failed."); return 0; } - - std::wstring path(boost::trim_copy(matches[3].str())); - if (path.empty()) - path = L"/"; - - new_session->run(matches[1], matches[2], std::move(path)); - - return 1; + return 1; } WSDLLAPI size_t websocket_disconnect() @@ -375,13 +431,13 @@ WSDLLAPI size_t websocket_isconnected() return Manager::Active() != nullptr; } -WSDLLAPI size_t websocket_register_on_connect_cb(size_t dwAddress) -{ - VERBOSE(L"Registering on_connect callback"); - s_on_connect_cb = reinterpret_cast(dwAddress); +// WSDLLAPI size_t websocket_register_on_connect_cb(size_t dwAddress) +// { +// VERBOSE(L"Registering on_connect callback"); +// s_on_connect_cb = reinterpret_cast(dwAddress); - return 1; -} +// return 1; +// } WSDLLAPI size_t websocket_register_on_fail_cb(size_t dwAddress) { diff --git a/library.h b/library.h index b4da75d..19d3ae1 100644 --- a/library.h +++ b/library.h @@ -24,7 +24,7 @@ #include extern "C" { - typedef void (*on_connect_t)(); + // typedef void (*on_connect_t)(); typedef void (*on_fail_t)(wchar_t const* from); typedef void (*on_disconnect_t)(); typedef void (*on_data_t)(wchar_t const*, size_t);