Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ test-release: docker-image
start-backing-services:
docker-compose -f dev/kafka-single.yml up -d

.PHONY: stop-backing-services
stop-backing-services:
docker-compose -f dev/kafka-single.yml down --remove-orphans

.PHONY: docker-image
docker-image:
@GIT_HASH=$$(git rev-parse --short HEAD) && \
Expand Down
2 changes: 2 additions & 0 deletions benchmark/kafka-motherduck.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ echo "starting benchmark for kafka.motherduck.yml with $NUM_MESSAGES messages"
# delete kafka topic if exists
docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --delete --topic input-user-clicks-motherduck || true

docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --create --topic input-user-clicks-motherduck --partitions 10 --replication-factor 1

# Publish benchmark data set
python3 cmd/publish-test-data.py --num-messages=$NUM_MESSAGES --topic="input-user-clicks-motherduck"

Expand Down
23 changes: 21 additions & 2 deletions cmd/publish-test-data.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,31 @@
@click.option('--num-messages', default=1001, type=int)
@click.option('--topic', default='topic-1')
@click.option('--fixture', default=None)
def main(num_messages, topic, fixture):
@click.option('--bootstrap-servers', default='localhost:9092', help='Kafka bootstrap servers')
@click.option('--security-protocol', default=None, help='Kafka security protocol')
@click.option('--sasl-mechanism', default=None, help='SASL mechanism')
@click.option('--sasl-username', default=None, help='SASL username')
@click.option('--sasl-password', default=None, help='SASL password')
@click.option('--ssl-ca-location', default=None, help='Path to CA certificate')
@click.option('--ssl-key-location', default=None, help='Path to client key')
@click.option('--ssl-certificate-location', default=None, help='Path to client certificate')
@click.option('--ssl-key-password', default=None, help='Password for client key')
@click.option('--ssl-endpoint-identification-algorithm', default='none', help='SSL endpoint identification algorithm')
def main(num_messages, topic, fixture, bootstrap_servers, security_protocol, sasl_mechanism, sasl_username, sasl_password, ssl_ca_location, ssl_key_location, ssl_certificate_location, ssl_key_password, ssl_endpoint_identification_algorithm):
kf = KafkaFaker(
bootstrap_servers='localhost:9092',
bootstrap_servers=bootstrap_servers,
num_messages=num_messages,
topic=topic,
fixture=fixture,
security_protocol=security_protocol,
sasl_mechanism=sasl_mechanism,
sasl_username=sasl_username,
sasl_password=sasl_password,
ssl_ca_location=ssl_ca_location,
ssl_key_location=ssl_key_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
ssl_endpoint_identification_algorithm=ssl_endpoint_identification_algorithm
)

kf.publish()
Expand Down
16 changes: 8 additions & 8 deletions dev/config/examples/kafka.motherduck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ pipeline:
handler:
type: 'handlers.InferredMemBatch'
sql: |
INSERT INTO my_db.issues
INSERT INTO my_db.events
SELECT
action,
issue ->> 'id' AS issue_id,
issue ->> 'title' AS issue_title,
issue ->> 'number' AS issue_number,
issue ->> 'user' ->> 'login' AS issue_user_login,
issue ->> 'repository' ->> 'name' AS issue_repo_name,
issue ->> 'repository' ->> 'id' AS issue_repo_id
ip,
event,
properties ->> 'city' AS properties_city,
properties ->> 'country' AS properties_country,
CAST(timestamp AS TIMESTAMP) AS timestamp,
type,
userId
FROM batch;

sink:
Expand Down
44 changes: 44 additions & 0 deletions dev/config/examples/kafka.sasl-tls.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
pipeline:
batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }}

source:
type: kafka
kafka:
brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}]
group_id: test
auto_offset_reset: earliest
security_protocol: SASL_SSL
ssl:
ca_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/ca-cert.pem
key_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-key.pem
certificate_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-cert.pem
key_password: testpass
endpoint_identification_algorithm: 'none'
sasl:
mechanism: PLAIN
username: user
password: bitnami

topics:
- "input-sasl-tls-1"

handler:
type: "handlers.InferredMemBatch"
sql: SELECT * FROM batch

sink:
type: kafka
kafka:
brokers: [{{ kafka_brokers|default('localhost:9092') }}]
topic: output-sasl-tls-1
security_protocol: SASL_SSL
ssl:
ca_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/ca-cert.pem
key_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-key.pem
certificate_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-cert.pem
key_password: testpass
endpoint_identification_algorithm: 'none'
sasl:
mechanism: PLAIN
username: user
password: bitnami
29 changes: 29 additions & 0 deletions dev/kafka-sasl-tls.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: '3.7'

