Nadi Interconnect is an application that orchestrates data streaming between nodes implementing the NADI (Node Agnostic Datastream Interface), forming a programmable data flow graph. Developed by the skunkforce organization, it dynamically loads NADI-compliant libraries (DLLs on Windows, .so files on Linux/macOS) from a user-specified directory (default: ./nodes), constructs nodes with unique instance identifiers, configures connections between node output and input streams across multiple channels, and manages message lifetimes for streams feeding multiple consumers. It’s ideal for IoT, sensor networks, data acquisition, or real-time analytics.
The NADI interface, defined in include/nadi.h, provides a minimalistic, platform-independent API for datastreaming, using a meta (JSON string) + data (binary) pattern. It supports dynamic library loading, reentrant C-style callbacks, and cross-language compatibility (e.g., C++, Python).
- Dynamic Node Loading: Loads NADI-compliant DLLs/
.sofiles from a user-specified directory (default:./nodes) usingLoadLibrary(Windows) ordlopen(Linux/macOS). - Node Construction/Destruction: Constructs multiple instances from a single node library with unique identifiers; destructs instances as needed.
- Programmable Stream Routing: Connects and disconnects producer node output streams to consumer node input streams via JSON control messages.
- Connection Management: Queries active stream connections between instances.
- Message Lifetime Management: Uses
scope-guardfor single-consumer streams; multi-consumer support planned. - JSON Message Processing: Parses
channel,meta, anddatafields fromstdinusingnlohmann_json. - Bootstrap Configuration: Initializes nodes and connections at startup via a JSON bootstrap file, parsed with
CLI11. - Stdio Integration: Built-in
stdioinstance forstdin/stdout/stderrcommunication. - Modern C++23: Leverages
std::expected,std::print, andscope-guardfor robust design. - Cross-Platform: Runs on Linux, macOS, and Windows via
vcpkg.
- CMake: Version 3.27 or higher.
- C++ Compiler: Supporting C++23 (e.g., GCC 13, Clang 16, MSVC 2022).
- Git: For cloning the repository.
- vcpkg: Automatically fetched for dependency management.
- Libraries:
nlohmann_json,CLI11,scope-guard(installed viavcpkg). - Operating System: Linux, macOS, or Windows.
-
Clone the Repository:
git clone https://github.com/skunkforce/nadi_node_interconnect.git cd nadi_node_interconnect -
Create a Build Directory:
mkdir build cd build -
Configure with CMake:
cmake -S .. -B . -DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake -
Build the Project:
cmake --build . --config ReleaseThe executable
nadi_interconnectwill be generated in thebuilddirectory.
-
Prepare Node Libraries: Place NADI-compliant libraries in the node directory (default:
./nodesor specified via--nodes). Each library must export all NADI functions (nadi_init,nadi_deinit,nadi_send,nadi_free,nadi_descriptor). Example descriptor:{"name":"node1","version":"1.0.0"} -
Prepare Bootstrap File (Optional): Create a JSON bootstrap file (default:
bootstrap.json) to construct, connect, and configure nodes at startup. Examplebootstrap.jsonfor a WebSocket server:{ "messages": [ { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "websocket_server", "instance": "ws_server" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["ws_server", 2], "target": ["interconnect", 61440] } }, { "channel": 61440, "meta": {"format": "json"}, "data": {"type": "nodes.loaded"} } ] } -
Run the Executable: Run with default settings:
./nadi_interconnect
Specify a custom node directory and/or bootstrap file:
./nadi_interconnect --nodes /path/to/nodes --bootstrap custom_bootstrap.json
Parses
--nodesand--bootstrapwithCLI11, loads libraries from the specified node directory (default:./nodes), processes bootstrap messages, loads thestdioinstance (with output channel0xF000connected to the interconnect’s input0xF000), and waits for JSON messages onstdin. -
Construct Nodes: Send a control message to construct a node (e.g., via
stdio):echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.construct","node_name":"node1","instance":"node1_instance"}}' | ./nadi_interconnect
Constructs the
node1library asnode1_instance. -
Destruct Nodes: Send a control message to destruct a node instance:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.destruct","instance":"node1_instance"}}' | ./nadi_interconnect
Destructs the
node1_instanceinstance. -
List Loaded Node Libraries: Send a control message to list loaded node libraries:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.loaded"}}' | ./nadi_interconnect
Response (sent to the originating instance, e.g.,
stdiochannel 0 forstdoutor a dynamic node):{ "meta": {"format": "json"}, "data": { "type": "nodes.loaded.list", "nodes": [ {"name": "node1", "version": "1.0.0"}, {"name": "node2", "version": "1.1.0"} ] } } -
List Constructed Node Instances: Send a control message to list constructed node instances:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances"}}' | ./nadi_interconnect
Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.list", "instances": [ {"instance": "node1_instance"}, {"instance": "node1_instance2"}, {"instance": "node2_instance"} ] } } -
List Active Connections: Send a control message to list active stream connections:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.connections"}}' | ./nadi_interconnect
Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.list", "connections": [ {"source": ["node1_instance", 1], "target": ["node2_instance", 1]}, {"source": ["ws_server", 2], "target": ["interconnect", 61440]} ] } } -
Configure Stream Connections: Send a control message to connect streams:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.connections.connect","source":["node1_instance",1],"target":["node2_instance",1]}}' | ./nadi_interconnect
Connects
node1_instance’s output channel 1 tonode2_instance’s input channel 1. -
Disconnect Streams: Send a control message to disconnect streams:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.connections.disconnect","source":["node1_instance",1],"target":["node2_instance",1]}}' | ./nadi_interconnect
Disconnects the stream from
node1_instancechannel 1 tonode2_instancechannel 1. -
Send Data Messages: Pipe a NADI-compliant data message:
echo '{"channel":1,"meta":{"format":"json"},"data":{"value":42}}' | ./nadi_interconnect
The message is routed to connected nodes, with lifetimes managed for single-consumer streams.
-
Monitor Output: Check
stdout(stdio channel 0) for node metadata, connection status, and responses, orstderr(stdio channel 1) for errors, if using thestdioinstance.
-
WebSocket Server: Use a bootstrap file to construct a WebSocket server node, connect its output channel to the interconnect’s
0xF000channel, and enable clients to list nodes (nodes.loaded,nodes.instances), construct/destruct nodes (nodes.instances.construct,nodes.instances.destruct), and manage connections (nodes.instances.connections.connect,nodes.instances.connections.disconnect).{ "messages": [ { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "websocket_server", "instance": "ws_server" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["ws_server", 2], "target": ["interconnect", 61440] } } ] } -
Device Driver Logging: Use a bootstrap file to construct a device driver node and a file output node, connect their streams, configure logging, and destruct the nodes afterward.
{ "messages": [ { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "device_driver", "instance": "driver" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "file_output", "instance": "file" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["driver", 1], "target": ["file", 1] } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.destruct", "instance": "driver" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.destruct", "instance": "file" } } ] }
- Node Directory: Place NADI-compliant DLLs/
.sofiles in the node directory (default:./nodes, or specified via--nodes). Themanagementclass loads these libraries, queryingnadi_descriptor()for metadata. - Bootstrap File: A JSON file (default:
bootstrap.json) with a"messages"array of control or data messages to construct, destruct, connect, disconnect, and configure nodes at startup, parsed usingCLI11. - Stdio Instance: Automatically loaded as
"stdio"with:- Output channel
0xF000: Connected to the interconnect’s input0xF000for control messages. - Output channel 0: Prints to
stdout. - Output channel 1: Prints to
stderr. - Input channels: Can receive messages from other nodes, addressed via the
"channel"field.
- Output channel
- Message Types:
- Data Messages: For stream data, must include:
channel: Integer for the communication channel (not0xF000).meta: JSON object with aformatfield (e.g.,"json","microseconds-double").data: Payload, interpreted based onmeta.format. Example:
{ "channel": 1, "meta": {"format": "microseconds-double"}, "data": [ [1625097600000000, 25.3], [1625097601000000, 25.5] ] } - Control Messages: For node construction, destruction, connection management, or node queries (on channel
0xF000), must include:channel:61440(0xF000 in decimal).meta: JSON object with"format":"json".data: JSON object with control details.- Node Construction:
type:"nodes.instances.construct".node_name: String specifying the node library’s name (fromnadi_descriptor).instance: String defining a unique identifier for the instance, used in connections. Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "node1", "instance": "node1_instance" } } - Node Destruction:
type:"nodes.instances.destruct".instance: String identifying the instance to destruct. Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.destruct", "instance": "node1_instance" } } - Node Connection:
type:"nodes.instances.connections.connect".source: Tuple[instance, channel], whereinstanceis the instance identifier (string) andchannelis a number.target: Tuple[instance, channel], specifying the target instance identifier and channel. Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["node1_instance", 1], "target": ["node2_instance", 1] } } - Node Disconnection:
type:"nodes.instances.connections.disconnect".source: Tuple[instance, channel], identifying the source instance and channel.target: Tuple[instance, channel], identifying the target instance and channel. Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.disconnect", "source": ["node1_instance", 1], "target": ["node2_instance", 1] } } - Node Library Listing:
type:"nodes.loaded". Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.loaded.list", "nodes": [ {"name": "node1", "version": "1.0.0"}, {"name": "node2", "version": "1.1.0"} ] } } - Node Instance Listing:
type:"nodes.instances". Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.list", "instances": [ {"instance": "node1_instance"}, {"instance": "node1_instance2"}, {"instance": "node2_instance"} ] } } - Connection Listing:
type:"nodes.instances.connections". Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.list", "connections": [ {"source": ["node1_instance", 1], "target": ["node2_instance", 1]}, {"source": ["ws_server", 2], "target": ["interconnect", 61440]} ] } }
- Data Messages: For stream data, must include:
The NADI interface is defined in include/nadi.h. Key components include:
struct nadi_message {
char* meta; // Null-terminated JSON string
unsigned long meta_hash; // Hash of meta content (0 if unused)
char* data; // Raw binary data
unsigned int data_length;// Length of data in bytes
nadi_free_callback free; // Callback to free the message
nadi_node_handle instance; // Connection instance handle
unsigned int channel; // Channel identifier
};nadi_init(nadi_node_handle* instance, nadi_receive_callback callback): Initializes a NADI instance with a receive callback.nadi_deinit(nadi_node_handle instance): Deinitializes an instance, blocking until threads complete.nadi_send(nadi_message* message, nadi_node_handle instance): Sends a message, transferring ownership on success.nadi_free(nadi_message* message): Frees a message using itsfreecallback.nadi_descriptor(): Returns a JSON string with node metadata (e.g.,{"name":"node","version":"1.0.0"}).
struct nadi_library {
nadi_init_pt init; // Pointer to nadi_init
nadi_deinit_pt deinit; // Pointer to nadi_deinit
nadi_send_pt send; // Pointer to nadi_send
nadi_free_pt free; // Pointer to nadi_free
nadi_descriptor_pt descriptor; // Pointer to nadi_descriptor
// Platform-specific library handle (HMODULE on Windows, void* on Linux/macOS)
};nadi_library load_node(std::string path): Loads a NADI-compliant library, mapping its exported functions tonadi_librarypointers.
load_nodes(const std::string& dir): Loads NADI-compliant libraries from the specified directory usingload_node.to_json(): Returns JSON representation of loaded nodes.callback(nadi_message* msg): Processes messages:- Routes data messages (non-
0xF000channels) to connected nodes. - Parses control messages (
0xF000channel) for node construction (nodes.instances.construct), destruction (nodes.instances.destruct), connection management (nodes.instances.connections.connect,nodes.instances.connections.disconnect,nodes.instances.connections), node library listing (nodes.loaded), node instance listing (nodes.instances), or bootstrap initialization. - Manages message lifetimes with
scope-guard(single-consumer); multi-consumer support planned.
- Routes data messages (non-
load_nodes(const std::string& path): Loads libraries from the specified directory usingfind_nodes::get_node_pathsandload_node.construct_node: Constructs a node instance from a library, initializing it withnadi_init.destruct_node: Destructs an instance, deinitializing it withnadi_deinit.send_loaded_list: Sends anodes.loaded.listresponse with library metadata.send_instances_list: Sends anodes.instances.listresponse with instance identifiers.lib_from_instance: Retrieves thenadi_libraryfor an instance handle.
connect: Connects a source instance and channel to a target instance and channel (implementation incomplete).disconnect: Disconnects a source-target stream (implementation incomplete).send_connections_list: Sends anodes.instances.connections.listresponse with active connections (implementation incomplete).destinations_from: Returns destinations for a given source instance and channel.
(Note: Full details for connection management require complete implementations in message_routing.hpp.)
To build with debug symbols:
cmake -S .. -B . -CMAKE_BUILD_TYPE=Debug -CMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake
cmake --build . --config DebugContributions are welcome! Follow these steps:
- Fork the repository.
- Create a feature branch (
git checkout -b feature/your-feature). - Commit changes (
git commit -m 'Add your feature'). - Push to the branch (
git push origin feature/your-feature). - Open a Pull Request.
See CONTRIBUTING.md for guidelines.
Licensed under the MIT License. See LICENSE for details.