Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
92a3a8e
Add a per-federate transients[] bool array to federate_instance_t, in…
ChadliaJerad Mar 27, 2026
d13b350
Add the new attributes to a federate instance, and add the function o…
ChadliaJerad Mar 29, 2026
4823845
Tracing and visualizing UPSTREAM_CONNECTED and UPSTREAM_DISCONNECTED …
ChadliaJerad Apr 4, 2026
30eae00
Add MSG_TYPE_DOWNSTREAM_CONNECTED message, handler and tracing
ChadliaJerad Apr 2, 2026
c180ad0
Add is_transient byte to MSG_TYPE_ADDRESS_QUERY
ChadliaJerad Apr 2, 2026
7108c8c
Update lf_connect_to_federate() to account for the type of federate
ChadliaJerad Apr 2, 2026
3046958
Manage downstream connection messages to request address in case of t…
ChadliaJerad Apr 2, 2026
91cda42
Remove overlooked message
ChadliaJerad Apr 2, 2026
c2d9ecf
Update and optimize federates parameters regarding indound and outbou…
ChadliaJerad Apr 2, 2026
3979834
Defer the P2P connection establishment until after the start-time han…
ChadliaJerad Apr 2, 2026
8157729
Add DOWNSTREAM_DISCONNECTED to silence spurious errors on transient d…
ChadliaJerad Apr 2, 2026
ce32505
Fix 'Attempt to update to earlier tag' in decentralised coordination
ChadliaJerad Apr 2, 2026
94ddec9
Fix P2P_MSG visualization in fedsd
ChadliaJerad Apr 2, 2026
a130de2
During shutdowng, remove the WARNING: Failed to accept the socket. In…
ChadliaJerad Apr 2, 2026
2669f91
Further tune mark_inputs_known_absent and simpler test on p2p transie…
ChadliaJerad Apr 3, 2026
2b8da8f
Better names in case of decentralized coordination: outbound instead …
ChadliaJerad Apr 4, 2026
d372dd1
Fix lingua-franca-ref.txt to point to transient-fed-dec
ChadliaJerad Apr 4, 2026
2900198
Apply clang-format
ChadliaJerad Apr 4, 2026
187fd21
Fix 2 bugs due to network behavior that is different between linux an…
ChadliaJerad Apr 4, 2026
79a8288
Merge branch 'transient-fed' of https://github.com/lf-lang/reactor-c …
Jakio815 Apr 7, 2026
be80a59
Fix merge conflicts.
Jakio815 Apr 7, 2026
461cf4f
Merge conflict fix.
Jakio815 Apr 7, 2026
9c3f741
Minor fix & formatting.
Jakio815 Apr 7, 2026
814e88d
Merge branch 'transient-fed' of https://github.com/lf-lang/reactor-c …
Jakio815 Apr 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 146 additions & 27 deletions core/federated/RTI/rti_remote.c

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ typedef struct federate_info_t {
bool is_transient;
/** @brief Records the start time of the federate, which is mainly useful for transient federates. */
tag_t effective_start_tag;
/** @brief Number of outbound connections to transient federates. */
int32_t number_of_outbound_transients;
/** @brief IDs of transient federates this federate has outbound connections to.
* The array has size equal to the total number of transient federates in the federation,
* and entries are initialized (and reset) to -1. */
int32_t* outbound_transients;
} federate_info_t;

/**
Expand Down
234 changes: 193 additions & 41 deletions core/federated/federate.c

Large diffs are not rendered by default.

65 changes: 52 additions & 13 deletions include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@
* @author Edward A. Lee
* @author Anirudh Rengarajsm
*
* This file defines the core data structures and functions used in federated Lingua Franca programs.
* It includes the federate instance structure that tracks the state of a federate, including its
* connections to the RTI and other federates, message handling, and coordination mechanisms.
* The file also provides functions for managing these connections, sending and receiving messages,
* and handling various aspects of federated execution.
* This file defines the core data structures and functions used in federated Lingua Franca
* programs. It includes the federate instance structure that tracks the state of a federate,
* including its connections to the RTI and other federates, message handling, and coordination
* mechanisms. The file also provides functions for managing these connections, sending and
* receiving messages, and handling various aspects of federated execution.
*/

