Skip to content

Commit 74cecbe

Browse files
author
anders-wartoft
committed
Multiple sending threads with time offset
1 parent 5bd0200 commit 74cecbe

15 files changed

Lines changed: 560 additions & 19 deletions

.vscode/launch.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
// Use IntelliSense to learn about possible attributes.
3+
// Hover to view descriptions of existing attributes.
4+
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
5+
"version": "0.2.0",
6+
"configurations": [
7+
8+
{
9+
"name": "Launch file",
10+
"type": "go",
11+
"request": "launch",
12+
"mode": "debug",
13+
"program": "${file}",
14+
"args": ["../../config/upstream.properties"],
15+
},
16+
{
17+
"name": "Launch Package",
18+
"type": "go",
19+
"request": "launch",
20+
"mode": "auto",
21+
"program": "${fileDirname}"
22+
}
23+
]
24+
}

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"makefile.configureOnOpen": false
3+
}

Makefile

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
GIT_VERSION := $(shell git describe --tags --always --dirty)
2+
3+
.PHONY: all upstream downstream clean
4+
5+
all: upstream downstream
6+
7+
src/upstream/version.go:
8+
@echo 'package main
9+
10+
var GitVersion = "$(GIT_VERSION)"' > src/upstream/version.go
11+
12+
upstream: src/upstream/version.go src/upstream/upstream.go
13+
cd src/upstream && go build -o upstream upstream.go
14+
15+
downstream: src/upstream/version.go src/downstream/downstream.go
16+
cd src/downstream && go build -o downstream downstream.go
17+
18+
clean:
19+
rm -f src/upstream/upstream src/downstream/downstream src/upstream/version.go

README.md

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,23 @@ On downstream startup, a file glob is read from the configuration file. When a k
6464
#### Performance
6565
UDP receive is normally faster than Kafka writes. The downstream application tries to safeguard against lost packets by using a lightweight thread that receives events, decrypts them using AES256 (quite fast) and then adds the events to an array. Another thread consumes the array and writes to Kafka using the async protocol (that also returns immediately and processes the write in another thread). If the performance is not enough, first try to add nodes to the Kafka cluster and add the nodes to the bootstrapServers configuration in the downstream process. You can also try to add several events together before writing them to the upstream Kafka, since there is some overhead for each Kafka event, especially for writing. As a last resort, the upstream sender can be set to throttle (no code for that yet), e.g., by adding a small time.Sleep after each sent event. You should be able to securely transmit tens of thousand events every second using one transmission chain, but for large installations you might have to add more sender/receiver chains, as well as upgrade the Kafka instances.
6666

67+
### Automatic resend
68+
Since UDP is an unreliable protocol, you can set up air-gap to automatically resend logs at specific time intervals. In the upstream property file, add the following property:
69+
`sendingThreads=[{"Name": seconds-delay}, ... ]`
70+
71+
Example:
72+
```
73+
groupID=testGroup
74+
sendingThreads=[{"now": 0}, {"3minutes-ago": -180}]
75+
```
76+
For each name-dealy, a thread will be created in upstream. Each thread connects to Kafka with a group name that consists of the groupID from the property file, a "-" character, and the name from the sendingThreads property. In the example above, two threads will be created. One named "now" with 0 seconds delay and one named "3minutes-ago" with 180 seconds delay.
77+
78+
The thread with name "now" will connecto to Kafka with a group id of "testGroup-now" and the other thread "testGroup-3minutes-ago".
79+
80+
When a thread reads a message in Kafka, it will check if the Kafka timestamp - the delay (delay is a negative number) is at least equal to, or greater than, the current time. If not, it will sleep until the time is right to send.
81+
82+
If a message is read but not delivered (because the thread is sleeping) and the application terminates, then the
83+
6784
### Gap Detection
6885
Since UDP diodes only allow traffic in one direction, we need to invent a new feedback loop in case any events are not successfully delivered over the connection. We do this by enumerating all events we get from the upstream Kafka, send them over the UDP connection and use the enumeration as a key for the events in the downstream Kafka.
6986

@@ -92,22 +109,22 @@ When set-timestamp has updated the configuration file, just restart the upstream
92109

93110
## Keys
94111
Generate keystore with certificate or obtain otherwise.
95-
```
112+
```bash
96113
keytool -genkey -alias keyalias -keyalg RSA -validity 365 -keystore keystore.jks -storetype JKS
97114
```
98115

99116
Export the java keystore to a PKCS12 keystore:
100-
```
117+
```bash
101118
keytool -importkeystore -srckeystore keystore.jks -destkeystore keystore.p12 -deststoretype PKCS12 -srcalias keyalias
102119
```
103120

