Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ reflector/urfd.*
urfd
inicheck
dbutil
.devcontainer/
/test_urfd.ini
/staging_urfd.ini
/pr_comment_nng.md
/pr_body_fix.md
/staging/
145 changes: 145 additions & 0 deletions docs/nng.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# NNG Event System Documentation

This document describes the real-time event system in `urfd`, which uses NNG (nanomsg next gen) to broadcast system state and activity as JSON.

## Architecture Overview

The `urfd` reflector acts as an NNG **Publisher** (PUB). Any number of subscribers (e.g., a middle-tier service or dashboard) can connect as **Subscribers** (SUB) to receive the event stream.

```mermaid
graph TD
subgraph "urfd Core"
CR["CReflector"]
CC["CClients"]
CU["CUsers"]
PS["CPacketStream"]
end

subgraph "Publishing Layer"
NP["g_NNGPublisher"]
end

subgraph "Network"
ADDR["tcp://0.0.0.0:5555"]
end

subgraph "External"
MT["Middle Tier / Dashboard"]
end

%% Internal Flows
CC -- "client_connect / client_disconnect" --> NP
CU -- "hearing / closing" --> NP
CR -- "periodic state report" --> NP
PS -- "IsActive status" --> CR

%% Network Flow
NP --> ADDR
ADDR -.-> MT
```

## Messaging Protocols

Events are sent as serialized JSON strings. Each message contains a `type` field to identify the payload structure.

### 1. State Broadcast (`state`)

Sent periodically based on `DashboardInterval` (default 10s). It provides a full snapshot of the reflector's configuration and status.

**Payload Structure:**

```json
{
"type": "state",
"Configure": {
"Key": "Value",
...
},
"Peers": [
{
"Callsign": "XLX123",
"Modules": "ABC",
"Protocol": "D-Extra",
"ConnectTime": "2023-10-27T10:00:00Z"
}
],
"Clients": [
{
"Callsign": "N7TAE",
"OnModule": "A",
"Protocol": "DMR",
"ConnectTime": "2023-10-27T10:05:00Z"
}
],
"Users": [
{
"Callsign": "G4XYZ",
"Repeater": "GB3NB",
"OnModule": "B",
"ViaPeer": "XLX456",
"LastHeard": "2023-10-27T10:10:00Z"
}
],
"ActiveTalkers": [
{
"Module": "A",
"Callsign": "N7TAE"
}
]
}
```

### 2. Client Connectivity (`client_connect` / `client_disconnect`)

Triggered immediately when a client (Repeater, Hotspot, or Mobile App) links or unlinks from a module.

**Payload Structure:**

```json
{
"type": "client_connect",
"callsign": "N7TAE",
"ip": "1.2.3.4",
"protocol": "DMR",
"module": "A"
}
```

### 3. Voice Activity (`hearing`)

Triggered when the reflector "hears" an active transmission. This event is sent for every "tick" or heartbeat of voice activity processed by the reflector.

**Payload Structure:**

```json
{
"type": "hearing",
"my": "G4XYZ",
"ur": "CQCQCQ",
"rpt1": "GB3NB",
"rpt2": "XLX123 A",
"module": "A",
"protocol": "M17"
}
```

### 4. Transmission End (`closing`)

Triggered when a transmission stream is closed (user stops talking).

**Payload Structure:**

```json
{
"type": "closing",
"my": "G4XYZ",
"module": "A",
"protocol": "M17"
}
```

## Middle Tier Design Considerations

