From bd2de14a3403af35145d473eea2d4b384822ef24 Mon Sep 17 00:00:00 2001 From: Chen Dongxiao Date: Mon, 6 Nov 2023 14:27:08 +0800 Subject: [PATCH] dht thread change socket select to libuv (#36) --- src/CMakeLists.txt | 4 +- src/addons/activeproxy/activeproxy.cc | 2 +- src/addons/activeproxy/connection.cc | 2 +- .../exceptions}/exceptions.h | 18 +- src/core/rpcserver.cc | 466 ++++++++---------- src/core/rpcserver.h | 40 +- src/core/value.cc | 6 +- 7 files changed, 245 insertions(+), 293 deletions(-) rename src/{addons/activeproxy => core/exceptions}/exceptions.h (78%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2c6847e..897d6d6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -114,11 +114,13 @@ list(APPEND BOSON_DEPENDS spdlog-setup nlohmann libutf8proc + libuv ) set(LIBS utf8proc - sqlite3) + sqlite3 + uv_a) if(WIN32) set(LIBS diff --git a/src/addons/activeproxy/activeproxy.cc b/src/addons/activeproxy/activeproxy.cc index 3431eb0..3b49a4f 100644 --- a/src/addons/activeproxy/activeproxy.cc +++ b/src/addons/activeproxy/activeproxy.cc @@ -44,7 +44,7 @@ #include "activeproxy.h" #include "connection.h" -#include "exceptions.h" +#include "exceptions/exceptions.h" #include "utils/addr.h" #include "crypto/hex.h" diff --git a/src/addons/activeproxy/connection.cc b/src/addons/activeproxy/connection.cc index 2eb4a89..d7888c3 100644 --- a/src/addons/activeproxy/connection.cc +++ b/src/addons/activeproxy/connection.cc @@ -35,7 +35,7 @@ #include "connection.h" #include "activeproxy.h" #include "packettype.h" -#include "exceptions.h" +#include "exceptions/exceptions.h" #include "utils/log.h" namespace boson { diff --git a/src/addons/activeproxy/exceptions.h b/src/core/exceptions/exceptions.h similarity index 78% rename from src/addons/activeproxy/exceptions.h rename to src/core/exceptions/exceptions.h index c33e5dd..202a466 100644 --- a/src/addons/activeproxy/exceptions.h +++ b/src/core/exceptions/exceptions.h @@ -1,6 +1,10 @@ /* +<<<<<<< HEAD:src/addons/activeproxy/exceptions.h * Copyright (c) 2022 - 2023 trinity-tech.io * Copyright (c) 2023 - ~ bosonnetwork.io +======= + * Copyright (c) 2022 -2023 trinity-tech.io +>>>>>>> 821f2f0 (dht thread change socket select to libuv (#36)):src/core/exceptions/exceptions.h * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +31,6 @@ #include namespace boson { -namespace activeproxy { class networking_error : public std::runtime_error { @@ -49,5 +52,16 @@ class state_error : public std::runtime_error virtual ~state_error() noexcept = default; }; -} // namespace activeproxy +inline void checkArgument(bool expression, const std::string& errorMessage) { + if (!expression) { + throw std::invalid_argument(errorMessage); + } +} + +inline void checkState(bool expression, const std::string& errorMessage) { + if (!expression) { + throw state_error(errorMessage); + } +} + } // namespace boson diff --git a/src/core/rpcserver.cc b/src/core/rpcserver.cc index 3e5a447..f8bb71f 100644 --- a/src/core/rpcserver.cc +++ b/src/core/rpcserver.cc @@ -29,20 +29,43 @@ #include #endif +#include "dht.h" +#include "error_code.h" +#include "constants.h" +#include "rpcserver.h" +#include "rpcstatistics.h" #include "boson/node.h" #include "utils/time.h" #include "utils/random_generator.h" #include "exceptions/dht_error.h" +#include "exceptions/exceptions.h" #include "messages/message.h" #include "messages/error_message.h" -#include "error_code.h" -#include "constants.h" -#include "rpcserver.h" -#include "dht.h" -#include "rpcstatistics.h" namespace boson { +static const uint32_t PERIODIC_INTERVAL = 100; // 100 milliseconds +static const size_t MAX_DATA_PACKET_SIZE = 0x7FFF; // 32767 + +struct SendRequest { + uv_udp_send_t request; + uv_buf_t buf; + Sp msg {}; + RPCServer* rpcServer; + + SendRequest(RPCServer* svr, size_t size, Sp& _msg) { + request.data = svr; + rpcServer = svr; + msg = _msg; + buf.base = new char[size]; + buf.len = buf.base ? size : 0; + } + + ~SendRequest() { + delete[] buf.base; + }; +}; + RPCServer::RPCServer(Node& _node, const Sp _dht4, const Sp _dht6): node(_node), dht4(_dht4 ? std::optional>(*_dht4) : std::nullopt), dht6(_dht6 ? std::optional>(*_dht6) : std::nullopt) { @@ -51,306 +74,140 @@ RPCServer::RPCServer(Node& _node, const Sp _dht4, const Sp _dht6): nod log = Logger::get("RpcServer"); - SocketAddress bind4, bind6; if (_dht4 != nullptr) bind4 = _dht4->getOrigin(); if (_dht6 != nullptr) bind6 = _dht6->getOrigin(); - bindSockets(bind4, bind6); + readBuffer.resize(MAX_DATA_PACKET_SIZE); } RPCServer::~RPCServer() { stop(); - if (rcv_thread.joinable()) - rcv_thread.join(); } -static bool setNonblocking(int fd, bool nonblocking = true) -{ -#ifdef _WIN32 - unsigned long mode = !!nonblocking; - int rc = ioctlsocket(fd, FIONBIO, &mode); - return rc == 0; -#else - int flags = fcntl(fd, F_GETFL, 0); - if (flags == -1) - return false; - - if (nonblocking) { - flags |= O_NONBLOCK; - } else { - flags &= ~O_NONBLOCK; +void RPCServer::readStart(uv_udp_t* handle, const SocketAddress& bind) { + auto rc = uv_udp_init(&loop, handle); + if (rc < 0) { + failHandler(rc, "initialize the udp"); } - return fcntl(fd, F_SETFL, flags) >= 0; -#endif -} - -static int bindSocket(const SocketAddress& addr, SocketAddress& bound) -{ - int sock = socket(addr.family(), SOCK_DGRAM, 0); - if (sock < 0) - throw std::runtime_error("Failed to create socket: " + std::string(std::strerror(errno))); - int set = 1; -#ifdef SO_NOSIGPIPE - setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, (char*)&set, sizeof(set)); -#endif - if (addr.family() == AF_INET6) - setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&set, sizeof(set)); - - setNonblocking(sock); - int rc = bind(sock, addr.addr(), addr.length()); + handle->data = this; + rc = uv_udp_bind(handle, (const struct sockaddr*) bind.addr(), 0); if (rc < 0) { -#if defined(_WIN32) || defined(_WIN64) - closesocket(sock); -#else - close(sock); -#endif - throw std::runtime_error("Can't bind socket on " + addr.toString() + " " + std::string(std::strerror(errno))); + failHandler(rc, "bind the udp"); } - sockaddr_storage bound_addr; - socklen_t bound_addr_len = sizeof(bound_addr); - getsockname(sock, reinterpret_cast(&bound_addr), &bound_addr_len); - bound = {bound_addr}; - return sock; + rc = uv_udp_recv_start(handle, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + RPCServer* svr = (RPCServer*)handle->data; + buf->base = (char *)svr->readBuffer.data(); //need to use two buf? one for upd4 and udp5 + buf->len = svr->readBuffer.size(); + }, [](uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { + RPCServer* svr = (RPCServer*)handle->data; + if (nread > 0) { + svr->handlePacket((uint8_t*)buf->base, nread, addr); + } + }); + if (rc < 0) { + failHandler(rc, "receive the udp start"); + } } -int RPCServer::sendData(Sp& msg) { - const auto& remoteAddr = msg->getRemoteAddress(); - - if (!remoteAddr) { - auto failureReason = "Remote address is invalid, need to be specified!"; - log->error("send data failed: {}", failureReason); - throw std::runtime_error(failureReason); - } +void RPCServer::sendData(Sp& msg) { - int sockfd = -1; + const auto& remoteAddr = msg->getRemoteAddress(); + uv_udp_t* handle; switch (remoteAddr.family()) { case AF_INET: - sockfd = sock4; + handle = &udp4Handle; break; case AF_INET6: - sockfd = sock6; + handle = &udp6Handle; break; default: throw std::runtime_error("Unsupported address family!"); } - if (sockfd < 0) - throw std::runtime_error("Socket fd is error!!!"); - - int flags = 0; - #ifdef MSG_NOSIGNAL - flags |= MSG_NOSIGNAL; - #endif - #ifdef MSG_CONFIRM - //if (replied) - //flags |= MSG_CONFIRM; - #endif - auto buffer = msg->serialize(); auto encrypted = node.encrypt(msg->getRemoteId(), {buffer}); - buffer.resize(ID_BYTES + encrypted.size()); - std::memcpy(buffer.data(), msg->getId().data(), ID_BYTES); - std::memcpy(buffer.data() + ID_BYTES, encrypted.data(), encrypted.size()); + size_t size = ID_BYTES + encrypted.size(); + SendRequest* request = new SendRequest{this, size, msg}; + uint8_t* ptr = (uint8_t*)request->buf.base; + std::memcpy(ptr, msg->getId().data(), ID_BYTES); + std::memcpy(ptr + ID_BYTES, encrypted.data(), encrypted.size()); + + log->debug("Send {} to server {}.", msg->toString(), remoteAddr.toString()); + auto rc = uv_udp_send((uv_udp_send_t*)request, handle, &request->buf, 1, remoteAddr.addr(), [](uv_udp_send_t *req, int status) { + SendRequest* request = (SendRequest*)req; + RPCServer* svr = request->rpcServer; + Sp& msg = request->msg; + + if (status < 0) { + svr->messageQueue.push(msg); + svr->log->error("Send {} to {} failed({}): {}", + msg->toString(), msg->getRemoteAddress().toString(), status, uv_strerror(status)); + } + else { + svr->stats.onSentBytes(request->buf.len); + svr->stats.onSentMessage(*msg); - int ret = sendto(sockfd, (char*)buffer.data(), buffer.size(), flags, remoteAddr.addr(), remoteAddr.length()); - if (ret == 0 || (ret == -1 && errno == EAGAIN)) { + svr->log->debug("Sent {}/{} to {}: [{}] {}", msg->getMethodString(), msg->getTypeString(), + msg->getRemoteAddress().toString(), request->buf.len, msg->toString()); + } + + delete request; + }); + if (rc < 0) { messageQueue.push(msg); - return EAGAIN; - } else if (ret == -1) { - log->debug("Failed to send message to {}: {}", remoteAddr.toString(), std::strerror(errno)); - return errno; - } else { - stats.onSentBytes(buffer.size()); - stats.onSentMessage(*msg); - - log->debug("Sent {}/{} to {}: [{}] {}", msg->getMethodString(), msg->getTypeString(), - msg->getRemoteAddress().toString(), buffer.size(), msg->toString()); - return 0; + log->error("Send {} to {} failed({}): {}", + msg->toString(), msg->getRemoteAddress().toString(), rc, uv_strerror(rc)); + delete request; } } -void -RPCServer::bindSockets(const SocketAddress& bind4, const SocketAddress& bind6) +void RPCServer::onStop() noexcept { - sock4 = -1; - sock6 = -1; - - bound4 = {}; - if (bind4) { - try { - sock4 = bindSocket(bind4, bound4); - } catch (const DhtError& e) { - if (log) - log->error("Can't bind inet socket: {}", e.what()); - } + running = false; + log->info("RPCServer is on-stopping..."); + + if (udp6Started) { + udp6Started = false; + uv_udp_recv_stop(&udp6Handle); + uv_close((uv_handle_t*)&udp6Handle, NULL); } - bound6 = {}; - if (dht6 && bind6) { - if (bind6.port() == 0) { - // Attempt to use the same port as IPv4 with IPv6 - if (auto p4 = bound4.port()) { - auto b6 = SocketAddress({bind6.inaddr(), bind6.inaddrLength()}, p4); - try { - sock6 = bindSocket(b6, bound6); - } catch (const DhtError& e) { - if (log) - log->error("Can't bind inet6 socket: {}", e.what()); - } - } - } - if (sock6 == -1) { - try { - sock6 = bindSocket(bind6, bound6); - } catch (const DhtError& e) { - if (log) - log->error("Can't bind inet6 socket: {}", e.what()); - } - } + if (udp4Started) { + udp4Started = false; + uv_udp_recv_stop(&udp4Handle); + uv_close((uv_handle_t*)&udp4Handle, NULL); } - if (sock4 == -1 && sock6 == -1) { - throw DhtError("Can't bind socket"); + if (timerStarted) { + timerStarted = false; + uv_timer_stop(&timerHandle); + uv_close((uv_handle_t*)&timerHandle, nullptr); } -} -void -RPCServer::openSockets() -{ - running = true; - rcv_thread = std::thread([this, ls4=sock4, ls6=sock6]() mutable { - int selectFd = std::max({ls4, ls6}) + 1; - struct timeval timeout; - -// TODO:: will be remove - // //--------------------For Debug----------------------- - // char name[16]; - // snprintf(name, 16, "Tread%d", num++); - // pthread_setname_np(name); - - try { - while (running) { - fd_set readfds; - FD_ZERO(&readfds); - - if (ls4 >= 0) { - FD_SET(ls4, &readfds); - } - if (ls6 >= 0) { - FD_SET(ls6, &readfds); - } - - timeout.tv_sec = 0; - timeout.tv_usec = 100000; - - int rc = select(selectFd, &readfds, NULL, NULL, &timeout); - if (rc < 0) { - if (errno != EINTR) { - if (log) - log->error("Select error: {}", strerror(errno)); - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - } - - if (not running) - break; - - if (rc > 0) { - std::array buf; - sockaddr_storage from; - socklen_t from_len = sizeof(from); - - if (ls4 >= 0 && FD_ISSET(ls4, &readfds)) - rc = recvfrom(ls4, (char*)buf.data(), (size_t)buf.size(), 0, (sockaddr*)&from, &from_len); - else if (ls6 >= 0 && FD_ISSET(ls6, &readfds)) - rc = recvfrom(ls6, (char*)buf.data(), (size_t)buf.size(), 0, (sockaddr*)&from, &from_len); - else - continue; - - if (rc > 0) { - SocketAddress addr = {from}; - handlePacket((uint8_t*)buf.data(), rc, addr); - } else if (rc == -1) { - if (log) - log->error("Error receiving packet: {}", strerror(errno)); - int err = errno; - if (err == EPIPE || err == ENOTCONN || err == ECONNRESET) { - if (not running) break; - std::unique_lock lk(lock, std::try_to_lock); - if (lk.owns_lock()) { - if (not running) break; - if (ls4 >= 0) { - #if defined(_WIN32) || defined(_WIN64) - closesocket(ls4); - #else - close(ls4); - #endif - try { - ls4 = bindSocket(bound4, bound4); - } catch (const DhtError& e) { - if (log) - log->error("Can't bind inet socket: {}", e.what()); - } - } - if (ls6 >= 0) { - #if defined(_WIN32) || defined(_WIN64) - closesocket(ls6); - #else - close(ls6); - #endif - try { - ls6 = bindSocket(bound6, bound6); - } catch (const DhtError& e) { - if (log) - log->error("Can't bind inet6 socket: {}", e.what()); - } - } - if (ls4 < 0 && ls6 < 0) - break; - sock4 = ls4; - sock6 = ls6; - selectFd = std::max({ls4, ls6}) + 1; - } else { - break; - } - } - } - } - - periodic(); - } - } catch (const std::exception& e) { - if (log) - log->error("Error in RPCServer rx thread: {}", e.what()); - } + if (checkStarted) { + checkStarted = false; + uv_check_stop(&checkHandle); + uv_close((uv_handle_t*)&checkHandle, nullptr); + } - if (ls4 >= 0) { -#if defined(_WIN32) || defined(_WIN64) - closesocket(ls4); -#else - close(ls4); -#endif - } - if (ls6 >= 0) { -#if defined(_WIN32) || defined(_WIN64) - closesocket(ls6); -#else - close(ls6); -#endif - } + if (asyncInited) { + asyncInited = false; + uv_close((uv_handle_t*)&stopHandle, nullptr); + } +} - std::unique_lock lk(lock, std::try_to_lock); - if (lk.owns_lock()) { - sock4 = -1; - sock6 = -1; - bound4 = {}; - bound6 = {}; - } - }); +void RPCServer::failHandler(int rc, std::string errType) { + log->error("RPCServer failed to {}({}): {}", errType, rc, uv_strerror(rc)); + onStop(); + if (loopInited) { + loopInited = false; + uv_loop_close(&loop); + } + throw networking_error(uv_strerror(rc)); } //-------------------------------------------------------------- @@ -359,15 +216,75 @@ void RPCServer::start() { if (state != State::INITIAL) return; - openSockets(); - state = State::RUNNING; startTime = currentTimeMillis(); - if (!bound6) - log->info("Started RPC server ipv4: {}", bound4.toString()); - else - log->info("Started RPC server ipv4: {}, ipv6: {}", bound4.toString(), bound6.toString()); + int rc = uv_loop_init(&loop); + if (rc < 0) { + failHandler(rc, "initialize the event loop"); + } + loopInited = true; + + // init the stop handle + rc = uv_async_init(&loop, &stopHandle, [](uv_async_t* handle) { + RPCServer* svr = (RPCServer*)handle->data; + svr->onStop(); + }); + if (rc < 0) { + failHandler(rc, "initialize the stop handle"); + } + stopHandle.data = this; + asyncInited = true; + + // init the idle/iteration handle + uv_check_init(&loop, &checkHandle); // always success + checkHandle.data = this; + uv_check_start(&checkHandle, [](uv_check_t* handle) { // always success + RPCServer* svr = (RPCServer*)handle->data; + svr->periodic(); + }); + checkStarted = true; + + // init the timer + uv_timer_init(&loop, &timerHandle); + timerHandle.data = this; + rc = uv_timer_start(&timerHandle, [](uv_timer_t* handle) { + RPCServer* svr = (RPCServer*)handle->data; + svr->periodic(); + }, PERIODIC_INTERVAL, PERIODIC_INTERVAL); + if (rc < 0) { + failHandler(rc, "start timer"); + } + timerStarted = true; + + // udp4 read start + if (hasIPv4()) { + log->trace("RPCServer start reading udp4 packet."); + readStart(&udp4Handle, bind4); + udp4Started = true; + } + + // udp6 read start + if (hasIPv6()) { + log->trace("RPCServer start reading udp6 packet."); + readStart(&udp6Handle, bind6); + udp4Started = true; + } + + // start the loop in thread + dht_thread = std::thread([&]() { + log->info("RPCServer is running."); + running = true; + + int rc = uv_run(&loop, UV_RUN_DEFAULT); + if (rc < 0) { + log->error("RPCServer failed to start the event loop({}): {}", rc, uv_strerror(rc)); + onStop(); + } + + uv_loop_close(&loop); + log->info("RPCServer is stopped."); + }); } @@ -379,13 +296,16 @@ void RPCServer::stop() { if (!running.exchange(false)) return; - if (rcv_thread.joinable()) - rcv_thread.join(); + uv_async_send(&stopHandle); + try { + dht_thread.join(); + } catch(...) { + } - if (bound4) - log->info("Stopped RPC Server ipv4: {}", bound4.toString()); - if (bound6) - log->info("Stopped RPC Server ipv6: {}", bound6.toString()); + if (hasIPv4()) + log->info("Stopped RPC Server ipv4: {}", bind4.toString()); + if (hasIPv6()) + log->info("Stopped RPC Server ipv6: {}", bind6.toString()); } void RPCServer::updateReachability(uint64_t now) { diff --git a/src/core/rpcserver.h b/src/core/rpcserver.h index a05271c..ca61b47 100644 --- a/src/core/rpcserver.h +++ b/src/core/rpcserver.h @@ -23,6 +23,7 @@ #pragma once +#include #include #include #include @@ -65,12 +66,12 @@ class RPCServer { bool hasIPv4() const { std::lock_guard lk(lock); - return sock4 != -1; + return dht4.has_value(); } bool hasIPv6() const { std::lock_guard lk(lock); - return sock6 != -1; + return dht6.has_value(); } Scheduler& getScheduler() { @@ -82,7 +83,7 @@ class RPCServer { } SocketAddress& getAddress(sa_family_t af) { - return (af == AF_INET) ? bound4: bound6; + return (af == AF_INET) ? bind4: bind6; } void sendError(Sp msg, int code, const std::string& err); @@ -92,10 +93,11 @@ class RPCServer { } private: - void bindSockets(const SocketAddress& bind4, const SocketAddress& bind6); - void openSockets(); - int sendData(Sp& msg); + void readStart(uv_udp_t* handle, const SocketAddress& bind); + void sendData(Sp& msg); void handlePacket(const uint8_t *buf, size_t buflen, const SocketAddress& from); + void failHandler(int rc, std::string errType); + void onStop() noexcept; void periodic(); Sp log; @@ -104,13 +106,10 @@ class RPCServer { std::optional> dht4; std::optional> dht6; - int sock4 {-1}; - int sock6 {-1}; + SocketAddress bind4 {}; + SocketAddress bind6 {}; - SocketAddress bound4 {}; - SocketAddress bound6 {}; - - std::thread rcv_thread {}; + std::thread dht_thread {}; std::atomic_bool running {false}; std::list> callQueue {}; @@ -130,6 +129,23 @@ class RPCServer { std::queue> messageQueue {}; Scheduler scheduler {}; + + uv_udp_t udp4Handle { 0 }; + uv_udp_t udp6Handle { 0 }; + + uv_loop_t loop { 0 }; + uv_async_t stopHandle { 0 }; + uv_check_t checkHandle { 0 }; + uv_timer_t timerHandle { 0 }; + + bool loopInited {false}; + bool asyncInited {false}; + bool checkStarted {false}; + bool timerStarted {false}; + bool udp4Started {false}; + bool udp6Started {false}; + + std::vector readBuffer {}; }; } // namespace boson diff --git a/src/core/value.cc b/src/core/value.cc index 8754c9d..2726da0 100644 --- a/src/core/value.cc +++ b/src/core/value.cc @@ -26,7 +26,7 @@ #include "boson/value.h" #include "crypto/hex.h" #include "crypto/shasum.h" -#include "exceptions/state_error.h" +#include "exceptions/exceptions.h" #include "serializers.h" namespace boson { @@ -150,10 +150,10 @@ std::vector Value::decryptData(Signature::PrivateKey recipientSk) { Value Value::update(const std::vector& data) { if (!isMutable()) - throw StateError("Immutable value " + getId().toBase58String()); + throw state_error("Immutable value " + getId().toBase58String()); if (!hasPrivateKey()) - throw StateError("Not the owner of the value " + getId().toBase58String()); + throw state_error("Not the owner of the value " + getId().toBase58String()); auto kp = Signature::KeyPair(getPrivateKey()); return Value(kp, recipient, nonce.value(), sequenceNumber + 1, data);