From 92a3a8e012b130c91e84256127e3297dc3f3d442 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Fri, 27 Mar 2026 12:24:29 +0100 Subject: [PATCH 01/22] Add a per-federate transients[] bool array to federate_instance_t, indicating whether each federate is transient. This is useful for outbound messages manipulation --- include/core/federated/federate.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 864d314ac..21fa6be57 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -101,6 +101,13 @@ typedef struct federate_instance_t { */ int sockets_for_outbound_p2p_connections[NUMBER_OF_FEDERATES]; + /** + * An array indicating whether each federate is transient. + * The index is the federate ID. + * This is initialized at startup by the generated _lf_executable_preamble(). + */ + bool transients[NUMBER_OF_FEDERATES]; + /** * Thread ID for a thread that accepts sockets and then supervises * listening to those sockets for incoming P2P (physical) connections. From d13b350d50abccfa593409ab12f78703e13873bf Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Sun, 29 Mar 2026 01:33:54 +0100 Subject: [PATCH 02/22] Add the new attributes to a federate instance, and add the function of lf_handle_p2p_connection_to_transients thread --- core/federated/federate.c | 148 +++++++++++++++++++++++++++++- include/core/federated/federate.h | 31 +++++++ 2 files changed, 178 insertions(+), 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index e0a75bce7..a327484ae 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2165,7 +2165,12 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { size_t received_federates = 0; // Allocate memory to store thread IDs. 
_fed.inbound_socket_listeners = (lf_thread_t*)calloc(_fed.number_of_inbound_p2p_connections, sizeof(lf_thread_t)); - while (received_federates < _fed.number_of_inbound_p2p_connections && !_lf_termination_executed) { + while (!_lf_termination_executed) { + // Case where all inbound connections are to persistent federates + if (received_federates == _fed.number_of_inbound_p2p_connections && + _fed.number_of_inbound_p2p_connections_to_transients == 0) { + break; + } // Wait for an incoming connection request. int socket_id = accept_socket(_fed.server_socket, _fed.socket_TCP_RTI); if (socket_id < 0) { @@ -2262,6 +2267,147 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { return NULL; } +void* lf_handle_p2p_connections_to_transients(void* env_arg) { + LF_ASSERT_NON_NULL(env_arg); + environment_t* env = (environment_t*)env_arg; + (void)env; // Reserved for future use (e.g., locking env->mutex). + + for (uint16_t remote_federate_id = 0; remote_federate_id < NUMBER_OF_FEDERATES; remote_federate_id++) { + // Only handle outbound connections to transient federates. + if (!_fed.transients[remote_federate_id]) { + continue; + } + // Check that we actually have an outbound connection to this federate. + if (_fed.sockets_for_outbound_p2p_connections[remote_federate_id] >= 0) { + continue; // Already connected. + } + + int result = -1; + + // Ask the RTI for the port number of the remote transient federate. 
+ unsigned char buffer[sizeof(int32_t) + INET_ADDRSTRLEN + 1]; + int port = -1; + struct in_addr host_ip_addr; + instant_t start_connect = lf_time_physical(); + + while (port == -1 && !_lf_termination_executed) { + buffer[0] = MSG_TYPE_ADDRESS_QUERY; + encode_uint16(remote_federate_id, &(buffer[1])); + + LF_PRINT_DEBUG("Sending address query for transient federate %d.", remote_federate_id); + tracepoint_federate_to_rti(send_ADR_QR, _lf_my_fed_id, NULL); + + LF_MUTEX_LOCK(&lf_outbound_socket_mutex); + write_to_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_socket_mutex, + "Failed to send address query for transient federate %d to RTI.", + remote_federate_id); + LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); + + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(int32_t) + 1, buffer, + "Failed to read port number for transient federate %d from RTI.", + remote_federate_id); + + if (buffer[0] != MSG_TYPE_ADDRESS_QUERY_REPLY) { + if (buffer[0] == MSG_TYPE_FAILED) { + lf_print_error_and_exit("RTI has failed."); + } else { + lf_print_error_and_exit("Unexpected reply of type %hhu from RTI (see net_common.h).", buffer[0]); + } + } + port = extract_int32(&buffer[1]); + + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(host_ip_addr), (unsigned char*)&host_ip_addr, + "Failed to read IP address for transient federate %d from RTI.", + remote_federate_id); + tracepoint_federate_from_rti(receive_ADR_QR_REP, _lf_my_fed_id, NULL); + + // A port of -1 means the transient federate has not yet registered with the RTI. + // Wait and retry. + if (port == -1) { + if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { + lf_print_warning("TIMEOUT obtaining address for transient federate %d. Skipping.", remote_federate_id); + break; + } + lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); + } + } + + if (port <= 0) { + continue; // Could not obtain address; transient federate may not be present. 
+ } + + assert(port < 65536); + uint16_t uport = (uint16_t)port; + char hostname[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &host_ip_addr, hostname, INET_ADDRSTRLEN); + + int socket_id = create_real_time_tcp_socket_errexit(); + if (connect_to_socket(socket_id, (const char*)hostname, uport) < 0) { + lf_print_error("Failed to connect() to transient federate %d.", remote_federate_id); + shutdown_socket(&socket_id, false); + continue; + } + + start_connect = lf_time_physical(); + while (result < 0 && !_lf_termination_executed) { + if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { + lf_print_error("Failed to connect to transient federate %d with timeout: " PRINTF_TIME ". Giving up.", + remote_federate_id, CONNECT_TIMEOUT); + break; + } + if (rti_failed()) { + break; + } + + // Send our federate ID and federation ID. + size_t buffer_length = 1 + sizeof(uint16_t) + 1 + 1; + unsigned char send_buffer[buffer_length]; + send_buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; + if (_lf_my_fed_id == UINT16_MAX) { + lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); + } + encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(send_buffer[1])); + send_buffer[1 + sizeof(uint16_t)] = _fed.is_transient ? 
1 : 0; + unsigned char federation_id_length = (unsigned char)strnlen(federation_metadata.federation_id, 255); + send_buffer[sizeof(uint16_t) + 2] = federation_id_length; + tracepoint_federate_to_federate(send_FED_ID, _lf_my_fed_id, remote_federate_id, NULL); + + write_to_socket_fail_on_error(&socket_id, buffer_length, send_buffer, NULL, + "Failed to send fed_id to transient federate %d.", remote_federate_id); + write_to_socket_fail_on_error(&socket_id, federation_id_length, + (unsigned char*)federation_metadata.federation_id, NULL, + "Failed to send federation id to transient federate %d.", remote_federate_id); + + unsigned char ack_buffer[1]; + read_from_socket_fail_on_error(&socket_id, 1, ack_buffer, + "Failed to read MSG_TYPE_ACK from transient federate %d.", remote_federate_id); + if (ack_buffer[0] != MSG_TYPE_ACK) { + read_from_socket_fail_on_error(&socket_id, 1, ack_buffer, + "Failed to read error code from transient federate %d.", remote_federate_id); + lf_print_error("Received MSG_TYPE_REJECT from transient federate %d (code %d).", remote_federate_id, + ack_buffer[0]); + result = -1; + lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); + continue; + } else { + lf_print("Connected to transient federate %d, port %hu.", remote_federate_id, uport); + tracepoint_federate_to_federate(receive_ACK, _lf_my_fed_id, remote_federate_id, NULL); + result = 0; + break; + } + } + + if (result == 0) { + _fed.sockets_for_outbound_p2p_connections[remote_federate_id] = socket_id; + } else { + shutdown_socket(&socket_id, false); + } + } + + LF_PRINT_LOG("Done handling outbound P2P connections to transient federates."); + return NULL; +} + void lf_latest_tag_confirmed(tag_t tag_to_send) { environment_t* env; if (lf_tag_compare(_fed.last_sent_LTC, tag_to_send) >= 0) { diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 21fa6be57..ffd05723b 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -71,6 +71,16 @@ typedef 
struct federate_instance_t { */ size_t number_of_outbound_p2p_connections; + /** + * Number of inbound peer-to-peer connections from transient federates. + */ + size_t number_of_inbound_p2p_connections_to_transients; + + /** + * Number of outbound peer-to-peer connections to transient federates. + */ + size_t number_of_outbound_p2p_connections_to_transients; + /** * An array that holds the socket descriptors for inbound * connections from each federate. The index will be the federate @@ -114,6 +124,13 @@ typedef struct federate_instance_t { */ lf_thread_t inbound_p2p_handling_thread_id; + /** + * Thread ID for a thread that manages outbound P2P connections to transient federates. + * Transient federates may join and leave the federation, so connections to them + * are handled separately from persistent federates. + */ + lf_thread_t outbound_p2p_transients_handling_thread_id; + /** * A socket descriptor for the socket server of the federate. * This is assigned in lf_create_server(). @@ -343,6 +360,20 @@ void lf_enqueue_port_absent_reactions(environment_t* env); */ void* lf_handle_p2p_connections_from_federates(void* ignored); +/** + * @brief Thread that manages outbound P2P connections to transient federates. + * @ingroup Federated + * + * For each transient federate that this federate has an outbound connection to, + * this thread queries the RTI for its address and establishes the socket + * connection using the same handshake protocol as lf_connect_to_federate(). + * Unlike persistent federates, transient federates may not be present at + * startup, so connections to them are handled in a dedicated thread. + * + * @param env_arg Pointer to the environment (environment_t*). + */ +void* lf_handle_p2p_connections_to_transients(void* env_arg); + /** * @brief Send a latest tag confirmed (LTC) signal to the RTI. 
* @ingroup Federated From 4823845b2361236183decae8b01126d08dee822e Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Sat, 4 Apr 2026 12:04:42 +0100 Subject: [PATCH 03/22] Tracing and visualizing UPSTREAM_CONNECTED and UPSTREAM_DISCONNECTED events --- core/federated/RTI/rti_remote.c | 6 ++++++ core/federated/federate.c | 2 ++ trace/api/types/trace_types.h | 8 ++++++++ util/tracing/visualization/fedsd.py | 9 ++++++++- 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index bde3a0242..9fd0f54e4 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -246,6 +246,9 @@ static void send_upstream_connected_locked(federate_info_t* destination, federat if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH, buffer)) { lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", destination->enclave.id); } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_UPSTREAM_CONNECTED, destination->enclave.id, NULL); + } } /** @@ -262,6 +265,9 @@ static void send_upstream_disconnected_locked(federate_info_t* destination, fede if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH, buffer)) { lf_print_warning("RTI: Failed to send upstream disconnected message to federate %d.", disconnected->enclave.id); } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_UPSTREAM_DISCONNECTED, destination->enclave.id, NULL); + } } /** diff --git a/core/federated/federate.c b/core/federated/federate.c index a327484ae..c5e921219 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -971,6 +971,7 @@ static void handle_upstream_connected_message(void) { read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, "Failed to read upstream connected message from RTI."); uint16_t connected = extract_uint16(buffer); 
+ tracepoint_federate_from_rti(receive_UPSTREAM_CONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); // Mark the upstream as connected. for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { @@ -990,6 +991,7 @@ static void handle_upstream_disconnected_message(void) { read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, "Failed to read upstream disconnected message from RTI."); uint16_t disconnected = extract_uint16(buffer); + tracepoint_federate_from_rti(receive_UPSTREAM_DISCONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); // Mark the upstream as disconnected. for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index f13dc7ad5..30ca4a3b8 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -53,6 +53,8 @@ typedef enum { send_ADR_QR, send_ADR_QR_REP, send_DNET, + send_UPSTREAM_CONNECTED, + send_UPSTREAM_DISCONNECTED, // Receiving messages receive_ACK, receive_FAILED, @@ -77,6 +79,8 @@ typedef enum { receive_ADR_QR, receive_ADR_QR_REP, receive_DNET, + receive_UPSTREAM_CONNECTED, + receive_UPSTREAM_DISCONNECTED, receive_UNIDENTIFIED, send_STOP, receive_STOP, @@ -123,6 +127,8 @@ static const char* trace_event_names[] = { "Sending ADR_QR", "Sending ADR_QR_REP", "Sending DNET", + "Sending UPSTREAM_CONNECTED", + "Sending UPSTREAM_DISCONNECTED", // Receiving messages "Receiving ACK", "Receiving FAILED", @@ -147,6 +153,8 @@ static const char* trace_event_names[] = { "Receiving ADR_QR", "Receiving ADR_QR_REP", "Receiving DNET", + "Receiving UPSTREAM_CONNECTED", + "Receiving UPSTREAM_DISCONNECTED", "Receiving UNIDENTIFIED", "Sending STOP", "Receiving STOP", diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 
7d1a08e64..e084314af 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -30,6 +30,8 @@ .DNET { stroke: #7b2d8b; fill: #7b2d8b; } \ .TIMESTAMP { stroke: #888888; fill: #888888; } \ .FED_ID { stroke: #80DD99; fill: #80DD99; } \ + .UPSTREAM_CONNECTED { stroke: #f4a261; fill: #f4a261; } \ + .UPSTREAM_DISCONNECTED { stroke: #e76f51; fill: #e76f51; } \ .ACK { stroke: #52b788; fill: #52b788; } \ .FAILED { stroke: #c1121f; fill: #c1121f; } \ .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ @@ -66,6 +68,8 @@ "Sending STOP_REQ_REP": "STOP_REQ_REP", "Sending STOP_GRN": "STOP_GRN", "Sending FED_ID": "FED_ID", + "Sending UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", + "Sending UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", "Sending PTAG": "PTAG", "Sending TAG": "TAG", "Sending REJECT": "REJECT", @@ -89,6 +93,8 @@ "Receiving STOP_REQ_REP": "STOP_REQ_REP", "Receiving STOP_GRN": "STOP_GRN", "Receiving FED_ID": "FED_ID", + "Receiving UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", + "Receiving UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", "Receiving PTAG": "PTAG", "Receiving TAG": "TAG", "Receiving REJECT": "REJECT", @@ -150,7 +156,8 @@ def format_actor_name(name): # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. 
# Set of non-tagged events (messages) -non_tagged_messages = {'FED_ID', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} +non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', + 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} ################################################################################ From 30eae00d21dc497c46b8fcaf9bc53dee35bcccbb Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 12:29:09 +0100 Subject: [PATCH 04/22] Add MSG_TYPE_DOWNSTREAM_CONNECTED message, handler and tracing --- core/federated/RTI/rti_remote.c | 28 +++++++++++++++++++++ core/federated/federate.c | 27 ++++++++++++++++++++ include/core/federated/network/net_common.h | 9 +++++++ trace/api/types/trace_types.h | 6 +++++ util/tracing/visualization/fedsd.py | 5 +++- 5 files changed, 74 insertions(+), 1 deletion(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 9fd0f54e4..1c393655d 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -270,6 +270,34 @@ static void send_upstream_disconnected_locked(federate_info_t* destination, fede } } +/** + * @brief Send MSG_TYPE_DOWNSTREAM_CONNECTED to the specified upstream federate. + * + * This notifies an upstream federate that a transient federate downstream of it has + * (re-)connected, so the upstream should query the RTI for its address and establish + * (or re-establish) the outbound P2P connection. + * + * This function assumes that the mutex lock is already held. + * @param destination The upstream federate to notify. + * @param connected The transient federate that has connected. 
+ */ +static void send_downstream_connected_locked(federate_info_t* destination, federate_info_t* connected) { + // if (destination->enclave.state == NOT_CONNECTED) { + // LF_PRINT_LOG("RTI did not send downstream connected message to federate %d, because it is not connected.", + // destination->enclave.id); + // return; + // } + unsigned char buffer[MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_DOWNSTREAM_CONNECTED; + encode_uint16(connected->enclave.id, &buffer[1]); + if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send downstream connected message to federate %d.", destination->enclave.id); + } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_DOWNSTREAM_CONNECTED, destination->enclave.id, NULL); + } +} + /** * @brief Mark a federate as disconnected and, if this is a transient, inform downstream federates. * @param fed The disconnected federate. diff --git a/core/federated/federate.c b/core/federated/federate.c index c5e921219..66f69d29c 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1001,6 +1001,26 @@ static void handle_upstream_disconnected_message(void) { } } +/** + * @brief Handle message from the RTI that a transient downstream federate has connected. + * + * Reads the downstream federate's ID, then synchronously queries the RTI for its address + * and establishes (or re-establishes) the outbound P2P connection to it. + * This function is called inline from listen_to_rti_TCP or get_start_time_from_rti, + * so it reads the address-query reply directly from socket_TCP_RTI. 
+ */ +static void handle_downstream_connected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read downstream connected message from RTI."); + uint16_t remote_federate_id = extract_uint16(buffer); + tracepoint_federate_from_rti(receive_DOWNSTREAM_CONNECTED, _lf_my_fed_id, NULL); + LF_PRINT_DEBUG("Received notification that downstream transient federate %d has connected.", remote_federate_id); + + lf_connect_to_federate(remote_federate_id, true); +} + /** * Send the specified timestamp to the RTI and wait for a response. * The specified timestamp should be current physical time of the @@ -1034,6 +1054,10 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive handle_upstream_disconnected_message(); continue; + } else if (buffer[0] == MSG_TYPE_DOWNSTREAM_CONNECTED) { + // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive + handle_downstream_connected_message(); + continue; } else { lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. 
Got %u (see net_common.h).", buffer[0]); @@ -1718,6 +1742,9 @@ static void* listen_to_rti_TCP(void* args) { case MSG_TYPE_UPSTREAM_DISCONNECTED: handle_upstream_disconnected_message(); break; + case MSG_TYPE_DOWNSTREAM_CONNECTED: + handle_downstream_connected_message(); + break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", _lf_my_fed_id); diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index ce1e0522f..09508c5e4 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -776,6 +776,15 @@ #define MSG_TYPE_UPSTREAM_DISCONNECTED 28 #define MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) +/** + * A message that informs an upstream federate that a transient federate downstream of it + * has (re-)connected. The next 2 bytes are the federate ID of the downstream federate. + * Upon receiving this, the upstream federate should query the RTI for the downstream's + * address and establish (or re-establish) the outbound P2P connection. + */ +#define MSG_TYPE_DOWNSTREAM_CONNECTED 30 +#define MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH (1 + sizeof(uint16_t)) + /** * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, * the federate will call lf_stop(), which will make it resign at its current_tag diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index 30ca4a3b8..262418799 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -84,6 +84,9 @@ typedef enum { receive_UNIDENTIFIED, send_STOP, receive_STOP, + // New entries must be added here, at the end, to avoid shifting existing indices. 
+ send_DOWNSTREAM_CONNECTED, + receive_DOWNSTREAM_CONNECTED, NUM_EVENT_TYPES } trace_event_t; @@ -158,6 +161,9 @@ static const char* trace_event_names[] = { "Receiving UNIDENTIFIED", "Sending STOP", "Receiving STOP", + // New entries appended at the end to avoid shifting existing indices. + "Sending DOWNSTREAM_CONNECTED", + "Receiving DOWNSTREAM_CONNECTED", }; static inline void _suppress_unused_variable_warning_for_static_variable() { (void)trace_event_names; } diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index e084314af..45a904135 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -32,6 +32,7 @@ .FED_ID { stroke: #80DD99; fill: #80DD99; } \ .UPSTREAM_CONNECTED { stroke: #f4a261; fill: #f4a261; } \ .UPSTREAM_DISCONNECTED { stroke: #e76f51; fill: #e76f51; } \ + .DOWNSTREAM_CONNECTED { stroke: #2a9d8f; fill: #2a9d8f; } \ .ACK { stroke: #52b788; fill: #52b788; } \ .FAILED { stroke: #c1121f; fill: #c1121f; } \ .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ @@ -70,6 +71,7 @@ "Sending FED_ID": "FED_ID", "Sending UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", "Sending UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", + "Sending DOWNSTREAM_CONNECTED": "DOWNSTREAM_CONNECTED", "Sending PTAG": "PTAG", "Sending TAG": "TAG", "Sending REJECT": "REJECT", @@ -95,6 +97,7 @@ "Receiving FED_ID": "FED_ID", "Receiving UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", "Receiving UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", + "Receiving DOWNSTREAM_CONNECTED": "DOWNSTREAM_CONNECTED", "Receiving PTAG": "PTAG", "Receiving TAG": "TAG", "Receiving REJECT": "REJECT", @@ -156,7 +159,7 @@ def format_actor_name(name): # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. 
# Set of non-tagged events (messages) -non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', +non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', 'DOWNSTREAM_CONNECTED', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} From c180ad0297b77fd2f299efe32e5abeed11030208 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 12:52:40 +0100 Subject: [PATCH 05/22] Add is_transient byte to MSG_TYPE_ADDRESS_QUERY --- core/federated/RTI/rti_remote.c | 4 +++- include/core/federated/federate.h | 4 ++++ include/core/federated/network/net_common.h | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 1c393655d..4f74b933d 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -946,9 +946,11 @@ void handle_address_query(uint16_t fed_id) { // Use buffer both for reading and constructing the reply. // The length is what is needed for the reply. unsigned char buffer[1 + sizeof(int32_t)]; - read_from_socket_fail_on_error(&fed->socket, sizeof(uint16_t), (unsigned char*)buffer, + // Read remote_fed_id (2 bytes) + is_transient flag (1 byte). + read_from_socket_fail_on_error(&fed->socket, sizeof(uint16_t) + 1, (unsigned char*)buffer, "Failed to read address query."); uint16_t remote_fed_id = extract_uint16(buffer); + bool remote_is_transient = (buffer[sizeof(uint16_t)] == 1); if (rti_remote->base.tracing_enabled) { tracepoint_rti_from_federate(receive_ADR_QR, fed_id, NULL); diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index ffd05723b..19532239d 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -299,6 +299,10 @@ extern lf_cond_t lf_port_status_changed; * refer to the socket for communicating directly with the federate. * * @param remote_federate_id The ID of the remote federate. 
+ * @param is_transient Whether the remote federate is transient. This affects + * connection behavior: a transient remote federate may not be immediately + * available, so the connection attempt is handled differently than for a + * persistent federate. */ void lf_connect_to_federate(uint16_t remote_federate_id); diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 09508c5e4..f016c963d 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -576,6 +576,7 @@ * @ingroup Federated * * The next two bytes are the other federate's ID. + * The following byte is 1 if the remote federate being queried is transient, 0 otherwise. */ #define MSG_TYPE_ADDRESS_QUERY 13 From 7108c8c273c54ac4660ea4e7e260adb9d603991c Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 12:56:07 +0100 Subject: [PATCH 06/22] Update lf_connect_to_federate() to account for the type of federate --- core/federated/federate.c | 14 +++++++++++--- include/core/federated/federate.h | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 66f69d29c..31903b7a5 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1881,7 +1881,7 @@ void lf_terminate_execution(environment_t* env) { ////////////////////////////////////////////////////////////////////////////////// // Public functions (declared in federate.h, in alphabetical order) -void lf_connect_to_federate(uint16_t remote_federate_id) { +void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { int result = -1; // Ask the RTI for port number of the remote federate. 
@@ -1891,17 +1891,21 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { int port = -1; struct in_addr host_ip_addr; instant_t start_connect = lf_time_physical(); + // If the remote federate is persistent, iterate until we get a valid port number from the RTI. + // If not, execute only once, as a request registration. while (port == -1 && !_lf_termination_executed) { buffer[0] = MSG_TYPE_ADDRESS_QUERY; // NOTE: Sending messages in little endian. encode_uint16(remote_federate_id, &(buffer[1])); + // Indicate whether the remote federate being queried is transient. + buffer[1 + sizeof(uint16_t)] = is_transient ? 1 : 0; LF_PRINT_DEBUG("Sending address query for federate %d.", remote_federate_id); // Trace the event when tracing is enabled tracepoint_federate_to_rti(send_ADR_QR, _lf_my_fed_id, NULL); LF_MUTEX_LOCK(&lf_outbound_socket_mutex); - write_to_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_socket_mutex, + write_to_socket_fail_on_error(&_fed.socket_TCP_RTI, 1 + sizeof(uint16_t) + 1, buffer, &lf_outbound_socket_mutex, "Failed to send address query for federate %d to RTI.", remote_federate_id); LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); @@ -1928,12 +1932,16 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { // the port number of the remote federate, presumably because the // remote federate has not yet sent an MSG_TYPE_ADDRESS_ADVERTISEMENT message to the RTI. // Sleep for some time before retrying. - if (port == -1) { + if (port == -1 && !is_transient) { if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { lf_print_error_and_exit("TIMEOUT obtaining IP/port for federate %d from the RTI.", remote_federate_id); } // Wait ADDRESS_QUERY_RETRY_INTERVAL nanoseconds. lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); + } else if (port == -1 && is_transient) { + // For transient federates, we only execute once, as a request registration.
If the RTI does not reply + // with a valid port number, this is not treated as an error: we simply return without connecting. + return; } } assert(port < 65536); diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 19532239d..7baf4c0b1 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -304,7 +304,7 @@ extern lf_cond_t lf_port_status_changed; * available, so the connection attempt is handled differently than for a * persistent federate. */ -void lf_connect_to_federate(uint16_t remote_federate_id); +void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient); /** * @brief Connect to the RTI at the specified host and port. From 3046958affdd39a50eb94aa3fc7c9d0e926df0d0 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 14:12:05 +0100 Subject: [PATCH 07/22] Manage downstream connection messages to request address in case of transients --- core/federated/RTI/rti_remote.c | 72 ++++++++++++--- core/federated/RTI/rti_remote.h | 6 ++ core/federated/federate.c | 147 ++---------------------------- include/core/federated/federate.h | 21 ----- 4 files changed, 69 insertions(+), 177 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 4f74b933d..972102d14 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -278,23 +278,29 @@ static void send_upstream_disconnected_locked(federate_info_t* destination, fede * (or re-establish) the outbound P2P connection. * * This function assumes that the mutex lock is already held. - * @param destination The upstream federate to notify. - * @param connected The transient federate that has connected. + * @param my_fed The transient federate that has just connected.
*/ -static void send_downstream_connected_locked(federate_info_t* destination, federate_info_t* connected) { - // if (destination->enclave.state == NOT_CONNECTED) { - // LF_PRINT_LOG("RTI did not send downstream connected message to federate %d, because it is not connected.", - // destination->enclave.id); - // return; - // } +static void send_downstream_connected_locked(federate_info_t* my_fed) { unsigned char buffer[MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH]; buffer[0] = MSG_TYPE_DOWNSTREAM_CONNECTED; - encode_uint16(connected->enclave.id, &buffer[1]); - if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH, buffer)) { - lf_print_warning("RTI: Failed to send downstream connected message to federate %d.", destination->enclave.id); - } - if (rti_remote->base.tracing_enabled) { - tracepoint_rti_to_federate(send_DOWNSTREAM_CONNECTED, destination->enclave.id, NULL); + encode_uint16(my_fed->enclave.id, &buffer[1]); + // Iterate over all federates and notify those that have my_fed as an outbound transient. 
+ for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + if (fed->enclave.state == NOT_CONNECTED) { + continue; + } + for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { + if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { + if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send downstream connected message to federate %d.", fed->enclave.id); + } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_DOWNSTREAM_CONNECTED, fed->enclave.id, NULL); + } + break; + } + } } } @@ -958,6 +964,28 @@ void handle_address_query(uint16_t fed_id) { LF_PRINT_DEBUG("RTI received address query from %d for %d.", fed_id, remote_fed_id); + // If the queried federate is transient, record it in the querying federate's + // outbound_transients array (if not already present). + if (remote_is_transient) { + LF_MUTEX_LOCK(&rti_mutex); + bool already_registered = false; + int32_t i = 0; + for (; i < fed->number_of_outbound_transients; i++) { + if (fed->outbound_transients[i] == (int32_t)remote_fed_id) { + already_registered = true; + break; + } else if (fed->outbound_transients[i] == -1) { + // This means we have found an empty slot, so we can stop looking. + break; + } + } + if (!already_registered) { + fed->outbound_transients[i] = (int32_t)remote_fed_id; + fed->number_of_outbound_transients++; + } + LF_MUTEX_UNLOCK(&rti_mutex); + } + // NOTE: server_port initializes to -1, which means the RTI does not know // the port number because it has not yet received an MSG_TYPE_ADDRESS_ADVERTISEMENT message // from this federate. In that case, it will respond by sending -1. @@ -1034,13 +1062,15 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ // Notify my_fed of any upstream transient federates that are connected. 
// This has to occur before sending the start tag so that my_fed does not begin executing thinking that these // upstream federates are not connected. + lf_print_info("================================== number of immediate upstreams: %d of %d", my_fed->enclave.num_immediate_upstreams, my_fed->enclave.id); for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) { federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]); if (fed->is_transient && fed->enclave.state == GRANTED) { send_upstream_connected_locked(my_fed, fed); } } - + send_downstream_connected_locked(my_fed); + // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START // message. // If it is a persistent federate, only the start_time is sent. If, however, it is a transient @@ -1067,6 +1097,7 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ // If this is a transient federate, notify its downstream federates that it is now connected. if (my_fed->is_transient) { + // Notify downstreams: their upstream (my_fed) has connected. 
for (int i = 0; i < my_fed->enclave.num_immediate_downstreams; i++) { send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.immediate_downstreams[i]), my_fed); } @@ -2314,6 +2345,12 @@ void initialize_federate(federate_info_t* fed, uint16_t id) { fed->has_upstream_transient_federates = false; fed->is_transient = true; fed->effective_start_tag = NEVER_TAG; + fed->number_of_outbound_transients = 0; + int32_t num_transients = rti_remote->number_of_transient_federates; + fed->outbound_transients = (int32_t*)malloc(num_transients * sizeof(int32_t)); + for (int32_t i = 0; i < num_transients; i++) { + fed->outbound_transients[i] = -1; + } } void reset_transient_federate(federate_info_t* fed) { @@ -2333,6 +2370,11 @@ void reset_transient_federate(federate_info_t* fed) { fed->server_port = -1; fed->requested_stop = false; fed->effective_start_tag = NEVER_TAG; + fed->number_of_outbound_transients = 0; + int32_t num_transients = rti_remote->number_of_transient_federates; + for (int32_t i = 0; i < num_transients; i++) { + fed->outbound_transients[i] = -1; + } // Whenver a transient resigns or leaves, invalidate all federates, so that all min_delays_upstream // get re-computed. // FIXME: Maybe optimize it to only invalidate those affected by the transient diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index 145040a89..0b8c69a4f 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -82,6 +82,12 @@ typedef struct federate_info_t { bool is_transient; /** @brief Records the start time of the federate, which is mainly useful for transient federates. */ tag_t effective_start_tag; + /** @brief Number of outbound connections to transient federates. */ + int32_t number_of_outbound_transients; + /** @brief IDs of transient federates this federate has outbound connections to. + * The array has size equal to the total number of transient federates in the federation, + * and entries are initialized (and reset) to -1. 
*/ + int32_t* outbound_transients; } federate_info_t; /** diff --git a/core/federated/federate.c b/core/federated/federate.c index 31903b7a5..b08d5f019 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2304,147 +2304,6 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { return NULL; } -void* lf_handle_p2p_connections_to_transients(void* env_arg) { - LF_ASSERT_NON_NULL(env_arg); - environment_t* env = (environment_t*)env_arg; - (void)env; // Reserved for future use (e.g., locking env->mutex). - - for (uint16_t remote_federate_id = 0; remote_federate_id < NUMBER_OF_FEDERATES; remote_federate_id++) { - // Only handle outbound connections to transient federates. - if (!_fed.transients[remote_federate_id]) { - continue; - } - // Check that we actually have an outbound connection to this federate. - if (_fed.sockets_for_outbound_p2p_connections[remote_federate_id] >= 0) { - continue; // Already connected. - } - - int result = -1; - - // Ask the RTI for the port number of the remote transient federate. 
- unsigned char buffer[sizeof(int32_t) + INET_ADDRSTRLEN + 1]; - int port = -1; - struct in_addr host_ip_addr; - instant_t start_connect = lf_time_physical(); - - while (port == -1 && !_lf_termination_executed) { - buffer[0] = MSG_TYPE_ADDRESS_QUERY; - encode_uint16(remote_federate_id, &(buffer[1])); - - LF_PRINT_DEBUG("Sending address query for transient federate %d.", remote_federate_id); - tracepoint_federate_to_rti(send_ADR_QR, _lf_my_fed_id, NULL); - - LF_MUTEX_LOCK(&lf_outbound_socket_mutex); - write_to_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_socket_mutex, - "Failed to send address query for transient federate %d to RTI.", - remote_federate_id); - LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); - - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(int32_t) + 1, buffer, - "Failed to read port number for transient federate %d from RTI.", - remote_federate_id); - - if (buffer[0] != MSG_TYPE_ADDRESS_QUERY_REPLY) { - if (buffer[0] == MSG_TYPE_FAILED) { - lf_print_error_and_exit("RTI has failed."); - } else { - lf_print_error_and_exit("Unexpected reply of type %hhu from RTI (see net_common.h).", buffer[0]); - } - } - port = extract_int32(&buffer[1]); - - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(host_ip_addr), (unsigned char*)&host_ip_addr, - "Failed to read IP address for transient federate %d from RTI.", - remote_federate_id); - tracepoint_federate_from_rti(receive_ADR_QR_REP, _lf_my_fed_id, NULL); - - // A port of -1 means the transient federate has not yet registered with the RTI. - // Wait and retry. - if (port == -1) { - if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { - lf_print_warning("TIMEOUT obtaining address for transient federate %d. Skipping.", remote_federate_id); - break; - } - lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); - } - } - - if (port <= 0) { - continue; // Could not obtain address; transient federate may not be present. 
- } - - assert(port < 65536); - uint16_t uport = (uint16_t)port; - char hostname[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &host_ip_addr, hostname, INET_ADDRSTRLEN); - - int socket_id = create_real_time_tcp_socket_errexit(); - if (connect_to_socket(socket_id, (const char*)hostname, uport) < 0) { - lf_print_error("Failed to connect() to transient federate %d.", remote_federate_id); - shutdown_socket(&socket_id, false); - continue; - } - - start_connect = lf_time_physical(); - while (result < 0 && !_lf_termination_executed) { - if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { - lf_print_error("Failed to connect to transient federate %d with timeout: " PRINTF_TIME ". Giving up.", - remote_federate_id, CONNECT_TIMEOUT); - break; - } - if (rti_failed()) { - break; - } - - // Send our federate ID and federation ID. - size_t buffer_length = 1 + sizeof(uint16_t) + 1 + 1; - unsigned char send_buffer[buffer_length]; - send_buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; - if (_lf_my_fed_id == UINT16_MAX) { - lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); - } - encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(send_buffer[1])); - send_buffer[1 + sizeof(uint16_t)] = _fed.is_transient ? 
1 : 0; - unsigned char federation_id_length = (unsigned char)strnlen(federation_metadata.federation_id, 255); - send_buffer[sizeof(uint16_t) + 2] = federation_id_length; - tracepoint_federate_to_federate(send_FED_ID, _lf_my_fed_id, remote_federate_id, NULL); - - write_to_socket_fail_on_error(&socket_id, buffer_length, send_buffer, NULL, - "Failed to send fed_id to transient federate %d.", remote_federate_id); - write_to_socket_fail_on_error(&socket_id, federation_id_length, - (unsigned char*)federation_metadata.federation_id, NULL, - "Failed to send federation id to transient federate %d.", remote_federate_id); - - unsigned char ack_buffer[1]; - read_from_socket_fail_on_error(&socket_id, 1, ack_buffer, - "Failed to read MSG_TYPE_ACK from transient federate %d.", remote_federate_id); - if (ack_buffer[0] != MSG_TYPE_ACK) { - read_from_socket_fail_on_error(&socket_id, 1, ack_buffer, - "Failed to read error code from transient federate %d.", remote_federate_id); - lf_print_error("Received MSG_TYPE_REJECT from transient federate %d (code %d).", remote_federate_id, - ack_buffer[0]); - result = -1; - lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); - continue; - } else { - lf_print("Connected to transient federate %d, port %hu.", remote_federate_id, uport); - tracepoint_federate_to_federate(receive_ACK, _lf_my_fed_id, remote_federate_id, NULL); - result = 0; - break; - } - } - - if (result == 0) { - _fed.sockets_for_outbound_p2p_connections[remote_federate_id] = socket_id; - } else { - shutdown_socket(&socket_id, false); - } - } - - LF_PRINT_LOG("Done handling outbound P2P connections to transient federates."); - return NULL; -} - void lf_latest_tag_confirmed(tag_t tag_to_send) { environment_t* env; if (lf_tag_compare(_fed.last_sent_LTC, tag_to_send) >= 0) { @@ -2813,6 +2672,12 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int lf_print_error("lf_send_message: Unsupported message type (%d).", message_type); return -1; } +#if 
defined(FEDERATED_DECENTRALIZED) + if (_fed.transients[federate] && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { + lf_print("The destination transient federate %d is not connected. Abort sending!", federate); + return 0; + } +#endif size_t buffer_head = 0; // First byte is the message type. diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 7baf4c0b1..ebd78020d 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -124,13 +124,6 @@ typedef struct federate_instance_t { */ lf_thread_t inbound_p2p_handling_thread_id; - /** - * Thread ID for a thread that manages outbound P2P connections to transient federates. - * Transient federates may join and leave the federation, so connections to them - * are handled separately from persistent federates. - */ - lf_thread_t outbound_p2p_transients_handling_thread_id; - /** * A socket descriptor for the socket server of the federate. * This is assigned in lf_create_server(). @@ -364,20 +357,6 @@ void lf_enqueue_port_absent_reactions(environment_t* env); */ void* lf_handle_p2p_connections_from_federates(void* ignored); -/** - * @brief Thread that manages outbound P2P connections to transient federates. - * @ingroup Federated - * - * For each transient federate that this federate has an outbound connection to, - * this thread queries the RTI for its address and establishes the socket - * connection using the same handshake protocol as lf_connect_to_federate(). - * Unlike persistent federates, transient federates may not be present at - * startup, so connections to them are handled in a dedicated thread. - * - * @param env_arg Pointer to the environment (environment_t*). - */ -void* lf_handle_p2p_connections_to_transients(void* env_arg); - /** * @brief Send a latest tag confirmed (LTC) signal to the RTI. 
* @ingroup Federated From 91cda420a06597d9c4d262c0b6b9ee8db7e0f9f9 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 14:37:48 +0100 Subject: [PATCH 08/22] Remove overlooked message --- core/federated/RTI/rti_remote.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 972102d14..b5d3a148e 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1062,7 +1062,7 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ // Notify my_fed of any upstream transient federates that are connected. // This has to occur before sending the start tag so that my_fed does not begin executing thinking that these // upstream federates are not connected. - lf_print_info("================================== number of immediate upstreams: %d of %d", my_fed->enclave.num_immediate_upstreams, my_fed->enclave.id); + my_fed->enclave.id); for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) { federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]); if (fed->is_transient && fed->enclave.state == GRANTED) { From c2d9ecf79bb0d12e3bc3d1dfb9dab15cf85f5eef Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 17:10:28 +0100 Subject: [PATCH 09/22] Update and optimize federates parameters regarding inbound and outbound transients --- core/federated/RTI/rti_remote.c | 1 - core/federated/federate.c | 33 ++++++++++++++++++++++++++++--- include/core/federated/federate.h | 15 ++++++++++---- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index b5d3a148e..bfab18d0d 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1062,7 +1062,6 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ // Notify my_fed of any upstream transient federates that are connected. 
// This has to occur before sending the start tag so that my_fed does not begin executing thinking that these // upstream federates are not connected. - my_fed->enclave.id); for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) { federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]); if (fed->is_transient && fed->enclave.state == GRANTED) { diff --git a/core/federated/federate.c b/core/federated/federate.c index b08d5f019..597fa6b47 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2205,7 +2205,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { while (!_lf_termination_executed) { // Case where all inbound connections are to persistent federates if (received_federates == _fed.number_of_inbound_p2p_connections && - _fed.number_of_inbound_p2p_connections_to_transients == 0) { + _fed.number_of_inbound_p2p_transients == 0) { break; } // Wait for an incoming connection request. @@ -2380,6 +2380,23 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa lf_print_error("lf_send_message: Unsupported message type (%d).", message_type); return -1; } + + // If there are outbound transients, check whether the destination is one of them. + // If it is and its socket is shut, gracefully skip the send. + if (_fed.number_of_outbound_p2p_transients > 0) { + bool is_outbound_transient = false; + for (size_t i = 0; i < _fed.number_of_outbound_p2p_transients; i++) { + if (_fed.outbound_p2p_transient_ids[i] == (uint16_t)federate) { + is_outbound_transient = true; + break; + } + } + if (is_outbound_transient && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { + lf_print("The destination transient federate %d is not connected. Abort sending!", federate); + return 0; + } + } + header_buffer[0] = (unsigned char)message_type; // Next two bytes identify the destination port. // NOTE: Send messages little endian (network order), not big endian. 
@@ -2673,8 +2690,18 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int return -1; } #if defined(FEDERATED_DECENTRALIZED) - if (_fed.transients[federate] && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { - lf_print("The destination transient federate %d is not connected. Abort sending!", federate); + if (_fed.sockets_for_outbound_p2p_connections[federate] < 0) { + // Only print a warning if the destination is a known outbound transient. + bool is_outbound_transient = false; + for (size_t i = 0; i < _fed.number_of_outbound_p2p_transients; i++) { + if (_fed.outbound_p2p_transient_ids[i] == (uint16_t)federate) { + is_outbound_transient = true; + break; + } + } + if (is_outbound_transient) { + lf_print("The destination transient federate %d is not connected. Abort sending!", federate); + } return 0; } #endif diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index ebd78020d..e3d8c9caf 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -57,6 +57,11 @@ typedef struct federate_instance_t { */ size_t number_of_inbound_p2p_connections; + /** + * Number of inbound peer-to-peer connections from transient federates. + */ + size_t number_of_inbound_p2p_transients; + /** * Array of thread IDs for threads that listen for incoming messages. * This is NULL if there are none and otherwise has size given by @@ -72,14 +77,16 @@ typedef struct federate_instance_t { size_t number_of_outbound_p2p_connections; /** - * Number of inbound peer-to-peer connections from transient federates. + * Number of outbound peer-to-peer connections to transient federates. */ - size_t number_of_inbound_p2p_connections_to_transients; + size_t number_of_outbound_p2p_transients; /** - * Number of outbound peer-to-peer connections to transient federates. + * An array of IDs of transient federates to which this federate has outbound + * peer-to-peer connections. 
The array has size number_of_outbound_p2p_transients + * and is allocated at startup by the generated _lf_executable_preamble(). */ - size_t number_of_outbound_p2p_connections_to_transients; + uint16_t* outbound_p2p_transient_ids; /** * An array that holds the socket descriptors for inbound From 39798341e6acbd5684c772322d84c9c854e4e8dc Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 21:48:48 +0100 Subject: [PATCH 10/22] Defer the P2P connection establishment until after the start-time handshake completes, avoiding the interleaved socket read that may cause a crash --- core/federated/federate.c | 44 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 597fa6b47..0d2269d11 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1039,6 +1039,19 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { size_t buffer_length = (_fed.is_transient) ? MSG_TYPE_TIMESTAMP_TAG_LENGTH : MSG_TYPE_TIMESTAMP_LENGTH; unsigned char buffer[buffer_length]; + // Deferred DOWNSTREAM_CONNECTED notifications: calling lf_connect_to_federate() inline + // here is unsafe because the RTI may have already written MSG_TYPE_TIMESTAMP into this + // federate's TCP stream immediately after MSG_TYPE_DOWNSTREAM_CONNECTED (from a concurrent + // send_start_tag_locked call for the transient federate). If we call lf_connect_to_federate() + // now it will read from the socket expecting MSG_TYPE_ADDRESS_QUERY_REPLY but will instead + // consume the queued MSG_TYPE_TIMESTAMP bytes, causing a fatal "Unexpected reply of type 2". + // Fix: read and save each downstream federate ID, then call lf_connect_to_federate() for + // each one only after MSG_TYPE_TIMESTAMP has been received and the loop has exited. + uint16_t pending_downstream_ids[_fed.number_of_outbound_p2p_transients > 0 + ? 
_fed.number_of_outbound_p2p_transients + : 1]; + size_t num_pending_downstream = 0; + while (true) { read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, 1, buffer, NULL, "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); @@ -1055,8 +1068,22 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { handle_upstream_disconnected_message(); continue; } else if (buffer[0] == MSG_TYPE_DOWNSTREAM_CONNECTED) { - // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive - handle_downstream_connected_message(); + // Defer lf_connect_to_federate() until after MSG_TYPE_TIMESTAMP is received. + // Read the federate ID payload now to drain the socket, but do not attempt the + // address query yet: the RTI may have written MSG_TYPE_TIMESTAMP into this socket + // right after MSG_TYPE_DOWNSTREAM_CONNECTED (from send_start_tag_locked running + // concurrently for the joining transient), so any read inside lf_connect_to_federate + // would consume those bytes and crash with "Unexpected reply of type 2". + unsigned char id_buf[sizeof(uint16_t)]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(uint16_t), id_buf, NULL, + "Failed to read downstream connected federate ID."); + tracepoint_federate_from_rti(receive_DOWNSTREAM_CONNECTED, _lf_my_fed_id, NULL); + uint16_t remote_federate_id = extract_uint16(id_buf); + LF_PRINT_DEBUG("Deferring P2P connection to downstream transient federate %d until after start time is received.", + remote_federate_id); + if (num_pending_downstream < _fed.number_of_outbound_p2p_transients) { + pending_downstream_ids[num_pending_downstream++] = remote_federate_id; + } continue; } else { lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. 
Got %u (see net_common.h).", @@ -1087,6 +1114,16 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { tracepoint_federate_from_rti(receive_TIMESTAMP, _lf_my_fed_id, &effective_start_tag); LF_PRINT_LOG("Current physical time is: " PRINTF_TIME ".", lf_time_physical()); + // Now that MSG_TYPE_TIMESTAMP has been received and the start time is known, it is safe + // to establish outbound P2P connections to any transient downstream federates that sent + // DOWNSTREAM_CONNECTED notifications while we were waiting. The ADDRESS_QUERY round-trip + // can proceed without risk of consuming queued TIMESTAMP bytes. + for (size_t i = 0; i < num_pending_downstream; i++) { + LF_PRINT_DEBUG("Establishing deferred P2P connection to downstream transient federate %d.", + pending_downstream_ids[i]); + lf_connect_to_federate(pending_downstream_ids[i], true); + } + return timestamp; } @@ -1907,13 +1944,14 @@ void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { LF_MUTEX_LOCK(&lf_outbound_socket_mutex); write_to_socket_fail_on_error(&_fed.socket_TCP_RTI, 1 + sizeof(uint16_t) + 1, buffer, &lf_outbound_socket_mutex, "Failed to send address query for federate %d to RTI.", remote_federate_id); - LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); // Read RTI's response. read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(int32_t) + 1, buffer, "Failed to read the requested port number for federate %d from RTI.", remote_federate_id); + LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); + if (buffer[0] != MSG_TYPE_ADDRESS_QUERY_REPLY) { // Unexpected reply. Could be that RTI has failed and sent a resignation. 
if (buffer[0] == MSG_TYPE_FAILED) { From 8157729dada630c111ab52780c4e354159d163ab Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 2 Apr 2026 23:08:55 +0100 Subject: [PATCH 11/22] Add DOWNSTREAM_DISCONNECTED to silence spurious errors on transient departures --- core/federated/RTI/rti_remote.c | 38 ++++++++++++++++++++- core/federated/federate.c | 27 ++++++++++++++- include/core/federated/network/net_common.h | 9 +++++ trace/api/types/trace_types.h | 4 +++ util/tracing/visualization/fedsd.py | 5 ++- 5 files changed, 80 insertions(+), 3 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index bfab18d0d..8118a93e5 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -305,7 +305,41 @@ static void send_downstream_connected_locked(federate_info_t* my_fed) { } /** - * @brief Mark a federate as disconnected and, if this is a transient, inform downstream federates. + * @brief Send MSG_TYPE_DOWNSTREAM_DISCONNECTED to the upstream federates of a transient federate. + * + * This notifies upstream federates that a transient federate downstream of them has + * disconnected, so they can close the outbound P2P connection to it. + * + * This function assumes that the mutex lock is already held. + * @param my_fed The transient federate that has just disconnected. + */ +static void send_downstream_disconnected_locked(federate_info_t* my_fed) { + unsigned char buffer[MSG_TYPE_DOWNSTREAM_DISCONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_DOWNSTREAM_DISCONNECTED; + encode_uint16(my_fed->enclave.id, &buffer[1]); + // Iterate over all federates and notify those that have my_fed as an outbound transient. 
+ for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + if (fed->enclave.state == NOT_CONNECTED) { + continue; + } + for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { + if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { + if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_DOWNSTREAM_DISCONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send downstream disconnected message to federate %d.", fed->enclave.id); + } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_DOWNSTREAM_DISCONNECTED, fed->enclave.id, NULL); + } + break; + } + } + } +} + +/** + * @brief Mark a federate as disconnected and, if this is a transient, inform downstream and + * inbound federates. * @param fed The disconnected federate. */ static void notify_federate_disconnected(federate_info_t* fed) { @@ -321,6 +355,8 @@ static void notify_federate_disconnected(federate_info_t* fed) { send_upstream_disconnected_locked(downstream, fed); } } + // Notify upstream federates that have fed in their list of outbound transients. + send_downstream_disconnected_locked(fed); LF_MUTEX_UNLOCK(&rti_mutex); } } diff --git a/core/federated/federate.c b/core/federated/federate.c index 0d2269d11..86edb1d84 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1018,7 +1018,24 @@ static void handle_downstream_connected_message(void) { tracepoint_federate_from_rti(receive_DOWNSTREAM_CONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that downstream transient federate %d has connected.", remote_federate_id); - lf_connect_to_federate(remote_federate_id, true); + lf_connect_to_federate(remote_federate_id, true); +} + +/** + * @brief Handle message from the RTI that a transient downstream federate has disconnected. + * + * Reads the downstream federate's ID and closes the outbound P2P socket to it. 
+ */ +static void handle_downstream_disconnected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read downstream disconnected message from RTI."); + uint16_t remote_federate_id = extract_uint16(buffer); + tracepoint_federate_from_rti(receive_DOWNSTREAM_DISCONNECTED, _lf_my_fed_id, NULL); + LF_PRINT_DEBUG("Received notification that downstream transient federate %d has disconnected.", remote_federate_id); + + shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[remote_federate_id], false); } /** @@ -1067,6 +1084,11 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive handle_upstream_disconnected_message(); continue; + } else if (buffer[0] == MSG_TYPE_DOWNSTREAM_DISCONNECTED) { + // A transient downstream disconnected before we even got our start time. + // Drain the federate ID payload and continue waiting for MSG_TYPE_TIMESTAMP. + handle_downstream_disconnected_message(); + continue; } else if (buffer[0] == MSG_TYPE_DOWNSTREAM_CONNECTED) { // Defer lf_connect_to_federate() until after MSG_TYPE_TIMESTAMP is received. 
// Read the federate ID payload now to drain the socket, but do not attempt the @@ -1782,6 +1804,9 @@ static void* listen_to_rti_TCP(void* args) { case MSG_TYPE_DOWNSTREAM_CONNECTED: handle_downstream_connected_message(); break; + case MSG_TYPE_DOWNSTREAM_DISCONNECTED: + handle_downstream_disconnected_message(); + break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", _lf_my_fed_id); diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index f016c963d..2702a21f0 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -786,6 +786,15 @@ #define MSG_TYPE_DOWNSTREAM_CONNECTED 30 #define MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH (1 + sizeof(uint16_t)) +/** + * A message that informs an upstream federate that a transient federate downstream of it + * has disconnected. The next 2 bytes are the federate ID of the downstream federate. + * Upon receiving this, the upstream federate should close its outbound P2P connection + * to the downstream. + */ +#define MSG_TYPE_DOWNSTREAM_DISCONNECTED 31 +#define MSG_TYPE_DOWNSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) + /** * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, * the federate will call lf_stop(), which will make it resign at its current_tag diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index 262418799..fc8a0e4ea 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -87,6 +87,8 @@ typedef enum { // New entries must be added here, at the end, to avoid shifting existing indices. 
send_DOWNSTREAM_CONNECTED, receive_DOWNSTREAM_CONNECTED, + send_DOWNSTREAM_DISCONNECTED, + receive_DOWNSTREAM_DISCONNECTED, NUM_EVENT_TYPES } trace_event_t; @@ -164,6 +166,8 @@ static const char* trace_event_names[] = { // New entries appended at the end to avoid shifting existing indices. "Sending DOWNSTREAM_CONNECTED", "Receiving DOWNSTREAM_CONNECTED", + "Sending DOWNSTREAM_DISCONNECTED", + "Receiving DOWNSTREAM_DISCONNECTED", }; static inline void _suppress_unused_variable_warning_for_static_variable() { (void)trace_event_names; } diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 45a904135..9e2083471 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -33,6 +33,7 @@ .UPSTREAM_CONNECTED { stroke: #f4a261; fill: #f4a261; } \ .UPSTREAM_DISCONNECTED { stroke: #e76f51; fill: #e76f51; } \ .DOWNSTREAM_CONNECTED { stroke: #2a9d8f; fill: #2a9d8f; } \ + .DOWNSTREAM_DISCONNECTED { stroke: #264653; fill: #264653; } \ .ACK { stroke: #52b788; fill: #52b788; } \ .FAILED { stroke: #c1121f; fill: #c1121f; } \ .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ @@ -72,6 +73,7 @@ "Sending UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", "Sending UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", "Sending DOWNSTREAM_CONNECTED": "DOWNSTREAM_CONNECTED", + "Sending DOWNSTREAM_DISCONNECTED": "DOWNSTREAM_DISCONNECTED", "Sending PTAG": "PTAG", "Sending TAG": "TAG", "Sending REJECT": "REJECT", @@ -98,6 +100,7 @@ "Receiving UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", "Receiving UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", "Receiving DOWNSTREAM_CONNECTED": "DOWNSTREAM_CONNECTED", + "Receiving DOWNSTREAM_DISCONNECTED": "DOWNSTREAM_DISCONNECTED", "Receiving PTAG": "PTAG", "Receiving TAG": "TAG", "Receiving REJECT": "REJECT", @@ -159,7 +162,7 @@ def format_actor_name(name): # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. 
# Set of non-tagged events (messages) -non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', 'DOWNSTREAM_CONNECTED', +non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', 'DOWNSTREAM_CONNECTED', 'DOWNSTREAM_DISCONNECTED', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} From ce32505a8b1f8474248a0a3093a8856de383bb00 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Fri, 3 Apr 2026 00:09:21 +0100 Subject: [PATCH 12/22] Fix 'Attempt to update to earlier tag' in decentralised coordination --- core/federated/federate.c | 18 +++++++++++++++--- include/core/federated/federate.h | 9 +++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 86edb1d84..5b4657a9c 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -311,16 +311,27 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag */ static void mark_inputs_known_absent(int fed_id) { #ifdef FEDERATED_DECENTRALIZED - // Note that when transient federates are supported, this will need to be updated because the - // federate could rejoin. environment_t* env; _lf_get_environments(&env); LF_MUTEX_LOCK(&env->mutex); + // For a persistent federate, use FOREVER_TAG: it will never send again, so + // all its input ports can be permanently marked absent. + // For a transient federate, use env->current_tag instead: the federate may + // rejoin later, and stamping FOREVER_TAG would permanently block its ports + // from being updated after reconnection, causing spurious + // "Attempt to update to earlier tag" warnings and downstream STP violations. 
+ // env->current_tag is the right choice because update_last_known_status_on_input_port + // clamps upward (if tag < current_tag, it uses current_tag anyway), so passing + // current_tag is equivalent to "absent at the current logical time" — sufficient + // to unblock the scheduler, but small enough that any future message from the + // rejoining federate at a tag >= current_tag will update the port normally. + tag_t absent_until = _fed.inbound_p2p_is_transient[fed_id] ? env->current_tag : FOREVER_TAG; + for (size_t i = 0; i < _lf_action_table_size; i++) { lf_action_base_t* action = _lf_action_table[i]; if (action->source_id == fed_id) { - update_last_known_status_on_input_port(env, FOREVER_TAG, i, true); + update_last_known_status_on_input_port(env, absent_until, i, true); } } LF_MUTEX_UNLOCK(&env->mutex); @@ -2321,6 +2332,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Extract the ID of the sending federate. uint16_t remote_fed_id = extract_uint16((unsigned char*)&(buffer[1])); bool remote_fed_is_transient = buffer[1 + sizeof(uint16_t)]; + _fed.inbound_p2p_is_transient[remote_fed_id] = remote_fed_is_transient; if (remote_fed_is_transient) { LF_PRINT_DEBUG("Received sending federate ID %d, which is transient.", remote_fed_id); } else { diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index e3d8c9caf..95f378a60 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -103,6 +103,15 @@ typedef struct federate_instance_t { */ int sockets_for_inbound_p2p_connections[NUMBER_OF_FEDERATES]; + /** + * An array indexed by federate ID indicating whether the corresponding + * inbound peer-to-peer federate is transient. Initialized to false. + * Set in lf_handle_p2p_connections_from_federates() when the handshake + * reveals the remote federate's type. Used by mark_inputs_known_absent() + * to avoid permanently stamping FOREVER_TAG on ports whose source may rejoin. 
+ */ + bool inbound_p2p_is_transient[NUMBER_OF_FEDERATES]; + /** * An array that holds the socket descriptors for outbound direct * connections to each remote federate. The index will be the federate From 94ddec9acb0e454a34bb173fd6a6fc57e34f6fa8 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Fri, 3 Apr 2026 00:30:37 +0100 Subject: [PATCH 13/22] Fix P2P_MSG visualization in fedsd --- util/tracing/visualization/fedsd.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 9e2083471..07bf8a5bd 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -658,6 +658,25 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files, start_time, end (trace_df['logical_time'] == logical_time) & \ (trace_df['microstep'] == microstep) \ ] + elif (event == 'P2P_MSG'): + # P2P messages travel directly between federates without going through the + # RTI, so partner_id in the trace is typically -1 on both sides (the RTI is + # not involved and the tracepoint has no partner). We therefore cannot use + # partner_id for matching. Instead we match each 'out' to the first pending + # 'in' whose physical_time >= the sender's physical_time (causality guarantee: + # the receive cannot precede the send). + physical_time = trace_df.at[index, 'physical_time'] + if (inout == 'out'): + matching_df = trace_df[\ + (trace_df['inout'] == 'in') & \ + (trace_df['arrow'] == 'pending') & \ + (trace_df['event'] == event) & \ + (trace_df['physical_time'] >= physical_time) \ + ] + else: + # 'in' rows are claimed by the corresponding 'out' pass above. + # If we reach an 'in' here it means no 'out' claimed it; render as dot. 
+ matching_df = trace_df[0:0] else : matching_df = trace_df[\ (trace_df['inout'] != inout) & \ From a130de2db4ef39a98128c8d56e5e63abfcbec422 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Fri, 3 Apr 2026 00:39:16 +0100 Subject: [PATCH 14/22] During shutdown, remove the WARNING: Failed to accept the socket. Invalid argument. --- core/federated/network/socket_common.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 4245196b4..666ea8a71 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -192,7 +192,10 @@ int accept_socket(int socket, int rti_socket) { // Got a socket break; } else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK || errno != EINTR)) { - if (errno != ECONNABORTED) { + // ECONNABORTED: a connection was aborted before accept() could complete — not fatal. + // EINVAL: the socket was shut down (e.g., shutdown_socket() was called to unblock this + // accept() intentionally when the RTI is shutting down) — expected, not an error. + if (errno != ECONNABORTED && errno != EINVAL) { lf_print_warning("Failed to accept the socket.
%s.", strerror(errno)); } break; From 2669f91c4eda072a8841f2e68c43cd11b8476a72 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Sat, 4 Apr 2026 00:06:54 +0100 Subject: [PATCH 15/22] Further tune mark_inputs_known_absent and simpler test on p2p transient connections --- core/federated/federate.c | 40 ++++++++++--------------------- include/core/federated/federate.h | 2 +- 2 files changed, 13 insertions(+), 29 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 5b4657a9c..b98c0bfce 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -326,12 +326,16 @@ static void mark_inputs_known_absent(int fed_id) { // current_tag is equivalent to "absent at the current logical time" — sufficient // to unblock the scheduler, but small enough that any future message from the // rejoining federate at a tag >= current_tag will update the port normally. - tag_t absent_until = _fed.inbound_p2p_is_transient[fed_id] ? env->current_tag : FOREVER_TAG; + bool is_transient = _fed.inbound_p2p_is_transient[fed_id]; + tag_t absent_until = is_transient ? env->current_tag : FOREVER_TAG; for (size_t i = 0; i < _lf_action_table_size; i++) { lf_action_base_t* action = _lf_action_table[i]; if (action->source_id == fed_id) { - update_last_known_status_on_input_port(env, absent_until, i, true); + // For transients, pass warn=false: the port's last_known_status_tag may already be + // ahead of current_tag (advanced by a prior message or the STAA thread), so the + // update will be a no-op. That is correct and expected — no warning needed. + update_last_known_status_on_input_port(env, absent_until, i, !is_transient); } } LF_MUTEX_UNLOCK(&env->mutex); @@ -2458,18 +2462,9 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa // If there are outbound transients, check whether the destination is one of them. // If it is and its socket is shut, gracefully skip the send. 
- if (_fed.number_of_outbound_p2p_transients > 0) { - bool is_outbound_transient = false; - for (size_t i = 0; i < _fed.number_of_outbound_p2p_transients; i++) { - if (_fed.outbound_p2p_transient_ids[i] == (uint16_t)federate) { - is_outbound_transient = true; - break; - } - } - if (is_outbound_transient && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { - lf_print("The destination transient federate %d is not connected. Abort sending!", federate); - return 0; - } + if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { + lf_print_info("The destination transient federate %d is not connected. Abort sending!", federate); + return 0; } header_buffer[0] = (unsigned char)message_type; @@ -2764,22 +2759,11 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int lf_print_error("lf_send_message: Unsupported message type (%d).", message_type); return -1; } -#if defined(FEDERATED_DECENTRALIZED) - if (_fed.sockets_for_outbound_p2p_connections[federate] < 0) { + if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { // Only print a warning if the destination is a known outbound transient. - bool is_outbound_transient = false; - for (size_t i = 0; i < _fed.number_of_outbound_p2p_transients; i++) { - if (_fed.outbound_p2p_transient_ids[i] == (uint16_t)federate) { - is_outbound_transient = true; - break; - } - } - if (is_outbound_transient) { - lf_print("The destination transient federate %d is not connected. Abort sending!", federate); - } + lf_print_info("The destination transient federate %d is not connected. Abort sending!", federate); return 0; - } -#endif + } size_t buffer_head = 0; // First byte is the message type. 
diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 95f378a60..aa9390aa1 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -86,7 +86,7 @@ typedef struct federate_instance_t { * peer-to-peer connections. The array has size number_of_outbound_p2p_transients * and is allocated at startup by the generated _lf_executable_preamble(). */ - uint16_t* outbound_p2p_transient_ids; + bool outbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES]; /** * An array that holds the socket descriptors for inbound From 2b8da8fc1a512251519b18a0d6b6710b1b7c92ab Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Sat, 4 Apr 2026 12:07:20 +0100 Subject: [PATCH 16/22] Better names in case of decentralized coordination: outbound instead of downstream and inbound instead of upstream --- core/federated/RTI/rti_remote.c | 36 +++++++-------- core/federated/federate.c | 51 +++++++++++---------- include/core/federated/federate.h | 4 +- include/core/federated/network/net_common.h | 8 ++-- trace/api/types/trace_types.h | 16 +++---- util/tracing/visualization/fedsd.py | 14 +++--- 6 files changed, 65 insertions(+), 64 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 8118a93e5..9437c2e13 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -271,18 +271,18 @@ static void send_upstream_disconnected_locked(federate_info_t* destination, fede } /** - * @brief Send MSG_TYPE_DOWNSTREAM_CONNECTED to the specified upstream federate. + * @brief Send MSG_TYPE_OUTBOUND_CONNECTED to the specified inbound federate. 
* - * This notifies an upstream federate that a transient federate downstream of it has - * (re-)connected, so the upstream should query the RTI for its address and establish + * This notifies every federate that lists my_fed among its outbound transients that my_fed has + * (re-)connected, so that federate should query the RTI for its address and establish * (or re-establish) the outbound P2P connection. * * This function assumes that the mutex lock is already held. * @param my_fed The transient federate that has just connected. */ -static void send_downstream_connected_locked(federate_info_t* my_fed) { - unsigned char buffer[MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH]; - buffer[0] = MSG_TYPE_DOWNSTREAM_CONNECTED; +static void send_outbound_connected_locked(federate_info_t* my_fed) { + unsigned char buffer[MSG_TYPE_OUTBOUND_CONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_OUTBOUND_CONNECTED; encode_uint16(my_fed->enclave.id, &buffer[1]); // Iterate over all federates and notify those that have my_fed as an outbound transient.
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { @@ -292,11 +292,11 @@ static void send_downstream_connected_locked(federate_info_t* my_fed) { } for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { - if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH, buffer)) { - lf_print_warning("RTI: Failed to send downstream connected message to federate %d.", fed->enclave.id); + if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_OUTBOUND_CONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send outbound connected message to federate %d.", fed->enclave.id); } if (rti_remote->base.tracing_enabled) { - tracepoint_rti_to_federate(send_DOWNSTREAM_CONNECTED, fed->enclave.id, NULL); + tracepoint_rti_to_federate(send_OUTBOUND_CONNECTED, fed->enclave.id, NULL); } break; } @@ -305,7 +305,7 @@ static void send_downstream_connected_locked(federate_info_t* my_fed) { } /** - * @brief Send MSG_TYPE_DOWNSTREAM_DISCONNECTED to the upstream federates of a transient federate. + * @brief Send MSG_TYPE_OUTBOUND_DISCONNECTED to the upstream federates of a transient federate. * * This notifies upstream federates that a transient federate downstream of them has * disconnected, so they can close the outbound P2P connection to it. @@ -313,9 +313,9 @@ static void send_downstream_connected_locked(federate_info_t* my_fed) { * This function assumes that the mutex lock is already held. * @param my_fed The transient federate that has just disconnected. 
*/ -static void send_downstream_disconnected_locked(federate_info_t* my_fed) { - unsigned char buffer[MSG_TYPE_DOWNSTREAM_DISCONNECTED_LENGTH]; - buffer[0] = MSG_TYPE_DOWNSTREAM_DISCONNECTED; +static void send_outbound_disconnected_locked(federate_info_t* my_fed) { + unsigned char buffer[MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_OUTBOUND_DISCONNECTED; encode_uint16(my_fed->enclave.id, &buffer[1]); // Iterate over all federates and notify those that have my_fed as an outbound transient. for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { @@ -325,11 +325,11 @@ static void send_downstream_disconnected_locked(federate_info_t* my_fed) { } for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { - if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_DOWNSTREAM_DISCONNECTED_LENGTH, buffer)) { - lf_print_warning("RTI: Failed to send downstream disconnected message to federate %d.", fed->enclave.id); + if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send outbound disconnected message to federate %d.", fed->enclave.id); } if (rti_remote->base.tracing_enabled) { - tracepoint_rti_to_federate(send_DOWNSTREAM_DISCONNECTED, fed->enclave.id, NULL); + tracepoint_rti_to_federate(send_OUTBOUND_DISCONNECTED, fed->enclave.id, NULL); } break; } @@ -356,7 +356,7 @@ static void notify_federate_disconnected(federate_info_t* fed) { } } // Notify upstream federates that have fed in their list of outbound transients. 
- send_downstream_disconnected_locked(fed); + send_outbound_disconnected_locked(fed); LF_MUTEX_UNLOCK(&rti_mutex); } } @@ -1104,7 +1104,7 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ send_upstream_connected_locked(my_fed, fed); } } - send_downstream_connected_locked(my_fed); + send_outbound_connected_locked(my_fed); // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START // message. diff --git a/core/federated/federate.c b/core/federated/federate.c index b98c0bfce..fbdde1b76 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -326,7 +326,7 @@ static void mark_inputs_known_absent(int fed_id) { // current_tag is equivalent to "absent at the current logical time" — sufficient // to unblock the scheduler, but small enough that any future message from the // rejoining federate at a tag >= current_tag will update the port normally. - bool is_transient = _fed.inbound_p2p_is_transient[fed_id]; + bool is_transient = _fed.inbound_p2p_connection_is_transient[fed_id]; tag_t absent_until = is_transient ? env->current_tag : FOREVER_TAG; for (size_t i = 0; i < _lf_action_table_size; i++) { @@ -1017,22 +1017,23 @@ static void handle_upstream_disconnected_message(void) { } /** - * @brief Handle message from the RTI that a transient downstream federate has connected. + * @brief Handle message from the RTI that a transient outbound federate has connected. * - * Reads the downstream federate's ID, then synchronously queries the RTI for its address + * Reads the outbound federate's ID, then synchronously queries the RTI for its address * and establishes (or re-establishes) the outbound P2P connection to it. * This function is called inline from listen_to_rti_TCP or get_start_time_from_rti, * so it reads the address-query reply directly from socket_TCP_RTI. 
*/ -static void handle_downstream_connected_message(void) { +static void handle_outbound_connected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, - "Failed to read downstream connected message from RTI."); + "Failed to read outbound connected message from RTI."); uint16_t remote_federate_id = extract_uint16(buffer); - tracepoint_federate_from_rti(receive_DOWNSTREAM_CONNECTED, _lf_my_fed_id, NULL); - LF_PRINT_DEBUG("Received notification that downstream transient federate %d has connected.", remote_federate_id); + tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); + LF_PRINT_DEBUG("Received notification that outbound transient federate %d has connected.", remote_federate_id); + // lf_connect_to_federate(remote_federate_id, true); } @@ -1041,13 +1042,13 @@ static void handle_downstream_connected_message(void) { * * Reads the downstream federate's ID and closes the outbound P2P socket to it. 
*/ -static void handle_downstream_disconnected_message(void) { +static void handle_outbound_disconnected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, - "Failed to read downstream disconnected message from RTI."); + "Failed to read outbound disconnected message from RTI."); uint16_t remote_federate_id = extract_uint16(buffer); - tracepoint_federate_from_rti(receive_DOWNSTREAM_DISCONNECTED, _lf_my_fed_id, NULL); + tracepoint_federate_from_rti(receive_OUTBOUND_DISCONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that downstream transient federate %d has disconnected.", remote_federate_id); shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[remote_federate_id], false); @@ -1071,9 +1072,9 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { size_t buffer_length = (_fed.is_transient) ? MSG_TYPE_TIMESTAMP_TAG_LENGTH : MSG_TYPE_TIMESTAMP_LENGTH; unsigned char buffer[buffer_length]; - // Deferred DOWNSTREAM_CONNECTED notifications: calling lf_connect_to_federate() inline + // Deferred OUTBOUND_CONNECTED notifications: calling lf_connect_to_federate() inline // here is unsafe because the RTI may have already written MSG_TYPE_TIMESTAMP into this - // federate's TCP stream immediately after MSG_TYPE_DOWNSTREAM_CONNECTED (from a concurrent + // federate's TCP stream immediately after MSG_TYPE_OUTBOUND_CONNECTED (from a concurrent // send_start_tag_locked call for the transient federate). If we call lf_connect_to_federate() // now it will read from the socket expecting MSG_TYPE_ADDRESS_QUERY_REPLY but will instead // consume the queued MSG_TYPE_TIMESTAMP bytes, causing a fatal "Unexpected reply of type 2". 
@@ -1099,22 +1100,22 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive handle_upstream_disconnected_message(); continue; - } else if (buffer[0] == MSG_TYPE_DOWNSTREAM_DISCONNECTED) { - // A transient downstream disconnected before we even got our start time. + } else if (buffer[0] == MSG_TYPE_OUTBOUND_DISCONNECTED) { + // A transient outbound federate disconnected before we even got our start time. // Drain the federate ID payload and continue waiting for MSG_TYPE_TIMESTAMP. - handle_downstream_disconnected_message(); + handle_outbound_disconnected_message(); continue; - } else if (buffer[0] == MSG_TYPE_DOWNSTREAM_CONNECTED) { + } else if (buffer[0] == MSG_TYPE_OUTBOUND_CONNECTED) { // Defer lf_connect_to_federate() until after MSG_TYPE_TIMESTAMP is received. // Read the federate ID payload now to drain the socket, but do not attempt the // address query yet: the RTI may have written MSG_TYPE_TIMESTAMP into this socket - // right after MSG_TYPE_DOWNSTREAM_CONNECTED (from send_start_tag_locked running + // right after MSG_TYPE_OUTBOUND_CONNECTED (from send_start_tag_locked running // concurrently for the joining transient), so any read inside lf_connect_to_federate // would consume those bytes and crash with "Unexpected reply of type 2". 
unsigned char id_buf[sizeof(uint16_t)]; read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(uint16_t), id_buf, NULL, - "Failed to read downstream connected federate ID."); - tracepoint_federate_from_rti(receive_DOWNSTREAM_CONNECTED, _lf_my_fed_id, NULL); + "Failed to read outbound connected federate ID."); + tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); uint16_t remote_federate_id = extract_uint16(id_buf); LF_PRINT_DEBUG("Deferring P2P connection to downstream transient federate %d until after start time is received.", remote_federate_id); @@ -1153,7 +1154,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // Now that MSG_TYPE_TIMESTAMP has been received and the start time is known, it is safe // to establish outbound P2P connections to any transient downstream federates that sent - // DOWNSTREAM_CONNECTED notifications while we were waiting. The ADDRESS_QUERY round-trip + // OUTBOUND_CONNECTED notifications while we were waiting. The ADDRESS_QUERY round-trip // can proceed without risk of consuming queued TIMESTAMP bytes. for (size_t i = 0; i < num_pending_downstream; i++) { LF_PRINT_DEBUG("Establishing deferred P2P connection to downstream transient federate %d.", @@ -1816,11 +1817,11 @@ static void* listen_to_rti_TCP(void* args) { case MSG_TYPE_UPSTREAM_DISCONNECTED: handle_upstream_disconnected_message(); break; - case MSG_TYPE_DOWNSTREAM_CONNECTED: - handle_downstream_connected_message(); + case MSG_TYPE_OUTBOUND_CONNECTED: + handle_outbound_connected_message(); break; - case MSG_TYPE_DOWNSTREAM_DISCONNECTED: - handle_downstream_disconnected_message(); + case MSG_TYPE_OUTBOUND_DISCONNECTED: + handle_outbound_disconnected_message(); break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: @@ -2336,7 +2337,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Extract the ID of the sending federate. 
uint16_t remote_fed_id = extract_uint16((unsigned char*)&(buffer[1])); bool remote_fed_is_transient = buffer[1 + sizeof(uint16_t)]; - _fed.inbound_p2p_is_transient[remote_fed_id] = remote_fed_is_transient; + _fed.inbound_p2p_connection_is_transient[remote_fed_id] = remote_fed_is_transient; if (remote_fed_is_transient) { LF_PRINT_DEBUG("Received sending federate ID %d, which is transient.", remote_fed_id); } else { diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index aa9390aa1..5596c29f8 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -110,7 +110,7 @@ typedef struct federate_instance_t { * reveals the remote federate's type. Used by mark_inputs_known_absent() * to avoid permanently stamping FOREVER_TAG on ports whose source may rejoin. */ - bool inbound_p2p_is_transient[NUMBER_OF_FEDERATES]; + bool inbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES]; /** * An array that holds the socket descriptors for outbound direct @@ -295,7 +295,7 @@ extern lf_cond_t lf_port_status_changed; // Public functions (in alphabetical order) /** - * @brief Connect to the federate with the specified id. + * @brief Connect to the federate with the specified id, based if it is transient or not. * @ingroup Federated * * The established connection will then be used in functions such as lf_send_tagged_message() diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 2702a21f0..73360dc2c 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -783,8 +783,8 @@ * Upon receiving this, the upstream federate should query the RTI for the downstream's * address and establish (or re-establish) the outbound P2P connection. 
*/ -#define MSG_TYPE_DOWNSTREAM_CONNECTED 30 -#define MSG_TYPE_DOWNSTREAM_CONNECTED_LENGTH (1 + sizeof(uint16_t)) +#define MSG_TYPE_OUTBOUND_CONNECTED 30 +#define MSG_TYPE_OUTBOUND_CONNECTED_LENGTH (1 + sizeof(uint16_t)) /** * A message that informs an upstream federate that a transient federate downstream of it @@ -792,8 +792,8 @@ * Upon receiving this, the upstream federate should close its outbound P2P connection * to the downstream. */ -#define MSG_TYPE_DOWNSTREAM_DISCONNECTED 31 -#define MSG_TYPE_DOWNSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) +#define MSG_TYPE_OUTBOUND_DISCONNECTED 31 +#define MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) /** * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index fc8a0e4ea..c9e70f789 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -85,10 +85,10 @@ typedef enum { send_STOP, receive_STOP, // New entries must be added here, at the end, to avoid shifting existing indices. - send_DOWNSTREAM_CONNECTED, - receive_DOWNSTREAM_CONNECTED, - send_DOWNSTREAM_DISCONNECTED, - receive_DOWNSTREAM_DISCONNECTED, + send_OUTBOUND_CONNECTED, + receive_OUTBOUND_CONNECTED, + send_OUTBOUND_DISCONNECTED, + receive_OUTBOUND_DISCONNECTED, NUM_EVENT_TYPES } trace_event_t; @@ -164,10 +164,10 @@ static const char* trace_event_names[] = { "Sending STOP", "Receiving STOP", // New entries appended at the end to avoid shifting existing indices. 
- "Sending DOWNSTREAM_CONNECTED", - "Receiving DOWNSTREAM_CONNECTED", - "Sending DOWNSTREAM_DISCONNECTED", - "Receiving DOWNSTREAM_DISCONNECTED", + "Sending OUTBOUND_CONNECTED", + "Receiving OUTBOUND_CONNECTED", + "Sending OUTBOUND_DISCONNECTED", + "Receiving OUTBOUND_DISCONNECTED", }; static inline void _suppress_unused_variable_warning_for_static_variable() { (void)trace_event_names; } diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 07bf8a5bd..3798950fa 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -32,8 +32,8 @@ .FED_ID { stroke: #80DD99; fill: #80DD99; } \ .UPSTREAM_CONNECTED { stroke: #f4a261; fill: #f4a261; } \ .UPSTREAM_DISCONNECTED { stroke: #e76f51; fill: #e76f51; } \ - .DOWNSTREAM_CONNECTED { stroke: #2a9d8f; fill: #2a9d8f; } \ - .DOWNSTREAM_DISCONNECTED { stroke: #264653; fill: #264653; } \ + .OUTBOUND_CONNECTED { stroke: #2a9d8f; fill: #2a9d8f; } \ + .OUTBOUND_DISCONNECTED { stroke: #264653; fill: #264653; } \ .ACK { stroke: #52b788; fill: #52b788; } \ .FAILED { stroke: #c1121f; fill: #c1121f; } \ .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ @@ -72,8 +72,8 @@ "Sending FED_ID": "FED_ID", "Sending UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", "Sending UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", - "Sending DOWNSTREAM_CONNECTED": "DOWNSTREAM_CONNECTED", - "Sending DOWNSTREAM_DISCONNECTED": "DOWNSTREAM_DISCONNECTED", + "Sending OUTBOUND_CONNECTED": "OUTBOUND_CONNECTED", + "Sending OUTBOUND_DISCONNECTED": "OUTBOUND_DISCONNECTED", "Sending PTAG": "PTAG", "Sending TAG": "TAG", "Sending REJECT": "REJECT", @@ -99,8 +99,8 @@ "Receiving FED_ID": "FED_ID", "Receiving UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", "Receiving UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", - "Receiving DOWNSTREAM_CONNECTED": "DOWNSTREAM_CONNECTED", - "Receiving DOWNSTREAM_DISCONNECTED": "DOWNSTREAM_DISCONNECTED", + "Receiving OUTBOUND_CONNECTED": "OUTBOUND_CONNECTED", + "Receiving 
OUTBOUND_DISCONNECTED": "OUTBOUND_DISCONNECTED", "Receiving PTAG": "PTAG", "Receiving TAG": "TAG", "Receiving REJECT": "REJECT", @@ -162,7 +162,7 @@ def format_actor_name(name): # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. # Set of non-tagged events (messages) -non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', 'DOWNSTREAM_CONNECTED', 'DOWNSTREAM_DISCONNECTED', +non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', 'OUTBOUND_CONNECTED', 'OUTBOUND_DISCONNECTED', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} From d372dd1e7ea7cc4f61294db781206002b2c6c60f Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Sat, 4 Apr 2026 11:21:44 +0100 Subject: [PATCH 17/22] Fix lingua-franca-ref.txt to point to transient-fed-dec --- lingua-franca-ref.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 52199a147..17e20d967 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -transient-fed +transient-fed-dec From 2900198c99aed6d047697be9e4812e7ef20c2412 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Sat, 4 Apr 2026 11:38:11 +0100 Subject: [PATCH 18/22] Apply clang-format --- core/federated/RTI/rti_remote.c | 63 ++++++------ core/federated/federate.c | 100 +++++++++++--------- include/core/federated/federate.h | 24 ++--- include/core/federated/network/net_common.h | 9 +- 4 files changed, 107 insertions(+), 89 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 9437c2e13..459df9f32 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -25,9 +25,9 @@ */ #include "rti_remote.h" +#include "clock.h" // For lf_clock_cond_timedwait() #include "net_util.h" #include -#include "clock.h" // For 
lf_clock_cond_timedwait() // Global variables defined in tag.c: extern instant_t start_time; @@ -227,8 +227,8 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) { } /** - * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the RTI, - * telling it that the specified `upstream` federate is also now connected. + * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the + * RTI, telling it that the specified `upstream` federate is also now connected. * * This function assumes that the mutex lock is already held. * @param destination The destination federate. @@ -651,9 +651,9 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff // issue a TAG before this message has been forwarded. LF_MUTEX_LOCK(&rti_mutex); - // If the destination federate is no longer connected, or it is a transient that has not started executing yet - // (the delayed intended tag is less than the effective start tag of the destination), issue a warning, remove the - // message from the socket, and return. + // If the destination federate is no longer connected, or it is a transient that has not started + // executing yet (the delayed intended tag is less than the effective start tag of the + // destination), issue a warning, remove the message from the socket, and return. federate_info_t* fed = GET_FED_INFO(federate_id); interval_t delay = NEVER; for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) { @@ -771,7 +771,8 @@ void handle_next_event_tag(federate_info_t* fed) { // Acquire a mutex lock to ensure that this state does not change while a // message is in transport or being used to determine a TAG. LF_MUTEX_LOCK(&rti_mutex); // FIXME: Instead of using a mutex, it might be more efficient to use a - // select() mechanism to read and process federates' buffers in an orderly fashion. 
+ // select() mechanism to read and process federates' buffers in an + // orderly fashion. tag_t intended_tag = extract_tag(buffer); if (rti_remote->base.tracing_enabled) { @@ -1004,7 +1005,7 @@ void handle_address_query(uint16_t fed_id) { // outbound_transients array (if not already present). if (remote_is_transient) { LF_MUTEX_LOCK(&rti_mutex); - bool already_registered = false; + bool already_registered = false; int32_t i = 0; for (; i < fed->number_of_outbound_transients; i++) { if (fed->outbound_transients[i] == (int32_t)remote_fed_id) { @@ -1076,17 +1077,18 @@ void handle_address_ad(uint16_t federate_id) { } /** - * @brief Send the global federation start time and the federate-specific starting tag to the specified federate. + * @brief Send the global federation start time and the federate-specific starting tag to the + * specified federate. * - * For persistent federates and transient federates that happen to join during federation startup, the - * `federation_start_time` will match the time in the `federate_start_tag`, and the microstep will be 0. - * For a transient federate that joins later, the time in the `federate_start_tag` will be greater than the - * federation_start_time`. + * For persistent federates and transient federates that happen to join during federation startup, + * the `federation_start_time` will match the time in the `federate_start_tag`, and the microstep + * will be 0. For a transient federate that joins later, the time in the `federate_start_tag` will + * be greater than the `federation_start_time`. * * - * Before sending the start time and tag, this function notifies my_fed of all upstream transient federates that are - * connected. After sending the start time and tag, and if my_fed is transient, notify federates downstream of its - * connection, ensuring proper handling of zero-delay cycles. + * Before sending the start time and tag, this function notifies my_fed of all upstream transient + * federates that are connected.
After sending the start time and tag, and if my_fed is transient, + * notify federates downstream of its connection, ensuring proper handling of zero-delay cycles. * * This function assumes that the mutex lock is already held. * @@ -1096,8 +1098,8 @@ void handle_address_ad(uint16_t federate_id) { */ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { // Notify my_fed of any upstream transient federates that are connected. - // This has to occur before sending the start tag so that my_fed does not begin executing thinking that these - // upstream federates are not connected. + // This has to occur before sending the start tag so that my_fed does not begin executing thinking + // that these upstream federates are not connected. for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) { federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]); if (fed->is_transient && fed->enclave.state == GRANTED) { @@ -1105,7 +1107,7 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ } } send_outbound_connected_locked(my_fed); - + // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START // message. // If it is a persistent federate, only the start_time is sent. If, however, it is a transient @@ -1266,7 +1268,8 @@ void handle_timestamp(federate_info_t* my_fed) { // effective_start_time of the federate, cancel it. // FIXME: Should this be higher-than or equal to? // FIXME: Also, won't the grant simply be lost? - // If the joining federate doesn't send anything, the downstream federate won't issue another NET. + // If the joining federate doesn't send anything, the downstream federate won't issue another + // NET. 
for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) { federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]); @@ -1467,7 +1470,8 @@ static void handle_federate_failed(federate_info_t* my_fed) { // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. + bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, + sizeof(bool)); // Initializes to 0. notify_downstream_advance_grant_if_safe(&(my_fed->enclave), visited); free(visited); @@ -1508,7 +1512,8 @@ static void handle_federate_resign(federate_info_t* my_fed) { // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. + bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, + sizeof(bool)); // Initializes to 0. notify_downstream_advance_grant_if_safe(&(my_fed->enclave), visited); free(visited); @@ -1775,7 +1780,8 @@ static int32_t receive_and_check_fed_id_message(int* socket_id) { // The MSG_TYPE_FED_IDS message has the right federation ID. - // Get the peer address from the connected socket_id. Then assign it as the federate's socket server. + // Get the peer address from the connected socket_id. Then assign it as the federate's socket + // server. 
struct sockaddr_in peer_addr; socklen_t addr_len = sizeof(peer_addr); if (getpeername(*socket_id, (struct sockaddr*)&peer_addr, &addr_len) != 0) { @@ -2191,10 +2197,11 @@ void send_stop(federate_info_t* fed) { } void* lf_connect_to_transient_federates_thread(void* nothing) { - // This loop will continue to accept connections of transient federates, as soon as there is room, or enable hot swap + // This loop will continue to accept connections of transient federates, as soon as there is room, + // or enable hot swap while (!rti_remote->all_persistent_federates_exited) { - // Continue waiting for an incoming connection requests from transients to join, or for hot swap. - // Wait for an incoming connection request. + // Continue waiting for incoming connection requests from transients to join, or for hot + // swap. Wait for an incoming connection request. int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1); // If accept failed (e.g., socket was shut down), exit the loop. @@ -2410,8 +2417,8 @@ void reset_transient_federate(federate_info_t* fed) { for (int32_t i = 0; i < num_transients; i++) { fed->outbound_transients[i] = -1; } - // Whenver a transient resigns or leaves, invalidate all federates, so that all min_delays_upstream - // get re-computed. + // Whenever a transient resigns or leaves, invalidate all federates, so that all + // min_delays_upstream get re-computed. // FIXME: Maybe optimize it to only invalidate those affected by the transient invalidate_min_delays(); } diff --git a/core/federated/federate.c b/core/federated/federate.c index fbdde1b76..2c710aac8 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -16,15 +16,16 @@ #include // inet_ntop & inet_pton #include // Defines getaddrinfo(), freeaddrinfo() and struct addrinfo.
#include // Defines struct sockaddr_in +#include // Defines strerror() +#include // Defines memset(), strnlen(), strncmp(), strncpy() #include #include // Defines read(), write(), and close() -#include // Defines memset(), strnlen(), strncmp(), strncpy() -#include // Defines strerror() #include #include // Defined perror(), errno #include // Defines bzero(). +#include "api/schedule.h" #include "clock-sync.h" #include "federate.h" #include "net_common.h" @@ -32,13 +33,12 @@ #include "reactor.h" #include "reactor_common.h" #include "reactor_threaded.h" -#include "api/schedule.h" #include "scheduler.h" #include "tracepoint.h" #ifdef FEDERATED_AUTHENTICATED -#include // For secure random number generation. #include // For HMAC-based authentication of federates. +#include // For secure random number generation. #endif // Global variables defined in tag.c: @@ -267,7 +267,8 @@ static void update_last_known_status_on_input_ports(tag_t tag, environment_t* en * @param env The top-level environment, whose mutex is assumed to be held. * @param tag The tag on which the latest status of the specified network input port is known. * @param port_id The port ID. - * @param warn If true, print a warning if the tag is less than the last known status tag of the port. + * @param warn If true, print a warning if the tag is less than the last known status tag of the + * port. */ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag, int port_id, bool warn) { if (lf_tag_compare(tag, env->current_tag) < 0) @@ -303,7 +304,8 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag } /** - * @brief Mark all the input ports connected to the given federate as known to be absent until FOREVER. + * @brief Mark all the input ports connected to the given federate as known to be absent until + * FOREVER. * * This does nothing if the federate is not using decentralized coordination. * This function acquires the mutex on the top-level environment. 
@@ -365,8 +367,8 @@ static void update_last_known_status_on_action(environment_t* env, lf_action_bas tag = env->current_tag; trigger_t* input_port_trigger = action->trigger; if (lf_tag_compare(tag, input_port_trigger->last_known_status_tag) > 0) { - LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " PRINTF_TAG - " to " PRINTF_TAG ".", + LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient " + "federate from " PRINTF_TAG " to " PRINTF_TAG ".", input_port_trigger->last_known_status_tag.time - lf_time_start(), input_port_trigger->last_known_status_tag.microstep, tag.time - lf_time_start(), tag.microstep); input_port_trigger->last_known_status_tag = tag; @@ -462,7 +464,8 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen * cycles, where processing at a tag must be able to begin before all messages have arrived * at that tag. This returns true if the following conditions are all true: * - * 1. the first reaction triggered has a level >= MLAA (a port is or will be blocked on this trigger); + * 1. the first reaction triggered has a level >= MLAA (a port is or will be blocked on this + * trigger); * 2. the intended_tag is equal to the current tag of the environment; * 3. the intended_tag is greater than the last_tag of the trigger; * 4. the intended_tag is greater than the last_known_status_tag of the trigger; @@ -647,7 +650,8 @@ static int handle_tagged_message(int* socket, int fed_id) { element_count = length / element_size; if (length % element_size != 0) { // Log a warning if the payload size is not an exact multiple of element_size. 
- lf_print_warning("Received message for port %d with payload length %zu bytes not a multiple of element_size %zu; " + lf_print_warning("Received message for port %d with payload length %zu bytes not a multiple " + "of element_size %zu; " "truncating to %zu elements.", port_id, (size_t)length, element_size, element_count); } @@ -683,8 +687,8 @@ static int handle_tagged_message(int* socket, int fed_id) { tag_t actual_tag = intended_tag; #ifdef FEDERATED_DECENTRALIZED - // For tardy messages in decentralized coordination, we need to figure out what the actual tag will be. - // (Centralized coordination errors out with tardy messages). + // For tardy messages in decentralized coordination, we need to figure out what the actual tag + // will be. (Centralized coordination errors out with tardy messages). if (lf_tag_compare(intended_tag, env->current_tag) <= 0) { // Message is tardy. actual_tag = env->current_tag; @@ -1033,8 +1037,8 @@ static void handle_outbound_connected_message(void) { tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that outbound transient federate %d has connected.", remote_federate_id); - // - lf_connect_to_federate(remote_federate_id, true); + // + lf_connect_to_federate(remote_federate_id, true); } /** @@ -1080,9 +1084,8 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // consume the queued MSG_TYPE_TIMESTAMP bytes, causing a fatal "Unexpected reply of type 2". // Fix: read and save each downstream federate ID, then call lf_connect_to_federate() for // each one only after MSG_TYPE_TIMESTAMP has been received and the loop has exited. - uint16_t pending_downstream_ids[_fed.number_of_outbound_p2p_transients > 0 - ? _fed.number_of_outbound_p2p_transients - : 1]; + uint16_t + pending_downstream_ids[_fed.number_of_outbound_p2p_transients > 0 ? 
_fed.number_of_outbound_p2p_transients : 1]; size_t num_pending_downstream = 0; while (true) { @@ -1117,7 +1120,8 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { "Failed to read outbound connected federate ID."); tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); uint16_t remote_federate_id = extract_uint16(id_buf); - LF_PRINT_DEBUG("Deferring P2P connection to downstream transient federate %d until after start time is received.", + LF_PRINT_DEBUG("Deferring P2P connection to downstream transient federate %d until after " + "start time is received.", remote_federate_id); if (num_pending_downstream < _fed.number_of_outbound_p2p_transients) { pending_downstream_ids[num_pending_downstream++] = remote_federate_id; @@ -1223,7 +1227,8 @@ static void handle_tag_advance_grant(void) { #ifdef FEDERATED_DECENTRALIZED /** - * @brief Return true if there is an input port among those with a given STAA whose status is unknown. + * @brief Return true if there is an input port among those with a given STAA whose status is + * unknown. * * @param staa_elem A record of all input port actions. */ @@ -1294,11 +1299,11 @@ static void* update_ports_from_staa_offsets(void* args) { tag_t tag_when_started_waiting = lf_tag(env); for (size_t i = 0; i < staa_lst_size; ++i) { staa_t* staa_elem = staa_lst[i]; - // The staa_elem is adjusted in the code generator to have subtracted the delay on the connection. - // The list is sorted in increasing order of adjusted STAA offsets. - // We need to add the lf_fed_STA_offset to the wait time and guard against overflow. - // Skip this if the current tag is the dynamically determined stop time - // (due to a call to lf_request_stop()). This is indicated by a stop_tag with microstep greater than 0. + // The staa_elem is adjusted in the code generator to have subtracted the delay on the + // connection. The list is sorted in increasing order of adjusted STAA offsets. 
We need to add + // the lf_fed_STA_offset to the wait time and guard against overflow. Skip this if the current + // tag is the dynamically determined stop time (due to a call to lf_request_stop()). This is + // indicated by a stop_tag with microstep greater than 0. interval_t wait_time = 0; if (lf_tag_compare(env->current_tag, env->stop_tag) != 0 || env->stop_tag.microstep == 0) { wait_time = lf_time_add(staa_elem->STAA, lf_fed_STA_offset); @@ -1330,10 +1335,10 @@ static void* update_ports_from_staa_offsets(void* args) { } /* Possibly useful for debugging: tag_t current_tag = lf_tag(env); - LF_PRINT_DEBUG("**** (update thread) Assuming absent! " PRINTF_TAG, current_tag.time - lf_time_start(), - current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " PRINTF_TIME, current_tag.time - - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, - wait_until_time - lf_time_start()); + LF_PRINT_DEBUG("**** (update thread) Assuming absent! " PRINTF_TAG, current_tag.time - + lf_time_start(), current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " + PRINTF_TIME, current_tag.time - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) + Wait until time is " PRINTF_TIME, wait_until_time - lf_time_start()); */ // Mark input ports absent. @@ -1547,7 +1552,8 @@ static void handle_stop_granted_message() { // Sanity check. if (lf_tag_compare(received_stop_tag, env[i].current_tag) <= 0) { - lf_print_error("RTI granted a MSG_TYPE_STOP_GRANTED tag that is equal to or less than this federate's current " + lf_print_error("RTI granted a MSG_TYPE_STOP_GRANTED tag that is equal to or less than this " + "federate's current " "tag " PRINTF_TAG ". 
" "Stopping at the next microstep instead.", env[i].current_tag.time - start_time, env[i].current_tag.microstep); @@ -1822,7 +1828,7 @@ static void* listen_to_rti_TCP(void* args) { break; case MSG_TYPE_OUTBOUND_DISCONNECTED: handle_outbound_disconnected_message(); - break; + break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", _lf_my_fed_id); @@ -1970,7 +1976,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { struct in_addr host_ip_addr; instant_t start_connect = lf_time_physical(); // If the remote federate if oersistent, iterate until we get a valid port number from the RTI, - // If not, execute only once, as a request registration. + // If not, execute only once, as a request registration. while (port == -1 && !_lf_termination_executed) { buffer[0] = MSG_TYPE_ADDRESS_QUERY; // NOTE: Sending messages in little endian. @@ -2018,8 +2024,8 @@ void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { // Wait ADDRESS_QUERY_RETRY_INTERVAL nanoseconds. lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); } else if (port == -1 && is_transient) { - // For transient federates, we only execute once, as a request registration. If the RTI does not reply - // with a valid port number, we treat it normally and return. + // For transient federates, we only execute once, as a request registration. If the RTI does + // not reply with a valid port number, we treat it normally and return. return; } } @@ -2282,9 +2288,8 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Allocate memory to store thread IDs. 
_fed.inbound_socket_listeners = (lf_thread_t*)calloc(_fed.number_of_inbound_p2p_connections, sizeof(lf_thread_t)); while (!_lf_termination_executed) { - // Case where all inbound connections are to persistent federates - if (received_federates == _fed.number_of_inbound_p2p_connections && - _fed.number_of_inbound_p2p_transients == 0) { + // Case where all inbound connections are to persistent federates + if (received_federates == _fed.number_of_inbound_p2p_connections && _fed.number_of_inbound_p2p_transients == 0) { break; } // Wait for an incoming connection request. @@ -2391,7 +2396,8 @@ void lf_latest_tag_confirmed(tag_t tag_to_send) { } _lf_get_environments(&env); if (!env->need_to_send_LTC) { - LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged message with the " + LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged " + "message with the " "tag " PRINTF_TAG " that this federate has received.", tag_to_send.time - start_time, tag_to_send.microstep); return; @@ -2460,7 +2466,7 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa lf_print_error("lf_send_message: Unsupported message type (%d).", message_type); return -1; } - + // If there are outbound transients, check whether the destination is one of them. // If it is and its socket is shut, gracefully skip the send. if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { @@ -2533,7 +2539,8 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) LF_PRINT_DEBUG("Granted tag " PRINTF_TAG " because TAG or PTAG has been received.", _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); - // In case a downstream federate needs the NET of this tag or has not received any DNET, send NET. + // In case a downstream federate needs the NET of this tag or has not received any DNET, send + // NET. 
if (!_fed.received_any_DNET || (lf_tag_compare(_fed.last_DNET, tag) < 0 && lf_tag_compare(_fed.last_DNET, _fed.last_sent_NET) >= 0)) { send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag); @@ -2636,9 +2643,10 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) return tag; } - // This federate should repeatedly advance its tag to ensure downstream federates can make progress. - // Before advancing to the next tag, we need to wait some time so that we don't overwhelm the network and the - // RTI. That amount of time will be no greater than ADVANCE_MESSAGE_INTERVAL in the future. + // This federate should repeatedly advance its tag to ensure downstream federates can make + // progress. Before advancing to the next tag, we need to wait some time so that we don't + // overwhelm the network and the RTI. That amount of time will be no greater than + // ADVANCE_MESSAGE_INTERVAL in the future. LF_PRINT_DEBUG("Waiting for physical time to elapse or an event on the event queue."); instant_t wait_until_time_ns = lf_time_physical() + ADVANCE_MESSAGE_INTERVAL; @@ -2764,7 +2772,7 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int // Only print a warning if the destination is a known outbound transient. lf_print_info("The destination transient federate %d is not connected. Abort sending!", federate); return 0; - } + } size_t buffer_head = 0; // First byte is the message type. @@ -2982,9 +2990,9 @@ instant_t lf_wait_until_time(tag_t tag) { // Do not add the STA if the tag is the starting tag. if (tag.time != start_time || tag.microstep != 0u) { - // Apply the STA to the logical time, but only if at least one network input port is not known up to this tag. - // Subtract one microstep because it is sufficient to commit to a tag if the input ports are known - // up to one microstep earlier. + // Apply the STA to the logical time, but only if at least one network input port is not known + // up to this tag. 
Subtract one microstep because it is sufficient to commit to a tag if the + // input ports are known up to one microstep earlier. if (tag.microstep > 0) { tag.microstep--; } else { diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 5596c29f8..59981628b 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -8,11 +8,11 @@ * @author Edward A. Lee * @author Anirudh Rengarajsm * - * This file defines the core data structures and functions used in federated Lingua Franca programs. - * It includes the federate instance structure that tracks the state of a federate, including its - * connections to the RTI and other federates, message handling, and coordination mechanisms. - * The file also provides functions for managing these connections, sending and receiving messages, - * and handling various aspects of federated execution. + * This file defines the core data structures and functions used in federated Lingua Franca + * programs. It includes the federate instance structure that tracks the state of a federate, + * including its connections to the RTI and other federates, message handling, and coordination + * mechanisms. The file also provides functions for managing these connections, sending and + * receiving messages, and handling various aspects of federated execution. */ #ifndef FEDERATE_H @@ -20,11 +20,11 @@ #include -#include "tag.h" -#include "lf_types.h" #include "environment.h" +#include "lf_types.h" #include "low_level_platform.h" #include "socket_common.h" +#include "tag.h" #ifndef ADVANCE_MESSAGE_INTERVAL #define ADVANCE_MESSAGE_INTERVAL MSEC(10) @@ -531,9 +531,10 @@ int lf_send_stop_request_to_rti(tag_t stop_tag); * @brief Send a tagged message to the specified port of the specified federate. * @ingroup Federated * - * The tag will be the current tag of the specified environment delayed by the specified additional_delay. 
- * If the delayed tag falls after the timeout time, then the message is not sent and -1 is returned. - * The caller can reuse or free the memory storing the message after this returns. + * The tag will be the current tag of the specified environment delayed by the specified + * additional_delay. If the delayed tag falls after the timeout time, then the message is not sent + * and -1 is returned. The caller can reuse or free the memory storing the message after this + * returns. * * If the message fails to send (e.g. the socket connection is broken), then the * response depends on the message_type. For MSG_TYPE_TAGGED_MESSAGE, the message is @@ -600,7 +601,8 @@ void lf_spawn_staa_thread(void); void lf_stall_advance_level_federation(environment_t* env, size_t level); /** - * @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock. + * @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex + * lock. * @ingroup Federated * * @param level The level to which we would like to advance. diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 73360dc2c..4bddbcf2a 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -174,9 +174,9 @@ * request). When the RTI has gathered all the stop tags * from federates (that are still connected), it will decide on a common stop tag * which is the maximum of the seen stop tag and answer with a MSG_TYPE_STOP_GRANTED. The federate - * sending the MSG_TYPE_STOP_REQUEST and federates sending the MSG_TYPE_STOP_REQUEST_REPLY will freeze - * the advancement of tag until they receive the MSG_TYPE_STOP_GRANTED message, in which - * case they might continue their execution until the stop tag has been reached. 
+ * sending the MSG_TYPE_STOP_REQUEST and federates sending the MSG_TYPE_STOP_REQUEST_REPLY will + * freeze the advancement of tag until they receive the MSG_TYPE_STOP_GRANTED message, in which case + * they might continue their execution until the stop tag has been reached. * */ @@ -193,7 +193,8 @@ #define FED_COM_BUFFER_SIZE 256u /** - * @brief Time that a federate waits before asking the RTI again for the port and IP address of a federate. + * @brief Time that a federate waits before asking the RTI again for the port and IP address of a + * federate. * @ingroup Federated * * The federate repeatedly sends an MSG_TYPE_ADDRESS_QUERY message after the RTI responds that it From 187fd21599c3001357055bb17f1eb84e5cd6e4cd Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Sat, 4 Apr 2026 15:24:56 +0100 Subject: [PATCH 19/22] Fix 2 bugs due to network behavior that is different between linux and macos --- core/federated/federate.c | 18 +++++++++++++++--- core/federated/network/socket_common.c | 6 +++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 2c710aac8..57f12bb06 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2078,9 +2078,21 @@ void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { write_to_socket_fail_on_error(&socket_id, federation_id_length, (unsigned char*)federation_metadata.federation_id, NULL, "Failed to send federation id to federate %d.", remote_federate_id); - read_from_socket_fail_on_error(&socket_id, 1, (unsigned char*)buffer, - "Failed to read MSG_TYPE_ACK from federate %d in response to sending fed_id.", - remote_federate_id); + // For transient outbound connections, a connection reset from the remote side + // (e.g. macOS resets the TCP connection if the accept loop hasn't run yet) is + // a soft error: the RTI will resend MSG_TYPE_OUTBOUND_CONNECTED when the transient + // is ready. 
Using the non-fatal read here prevents a spurious fatal exit on macOS. + int ack_read_failed = read_from_socket_close_on_error(&socket_id, 1, (unsigned char*)buffer); + if (ack_read_failed) { + if (is_transient) { + lf_print_warning("Failed to read MSG_TYPE_ACK from transient federate %d. Connection may have been reset. " + "Will retry when RTI notifies reconnection.", + remote_federate_id); + return; + } + lf_print_error_and_exit("Failed to read MSG_TYPE_ACK from federate %d in response to sending fed_id.", + remote_federate_id); + } if (buffer[0] != MSG_TYPE_ACK) { // Get the error code. read_from_socket_fail_on_error(&socket_id, 1, (unsigned char*)buffer, diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 666ea8a71..b73801d03 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -325,8 +325,12 @@ void read_from_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char // Read failed. if (format != NULL) { va_start(args, format); - lf_print_error_system_failure(format, args); + // Use lf_vprint_error (va_list variant) rather than lf_print_error_system_failure + // (variadic variant). Passing a va_list to a '...' function is undefined behaviour + // and manifests as garbage argument values on macOS due to ABI differences. + lf_vprint_error(format, args); va_end(args); + lf_print_error_and_exit("Error %d: %s", errno, strerror(errno)); } else { lf_print_error_system_failure("Failed to read from socket."); } From be80a597b24d891b1fcad3d93e503fc754435719 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Tue, 7 Apr 2026 14:18:45 -0700 Subject: [PATCH 20/22] Fix merge conflicts. 
--- network/impl/src/lf_socket_support.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/network/impl/src/lf_socket_support.c b/network/impl/src/lf_socket_support.c index 76aee2308..85eecd484 100644 --- a/network/impl/src/lf_socket_support.c +++ b/network/impl/src/lf_socket_support.c @@ -123,8 +123,12 @@ void read_from_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, un // Read failed. if (format != NULL) { va_start(args, format); - lf_print_error_system_failure(format, args); + // Use lf_vprint_error (va_list variant) rather than lf_print_error_system_failure + // (variadic variant). Passing a va_list to a '...' function is undefined behaviour + // and manifests as garbage argument values on macOS due to ABI differences. + lf_vprint_error(format, args); va_end(args); + lf_print_error_and_exit("Error %d: %s", errno, strerror(errno)); } else { lf_print_error_system_failure("Failed to read from socket."); } From 461cf4f9a813e9b2dd6c0ae58eea499b746687a4 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Tue, 7 Apr 2026 15:34:26 -0700 Subject: [PATCH 21/22] Merge conflict fix. 
--- core/federated/RTI/rti_remote.c | 4 ++-- core/federated/federate.c | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 7f1a09e58..cc23b009a 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -292,7 +292,7 @@ static void send_outbound_connected_locked(federate_info_t* my_fed) { } for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { - if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_OUTBOUND_CONNECTED_LENGTH, buffer)) { + if (write_to_net_close_on_error(fed->net, MSG_TYPE_OUTBOUND_CONNECTED_LENGTH, buffer)) { lf_print_warning("RTI: Failed to send outbound connected message to federate %d.", fed->enclave.id); } if (rti_remote->base.tracing_enabled) { @@ -325,7 +325,7 @@ static void send_outbound_disconnected_locked(federate_info_t* my_fed) { } for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { - if (write_to_socket_close_on_error(&fed->socket, MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH, buffer)) { + if (write_to_net_close_on_error(fed->net, MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH, buffer)) { lf_print_warning("RTI: Failed to send outbound disconnected message to federate %d.", fed->enclave.id); } if (rti_remote->base.tracing_enabled) { diff --git a/core/federated/federate.c b/core/federated/federate.c index bed4732c9..5963ac183 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1005,12 +1005,12 @@ static void handle_upstream_disconnected_message(void) { * Reads the outbound federate's ID, then synchronously queries the RTI for its address * and establishes (or re-establishes) the outbound P2P connection to it. 
* This function is called inline from listen_to_rti_TCP or get_start_time_from_rti, - * so it reads the address-query reply directly from socket_TCP_RTI. + * so it reads the address-query reply directly from net_to_RTI. */ static void handle_outbound_connected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, NULL, "Failed to read outbound connected message from RTI."); uint16_t remote_federate_id = extract_uint16(buffer); tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); @@ -1023,18 +1023,18 @@ static void handle_outbound_connected_message(void) { /** * @brief Handle message from the RTI that a transient downstream federate has disconnected. * - * Reads the downstream federate's ID and closes the outbound P2P socket to it. + * Reads the downstream federate's ID and closes the outbound P2P connection to it.
*/ static void handle_outbound_disconnected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, NULL, "Failed to read outbound disconnected message from RTI."); uint16_t remote_federate_id = extract_uint16(buffer); tracepoint_federate_from_rti(receive_OUTBOUND_DISCONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that downstream transient federate %d has disconnected.", remote_federate_id); - shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[remote_federate_id], false); + shutdown_net(_fed.net_for_outbound_p2p_connections[remote_federate_id], false); } /** @@ -1059,7 +1059,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // here is unsafe because the RTI may have already written MSG_TYPE_TIMESTAMP into this // federate's TCP stream immediately after MSG_TYPE_OUTBOUND_CONNECTED (from a concurrent // send_start_tag_locked call for the transient federate). If we call lf_connect_to_federate() - // now it will read from the socket expecting MSG_TYPE_ADDRESS_QUERY_REPLY but will instead + // now it will read from the RTI connection expecting MSG_TYPE_ADDRESS_QUERY_REPLY but will instead // consume the queued MSG_TYPE_TIMESTAMP bytes, causing a fatal "Unexpected reply of type 2". // Fix: read and save each downstream federate ID, then call lf_connect_to_federate() for // each one only after MSG_TYPE_TIMESTAMP has been received and the loop has exited. @@ -1089,13 +1089,13 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { continue; } else if (buffer[0] == MSG_TYPE_OUTBOUND_CONNECTED) { // Defer lf_connect_to_federate() until after MSG_TYPE_TIMESTAMP is received.
- // Read the federate ID payload now to drain the socket, but do not attempt the - // address query yet: the RTI may have written MSG_TYPE_TIMESTAMP into this socket + // Read the federate ID payload now to drain the connection, but do not attempt the + // address query yet: the RTI may have written MSG_TYPE_TIMESTAMP into this connection // right after MSG_TYPE_OUTBOUND_CONNECTED (from send_start_tag_locked running // concurrently for the joining transient), so any read inside lf_connect_to_federate // would consume those bytes and crash with "Unexpected reply of type 2". unsigned char id_buf[sizeof(uint16_t)]; - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, sizeof(uint16_t), id_buf, NULL, + read_from_net_fail_on_error(_fed.net_to_RTI, sizeof(uint16_t), id_buf, NULL, "Failed to read outbound connected federate ID."); tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); uint16_t remote_federate_id = extract_uint16(id_buf); @@ -1962,7 +1962,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { "Failed to read the requested port number for federate %d from RTI.", remote_federate_id); - LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); + LF_MUTEX_UNLOCK(&lf_outbound_net_mutex); if (buffer[0] != MSG_TYPE_ADDRESS_QUERY_REPLY) { // Unexpected reply. Could be that RTI has failed and sent a resignation. @@ -2463,8 +2463,8 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa } // If there are outbound transients, check whether the destination is one of them. - // If it is and its socket is shut, gracefully skip the send. - if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { + // If it is and its connection is shut, gracefully skip the send. + if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.net_for_outbound_p2p_connections[federate] < 0) { lf_print_info("The destination transient federate %d is not connected. 
Abort sending!", federate); return 0; } @@ -2763,7 +2763,7 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int lf_print_error("lf_send_message: Unsupported message type (%d).", message_type); return -1; } - if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.sockets_for_outbound_p2p_connections[federate] < 0) { + if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.net_for_outbound_p2p_connections[federate] < 0) { // Only print a warning if the destination is a known outbound transient. lf_print_info("The destination transient federate %d is not connected. Abort sending!", federate); return 0; From 9c3f741fbbbbc3e2463fa81f2cb7f8cb9151b298 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Tue, 7 Apr 2026 15:48:55 -0700 Subject: [PATCH 22/22] Minor fix & formatting. --- core/federated/RTI/rti_remote.c | 2 +- core/federated/federate.c | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index cc23b009a..d2f28b482 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -2199,7 +2199,7 @@ void send_stop(federate_info_t* fed) { tracepoint_rti_to_federate(send_STOP, fed->enclave.id, NULL); } write_to_net_fail_on_error(fed->net, MSG_TYPE_STOP_LENGTH, outgoing_buffer, NULL, - "RTI failed to send MSG_TYPE_STOP message to federate %d.", fed->enclave.id); + "RTI failed to send MSG_TYPE_STOP message to federate %d.", fed->enclave.id); LF_PRINT_LOG("RTI sent MSG_TYPE_STOP to federate %d.", fed->enclave.id); } diff --git a/core/federated/federate.c b/core/federated/federate.c index 5963ac183..5dd0fddf0 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -967,7 +967,7 @@ static void handle_upstream_connected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, - "Failed to 
read upstream connected message from RTI."); + "Failed to read upstream connected message from RTI."); uint16_t connected = extract_uint16(buffer); tracepoint_federate_from_rti(receive_UPSTREAM_CONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); @@ -987,7 +987,7 @@ static void handle_upstream_disconnected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, - "Failed to read upstream disconnected message from RTI."); + "Failed to read upstream disconnected message from RTI."); uint16_t disconnected = extract_uint16(buffer); tracepoint_federate_from_rti(receive_UPSTREAM_DISCONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); @@ -1011,7 +1011,7 @@ static void handle_outbound_connected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, NULL, - "Failed to read outbound connected message from RTI."); + "Failed to read outbound connected message from RTI."); uint16_t remote_federate_id = extract_uint16(buffer); tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that outbound transient federate %d has connected.", remote_federate_id); @@ -1029,7 +1029,7 @@ static void handle_outbound_disconnected_message(void) { size_t bytes_to_read = sizeof(uint16_t); unsigned char buffer[bytes_to_read]; read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, NULL, - "Failed to read outbound disconnected message from RTI."); + "Failed to read outbound disconnected message from RTI."); uint16_t remote_federate_id = extract_uint16(buffer); tracepoint_federate_from_rti(receive_OUTBOUND_DISCONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received 
notification that downstream transient federate %d has disconnected.", remote_federate_id); @@ -1068,8 +1068,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { size_t num_pending_downstream = 0; while (true) { - read_from_net_fail_on_error(_fed.net_to_RTI, 1, buffer, - "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); + read_from_net_fail_on_error(_fed.net_to_RTI, 1, buffer, "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); // First byte received is the message ID. if (buffer[0] != MSG_TYPE_TIMESTAMP) { if (buffer[0] == MSG_TYPE_FAILED) { @@ -1096,7 +1095,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // would consume those bytes and crash with "Unexpected reply of type 2". unsigned char id_buf[sizeof(uint16_t)]; read_from_net_fail_on_error(_fed.net_to_RTI, sizeof(uint16_t), id_buf, NULL, - "Failed to read outbound connected federate ID."); + "Failed to read outbound connected federate ID."); tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); uint16_t remote_federate_id = extract_uint16(id_buf); LF_PRINT_DEBUG("Deferring P2P connection to downstream transient federate %d until after " @@ -1112,7 +1111,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { } } else { read_from_net_fail_on_error(_fed.net_to_RTI, buffer_length - 1, buffer + 1, - "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); + "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); break; } } @@ -2053,7 +2052,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { // (e.g. macOS resets the TCP connection if the accept loop hasn't run yet) is // a soft error: the RTI will resend MSG_TYPE_OUTBOUND_CONNECTED when the transient // is ready. Using the non-fatal read here prevents a spurious fatal exit on macOS. 
- int ack_read_failed = read_from_net_fail_on_error(net, 1, (unsigned char*)buffer); + int ack_read_failed = read_from_net(net, 1, (unsigned char*)buffer); if (ack_read_failed) { if (is_transient) { lf_print_warning("Failed to read MSG_TYPE_ACK from transient federate %d. Connection may have been reset. "