104121
Export certificate using openssl:
105-
```
122+
```bash
106123
openssl pkcs12 -in keystore.p12 -nokeys -out cert.pem
107124
```
108125

109126
Export unencrypted private key:
110-
```
127+
```bash
111128
openssl pkcs12 -in keystore.p12 -nodes -nocerts -out key.pem
112129
```
113130

@@ -148,6 +165,23 @@ generateNewSymmetricKeyEvery=500
148165
mtu=auto
149166
```
150167

168+
All configuration can be overridden by environment variables. In the case a file is parsed that will be parsed first and may result in configuration errors. After that, any environment variables are checked and, if found, will overwrite the file configuration.
169+
170+
The environment variables are named as:
171+
```bash
172+
AIRGAP_UPSTREAM_{variable name in upper case}
173+
```
174+
Example:
175+
```bash
176+
export AIRGAP_UPSTREAM_ID=NEW-ID
177+
export AIRGAP_UPSTREAM_NIC=ens0
178+
export AIRGAP_UPSTREAM_TARGET_IP=255.255.255.255
179+
...
180+
```
181+
182+
183+
Resend will receive a major overhaul so this section is now deprecated:
184+
151185
The same configuration file is used for set-timestamp. set-timestamp uses the bootstrapServers to query for timestamps for each topic partition and position in the set-timestamp arguments. When the earlierst timestamp has been retrieved, the configuration files's from parameter is set to that timestamp. When upstream restarts, it will read all Kafka events from the beginning and discard those before the from timestamp. During the start phase, set-timestamp will revert the from parameter to an empty string so the next startup will use Kafka's stored pointer for where to read from in the future.
152186

153187
### Downstream
@@ -196,10 +230,18 @@ The applications responds to os signals and can be installed as a service in, e.
196230
See https://fabianlee.org/2022/10/29/golang-running-a-go-binary-as-a-systemd-service-on-ubuntu-22-04/
197231

198232
## Compile
199-
Change directory to the application you would like to build (./src/upstream, ...).
200-
Compile the applications with `go build`.
201-
233+
There is a Makefile that will get the latest tag from git and save in version.go, then build upstream and downstream.
234+
```bash
235+
make # builds both upstream and downstream
236+
make upstream # builds only upstream
237+
make downstream # builds only downstream
238+
make clean # removes binaries and version.go
202239
```
240+
To build manually, change directory to the application you would like to build (./src/upstream, ...).
241+
Compile the applications with `go build {filename}`.
242+
243+
Example:
244+
```bash
203245
cd src/upstream
204246
go build upstream.go
205247
```
@@ -248,5 +290,5 @@ sudo systemctl start upstream
248290
## Dependencies
249291
air-gap uses IBM/sarama for the Kafka read/write. For other dependencies, check the go.mod file.
250292

251-
## License
293+
## Licence
252294
See LICENCE file

config/downstream4.properties

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Used in some log events to identify the source
2+
id=Downstream_3
3+
# Used in the MTU code
4+
nic=en0
5+
# UDP target (what IP to listen to)
6+
targetIP=192.168.0.27
7+
# UDP port to listen to
8+
targetPort=1234
9+
# Kafka target. If more than one, separate the servers with a comma ,
10+
bootstrapServers=192.168.153.139:9092
11+
# Topic to write to
12+
topic=out
13+
# Glob that will identify the path(s) to all private keys we should try to use
14+
# when a key exchange packet is received
15+
privateKeyFiles=certs/private*.pem
16+
# kafka or cmd
17+
target=kafka
18+
#target=cmd
19+
# Some extra printouts
20+
verbose=false
21+
# Set mtu to auto or 0 will query the nic of the mtu
22+
mtu=auto
23+
# Client id to use when sending events to Kafka
24+
clientId=downstream
25+
# After loading the config, where to send the logs? stdout is default
26+
#logFileName=./tmp/downstream.log

config/upstream.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ mtu=auto
2828
logFileName=./tmp/upstream.log
2929
# Add more sending threads (only valid with source=kafka). sendingThread needs to be an array of objects
3030
# Format: {"name": "thread_name", "offset": offset_in_seconds}
31-
sendingThreads=[{"now": 0}, {"3minutes": -10}]
31+
#sendingThreads=[{"now": 0}, {"3minutes": -10}]
32+
sendingThreads=[{"0seconds": 0}]

0 commit comments

Comments
 (0)