diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 1277e4051..d2f28b482 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -25,9 +25,9 @@ */ #include "rti_remote.h" +#include "clock.h" // For lf_clock_cond_timedwait() #include "net_util.h" #include -#include "clock.h" // For lf_clock_cond_timedwait() // Global variables defined in tag.c: extern instant_t start_time; @@ -227,8 +227,8 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) { } /** - * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the RTI, - * telling it that the specified `upstream` federate is also now connected. + * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the + * RTI, telling it that the specified `upstream` federate is also now connected. * * This function assumes that the mutex lock is already held. * @param destination The destination federate. @@ -246,6 +246,9 @@ static void send_upstream_connected_locked(federate_info_t* destination, federat if (write_to_net_close_on_error(destination->net, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH, buffer)) { lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", destination->enclave.id); } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_UPSTREAM_CONNECTED, destination->enclave.id, NULL); + } } /** @@ -262,10 +265,81 @@ static void send_upstream_disconnected_locked(federate_info_t* destination, fede if (write_to_net_close_on_error(destination->net, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH, buffer)) { lf_print_warning("RTI: Failed to send upstream disconnected message to federate %d.", disconnected->enclave.id); } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_UPSTREAM_DISCONNECTED, destination->enclave.id, NULL); + } +} + +/** + * @brief Send MSG_TYPE_OUTBOUND_CONNECTED to the specified inbound federate. 
+ * + * This notifies every connected federate that lists `my_fed` among its outbound transients that + * `my_fed` has (re-)connected, so that federate should query the RTI for the address of `my_fed` + * and establish (or re-establish) its outbound P2P connection. + * + * This function assumes that the mutex lock is already held. + * @param my_fed The transient federate that has just connected. + */ +static void send_outbound_connected_locked(federate_info_t* my_fed) { + unsigned char buffer[MSG_TYPE_OUTBOUND_CONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_OUTBOUND_CONNECTED; + encode_uint16(my_fed->enclave.id, &buffer[1]); + // Iterate over all federates and notify those that have my_fed as an outbound transient. + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + if (fed->enclave.state == NOT_CONNECTED) { + continue; + } + for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { + if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { + if (write_to_net_close_on_error(fed->net, MSG_TYPE_OUTBOUND_CONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send outbound connected message to federate %d.", fed->enclave.id); + } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_OUTBOUND_CONNECTED, fed->enclave.id, NULL); + } + break; + } + } + } +} + +/** + * @brief Send MSG_TYPE_OUTBOUND_DISCONNECTED to the upstream federates of a transient federate. + * + * This notifies upstream federates that a transient federate downstream of them has + * disconnected, so they can close the outbound P2P connection to it. + * + * This function assumes that the mutex lock is already held. + * @param my_fed The transient federate that has just disconnected. 
+ */ +static void send_outbound_disconnected_locked(federate_info_t* my_fed) { + unsigned char buffer[MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_OUTBOUND_DISCONNECTED; + encode_uint16(my_fed->enclave.id, &buffer[1]); + // Iterate over all federates and notify those that have my_fed as an outbound transient. + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + if (fed->enclave.state == NOT_CONNECTED) { + continue; + } + for (int32_t j = 0; j < fed->number_of_outbound_transients; j++) { + if (fed->outbound_transients[j] == (int32_t)my_fed->enclave.id) { + if (write_to_net_close_on_error(fed->net, MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send outbound disconnected message to federate %d.", fed->enclave.id); + } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_OUTBOUND_DISCONNECTED, fed->enclave.id, NULL); + } + break; + } + } + } } /** - * @brief Mark a federate as disconnected and, if this is a transient, inform downstream federates. + * @brief Mark a federate as disconnected and, if this is a transient, inform downstream and + * inbound federates. * @param fed The disconnected federate. */ static void notify_federate_disconnected(federate_info_t* fed) { @@ -281,6 +355,8 @@ static void notify_federate_disconnected(federate_info_t* fed) { send_upstream_disconnected_locked(downstream, fed); } } + // Notify upstream federates that have fed in their list of outbound transients. + send_outbound_disconnected_locked(fed); LF_MUTEX_UNLOCK(&rti_mutex); } } @@ -575,9 +651,9 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff // issue a TAG before this message has been forwarded. 
LF_MUTEX_LOCK(&rti_mutex); - // If the destination federate is no longer connected, or it is a transient that has not started executing yet - // (the delayed intended tag is less than the effective start tag of the destination), issue a warning, remove the - // message from the network abstraction, and return. + // If the destination federate is no longer connected, or it is a transient that has not started + // executing yet (the delayed intended tag is less than the effective start tag of the + // destination), issue a warning, remove the message from the network abstraction, and return. federate_info_t* fed = GET_FED_INFO(federate_id); interval_t delay = NEVER; for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) { @@ -692,7 +768,8 @@ void handle_next_event_tag(federate_info_t* fed) { // Acquire a mutex lock to ensure that this state does not change while a // message is in transport or being used to determine a TAG. LF_MUTEX_LOCK(&rti_mutex); // FIXME: Instead of using a mutex, it might be more efficient to use a - // select() mechanism to read and process federates' buffers in an orderly fashion. + // select() mechanism to read and process federates' buffers in an + // orderly fashion. tag_t intended_tag = extract_tag(buffer); if (rti_remote->base.tracing_enabled) { @@ -908,8 +985,10 @@ void handle_address_query(uint16_t fed_id) { // Use buffer both for reading and constructing the reply. // The length is what is needed for the reply. unsigned char buffer[1 + sizeof(int32_t)]; - read_from_net_fail_on_error(fed->net, sizeof(uint16_t), (unsigned char*)buffer, "Failed to read address query."); + // Read remote_fed_id (2 bytes) + is_transient flag (1 byte). 
+ read_from_net_fail_on_error(fed->net, sizeof(uint16_t) + 1, (unsigned char*)buffer, "Failed to read address query."); uint16_t remote_fed_id = extract_uint16(buffer); + bool remote_is_transient = (buffer[sizeof(uint16_t)] == 1); if (rti_remote->base.tracing_enabled) { tracepoint_rti_from_federate(receive_ADR_QR, fed_id, NULL); @@ -917,6 +996,28 @@ void handle_address_query(uint16_t fed_id) { LF_PRINT_DEBUG("RTI received address query from %d for %d.", fed_id, remote_fed_id); + // If the queried federate is transient, record it in the querying federate's + // outbound_transients array (if not already present and the array still has room). + if (remote_is_transient) { + LF_MUTEX_LOCK(&rti_mutex); + bool already_registered = false; + int32_t i = 0; + for (; i < fed->number_of_outbound_transients; i++) { + if (fed->outbound_transients[i] == (int32_t)remote_fed_id) { + already_registered = true; + break; + } else if (fed->outbound_transients[i] == -1) { + // This means we have found an empty slot, so we can stop looking. + break; + } + } + if (!already_registered && i < rti_remote->number_of_transient_federates) { + fed->outbound_transients[i] = (int32_t)remote_fed_id; + fed->number_of_outbound_transients++; + } + LF_MUTEX_UNLOCK(&rti_mutex); + } + // NOTE: server_port initializes to -1, which means the RTI does not know // the port number because it has not yet received an MSG_TYPE_ADDRESS_ADVERTISEMENT message // from this federate. In that case, it will respond by sending -1. @@ -990,17 +1091,18 @@ void handle_address_ad(uint16_t federate_id) { } /** - * @brief Send the global federation start time and the federate-specific starting tag to the specified federate. + * @brief Send the global federation start time and the federate-specific starting tag to the + * specified federate. * - * For persistent federates and transient federates that happen to join during federation startup, the - * `federation_start_time` will match the time in the `federate_start_tag`, and the microstep will be 0. 
- * For a transient federate that joins later, the time in the `federate_start_tag` will be greater than the - * federation_start_time`. + * For persistent federates and transient federates that happen to join during federation startup, + * the `federation_start_time` will match the time in the `federate_start_tag`, and the microstep + * will be 0. For a transient federate that joins later, the time in the `federate_start_tag` will + * be greater than the federation_start_time`. * * - * Before sending the start time and tag, this function notifies my_fed of all upstream transient federates that are - * connected. After sending the start time and tag, and if my_fed is transient, notify federates downstream of its - * connection, ensuring proper handling of zero-delay cycles. + * Before sending the start time and tag, this function notifies my_fed of all upstream transient + * federates that are connected. After sending the start time and tag, and if my_fed is transient, + * notify federates downstream of its connection, ensuring proper handling of zero-delay cycles. * * This function assumes that the mutex lock is already held. * @@ -1010,14 +1112,15 @@ void handle_address_ad(uint16_t federate_id) { */ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { // Notify my_fed of any upstream transient federates that are connected. - // This has to occur before sending the start tag so that my_fed does not begin executing thinking that these - // upstream federates are not connected. + // This has to occur before sending the start tag so that my_fed does not begin executing thinking + // that these upstream federates are not connected. 
for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) { federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]); if (fed->is_transient && fed->enclave.state == GRANTED) { send_upstream_connected_locked(my_fed, fed); } } + send_outbound_connected_locked(my_fed); // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START // message. @@ -1045,6 +1148,7 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ // If this is a transient federate, notify its downstream federates that it is now connected. if (my_fed->is_transient) { + // Notify downstreams: their upstream (my_fed) has connected. for (int i = 0; i < my_fed->enclave.num_immediate_downstreams; i++) { send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.immediate_downstreams[i]), my_fed); } @@ -1178,7 +1282,8 @@ void handle_timestamp(federate_info_t* my_fed) { // effective_start_time of the federate, cancel it. // FIXME: Should this be higher-than or equal to? // FIXME: Also, won't the grant simply be lost? - // If the joining federate doesn't send anything, the downstream federate won't issue another NET. + // If the joining federate doesn't send anything, the downstream federate won't issue another + // NET. for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) { federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]); @@ -1383,7 +1488,8 @@ static void handle_federate_failed(federate_info_t* my_fed) { // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. + bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, + sizeof(bool)); // Initializes to 0. 
notify_downstream_advance_grant_if_safe(&(my_fed->enclave), visited); free(visited); @@ -1425,7 +1531,8 @@ static void handle_federate_resign(federate_info_t* my_fed) { // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0. + bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, + sizeof(bool)); // Initializes to 0. notify_downstream_advance_grant_if_safe(&(my_fed->enclave), visited); free(visited); @@ -2098,10 +2205,11 @@ void send_stop(federate_info_t* fed) { } void* lf_connect_to_transient_federates_thread(void* nothing) { - // This loop will continue to accept connections of transient federates, as soon as there is room, or enable hot swap + // This loop will continue to accept connections of transient federates, as soon as there is room, + // or enable hot swap while (!rti_remote->all_persistent_federates_exited) { - // Continue waiting for an incoming connection requests from transients to join, or for hot swap. - // Wait for an incoming connection request. + // Continue waiting for an incoming connection requests from transients to join, or for hot + // swap. Wait for an incoming connection request. net_abstraction_t fed_net = accept_net(rti_remote->rti_net); // If accept failed (e.g., net was shut down), exit the loop. 
@@ -2283,6 +2391,12 @@ void initialize_federate(federate_info_t* fed, uint16_t id) { fed->has_upstream_transient_federates = false; fed->is_transient = true; fed->effective_start_tag = NEVER_TAG; + fed->number_of_outbound_transients = 0; + int32_t num_transients = rti_remote->number_of_transient_federates; + fed->outbound_transients = (int32_t*)malloc(num_transients * sizeof(int32_t)); + for (int32_t i = 0; i < num_transients; i++) { + fed->outbound_transients[i] = -1; + } } void reset_transient_federate(federate_info_t* fed) { @@ -2300,8 +2414,13 @@ void reset_transient_federate(federate_info_t* fed) { fed->in_transit_message_tags = pqueue_tag_init(10); fed->requested_stop = false; fed->effective_start_tag = NEVER_TAG; - // Whenver a transient resigns or leaves, invalidate all federates, so that all min_delays_upstream - // get re-computed. + fed->number_of_outbound_transients = 0; + int32_t num_transients = rti_remote->number_of_transient_federates; + for (int32_t i = 0; i < num_transients; i++) { + fed->outbound_transients[i] = -1; + } + // Whenver a transient resigns or leaves, invalidate all federates, so that all + // min_delays_upstream get re-computed. // FIXME: Maybe optimize it to only invalidate those affected by the transient invalidate_min_delays(); } diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index dd16f389d..4925d9667 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -68,6 +68,12 @@ typedef struct federate_info_t { bool is_transient; /** @brief Records the start time of the federate, which is mainly useful for transient federates. */ tag_t effective_start_tag; + /** @brief Number of outbound connections to transient federates. */ + int32_t number_of_outbound_transients; + /** @brief IDs of transient federates this federate has outbound connections to. 
+ * The array has size equal to the total number of transient federates in the federation, + * and entries are initialized (and reset) to -1. */ + int32_t* outbound_transients; } federate_info_t; /** diff --git a/core/federated/federate.c b/core/federated/federate.c index 7c880777b..5dd0fddf0 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -20,6 +20,7 @@ #include // Defined perror(), errno #include // Defines bzero(). +#include "api/schedule.h" #include "clock-sync.h" #include "federate.h" #include "net_common.h" @@ -28,13 +29,12 @@ #include "reactor.h" #include "reactor_common.h" #include "reactor_threaded.h" -#include "api/schedule.h" #include "scheduler.h" #include "tracepoint.h" #ifdef FEDERATED_AUTHENTICATED -#include // For secure random number generation. #include // For HMAC-based authentication of federates. +#include // For secure random number generation. #endif // Global variables defined in tag.c: @@ -247,7 +247,8 @@ static void update_last_known_status_on_input_ports(tag_t tag, environment_t* en * @param env The top-level environment, whose mutex is assumed to be held. * @param tag The tag on which the latest status of the specified network input port is known. * @param port_id The port ID. - * @param warn If true, print a warning if the tag is less than the last known status tag of the port. + * @param warn If true, print a warning if the tag is less than the last known status tag of the + * port. */ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag, int port_id, bool warn) { if (lf_tag_compare(tag, env->current_tag) < 0) @@ -283,7 +284,8 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag } /** - * @brief Mark all the input ports connected to the given federate as known to be absent until FOREVER. + * @brief Mark all the input ports connected to the given federate as known to be absent until + * FOREVER. 
* * This does nothing if the federate is not using decentralized coordination. * This function acquires the mutex on the top-level environment. @@ -291,16 +293,31 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag */ static void mark_inputs_known_absent(int fed_id) { #ifdef FEDERATED_DECENTRALIZED - // Note that when transient federates are supported, this will need to be updated because the - // federate could rejoin. environment_t* env; _lf_get_environments(&env); LF_MUTEX_LOCK(&env->mutex); + // For a persistent federate, use FOREVER_TAG: it will never send again, so + // all its input ports can be permanently marked absent. + // For a transient federate, use env->current_tag instead: the federate may + // rejoin later, and stamping FOREVER_TAG would permanently block its ports + // from being updated after reconnection, causing spurious + // "Attempt to update to earlier tag" warnings and downstream STP violations. + // env->current_tag is the right choice because update_last_known_status_on_input_port + // clamps upward (if tag < current_tag, it uses current_tag anyway), so passing + // current_tag is equivalent to "absent at the current logical time" — sufficient + // to unblock the scheduler, but small enough that any future message from the + // rejoining federate at a tag >= current_tag will update the port normally. + bool is_transient = _fed.inbound_p2p_connection_is_transient[fed_id]; + tag_t absent_until = is_transient ? env->current_tag : FOREVER_TAG; + for (size_t i = 0; i < _lf_action_table_size; i++) { lf_action_base_t* action = _lf_action_table[i]; if (action->source_id == fed_id) { - update_last_known_status_on_input_port(env, FOREVER_TAG, i, true); + // For transients, pass warn=false: the port's last_known_status_tag may already be + // ahead of current_tag (advanced by a prior message or the STAA thread), so the + // update will be a no-op. That is correct and expected — no warning needed. 
+ update_last_known_status_on_input_port(env, absent_until, i, !is_transient); } } LF_MUTEX_UNLOCK(&env->mutex); @@ -330,8 +347,8 @@ static void update_last_known_status_on_action(environment_t* env, lf_action_bas tag = env->current_tag; trigger_t* input_port_trigger = action->trigger; if (lf_tag_compare(tag, input_port_trigger->last_known_status_tag) > 0) { - LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " PRINTF_TAG - " to " PRINTF_TAG ".", + LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient " + "federate from " PRINTF_TAG " to " PRINTF_TAG ".", input_port_trigger->last_known_status_tag.time - lf_time_start(), input_port_trigger->last_known_status_tag.microstep, tag.time - lf_time_start(), tag.microstep); input_port_trigger->last_known_status_tag = tag; @@ -427,7 +444,8 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen * cycles, where processing at a tag must be able to begin before all messages have arrived * at that tag. This returns true if the following conditions are all true: * - * 1. the first reaction triggered has a level >= MLAA (a port is or will be blocked on this trigger); + * 1. the first reaction triggered has a level >= MLAA (a port is or will be blocked on this + * trigger); * 2. the intended_tag is equal to the current tag of the environment; * 3. the intended_tag is greater than the last_tag of the trigger; * 4. the intended_tag is greater than the last_known_status_tag of the trigger; @@ -611,7 +629,8 @@ static int handle_tagged_message(net_abstraction_t net, int fed_id) { element_count = length / element_size; if (length % element_size != 0) { // Log a warning if the payload size is not an exact multiple of element_size. 
- lf_print_warning("Received message for port %d with payload length %zu bytes not a multiple of element_size %zu; " + lf_print_warning("Received message for port %d with payload length %zu bytes not a multiple " + "of element_size %zu; " "truncating to %zu elements.", port_id, (size_t)length, element_size, element_count); } @@ -647,8 +666,8 @@ static int handle_tagged_message(net_abstraction_t net, int fed_id) { tag_t actual_tag = intended_tag; #ifdef FEDERATED_DECENTRALIZED - // For tardy messages in decentralized coordination, we need to figure out what the actual tag will be. - // (Centralized coordination errors out with tardy messages). + // For tardy messages in decentralized coordination, we need to figure out what the actual tag + // will be. (Centralized coordination errors out with tardy messages). if (lf_tag_compare(intended_tag, env->current_tag) <= 0) { // Message is tardy. actual_tag = env->current_tag; @@ -950,6 +969,7 @@ static void handle_upstream_connected_message(void) { read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, "Failed to read upstream connected message from RTI."); uint16_t connected = extract_uint16(buffer); + tracepoint_federate_from_rti(receive_UPSTREAM_CONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); // Mark the upstream as connected. for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { @@ -969,6 +989,7 @@ static void handle_upstream_disconnected_message(void) { read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, "Failed to read upstream disconnected message from RTI."); uint16_t disconnected = extract_uint16(buffer); + tracepoint_federate_from_rti(receive_UPSTREAM_DISCONNECTED, _lf_my_fed_id, NULL); LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); // Mark the upstream as disconnected. 
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { @@ -978,6 +999,44 @@ static void handle_upstream_disconnected_message(void) { } } +/** + * @brief Handle message from the RTI that a transient outbound federate has connected. + * + * Reads the outbound federate's ID, then synchronously queries the RTI for its address + * and establishes (or re-establishes) the outbound P2P connection to it. + * This function is called inline from listen_to_rti_net, so the address-query reply is read + * directly from net_to_RTI. get_start_time_from_rti defers the connection instead of calling it. + */ +static void handle_outbound_connected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, + "Failed to read outbound connected message from RTI."); + uint16_t remote_federate_id = extract_uint16(buffer); + tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); + LF_PRINT_DEBUG("Received notification that outbound transient federate %d has connected.", remote_federate_id); + + // Query the RTI for the address and (re-)establish the outbound P2P connection. + lf_connect_to_federate(remote_federate_id, true); +} + +/** + * @brief Handle message from the RTI that a transient downstream federate has disconnected. + * + * Reads the downstream federate's ID and closes the outbound P2P net to it. 
+ */ +static void handle_outbound_disconnected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, + "Failed to read outbound disconnected message from RTI."); + uint16_t remote_federate_id = extract_uint16(buffer); + tracepoint_federate_from_rti(receive_OUTBOUND_DISCONNECTED, _lf_my_fed_id, NULL); + LF_PRINT_DEBUG("Received notification that downstream transient federate %d has disconnected.", remote_federate_id); + + shutdown_net(_fed.net_for_outbound_p2p_connections[remote_federate_id], false); +} + /** * Send the specified timestamp to the RTI and wait for a response. * The specified timestamp should be current physical time of the @@ -996,6 +1055,18 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { size_t buffer_length = (_fed.is_transient) ? MSG_TYPE_TIMESTAMP_TAG_LENGTH : MSG_TYPE_TIMESTAMP_LENGTH; unsigned char buffer[buffer_length]; + // Deferred OUTBOUND_CONNECTED notifications: calling lf_connect_to_federate() inline + // here is unsafe because the RTI may have already written MSG_TYPE_TIMESTAMP into this + // federate's TCP stream immediately after MSG_TYPE_OUTBOUND_CONNECTED (from a concurrent + // send_start_tag_locked call for the transient federate). If we call lf_connect_to_federate() + // now it will read from the net_abs expecting MSG_TYPE_ADDRESS_QUERY_REPLY but will instead + // consume the queued MSG_TYPE_TIMESTAMP bytes, causing a fatal "Unexpected reply of type 2". + // Fix: read and save each downstream federate ID, then call lf_connect_to_federate() for + // each one only after MSG_TYPE_TIMESTAMP has been received and the loop has exited. + uint16_t + pending_downstream_ids[_fed.number_of_outbound_p2p_transients > 0 ? 
_fed.number_of_outbound_p2p_transients : 1]; + size_t num_pending_downstream = 0; + while (true) { read_from_net_fail_on_error(_fed.net_to_RTI, 1, buffer, "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); // First byte received is the message ID. @@ -1010,6 +1081,30 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive handle_upstream_disconnected_message(); continue; + } else if (buffer[0] == MSG_TYPE_OUTBOUND_DISCONNECTED) { + // A transient outbound federate disconnected before we even got our start time. + // Drain the federate ID payload and continue waiting for MSG_TYPE_TIMESTAMP. + handle_outbound_disconnected_message(); + continue; + } else if (buffer[0] == MSG_TYPE_OUTBOUND_CONNECTED) { + // Defer lf_connect_to_federate() until after MSG_TYPE_TIMESTAMP is received. + // Read the federate ID payload now to drain the net_abs, but do not attempt the + // address query yet: the RTI may have written MSG_TYPE_TIMESTAMP into this net_abs + // right after MSG_TYPE_OUTBOUND_CONNECTED (from send_start_tag_locked running + // concurrently for the joining transient), so any read inside lf_connect_to_federate + // would consume those bytes and crash with "Unexpected reply of type 2". 
+ unsigned char id_buf[sizeof(uint16_t)]; + read_from_net_fail_on_error(_fed.net_to_RTI, sizeof(uint16_t), id_buf, + "Failed to read outbound connected federate ID."); + tracepoint_federate_from_rti(receive_OUTBOUND_CONNECTED, _lf_my_fed_id, NULL); + uint16_t remote_federate_id = extract_uint16(id_buf); + LF_PRINT_DEBUG("Deferring P2P connection to downstream transient federate %d until after " + "start time is received.", + remote_federate_id); + if (num_pending_downstream < _fed.number_of_outbound_p2p_transients) { + pending_downstream_ids[num_pending_downstream++] = remote_federate_id; + } + continue; } else { lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. Got %u (see net_common.h).", buffer[0]); @@ -1039,6 +1134,16 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { tracepoint_federate_from_rti(receive_TIMESTAMP, _lf_my_fed_id, &effective_start_tag); LF_PRINT_LOG("Current physical time is: " PRINTF_TIME ".", lf_time_physical()); + // Now that MSG_TYPE_TIMESTAMP has been received and the start time is known, it is safe + // to establish outbound P2P connections to any transient downstream federates that sent + // OUTBOUND_CONNECTED notifications while we were waiting. The ADDRESS_QUERY round-trip + // can proceed without risk of consuming queued TIMESTAMP bytes. + for (size_t i = 0; i < num_pending_downstream; i++) { + LF_PRINT_DEBUG("Establishing deferred P2P connection to downstream transient federate %d.", + pending_downstream_ids[i]); + lf_connect_to_federate(pending_downstream_ids[i], true); + } + return timestamp; } @@ -1099,7 +1204,8 @@ static void handle_tag_advance_grant(void) { #ifdef FEDERATED_DECENTRALIZED /** - * @brief Return true if there is an input port among those with a given STAA whose status is unknown. + * @brief Return true if there is an input port among those with a given STAA whose status is + * unknown. * * @param staa_elem A record of all input port actions. 
*/ @@ -1170,11 +1276,11 @@ static void* update_ports_from_staa_offsets(void* args) { tag_t tag_when_started_waiting = lf_tag(env); for (size_t i = 0; i < staa_lst_size; ++i) { staa_t* staa_elem = staa_lst[i]; - // The staa_elem is adjusted in the code generator to have subtracted the delay on the connection. - // The list is sorted in increasing order of adjusted STAA offsets. - // We need to add the lf_fed_STA_offset to the wait time and guard against overflow. - // Skip this if the current tag is the dynamically determined stop time - // (due to a call to lf_request_stop()). This is indicated by a stop_tag with microstep greater than 0. + // The staa_elem is adjusted in the code generator to have subtracted the delay on the + // connection. The list is sorted in increasing order of adjusted STAA offsets. We need to add + // the lf_fed_STA_offset to the wait time and guard against overflow. Skip this if the current + // tag is the dynamically determined stop time (due to a call to lf_request_stop()). This is + // indicated by a stop_tag with microstep greater than 0. interval_t wait_time = 0; if (lf_tag_compare(env->current_tag, env->stop_tag) != 0 || env->stop_tag.microstep == 0) { wait_time = lf_time_add(staa_elem->STAA, lf_fed_STA_offset); @@ -1206,10 +1312,10 @@ static void* update_ports_from_staa_offsets(void* args) { } /* Possibly useful for debugging: tag_t current_tag = lf_tag(env); - LF_PRINT_DEBUG("**** (update thread) Assuming absent! " PRINTF_TAG, current_tag.time - lf_time_start(), - current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " PRINTF_TIME, current_tag.time - - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, - wait_until_time - lf_time_start()); + LF_PRINT_DEBUG("**** (update thread) Assuming absent! 
" PRINTF_TAG, current_tag.time - + lf_time_start(), current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " + PRINTF_TIME, current_tag.time - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) + Wait until time is " PRINTF_TIME, wait_until_time - lf_time_start()); */ // Mark input ports absent. @@ -1423,7 +1529,8 @@ static void handle_stop_granted_message() { // Sanity check. if (lf_tag_compare(received_stop_tag, env[i].current_tag) <= 0) { - lf_print_error("RTI granted a MSG_TYPE_STOP_GRANTED tag that is equal to or less than this federate's current " + lf_print_error("RTI granted a MSG_TYPE_STOP_GRANTED tag that is equal to or less than this " + "federate's current " "tag " PRINTF_TAG ". " "Stopping at the next microstep instead.", env[i].current_tag.time - start_time, env[i].current_tag.microstep); @@ -1680,6 +1787,12 @@ static void* listen_to_rti_net(void* args) { case MSG_TYPE_UPSTREAM_DISCONNECTED: handle_upstream_disconnected_message(); break; + case MSG_TYPE_OUTBOUND_CONNECTED: + handle_outbound_connected_message(); + break; + case MSG_TYPE_OUTBOUND_DISCONNECTED: + handle_outbound_disconnected_message(); + break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI.", _lf_my_fed_id); @@ -1816,7 +1929,7 @@ void lf_terminate_execution(environment_t* env) { ////////////////////////////////////////////////////////////////////////////////// // Public functions (declared in federate.h, in alphabetical order) -void lf_connect_to_federate(uint16_t remote_federate_id) { +void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient) { int result = -1; // Ask the RTI for port number of the remote federate. 
@@ -1826,25 +1939,30 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { int port = -1; struct in_addr host_ip_addr; instant_t start_connect = lf_time_physical(); + // If the remote federate is persistent, iterate until we get a valid port number from the RTI. + // If not, execute only once, as a request registration. while (port == -1 && !_lf_termination_executed) { buffer[0] = MSG_TYPE_ADDRESS_QUERY; // NOTE: Sending messages in little endian. encode_uint16(remote_federate_id, &(buffer[1])); + // Indicate whether the remote federate being queried is transient. + buffer[1 + sizeof(uint16_t)] = is_transient ? 1 : 0; LF_PRINT_DEBUG("Sending address query for federate %d.", remote_federate_id); // Trace the event when tracing is enabled tracepoint_federate_to_rti(send_ADR_QR, _lf_my_fed_id, NULL); LF_MUTEX_LOCK(&lf_outbound_net_mutex); - write_to_net_fail_on_error(_fed.net_to_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_net_mutex, + write_to_net_fail_on_error(_fed.net_to_RTI, 1 + sizeof(uint16_t) + 1, buffer, &lf_outbound_net_mutex, "Failed to send address query for federate %d to RTI.", remote_federate_id); - LF_MUTEX_UNLOCK(&lf_outbound_net_mutex); // Read RTI's response. read_from_net_fail_on_error(_fed.net_to_RTI, sizeof(int32_t) + 1, buffer, "Failed to read the requested port number for federate %d from RTI.", remote_federate_id); + LF_MUTEX_UNLOCK(&lf_outbound_net_mutex); + if (buffer[0] != MSG_TYPE_ADDRESS_QUERY_REPLY) { // Unexpected reply. Could be that RTI has failed and sent a resignation. if (buffer[0] == MSG_TYPE_FAILED) { @@ -1863,12 +1981,16 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { // the port number of the remote federate, presumably because the // remote federate has not yet sent an MSG_TYPE_ADDRESS_ADVERTISEMENT message to the RTI. // Sleep for some time before retrying. 
- if (port == -1) { + if (port == -1 && !is_transient) { if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { lf_print_error_and_exit("TIMEOUT obtaining IP/port for federate %d from the RTI.", remote_federate_id); } // Wait ADDRESS_QUERY_RETRY_INTERVAL nanoseconds. lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); + } else if (port == -1 && is_transient) { + // For transient federates, we only execute once, as a request registration. If the RTI does + // not reply with a valid port number, we treat it normally and return. + return; } } assert(port < 65536); @@ -1926,9 +2048,21 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { write_to_net_fail_on_error(net, federation_id_length, (unsigned char*)federation_metadata.federation_id, NULL, "Failed to send federation id to federate %d.", remote_federate_id); - read_from_net_fail_on_error(net, 1, (unsigned char*)buffer, - "Failed to read MSG_TYPE_ACK from federate %d in response to sending fed_id.", - remote_federate_id); + // For transient outbound connections, a connection reset from the remote side + // (e.g. macOS resets the TCP connection if the accept loop hasn't run yet) is + // a soft error: the RTI will resend MSG_TYPE_OUTBOUND_CONNECTED when the transient + // is ready. Using the non-fatal read here prevents a spurious fatal exit on macOS. + int ack_read_failed = read_from_net(net, 1, (unsigned char*)buffer); + if (ack_read_failed) { + if (is_transient) { + lf_print_warning("Failed to read MSG_TYPE_ACK from transient federate %d. Connection may have been reset. " + "Will retry when RTI notifies reconnection.", + remote_federate_id); + return; + } + lf_print_error_and_exit("Failed to read MSG_TYPE_ACK from federate %d in response to sending fed_id.", + remote_federate_id); + } if (buffer[0] != MSG_TYPE_ACK) { // Get the error code. 
read_from_net_fail_on_error(net, 1, (unsigned char*)buffer, @@ -2143,8 +2277,9 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { size_t received_federates = 0; // Allocate memory to store thread IDs. _fed.inbound_net_listeners = (lf_thread_t*)calloc(_fed.number_of_inbound_p2p_connections, sizeof(lf_thread_t)); - while (received_federates < _fed.number_of_inbound_p2p_connections && !_lf_termination_executed) { - if (rti_failed()) { + while (!_lf_termination_executed) { + // Case where all inbound connections are to persistent federates + if (received_federates == _fed.number_of_inbound_p2p_connections && _fed.number_of_inbound_p2p_transients == 0) { break; } // Wait for an incoming connection request. @@ -2200,6 +2335,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Extract the ID of the sending federate. uint16_t remote_fed_id = extract_uint16((unsigned char*)&(buffer[1])); bool remote_fed_is_transient = buffer[1 + sizeof(uint16_t)]; + _fed.inbound_p2p_connection_is_transient[remote_fed_id] = remote_fed_is_transient; if (remote_fed_is_transient) { LF_PRINT_DEBUG("Received sending federate ID %d, which is transient.", remote_fed_id); } else { @@ -2254,7 +2390,8 @@ void lf_latest_tag_confirmed(tag_t tag_to_send) { } _lf_get_environments(&env); if (!env->need_to_send_LTC) { - LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged message with the " + LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged " + "message with the " "tag " PRINTF_TAG " that this federate has received.", tag_to_send.time - start_time, tag_to_send.microstep); return; @@ -2323,6 +2460,14 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa lf_print_error("lf_send_message: Unsupported message type (%d).", message_type); return -1; } + + // If there are outbound transients, check whether the destination is one of them. 
+ // If it is and its net_abs is shut, gracefully skip the send. + if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.net_for_outbound_p2p_connections[federate] < 0) { + lf_print_info("The destination transient federate %d is not connected. Abort sending!", federate); + return 0; + } + header_buffer[0] = (unsigned char)message_type; // Next two bytes identify the destination port. // NOTE: Send messages little endian (network order), not big endian. @@ -2388,7 +2533,8 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) LF_PRINT_DEBUG("Granted tag " PRINTF_TAG " because TAG or PTAG has been received.", _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); - // In case a downstream federate needs the NET of this tag or has not received any DNET, send NET. + // In case a downstream federate needs the NET of this tag or has not received any DNET, send + // NET. if (!_fed.received_any_DNET || (lf_tag_compare(_fed.last_DNET, tag) < 0 && lf_tag_compare(_fed.last_DNET, _fed.last_sent_NET) >= 0)) { send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag); @@ -2491,9 +2637,10 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) return tag; } - // This federate should repeatedly advance its tag to ensure downstream federates can make progress. - // Before advancing to the next tag, we need to wait some time so that we don't overwhelm the network and the - // RTI. That amount of time will be no greater than ADVANCE_MESSAGE_INTERVAL in the future. + // This federate should repeatedly advance its tag to ensure downstream federates can make + // progress. Before advancing to the next tag, we need to wait some time so that we don't + // overwhelm the network and the RTI. That amount of time will be no greater than + // ADVANCE_MESSAGE_INTERVAL in the future. 
LF_PRINT_DEBUG("Waiting for physical time to elapse or an event on the event queue."); instant_t wait_until_time_ns = lf_time_physical() + ADVANCE_MESSAGE_INTERVAL; @@ -2615,6 +2762,11 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int lf_print_error("lf_send_message: Unsupported message type (%d).", message_type); return -1; } + if (_fed.outbound_p2p_connection_is_transient[federate] && _fed.net_for_outbound_p2p_connections[federate] < 0) { + // The destination is a known outbound transient that is not connected: report it and skip the send. + lf_print_info("The destination transient federate %d is not connected. Abort sending!", federate); + return 0; + } size_t buffer_head = 0; // First byte is the message type. @@ -2832,9 +2984,9 @@ instant_t lf_wait_until_time(tag_t tag) { // Do not add the STA if the tag is the starting tag. if (tag.time != start_time || tag.microstep != 0u) { - // Apply the STA to the logical time, but only if at least one network input port is not known up to this tag. - // Subtract one microstep because it is sufficient to commit to a tag if the input ports are known - // up to one microstep earlier. + // Apply the STA to the logical time, but only if at least one network input port is not known + // up to this tag. Subtract one microstep because it is sufficient to commit to a tag if the + // input ports are known up to one microstep earlier. if (tag.microstep > 0) { tag.microstep--; } else { diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 439460388..81f1d9d89 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -8,11 +8,11 @@ * @author Edward A. Lee * @author Anirudh Rengarajsm * - * This file defines the core data structures and functions used in federated Lingua Franca programs. 
- * It includes the federate instance structure that tracks the state of a federate, including its - * connections to the RTI and other federates, message handling, and coordination mechanisms. - * The file also provides functions for managing these connections, sending and receiving messages, - * and handling various aspects of federated execution. + * This file defines the core data structures and functions used in federated Lingua Franca + * programs. It includes the federate instance structure that tracks the state of a federate, + * including its connections to the RTI and other federates, message handling, and coordination + * mechanisms. The file also provides functions for managing these connections, sending and + * receiving messages, and handling various aspects of federated execution. */ #ifndef FEDERATE_H @@ -20,11 +20,11 @@ #include -#include "tag.h" -#include "lf_types.h" #include "environment.h" +#include "lf_types.h" #include "low_level_platform.h" #include "net_abstraction.h" +#include "tag.h" #ifndef ADVANCE_MESSAGE_INTERVAL #define ADVANCE_MESSAGE_INTERVAL MSEC(10) @@ -57,6 +57,11 @@ typedef struct federate_instance_t { */ size_t number_of_inbound_p2p_connections; + /** + * Number of inbound peer-to-peer connections from transient federates. + */ + size_t number_of_inbound_p2p_transients; + /** * Array of thread IDs for threads that listen for incoming messages. * This is NULL if there are none and otherwise has size given by @@ -71,6 +76,18 @@ typedef struct federate_instance_t { */ size_t number_of_outbound_p2p_connections; + /** + * Number of outbound peer-to-peer connections to transient federates. + */ + size_t number_of_outbound_p2p_transients; + + /** + * An array indexed by federate ID indicating whether the federate with that ID + * is a transient federate to which this federate has an outbound peer-to-peer + * connection. Expected to be set at startup by the generated _lf_executable_preamble(). 
+ */ + bool outbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES]; + /** * An array that holds the network abstractions for inbound * connections from each federate. The index will be the federate @@ -86,6 +103,15 @@ typedef struct federate_instance_t { */ net_abstraction_t net_for_inbound_p2p_connections[NUMBER_OF_FEDERATES]; + /** + * An array indexed by federate ID indicating whether the corresponding + * inbound peer-to-peer federate is transient. Initialized to false. + * Set in lf_handle_p2p_connections_from_federates() when the handshake + * reveals the remote federate's type. Used by mark_inputs_known_absent() + * to avoid permanently stamping FOREVER_TAG on ports whose source may rejoin. + */ + bool inbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES]; + /** * An array that holds the network abstractions for outbound direct * connections to each remote federate. The index will be the federate @@ -101,6 +127,13 @@ typedef struct federate_instance_t { */ net_abstraction_t net_for_outbound_p2p_connections[NUMBER_OF_FEDERATES]; + /** + * An array indicating whether each federate is transient. + * The index is the federate ID. + * This is initialized at startup by the generated _lf_executable_preamble(). + */ + bool transients[NUMBER_OF_FEDERATES]; + /** * Thread ID for a thread that accepts network abstractions and then supervises * listening to those network abstractions for incoming P2P (physical) connections. @@ -255,7 +288,7 @@ extern lf_cond_t lf_port_status_changed; // Public functions (in alphabetical order) /** - * @brief Connect to the federate with the specified id. + * @brief Connect to the federate with the specified id, handling it according to whether it is transient. * @ingroup Federated * * The established connection will then be used in functions such as lf_send_tagged_message() @@ -268,8 +301,12 @@ extern lf_cond_t lf_port_status_changed; * refer to the network abstraction for communicating directly with the federate. 
* * @param remote_federate_id The ID of the remote federate. + * @param is_transient Whether the remote federate is transient. This affects + * connection behavior: a transient remote federate may not be immediately + * available, so the connection attempt is handled differently than for a + * persistent federate. */ -void lf_connect_to_federate(uint16_t remote_federate_id); +void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient); /** * @brief Connect to the RTI at the specified host and port. @@ -487,9 +524,10 @@ int lf_send_stop_request_to_rti(tag_t stop_tag); * @brief Send a tagged message to the specified port of the specified federate. * @ingroup Federated * - * The tag will be the current tag of the specified environment delayed by the specified additional_delay. - * If the delayed tag falls after the timeout time, then the message is not sent and -1 is returned. - * The caller can reuse or free the memory storing the message after this returns. + * The tag will be the current tag of the specified environment delayed by the specified + * additional_delay. If the delayed tag falls after the timeout time, then the message is not sent + * and -1 is returned. The caller can reuse or free the memory storing the message after this + * returns. * * If the message fails to send (e.g. the network abstraction connection is broken), then the * response depends on the message_type. For MSG_TYPE_TAGGED_MESSAGE, the message is @@ -556,7 +594,8 @@ void lf_spawn_staa_thread(void); void lf_stall_advance_level_federation(environment_t* env, size_t level); /** - * @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock. + * @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex + * lock. * @ingroup Federated * * @param level The level to which we would like to advance. 
diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 52199a147..17e20d967 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -transient-fed +transient-fed-dec diff --git a/network/api/net_common.h b/network/api/net_common.h index 4ca6fd4a5..c44835343 100644 --- a/network/api/net_common.h +++ b/network/api/net_common.h @@ -174,9 +174,9 @@ * request). When the RTI has gathered all the stop tags * from federates (that are still connected), it will decide on a common stop tag * which is the maximum of the seen stop tag and answer with a MSG_TYPE_STOP_GRANTED. The federate - * sending the MSG_TYPE_STOP_REQUEST and federates sending the MSG_TYPE_STOP_REQUEST_REPLY will freeze - * the advancement of tag until they receive the MSG_TYPE_STOP_GRANTED message, in which - * case they might continue their execution until the stop tag has been reached. + * sending the MSG_TYPE_STOP_REQUEST and federates sending the MSG_TYPE_STOP_REQUEST_REPLY will + * freeze the advancement of tag until they receive the MSG_TYPE_STOP_GRANTED message, in which case + * they might continue their execution until the stop tag has been reached. * */ @@ -193,7 +193,8 @@ #define FED_COM_BUFFER_SIZE 256u /** - * @brief Time that a federate waits before asking the RTI again for the port and IP address of a federate. + * @brief Time that a federate waits before asking the RTI again for the port and IP address of a + * federate. * @ingroup Network * * The federate repeatedly sends an MSG_TYPE_ADDRESS_QUERY message after the RTI responds that it @@ -576,6 +577,7 @@ * @ingroup Network * * The next two bytes are the other federate's ID. + * The following byte is 1 if the remote federate being queried is transient, 0 otherwise. 
*/ #define MSG_TYPE_ADDRESS_QUERY 13 @@ -776,6 +778,24 @@ #define MSG_TYPE_UPSTREAM_DISCONNECTED 28 #define MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) +/** + * A message that informs an upstream federate that a transient federate downstream of it + * has (re-)connected. The next 2 bytes are the federate ID of the downstream federate. + * Upon receiving this, the upstream federate should query the RTI for the downstream's + * address and establish (or re-establish) the outbound P2P connection. + */ +#define MSG_TYPE_OUTBOUND_CONNECTED 30 +#define MSG_TYPE_OUTBOUND_CONNECTED_LENGTH (1 + sizeof(uint16_t)) + +/** + * A message that informs an upstream federate that a transient federate downstream of it + * has disconnected. The next 2 bytes are the federate ID of the downstream federate. + * Upon receiving this, the upstream federate should close its outbound P2P connection + * to the downstream. + */ +#define MSG_TYPE_OUTBOUND_DISCONNECTED 31 +#define MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) + /** * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, * the federate will call lf_stop(), which will make it resign at its current_tag diff --git a/network/impl/src/lf_socket_support.c b/network/impl/src/lf_socket_support.c index 76aee2308..85eecd484 100644 --- a/network/impl/src/lf_socket_support.c +++ b/network/impl/src/lf_socket_support.c @@ -123,8 +123,12 @@ void read_from_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, un // Read failed. if (format != NULL) { va_start(args, format); - lf_print_error_system_failure(format, args); + // Use lf_vprint_error (va_list variant) rather than lf_print_error_system_failure + // (variadic variant). Passing a va_list to a '...' function is undefined behaviour + // and manifests as garbage argument values on macOS due to ABI differences. 
+ lf_vprint_error(format, args); va_end(args); + lf_print_error_and_exit("Error %d: %s", errno, strerror(errno)); } else { lf_print_error_system_failure("Failed to read from socket."); } diff --git a/network/impl/src/socket_common.c b/network/impl/src/socket_common.c index 53c78c68d..5487f8878 100644 --- a/network/impl/src/socket_common.c +++ b/network/impl/src/socket_common.c @@ -202,7 +202,10 @@ int accept_socket(int socket) { // Got a socket break; } else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK || errno != EINTR)) { - if (errno != ECONNABORTED) { + // ECONNABORTED: a connection was aborted before accept() could complete — not fatal. + // EINVAL: the socket was shut down (e.g., shutdown_socket() was called to unblock this + // accept() intentionally when the RTI is shutting down) — expected, not an error. + if (errno != ECONNABORTED && errno != EINVAL) { lf_print_warning("Failed to accept the socket. %s.", strerror(errno)); } break; @@ -294,7 +297,6 @@ int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer) { } return 0; } - ssize_t peek_from_socket(int socket, unsigned char* result) { ssize_t bytes_read = recv(socket, result, 1, MSG_DONTWAIT | MSG_PEEK); if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index f13dc7ad5..c9e70f789 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -53,6 +53,8 @@ typedef enum { send_ADR_QR, send_ADR_QR_REP, send_DNET, + send_UPSTREAM_CONNECTED, + send_UPSTREAM_DISCONNECTED, // Receiving messages receive_ACK, receive_FAILED, @@ -77,9 +79,16 @@ typedef enum { receive_ADR_QR, receive_ADR_QR_REP, receive_DNET, + receive_UPSTREAM_CONNECTED, + receive_UPSTREAM_DISCONNECTED, receive_UNIDENTIFIED, send_STOP, receive_STOP, + // New entries must be added here, at the end, to avoid shifting existing indices. 
+ send_OUTBOUND_CONNECTED, + receive_OUTBOUND_CONNECTED, + send_OUTBOUND_DISCONNECTED, + receive_OUTBOUND_DISCONNECTED, NUM_EVENT_TYPES } trace_event_t; @@ -123,6 +132,8 @@ static const char* trace_event_names[] = { "Sending ADR_QR", "Sending ADR_QR_REP", "Sending DNET", + "Sending UPSTREAM_CONNECTED", + "Sending UPSTREAM_DISCONNECTED", // Receiving messages "Receiving ACK", "Receiving FAILED", @@ -147,9 +158,16 @@ static const char* trace_event_names[] = { "Receiving ADR_QR", "Receiving ADR_QR_REP", "Receiving DNET", + "Receiving UPSTREAM_CONNECTED", + "Receiving UPSTREAM_DISCONNECTED", "Receiving UNIDENTIFIED", "Sending STOP", "Receiving STOP", + // New entries appended at the end to avoid shifting existing indices. + "Sending OUTBOUND_CONNECTED", + "Receiving OUTBOUND_CONNECTED", + "Sending OUTBOUND_DISCONNECTED", + "Receiving OUTBOUND_DISCONNECTED", }; static inline void _suppress_unused_variable_warning_for_static_variable() { (void)trace_event_names; } diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 7d1a08e64..3798950fa 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -30,6 +30,10 @@ .DNET { stroke: #7b2d8b; fill: #7b2d8b; } \ .TIMESTAMP { stroke: #888888; fill: #888888; } \ .FED_ID { stroke: #80DD99; fill: #80DD99; } \ + .UPSTREAM_CONNECTED { stroke: #f4a261; fill: #f4a261; } \ + .UPSTREAM_DISCONNECTED { stroke: #e76f51; fill: #e76f51; } \ + .OUTBOUND_CONNECTED { stroke: #2a9d8f; fill: #2a9d8f; } \ + .OUTBOUND_DISCONNECTED { stroke: #264653; fill: #264653; } \ .ACK { stroke: #52b788; fill: #52b788; } \ .FAILED { stroke: #c1121f; fill: #c1121f; } \ .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ @@ -66,6 +70,10 @@ "Sending STOP_REQ_REP": "STOP_REQ_REP", "Sending STOP_GRN": "STOP_GRN", "Sending FED_ID": "FED_ID", + "Sending UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", + "Sending UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", + "Sending OUTBOUND_CONNECTED": "OUTBOUND_CONNECTED", 
+ "Sending OUTBOUND_DISCONNECTED": "OUTBOUND_DISCONNECTED", "Sending PTAG": "PTAG", "Sending TAG": "TAG", "Sending REJECT": "REJECT", @@ -89,6 +97,10 @@ "Receiving STOP_REQ_REP": "STOP_REQ_REP", "Receiving STOP_GRN": "STOP_GRN", "Receiving FED_ID": "FED_ID", + "Receiving UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED", + "Receiving UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED", + "Receiving OUTBOUND_CONNECTED": "OUTBOUND_CONNECTED", + "Receiving OUTBOUND_DISCONNECTED": "OUTBOUND_DISCONNECTED", "Receiving PTAG": "PTAG", "Receiving TAG": "TAG", "Receiving REJECT": "REJECT", @@ -150,7 +162,8 @@ def format_actor_name(name): # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. # Set of non-tagged events (messages) -non_tagged_messages = {'FED_ID', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} +non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', 'OUTBOUND_CONNECTED', 'OUTBOUND_DISCONNECTED', + 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} ################################################################################ @@ -645,6 +658,25 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files, start_time, end (trace_df['logical_time'] == logical_time) & \ (trace_df['microstep'] == microstep) \ ] + elif (event == 'P2P_MSG'): + # P2P messages travel directly between federates without going through the + # RTI, so partner_id in the trace is typically -1 on both sides (the RTI is + # not involved and the tracepoint has no partner). We therefore cannot use + # partner_id for matching. Instead we match each 'out' to the first pending + # 'in' whose physical_time >= the sender's physical_time (causality guarantee: + # the receive cannot precede the send). 
+ physical_time = trace_df.at[index, 'physical_time'] + if (inout == 'out'): + matching_df = trace_df[\ + (trace_df['inout'] == 'in') & \ + (trace_df['arrow'] == 'pending') & \ + (trace_df['event'] == event) & \ + (trace_df['physical_time'] >= physical_time) \ + ] + else: + # 'in' rows are claimed by the corresponding 'out' pass above. + # If we reach an 'in' here it means no 'out' claimed it; render as dot. + matching_df = trace_df[0:0] else : matching_df = trace_df[\ (trace_df['inout'] != inout) & \