This will build the upstream and downstream binaries as well as the Kafka Streams Java application for deduplication.
2. **Run a local test (no Kafka required):**
- Edit `config/upstream.properties` and `config/downstream.properties` as needed (set `targetIP` to your local IP).
- In one terminal:
```bash
go run src/downstream/downstream.go config/downstream.properties
```
- In another terminal:
```bash
go run src/upstream/upstream.go config/upstream.properties
```
- You should see messages received in the first terminal.
3. **Next steps:**
- Connect to real Kafka by editing the config files.
- For resend of lost events that gap-detection identifies, a back channel must be set up.
- To set up redundancy and/or load balancing, see [Redundancy and Load Balancing.md](doc/Redundancy%20and%20Load%20Balancing.md)
## Notation
There are four executable files that together constitute the transfer software.
- Upstream - on the sending side of the diode, also used as the name of the program that consumes Kafka events and produces UDP packets
The first topic may contain duplicates but the one from the deduplicator should not.
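The idea behind deduplication can be sketched as remembering which (partition, offset) pairs have already been delivered and dropping repeats. A minimal illustration in Go (the actual deduplicator is the Kafka Streams Java application, which bounds its state with windows; `dedup` and the `key` type here are hypothetical helpers):

```go
package main

import "fmt"

// key identifies an event by its source partition and offset.
type key struct {
	partition int
	offset    int64
}

// dedup returns the events whose (partition, offset) has not been seen
// before. A real deduplicator must bound this memory, e.g. with windows.
func dedup(events []key) []key {
	seen := make(map[key]bool)
	var out []key
	for _, e := range events {
		if !seen[e] {
			seen[e] = true
			out = append(out, e)
		}
	}
	return out
}

func main() {
	in := []key{{0, 1}, {0, 2}, {0, 2}, {1, 1}, {0, 1}}
	fmt.Println(len(dedup(in))) // → 3 (two duplicates removed)
}
```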
## Getting started
### Very simple use case
To enable users to get started without Kafka and without a hardware diode, use the following properties files:
- `upstream.properties`
- `downstream.properties`
These properties files are configured to generate a few random strings instead of reading from Kafka and to send them with UDP without encryption. Change the `targetIP` in `upstream.properties` to the address you would like to send to. The `targetIP` in `downstream.properties` is set to 0.0.0.0 so it will bind to all local addresses.
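Concretely, the setting described above looks like this in the two files (only `targetIP` is named in this text; the sender's target address below is a placeholder):

```properties
# config/upstream.properties — send to the receiver's address
targetIP=192.0.2.10

# config/downstream.properties — bind to all local addresses
targetIP=0.0.0.0
```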
In one terminal, start the server with:
```bash
go run src/downstream/downstream.go config/downstream.properties
```
In a new terminal, start the client (sender) with:
```bash
go run src/upstream/upstream.go config/upstream.properties
```
A few messages should now be sent from upstream and received by downstream. From here, add encryption and connections to Kafka to enable all features.
The PartitionDedupApp exposes rich runtime information and operations via JMX, thanks to the `JmxSupport` class. You can access these via JConsole, Jolokia, or any JMX client.
- **GapDetectors MBean** (per partition):
  - Each partition is registered as its own MBean:
    - `nu.sitia.airgap:type=GapDetectors,partition=0`
    - `nu.sitia.airgap:type=GapDetectors,partition=1`
    - ...etc.
  - For each partition MBean, exposes:
    - `<topic>_<partition>`: Info about the GapDetector for that partition (window stats, offsets, etc.)
    - `<topic>_<partition>_gaps`: Current gaps for that partition
    - `<topic>_<partition>_mem`: Estimated memory usage for that partition
    - `<topic>_<partition>_nrMissing`: Number of missing offsets for that partition
    - `<topic>_<partition>_nrWindows`: Number of windows for that partition
  - Operations: `getAllGaps_<topic>_<partition>()` and `purge_<topic>_<partition>()` to fetch or purge gaps for a specific partition
- **Aggregate GapDetectors MBean** (optional):
  - `nu.sitia.airgap:type=GapDetectors,partition=-1` provides aggregate stats across all partitions (if enabled in your code).
- **Props MBean** (`nu.sitia.airgap:type=Props`):
  - All Kafka Streams properties
  - Topics, assigned partitions, window size, max windows, and other runtime config
3. Browse to `nu.sitia.airgap -> GapDetectors` or `Props` to view attributes and invoke operations.
- **With Jolokia (for Metricbeat):**
  - The Jolokia agent exposes these MBeans over HTTP. Metricbeat can be configured to scrape specific attributes or call operations.
  - Example: To fetch all gaps for partition 0, configure Metricbeat's `jolokia.yml` to query the `getAllGaps_transfer_0` operation on the `nu.sitia.airgap:type=GapDetectors,partition=0` MBean.
#### Example Jolokia Query (HTTP API)
To call an operation (e.g., get all gaps for partition 0):
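A sketch of such a call with `curl` against Jolokia's `exec` endpoint, assuming the agent listens on Jolokia's default port 8778 and the topic is named `transfer` as in the Metricbeat example above (adjust host, port, and operation name to your setup):

```bash
# GET /jolokia/exec/<mbean>/<operation> invokes a JMX operation over HTTP
curl "http://localhost:8778/jolokia/exec/nu.sitia.airgap:type=GapDetectors,partition=0/getAllGaps_transfer_0"
```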