services:
kafka:
image: bitnami/kafka
# container_name: example.kafka.com
# hostname: example.kafka.com
ports:
- "9092:9092"
- "9093:9093"
expose:
- "9093"
env_file:
- ./kafka.env
volumes:
- ./kafka/kafka_server_jaas.conf:/opt/bitnami/kafka/config/kafka_server_jaas.conf
# - ./server.properties:/opt/bitnami/kafka/config/server.properties
- ./kafka/keystore/kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./kafka/truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
# - ./producer.properties:/opt/bitnami/kafka/config/producer.properties
# - ./consumer.properties:/opt/bitnami/kafka/config/consumer.properties
networks:
- kafka

networks:
kafka:
# driver: bridge
# name: kafka
# external: true
28 changes: 28 additions & 0 deletions dev/kafka.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
KAFKA_CFG_NODE_ID=0
KAFKA_CFG_PROCESS_ROLES=controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
KAFKA_CFG_LISTENERS=SASL_SSL://0.0.0.0:9092,CONTROLLER://:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9092
KAFKA_CLIENT_USERS=user
KAFKA_CLIENT_PASSWORDS=bitnami
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
KAFKA_TLS_TYPE=JKS
KAFKA_CONTROLLER_USER=controller_user
KAFKA_CONTROLLER_PASSWORD=bitnami
KAFKA_INTER_BROKER_USER=controller_user
KAFKA_INTER_BROKER_PASSWORD=bitnami
KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf"
KAFKA_CERTIFICATE_PASSWORD=testpass
KAFKA_SSL_KEYSTORE_PASSWORD=testpass
KAFKA_SSL_KEY_PASSWORD=testpass
KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
KAFKA_CFG_SSL_KEYSTORE_TYPE=PKCS12
KAFKA_CFG_SSL_KEYSTORE_PASSWORD=testpass
KAFKA_CFG_SSL_KEY_PASSWORD=testpass
KAFKA_CFG_SSL_KEYSTORE_TYPE=PKCS12
KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=testpass
21 changes: 21 additions & 0 deletions dev/kafka/certs/ca-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDazCCAlOgAwIBAgIUGm/Ovt5PWDQGxu0AwDVOIKVX6YgwDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTA2MjIxOTQ5MDJaFw0zNTA2
MjAxOTQ5MDJaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQDcnhgQ3LSvXbX2jT3D21/EeUiZ9hqUTt/c7Som/Ux9
94/i/n6gQIitGx0fT98zxxJ1aqW2I7LxVrKRzWVdnzr2mnyX5P1S0Ke/K1Sw1pmk
A2pkz3NurC9R0XWdrosTcl1k3H8OuPfIlTS4DGN4R+sgTVKiny4wJenTIjXPU73S
sXw0Ojkke7M+Y68z1P9Go/IrAB2YqnQfHpA8JR8Y4np0+Q1E3QJJ1//Bft2wHsyK
GTZcZbC9jap6gX617yVoafQMtQI6Ka8Ho/3YnHkMmynPzCfcwzhjXS43EWog5o6Y
PidXyKDtOF2xoUZg005wcdPYAkvH/FHTC8QSG8nnyvLpAgMBAAGjUzBRMB0GA1Ud
DgQWBBRmzHAMbyjcaLXwLS/HCcCIeUn/0jAfBgNVHSMEGDAWgBRmzHAMbyjcaLXw
LS/HCcCIeUn/0jAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCD
hv7LLuuhrmW69GWK4Ny6YucGoZKf3sryw5OXGn85DHchvjp+stnCtEHPTZZNCtLZ
khJP0qiBMIf6djcs/q8CVAXVaypjEyzCQOQ3UOsgCQekRXrzedWCsA8VV6ZWaTF7
+VUSlAYsaSo38DFAA/WtkGTdffEvtEOd989xNxnPqY8CZTJOs5a3UtR4cVdyiPRC
jjyTEFtmlBCnJe98EJB1+7/H5dfggq84KiR9hRayqDBbXAvefxP/5tnTqYHRVLZh
I/y9AYnarSvP0wYJo+5CyCsuPTggDcig877Fkx5jlZHtyAwO/QAsFEKDXl4zQugw
MOdI5pqGrasUstSd2OnB
-----END CERTIFICATE-----
29 changes: 29 additions & 0 deletions dev/kafka/certs/client-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Bag Attributes
friendlyName: localhost
localKeyID: 54 69 6D 65 20 31 37 35 30 36 32 32 36 34 37 39 34 36
subject=C=Unknown, ST=Unknown, L=Unknown, O=Unknown, OU=Unknown, CN=Unknown
issuer=C=AU, ST=Some-State, O=Internet Widgits Pty Ltd
-----BEGIN CERTIFICATE-----
MIIEATCCAumgAwIBAgIUTE7K4lwDmcrdpAMtvDe3a5zOgNowDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTA2MjIxOTQ5NDRaFw0zNTA2
MjAxOTQ5NDRaMGwxEDAOBgNVBAYTB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24x
EDAOBgNVBAcTB1Vua25vd24xEDAOBgNVBAoTB1Vua25vd24xEDAOBgNVBAsTB1Vu
a25vd24xEDAOBgNVBAMTB1Vua25vd24wggGiMA0GCSqGSIb3DQEBAQUAA4IBjwAw
ggGKAoIBgQCyU6rADKn3c2s1dyvyFj1r9k5tmZtq4+OT81rkBiDgXhn5YZJbVcNd
T0BSv+JXdzq3TuQzOH56rtph7xvbHvhlp5UFpuvSKAn21nm1Y0Hihno+t+uZ82Xr
icrTGx7GWZR9vTutid8c4aVmsUqSsHZtUgdJNqsyoMEDYggeuKQikPSvVEZCJamy
ZXJaisgNbObln8e5xoqcrt8Osi9Xy+4Jm5J8X/dWZoQqL7+70aK3gfEJ5y7EAT/P
TC7LWgIJAbQDvyQPEI5cK7nxailDzEzUp36RIrxGMAZdRUgOnzbS4GKBLohfutjs
3K9vhP+Q86QnfAhRoGqBpx4YW4A0AmONr6NfKh6g+G24hKS/hIKaXegBxQckmH3x
O0Oit3X4VeubkSuQHZAWpZwyjxiEWgJEB6kW+sEGrCHJCEk/kZ/MxLfeDNMkBZXM
9PaGG1fwG5PvQLl7aBR0cpF4BQIvX7DinLS3kbdIvvVc1BUSuGDjBUqvE/pW7asr
cqBmyZ4U408CAwEAAaNCMEAwHQYDVR0OBBYEFLdWB6B5qP37YK5lvczApUEKiqOT
MB8GA1UdIwQYMBaAFGbMcAxvKNxotfAtL8cJwIh5Sf/SMA0GCSqGSIb3DQEBCwUA
A4IBAQANIAKY/c7WgUr1dgax7P9RlS6co/4kY3EAfv6Cyr3Qcxg5uXjEZOvsAHLH
ZKqpS27FibSX6ItkkEhVWE2pQSb5aq+Ng/87AHRjEO0bPRIYDKTwTIOxnoESh2UD
wWqfG7mxCY8OebvTALfToyBHfKVAuxp/aIZwbVxgDluBSHG5E59S2pmPwYr10fMJ
q5C/6Ak+4c3BxH9+1o70NQpucE8rcvEgU1f2b2XhrMVoXI0bfCQ/1H5HT9Up9k+p
3zoJOzNPgrWyEomJz28QnQUB2OiA/ObzeaIfwDsttFNa/BgjNjBzOcdNrLSTnc8z
eAuHWLMZ1JDGgW8ogtuVi0qg5mMd
-----END CERTIFICATE-----
44 changes: 44 additions & 0 deletions dev/kafka/certs/client-key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
Bag Attributes
friendlyName: localhost
localKeyID: 54 69 6D 65 20 31 37 35 30 36 32 32 36 34 37 39 34 36
Key Attributes: <No Attributes>
-----BEGIN PRIVATE KEY-----
MIIG/gIBADANBgkqhkiG9w0BAQEFAASCBugwggbkAgEAAoIBgQCyU6rADKn3c2s1
dyvyFj1r9k5tmZtq4+OT81rkBiDgXhn5YZJbVcNdT0BSv+JXdzq3TuQzOH56rtph
7xvbHvhlp5UFpuvSKAn21nm1Y0Hihno+t+uZ82XricrTGx7GWZR9vTutid8c4aVm
sUqSsHZtUgdJNqsyoMEDYggeuKQikPSvVEZCJamyZXJaisgNbObln8e5xoqcrt8O
si9Xy+4Jm5J8X/dWZoQqL7+70aK3gfEJ5y7EAT/PTC7LWgIJAbQDvyQPEI5cK7nx
ailDzEzUp36RIrxGMAZdRUgOnzbS4GKBLohfutjs3K9vhP+Q86QnfAhRoGqBpx4Y
W4A0AmONr6NfKh6g+G24hKS/hIKaXegBxQckmH3xO0Oit3X4VeubkSuQHZAWpZwy
jxiEWgJEB6kW+sEGrCHJCEk/kZ/MxLfeDNMkBZXM9PaGG1fwG5PvQLl7aBR0cpF4
BQIvX7DinLS3kbdIvvVc1BUSuGDjBUqvE/pW7asrcqBmyZ4U408CAwEAAQKCAYAA
0zwTn59w6xUwwt1VLvrkHFZX2dFBh3r5K2rN2+PYTtHTvedmlS/8DMqxTKmBecz0
0rzfwJfj5RnpH+QnEORv/5jWURJ/dgXzjsRTWx4RrkVACxdDyx9DWIvmgxIlJ0WD
vlG5HE+kbj5yY55mYDPwhl2yqVZLiRC+XOwiTKNSrYcDrior0nQb70hQV6cNRd39
QxOaAmuH6pe0WG0OyAN5qUlcUjzxeA/Cw8RaI0O8F/EPPZmJWCZjsrap5Du8Wh+i
RC/Vdy2o6JOTPPiwGp3yh7Wgb150fE8/z7fxbYfDlLK8pphaQ2epONgnRpb2Z8Wy
GDwcvVAHExJN8KhAzc2SXhlDXiHi0V5zlvXvZtZF+qsDLhWAeLUeCrh720mFzn4V
NgvVqV7PDcThFBOKbKz0O1M4Rp48uvHczGohbFItEu7oflwHUKkZY8HbRaAVRDYq
bSPSWBajgV+a/eVWB1p9p1JznT2/gXgV/wMLTvm/EITzoWt/mNyrjJPihqK6c0UC
gcEA9vRXGZjHfEbC8nN+VELQRKITAT/uUZdg0MugppmVwErc7zAVi8YdJJaPRnnS
rZw5a+DbHwLh0Jb3Cj7nupFjBg3YJKRjkql2C6Y2NtjTHZ6d0fmUQ9fpW5/WWvJk
WxV3iNs+c/7ZfS+DR2VxLt2wseEK4/tgHggXTUgI5BT44W8sx3o3JzqAP9q+574r
ymnd3edOYBSxbbyHstv0AvI/Lsb3enfezC9B25v7pdilOL3IxTdI4sqUPk0MRDNh
evANAoHBALjb0H0IK3ZB1XVLAKwQur3bfkRcUIimgv7IvzVb52HbV5Tt+9yfbsym
tDWMXUy3De4Aet2YeJYcyKLBrlzP1N6N9W71JtqY2vsJwMdOkZDzRoJnlf/UvUb/
t6wTC3OUEZ52yX3OmYMH8GRNdwve/4b+hqFqj4jijBqRdU93w68VKHc8PbmuPUlh
JmG0Qr5u3MH4lqS0Xs5ynBWel12vyMvg2k9j1E4UwK3z628fM+OzptnxsHRSaDf7
9d4Wwy9tywKBwQCQRKH1lClF0tjkZtAwTW/6CGzt1/lTtQmcMLlDWon/cjyrhv2v
UCeKTmKZG2YWgiORgCTCcx6Uivz7AVDCz1h4GvJgRaDd9x29JHabiAOdVCKCnjkd
gS8UlcXWD7DM/Td9vgc4IHPSDEW3Ge4LIPuujvebxCicosFqJmD7Tb0vhZE1X5KE
2ko+A35vR8uxTjONBSnmO3CD2RW1SYW9iuOaYiYFZ63CvwDMWM2kT3IGOejmPavY
wdvkoYI+/X0/IqkCgcEAl0RoaqfQyMg+X+ir+CEIbmu5+z5/OBLphovGy2cVA+J0
3I2RV4uvIxAWzuq5Phlc8LC72bD2m/+ZvnU2tQPscOFBQTaiQKZsKphkg7MrMq2f
uP4CpIH1ELAYIFrFOCKl+EHDx4rT24EXmTw6eiBUgKaujE+ifKTFeMgmcozSN+bc
YhWNfO+zfuRcf/79zs7xHljJDKX8Hntydc58llFNwmeQvP43sF0S/kVnFls2HtHX
auh3N/hnB8jJ/J4rwhfZAoHAZO6ZLYGz3S9h2s+vpozzmhFNbqs+tp707j3cRbQV
txqmS4KpKk0Kxe4PTrw+XF0qINLL138HlSL8q+OHb1EnYCZqbJkMlzQPUNLHVD7Y
Lxv1thr0+/mJ6SWlNczyiZXGZ9fg7t6gMLTZt6jxiBTylXUti/kNBwhPvoEBQo37
bTmH2i/2N0jTuK/4ZyooYGvlT5CJf5obWyRZBfOFj4Dp/60ZCD30ov4Ai2WdmsGK
NHHkAI6rg+yf2XH/c+593QtP
-----END PRIVATE KEY-----
14 changes: 14 additions & 0 deletions dev/kafka/kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="user"
password="bitnami"
user_kafka="bitnami";
};

ControllerServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="controller_user"
password="bitnami";
};
Binary file added dev/kafka/keystore/kafka.keystore.jks
Binary file not shown.
Binary file added dev/kafka/keystore/kafka.keystore.p12
Binary file not shown.
Loading