diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a84157..4ff6e19 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,7 @@ FIND_PACKAGE(ZLIB 1.2.11 REQUIRED) FIND_PACKAGE(fleet-protocol-cxx-helpers-static 1.2.0 REQUIRED) FIND_PACKAGE(aeron 1.48.6 REQUIRED) FIND_PACKAGE(async-function-execution-shared 1.0.0 REQUIRED) +FIND_PACKAGE(msquic CONFIG REQUIRED) FILE(GLOB_RECURSE source_files "source/*") ADD_LIBRARY(module-gateway-lib STATIC "${source_files}") @@ -76,6 +77,7 @@ TARGET_LINK_LIBRARIES(module-gateway-lib PUBLIC ZLIB::ZLIB fleet-protocol-cxx-helpers-static::fleet-protocol-cxx-helpers-static async-function-execution-shared::async-function-execution-shared + msquic ${CMAKE_DL_LIBS} ) diff --git a/include/bringauto/common_utils/EnumUtils.hpp b/include/bringauto/common_utils/EnumUtils.hpp index cb6b0bc..56994d6 100644 --- a/include/bringauto/common_utils/EnumUtils.hpp +++ b/include/bringauto/common_utils/EnumUtils.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -32,6 +33,8 @@ class EnumUtils { switch(toString) { case structures::ProtocolType::MQTT: return settings::Constants::MQTT; + case structures::ProtocolType::QUIC: + return settings::Constants::QUIC; case structures::ProtocolType::DUMMY: return settings::Constants::DUMMY; case structures::ProtocolType::INVALID: @@ -71,6 +74,31 @@ class EnumUtils { } }; -}; + /** + * @brief Converts connection state to string + * + * @param state external_client::connection::ConnectionState + * @return std::string_view + */ + static constexpr std::string_view connectionStateToString( + external_client::connection::ConnectionState state + ) { + using enum external_client::connection::ConnectionState; + switch (state) { + case NOT_INITIALIZED: + return settings::Constants::LOG_CONNECTION_STATE_NOT_INITIALIZED; + case NOT_CONNECTED: + return settings::Constants::LOG_CONNECTION_STATE_NOT_CONNECTED; + case CONNECTING: + return settings::Constants::LOG_CONNECTION_STATE_CONNECTING; + case CONNECTED: + return settings::Constants::LOG_CONNECTION_STATE_CONNECTED; + case CLOSING: + return settings::Constants::LOG_CONNECTION_STATE_CLOSING; + default: + return settings::Constants::LOG_UNKNOWN; + } + } + }; } diff --git a/include/bringauto/external_client/connection/ConnectionState.hpp b/include/bringauto/external_client/connection/ConnectionState.hpp index 91ec13f..375ca30 100644 --- a/include/bringauto/external_client/connection/ConnectionState.hpp +++ b/include/bringauto/external_client/connection/ConnectionState.hpp @@ -18,6 +18,10 @@ enum class ConnectionState { /** * CONNECTED - Client is connected to the External server */ - CONNECTED + CONNECTED, + /** + * CLOSING - Client closing connection to the External server + */ + CLOSING }; } diff --git a/include/bringauto/external_client/connection/communication/QuicCommunication.hpp b/include/bringauto/external_client/connection/communication/QuicCommunication.hpp new file mode 100644 index 0000000..aa28683 --- /dev/null +++ b/include/bringauto/external_client/connection/communication/QuicCommunication.hpp @@ -0,0 +1,338 @@ +#pragma once + +#include +#include + +#include +#include + +#include +#include +#include + + +namespace bringauto::external_client::connection::communication { + class QuicCommunication : public ICommunicationChannel { + public: + explicit QuicCommunication(const structures::ExternalConnectionSettings &settings, const std::string &company, + const std::string &vehicleName); + + ~QuicCommunication() override; + + /** + * @brief Initializes a QUIC connection to the server. + * + * Attempts to establish a new QUIC connection. + * It first atomically verifies that the current connection state is + * NOT_CONNECTED and transitions it to CONNECTING in order to prevent + * concurrent connection attempts. + * + * After the state transition, it opens a QUIC connection handle and + * starts the connection using the configured server address, port, + * and QUIC configuration. + * + * Any failures during the connection open or start process are logged. + */ + void initializeConnection() override; + + /** + * @brief Enqueues an outgoing message to be sent over the QUIC connection. + * + * Creates a shared copy of the provided ExternalClient message + * and pushes it into the outbound message queue in a thread-safe manner. + * After enqueuing, it notifies the sender thread via a condition variable + * that a new message is available for sending. + * + * @param message Pointer to the message that should be sent. + * @return true Always returns true to indicate the message was successfully enqueued. + */ + bool sendMessage(ExternalProtocol::ExternalClient *message) override; + + /** + * @brief Receives an incoming message from the QUIC connection. + * + * Waits for an incoming message to appear in the inbound + * queue or for the connection state to change from CONNECTED. + * The wait is bounded by a configurable timeout. + * + * If the wait times out, the connection is no longer in the CONNECTED + * state, or no message is available, the function returns nullptr. + * Otherwise, it retrieves and removes the next message from the inbound + * queue and returns it. + * + * @return A shared pointer to the received ExternalServer message, + * or nullptr if no message is available or the connection is not active. + */ + std::shared_ptr receiveMessage() override; + + /** + * @brief Initiates a graceful shutdown of the QUIC connection. + * + * Requests an orderly shutdown of the active QUIC connection. + * If no connection is currently established, the function returns immediately. + * + * The shutdown is performed asynchronously. Completion is signaled via the + * QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE event, which is handled in + * the connectionCallback. + */ + void closeConnection() override; + + private: + /// Pointer to the MsQuic API function table + const QUIC_API_TABLE *quic_{nullptr}; + /// QUIC registration handle associated with the application + HQUIC registration_{nullptr}; + /// QUIC configuration handle (ALPN, credentials, transport settings) + HQUIC config_{nullptr}; + /// Active QUIC connection handle + HQUIC connection_{nullptr}; + /// Application-Layer Protocol Negotiation (ALPN) string + std::string alpn_; + /// QUIC buffer wrapping the ALPN string + QUIC_BUFFER alpnBuffer_{}; + + /// Path to the client certificate file + std::string certFile_; + /// Path to the client private key file + std::string keyFile_; + /// Path to the CA certificate file + std::string caFile_; + + /// Atomic state of the connection used for synchronization across threads + std::atomic connectionState_{ConnectionState::NOT_CONNECTED}; + + /// @name Inbound (peer → this) + /// @{ + /// Queue of incoming messages received from the peer + std::queue > inboundQueue_; + /// Mutex protecting access to the inbound message queue + std::mutex inboundMutex_; + /// Condition variable for signaling inbound message availability + std::condition_variable inboundCv_; + /// @} + + /// @name Outbound (this → peer) + /// @{ + /// Queue of outgoing messages to be sent to the peer + std::queue > outboundQueue_; + /// Mutex protecting access to the outbound message queue + std::mutex outboundMutex_; + /// Condition variable for signaling outbound message availability + std::condition_variable outboundCv_; + /// Dedicated sender thread responsible for transmitting outbound messages + std::jthread senderThread_; + /// @} + + /** + * @brief Owns memory for a single MsQuic StreamSend operation. + * + * SendBuffer wraps a QUIC_BUFFER together with its backing storage. + * The memory must remain valid until MsQuic signals + * QUIC_STREAM_EVENT_SEND_COMPLETE, at which point it can be safely freed. + * + * Instances of this struct are typically allocated on the heap and passed + * to MsQuic via the StreamSend ClientContext pointer. + */ + struct SendBuffer { + QUIC_BUFFER buffer{}; + std::string storage; + + /** + * @brief Constructs a SendBuffer with zero-initialized storage. + * + * Allocates storage of the given size, fills it with zero bytes, + * and initializes the QUIC_BUFFER to point to this storage. + * + * @param size Number of bytes to allocate for the send buffer. + */ + explicit SendBuffer(size_t size) + : storage(size, '\0') { + buffer.Length = static_cast(storage.size()); + buffer.Buffer = reinterpret_cast(storage.data()); + } + }; + + /** + * @brief Loads and initializes the MsQuic API. + * + * Initializes the MsQuic library and retrieves the + * QUIC API function table. The resulting table is stored for later + * use when creating registrations, configurations, and connections. + * + * If the initialization fails, an error is logged. + */ + void loadMsQuic(); + + /** + * @brief Initializes a QUIC registration. + * + * Creates a QUIC registration with the specified application name and + * a low-latency execution profile. The registration is required for + * creating QUIC configurations and connections. + * + * If registration creation fails, an error is logged. + */ + void initRegistration(); + + /** + * @brief Initializes the QUIC configuration and loads client credentials. + * + * Opens a QUIC configuration using the configured ALPN and default QUIC + * transport settings. Client TLS credentials are then set up using a + * certificate file, private key file, and CA certificate file. + * + * If configuration creation or credential loading fails, an error is logged. + */ + void initConfiguration(); + + /** + * @brief Opens a QUIC configuration. + * + * Creates a QUIC configuration associated with the current registration, + * configured ALPN, and optional transport settings. + * + * If settings are not provided, default QUIC settings are used. + * On failure, an error is logged. + */ + void configurationOpen(); + + /** + * @brief Loads TLS credentials into the QUIC configuration. + * + * Loads client-side TLS credentials into the active QUIC configuration. + * The credentials define the certificate, private key, and CA certificate + * used for secure communication. + * + * If credential loading fails, an error is logged. + * + * @param credential Pointer to the QUIC credential configuration. + */ + void configurationLoadCredential(const QUIC_CREDENTIAL_CONFIG *credential) const; + + /** + * @brief Handles a successfully decoded incoming message. + * + * Pushes the decoded ExternalServer message into the inbound queue + * in a thread-safe manner and notifies the receiver thread that a + * new message is available. + * + * @param msg Decoded message received from the peer. + */ + void onMessageDecoded(std::shared_ptr msg); + + /** + * @brief Sends a message to the peer using a QUIC stream. + * + * Opens a new QUIC stream on the active connection and serializes the + * provided ExternalClient message into a byte buffer. + * The message is sent using a single StreamSend call with START and FIN + * flags, effectively opening, sending, and closing the stream. + * + * The allocated send buffer is released asynchronously in the + * QUIC_STREAM_EVENT_SEND_COMPLETE callback. + * + * @param message Message to be sent to the peer. + */ + void sendViaQuicStream(const ExternalProtocol::ExternalClient& message); + + /** + * @brief Closes the active QUIC configuration. + */ + void closeConfiguration(); + + /** + * @brief Closes the QUIC registration. + */ + void closeRegistration(); + + /** + * @brief Closes the MsQuic API and releases associated resources. + */ + void closeMsQuic(); + + /** + * @brief Stops the QUIC communication and releases all resources. + * + * Initiates connection shutdown and closes the QUIC configuration, + * registration, and MsQuic API in the correct order. + * All waiting sender and receiver threads are unblocked by notifying + * the associated condition variables. + */ + void stop(); + + /** + * @brief Handles QUIC connection-level events. + * + * Processes connection lifecycle events reported by MsQuic, including + * successful connection establishment, peer-initiated shutdown, and + * shutdown completion. + * + * All QUIC_CONNECTION_EVENT cases are documented at + * https://microsoft.github.io/msquic/msquicdocs/docs/api/QUIC_CONNECTION_EVENT.html + * + * @param connection QUIC connection handle. + * @param context User-defined context pointer (QuicCommunication instance). + * @param event Connection event information provided by MsQuic. + * @return QUIC_STATUS_SUCCESS to indicate successful event handling. + */ + static QUIC_STATUS QUIC_API connectionCallback(HQUIC connection, void *context, QUIC_CONNECTION_EVENT *event); + + /** + * @brief Handles QUIC stream-level events. + * + * Processes stream events reported by MsQuic, including data reception, + * send completion, stream startup, and shutdown notifications. + * + * All QUIC_STREAM_EVENT cases are documented at + * https://microsoft.github.io/msquic/msquicdocs/docs/api/QUIC_STREAM_EVENT.html + * + * @param stream QUIC stream handle associated with the event. + * @param context User-defined context pointer (QuicCommunication instance). + * @param event Stream event information provided by MsQuic. + * @return QUIC_STATUS_SUCCESS to indicate successful event handling. + */ + static QUIC_STATUS QUIC_API streamCallback(HQUIC stream, void *context, QUIC_STREAM_EVENT *event); + + /** + * @brief Sender thread main loop for outbound messages. + * + * Waits for outbound messages while the connection is in the CONNECTED state. + * Messages are dequeued and sent over individual QUIC streams. + * + * If sending fails, the message is re-enqueued for a later retry. + * The loop terminates when the connection leaves the CONNECTED state. + */ + void senderLoop(); + + /** + * @brief Retrieves the QUIC stream identifier for the given stream handle. + * + * Queries MsQuic for the stream ID associated with the provided HQUIC stream. + * If the parameter query fails, an empty optional is returned. + * + * @param stream Valid QUIC stream handle. + * @return Stream identifier on success, or std::nullopt if the query fails. + */ + std::optional getStreamId(HQUIC stream); + + /** + * @brief Retrieves a protocol setting value as a plain string. + * + * Extracts a value from ExternalConnectionSettings::protocolSettings and + * transparently handles values stored as JSON-encoded strings. + * + * Allows uniform access to protocol settings regardless of whether + * they were stored as plain strings or JSON-serialized values. + * + * @param settings External connection settings containing protocolSettings. + * @param key Key identifying the protocol setting. + * @return Plain string value suitable for direct use (e.g. file paths). + * + * @throws std::out_of_range if the key is not present in protocolSettings. + */ + static std::string getProtocolSettingsString( + const structures::ExternalConnectionSettings &settings, + std::string_view key + ); + }; +} diff --git a/include/bringauto/settings/Constants.hpp b/include/bringauto/settings/Constants.hpp index 3aa6238..e9628b5 100644 --- a/include/bringauto/settings/Constants.hpp +++ b/include/bringauto/settings/Constants.hpp @@ -140,6 +140,7 @@ class Constants { inline static constexpr std::string_view LOG_LEVEL { "level" }; inline static constexpr std::string_view LOG_USE { "use" }; inline static constexpr std::string_view LOG_PATH { "path" }; + inline static constexpr std::string_view LOG_UNKNOWN { "unknown" }; inline static constexpr std::string_view LOG_LEVEL_DEBUG { "DEBUG" }; inline static constexpr std::string_view LOG_LEVEL_INFO { "INFO" }; @@ -148,6 +149,12 @@ class Constants { inline static constexpr std::string_view LOG_LEVEL_CRITICAL { "CRITICAL" }; inline static constexpr std::string_view LOG_LEVEL_INVALID { "INVALID" }; + inline static constexpr std::string_view LOG_CONNECTION_STATE_NOT_INITIALIZED { "not initialized" }; + inline static constexpr std::string_view LOG_CONNECTION_STATE_NOT_CONNECTED { "not connected" }; + inline static constexpr std::string_view LOG_CONNECTION_STATE_CONNECTING { "connecting" }; + inline static constexpr std::string_view LOG_CONNECTION_STATE_CONNECTED { "connected" }; + inline static constexpr std::string_view LOG_CONNECTION_STATE_CLOSING { "closing" }; + inline static constexpr std::string_view HELP { "help" }; inline static constexpr std::string_view PORT { "port" }; @@ -164,12 +171,15 @@ class Constants { inline static constexpr std::string_view PROTOCOL_TYPE { "protocol-type" }; inline static constexpr std::string_view MQTT { "MQTT" }; + inline static constexpr std::string_view QUIC { "QUIC" }; inline static constexpr std::string_view DUMMY { "DUMMY" }; inline static constexpr std::string_view MQTT_SETTINGS { "mqtt-settings" }; + inline static constexpr std::string_view QUIC_SETTINGS { "quic-settings" }; inline static constexpr std::string_view SSL { "ssl" }; inline static constexpr std::string_view CA_FILE { "ca-file" }; inline static constexpr std::string_view CLIENT_CERT { "client-cert" }; inline static constexpr std::string_view CLIENT_KEY { "client-key" }; + inline static constexpr std::string_view ALPN { "alpn" }; inline static constexpr std::string_view MODULES { "modules" }; inline static constexpr std::string_view AERON_CONNECTION { "aeron:ipc"}; diff --git a/include/bringauto/settings/QuicSettingsParser.hpp b/include/bringauto/settings/QuicSettingsParser.hpp new file mode 100644 index 0000000..b1df800 --- /dev/null +++ b/include/bringauto/settings/QuicSettingsParser.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include + +#include + +#include + + +namespace bringauto::settings { + class QuicSettingsParser { + public: + static QUIC_SETTINGS parse(const structures::ExternalConnectionSettings &settings); + + private: + static std::optional getOptionalUint( + const structures::ExternalConnectionSettings &settings, + std::string_view key + ); + }; +} diff --git a/include/bringauto/structures/ExternalConnectionSettings.hpp b/include/bringauto/structures/ExternalConnectionSettings.hpp index bdc462e..5aee40b 100644 --- a/include/bringauto/structures/ExternalConnectionSettings.hpp +++ b/include/bringauto/structures/ExternalConnectionSettings.hpp @@ -14,6 +14,7 @@ namespace bringauto::structures { enum class ProtocolType { INVALID = -1, MQTT, + QUIC, DUMMY }; diff --git a/resources/config/README.md b/resources/config/README.md index 2889f14..a67581a 100644 --- a/resources/config/README.md +++ b/resources/config/README.md @@ -28,18 +28,27 @@ Note: at least one logging sink needs to be used * company : company name used as identification in external connection (string) * vehicle-name : vehicle name used as identification in external connection (string) * endpoints : array of objects listing possible ways to connect to external server - - protocol-type : string (only mqtt is supported) + - protocol-type : string (only mqtt and quic are supported) - server-ip : ip of the external connection (string) - port : port of the external connection (int) - modules : array of integers that represent module numbers to be used on this connection - - mqtt-settings : **only for mqtt** - - ssl : if connection requires ssl, bool - - ca-file : public trusted certificate file name (string) - - client-cert : public certificate chain file name (string) - - client-key : private key file name (string) -## Example +#### mqtt-settings (only for MQTT) +* ssl : if connection requires ssl, bool +* ca-file : public trusted certificate file name (string) +* client-cert : public certificate chain file name (string) +* client-key : private key file name (string) -[Example](./example.json) +#### quic-settings (only for QUIC) +* ca-file : path to the trusted CA certificate file (string) +* client-cert : path to the client certificate file (string) +* client-key : path to the client private key file (string) +* alpn : Application-Layer Protocol Negotiation identifier (string), must match the ALPN configured on the server + +Note: QUIC uses TLS 1.3 internally. All certificate files must be provided in a format supported by MsQuic/OpenSSL. +## Examples + +[MQTT Example](./example.json) +[QUIC Example](./quic_example.json) diff --git a/resources/config/quic_example.json b/resources/config/quic_example.json new file mode 100644 index 0000000..640b364 --- /dev/null +++ b/resources/config/quic_example.json @@ -0,0 +1,36 @@ +{ + "logging": { + "console": { "level": "DEBUG", "use": true }, + "file": { "level": "DEBUG", "use": false, "path": "./log" } + }, + + "internal-server-settings": { "port": 1636 }, + + "module-paths": { + "1": "/home/bringauto/modules/mission_module/lib/libmission-module-gateway-shared.so", + "2": "/home/bringauto/modules/io_module/lib/libio-module-gateway-shared.so", + "3": "/home/bringauto/modules/transparent_module/lib/libtransparent-module-gateway-shared.so" + }, + + "module-binary-path": "", + + "external-connection" : { + "company" : "bringauto", + "vehicle-name" : "virtual_vehicle", + "endpoints" : [ + { + "protocol-type" : "QUIC", + "server-ip": "127.0.0.1", + "port": 6121, + "quic-settings": { + "ca-file" : "build/certs/ca.pem", + "client-cert" : "build/certs/client.pem", + "client-key" : "build/certs/client-key.pem", + "alpn" : "sample", + "DisconnectTimeoutMs" : 800 + }, + "modules": [1,2,3] + } + ] + } +} diff --git a/source/bringauto/common_utils/EnumUtils.cpp b/source/bringauto/common_utils/EnumUtils.cpp index d7ab700..ae8976d 100644 --- a/source/bringauto/common_utils/EnumUtils.cpp +++ b/source/bringauto/common_utils/EnumUtils.cpp @@ -10,6 +10,8 @@ structures::ProtocolType EnumUtils::stringToProtocolType(std::string toEnum) { std::transform(toEnum.begin(), toEnum.end(), toEnum.begin(), ::toupper); if(toEnum == settings::Constants::MQTT) { return structures::ProtocolType::MQTT; + } else if(toEnum == settings::Constants::QUIC) { + return structures::ProtocolType::QUIC; } else if(toEnum == settings::Constants::DUMMY) { return structures::ProtocolType::DUMMY; } diff --git a/source/bringauto/external_client/ExternalClient.cpp b/source/bringauto/external_client/ExternalClient.cpp index 65c964f..c21b145 100644 --- a/source/bringauto/external_client/ExternalClient.cpp +++ b/source/bringauto/external_client/ExternalClient.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -99,6 +100,11 @@ void ExternalClient::initConnections() { connectionSettings, context_->settings->company, context_->settings->vehicleName ); break; + case structures::ProtocolType::QUIC: + communicationChannel = std::make_shared( + connectionSettings, context_->settings->company, context_->settings->vehicleName + ); + break; case structures::ProtocolType::DUMMY: communicationChannel = std::make_shared( connectionSettings diff --git a/source/bringauto/external_client/connection/communication/QuicCommunication.cpp b/source/bringauto/external_client/connection/communication/QuicCommunication.cpp new file mode 100644 index 0000000..f356681 --- /dev/null +++ b/source/bringauto/external_client/connection/communication/QuicCommunication.cpp @@ -0,0 +1,502 @@ +#include +#include +#include +#include +#include + + +namespace bringauto::external_client::connection::communication { + QuicCommunication::QuicCommunication(const structures::ExternalConnectionSettings &settings, + const std::string &company, + const std::string &vehicleName) : ICommunicationChannel(settings), + alpn_(getProtocolSettingsString( + settings, + settings::Constants::ALPN)), + certFile_(getProtocolSettingsString( + settings, + settings::Constants::CLIENT_CERT)), + keyFile_(getProtocolSettingsString( + settings, + settings::Constants::CLIENT_KEY)), + caFile_(getProtocolSettingsString( + settings, + settings::Constants::CA_FILE)) { + alpnBuffer_.Buffer = reinterpret_cast(alpn_.data()); + alpnBuffer_.Length = static_cast(alpn_.size()); + + loadMsQuic(); + initRegistration(); + initConfiguration(); + + settings::Logger::logInfo("[quic] Initialize QUIC communication to {}:{} for {}/{}", settings.serverIp, + settings.port, company, vehicleName); + } + + QuicCommunication::~QuicCommunication() { + stop(); + } + + void QuicCommunication::initializeConnection() { + settings::Logger::logDebug("[quic] Connecting to server when {}", + common_utils::EnumUtils::connectionStateToString(connectionState_)); + + ConnectionState expected = ConnectionState::NOT_CONNECTED; + if (!connectionState_. + compare_exchange_strong(expected, ConnectionState::CONNECTING, std::memory_order_acq_rel)) { + settings::Logger::logError("Connection already in progress or established"); + return; + } + + QUIC_STATUS status = quic_->ConnectionOpen(registration_, connectionCallback, this, &connection_); + if (QUIC_FAILED(status)) { + settings::Logger::logError("ConnectionOpen failed (status=0x{:x})", status); + return; + } + + status = quic_->ConnectionStart( + connection_, + config_, + QUIC_ADDRESS_FAMILY_INET, + settings_.serverIp.c_str(), + settings_.port + ); + + if (QUIC_FAILED(status)) { + settings::Logger::logError("ConnectionOpen failed (status=0x{:x})", status); + return; + } + + connectionState_ = ConnectionState::CONNECTING; + } + + bool QuicCommunication::sendMessage(ExternalProtocol::ExternalClient *message) { + if (connectionState_.load() == ConnectionState::NOT_CONNECTED) { + settings::Logger::logWarning("[quic] Connection not established, cannot send message"); + return false; + } + + { + auto copy = std::make_unique(*message); + std::lock_guard lock(outboundMutex_); + outboundQueue_.push(std::move(copy)); + } + settings::Logger::logDebug("[quic] Notifying sender thread about enqueued message"); + outboundCv_.notify_one(); + return true; + } + + std::shared_ptr QuicCommunication::receiveMessage() { + std::unique_lock lock(inboundMutex_); + + if (!inboundCv_.wait_for( + lock, + settings::receive_message_timeout, + [this] { return !inboundQueue_.empty() || connectionState_.load() != ConnectionState::CONNECTED; } + )) { + return nullptr; + } + + if (connectionState_.load() != ConnectionState::CONNECTED || inboundQueue_.empty()) { + return nullptr; + } + + auto msg = inboundQueue_.front(); + inboundQueue_.pop(); + return msg; + } + + void QuicCommunication::loadMsQuic() { + QUIC_STATUS status = MsQuicOpen2(&quic_); + if (QUIC_FAILED(status)) { + settings::Logger::logError("[quic] Failed to open QUIC; QUIC_STATUS => {:x}", status); + return; + } + } + + constexpr auto quicRegistrationAppName = "module-gateway-quic-client"; + + void QuicCommunication::initRegistration() { + QUIC_REGISTRATION_CONFIG config{}; + config.AppName = quicRegistrationAppName; + config.ExecutionProfile = QUIC_EXECUTION_PROFILE_LOW_LATENCY; + + QUIC_STATUS status = quic_->RegistrationOpen(&config, ®istration_); + if (QUIC_FAILED(status)) { + settings::Logger::logError("[quic] Failed to open QUIC registration; QUIC_STATUS => {:x}", status); + return; + } + } + + void QuicCommunication::initConfiguration() { + configurationOpen(); + + QUIC_CERTIFICATE_FILE certificate{}; + certificate.CertificateFile = certFile_.c_str(); + certificate.PrivateKeyFile = keyFile_.c_str(); + + QUIC_CREDENTIAL_CONFIG credential{}; + credential.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_FILE; + credential.Flags = QUIC_CREDENTIAL_FLAG_CLIENT | QUIC_CREDENTIAL_FLAG_SET_CA_CERTIFICATE_FILE; + credential.CertificateFile = &certificate; + credential.CaCertificateFile = caFile_.c_str(); + + configurationLoadCredential(&credential); + } + + void QuicCommunication::configurationOpen() { + const QUIC_SETTINGS settings = settings::QuicSettingsParser::parse(settings_); + const uint32_t settingsSize = sizeof(QUIC_SETTINGS); + + QUIC_STATUS status = quic_->ConfigurationOpen( + registration_, + &alpnBuffer_, + 1, + &settings, + settingsSize, + nullptr, + &config_ + ); + + if (QUIC_FAILED(status)) { + settings::Logger::logError("[quic] Failed to open QUIC configuration; QUIC_STATUS => {:x}", status); + return; + } + } + + void QuicCommunication::configurationLoadCredential(const QUIC_CREDENTIAL_CONFIG *credential) const { + const QUIC_STATUS status = quic_->ConfigurationLoadCredential(config_, credential); + if (QUIC_FAILED(status)) { + settings::Logger::logError("[quic] Failed to load QUIC credential; QUIC_STATUS => {:x}", status); + return; + } + } + + void QuicCommunication::closeConnection() { + if (!connection_) { + return; + } + + quic_->ConnectionShutdown(connection_, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); + + /// Asynchronously waiting for QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE event, then continue in connectionCallback + } + + void QuicCommunication::closeConfiguration() { + if (config_) { + quic_->ConfigurationClose(config_); + } + + config_ = nullptr; + } + + void QuicCommunication::closeRegistration() { + if (registration_) { + quic_->RegistrationClose(registration_); + } + + registration_ = nullptr; + } + + void QuicCommunication::closeMsQuic() { + if (quic_) { + MsQuicClose(quic_); + } + + quic_ = nullptr; + } + + void QuicCommunication::stop() { + closeConnection(); + closeConfiguration(); + closeRegistration(); + closeMsQuic(); + + inboundCv_.notify_all(); + outboundCv_.notify_all(); + + settings::Logger::logInfo("[quic] Connection stopped"); + } + + void QuicCommunication::onMessageDecoded( + std::shared_ptr msg + ) { + { + std::scoped_lock lock(inboundMutex_); + inboundQueue_.push(std::move(msg)); + } + settings::Logger::logDebug("[quic] Notifying receiver thread about dequeued message"); + inboundCv_.notify_one(); + } + + void QuicCommunication::sendViaQuicStream(const ExternalProtocol::ExternalClient& message) { + HQUIC stream{nullptr}; + if (QUIC_FAILED(quic_->StreamOpen(connection_, QUIC_STREAM_OPEN_FLAG_NONE, streamCallback, this, &stream))) { + settings::Logger::logError("[quic] StreamOpen failed"); + return; + } + + const size_t size = message.ByteSizeLong(); + auto sendBuffer = std::make_unique(size); + + if (!message.SerializeToArray(sendBuffer->storage.data(), static_cast(size))) { + settings::Logger::logError("[quic] Message serialization failed"); + return; + } + + const SendBuffer *raw = sendBuffer.get(); + const QUIC_BUFFER *quicBuf = &raw->buffer; + SendBuffer *quicBufContext = sendBuffer.release(); + + const QUIC_STATUS status = quic_->StreamSend( + stream, + quicBuf, + 1, + /** + * START => Simulates quic_->StreamStart before send + * FIN => Simulates quic_->StreamShutdown after send + */ + QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN, + quicBufContext + ); + + if (QUIC_FAILED(status)) { + std::unique_ptr reclaim{quicBufContext}; + + quic_->StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0); + settings::Logger::logError("[quic] Failed to send QUIC stream; QUIC_STATUS => {:x}", status); + return; + } + + auto streamId = getStreamId(stream); + settings::Logger::logDebug("[quic] [stream {}] Message sent", streamId ? *streamId : 0); + } + + QUIC_STATUS QUIC_API QuicCommunication::connectionCallback(HQUIC connection, void *context, + QUIC_CONNECTION_EVENT *event) { + auto *self = static_cast(context); + + switch (event->Type) { + /// Fired when the QUIC handshake is complete and the connection is ready + /// for stream creation and data transfer. + case QUIC_CONNECTION_EVENT_CONNECTED: { + settings::Logger::logInfo("[quic] Connected to server"); + + auto expected = ConnectionState::CONNECTING; + if (self->connectionState_.compare_exchange_strong(expected, ConnectionState::CONNECTED)) { + /// Start sender thread only after connection is fully established + self->senderThread_ = std::jthread(&QuicCommunication::senderLoop, self); + self->outboundCv_.notify_all(); + } + break; + } + + /// Final notification that the connection has been fully shut down. + /// This is the last event delivered for the connection handle. + case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: { + settings::Logger::logInfo("[quic] Connection shutdown complete"); + + self->connectionState_ = ConnectionState::NOT_CONNECTED; + self->outboundCv_.notify_all(); + + if (self->senderThread_.joinable()) { + self->senderThread_.request_stop(); + } + + self->quic_->ConnectionClose(connection); + self->connection_ = nullptr; + break; + } + + /// Peer or transport initiated connection shutdown (error or graceful close). + /// Further sends may fail after this event. + case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: { + settings::Logger::logWarning("[quic] Connection shutdown initiated by peer"); + self->connectionState_ = ConnectionState::CLOSING; + break; + } + + default: { + settings::Logger::logDebug("[quic] Unhandled connection event 0x{:x}", + static_cast(event->Type)); + break; + } + } + + return QUIC_STATUS_SUCCESS; + } + + QUIC_STATUS QUIC_API QuicCommunication::streamCallback(HQUIC stream, void *context, QUIC_STREAM_EVENT *event) { + auto *self = static_cast(context); + auto streamId = self->getStreamId(stream); + + switch (event->Type) { + /// Raised when the peer sends stream data and MsQuic delivers received bytes to the application. + case QUIC_STREAM_EVENT_RECEIVE: { + if (event->RECEIVE.BufferCount == 0) { + settings::Logger::logDebug( + "[quic] [stream {}] End of stream received", + streamId ? *streamId : 0 + ); + break; + } + + settings::Logger::logDebug( + "[quic] [stream {}] Received {:d} bytes in {:d} buffers", + streamId ? *streamId : 0, + event->RECEIVE.TotalBufferLength, + event->RECEIVE.BufferCount + ); + + std::vector data; + data.reserve(event->RECEIVE.TotalBufferLength); + + for (uint32_t i = 0; i < event->RECEIVE.BufferCount; ++i) { + const auto &b = event->RECEIVE.Buffers[i]; + data.insert(data.end(), b.Buffer, b.Buffer + b.Length); + } + + auto msg = std::make_shared(); + if (!msg->ParseFromArray(data.data(), static_cast(data.size()))) { + settings::Logger::logError("[quic] Failed to parse ExternalServer message"); + } else { + self->onMessageDecoded(std::move(msg)); + } + + break; + } + + /// Raised when the peer has finished sending on this stream + /// (peer's FIN has been fully received and processed). + case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: { + settings::Logger::logDebug("[quic] [stream {}] Peer stream send shutdown", streamId ? *streamId : 0); + break; + } + + /// Raised when the local send direction is fully shut down + /// and the peer has acknowledged the FIN. + case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: { + settings::Logger::logDebug("[quic] [stream {}] Stream send shutdown complete", + streamId ? *streamId : 0); + break; + } + + /// Raised after StreamStart completes successfully + /// and the stream becomes active with a valid stream ID. + case QUIC_STREAM_EVENT_START_COMPLETE: { + settings::Logger::logDebug("[quic] [stream {}] Stream start completed", streamId ? *streamId : 0); + break; + } + + /// Raised when a single StreamSend operation completes + /// (data was accepted, acknowledged, or the send was canceled). + case QUIC_STREAM_EVENT_SEND_COMPLETE: { + /** + * This event is raised when MsQuic has finished processing + * a single StreamSend request. + * + * Meaning: + * - MsQuic no longer needs the application-provided buffer: + * - the data has been acknowledged (ACKed) by the peer + * at the QUIC transport level and will not be retransmitted + * - OR the send was canceled (Canceled == TRUE), e.g. due to + * stream or connection shutdown + * + * Reliability semantics: + * - the ACK is strictly a QUIC transport-level acknowledgment + * - it does NOT mean the peer application has read or processed + * the data + * + * Practical consequence: + * - this is the only correct place to safely free the memory + * passed to StreamSend (via ClientContext) + */ + if (event->SEND_COMPLETE.Canceled) { + settings::Logger::logDebug("[quic] [stream {}] Stream send canceled", + streamId ? *streamId : 0); + } else { + settings::Logger::logDebug("[quic] [stream {}] Stream send completed", + streamId ? *streamId : 0); + } + + std::unique_ptr sendBuf{ + static_cast(event->SEND_COMPLETE.ClientContext) + }; + + break; + } + + /// Raised when both send and receive directions are closed + /// and the stream lifecycle is fully complete. + case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: { + settings::Logger::logDebug("[quic] [stream {}] Stream shutdown complete", streamId ? *streamId : 0); + self->quic_->StreamClose(stream); + break; + } + + default: { + settings::Logger::logDebug("[quic] [stream {}] Unhandled stream event 0x{:x}", streamId ? *streamId : 0, + static_cast(event->Type)); + break; + } + } + + return QUIC_STATUS_SUCCESS; + } + + void QuicCommunication::senderLoop() { + settings::Logger::logDebug("[quic] Sender thread loop started"); + + while (connectionState_.load() == ConnectionState::CONNECTED) { + std::unique_ptr msg; + + std::unique_lock lock(outboundMutex_); + + settings::Logger::logDebug("[quic] Sender thread loop waiting for outbound queue"); + outboundCv_.wait(lock, [this] { + return !outboundQueue_.empty() || + connectionState_.load() != ConnectionState::CONNECTED; + }); + + if (connectionState_.load() != ConnectionState::CONNECTED) { + break; + } + + settings::Logger::logDebug("[quic] Sender thread loop sending outbound queue"); + msg = std::move(outboundQueue_.front()); + outboundQueue_.pop(); + lock.unlock(); + + sendViaQuicStream(*msg); + } + } + + std::optional QuicCommunication::getStreamId(HQUIC stream) { + uint64_t streamId = 0; + uint32_t streamIdLen = sizeof(streamId); + + if (QUIC_FAILED(quic_->GetParam( + stream, + QUIC_PARAM_STREAM_ID, + &streamIdLen, + &streamId))) { + return std::nullopt; + } + + return streamId; + } + + std::string QuicCommunication::getProtocolSettingsString( + const structures::ExternalConnectionSettings &settings, + std::string_view key + ) { + const auto &raw = settings.protocolSettings.at(std::string(key)); + + if (nlohmann::json::accept(raw)) { + auto j = nlohmann::json::parse(raw); + if (j.is_string()) { + return j.get(); + } + } + return raw; + } +} diff --git a/source/bringauto/settings/QuicSettingsParser.cpp b/source/bringauto/settings/QuicSettingsParser.cpp new file mode 100644 index 0000000..4c3dab7 --- /dev/null +++ b/source/bringauto/settings/QuicSettingsParser.cpp @@ -0,0 +1,64 @@ +#include +#include +#include + +#include + + +namespace bringauto::settings { + QUIC_SETTINGS QuicSettingsParser::parse( + const structures::ExternalConnectionSettings &settings + ) { + QUIC_SETTINGS quic{}; + + if (auto value = getOptionalUint(settings, "IdleTimeoutMs")) { + settings::Logger::logDebug("[quic] [settings] IdleTimeoutMs settings loaded"); + quic.IdleTimeoutMs = *value; + quic.IsSet.IdleTimeoutMs = TRUE; + } + + if (auto value = getOptionalUint(settings, "HandshakeIdleTimeoutMs")) { + settings::Logger::logDebug("[quic] [settings] HandshakeIdleTimeoutMs settings loaded"); + quic.HandshakeIdleTimeoutMs = *value; + quic.IsSet.HandshakeIdleTimeoutMs = TRUE; + } + + if (auto value = getOptionalUint(settings, "DisconnectTimeoutMs")) { + settings::Logger::logDebug("[quic] [settings] DisconnectTimeoutMs settings loaded"); + quic.DisconnectTimeoutMs = *value; + quic.IsSet.DisconnectTimeoutMs = TRUE; + } + + return quic; + } + + std::optional QuicSettingsParser::getOptionalUint( + const structures::ExternalConnectionSettings &settings, + std::string_view key + ) { + const auto it = settings.protocolSettings.find(std::string(key)); + if (it == settings.protocolSettings.end()) { + return std::nullopt; + } + + const auto &raw = it->second; + + if (!nlohmann::json::accept(raw)) { + settings::Logger::logWarning( + "[quic] QUIC setting '{}' is not valid JSON", key + ); + return std::nullopt; + } + + auto j = nlohmann::json::parse(raw); + + if (!j.is_number_unsigned()) { + settings::Logger::logWarning( + "[quic] QUIC setting '{}' must be an unsigned integer", key + ); + return std::nullopt; + } + + return j.get(); + } +} diff --git a/source/bringauto/settings/SettingsParser.cpp b/source/bringauto/settings/SettingsParser.cpp index ae2e332..ce60789 100644 --- a/source/bringauto/settings/SettingsParser.cpp +++ b/source/bringauto/settings/SettingsParser.cpp @@ -186,6 +186,9 @@ void SettingsParser::fillExternalConnectionSettings(const nlohmann::json &file) case structures::ProtocolType::MQTT: settingsName = std::string(Constants::MQTT_SETTINGS); break; + case structures::ProtocolType::QUIC: + settingsName = std::string(Constants::QUIC_SETTINGS); + break; case structures::ProtocolType::DUMMY: break; case structures::ProtocolType::INVALID: @@ -241,6 +244,9 @@ std::string SettingsParser::serializeToJson() const { case structures::ProtocolType::MQTT: settingsName = std::string(Constants::MQTT_SETTINGS); break; + case structures::ProtocolType::QUIC: + settingsName = std::string(Constants::QUIC_SETTINGS); + break; case structures::ProtocolType::DUMMY: settingsName = std::string(Constants::DUMMY); break;