1. **Late Joining**: The `state` message is broadcast periodically to ensure a middle-tier connecting at any time (or reconnecting) can synchronize its internal state without waiting for new events.
2. **Active Talkers**: The `ActiveTalkers` array in the `state` message identifies who is currently keyed up. Real-time transitions (start/stop) are driven by the `hearing` events and the absence of such events over a timeout (typically 2-3 seconds).
3. **Deduplication**: The `state` report is a snapshot. If the middle-tier is already tracking events, it can use the `state` report to "re-base" its state and clear out stale data.
Binary file added docs/nng_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion reflector/BMProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ void CBMProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, c
// release
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer, EProtocol::bm);
g_Reflector.ReleaseUsers();
}
}
Expand Down
29 changes: 22 additions & 7 deletions reflector/Clients.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@ CClients::~CClients()
void CClients::AddClient(std::shared_ptr<CClient> client)
{
// first check if client already exists
for ( auto it=begin(); it!=end(); it++ )
for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ )
{
if (*client == *(*it))
// if found, just do nothing
// so *client keep pointing on a valid object
// on function return
{
// delete new one
// if found, just do nothing
return;
}
}
Expand All @@ -63,13 +60,21 @@ void CClients::AddClient(std::shared_ptr<CClient> client)
std::cout << " on module " << client->GetReflectorModule();
}
std::cout << std::endl;

// dashboard event
nlohmann::json event;
event["type"] = "client_connect";
event["callsign"] = client->GetCallsign().GetCS();
event["ip"] = client->GetIp().GetAddress();
event["protocol"] = client->GetProtocolName();
event["module"] = std::string(1, client->GetReflectorModule());
g_NNGPublisher.Publish(event);
}

void CClients::RemoveClient(std::shared_ptr<CClient> client)
{
// look for the client
bool found = false;
for ( auto it=begin(); it!=end(); it++ )
for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ )
{
// compare object pointers
if ( *it == client )
Expand All @@ -84,6 +89,16 @@ void CClients::RemoveClient(std::shared_ptr<CClient> client)
std::cout << " on module " << (*it)->GetReflectorModule();
}
std::cout << std::endl;

// dashboard event
nlohmann::json event;
event["type"] = "client_disconnect";
event["callsign"] = (*it)->GetCallsign().GetCS();
event["ip"] = (*it)->GetIp().GetAddress();
event["protocol"] = (*it)->GetProtocolName();
event["module"] = std::string(1, (*it)->GetReflectorModule());
g_NNGPublisher.Publish(event);

m_Clients.erase(it);
break;
}
Expand Down
25 changes: 25 additions & 0 deletions reflector/Configure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#define JBRANDMEISTER "Brandmeister"
#define JCALLSIGN "Callsign"
#define JCOUNTRY "Country"
#define JDASHBOARD "Dashboard"
#define JDASHBOARDURL "DashboardUrl"
#define JDCS "DCS"
#define JDEFAULTID "DefaultId"
Expand Down Expand Up @@ -122,6 +123,12 @@ CConfigure::CConfigure()
{
IPv4RegEx = std::regex("^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\\.){3,3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9]){1,1}$", std::regex::extended);
IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended);

data[g_Keys.dashboard.nngaddr] = "tcp://127.0.0.1:5555";
data[g_Keys.dashboard.interval] = 10U;
data[g_Keys.dashboard.enable] = false;
data[g_Keys.dashboard.debug] = false;
data[g_Keys.ysf.ysfreflectordb.id] = 0U;
}

bool CConfigure::ReadData(const std::string &path)
Expand Down Expand Up @@ -183,6 +190,8 @@ bool CConfigure::ReadData(const std::string &path)
section = ESection::ip;
else if (0 == hname.compare(JTRANSCODER))
section = ESection::tc;
else if (0 == hname.compare(JDASHBOARD))
section = ESection::dashboard;
else if (0 == hname.compare(JMODULES))
section = ESection::modules;
else if (0 == hname.compare(JDPLUS))
Expand Down Expand Up @@ -495,6 +504,18 @@ bool CConfigure::ReadData(const std::string &path)
else
badParam(key);
break;
case ESection::dashboard:
if (0 == key.compare(JENABLE))
data[g_Keys.dashboard.enable] = IS_TRUE(value[0]);
else if (0 == key.compare("NNGAddr"))
data[g_Keys.dashboard.nngaddr] = value;
else if (0 == key.compare("Interval"))
data[g_Keys.dashboard.interval] = getUnsigned(value, "Dashboard Interval", 1, 3600, 10);
else if (0 == key.compare("NNGDebug"))
data[g_Keys.dashboard.debug] = IS_TRUE(value[0]);
else
badParam(key);
break;
default:
std::cout << "WARNING: parameter '" << line << "' defined before any [section]" << std::endl;
}
Expand Down Expand Up @@ -797,6 +818,10 @@ bool CConfigure::ReadData(const std::string &path)
if (isDefined(ErrorLevel::fatal, JFILES, JG3TERMINALPATH, g_Keys.files.terminal, rval))
checkFile(JFILES, JG3TERMINALPATH, data[g_Keys.files.terminal]);
}
// Dashboard section
isDefined(ErrorLevel::mild, JDASHBOARD, JENABLE, g_Keys.dashboard.enable, rval);
isDefined(ErrorLevel::mild, JDASHBOARD, "NNGAddr", g_Keys.dashboard.nngaddr, rval);
isDefined(ErrorLevel::mild, JDASHBOARD, "Interval", g_Keys.dashboard.interval, rval);

