Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 161 additions & 105 deletions library.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ using namespace std::chrono_literals;
#define CERR(msg) TRACE(std::wcerr, msg)

namespace /*anon*/ {
static on_connect_t s_on_connect_cb{nullptr};
// static on_connect_t s_on_connect_cb{nullptr};
static on_fail_t s_on_fail_cb{nullptr};
static on_disconnect_t s_on_disconnect_cb{nullptr};
static on_data_t s_on_data_cb{nullptr};
Expand Down Expand Up @@ -116,7 +116,7 @@ namespace /*anon*/ {
tcp::resolver resolver_{ws_.get_executor()};

beast::flat_buffer buffer_;
std::wstring host_, path_; // path part in url. For example: /v2/ws
std::wstring host_, port_, path_; // path part in url. For example: /v2/ws

/// Print error related information in stderr
/// \param ec instance that contains error related information
Expand All @@ -140,6 +140,67 @@ namespace /*anon*/ {
public:
Session() = default;

short connect(wchar_t const* szServer)
{
VERBOSE(L"Connecting to the server: " << szServer);

static boost::wregex const s_pat(LR"(^wss?://([\w\.]+):(\d+)(.*)$)");

boost::wcmatch matches;
if (!boost::regex_match(szServer, matches, s_pat)) {
COUT(L"Failed to parse host & port. Correct example: ws://localhost:8080/");
return 0;
}

std::wstring path(boost::trim_copy(matches[3].str()));
if (path.empty())
path = L"/";

host_ = std::move(matches[1]);
port_ = std::move(matches[2]);
path_ = std::move(path);

boost::system::error_code ec;
auto const results = resolver_.resolve(utf8_encode(host_), utf8_encode(port_), ec);
if (ec)
{
COUT(L"Failed to resolve IP address");
return 0;
}

VERBOSE(L"IP address: " << results->endpoint());

// Set the timeout for the operation
beast::get_lowest_layer(ws_).expires_after(30s);

// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).connect(results);

// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();

// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " WsDll");
}));

// Perform the websocket handshake

// Host HTTP header includes the port. See https://tools.ietf.org/html/rfc7230#section-5.4
ws_.handshake(utf8_encode(host_) + ":" + utf8_encode(port_), utf8_encode(path_));

// Send the message
VERBOSE(L"Issue async_read in on_handshake");
ws_.async_read(buffer_, beast::bind_front_handler(&Session::on_read, shared_from_this()));

return 1;
}

/// Send message to remote websocket server
/// \param data to be sent
void send_message(std::wstring const& data)
Expand All @@ -153,24 +214,30 @@ namespace /*anon*/ {
/// registered callback function to deal with close event.
void disconnect()
{
post(ws_.get_executor(), std::bind(&Session::do_disconnect, shared_from_this()));
// post(ws_.get_executor(), std::bind(&Session::do_disconnect, shared_from_this()));
VERBOSE(L"Disconnecting");
get_lowest_layer(ws_).cancel(); // cause all async operations to abort

if (!Manager::Clear(shared_from_this())) {
// CERR(L"Could not remove active session"); // redundant message when Sessions::Install fails
}
}

/// Start the asynchronous operation
/// \param host host to be connected
/// \param port tcp port to be connected
void run(std::wstring host, std::wstring port, std::wstring path)
{
// Save these for later
host_ = std::move(host);
path_ = std::move(path);
// void run(std::wstring host, std::wstring port, std::wstring path)
// {
// // Save these for later
// host_ = std::move(host);
// path_ = std::move(path);

VERBOSE(L"Run host_: " << host_ << L", port: " << port << L", path_: " << path_);
// VERBOSE(L"Run host_: " << host_ << L", port: " << port << L", path_: " << path_);

// Look up the domain name
resolver_.async_resolve(utf8_encode(host_), utf8_encode(port),
beast::bind_front_handler(&Session::on_resolve, shared_from_this()));
}
// // Look up the domain name
// resolver_.async_resolve(utf8_encode(host_), utf8_encode(port),
// beast::bind_front_handler(&Session::on_resolve, shared_from_this()));
// }

private: // all private (do_*/on_*) assumed on strand
std::deque<std::string> _outbox; // NOTE: reference stability of elements
Expand All @@ -184,72 +251,72 @@ namespace /*anon*/ {
do_write_loop();
}

void do_disconnect()
{
VERBOSE(L"Disconnecting");
ws_.async_close(websocket::close_code::normal,
beast::bind_front_handler(&Session::on_close, shared_from_this()));
}
// void do_disconnect()
// {
// VERBOSE(L"Disconnecting");
// ws_.async_close(websocket::close_code::normal,
// beast::bind_front_handler(&Session::on_close, shared_from_this()));
// }

/// Callback function registered by async_resolve method. It is
/// called after resolve operation is done. It will call
/// async_connect to issue async connecting operation with
/// callback function
/// \param ec
/// \param results
void on_resolve(beast::error_code ec, tcp::resolver::results_type const& results)
{
VERBOSE(L"In on_resolve");
if (ec)
return fail(ec, L"resolve");

// Set the timeout for the operation
beast::get_lowest_layer(ws_).expires_after(30s);

// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results, beast::bind_front_handler(&Session::on_connect, shared_from_this()));
}

void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
{
VERBOSE(L"In on_connect");
if (ec)
return fail(ec, L"connect");

// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();

// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " WsDll");
}));

// Perform the websocket handshake