#ifndef FEDERATE_H
#define FEDERATE_H

#include <stdbool.h>

#include "tag.h"
#include "lf_types.h"
#include "environment.h"
#include "lf_types.h"
#include "low_level_platform.h"
#include "net_abstraction.h"
#include "tag.h"

#ifndef ADVANCE_MESSAGE_INTERVAL
#define ADVANCE_MESSAGE_INTERVAL MSEC(10)
Expand Down Expand Up @@ -57,6 +57,11 @@ typedef struct federate_instance_t {
*/
size_t number_of_inbound_p2p_connections;

/**
* Number of inbound peer-to-peer connections from transient federates.
*/
size_t number_of_inbound_p2p_transients;

/**
* Array of thread IDs for threads that listen for incoming messages.
* This is NULL if there are none and otherwise has size given by
Expand All @@ -71,6 +76,18 @@ typedef struct federate_instance_t {
*/
size_t number_of_outbound_p2p_connections;

/**
* Number of outbound peer-to-peer connections to transient federates.
*/
size_t number_of_outbound_p2p_transients;

/**
* An array of IDs of transient federates to which this federate has outbound
* peer-to-peer connections. The array has size number_of_outbound_p2p_transients
* and is allocated at startup by the generated _lf_executable_preamble().
*/
bool outbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES];

/**
* An array that holds the network abstractions for inbound
* connections from each federate. The index will be the federate
Expand All @@ -86,6 +103,15 @@ typedef struct federate_instance_t {
*/
net_abstraction_t net_for_inbound_p2p_connections[NUMBER_OF_FEDERATES];

/**
* An array indexed by federate ID indicating whether the corresponding
* inbound peer-to-peer federate is transient. Initialized to false.
* Set in lf_handle_p2p_connections_from_federates() when the handshake
* reveals the remote federate's type. Used by mark_inputs_known_absent()
* to avoid permanently stamping FOREVER_TAG on ports whose source may rejoin.
*/
bool inbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES];

/**
* An array that holds the network abstractions for outbound direct
* connections to each remote federate. The index will be the federate
Expand All @@ -101,6 +127,13 @@ typedef struct federate_instance_t {
*/
net_abstraction_t net_for_outbound_p2p_connections[NUMBER_OF_FEDERATES];

/**
* An array indicating whether each federate is transient.
* The index is the federate ID.
* This is initialized at startup by the generated _lf_executable_preamble().
*/
bool transients[NUMBER_OF_FEDERATES];

/**
* Thread ID for a thread that accepts network abstractions and then supervises
* listening to those network abstractions for incoming P2P (physical) connections.
Expand Down Expand Up @@ -255,7 +288,7 @@ extern lf_cond_t lf_port_status_changed;
// Public functions (in alphabetical order)

/**
* @brief Connect to the federate with the specified id.
* @brief Connect to the federate with the specified id, based if it is transient or not.
* @ingroup Federated
*
* The established connection will then be used in functions such as lf_send_tagged_message()
Expand All @@ -268,8 +301,12 @@ extern lf_cond_t lf_port_status_changed;
* refer to the network abstraction for communicating directly with the federate.
*
* @param remote_federate_id The ID of the remote federate.
* @param is_transient Whether the remote federate is transient. This affects
* connection behavior: a transient remote federate may not be immediately
* available, so the connection attempt is handled differently than for a
* persistent federate.
*/
void lf_connect_to_federate(uint16_t remote_federate_id);
void lf_connect_to_federate(uint16_t remote_federate_id, bool is_transient);