return rval;
}
Expand Down
2 changes: 1 addition & 1 deletion reflector/Configure.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

enum class ErrorLevel { fatal, mild };
enum class ERefreshType { file, http, both };
enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc };
enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc, dashboard };

#define IS_TRUE(a) ((a)=='t' || (a)=='T' || (a)=='1')

Expand Down
2 changes: 1 addition & 1 deletion reflector/DCSProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
g_Reflector.ReleaseClients();

// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dcs);
g_Reflector.ReleaseUsers();
}
}
Expand Down
2 changes: 1 addition & 1 deletion reflector/DExtraProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Heade
g_Reflector.ReleaseClients();

// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dextra);
g_Reflector.ReleaseUsers();
}
}
Expand Down
2 changes: 1 addition & 1 deletion reflector/DMRMMDVMProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Hea
// update last heard
if ( lastheard )
{
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dmrmmdvm);
g_Reflector.ReleaseUsers();
}
}
Expand Down
2 changes: 1 addition & 1 deletion reflector/DMRPlusProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Head
// release
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::dmrplus);
g_Reflector.ReleaseUsers();
}
}
Expand Down
2 changes: 1 addition & 1 deletion reflector/DPlusProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header
g_Reflector.ReleaseClients();

// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dplus);
g_Reflector.ReleaseUsers();
}
else
Expand Down
2 changes: 1 addition & 1 deletion reflector/G3Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ void CG3Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, c
}

// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::g3);
g_Reflector.ReleaseUsers();
}
}
Expand Down
2 changes: 1 addition & 1 deletion reflector/GateKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class CGateKeeper
// authorizations
bool MayLink(const CCallsign &, const CIp &, const EProtocol, char * = nullptr) const;
bool MayTransmit(const CCallsign &, const CIp &, EProtocol = EProtocol::any, char = ' ') const;
const std::string ProtocolName(EProtocol) const;

protected:
// thread
Expand All @@ -56,7 +57,6 @@ class CGateKeeper
bool IsNodeListedOk(const std::string &) const;
bool IsPeerListedOk(const std::string &, char) const;
bool IsPeerListedOk(const std::string &, const CIp &, char *) const;
const std::string ProtocolName(EProtocol) const;

protected:
// data
Expand Down
2 changes: 2 additions & 0 deletions reflector/Global.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "LookupYsf.h"
#include "TCSocket.h"
#include "JsonKeys.h"
#include "NNGPublisher.h"

extern CReflector g_Reflector;
extern CGateKeeper g_GateKeeper;
Expand All @@ -33,3 +34,4 @@ extern CLookupNxdn g_LNid;
extern CLookupYsf g_LYtr;
extern SJsonKeys g_Keys;
extern CTCServer g_TCServer;
extern CNNGPublisher g_NNGPublisher;
3 changes: 3 additions & 0 deletions reflector/JsonKeys.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,7 @@ struct SJsonKeys {

struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; }
files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" };

struct DASHBOARD { const std::string enable, nngaddr, interval, debug; }
dashboard { "DashboardEnable", "DashboardNNGAddr", "DashboardInterval", "NNGDebug" };
};
Loading