// Host HTTP header includes the port. See https://tools.ietf.org/html/rfc7230#section-5.4
ws_.async_handshake(utf8_encode(host_) + ":" + std::to_string(ep.port()), utf8_encode(path_),
beast::bind_front_handler(&Session::on_handshake, shared_from_this()));
}

void on_handshake(beast::error_code ec)
{
VERBOSE(L"In on_handshake");
if (ec)
return fail(ec, L"handshake");

if (s_on_connect_cb)
s_on_connect_cb();

// Send the message
VERBOSE(L"Issue async_read in on_handshake");
ws_.async_read(buffer_, beast::bind_front_handler(&Session::on_read, shared_from_this()));
}
// void on_resolve(beast::error_code ec, tcp::resolver::results_type const& results)
// {
// VERBOSE(L"In on_resolve");
// if (ec)
// return fail(ec, L"resolve");

// // Set the timeout for the operation
// beast::get_lowest_layer(ws_).expires_after(30s);

// // Make the connection on the IP address we get from a lookup
// beast::get_lowest_layer(ws_).async_connect(
// results, beast::bind_front_handler(&Session::on_connect, shared_from_this()));
// }

// void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
// {
// VERBOSE(L"In on_connect");
// if (ec)
// return fail(ec, L"connect");

// // Turn off the timeout on the tcp_stream, because
// // the websocket stream has its own timeout system.
// beast::get_lowest_layer(ws_).expires_never();

// // Set suggested timeout settings for the websocket
// ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

// // Set a decorator to change the User-Agent of the handshake
// ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
// req.set(http::field::user_agent,
// std::string(BOOST_BEAST_VERSION_STRING) + " WsDll");
// }));

// // Perform the websocket handshake

// // Host HTTP header includes the port. See https://tools.ietf.org/html/rfc7230#section-5.4
// ws_.async_handshake(utf8_encode(host_) + ":" + std::to_string(ep.port()), utf8_encode(path_),
// beast::bind_front_handler(&Session::on_handshake, shared_from_this()));
// }

// void on_handshake(beast::error_code ec)
// {
// VERBOSE(L"In on_handshake");
// if (ec)
// return fail(ec, L"handshake");

// if (s_on_connect_cb)
// s_on_connect_cb();

// // Send the message
// VERBOSE(L"Issue async_read in on_handshake");
// ws_.async_read(buffer_, beast::bind_front_handler(&Session::on_read, shared_from_this()));
// }

void do_write_loop()
{
Expand Down Expand Up @@ -298,21 +365,21 @@ namespace /*anon*/ {
/// Only called when client proactively closes connection by calling
/// websocket_disconnect.
/// \param ec instance of error code
void on_close(beast::error_code ec)
{
VERBOSE(L"In on_close");
if (ec)
fail(ec, L"close");
// void on_close(beast::error_code ec)
// {
// VERBOSE(L"In on_close");
// if (ec)
// fail(ec, L"close");

if (s_on_disconnect_cb)
s_on_disconnect_cb();
// if (s_on_disconnect_cb)
// s_on_disconnect_cb();

get_lowest_layer(ws_).cancel(); // cause all async operations to abort
// get_lowest_layer(ws_).cancel(); // cause all async operations to abort

if (!Manager::Clear(shared_from_this())) {
// CERR(L"Could not remove active session"); // redundant message when Sessions::Install fails
}
}
// if (!Manager::Clear(shared_from_this())) {
// // CERR(L"Could not remove active session"); // redundant message when Sessions::Install fails
// }
// }
};
}

Expand All @@ -331,23 +398,12 @@ WSDLLAPI size_t websocket_connect(wchar_t const* szServer)
}
assert(new_session == Manager::Active());

VERBOSE(L"Connecting to the server: " << szServer);

static boost::wregex const s_pat(LR"(^wss?://([\w\.]+):(\d+)(.*)$)");

boost::wcmatch matches;
if (!boost::regex_match(szServer, matches, s_pat)) {
COUT(L"Failed to parse host & port. Correct example: ws://localhost:8080/");
if (!new_session->connect(szServer))
{
COUT(L"Connect attempt failed.");
return 0;
}

std::wstring path(boost::trim_copy(matches[3].str()));
if (path.empty())
path = L"/";

new_session->run(matches[1], matches[2], std::move(path));

return 1;
return 1;
}

WSDLLAPI size_t websocket_disconnect()
Expand Down Expand Up @@ -375,13 +431,13 @@ WSDLLAPI size_t websocket_isconnected()
return Manager::Active() != nullptr;
}

WSDLLAPI size_t websocket_register_on_connect_cb(size_t dwAddress)
{
VERBOSE(L"Registering on_connect callback");
s_on_connect_cb = reinterpret_cast<on_connect_t>(dwAddress);
// WSDLLAPI size_t websocket_register_on_connect_cb(size_t dwAddress)
// {
// VERBOSE(L"Registering on_connect callback");
// s_on_connect_cb = reinterpret_cast<on_connect_t>(dwAddress);

return 1;
}
// return 1;
// }

WSDLLAPI size_t websocket_register_on_fail_cb(size_t dwAddress)
{
Expand Down
2 changes: 1 addition & 1 deletion library.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include <cstddef>
extern "C" {
typedef void (*on_connect_t)();
// typedef void (*on_connect_t)();
typedef void (*on_fail_t)(wchar_t const* from);
typedef void (*on_disconnect_t)();
typedef void (*on_data_t)(wchar_t const*, size_t);
Expand Down