/**
* @brief Connect to the RTI at the specified host and port.
Expand Down Expand Up @@ -487,9 +524,10 @@ int lf_send_stop_request_to_rti(tag_t stop_tag);
* @brief Send a tagged message to the specified port of the specified federate.
* @ingroup Federated
*
* The tag will be the current tag of the specified environment delayed by the specified additional_delay.
* If the delayed tag falls after the timeout time, then the message is not sent and -1 is returned.
* The caller can reuse or free the memory storing the message after this returns.
* The tag will be the current tag of the specified environment delayed by the specified
* additional_delay. If the delayed tag falls after the timeout time, then the message is not sent
* and -1 is returned. The caller can reuse or free the memory storing the message after this
* returns.
*
* If the message fails to send (e.g. the network abstraction connection is broken), then the
* response depends on the message_type. For MSG_TYPE_TAGGED_MESSAGE, the message is
Expand Down Expand Up @@ -556,7 +594,8 @@ void lf_spawn_staa_thread(void);
void lf_stall_advance_level_federation(environment_t* env, size_t level);

/**
* @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock.
* @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex
* lock.
* @ingroup Federated
*
* @param level The level to which we would like to advance.
Expand Down
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
transient-fed
transient-fed-dec
28 changes: 24 additions & 4 deletions network/api/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@
* request). When the RTI has gathered all the stop tags
* from federates (that are still connected), it will decide on a common stop tag
* which is the maximum of the seen stop tag and answer with a MSG_TYPE_STOP_GRANTED. The federate
* sending the MSG_TYPE_STOP_REQUEST and federates sending the MSG_TYPE_STOP_REQUEST_REPLY will freeze
* the advancement of tag until they receive the MSG_TYPE_STOP_GRANTED message, in which
* case they might continue their execution until the stop tag has been reached.
* sending the MSG_TYPE_STOP_REQUEST and federates sending the MSG_TYPE_STOP_REQUEST_REPLY will
* freeze the advancement of tag until they receive the MSG_TYPE_STOP_GRANTED message, in which case
* they might continue their execution until the stop tag has been reached.
*
*/

Expand All @@ -193,7 +193,8 @@
#define FED_COM_BUFFER_SIZE 256u

/**
* @brief Time that a federate waits before asking the RTI again for the port and IP address of a federate.
* @brief Time that a federate waits before asking the RTI again for the port and IP address of a
* federate.
* @ingroup Network
*
* The federate repeatedly sends an MSG_TYPE_ADDRESS_QUERY message after the RTI responds that it
Expand Down Expand Up @@ -576,6 +577,7 @@
* @ingroup Network
*
* The next two bytes are the other federate's ID.
* The following byte is 1 if the remote federate being queried is transient, 0 otherwise.
*/
#define MSG_TYPE_ADDRESS_QUERY 13

Expand Down Expand Up @@ -776,6 +778,24 @@
#define MSG_TYPE_UPSTREAM_DISCONNECTED 28
#define MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t))

/**
* A message that informs an upstream federate that a transient federate downstream of it
* has (re-)connected. The next 2 bytes are the federate ID of the downstream federate.
* Upon receiving this, the upstream federate should query the RTI for the downstream's
* address and establish (or re-establish) the outbound P2P connection.
*/
#define MSG_TYPE_OUTBOUND_CONNECTED 30
#define MSG_TYPE_OUTBOUND_CONNECTED_LENGTH (1 + sizeof(uint16_t))

/**
* A message that informs an upstream federate that a transient federate downstream of it
* has disconnected. The next 2 bytes are the federate ID of the downstream federate.
* Upon receiving this, the upstream federate should close its outbound P2P connection
* to the downstream.
*/
#define MSG_TYPE_OUTBOUND_DISCONNECTED 31
#define MSG_TYPE_OUTBOUND_DISCONNECTED_LENGTH (1 + sizeof(uint16_t))

