Skip to content

Commit dce8001

Browse files
author
anders-wartoft
committed
0.1.6-SNAPSHOT
1 parent 525e12d commit dce8001

12 files changed

Lines changed: 400 additions & 140 deletions

File tree

BUILD_NUMBER

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
274
1+
275

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,10 @@ See LICENSE file
458458

459459
## 0.1.6-SNAPSHOT
460460
* Removed `topic` configuration from downstream. Downstream uses upstream's topic name, or a translation of that name.
461+
* Fixed the following issues:
462+
* resend will not accept payloadSize=auto #1
463+
* Separate internal logging from event stream from Upstream Kafka #2
464+
* Dedup can't use TLS to connect to Kafka #3
461465

462466
## 0.1.5-SNAPSHOT
463467
* Multiple sockets with SO_REUSEPORT for faster and more reliable UDP receive in Linux and Mac for downstream. Fallback to single thread in Windows.

config/testcases/dedup-17.env

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
WINDOW_SIZE=10000
2+
MAX_WINDOWS=10000
3+
RAW_TOPICS=transfer
4+
CLEAN_TOPIC=dedup
5+
GAP_TOPIC=gaps
6+
BOOTSTRAP_SERVERS=kafka-downstream.sitia.nu:9094
7+
STATE_DIR_CONFIG=/tmp/dedup_state_16_{id}/
8+
GAP_EMIT_INTERVAL_SEC=60
9+
PERSIST_INTERVAL_MS=10000
10+
INSTANCE_ID={id}
11+
#JAVA_OPTS=-javaagent:/opt/airgap/dedup/jolokia-agent.jar=port=8778,host=127.0.0.1
12+
13+
14+
KAFKA_SECURITY_PROTOCOL=SSL
15+
KAFKA_SSL_TRUSTSTORE_LOCATION=./tmp/kafka-certificates/kafka-downstream.truststore.jks
16+
KAFKA_SSL_TRUSTSTORE_PASSWORD=changeit
17+
# Use a client keystore that contains a client certificate/private key for mTLS
18+
KAFKA_SSL_KEYSTORE_LOCATION=./tmp/kafka-certificates/airgap-upstream.keystore.jks
19+
KAFKA_SSL_KEYSTORE_PASSWORD=changeit
20+
# If the key has a separate password, set it; otherwise keystore password is usually the same
21+
KAFKA_SSL_KEY_PASSWORD=changeit
22+
# Explicitly set keystore/truststore types (these keystores are PKCS12)
23+
KAFKA_SSL_KEYSTORE_TYPE=PKCS12
24+
KAFKA_SSL_TRUSTSTORE_TYPE=PKCS12
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Used in some log events to identify the source
2+
id=Downstream_17
3+
# Used in the MTU code
4+
nic=ens160
5+
# UDP address to listen on
6+
targetIP=0.0.0.0
7+
# UDP port to listen on
8+
targetPort=1234
9+
# Kafka target. If more than one, separate the servers with a comma ,
10+
bootstrapServers=kafka-downstream.sitia.nu:9094
11+
# Topic to write internal logging to
12+
internalTopic=airgap-internal
13+
14+
# kafka or cmd
15+
target=kafka
16+
17+
# More logging
18+
logLevel=info
19+
20+
# Client id to use when sending events to Kafka. Will be overridden by the certificate CN if Kafka mTLS is used
21+
clientId=downstream-17
22+
23+
# Glob that will identify the path(s) to all private keys we should try to use
24+
# when a key exchange packet is received
25+
privateKeyFiles=certs/private*.pem
26+
27+
mtu=1500
28+
#mtu=1500
29+
# After reading the config, where should we send the logs? Default is stdout
30+
#logFileName=./tmp/downstream.log
31+
32+
# TLS to Kafka
33+
# Certificate file
34+
certFile=certs/tmp/airgap-downstream.crt
35+
# Key file
36+
keyFile=certs/tmp/airgap-downstream.key
37+
# CA file
38+
caFile=certs/tmp/kafka-ca.crt
39+
40+
# This is the magic that saves the upstream topic 'transfer' to the downstream topic 'transfer-12b'
41+
#topicTranslations={"transfer": "transfer-15"}
42+
43+
44+
# One goroutine per CPU core or two per NUMA node.
45+
numReceivers=2
46+
# Allows burst absorption; uses RAM for latency tolerance.
47+
channelBufferSize=65536
48+
# Small batching boosts handler efficiency without noticeable delay.
49+
batchSize=128
50+
# Reduces reallocation when packets vary in size.
51+
readBufferMultiplier=32
52+
53+
# Log statistics every minute
54+
logStatistics=60
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
input=kafka
2+
topic=dedup
3+
client-id=downstream
4+
bootstrap-server=kafka-downstream.sitia.nu:9092
5+
print-keys=true
6+
filter=gap
7+
regex=_(\d+)$
8+
duplicate-detection=false
9+
output=cmd
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Used in some log events to identify the source
2+
id=Upstream_17
3+
# Used in the MTU code
4+
nic=lo0
5+
# UDP target (downstream, use static arp and route to be able to send packets over a diode)
6+
#targetIP=kafka-downstream.sitia.nu
7+
targetIP=localhost
8+
# UDP target port
9+
targetPort=1234
10+
# Kafka source. If more than one, separate the servers with a comma ,
11+
source=kafka
12+
bootstrapServers=kafka-upstream.sitia.nu:9094,kafka-upstream.sitia.nu:8094
13+
# Topic to read
14+
topic=transfer
15+
# Kafka group id to use. If several threads are used, this is prepended to the thread names.
16+
groupID=17
17+
logLevel=info
18+
# Read from this time instead of starting at the end
19+
# 2024-01-28T10:24:55+01:00
20+
from=
21+
# For testing, you can use random, else use kafka
22+
#source=random
23+
source=kafka
24+
# Downstream public key file
25+
encryption=false
26+
publicKeyFile=certs/server2.pem
27+
# Every n seconds, generate a new symmetric key
28+
generateNewSymmetricKeyEvery=50
29+
#payloadSize=auto
30+
payloadSize=1500
31+
# After reading the config, where should we send the logs? Default is stdout
32+
#logFileName=./tmp/upstream.log
33+
# Format: {"name": "thread_name", "offset": offset_in_seconds}
34+
#sendingThreads=[{"now": 0}, {"3minutes": -10}]
35+
sendingThreads=[{"No-delay": 0}]
36+
# TLS to Kafka
37+
# Certificate file
38+
certFile=certs/tmp/airgap-upstream.crt
39+
# Key file
40+
keyFile=certs/tmp/airgap-upstream.key
41+
# CA file
42+
caFile=certs/tmp/kafka-ca.crt
43+
44+
# To test the gap detection removing gaps, remove the filter out for previously dropped packets
45+
# Deliver every even numbered packet:
46+
#deliverFilter=2,4,6
47+
# compress the payload if events longer than this in bytes
48+
compressWhenLengthExceeds=1200
49+
50+
logStatistics=60
51+
52+
eps=500
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
input=counter
2+
string=TEST_
3+
filter=guard
4+
filter=gap
5+
regex=_(\d+)$
6+
duplicate-detection=true
7+
output=kafka
8+
client-id=test
9+
topic=transfer
10+
bootstrap-server=kafka-upstream.sitia.nu:9092
11+
statistics=true
12+
output=cmd
13+
limit=100

