diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 58346348f..9cdd5b527 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -99,6 +99,8 @@ void usage(int argc, const char* argv[]) { lf_print(" The ID of the federation that this RTI will control.\n"); lf_print(" -n, --number_of_federates "); lf_print(" The number of federates in the federation that this RTI will control.\n"); + lf_print(" -nt, --number_of_transient_federates "); + lf_print(" The number of transient federates in the federation that this RTI will control.\n"); lf_print(" -p, --port "); lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, DEFAULT_PORT); @@ -219,6 +221,21 @@ int process_args(int argc, const char* argv[]) { } rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines lf_print_info("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); + } else if (strcmp(argv[i], "-nt") == 0 || strcmp(argv[i], "--number_of_transient_federates") == 0) { + if (argc < i + 2) { + lf_print_error("--number_of_transient_federates needs a valid positive argument."); + usage(argc, argv); + return 0; + } + i++; + long num_transient_federates = strtol(argv[i], NULL, 10); + if (num_transient_federates == LONG_MAX || num_transient_federates == LONG_MIN) { + lf_print_error("--number_of_transient_federates needs a valid positive or null integer argument."); + usage(argc, argv); + return 0; + } + rti.number_of_transient_federates = (int32_t)num_transient_federates; // FIXME: Loses numbers on 64-bit machines + lf_print_info("RTI: Number of transient federates: %d", rti.number_of_transient_federates); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { #ifdef COMM_TYPE_TCP if (argc < i + 2) { @@ -265,6 +282,16 @@ int process_args(int argc, const char* argv[]) { return 0; } } + if (rti.base.number_of_scheduling_nodes == 0) { + lf_print_error("--number_of_federates needs a valid positive integer argument."); + usage(argc, argv); + return 0; + } + if (rti.number_of_transient_federates > rti.base.number_of_scheduling_nodes) { + lf_print_error("--number_of_transient_federates cannot be higher than the number of federates."); + usage(argc, argv); + return 0; + } return 1; } int main(int argc, const char* argv[]) { @@ -304,8 +331,8 @@ int main(int argc, const char* argv[]) { lf_print_info("Tracing the RTI execution in %s file.", rti_trace_file_name); } - lf_print_log("Starting RTI for %d federates in federation ID %s.", rti.base.number_of_scheduling_nodes, - rti.federation_id); + lf_print_log("Starting RTI for a total of %d federates, with %d being transient, in federation ID %s", + rti.base.number_of_scheduling_nodes, rti.number_of_transient_federates, rti.federation_id); assert(rti.base.number_of_scheduling_nodes < UINT16_MAX); // Allocate memory for the federates diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index 15a5596c7..b1b4bbadd 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -42,6 +42,7 @@ void invalidate_min_delays() { node->flags = 0; // All flags cleared because they get set lazily. } free(rti_common->min_delays); + rti_common->min_delays = NULL; } } @@ -101,6 +102,8 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) { if (lf_tag_compare(rti_common->min_delays[i * n + e->id], FOREVER_TAG) != 0) { // Node i is upstream of e with min delay rti_common->min_delays[i * n + e->id] scheduling_node_t* upstream = rti_common->scheduling_nodes[i]; + if (upstream->state == NOT_CONNECTED) + continue; // If we haven't heard from the upstream node, then assume it can send an event at the start time. if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) { tag_t start_tag = {.time = start_time, .microstep = 0}; @@ -163,6 +166,9 @@ tag_t eimt_strict(scheduling_node_t* e) { tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { tag_advance_grant_t result = {.tag = NEVER_TAG, .is_provisional = false}; + // Check how many upstream federates are connected + int num_connected_upstream = 0; + // Find the earliest LTC of upstream scheduling_nodes (M). tag_t min_upstream_completed = FOREVER_TAG; @@ -172,6 +178,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { // Ignore this enclave/federate if it is not connected. if (upstream->state == NOT_CONNECTED) continue; + num_connected_upstream++; // Adjust by the "after" delay. // Note that "no delay" is encoded as NEVER, @@ -184,8 +191,15 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { } LF_PRINT_LOG("RTI: Minimum upstream LTC for federate/enclave %d is " PRINTF_TAG "(adjusted by after delay).", e->id, min_upstream_completed.time - start_time, min_upstream_completed.microstep); - if (lf_tag_compare(min_upstream_completed, e->last_granted) > 0 && - lf_tag_compare(min_upstream_completed, e->next_event) >= 0 // The enclave has to advance its tag + + if (num_connected_upstream == 0) { + // When none of the upstream federates is connected (case of transients), + if (lf_tag_compare(e->next_event, FOREVER_TAG) != 0) { + result.tag = e->next_event; + return result; + } + } else if (lf_tag_compare(min_upstream_completed, e->last_granted) > 0 && + lf_tag_compare(min_upstream_completed, e->next_event) >= 0 // The enclave has to advance its tag ) { result.tag = min_upstream_completed; return result; diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 0f06b6c50..1277e4051 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -27,6 +27,7 @@ #include "rti_remote.h" #include "net_util.h" #include +#include "clock.h" // For lf_clock_cond_timedwait() // Global variables defined in tag.c: extern instant_t start_time; @@ -36,6 +37,15 @@ extern instant_t start_time; */ static rti_remote_t* rti_remote; +// Referance to the federate instance to support hot swap +static federate_info_t* hot_swap_federate; + +// Indicates if a hot swap process is in progress +static bool hot_swap_in_progress = false; + +// Indicates that the old federate has stopped. +static bool hot_swap_old_resigned = false; + bool _lf_federate_reports_error = false; // A convenient macro for getting the `federate_info_t *` at index `_idx` @@ -43,24 +53,248 @@ bool _lf_federate_reports_error = false; #define GET_FED_INFO(_idx) (federate_info_t*)rti_remote->base.scheduling_nodes[_idx] lf_mutex_t rti_mutex; -lf_cond_t received_start_times; -lf_cond_t sent_start_time; +static lf_cond_t received_start_times; +static lf_cond_t sent_start_time; +static lf_cond_t updated_delayed_grants; extern int lf_critical_section_enter(environment_t* env) { return lf_mutex_lock(&rti_mutex); } extern int lf_critical_section_exit(environment_t* env) { return lf_mutex_unlock(&rti_mutex); } -void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { - if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 || - lf_tag_compare(tag, e->last_provisionally_granted) < 0) { +// Utility functions to simplify the call of pqueue_tag routines. +// These functions mainly do the casting. +// FIXME: Should we remove the queue parameter from the functions? + +/** + * @brief Creates a priority queue of delayed grants that is sorted by tags. + * + * @param nbr_delayed_grants The size. + * @return The dynamically allocated queue or NULL. + */ +static pqueue_delayed_grants_t* pqueue_delayed_grants_init(uint16_t nbr_delayed_grants) { + return (pqueue_delayed_grants_t*)pqueue_tag_init((size_t)nbr_delayed_grants); +} + +/** + * @brief Return the size of the queue. + * + * @param q The queue. + * @return The size. + */ +static size_t pqueue_delayed_grants_size(pqueue_delayed_grants_t* q) { return pqueue_tag_size((pqueue_tag_t*)q); } + +/** + * @brief Insert an\ delayed grant element into the queue. + * + * @param q The queue. + * @param e The delayed grant element to insert. + * @return 0 on success + */ +static int pqueue_delayed_grants_insert(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* d) { + return pqueue_tag_insert((pqueue_tag_t*)q, (void*)d); +} + +/** + * @brief Pop the least-tag element from the queue. + * + * @param q The queue. + * @return NULL on error, otherwise the entry + */ +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_pop(pqueue_delayed_grants_t* q) { + return (pqueue_delayed_grant_element_t*)pqueue_tag_pop((pqueue_tag_t*)q); +} + +/** + * @brief Return highest-ranking element without removing it. + * + * @param q The queue. + * @return NULL on if the queue is empty, otherwise the delayed grant element. + */ +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_peek(pqueue_delayed_grants_t* q) { + return (pqueue_delayed_grant_element_t*)pqueue_tag_peek((pqueue_tag_t*)q); +} + +/** + * @brief Free all memory used by the queue including elements that are marked dynamic. + * + * @param q The queue. + */ +static void pqueue_delayed_grants_free(pqueue_delayed_grants_t* q) { pqueue_tag_free((pqueue_tag_t*)q); } + +/** + * @brief Remove an item from the delayed grants queue. + * + * @param q The queue. + * @param e The entry to remove. + */ +static void pqueue_delayed_grants_remove(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* e) { + pqueue_tag_remove((pqueue_tag_t*)q, (void*)e); +} + +/** + * @brief Return the first item with the specified tag or NULL if there is none. + * @param q The queue. + * @param t The tag. + * @return An entry with the specified tag or NULL if there isn't one. + */ +pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_with_tag(pqueue_delayed_grants_t* q, tag_t t) { + return (pqueue_delayed_grant_element_t*)pqueue_tag_find_with_tag((pqueue_tag_t*)q, t); +} + +// Function that does not in pqueue_tag.c +/** + * @brief Return the first item with the specified federate id or NULL if there is none. + * @param q The queue. + * @param fed_id The federate id. + * @return An entry with the specified federate if or NULL if there isn't one. + */ +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_by_fed_id(pqueue_delayed_grants_t* q, + uint16_t fed_id) { + pqueue_delayed_grant_element_t* dge; + pqueue_t* _q = (pqueue_t*)q; + if (!q || q->size == 1) + return NULL; + for (int i = 1; i < q->size; i++) { + dge = (pqueue_delayed_grant_element_t*)q->d[i]; + if (dge) { + if (dge->fed_id == fed_id) { + return dge; + } + } + } + return NULL; +} + +/** + * @brief Insert the delayed grant into the delayed_grants queue and notify. + * + * This function assumes the caller holds the rti_mutex. + * @param fed The federate. + * @param tag The tag to grant. + * @param is_provisional State whther the grant is provisional. + */ +static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provisional) { + // Check wether there is already a pending grant. + pqueue_delayed_grant_element_t* dge = + pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, fed->enclave.id); + if (dge == NULL) { + pqueue_delayed_grant_element_t* dge = + (pqueue_delayed_grant_element_t*)malloc(sizeof(pqueue_delayed_grant_element_t)); + dge->base.is_dynamic = 1; + dge->base.tag = tag; + dge->fed_id = fed->enclave.id; + dge->is_provisional = is_provisional; + pqueue_delayed_grants_insert(rti_remote->delayed_grants, dge); + LF_PRINT_LOG("RTI: Inserting a delayed grant of " PRINTF_TAG " for federate %d.", dge->base.tag.time - start_time, + dge->base.tag.microstep, dge->fed_id); + lf_cond_signal(&updated_delayed_grants); + } else { + // Note that there should never be more than one pending grant for a federate. + int compare = lf_tag_compare(dge->base.tag, tag); + if (compare > 0) { + // Update the pre-existing grant. + dge->base.tag = tag; + dge->is_provisional = is_provisional; + LF_PRINT_LOG("RTI: Updating a delayed grant of " PRINTF_TAG " for federate %d.", tag.time - start_time, + tag.microstep, dge->fed_id); + lf_cond_signal(&updated_delayed_grants); + } else if (compare == 0) { + if (dge->is_provisional != is_provisional) { + // Update the grant to keep the most recent is_provisional status. + dge->is_provisional = is_provisional; + LF_PRINT_LOG("RTI: Changing status of a delayed grant of " PRINTF_TAG " for federate %d to provisional: %d.", + dge->base.tag.time - start_time, dge->base.tag.microstep, dge->fed_id, is_provisional); + } + } + } +} + +/** + * Find the number of non connected upstream transients + * @param fed The federate + * @return the number of non connected upstream transients + */ +static int get_num_absent_upstream_transients(federate_info_t* fed) { + int num_absent_upstream_transients = 0; + for (int j = 0; j < fed->enclave.num_immediate_upstreams; j++) { + federate_info_t* upstream = GET_FED_INFO(fed->enclave.immediate_upstreams[j]); + // Ignore this enclave if it no longer connected. + if ((upstream->enclave.state == NOT_CONNECTED) && (upstream->is_transient)) { + num_absent_upstream_transients++; + } + } + return num_absent_upstream_transients; +} + +/** + * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the RTI, + * telling it that the specified `upstream` federate is also now connected. + * + * This function assumes that the mutex lock is already held. + * @param destination The destination federate. + * @param disconnected The connected federate. + */ +static void send_upstream_connected_locked(federate_info_t* destination, federate_info_t* connected) { + if (destination->enclave.state == NOT_CONNECTED) { + LF_PRINT_LOG("RTI did not send upstream connected message to federate %d, because it is not connected.", + destination->enclave.id); return; } - // Need to make sure that the destination federate's thread has already - // sent the starting MSG_TYPE_TIMESTAMP message. - while (e->state == PENDING) { - // Need to wait here. - lf_cond_wait(&sent_start_time); + unsigned char buffer[MSG_TYPE_UPSTREAM_CONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_UPSTREAM_CONNECTED; + encode_uint16(connected->enclave.id, &buffer[1]); + if (write_to_net_close_on_error(destination->net, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", destination->enclave.id); + } +} + +/** + * @brief Send MSG_TYPE_UPSTREAM_DISCONNECTED to the specified federate. + * + * This function assumes that the mutex lock is already held. + * @param destination The destination federate. + * @param disconnected The disconnected federate. + */ +static void send_upstream_disconnected_locked(federate_info_t* destination, federate_info_t* disconnected) { + unsigned char buffer[MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_UPSTREAM_DISCONNECTED; + encode_uint16(disconnected->enclave.id, &buffer[1]); + if (write_to_net_close_on_error(destination->net, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send upstream disconnected message to federate %d.", disconnected->enclave.id); } +} + +/** + * @brief Mark a federate as disconnected and, if this is a transient, inform downstream federates. + * @param fed The disconnected federate. + */ +static void notify_federate_disconnected(federate_info_t* fed) { + fed->enclave.state = NOT_CONNECTED; + // Notify downstream federates. Need to hold the mutex lock to do this. + if (fed->is_transient) { + LF_MUTEX_LOCK(&rti_mutex); + for (int j = 0; j < fed->enclave.num_immediate_downstreams; j++) { + federate_info_t* downstream = GET_FED_INFO(fed->enclave.immediate_downstreams[j]); + // Ignore this enclave if it no longer connected. + if (downstream->enclave.state != NOT_CONNECTED) { + // Notify the downstream enclave. + send_upstream_disconnected_locked(downstream, fed); + } + } + LF_MUTEX_UNLOCK(&rti_mutex); + } +} + +/** + * Notify a tag advance grant (TAG) message to the specified federate immediately. + * + * This function will keep a record of this TAG in the enclave's last_granted + * field. + * + * @param e The enclave. + * @param tag The tag to grant. + */ +static void notify_tag_advance_grant_immediate(scheduling_node_t* e, tag_t tag) { size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); unsigned char buffer[message_length]; buffer[0] = MSG_TYPE_TAG_ADVANCE_GRANT; @@ -75,7 +309,8 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // to fail. Consider a failure here a soft failure and update the federate's status. if (write_to_net(((federate_info_t*)e)->net, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - e->state = NOT_CONNECTED; + // Mark a federate as disconnected and inform if necessary + notify_federate_disconnected(GET_FED_INFO(e->id)); } else { e->last_granted = tag; LF_PRINT_LOG("RTI sent to federate %d the tag advance grant (TAG) " PRINTF_TAG ".", e->id, tag.time - start_time, @@ -83,7 +318,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { } } -void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { +void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 || lf_tag_compare(tag, e->last_provisionally_granted) <= 0) { return; @@ -94,6 +329,32 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // Need to wait here. lf_cond_wait(&sent_start_time); } + + // Check if sending the tag advance grant needs to be delayed or not + // Delay is needed when a federate has, at least one, absent upstream transient + federate_info_t* fed = GET_FED_INFO(e->id); + if (!fed->has_upstream_transient_federates) { + notify_tag_advance_grant_immediate(e, tag); + } else { + if (get_num_absent_upstream_transients(fed) > 0) { + notify_grant_delayed(fed, tag, false); + } else { + notify_tag_advance_grant_immediate(e, tag); + } + } +} + +/** + * Notify a provisional tag advance grant (PTAG) message to the specified federate + * immediately. + * + * This function will keep a record of this TAG in the enclave's last_provisionally_granted + * field. + * + * @param e The scheduling node. + * @param tag The tag to grant. + */ +void notify_provisional_tag_advance_grant_immediate(scheduling_node_t* e, tag_t tag) { size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); unsigned char buffer[message_length]; buffer[0] = MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT; @@ -108,7 +369,8 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // to fail. Consider a failure here a soft failure and update the federate's status. if (write_to_net(((federate_info_t*)e)->net, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - e->state = NOT_CONNECTED; + // Mark a federate as disconnected and inform if necessary + notify_federate_disconnected(GET_FED_INFO(e->id)); } else { e->last_provisionally_granted = tag; LF_PRINT_LOG("RTI sent to federate %d the Provisional Tag Advance Grant (PTAG) " PRINTF_TAG ".", e->id, @@ -144,6 +406,32 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { } } +void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { + if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 || + lf_tag_compare(tag, e->last_provisionally_granted) <= 0) { + return; + } + // Need to make sure that the destination federate's thread has already + // sent the starting MSG_TYPE_TIMESTAMP message. + while (e->state == PENDING) { + // Need to wait here. + lf_cond_wait(&sent_start_time); + } + + // Check if sending the tag advance grant needs to be delayed or not + // Delay is needed when a federate has, at least one, absent upstream transient + federate_info_t* fed = GET_FED_INFO(e->id); + if (!fed->has_upstream_transient_federates) { + notify_provisional_tag_advance_grant_immediate(e, tag); + } else { + if (get_num_absent_upstream_transients(fed) > 0) { + notify_grant_delayed(fed, tag, true); + } else { + notify_provisional_tag_advance_grant_immediate(e, tag); + } + } +} + void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) { if (e->state == NOT_CONNECTED) { return; @@ -287,20 +575,22 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff // issue a TAG before this message has been forwarded. LF_MUTEX_LOCK(&rti_mutex); - // If the destination federate is no longer connected, issue a warning, - // remove the message from the network abstraction and return. + // If the destination federate is no longer connected, or it is a transient that has not started executing yet + // (the delayed intended tag is less than the effective start tag of the destination), issue a warning, remove the + // message from the network abstraction, and return. federate_info_t* fed = GET_FED_INFO(federate_id); - if (fed->enclave.state == NOT_CONNECTED) { - lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id); - LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", " - "completed " PRINTF_TAG ", " - "last_granted " PRINTF_TAG ", " - "last_provisionally_granted " PRINTF_TAG ".", - fed->enclave.next_event.time - start_time, fed->enclave.next_event.microstep, - fed->enclave.completed.time - start_time, fed->enclave.completed.microstep, - fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep, - fed->enclave.last_provisionally_granted.time - start_time, - fed->enclave.last_provisionally_granted.microstep); + interval_t delay = NEVER; + for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) { + if (fed->enclave.immediate_upstreams[i] == sending_federate->enclave.id) { + delay = fed->enclave.immediate_upstream_delays[i]; + break; + } + } + if (fed->enclave.state == NOT_CONNECTED || + lf_tag_compare(lf_delay_tag(intended_tag, delay), fed->effective_start_tag) < 0) { + lf_print_warning("RTI: Destination federate %d is not connected at logical time (" PRINTF_TAG + "). Dropping message.", + federate_id, intended_tag.time - start_time, intended_tag.microstep); // If the message was larger than the buffer, we must empty out the remainder also. size_t total_bytes_read = bytes_read; while (total_bytes_read < total_bytes_to_read) { @@ -462,18 +752,21 @@ static void broadcast_stop_time_to_federates_locked() { } /** - * Mark a federate requesting stop. If the number of federates handling stop reaches the - * NUM_OF_FEDERATES, broadcast MSG_TYPE_STOP_GRANTED to every federate. + * Mark a federate requesting stop. If the number of federates handling stop reaches + * the number of persistent federates, broadcast MSG_TYPE_STOP_GRANTED to every federate. * This function assumes the _RTI.mutex is already locked. * @param fed The federate that has requested a stop. * @return 1 if stop time has been sent to all federates and 0 otherwise. */ static int mark_federate_requesting_stop(federate_info_t* fed) { if (!fed->requested_stop) { - rti_remote->base.num_scheduling_nodes_handling_stop++; + // Increment the number of federates handling stop only if it is persistent + if (!fed->is_transient) + rti_remote->base.num_scheduling_nodes_handling_stop++; fed->requested_stop = true; } - if (rti_remote->base.num_scheduling_nodes_handling_stop == rti_remote->base.number_of_scheduling_nodes) { + if (rti_remote->base.num_scheduling_nodes_handling_stop == + (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { // We now have information about the stop time of all // federates. broadcast_stop_time_to_federates_locked(); @@ -696,6 +989,69 @@ void handle_address_ad(uint16_t federate_id) { } } +/** + * @brief Send the global federation start time and the federate-specific starting tag to the specified federate. + * + * For persistent federates and transient federates that happen to join during federation startup, the + * `federation_start_time` will match the time in the `federate_start_tag`, and the microstep will be 0. + * For a transient federate that joins later, the time in the `federate_start_tag` will be greater than the + * federation_start_time`. + * + * + * Before sending the start time and tag, this function notifies my_fed of all upstream transient federates that are + * connected. After sending the start time and tag, and if my_fed is transient, notify federates downstream of its + * connection, ensuring proper handling of zero-delay cycles. + * + * This function assumes that the mutex lock is already held. + * + * @param my_fed the federate to send the start time to. + * @param federation_start_time the federation start_time + * @param federate_start_tag the federate effective start tag + */ +static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { + // Notify my_fed of any upstream transient federates that are connected. + // This has to occur before sending the start tag so that my_fed does not begin executing thinking that these + // upstream federates are not connected. + for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) { + federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]); + if (fed->is_transient && fed->enclave.state == GRANTED) { + send_upstream_connected_locked(my_fed, fed); + } + } + + // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START + // message. + // If it is a persistent federate, only the start_time is sent. If, however, it is a transient + // federate, the effective_start_tag is also sent. + size_t buffer_size = (my_fed->is_transient) ? MSG_TYPE_TIMESTAMP_TAG_LENGTH : MSG_TYPE_TIMESTAMP_LENGTH; + unsigned char start_time_buffer[buffer_size]; + start_time_buffer[0] = MSG_TYPE_TIMESTAMP; + encode_int64(swap_bytes_if_big_endian_int64(federation_start_time), &start_time_buffer[1]); + if (my_fed->is_transient) { + encode_tag(&(start_time_buffer[1 + sizeof(instant_t)]), federate_start_tag); + } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_TIMESTAMP, my_fed->enclave.id, &federate_start_tag); + } + if (write_to_net(my_fed->net, buffer_size, start_time_buffer)) { + lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); + } else { + // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP_START + // message has been sent. That MSG_TYPE_TIMESTAMP_START message grants time advance to + // the federate to the federate_start_tag.time. + my_fed->enclave.state = GRANTED; + lf_cond_broadcast(&sent_start_time); + LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); + + // If this is a transient federate, notify its downstream federates that it is now connected. + if (my_fed->is_transient) { + for (int i = 0; i < my_fed->enclave.num_immediate_downstreams; i++) { + send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.immediate_downstreams[i]), my_fed); + } + } + } +} + void handle_timestamp(federate_info_t* my_fed) { unsigned char buffer[sizeof(int64_t)]; // Read bytes from the network abstraction. We need 8 bytes. @@ -710,49 +1066,150 @@ void handle_timestamp(federate_info_t* my_fed) { LF_PRINT_DEBUG("RTI received timestamp message with time: " PRINTF_TIME ".", timestamp); LF_MUTEX_LOCK(&rti_mutex); - rti_remote->num_feds_proposed_start++; - if (timestamp > rti_remote->max_start_time) { - rti_remote->max_start_time = timestamp; - } - if (rti_remote->num_feds_proposed_start == rti_remote->base.number_of_scheduling_nodes) { - // All federates have proposed a start time. - lf_cond_broadcast(&received_start_times); + + // Processing the TIMESTAMP depends on whether it is the startup phase. + if (rti_remote->phase == startup_phase) { + // Not all persistent federates have proposed a start time. + if (timestamp > rti_remote->max_start_time) { + rti_remote->max_start_time = timestamp; + } + // Note that if a transient federate's thread gets here during the startup phase, + // then it will be assigned the same global tag as its effective start tag and its + // timestamp will affect that start tag. + if (!my_fed->is_transient) { + rti_remote->num_feds_proposed_start++; + } + if (rti_remote->num_feds_proposed_start == + (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { + // This federate is the last persistent federate to proposed a start time. + lf_cond_broadcast(&received_start_times); + rti_remote->phase = execution_phase; + } else { + // Wait until all persistent federates have proposed a start time. + while (rti_remote->num_feds_proposed_start < + (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { + lf_cond_wait(&received_start_times); + } + } + // Add an offset to the maximum tag to get everyone starting together. + start_time = rti_remote->max_start_time + DELAY_START; + // Set the start_time in the RTI trace + if (rti_remote->base.tracing_enabled) { + lf_tracing_set_start_time(start_time); + } + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; + + // Notify the federate of its start tag. + // This has to be done while still holding the mutex. + send_start_tag_locked(my_fed, start_time, my_fed->effective_start_tag); + + LF_MUTEX_UNLOCK(&rti_mutex); + } else if (rti_remote->phase == shutdown_phase || !my_fed->is_transient) { + LF_MUTEX_UNLOCK(&rti_mutex); + + // Send reject message if the federation is in shutdown phase or if + // it is in the execution phase but the federate is persistent. + send_reject(my_fed->net, JOINING_TOO_LATE); + return; } else { - // Some federates have not yet proposed a start time. - // wait for a notification. - while (rti_remote->num_feds_proposed_start < rti_remote->base.number_of_scheduling_nodes) { - // FIXME: Should have a timeout here? - lf_cond_wait(&received_start_times); + // The federate is transient and we are in the execution phase. + // At this point, we already hold the mutex. + + //// Algorithm for computing the effective_start_time of a joining transient + // The effective_start_time will be the max among all the following tags: + // 1. At tag: (joining time, 0 microstep) + // 2. (start_time, 0 microstep) + // 3. The latest completed logical tag + 1 microstep + // 4. The latest granted (P)TAG + 1 microstep, of every downstream federate + // 5. The maximun tag of messages from the upstream federates + 1 microstep + + // Condition 1. + my_fed->effective_start_tag = (tag_t){.time = timestamp, .microstep = 0u}; + + // Condition 2. + if (timestamp < start_time) { + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; } - } - LF_MUTEX_UNLOCK(&rti_mutex); + // Condition 3. + if (lf_tag_compare(my_fed->enclave.completed, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = my_fed->enclave.completed; + my_fed->effective_start_tag.microstep++; + } - // Send back to the federate the maximum time plus an offset on a TIMESTAMP - // message. - unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_LENGTH]; - start_time_buffer[0] = MSG_TYPE_TIMESTAMP; - // Add an offset to this start time to get everyone starting together. - start_time = rti_remote->max_start_time + DELAY_START; - lf_tracing_set_start_time(start_time); - encode_int64(swap_bytes_if_big_endian_int64(start_time), &start_time_buffer[1]); + // Condition 4. Iterate over the downstream federates + for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) { + federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]); - if (rti_remote->base.tracing_enabled) { - tag_t tag = {.time = start_time, .microstep = 0}; - tracepoint_rti_to_federate(send_TIMESTAMP, my_fed->enclave.id, &tag); - } - if (write_to_net(my_fed->net, MSG_TYPE_TIMESTAMP_LENGTH, start_time_buffer)) { - lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); - } + // Get the max over the TAG of the downstreams + if (lf_tag_compare(downstream->enclave.last_granted, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = downstream->enclave.last_granted; + my_fed->effective_start_tag.microstep++; + } - LF_MUTEX_LOCK(&rti_mutex); - // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP - // message has been sent. That MSG_TYPE_TIMESTAMP message grants time advance to - // the federate to the start time. - my_fed->enclave.state = GRANTED; - lf_cond_broadcast(&sent_start_time); - LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); - LF_MUTEX_UNLOCK(&rti_mutex); + // Get the max over the PTAG of the downstreams + if (lf_tag_compare(downstream->enclave.last_provisionally_granted, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = downstream->enclave.last_provisionally_granted; + my_fed->effective_start_tag.microstep++; + } + } + + // Condition 5. + // This one is a bit subtle. Any messages from upstream federates that the RTI has + // not yet seen will be sent to this joining federate after the effective_start_tag + // because the effective_start_tag is sent while still holding the mutex. + + // Iterate over the messages from the upstream federates + for (int j = 0; j < my_fed->enclave.num_immediate_upstreams; j++) { + federate_info_t* upstream = GET_FED_INFO(my_fed->enclave.immediate_upstreams[j]); + + size_t queue_size = pqueue_tag_size(upstream->in_transit_message_tags); + if (queue_size != 0) { + tag_t max_tag = pqueue_tag_max_tag(upstream->in_transit_message_tags); + + if (lf_tag_compare(max_tag, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = max_tag; + my_fed->effective_start_tag.microstep++; + } + } + } + + // For every downstream that has a pending grant that is higher than the + // effective_start_time of the federate, cancel it. + // FIXME: Should this be higher-than or equal to? + // FIXME: Also, won't the grant simply be lost? + // If the joining federate doesn't send anything, the downstream federate won't issue another NET. + for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) { + federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]); + + // Ignore this federate if it has resigned. + if (downstream->enclave.state == NOT_CONNECTED) { + continue; + } + + // Check the pending grants, if any, and keep it only if it is + // sooner than the effective start tag. + pqueue_delayed_grant_element_t* dge = + pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, downstream->enclave.id); + if (dge != NULL && lf_tag_compare(dge->base.tag, my_fed->effective_start_tag) > 0) { + pqueue_delayed_grants_remove(rti_remote->delayed_grants, dge); + } + } + + // Once the effective start time set, sent it to the joining transient, + // together with the start time of the federation. + + // Have to send the start tag while still holding the mutex to ensure that no message + // from an upstream federate is forwarded before the start tag. + send_start_tag_locked(my_fed, start_time, my_fed->effective_start_tag); + + // Whenver a transient joins, invalidate all federates, so that all min_delays_upstream + // get re-computed. + // FIXME: Maybe optimize it to only invalidate those affected by the transient + invalidate_min_delays(); + + LF_MUTEX_UNLOCK(&rti_mutex); + } } void send_physical_clock(unsigned char message_type, federate_info_t* fed, socket_type_t socket_type) { @@ -904,18 +1361,19 @@ void* clock_synchronization_thread(void* noargs) { */ static void handle_federate_failed(federate_info_t* my_fed) { // Nothing more to do. Close the network abstraction and exit. - LF_MUTEX_LOCK(&rti_mutex); - if (rti_remote->base.tracing_enabled) { tracepoint_rti_from_federate(receive_FAILED, my_fed->enclave.id, NULL); } + // First, mark a federate as disconnected and inform if necessary + notify_federate_disconnected(my_fed); + + LF_MUTEX_LOCK(&rti_mutex); + // Set the flag telling the RTI to exit with an error code when it exits. _lf_federate_reports_error = true; lf_print_error("RTI: Federate %d reports an error and has exited.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; - // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; @@ -946,19 +1404,21 @@ static void handle_federate_failed(federate_info_t* my_fed) { */ static void handle_federate_resign(federate_info_t* my_fed) { // Nothing more to do. Close the network abstraction and exit. - LF_MUTEX_LOCK(&rti_mutex); - if (rti_remote->base.tracing_enabled) { tracepoint_rti_from_federate(receive_RESIGN, my_fed->enclave.id, NULL); } + // First, mark a federate as disconnected and inform if necessary + notify_federate_disconnected(my_fed); lf_print_info("RTI: Federate %d has resigned.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; + LF_MUTEX_LOCK(&rti_mutex); // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; + // Use shutdown_net to gracefully close the connection. + // This signals the other side that no further writes are forthcoming and waits for EOF. shutdown_net(my_fed->net, true); my_fed->net = NULL; @@ -1012,7 +1472,7 @@ void* federate_info_thread_TCP(void* fed) { break; case MSG_TYPE_RESIGN: handle_federate_resign(my_fed); - return NULL; + break; case MSG_TYPE_NEXT_EVENT_TAG: handle_next_event_tag(my_fed); break; @@ -1042,14 +1502,37 @@ void* federate_info_thread_TCP(void* fed) { } } } + + // Nothing more to do. Close the net and exit. + // Prevent multiple threads from closing the same net at the same time. + LF_MUTEX_LOCK(&rti_mutex); + shutdown_net(my_fed->net, false); + my_fed->net = NULL; + // Manual clean, in case of a transient federate + if (my_fed->is_transient) { + // FIXME: Aren't there transit messages anymore??? + // free_in_transit_message_q(my_fed->in_transit_message_tags); + lf_print_info("RTI: Transient Federate %d thread exited.", my_fed->enclave.id); + + // Update the number of connected transient federates + rti_remote->number_of_connected_transient_federates--; + + // Reset the status of the leaving federate + reset_transient_federate(my_fed); + } + // Signal the hot swap mechanism, if needed + if (hot_swap_in_progress && hot_swap_federate->enclave.id == my_fed->enclave.id) { + hot_swap_old_resigned = true; + } + LF_MUTEX_UNLOCK(&rti_mutex); return NULL; } -void send_reject(net_abstraction_t net_abs, unsigned char error_code) { +void send_reject(net_abstraction_t net_abs, rejection_code_t error_code) { LF_PRINT_DEBUG("RTI sending MSG_TYPE_REJECT."); unsigned char response[2]; response[0] = MSG_TYPE_REJECT; - response[1] = error_code; + response[1] = (unsigned char)error_code; LF_MUTEX_LOCK(&rti_mutex); // NOTE: Ignore errors on this response. if (write_to_net(net_abs, 2, response)) { @@ -1070,8 +1553,8 @@ void send_reject(net_abstraction_t net_abs, unsigned char error_code) { * @return The federate ID for success or -1 for failure. */ static int32_t receive_and_check_fed_id_message(net_abstraction_t fed_net) { - // Buffer for message ID, federate ID, and federation ID length. - size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID, length of fedration ID. + // Buffer for message ID, federate ID, type (persistent or transient), and federation ID length. + size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID and length of federation ID. unsigned char buffer[length]; // Read bytes from the network abstraction. We need 4 bytes. @@ -1081,9 +1564,10 @@ static int32_t receive_and_check_fed_id_message(net_abstraction_t fed_net) { } uint16_t fed_id = rti_remote->base.number_of_scheduling_nodes; // Initialize to an invalid value. + bool is_transient = false; // First byte received is the message type. - if (buffer[0] != MSG_TYPE_FED_IDS) { + if (buffer[0] != MSG_TYPE_FED_IDS && buffer[0] != MSG_TYPE_TRANSIENT_FED_IDS) { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); } @@ -1107,10 +1591,21 @@ static int32_t receive_and_check_fed_id_message(net_abstraction_t fed_net) { } else { // Received federate ID. fed_id = extract_uint16(buffer + 1); - LF_PRINT_DEBUG("RTI received federate ID: %d.", fed_id); - - // Read the federation ID. First read the length, which is one byte. + // Read the federation ID length, which is one byte. size_t federation_id_length = (size_t)buffer[sizeof(uint16_t) + 1]; + if (buffer[0] == MSG_TYPE_TRANSIENT_FED_IDS) { + unsigned char buf; + read_from_net_close_on_error(fed_net, 1, &buf); + is_transient = (buf == 1) ? true : false; + } + + if (is_transient) { + LF_PRINT_LOG("RTI received federate ID: %d, which is transient.", fed_id); + } else { + LF_PRINT_LOG("RTI received federate ID: %d, which is persistent.", fed_id); + } + + // Read the federation ID. char federation_id_received[federation_id_length + 1]; // One extra for null terminator. // Next read the actual federation ID. if (read_from_net_close_on_error(fed_net, federation_id_length, (unsigned char*)federation_id_received)) { @@ -1146,18 +1641,58 @@ static int32_t receive_and_check_fed_id_message(net_abstraction_t fed_net) { send_reject(fed_net, FEDERATE_ID_OUT_OF_RANGE); return -1; } else { + // Find out if it is a new connection or a hot swap. + // Reject if: + // - duplicate of a connected persistent federate + // - or hot_swap is already in progress (Only 1 hot swap at a time!), for that + // particular federate + // - or it is a hot swap, but it is not the execution phase yet if ((rti_remote->base.scheduling_nodes[fed_id])->state != NOT_CONNECTED) { - lf_print_error("RTI received duplicate federate ID: %d.", fed_id); - if (rti_remote->base.tracing_enabled) { - tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); + if (!is_transient) { + lf_print_error("RTI received duplicate federate ID: %d.", fed_id); + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); + } + send_reject(fed_net, FEDERATE_ID_IN_USE); + return -1; + } else if (hot_swap_in_progress || rti_remote->phase != execution_phase) { + lf_print_warning("RTI rejects the connection of transient federate %d, \ + because a hot swap is already in progress for federate %d. \n\ + Only one hot swap operation is allowed at a time.", + fed_id, hot_swap_federate->enclave.id); + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); + } + send_reject(fed_net, FEDERATE_ID_IN_USE); + return -1; } - send_reject(fed_net, FEDERATE_ID_IN_USE); - return -1; } } } } - federate_info_t* fed = GET_FED_INFO(fed_id); + + federate_info_t* fed_twin = GET_FED_INFO(fed_id); + federate_info_t* fed; + // If the federate is already connected (making the request a duplicate), and that + // the federate is transient, and it is the execution phase, then mark that a hot + // swap is in progreass and initialize the hot_swap_federate. + // Otherwise, proceed with a normal transinet connection + if (fed_twin->enclave.state != NOT_CONNECTED && is_transient && fed_twin->is_transient && + rti_remote->phase == execution_phase && !hot_swap_in_progress) { + // Allocate memory for the new federate and initilize it + hot_swap_federate = (federate_info_t*)malloc(sizeof(federate_info_t)); + initialize_federate(hot_swap_federate, fed_id); + + // Set that hot swap is in progress + hot_swap_in_progress = true; + // free(fed); // Free the old memory to prevent memory leak + fed = hot_swap_federate; + lf_print_info("RTI: Hot Swap starting for federate %d.", fed_id); + } else { + fed = fed_twin; + fed->is_transient = is_transient; + } + // The MSG_TYPE_FED_IDS message has the right federation ID. fed->net = fed_net; @@ -1189,6 +1724,11 @@ static int32_t receive_and_check_fed_id_message(net_abstraction_t fed_net) { /** * Listen for a MSG_TYPE_NEIGHBOR_STRUCTURE message, and upon receiving it, fill * out the relevant information in the federate's struct. + * + * In case of a hot swap, check that no changes were made to the connections, compared + * to the first instance that joigned. This means that the first instance to join + * __is__ the reference. + * * @return 1 on success and 0 on failure. */ static int receive_connection_information(net_abstraction_t fed_net, uint16_t fed_id) { @@ -1205,7 +1745,19 @@ static int receive_connection_information(net_abstraction_t fed_net, uint16_t fe send_reject(fed_net, UNEXPECTED_MESSAGE); return 0; } else { + // In case of a transient federate that is joining again, or a hot swap, then + // check that the connection information did not change. federate_info_t* fed = GET_FED_INFO(fed_id); + federate_info_t* temp_fed = NULL; + if (lf_tag_compare(fed->effective_start_tag, NEVER_TAG) != 0) { + if (hot_swap_in_progress) { + fed = hot_swap_federate; + } else { + temp_fed = (federate_info_t*)calloc(1, sizeof(federate_info_t)); + initialize_federate(temp_fed, fed_id); + fed = temp_fed; + } + } // Read the number of upstream and downstream connections fed->enclave.num_immediate_upstreams = extract_int32(&(connection_info_header[1])); fed->enclave.num_immediate_downstreams = extract_int32(&(connection_info_header[1 + sizeof(int32_t)])); @@ -1255,6 +1807,46 @@ static int receive_connection_information(net_abstraction_t fed_net, uint16_t fe free(connections_info_body); } + + // NOTE: In this design, changes in the connections are not allowed. This means that the first + // instance to join __is__ the reference. If this policy is to be changed, then it is in + // the following lines will be updated accordingly. + if (hot_swap_in_progress || temp_fed != NULL) { + if (temp_fed == NULL) { + temp_fed = hot_swap_federate; + } + // Now, compare the previous and the new neighberhood structure + // Start with the number of upstreams and downstreams + bool reject = false; + if ((fed->enclave.num_immediate_upstreams != temp_fed->enclave.num_immediate_upstreams) || + (fed->enclave.num_immediate_downstreams != temp_fed->enclave.num_immediate_downstreams)) { + reject = true; + } else { + // Then check all upstreams and their delays + for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) { + if ((fed->enclave.immediate_upstreams[i] != temp_fed->enclave.immediate_upstreams[i]) || + (fed->enclave.immediate_upstream_delays[i] != temp_fed->enclave.immediate_upstream_delays[i])) { + reject = true; + break; + } + } + if (!reject) { + // Finally, check all downstream federates + for (int i = 0; i < fed->enclave.num_immediate_downstreams; i++) { + if (fed->enclave.immediate_downstreams[i] != temp_fed->enclave.immediate_downstreams[i]) { + reject = true; + break; + } + } + } + } + if (reject) { + if (temp_fed != hot_swap_federate) { + free(temp_fed); + } + return 0; + } + } } LF_PRINT_DEBUG("RTI received neighbor structure from federate %d.", fed_id); return 1; @@ -1287,7 +1879,12 @@ static int receive_udp_message_and_set_up_clock_sync(net_abstraction_t fed_net, send_reject(fed_net, UNEXPECTED_MESSAGE); return 0; } else { - federate_info_t* fed = GET_FED_INFO(fed_id); + federate_info_t* fed; + if (hot_swap_in_progress) { + fed = hot_swap_federate; + } else { + fed = GET_FED_INFO(fed_id); + } if (rti_remote->clock_sync_global_status >= clock_sync_init) { // If no initial clock sync, no need perform initial clock sync. uint16_t federate_UDP_port_number = extract_uint16(&(response[1])); @@ -1413,8 +2010,8 @@ static bool authenticate_federate(net_abstraction_t fed_net) { } #endif -void lf_connect_to_federates(net_abstraction_t rti_net) { - for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { +void lf_connect_to_persistent_federates(net_abstraction_t rti_net) { + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates; i++) { net_abstraction_t fed_net = accept_net(rti_net); if (fed_net == NULL) { lf_print_warning("RTI failed to accept the federate."); @@ -1446,13 +2043,21 @@ void lf_connect_to_federates(net_abstraction_t rti_net) { // synchronization messages. federate_info_t* fed = GET_FED_INFO(fed_id); lf_thread_create(&(fed->thread_id), federate_info_thread_TCP, fed); + + // If the federate is transient, then do not count it. + if (fed->is_transient) { + rti_remote->number_of_connected_transient_federates++; + assert(rti_remote->number_of_connected_transient_federates <= rti_remote->number_of_transient_federates); + i--; + lf_print_info("RTI: Transient federate %d joined.", fed->enclave.id); + } } else { // Received message was rejected. Try again. i--; } } // All federates have connected. - LF_PRINT_DEBUG("All federates have connected to RTI."); + LF_PRINT_DEBUG("All persistent federates have connected to RTI."); if (rti_remote->clock_sync_global_status >= clock_sync_on) { // Create the thread that performs periodic PTP clock synchronization sessions @@ -1472,6 +2077,175 @@ void lf_connect_to_federates(net_abstraction_t rti_net) { } } +/** + * @brief A request for immediate stop to the federate + * + * @param fed: the deferate to stop + */ +void send_stop(federate_info_t* fed) { + // Reply with a stop granted to all federates + unsigned char outgoing_buffer[MSG_TYPE_STOP_LENGTH]; + outgoing_buffer[0] = MSG_TYPE_STOP; + lf_print("RTI sent MSG_TYPE_STOP to federate %d.", fed->enclave.id); + + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_STOP, fed->enclave.id, NULL); + } + write_to_net_fail_on_error(fed->net, MSG_TYPE_STOP_LENGTH, outgoing_buffer, NULL, + "RTI failed to send MSG_TYPE_STOP message to federate %d.", fed->enclave.id); + + LF_PRINT_LOG("RTI sent MSG_TYPE_STOP to federate %d.", fed->enclave.id); +} + +void* lf_connect_to_transient_federates_thread(void* nothing) { + // This loop will continue to accept connections of transient federates, as soon as there is room, or enable hot swap + while (!rti_remote->all_persistent_federates_exited) { + // Continue waiting for an incoming connection requests from transients to join, or for hot swap. + // Wait for an incoming connection request. + net_abstraction_t fed_net = accept_net(rti_remote->rti_net); + + // If accept failed (e.g., net was shut down), exit the loop. + if (fed_net == NULL) { + break; + } + +// Wait for the first message from the federate when RTI -a option is on. +#ifdef __RTI_AUTH__ + if (rti_remote->authentication_enabled) { + if (!authenticate_federate(fed_net)) { + lf_print_warning("RTI failed to authenticate the incoming federate."); + // Close the network abstraction without reading until EOF. + shutdown_net(fed_net, false); + fed_net = NULL; + continue; + } + } +#endif + + // The first message from the federate should contain its ID and the federation ID. + // The function also detects if a hot swap request is initiated. + int32_t fed_id = receive_and_check_fed_id_message(fed_net); + + if (fed_id >= 0 && receive_connection_information(fed_net, (uint16_t)fed_id) && + receive_udp_message_and_set_up_clock_sync(fed_net, (uint16_t)fed_id)) { + LF_MUTEX_LOCK(&rti_mutex); + if (hot_swap_in_progress) { + lf_print("RTI: Hot swap confirmed for federate %d.", fed_id); + + // Then send STOP + federate_info_t* fed_old = GET_FED_INFO(fed_id); + hot_swap_federate->enclave.completed = fed_old->enclave.completed; + + LF_PRINT_LOG("RTI: Send MSG_TYPE_STOP to old federate %d.", fed_id); + send_stop(fed_old); + LF_MUTEX_UNLOCK(&rti_mutex); + + // Wait for the old federate to send MSG_TYPE_RESIGN + LF_PRINT_LOG("RTI: Waiting for old federate %d to send resign.", fed_id); + // FIXME: This is a busy wait! Need instead a lf_cond_wait on a condition variable. + while (!hot_swap_old_resigned) { + } + + // The latest LTC is the tag at which the old federate resigned. This is useful + // for computing the effective_start_time of the new joining federate. + hot_swap_federate->enclave.completed = fed_old->enclave.completed; + + // Create a thread to communicate with the federate. + // This has to be done after clock synchronization is finished + // or that thread may end up attempting to handle incoming clock + // synchronization messages. + lf_thread_create(&(hot_swap_federate->thread_id), federate_info_thread_TCP, hot_swap_federate); + + // Redirect the federate in rti_remote + rti_remote->base.scheduling_nodes[fed_id] = (scheduling_node_t*)hot_swap_federate; + + // Free the old federate memory and reset the Hot wap indicators + // FIXME: Is this enough to free the memory allocated to the federate? + free(fed_old); + lf_mutex_lock(&rti_mutex); + hot_swap_in_progress = false; + lf_mutex_unlock(&rti_mutex); + + lf_print("RTI: Hot swap succeeded for federate %d.", fed_id); + } else { + lf_mutex_unlock(&rti_mutex); + + // Create a thread to communicate with the federate. + // This has to be done after clock synchronization is finished + // or that thread may end up attempting to handle incoming clock + // synchronization messages. + federate_info_t* fed = GET_FED_INFO(fed_id); + lf_thread_create(&(fed->thread_id), federate_info_thread_TCP, fed); + lf_print_info("RTI: Transient federate %d joined.", fed_id); + } + rti_remote->number_of_connected_transient_federates++; + } else { + // If a hot swap was initialed, but the connection information or/and clock + // synchronization fail, then reset hot_swap_in_profress, and free the memory + // allocated for hot_swap_federate + if (hot_swap_in_progress) { + lf_print("RTI: Hot swap canceled for federate %d.", fed_id); + lf_mutex_lock(&rti_mutex); + hot_swap_in_progress = false; + lf_mutex_unlock(&rti_mutex); + + // FIXME: Is this enough to free the memory of a federate_info_t data structure? + free(hot_swap_federate); + } + } + } + return NULL; +} + +/** + * @brief Thread that manages the delayed grants using a priprity queue. + * + * This thread is responsible for managing the priority queue of delayed grants to be issued. + * It waits until the current time matches the highest priority tag time in the queue. + * If reached, it notifies the grant immediately. If, however, the current time has not yet + * reached the highest priority tag and the queue has been updated (either by inserting or + * canceling an entry), the thread stops waiting and restarts the process again. + */ +static void* lf_delayed_grants_thread(void* nothing) { + initialize_lf_thread_id(); + // Hold the mutex when not waiting. + LF_MUTEX_LOCK(&rti_mutex); + while (!rti_remote->all_persistent_federates_exited) { + if (pqueue_delayed_grants_size(rti_remote->delayed_grants) > 0) { + // Do not pop, but rather peek. + pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); + instant_t next_time = next->base.tag.time; + // Wait for expiration, or a signal to stop or terminate. + int ret = lf_clock_cond_timedwait(&updated_delayed_grants, next_time); + if (ret == LF_TIMEOUT) { + // Time reached to send the grant. + // However, the grant may have been canceled while we were waiting. + pqueue_delayed_grant_element_t* new_next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); + if (next == new_next) { + pqueue_delayed_grants_pop(rti_remote->delayed_grants); + federate_info_t* fed = GET_FED_INFO(next->fed_id); + if (next->is_provisional) { + notify_provisional_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + } else { + notify_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + } + free(next); + } + } else if (ret != 0) { + // An error occurred. + lf_print_error_and_exit("lf_delayed_grants_thread: lf_clock_cond_timedwait failed with code %d.", ret); + } + } else if (pqueue_delayed_grants_size(rti_remote->delayed_grants) == 0) { + // Wait for something to appear on the queue. + lf_cond_wait(&updated_delayed_grants); + } + } + // Free any delayed grants that are still on the queue. + pqueue_delayed_grants_free(rti_remote->delayed_grants); + LF_MUTEX_UNLOCK(&rti_mutex); + return NULL; +} + void* respond_to_erroneous_connections(void* nothing) { initialize_lf_thread_id(); while (true) { @@ -1489,7 +2263,7 @@ void* respond_to_erroneous_connections(void* nothing) { lf_print_error("RTI received an unexpected connection request. Federation is running."); unsigned char response[2]; response[0] = MSG_TYPE_REJECT; - response[1] = FEDERATION_ID_DOES_NOT_MATCH; + response[1] = (unsigned char)FEDERATION_ID_DOES_NOT_MATCH; // Ignore errors on this response. if (write_to_net(fed_net, 2, response)) { lf_print_warning("RTI failed to write FEDERATION_ID_DOES_NOT_MATCH to erroneous incoming connection."); @@ -1506,6 +2280,30 @@ void initialize_federate(federate_info_t* fed, uint16_t id) { fed->requested_stop = false; fed->clock_synchronization_enabled = true; fed->in_transit_message_tags = pqueue_tag_init(10); + fed->has_upstream_transient_federates = false; + fed->is_transient = true; + fed->effective_start_tag = NEVER_TAG; +} + +void reset_transient_federate(federate_info_t* fed) { + // Reset all the timing information from the previous run + fed->enclave.completed = NEVER_TAG; + fed->enclave.last_granted = NEVER_TAG; + fed->enclave.last_provisionally_granted = NEVER_TAG; + fed->enclave.next_event = NEVER_TAG; + // Reset of the federate-related attributes + shutdown_net(fed->net, false); + fed->net = initialize_net(); + fed->clock_synchronization_enabled = true; + // FIXME: The following two lines can be improved? + pqueue_tag_free(fed->in_transit_message_tags); + fed->in_transit_message_tags = pqueue_tag_init(10); + fed->requested_stop = false; + fed->effective_start_tag = NEVER_TAG; + // Whenver a transient resigns or leaves, invalidate all federates, so that all min_delays_upstream + // get re-computed. + // FIXME: Maybe optimize it to only invalidate those affected by the transient + invalidate_min_delays(); } int start_rti_server() { @@ -1530,36 +2328,131 @@ int start_rti_server() { return 0; } +/** + * Iterate over the federates and sets 'has_upstream_transient_federates'. + * Once done, check that no transient federate has an upstream transient federate. + * and compute the number of persistent federates that do have upstream transients, + * which is the maximun number of delayed grants that can be pending at the same time. + * This is useful for initialyzing the queue of delayed grants. + + * @return -1, if there is more than one level of transiency, else, the number of + * persistents that have an upstream transient + */ +static int set_has_upstream_transient_federates_parameter_and_check() { + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + for (int j = 0; j < fed->enclave.num_immediate_upstreams; j++) { + federate_info_t* upstream_fed = GET_FED_INFO(fed->enclave.immediate_upstreams[j]); + if (upstream_fed->is_transient) { + fed->has_upstream_transient_federates = true; + break; + } + } + } + + // Now check that no transient has an upstream transient + // FIXME: Do we really need this? Or should it be the job of the validator? + uint16_t max_number_of_delayed_grants = 0; + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + if (fed->is_transient && fed->has_upstream_transient_federates) { + return -1; + } + if (!fed->is_transient && fed->has_upstream_transient_federates) { + max_number_of_delayed_grants++; + } + } + return max_number_of_delayed_grants; +} + void wait_for_federates() { - // Wait for connections from federates and create a thread for each. - lf_connect_to_federates(rti_remote->rti_net); + // Wait for connections from persistent federates and create a thread for each. + lf_connect_to_persistent_federates(rti_remote->rti_net); + + // Set has_upstream_transient_federates parameter in all federates and check + // that there is no more than one level of transiency + if (rti_remote->number_of_transient_federates > 0) { + int max_number_of_pending_grants = set_has_upstream_transient_federates_parameter_and_check(); + if (max_number_of_pending_grants == -1) { + lf_print_error_and_exit("RTI: Transient federates cannot have transient upstreams!"); + } + rti_remote->delayed_grants = pqueue_delayed_grants_init(max_number_of_pending_grants); + } - // All federates have connected. - lf_print_info("RTI: All expected federates have connected. Starting execution."); + // All persistent federates have connected. + lf_print_info("RTI: All expected persistent federates have connected. Starting execution."); + if (rti_remote->number_of_transient_federates > 0) { + lf_print_info("RTI: Transient Federates can join and leave the federation at any time."); + } - // The network abstraction server will not continue to accept connections after all the federates - // have joined. + // The network abstraction server will only continue to accept connections from transient + // federates. // In case some other federation's federates are trying to join the wrong // federation, need to respond. Start a separate thread to do that. lf_thread_t responder_thread; - lf_thread_create(&responder_thread, respond_to_erroneous_connections, NULL); + lf_thread_t transient_thread; + lf_thread_t delayed_grants_thread; - // Wait for federate threads to exit. + // If the federation does not include transient federates, then respond to + // erronous connections. Otherwise, continue to accept transients joining and + // respond to duplicate joing requests. + if (rti_remote->number_of_transient_federates == 0) { + lf_thread_create(&responder_thread, respond_to_erroneous_connections, NULL); + } else if (rti_remote->number_of_transient_federates > 0) { + lf_thread_create(&transient_thread, lf_connect_to_transient_federates_thread, NULL); + lf_thread_create(&delayed_grants_thread, lf_delayed_grants_thread, NULL); + } + + // Wait for persistent federate threads to exit. void* thread_exit_status; for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { federate_info_t* fed = GET_FED_INFO(i); - LF_PRINT_LOG("RTI: Waiting for thread handling federate %d.", fed->enclave.id); - lf_thread_join(fed->thread_id, &thread_exit_status); - pqueue_tag_free(fed->in_transit_message_tags); - LF_PRINT_LOG("RTI: Federate %d thread exited.", fed->enclave.id); + if (!fed->is_transient) { + LF_PRINT_LOG("RTI: Waiting for thread handling federate %d.", fed->enclave.id); + lf_thread_join(fed->thread_id, &thread_exit_status); + pqueue_tag_free(fed->in_transit_message_tags); + LF_PRINT_LOG("RTI: Persistent federate %d thread exited.", fed->enclave.id); + } } - rti_remote->all_federates_exited = true; + rti_remote->all_persistent_federates_exited = true; + rti_remote->phase = shutdown_phase; + lf_print_info("RTI: All persistent threads exited."); + + // Broadcast on updated_delayed_grants to wake up lf_delayed_grants_thread, + // which may be blocked in lf_cond_wait and uses all_persistent_federates_exited + // as its exit condition. + LF_MUTEX_LOCK(&rti_mutex); + lf_cond_broadcast(&updated_delayed_grants); + LF_MUTEX_UNLOCK(&rti_mutex); - // Shutdown and close the network abstraction that is listening for incoming connections - // so that the accept() call in respond_to_erroneous_connections returns. - // That thread should then check rti->all_federates_exited and it should exit. + // Shutdown and close the network abstraction that is listening for incoming connections. + // This must be done before joining transient threads so that the blocking + // accept_net() call in lf_connect_to_transient_federates_thread is + // unblocked, allowing that thread to see all_persistent_federates_exited + // and exit its loop. shutdown_net(rti_remote->rti_net, false); + + // Wait for transient federate threads to exit, if any. + if (rti_remote->number_of_transient_federates > 0) { + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + // Only join if the federate is currently connected (thread is running). + // If state is NOT_CONNECTED, either the federate never joined (thread was + // never created) or it already resigned and its thread already exited. + if (fed->is_transient && fed->enclave.state != NOT_CONNECTED) { + LF_PRINT_LOG("RTI: Waiting for thread handling federate %d.", fed->enclave.id); + lf_thread_join(fed->thread_id, &thread_exit_status); + pqueue_tag_free(fed->in_transit_message_tags); + LF_PRINT_LOG("RTI: Transient federate %d thread exited.", fed->enclave.id); + } + } + // Join the transient listener and delayed grants threads to avoid leaking them. + lf_thread_join(transient_thread, &thread_exit_status); + lf_thread_join(delayed_grants_thread, &thread_exit_status); + } + + rti_remote->all_federates_exited = true; rti_remote->rti_net = NULL; if (rti_remote->socket_descriptor_UDP > 0) { @@ -1576,6 +2469,7 @@ void initialize_RTI(rti_remote_t* rti) { init_shutdown_mutex(); LF_COND_INIT(&received_start_times, &rti_mutex); LF_COND_INIT(&sent_start_time, &rti_mutex); + LF_COND_INIT(&updated_delayed_grants, &rti_mutex); initialize_rti_common(&rti_remote->base); rti_remote->base.mutex = &rti_mutex; @@ -1595,6 +2489,8 @@ void initialize_RTI(rti_remote_t* rti) { rti_remote->base.tracing_enabled = false; rti_remote->base.dnet_disabled = false; rti_remote->stop_in_progress = false; + rti_remote->number_of_transient_federates = 0; + rti_remote->phase = startup_phase; } // The RTI includes clock.c, which requires the following functions that are defined @@ -1606,6 +2502,7 @@ void clock_sync_subtract_offset(instant_t* t) { (void)t; } void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { invalidate_min_delays(); for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) { + // FIXME: Gives error freeing memory not allocated!!!! scheduling_node_t* node = scheduling_nodes[i]; if (node->immediate_upstreams != NULL) { free(node->immediate_upstreams); diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index 0fcb4bfda..dd16f389d 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -62,6 +62,12 @@ typedef struct federate_info_t { /** @brief Record of in-transit messages to this federate that are not yet processed. This record is ordered based on * the time value of each message for a more efficient access. */ pqueue_tag_t* in_transit_message_tags; + /** @brief Indicates whether the federate has upstream transient federates. */ + bool has_upstream_transient_federates; + /** @brief Indicates whether the federate is transient or persistent. */ + bool is_transient; + /** @brief Records the start time of the federate, which is mainly useful for transient federates. */ + tag_t effective_start_tag; } federate_info_t; /** @@ -70,6 +76,29 @@ typedef struct federate_info_t { */ typedef enum clock_sync_stat { clock_sync_off, clock_sync_init, clock_sync_on } clock_sync_stat; +/** + * The federation life cycle phases. + */ +typedef enum federation_life_cycle_phase { + startup_phase, // Not all persistent federates have joined. + execution_phase, // All persistent federates have joined. + shutdown_phase // Federation is shutting down. +} federation_life_cycle_phase; + +/** + * @brief The type for an element in a delayed grants priority queue that is sorted by tag. + */ +typedef struct pqueue_delayed_grant_element_t { + pqueue_tag_element_t base; + uint16_t fed_id; // Id of the federate with delayed grant of tag (in base) + bool is_provisional; // Boolean recoding if the delayed grant is provisional +} pqueue_delayed_grant_element_t; + +/** + * @brief Type of a delayed grants queue sorted by tags. + */ +typedef pqueue_tag_t pqueue_delayed_grants_t; + /** * @brief Structure that an RTI instance uses to keep track of its own and its * corresponding federates' state. @@ -104,6 +133,16 @@ typedef struct rti_remote_t { */ volatile bool all_federates_exited; + /** + * @brief Boolean indicating that all persistent federates have exited. + * + * This gets set to true exactly once before the program waits for + * persistent federates, then exits. + * It is marked volatile because the write is not guarded by a mutex. + * The main thread makes this true. + */ + volatile bool all_persistent_federates_exited; + /** * @brief The ID of the federation that this RTI will supervise. * @@ -149,6 +188,27 @@ typedef struct rti_remote_t { /** @brief Boolean indicating that a stop request is already in progress. */ bool stop_in_progress; + + /** + * Number of transient federates + */ + int32_t number_of_transient_federates; + + /** + * Number of connected transient federates + */ + int32_t number_of_connected_transient_federates; + + /** + * Indicates the life cycle phase of the federation. + */ + federation_life_cycle_phase phase; + + /** + * Queue of the pending grants, in case transient federates are absent and + * issuing grants to their downstreams need to be delayed. + */ + pqueue_delayed_grants_t* delayed_grants; } rti_remote_t; extern int lf_critical_section_enter(environment_t* env); @@ -272,7 +332,7 @@ void handle_address_query(uint16_t fed_id); * field of the _RTI.federates[federate_id] array of structs. * * The server_hostname and server_ip_addr fields are assigned - * in lf_connect_to_federates() upon accepting the socket + * in lf_connect_to_persistent_federates() upon accepting the socket * from the remote federate. * * This function assumes the caller does not hold the mutex. @@ -352,18 +412,19 @@ void* federate_info_thread_TCP(void* fed); * @param net_abs Pointer to the network abstraction. * @param error_code An error code. */ -void send_reject(net_abstraction_t net_abs, unsigned char error_code); +void send_reject(net_abstraction_t net_abs, rejection_code_t error_code); /** - * @brief Wait for one incoming connection request from each federate, - * and, upon receiving it, create a thread to communicate with that federate. + * @brief Thread to wait for incoming connection request from transient federates. * @ingroup RTI * - * Return when all federates have connected. + * Upon receiving the connection request, check if a hot swap should start or + * simply create a thread to communicate with that federate. + * Stops if all persistent federates exited. * - * @param rti_net The rti's network abstraction on which to accept connections. + * @param nothing Nothing needed here. */ -void lf_connect_to_federates(net_abstraction_t rti_net); +void* lf_connect_to_transient_federates_thread(void* nothing); /** * @brief Thread to respond to new connections, which could be federates of other federations @@ -385,7 +446,15 @@ void* respond_to_erroneous_connections(void* nothing); void initialize_federate(federate_info_t* fed, uint16_t id); /** - * @brief Start the socket server for the runtime infrastructure (RTI). + * @brief Reset the federate. The federate has to be transient. + * @ingroup RTI + * + * @param fed A pointer to the federate + */ +void reset_transient_federate(federate_info_t* fed); + +/** + * @brief Start the server for the runtime infrastructure (RTI). * @ingroup RTI */ int start_rti_server(); diff --git a/core/federated/federate.c b/core/federated/federate.c index c5a94ae4e..7c880777b 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -39,6 +39,7 @@ // Global variables defined in tag.c: extern instant_t start_time; +extern tag_t effective_start_tag; // Global variable defined in reactor_common.c: extern bool _lf_termination_executed; @@ -85,7 +86,8 @@ federate_instance_t _fed = {.number_of_inbound_p2p_connections = 0, .last_sent_LTC = {.time = NEVER, .microstep = 0u}, .last_sent_NET = {.time = NEVER, .microstep = 0u}, .last_skipped_NET = {.time = NEVER, .microstep = 0u}, - .min_delay_from_physical_action_to_federate_output = NEVER}; + .min_delay_from_physical_action_to_federate_output = NEVER, + .is_transient = false}; federation_metadata_t federation_metadata = { .federation_id = "Unidentified Federation", .rti_host = NULL, .rti_port = -1, .rti_user = NULL}; @@ -151,6 +153,8 @@ extern interval_t _lf_action_delay_table[]; extern size_t _lf_action_table_size; extern lf_action_base_t* _lf_zero_delay_cycle_action_table[]; extern size_t _lf_zero_delay_cycle_action_table_size; +extern uint16_t _lf_zero_delay_cycle_upstream_ids[]; +extern bool _lf_zero_delay_cycle_upstream_disconnected[]; extern reaction_t* network_input_reactions[]; extern size_t num_network_input_reactions; extern reaction_t* port_absent_reaction[]; @@ -176,7 +180,7 @@ static lf_action_base_t* action_for_port(int port_id) { /** * Update the last known status tag of all network input ports - * to the value of `tag`, unless that the provided `tag` is less + * to the value of `tag`, unless the provided `tag` is less * than the last_known_status_tag of the port. This is called when * a TAG signal is received from the RTI in centralized coordination. * If any update occurs, then this broadcasts on `lf_port_status_changed`. @@ -242,8 +246,8 @@ static void update_last_known_status_on_input_ports(tag_t tag, environment_t* en * * @param env The top-level environment, whose mutex is assumed to be held. * @param tag The tag on which the latest status of the specified network input port is known. + * @param port_id The port ID. * @param warn If true, print a warning if the tag is less than the last known status tag of the port. - * @param portID The port ID. */ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag, int port_id, bool warn) { if (lf_tag_compare(tag, env->current_tag) < 0) @@ -307,13 +311,41 @@ static void mark_inputs_known_absent(int fed_id) { } /** - * Set the status of network port with id portID. + * @brief Update the last known status tag of a network input action. + * + * This function is similar to update_last_known_status_on_input_port, but + * it is called when a PTAG is granted and an upstream transient federate is not + * connected. It updates the last known status tag of the network input action + * so that it will not wait for a message or absent message from the upstream federate. + * + * This function assumes the caller holds the mutex on the top-level environment, + * and, if the tag actually increases, it broadcasts on `lf_port_status_changed`. + * + * @param env The top-level environment, whose mutex is assumed to be held. + * @param action The action associated with the network input port. + * @param tag The tag of the PTAG. + */ +static void update_last_known_status_on_action(environment_t* env, lf_action_base_t* action, tag_t tag) { + if (lf_tag_compare(tag, env->current_tag) < 0) + tag = env->current_tag; + trigger_t* input_port_trigger = action->trigger; + if (lf_tag_compare(tag, input_port_trigger->last_known_status_tag) > 0) { + LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " PRINTF_TAG + " to " PRINTF_TAG ".", + input_port_trigger->last_known_status_tag.time - lf_time_start(), + input_port_trigger->last_known_status_tag.microstep, tag.time - lf_time_start(), tag.microstep); + input_port_trigger->last_known_status_tag = tag; + } +} + +/** + * Set the status of network port with id port_id. * - * @param portID The network port ID + * @param port_id The network port ID * @param status The network port status (port_status_t) */ -static void set_network_port_status(int portID, port_status_t status) { - lf_action_base_t* network_input_port_action = action_for_port(portID); +static void set_network_port_status(int port_id, port_status_t status) { + lf_action_base_t* network_input_port_action = action_for_port(port_id); network_input_port_action->trigger->status = status; } @@ -700,7 +732,7 @@ static int handle_port_absent_message(net_abstraction_t net, int fed_id) { tracepoint_federate_from_federate(receive_PORT_ABS, _lf_my_fed_id, fed_id, &intended_tag); } LF_PRINT_LOG("Handling port absent for tag " PRINTF_TAG " for port %hu of fed %d.", - intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, fed_id); + intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, _lf_my_fed_id); // Environment is always the one corresponding to the top-level scheduling enclave. environment_t* env; @@ -862,7 +894,7 @@ static int perform_hmac_authentication() { if (received[0] == MSG_TYPE_FAILED) { lf_print_error("RTI has failed."); return -1; - } else if (received[0] == MSG_TYPE_REJECT && received[1] == RTI_NOT_EXECUTED_WITH_AUTH) { + } else if (received[0] == MSG_TYPE_REJECT && received[1] == (unsigned char)RTI_NOT_EXECUTED_WITH_AUTH) { lf_print_error("RTI is not executed with HMAC option."); return -1; } else { @@ -885,7 +917,7 @@ static int perform_hmac_authentication() { lf_print_error("HMAC authentication failed."); unsigned char response[2]; response[0] = MSG_TYPE_REJECT; - response[1] = HMAC_DOES_NOT_MATCH; + response[1] = (unsigned char)HMAC_DOES_NOT_MATCH; // Ignore errors on writing back. write_to_net(_fed.net_to_RTI, 2, response); @@ -908,6 +940,44 @@ static int perform_hmac_authentication() { } #endif +/** + * @brief Handle message from the RTI that an upstream federate has connected. + * + */ +static void handle_upstream_connected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, + "Failed to read upstream connected message from RTI."); + uint16_t connected = extract_uint16(buffer); + LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); + // Mark the upstream as connected. + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == connected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = false; + } + } +} + +/** + * @brief Handle message from the RTI that an upstream federate has disconnected. + * + */ +static void handle_upstream_disconnected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_net_fail_on_error(_fed.net_to_RTI, bytes_to_read, buffer, + "Failed to read upstream disconnected message from RTI."); + uint16_t disconnected = extract_uint16(buffer); + LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); + // Mark the upstream as disconnected. + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = true; + } + } +} + /** * Send the specified timestamp to the RTI and wait for a response. * The specified timestamp should be current physical time of the @@ -923,28 +993,50 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // Read bytes from the network abstraction. We need 9 bytes. // Buffer for message ID plus timestamp. - size_t buffer_length = 1 + sizeof(instant_t); + size_t buffer_length = (_fed.is_transient) ? MSG_TYPE_TIMESTAMP_TAG_LENGTH : MSG_TYPE_TIMESTAMP_LENGTH; unsigned char buffer[buffer_length]; - read_from_net_fail_on_error(_fed.net_to_RTI, buffer_length, buffer, - "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); - LF_PRINT_DEBUG("Read 9 bytes."); - - // First byte received is the message ID. - if (buffer[0] != MSG_TYPE_TIMESTAMP) { - if (buffer[0] == MSG_TYPE_FAILED) { - lf_print_error_and_exit("RTI has failed."); + while (true) { + read_from_net_fail_on_error(_fed.net_to_RTI, 1, buffer, "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); + // First byte received is the message ID. + if (buffer[0] != MSG_TYPE_TIMESTAMP) { + if (buffer[0] == MSG_TYPE_FAILED) { + lf_print_error_and_exit("RTI has failed."); + } else if (buffer[0] == MSG_TYPE_UPSTREAM_CONNECTED) { + // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive + handle_upstream_connected_message(); + continue; + } else if (buffer[0] == MSG_TYPE_UPSTREAM_DISCONNECTED) { + // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP to arrive + handle_upstream_disconnected_message(); + continue; + } else { + lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. Got %u (see net_common.h).", + buffer[0]); + } + } else { + read_from_net_fail_on_error(_fed.net_to_RTI, buffer_length - 1, buffer + 1, + "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); + break; } - lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. Got %u (see net_common.h).", - buffer[0]); } + LF_PRINT_DEBUG("Read %zu bytes.", buffer_length); + instant_t timestamp = extract_int64(&(buffer[1])); + lf_print_info("Federation start time is: " PRINTF_TIME ".", timestamp); + if (_fed.is_transient) { + effective_start_tag = extract_tag(&(buffer[9])); + lf_print_info("Effective relative start tag is: (" PRINTF_TAG ").", effective_start_tag.time - timestamp, + effective_start_tag.microstep); + } else { + effective_start_tag = (tag_t){.time = timestamp, .microstep = 0u}; + } - tag_t tag = {.time = timestamp, .microstep = 0}; - // Trace the event when tracing is enabled - tracepoint_federate_from_rti(receive_TIMESTAMP, _lf_my_fed_id, &tag); - lf_print_info("Starting timestamp is: " PRINTF_TIME ".", timestamp); + // Trace the event when tracing is enabled. + // Note that we report in the trace the effective_start_tag. + // This is rather a choice. To be changed, if needed, of course. + tracepoint_federate_from_rti(receive_TIMESTAMP, _lf_my_fed_id, &effective_start_tag); LF_PRINT_LOG("Current physical time is: " PRINTF_TIME ".", lf_time_physical()); return timestamp; @@ -960,7 +1052,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { * a notification of this update, which may unblock whichever worker * thread is trying to advance time. * - * @note This function is very similar to handle_provisinal_tag_advance_grant() except that + * @note This function is very similar to handle_provisional_tag_advance_grant() except that * it sets last_TAG_was_provisional to false. */ static void handle_tag_advance_grant(void) { @@ -1207,7 +1299,8 @@ static void* update_ports_from_staa_offsets(void* args) { * * @note This function is similar to handle_tag_advance_grant() except that * it sets last_TAG_was_provisional to true and also it does not update the - * last known tag for input ports. + * last known tag for input ports unless there is an upstream federate that is + * disconnected. */ static void handle_provisional_tag_advance_grant() { // Environment is always the one corresponding to the top-level scheduling enclave. @@ -1244,6 +1337,12 @@ static void handle_provisional_tag_advance_grant() { env->current_tag.time - start_time, env->current_tag.microstep, _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_disconnected[i]) { + update_last_known_status_on_action(env, _lf_zero_delay_cycle_action_table[i], PTAG); + } + } + // Even if we don't modify the event queue, we need to broadcast a change // because we do not need to continue to wait for a TAG. lf_cond_broadcast(&env->event_q_changed); @@ -1343,6 +1442,20 @@ static void handle_stop_granted_message() { } } +/** + * @brief Handle a MSG_TYPE_STOP message from the RTI. + * + * This function simply calls lf_stop(). + */ +void handle_stop() { + // Trace the event when tracing is enabled + tracepoint_federate_from_rti(receive_STOP, _lf_my_fed_id, NULL); + + lf_print("Received from RTI a MSG_TYPE_STOP at physical time " PRINTF_TIME ".", lf_time_physical()); + + lf_stop(); +} + /** * Handle a MSG_TYPE_STOP_REQUEST message from the RTI. */ @@ -1546,6 +1659,9 @@ static void* listen_to_rti_net(void* args) { case MSG_TYPE_STOP_GRANTED: handle_stop_granted_message(); break; + case MSG_TYPE_STOP: + handle_stop(); + break; case MSG_TYPE_PORT_ABSENT: if (handle_port_absent_message(_fed.net_to_RTI, -1)) { // Failures to complete the read of absent messages from the RTI are fatal. @@ -1558,6 +1674,12 @@ static void* listen_to_rti_net(void* args) { case MSG_TYPE_FAILED: handle_rti_failed_message(); break; + case MSG_TYPE_UPSTREAM_CONNECTED: + handle_upstream_connected_message(); + break; + case MSG_TYPE_UPSTREAM_DISCONNECTED: + handle_upstream_disconnected_message(); + break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI.", _lf_my_fed_id); @@ -1785,15 +1907,16 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { break; } // Connect was successful. - size_t buffer_length = 1 + sizeof(uint16_t) + 1; + size_t buffer_length = 1 + sizeof(uint16_t) + 1 + 1; unsigned char buffer[buffer_length]; buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; if (_lf_my_fed_id == UINT16_MAX) { lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); } encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(buffer[1])); + buffer[1 + sizeof(uint16_t)] = _fed.is_transient ? 1 : 0; unsigned char federation_id_length = (unsigned char)strnlen(federation_metadata.federation_id, 255); - buffer[sizeof(uint16_t) + 1] = federation_id_length; + buffer[sizeof(uint16_t) + 2] = federation_id_length; // Trace the event when tracing is enabled tracepoint_federate_to_federate(send_FED_ID, _lf_my_fed_id, remote_federate_id, NULL); @@ -1851,7 +1974,7 @@ void lf_connect_to_rti(const char* hostname, int port) { while (!CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT) && !_lf_termination_executed) { // Have connected to an RTI, but not sure it's the right RTI. - // Send a MSG_TYPE_FED_IDS message and wait for a reply. + // Send a MSG_TYPE_FED_IDS or MSG_TYPE_TRANSIENT_FED_IDS message and wait for a reply. // Notify the RTI of the ID of this federate and its federation. #ifdef FEDERATED_AUTHENTICATED @@ -1868,9 +1991,14 @@ void lf_connect_to_rti(const char* hostname, int port) { LF_PRINT_LOG("Connected to an RTI. Sending federation ID for authentication."); #endif + unsigned char buffer[5]; // Send the message type first. - unsigned char buffer[4]; - buffer[0] = MSG_TYPE_FED_IDS; + if (_fed.is_transient) { + buffer[0] = MSG_TYPE_TRANSIENT_FED_IDS; + } else { + buffer[0] = MSG_TYPE_FED_IDS; + } + // Next send the federate ID. if (_lf_my_fed_id == UINT16_MAX) { lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); @@ -1884,8 +2012,14 @@ void lf_connect_to_rti(const char* hostname, int port) { // Trace the event when tracing is enabled tracepoint_federate_to_rti(send_FED_ID, _lf_my_fed_id, NULL); + size_t size = 1 + sizeof(uint16_t) + 1; + if (_fed.is_transient) { + // Next send the federate type (persistent or transient) + buffer[2 + sizeof(uint16_t)] = _fed.is_transient ? 1 : 0; + size++; + } // No need for a mutex here because no other threads are writing to this network abstraction. - if (write_to_net(_fed.net_to_RTI, 2 + sizeof(uint16_t), buffer)) { + if (write_to_net(_fed.net_to_RTI, size, buffer)) { continue; // Try again, possibly on a new port. } @@ -1911,7 +2045,7 @@ void lf_connect_to_rti(const char* hostname, int port) { // Read one more byte to determine the cause of rejection. unsigned char cause; read_from_net_fail_on_error(_fed.net_to_RTI, 1, &cause, "Failed to read the cause of rejection by the RTI."); - if (cause == FEDERATION_ID_DOES_NOT_MATCH || cause == WRONG_SERVER) { + if (cause == (unsigned char)FEDERATION_ID_DOES_NOT_MATCH || cause == (unsigned char)WRONG_SERVER) { lf_print_warning("Connected to the wrong RTI. Will try again"); continue; } @@ -1923,8 +2057,12 @@ void lf_connect_to_rti(const char* hostname, int port) { } else if (response == MSG_TYPE_RESIGN) { lf_print_warning("RTI resigned. Will try again"); continue; + } else if (response == MSG_TYPE_UPSTREAM_CONNECTED) { + handle_upstream_connected_message(); + } else if (response == MSG_TYPE_UPSTREAM_DISCONNECTED) { + handle_upstream_disconnected_message(); } else { - lf_print_warning("RTI gave unexpect response %u. Will try again", response); + lf_print_warning("RTI on port %d gave unexpected response %u. Will try again", port, response); continue; } } @@ -2017,7 +2155,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { } LF_PRINT_LOG("Accepted new connection from remote federate."); - size_t header_length = 1 + sizeof(uint16_t) + 1; + size_t header_length = 1 + sizeof(uint16_t) + 1 + 1; unsigned char buffer[header_length]; int read_failed = read_from_net(net, header_length, (unsigned char*)&buffer); if (read_failed || buffer[0] != MSG_TYPE_P2P_SENDING_FED_ID) { @@ -2027,7 +2165,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Wrong message received. unsigned char response[2]; response[0] = MSG_TYPE_REJECT; - response[1] = WRONG_SERVER; + response[1] = (unsigned char)WRONG_SERVER; // Trace the event when tracing is enabled tracepoint_federate_to_federate(send_REJECT, _lf_my_fed_id, -3, NULL); // Ignore errors on this response. @@ -2048,7 +2186,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { if (read_failed == 0) { unsigned char response[2]; response[0] = MSG_TYPE_REJECT; - response[1] = FEDERATION_ID_DOES_NOT_MATCH; + response[1] = (unsigned char)FEDERATION_ID_DOES_NOT_MATCH; // Trace the event when tracing is enabled tracepoint_federate_to_federate(send_REJECT, _lf_my_fed_id, -3, NULL); // Ignore errors on this response. @@ -2061,7 +2199,12 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Extract the ID of the sending federate. uint16_t remote_fed_id = extract_uint16((unsigned char*)&(buffer[1])); - LF_PRINT_DEBUG("Received sending federate ID %d.", remote_fed_id); + bool remote_fed_is_transient = buffer[1 + sizeof(uint16_t)]; + if (remote_fed_is_transient) { + LF_PRINT_DEBUG("Received sending federate ID %d, which is transient.", remote_fed_id); + } else { + LF_PRINT_DEBUG("Received sending federate ID %d, which is persistent.", remote_fed_id); + } // Trace the event when tracing is enabled tracepoint_federate_to_federate(receive_FED_ID, _lf_my_fed_id, remote_fed_id, NULL); @@ -2576,6 +2719,10 @@ void lf_synchronize_with_other_federates(void) { // Reset the start time to the coordinated start time for all federates. // Note that this does not grant execution to this federate. start_time = get_start_time_from_rti(lf_time_physical()); + + lf_print_info("Starting timestamp is: " PRINTF_TIME " and effective start tag is: " PRINTF_TAG ".", lf_time_start(), + effective_start_tag.time - lf_time_start(), effective_start_tag.microstep); + lf_tracing_set_start_time(start_time); // Start a thread to listen for incoming messages from the RTI. @@ -2627,13 +2774,20 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) { _lf_action_delay_table[i])) <= 0)) { continue; } +#else + // For centralized coordination, if there is an upstream transient federate that is not + // connected, then we don't want to block on its action. + if (_lf_zero_delay_cycle_upstream_disconnected[i]) { + // Mark the action known up to and including the current tag. It is absent. + update_last_known_status_on_action(env, input_port_action, env->current_tag); + } #endif // FEDERATED_DECENTRALIZED - // If the current tag is greater than the last known status tag of the input port, - // and the input port is not physical, then block on that port by ensuring - // the MLAA is no greater than the level of that port. - // For centralized coordination, this is applied only to input ports coming from - // federates that are in a ZDC. For decentralized coordination, this is applied - // to all input ports. + // If the current tag is greater than the last known status tag of the input port, + // and the input port is not physical, then block on that port by ensuring + // the MLAA is no greater than the level of that port. + // For centralized coordination, this is applied only to input ports coming from + // federates that are in a ZDC. For decentralized coordination, this is applied + // to all input ports. if (lf_tag_compare(env->current_tag, input_port_action->trigger->last_known_status_tag) > 0 && !input_port_action->trigger->is_physical) { max_level_allowed_to_advance = @@ -2645,6 +2799,32 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) { return (prev_max_level_allowed_to_advance != max_level_allowed_to_advance); } +void lf_stop() { + environment_t* env; + int num_env = _lf_get_environments(&env); + + for (int i = 0; i < num_env; i++) { + LF_MUTEX_LOCK(&env[i].mutex); + + tag_t new_stop_tag; + new_stop_tag.time = env[i].current_tag.time; + new_stop_tag.microstep = env[i].current_tag.microstep + 1; + + lf_set_stop_tag(&env[i], new_stop_tag); + + LF_PRINT_LOG("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time, + env[i].stop_tag.microstep); + + if (env[i].barrier.requestors) + _lf_decrement_tag_barrier_locked(&env[i]); + lf_cond_broadcast(&env[i].event_q_changed); + LF_MUTEX_UNLOCK(&env[i].mutex); + } + LF_PRINT_LOG("Federate is stopping."); +} + +const char* lf_get_federation_id() { return federation_metadata.federation_id; } + #ifdef FEDERATED_DECENTRALIZED instant_t lf_wait_until_time(tag_t tag) { instant_t result = tag.time; // Default. diff --git a/core/tag.c b/core/tag.c index d3f045406..003d4f20a 100644 --- a/core/tag.c +++ b/core/tag.c @@ -32,6 +32,12 @@ typedef enum _lf_time_type { LF_LOGICAL, LF_PHYSICAL, LF_ELAPSED_LOGICAL, LF_ELA // Global variables declared in tag.h: instant_t start_time = NEVER; +/** + * Only useful for transient federates. It records the effective start tag, to + * be used at startup. Elapsed logical time calculations will use start_time. + */ +tag_t effective_start_tag = {.time = 0LL, .microstep = 0}; + //////////////// Functions declared in tag.h tag_t lf_tag(void* env) { @@ -189,6 +195,8 @@ instant_t lf_time_physical_elapsed(void) { return lf_time_physical() - start_tim instant_t lf_time_start(void) { return start_time; } +tag_t lf_tag_start_effective(void) { return effective_start_tag; } + size_t lf_readable_time(char* buffer, instant_t time) { if (time <= (instant_t)0) { snprintf(buffer, 2, "0"); diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 2b59e56ad..24ffcf74f 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -33,6 +33,7 @@ // Global variables defined in tag.c and shared across environments: extern instant_t start_time; +extern tag_t effective_start_tag; /** * The maximum amount of time a worker thread should stall @@ -510,12 +511,12 @@ static void _lf_initialize_start_tag(environment_t* env) { // statuses to unknown lf_reset_status_fields_on_input_port_triggers(); - // Get a start_time from the RTI + // Get a start_time and effective_start_tag from the RTI lf_synchronize_with_other_federates(); // Resets start_time in federated execution according to the RTI. } // The start time will likely have changed. Adjust the current tag and stop tag. - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + env->current_tag = effective_start_tag; if (duration >= 0LL) { // A duration has been specified. Recalculate the stop time. env->stop_tag = ((tag_t){.time = start_time + duration, .microstep = 0}); @@ -534,25 +535,25 @@ static void _lf_initialize_start_tag(environment_t* env) { // To use uniform code below, we define it here as a local variable. instant_t lf_fed_STA_offset = 0; #endif - LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", start_time); - - // Wait until the start time. This is required for federates because the startup procedure - // in lf_synchronize_with_other_federates() can decide on a new start_time that is - // larger than the current physical time. - // This wait_until() is deliberately called after most precursor operations - // for tag (0,0) are performed (e.g., injecting startup reactions, etc.). - // This has two benefits: First, the startup overheads will reduce - // the required waiting time. Second, this call releases the mutex lock and allows - // other threads (specifically, federate threads that handle incoming p2p messages - // from other federates) to hold the lock and possibly raise a tag barrier. - while (!wait_until(start_time, &env->event_q_changed)) { + LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", effective_start_tag.time); + + // Wait until the effective start time. This is required for federates because the startup procedure + // in lf_synchronize_with_other_federates() can decide on a new start_time, or the effective start time if it is a + // transient federate, that is larger than the current physical time. + // This wait_until() is deliberately called after most precursor operations for tag (0,0), or effective_start_tag,q + // are performed (e.g., injecting startup reactions, etc.). This has two benefits: First, the startup overheads will + // reduce the required waiting time. Second, this call releases the mutex lock and allows other threads (specifically, + // federate threads that handle incoming p2p messages from other federates) to hold the lock and possibly raise a tag + // barrier. + while (!wait_until(effective_start_tag.time, &env->event_q_changed)) { }; - LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + lf_fed_STA_offset); + LF_PRINT_DEBUG("Done waiting for effective start time + STA offset " PRINTF_TIME ".", + effective_start_tag.time + lf_fed_STA_offset); LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be close to the STA offset.", - lf_time_physical() - start_time); + lf_time_physical() - effective_start_tag.time); - // Restore the current tag to match the start time. - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + // Restore the current tag to match the effective start time. + env->current_tag = (tag_t){.time = effective_start_tag.time, .microstep = effective_start_tag.microstep}; // If the stop_tag is (0,0), also insert the shutdown // reactions. This can only happen if the timeout time @@ -569,7 +570,7 @@ static void _lf_initialize_start_tag(environment_t* env) { // from exceeding the timestamp of the message. It will remove that barrier // once the complete message has been read. Here, we wait for that barrier // to be removed, if appropriate before proceeding to executing tag (0,0). - _lf_wait_on_tag_barrier(env, (tag_t){.time = start_time, .microstep = 0}); + _lf_wait_on_tag_barrier(env, effective_start_tag); // In addition, if the earliest event on the event queue has a tag greater // than (0,0), then wait until the time of that tag. This prevents the runtime @@ -1047,6 +1048,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { // Initialize the clock through the platform API. No reading of physical time before this. _lf_initialize_clock(); start_time = lf_time_physical(); + effective_start_tag = (tag_t){.time = start_time, .microstep = 0}; #ifndef FEDERATED lf_tracing_set_start_time(start_time); #endif diff --git a/core/utils/pqueue_tag.c b/core/utils/pqueue_tag.c index 5daa04e4a..2ea8e9a8b 100644 --- a/core/utils/pqueue_tag.c +++ b/core/utils/pqueue_tag.c @@ -157,3 +157,14 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t) { } void pqueue_tag_dump(pqueue_tag_t* q) { pqueue_dump((pqueue_t*)q, pqueue_tag_print_element); } + +tag_t pqueue_tag_max_tag(pqueue_tag_t* q) { + tag_t result = NEVER_TAG; + for (size_t i = 1; i < q->size; i++) { + pqueue_tag_element_t* element = (pqueue_tag_element_t*)(q->d[i]); + if (lf_tag_compare(element->tag, result) > 0) { + result = element->tag; + } + } + return result; +} diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index e516e2a74..439460388 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -192,6 +192,12 @@ typedef struct federate_instance_t { */ instant_t min_delay_from_physical_action_to_federate_output; + /** + * Indicator of whether this federate is transient. + * The default value of false may be overridden in _lf_initialize_trigger_objects. + */ + bool is_transient; + #ifdef FEDERATED_DECENTRALIZED /** * Thread responsible for setting ports to absent by an STAA offset if they @@ -522,6 +528,11 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int */ void lf_set_federation_id(const char* fid); +/** + * @brief Return the federation id. + */ +const char* lf_get_federation_id(); + #ifdef FEDERATED_DECENTRALIZED /** * @brief Spawn a thread to iterate through STAA structs. diff --git a/include/core/utils/pqueue_tag.h b/include/core/utils/pqueue_tag.h index 7a349053d..eb0663937 100644 --- a/include/core/utils/pqueue_tag.h +++ b/include/core/utils/pqueue_tag.h @@ -250,4 +250,11 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t); */ void pqueue_tag_dump(pqueue_tag_t* q); +/** + * @brief Return the maximum tag in the queue or NEVER_TAG if the queue is empty. + * + * @param q The queue. + */ +tag_t pqueue_tag_max_tag(pqueue_tag_t* q); + #endif // PQUEUE_TAG_H diff --git a/include/core/utils/util.h b/include/core/utils/util.h index 3c6c5e2ad..edc6fe9a9 100644 --- a/include/core/utils/util.h +++ b/include/core/utils/util.h @@ -194,4 +194,14 @@ void lf_vprint_error_and_exit(const char* format, va_list args) ATTRIBUTE_FORMAT */ #define LF_CRITICAL_SECTION_EXIT(env) LF_ASSERT(!lf_critical_section_exit(env), "Could not exit critical section") +/** + * @brief Stop the execution of a federate. + * Every enclave within the federate will stop at one microstep later than its + * current tag. Unlike lf_request_stop(), this process does not require any + * involvement from the RTI, nor does it necessitate any consensus. + * + * This function is particularly useful for testing transient federates. + */ +void lf_stop(); + #endif /* UTIL_H */ diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index b756b406f..52199a147 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -networkdriver +transient-fed diff --git a/network/api/net_common.h b/network/api/net_common.h index 8f4855fcc..4ca6fd4a5 100644 --- a/network/api/net_common.h +++ b/network/api/net_common.h @@ -23,10 +23,10 @@ * When it has successfully opened a TCP connection, the first message it sends * to the RTI is a @ref MSG_TYPE_FED_IDS message, which contains the ID of this federate * within the federation, contained in the global variable _lf_my_fed_id - * in the federate code - * (which is initialized by the code generator) and the unique ID of - * the federation, a GUID that is created at run time by the generated script - * that launches the federation. + * in the federate code (which is initialized by the code generator), + * the type of this federate (persistent (0) or transient (1)), + * and the unique ID of the federation, a GUID that is created at run time by the + * generated script that launches the federation. * If you launch the federates and the RTI manually, rather than using the script, * then the federation ID is a string that is optionally given to the federate * on the command line when it is launched. The federate will connect @@ -261,10 +261,9 @@ * * Two bytes (ushort) giving the federate ID. * * One byte (uchar) giving the length N of the federation ID. * * N bytes containing the federation ID. - * Each federate needs to have a unique ID between 0 and - * NUMBER_OF_FEDERATES-1. - * Each federate, when starting up, should send this message - * to the RTI. This is its first message to the RTI. + * Each federate needs to have a unique ID between 0 and NUMBER_OF_FEDERATES-1. + * Each federate, when starting up, should send either this message, or MSG_TYPE_TRANSIENT_FED_IDS + * to the RTI, as its first message to the RTI. * The RTI will respond with either MSG_TYPE_REJECT, MSG_TYPE_ACK, or MSG_TYPE_UDP_PORT. * If the federate is a C target LF program, the generated federate * code does this by calling lf_synchronize_with_other_federates(), @@ -272,6 +271,23 @@ */ #define MSG_TYPE_FED_IDS 1 +/** Byte identifying a message from a transient federate to an RTI containing + * the federate ID and the federation ID. The message contains, in this order: + * * One byte equal to MSG_TYPE_TRANSIENT_FED_IDS. + * * Two bytes (ushort) giving the federate ID. + * * One byte (uchar) giving the length N of the federation ID. + * * One byte giving the type of the federate (1 if transient, 0 if persistent) + * * N bytes containing the federation ID. + * Each federate needs to have a unique ID between 0 and NUMBER_OF_FEDERATES-1. + * Each federate, when starting up, should send either this message, or MSG_TYPE_FED_IDS + * to the RTI, as its first message to the RTI. + * The RTI will respond with either MSG_TYPE_REJECT, MSG_TYPE_ACK, or MSG_TYPE_UDP_PORT. + * If the federate is a C target LF program, the generated federate + * code does this by calling lf_synchronize_with_other_federates(), + * passing to it its federate ID. + */ +#define MSG_TYPE_TRANSIENT_FED_IDS 103 + /////////// Messages used for authenticated federation. /////////////// /** * @brief Byte identifying a message from a federate to an RTI containing @@ -336,8 +352,9 @@ * @ingroup Network * * Each federate sends its starting physical time as a message of this - * type, and the RTI broadcasts to all the federates the starting logical + * type, and the RTI broadcasts to all persistent federates the starting * time as a message of this type. + * In case of a joining federate, the RTI will also send the effective start tag. */ #define MSG_TYPE_TIMESTAMP 2 @@ -347,6 +364,12 @@ */ #define MSG_TYPE_TIMESTAMP_LENGTH (1 + sizeof(int64_t)) +/** + * @brief The length of a timestamp message with an effective start tag. + * @ingroup Federated + */ +#define MSG_TYPE_TIMESTAMP_TAG_LENGTH (1 + sizeof(instant_t) + sizeof(tag_t)) + /** * @brief Byte identifying a message to forward to another federate. * @ingroup Network @@ -737,55 +760,48 @@ #define MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG 26 ///////////////////////////////////////////// -//// Rejection codes +//// Transient federate support /** - * @brief Code sent with a @ref MSG_TYPE_REJECT message indicating that the - * federation ID does not match. - * @ingroup Network + * A message the informs a downstream federate that a federate upstream of it + * is connected. The next 2 bytes are the federate ID of the upstream federate. */ -#define FEDERATION_ID_DOES_NOT_MATCH 1 +#define MSG_TYPE_UPSTREAM_CONNECTED 27 +#define MSG_TYPE_UPSTREAM_CONNECTED_LENGTH (1 + sizeof(uint16_t)) /** - * @brief Code sent with a @ref MSG_TYPE_REJECT message indicating that the - * federate ID is already in use. - * @ingroup Network + * A message the informs a downstream federate that a federate upstream of it + * is no longer connected. The next 2 bytes are the federate ID of the upstream federate. */ -#define FEDERATE_ID_IN_USE 2 +#define MSG_TYPE_UPSTREAM_DISCONNECTED 28 +#define MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) /** - * @brief Code sent with a @ref MSG_TYPE_REJECT message indicating that the - * federate ID is out of range. - * @ingroup Network - */ -#define FEDERATE_ID_OUT_OF_RANGE 3 - -/** - * @brief Code sent with a @ref MSG_TYPE_REJECT message indicating that the - * incoming message is not expected. - * @ingroup Network - */ -#define UNEXPECTED_MESSAGE 4 - -/** - * @brief Code sent with a @ref MSG_TYPE_REJECT message indicating that the - * connected to the wrong server. - * @ingroup Network + * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, + * the federate will call lf_stop(), which will make it resign at its current_tag + * plus 1 microstep. + * The next 8 bytes will be the time at which the federates will stop. + * The next 4 bytes will be the microstep at which the federates will stop.. */ -#define WRONG_SERVER 5 +#define MSG_TYPE_STOP 29 +#define MSG_TYPE_STOP_LENGTH 1 -/** - * @brief Code sent with a @ref MSG_TYPE_REJECT message indicating that the - * HMAC authentication failed. - * @ingroup Network - */ -#define HMAC_DOES_NOT_MATCH 6 +///////////////////////////////////////////// +//// Rejection codes /** - * @brief Code sent with a @ref MSG_TYPE_REJECT message indicating that the - * RTI was not executed using the -a or --auth option. - * @ingroup Network + * These codes are sent in a MSG_TYPE_REJECT message. + * They are limited to one byte (uchar). */ -#define RTI_NOT_EXECUTED_WITH_AUTH 7 +typedef enum { + FEDERATION_ID_DOES_NOT_MATCH = 1, + FEDERATE_ID_IN_USE = 2, + FEDERATE_ID_OUT_OF_RANGE = 3, + UNEXPECTED_MESSAGE = 4, + WRONG_SERVER = 5, + HMAC_DOES_NOT_MATCH = 6, + RTI_NOT_EXECUTED_WITH_AUTH = 7, + JOINING_TOO_LATE = 8 +} rejection_code_t; #endif /* NET_COMMON_H */ diff --git a/tag/api/tag.h b/tag/api/tag.h index eb7d29a23..98104a942 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -306,6 +306,14 @@ instant_t lf_time_physical_elapsed(void); */ instant_t lf_time_start(void); +/** + * Return the tag at which the execution effectively started. + * Most of the time, this will default to {.time = start_time, .microstep: 0}. + * When the reactor is a transient federate, however, the value will be different. + * @return A tag. + */ +tag_t lf_tag_start_effective(void); + /** * @brief For user-friendly reporting of time values, the buffer length required. * @ingroup API diff --git a/test/general/utils/pqueue_test.c b/test/general/utils/pqueue_test.c index 665c4e13f..18b3009a8 100644 --- a/test/general/utils/pqueue_test.c +++ b/test/general/utils/pqueue_test.c @@ -23,6 +23,8 @@ static void insert_on_queue(pqueue_tag_t* q) { assert(!pqueue_tag_insert_tag(q, t2)); assert(!pqueue_tag_insert_tag(q, t3)); + assert(lf_tag_compare(pqueue_tag_max_tag(q), t1) == 0); + assert(!pqueue_tag_insert_if_no_match(q, t4)); assert(pqueue_tag_insert_if_no_match(q, t1)); assert(pqueue_tag_insert_if_no_match(q, t4)); diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index 743bb1b3e..f13dc7ad5 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -78,6 +78,8 @@ typedef enum { receive_ADR_QR_REP, receive_DNET, receive_UNIDENTIFIED, + send_STOP, + receive_STOP, NUM_EVENT_TYPES } trace_event_t; @@ -146,6 +148,8 @@ static const char* trace_event_names[] = { "Receiving ADR_QR_REP", "Receiving DNET", "Receiving UNIDENTIFIED", + "Sending STOP", + "Receiving STOP", }; static inline void _suppress_unused_variable_warning_for_static_variable() { (void)trace_event_names; } diff --git a/trace/impl/src/trace_impl.c b/trace/impl/src/trace_impl.c index 7bc44725b..a02b8dcac 100644 --- a/trace/impl/src/trace_impl.c +++ b/trace/impl/src/trace_impl.c @@ -260,10 +260,37 @@ void lf_tracing_global_init(char* process_name, char* process_names, int fedid, } process_id = fedid; char filename[100]; + + // When tracing transient federates, a new trace file is created for each execution. For this, the function + // checks for file existance. If the file exists, the function appends a number to the file name and checks + // again. + int iter = 0; + bool file_exists = false; + bool new_file = false; if (strcmp(process_name, "rti") == 0) { snprintf(filename, sizeof(filename), "%s.lft", process_name); } else { - snprintf(filename, sizeof(filename), "%s_%d.lft", process_name, process_id); + FILE* file; + do { + if (iter == 0) { + sprintf(filename, "%s_%d.lft", process_name, process_id); + } else { + sprintf(filename, "%s_%d_%d.lft", process_name, process_id, iter); + } + file = fopen(filename, "r"); + if (file) { + file_exists = true; + new_file = true; + fclose(file); + iter++; + } else { + file_exists = false; + } + } while (file_exists); + } + if (new_file) { + lf_print_warning("No overwriting! The default file name already exists. A new trace file named %s is created.", + filename); } trace_new(filename); start_trace(&trace, max_num_local_threads); diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index 9b2eeb703..7d1a08e64 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -32,6 +32,7 @@ .FED_ID { stroke: #80DD99; fill: #80DD99; } \ .ACK { stroke: #52b788; fill: #52b788; } \ .FAILED { stroke: #c1121f; fill: #c1121f; } \ + .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ .STOP_REQ { stroke: #e76f51; fill: #e76f51; } \ .STOP_REQ_REP { stroke: #ca6702; fill: #ca6702; } \ .STOP_GRN { stroke: #e9c46a; fill: #e9c46a; } \ @@ -103,7 +104,9 @@ "Receiving ADR_QR_REP": "ADR_QR_REP", "Receiving DNET": "DNET", "Receiving UNIDENTIFIED": "UNIDENTIFIED", - "Scheduler advancing time ends": "AdvLT" + "Scheduler advancing time ends": "AdvLT", + "Sending STOP": "STOP", + "Receiving STOP": "STOP" } prune_event_name.setdefault(" ", "UNIDENTIFIED") @@ -147,7 +150,7 @@ def format_actor_name(name): # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. # Set of non-tagged events (messages) -non_tagged_messages = {'FED_ID', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG'} +non_tagged_messages = {'FED_ID', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} ################################################################################ @@ -246,7 +249,6 @@ def svg_string_draw_label(x1, y1, x2, y2, label) : else: rotation = 0 str_line = '\t'+label+'\n' - #print('rot = '+str(rotation)+' x1='+str(x1)+' y1='+str(y1)+' x2='+str(x2)+' y2='+str(y2)) return str_line @@ -541,11 +543,17 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files, start_time, end if (not fed_df.empty): # Get the federate id number fed_id = fed_df.iloc[-1]['self_id'] - # Add to the list of sequence diagram actors and add the name - actors.append(fed_id) - actors_names[fed_id] = Path(fed_trace).stem - # Derive the x coordinate of the actor - x_coor[fed_id] = (padding * 2) + (spacing * (len(actors) - 1)) + + ### Check that the federate id have not been entrered yet. + ### This is particlurly useful for transient actors, when + ### they leave and join several times + if (actors.count(fed_id) == 0): + # Add to the list of sequence diagram actors and add the name + actors.append(fed_id) + actors_names[fed_id] = Path(fed_trace).stem + # Derive the x coordinate of the actor + x_coor[fed_id] = (padding * 2) + (spacing * (len(actors)-1)) + fed_df['x1'] = x_coor[fed_id] trace_df = pd.concat([trace_df, fed_df]) fed_df = fed_df[0:0]