/**
* Byte sent by the RTI ordering the federate to stop. Upon receiving the message,
* the federate will call lf_stop(), which will make it resign at its current_tag
Expand Down
6 changes: 5 additions & 1 deletion network/impl/src/lf_socket_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ void read_from_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, un
// Read failed.
if (format != NULL) {
va_start(args, format);
lf_print_error_system_failure(format, args);
// Use lf_vprint_error (va_list variant) rather than lf_print_error_system_failure
// (variadic variant). Passing a va_list to a '...' function is undefined behaviour
// and manifests as garbage argument values on macOS due to ABI differences.
lf_vprint_error(format, args);
va_end(args);
lf_print_error_and_exit("Error %d: %s", errno, strerror(errno));
} else {
lf_print_error_system_failure("Failed to read from socket.");
}
Expand Down
6 changes: 4 additions & 2 deletions network/impl/src/socket_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ int accept_socket(int socket) {
// Got a socket
break;
} else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK || errno != EINTR)) {
if (errno != ECONNABORTED) {
// ECONNABORTED: a connection was aborted before accept() could complete — not fatal.
// EINVAL: the socket was shut down (e.g., shutdown_socket() was called to unblock this
// accept() intentionally when the RTI is shutting down) — expected, not an error.
if (errno != ECONNABORTED && errno != EINVAL) {
lf_print_warning("Failed to accept the socket. %s.", strerror(errno));
}
break;
Expand Down Expand Up @@ -294,7 +297,6 @@ int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer) {
}
return 0;
}

ssize_t peek_from_socket(int socket, unsigned char* result) {
ssize_t bytes_read = recv(socket, result, 1, MSG_DONTWAIT | MSG_PEEK);
if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
Expand Down
18 changes: 18 additions & 0 deletions trace/api/types/trace_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ typedef enum {
send_ADR_QR,
send_ADR_QR_REP,
send_DNET,
send_UPSTREAM_CONNECTED,
send_UPSTREAM_DISCONNECTED,
// Receiving messages
receive_ACK,
receive_FAILED,
Expand All @@ -77,9 +79,16 @@ typedef enum {
receive_ADR_QR,
receive_ADR_QR_REP,
receive_DNET,
receive_UPSTREAM_CONNECTED,
receive_UPSTREAM_DISCONNECTED,
receive_UNIDENTIFIED,
send_STOP,
receive_STOP,
// New entries must be added here, at the end, to avoid shifting existing indices.
send_OUTBOUND_CONNECTED,
receive_OUTBOUND_CONNECTED,
send_OUTBOUND_DISCONNECTED,
receive_OUTBOUND_DISCONNECTED,
NUM_EVENT_TYPES
} trace_event_t;

Expand Down Expand Up @@ -123,6 +132,8 @@ static const char* trace_event_names[] = {
"Sending ADR_QR",
"Sending ADR_QR_REP",
"Sending DNET",
"Sending UPSTREAM_CONNECTED",
"Sending UPSTREAM_DISCONNECTED",
// Receiving messages
"Receiving ACK",
"Receiving FAILED",
Expand All @@ -147,9 +158,16 @@ static const char* trace_event_names[] = {
"Receiving ADR_QR",
"Receiving ADR_QR_REP",
"Receiving DNET",
"Receiving UPSTREAM_CONNECTED",
"Receiving UPSTREAM_DISCONNECTED",
"Receiving UNIDENTIFIED",
"Sending STOP",
"Receiving STOP",
// New entries appended at the end to avoid shifting existing indices.
"Sending OUTBOUND_CONNECTED",
"Receiving OUTBOUND_CONNECTED",
"Sending OUTBOUND_DISCONNECTED",
"Receiving OUTBOUND_DISCONNECTED",
};

static inline void _suppress_unused_variable_warning_for_static_variable() { (void)trace_event_names; }
Expand Down
34 changes: 33 additions & 1 deletion util/tracing/visualization/fedsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
.DNET { stroke: #7b2d8b; fill: #7b2d8b; } \
.TIMESTAMP { stroke: #888888; fill: #888888; } \
.FED_ID { stroke: #80DD99; fill: #80DD99; } \
.UPSTREAM_CONNECTED { stroke: #f4a261; fill: #f4a261; } \
.UPSTREAM_DISCONNECTED { stroke: #e76f51; fill: #e76f51; } \
.OUTBOUND_CONNECTED { stroke: #2a9d8f; fill: #2a9d8f; } \
.OUTBOUND_DISCONNECTED { stroke: #264653; fill: #264653; } \
.ACK { stroke: #52b788; fill: #52b788; } \
.FAILED { stroke: #c1121f; fill: #c1121f; } \
.STOP {stroke: #d0b7eb; fill: #d0b7eb} \
Expand Down Expand Up @@ -66,6 +70,10 @@
"Sending STOP_REQ_REP": "STOP_REQ_REP",
"Sending STOP_GRN": "STOP_GRN",
"Sending FED_ID": "FED_ID",
"Sending UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED",
"Sending UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED",
"Sending OUTBOUND_CONNECTED": "OUTBOUND_CONNECTED",
"Sending OUTBOUND_DISCONNECTED": "OUTBOUND_DISCONNECTED",
"Sending PTAG": "PTAG",
"Sending TAG": "TAG",
"Sending REJECT": "REJECT",
Expand All @@ -89,6 +97,10 @@
"Receiving STOP_REQ_REP": "STOP_REQ_REP",
"Receiving STOP_GRN": "STOP_GRN",
"Receiving FED_ID": "FED_ID",
"Receiving UPSTREAM_CONNECTED": "UPSTREAM_CONNECTED",
"Receiving UPSTREAM_DISCONNECTED": "UPSTREAM_DISCONNECTED",
"Receiving OUTBOUND_CONNECTED": "OUTBOUND_CONNECTED",
"Receiving OUTBOUND_DISCONNECTED": "OUTBOUND_DISCONNECTED",
"Receiving PTAG": "PTAG",
"Receiving TAG": "TAG",
"Receiving REJECT": "REJECT",
Expand Down Expand Up @@ -150,7 +162,8 @@ def format_actor_name(name):
# Events matching at the sender and receiver ends depend on whether they are tagged
# (the elapsed logical time and microstep have to be the same) or not.
# Set of non-tagged events (messages)
non_tagged_messages = {'FED_ID', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'}
non_tagged_messages = {'FED_ID', 'UPSTREAM_CONNECTED', 'UPSTREAM_DISCONNECTED', 'OUTBOUND_CONNECTED', 'OUTBOUND_DISCONNECTED',
'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_QR_REP', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'}


################################################################################
Expand Down Expand Up @@ -645,6 +658,25 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files, start_time, end
(trace_df['logical_time'] == logical_time) & \
(trace_df['microstep'] == microstep) \
]
elif (event == 'P2P_MSG'):
# P2P messages travel directly between federates without going through the
# RTI, so partner_id in the trace is typically -1 on both sides (the RTI is
# not involved and the tracepoint has no partner). We therefore cannot use
# partner_id for matching. Instead we match each 'out' to the first pending
# 'in' whose physical_time >= the sender's physical_time (causality guarantee:
# the receive cannot precede the send).
physical_time = trace_df.at[index, 'physical_time']
if (inout == 'out'):
matching_df = trace_df[\
(trace_df['inout'] == 'in') & \
(trace_df['arrow'] == 'pending') & \
(trace_df['event'] == event) & \
(trace_df['physical_time'] >= physical_time) \
]
else:
# 'in' rows are claimed by the corresponding 'out' pass above.
# If we reach an 'in' here it means no 'out' claimed it; render as dot.
matching_df = trace_df[0:0]
else :
matching_df = trace_df[\
(trace_df['inout'] != inout) & \
Expand Down
Loading