doc/Deduplication.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@
2020
export STATE_DIR_CONFIG=/tmp/dedup_state_a/
2121
export GAP_EMIT_INTERVAL_SEC=60
2222
export PERSIST_INTERVAL_MS=10000
23+
24+
# For TLS connections, also add
25+
export KAFKA_SECURITY_PROTOCOL=SSL
26+
export KAFKA_SSL_TRUSTSTORE_LOCATION=/opt/airgap/certs/client.truststore.jks
27+
export KAFKA_SSL_TRUSTSTORE_PASSWORD=changeit
28+
export KAFKA_SSL_KEYSTORE_LOCATION=/opt/airgap/certs/client.keystore.jks
29+
export KAFKA_SSL_KEYSTORE_PASSWORD=changeit
30+
31+
# or
32+
export KAFKA_SASL_MECHANISM=PLAIN
33+
export KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";'
2334
```
2435

2536
3. **Start the deduplication service:**

java-streams/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>nu.sitia.airgap.streams</groupId>
55
<artifactId>air-gap-deduplication</artifactId>
66
<name>air-gap Deduplication</name>
7-
<version>0.1.5-SNAPSHOT</version>
7+
<version>0.1.6-SNAPSHOT</version>
88
<url>http://maven.apache.org</url>
99
<build>
1010
<plugins>

java-streams/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>nu.sitia.airgap.streams</groupId>
55
<artifactId>air-gap-deduplication</artifactId>
66
<packaging>jar</packaging>
7-
<version>0.1.5-SNAPSHOT</version>
7+
<version>0.1.6-SNAPSHOT</version>
88
<name>air-gap Deduplication</name>
99
<url>http://maven.apache.org</url>
1010
<dependencies>

0 commit comments

Comments
 (0)