diff --git a/Makefile b/Makefile index 0d1da41..3061824 100644 --- a/Makefile +++ b/Makefile @@ -31,6 +31,10 @@ test-release: docker-image start-backing-services: docker-compose -f dev/kafka-single.yml up -d +.PHONY: stop-backing-services +stop-backing-services: + docker-compose -f dev/kafka-single.yml down --remove-orphans + .PHONY: docker-image docker-image: @GIT_HASH=$$(git rev-parse --short HEAD) && \ diff --git a/benchmark/kafka-motherduck.sh b/benchmark/kafka-motherduck.sh index 4bc3fa9..87db9dd 100755 --- a/benchmark/kafka-motherduck.sh +++ b/benchmark/kafka-motherduck.sh @@ -5,6 +5,8 @@ echo "starting benchmark for kafka.motherduck.yml with $NUM_MESSAGES messages" # delete kafka topic if exists docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --delete --topic input-user-clicks-motherduck || true +docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --create --topic input-user-clicks-motherduck --partitions 10 --replication-factor 1 + # Publish benchmark data set python3 cmd/publish-test-data.py --num-messages=$NUM_MESSAGES --topic="input-user-clicks-motherduck" diff --git a/cmd/publish-test-data.py b/cmd/publish-test-data.py index 04b92d2..7aa6d7c 100644 --- a/cmd/publish-test-data.py +++ b/cmd/publish-test-data.py @@ -11,12 +11,31 @@ @click.option('--num-messages', default=1001, type=int) @click.option('--topic', default='topic-1') @click.option('--fixture', default=None) -def main(num_messages, topic, fixture): +@click.option('--bootstrap-servers', default='localhost:9092', help='Kafka bootstrap servers') +@click.option('--security-protocol', default=None, help='Kafka security protocol') +@click.option('--sasl-mechanism', default=None, help='SASL mechanism') +@click.option('--sasl-username', default=None, help='SASL username') +@click.option('--sasl-password', default=None, help='SASL password') +@click.option('--ssl-ca-location', default=None, help='Path to CA certificate') +@click.option('--ssl-key-location', default=None, help='Path to client key') +@click.option('--ssl-certificate-location', default=None, help='Path to client certificate') +@click.option('--ssl-key-password', default=None, help='Password for client key') +@click.option('--ssl-endpoint-identification-algorithm', default='none', help='SSL endpoint identification algorithm') +def main(num_messages, topic, fixture, bootstrap_servers, security_protocol, sasl_mechanism, sasl_username, sasl_password, ssl_ca_location, ssl_key_location, ssl_certificate_location, ssl_key_password, ssl_endpoint_identification_algorithm): kf = KafkaFaker( - bootstrap_servers='localhost:9092', + bootstrap_servers=bootstrap_servers, num_messages=num_messages, topic=topic, fixture=fixture, + security_protocol=security_protocol, + sasl_mechanism=sasl_mechanism, + sasl_username=sasl_username, + sasl_password=sasl_password, + ssl_ca_location=ssl_ca_location, + ssl_key_location=ssl_key_location, + ssl_certificate_location=ssl_certificate_location, + ssl_key_password=ssl_key_password, + ssl_endpoint_identification_algorithm=ssl_endpoint_identification_algorithm ) kf.publish() diff --git a/dev/config/examples/kafka.motherduck.yml b/dev/config/examples/kafka.motherduck.yml index 47792eb..4dd613e 100644 --- a/dev/config/examples/kafka.motherduck.yml +++ b/dev/config/examples/kafka.motherduck.yml @@ -20,15 +20,15 @@ pipeline: handler: type: 'handlers.InferredMemBatch' sql: | - INSERT INTO my_db.issues + INSERT INTO my_db.events SELECT - action, - issue ->> 'id' AS issue_id, - issue ->> 'title' AS issue_title, - issue ->> 'number' AS issue_number, - issue ->> 'user' ->> 'login' AS issue_user_login, - issue ->> 'repository' ->> 'name' AS issue_repo_name, - issue ->> 'repository' ->> 'id' AS issue_repo_id + ip, + event, + properties ->> 'city' AS properties_city, + properties ->> 'country' AS properties_country, + CAST(timestamp AS TIMESTAMP) AS timestamp, + type, + userId FROM batch; sink: diff --git a/dev/config/examples/kafka.sasl-tls.yml b/dev/config/examples/kafka.sasl-tls.yml new file mode 100644 index 0000000..e870753 --- /dev/null +++ b/dev/config/examples/kafka.sasl-tls.yml @@ -0,0 +1,44 @@ +pipeline: + batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }} + + source: + type: kafka + kafka: + brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}] + group_id: test + auto_offset_reset: earliest + security_protocol: SASL_SSL + ssl: + ca_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/ca-cert.pem + key_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-key.pem + certificate_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-cert.pem + key_password: testpass + endpoint_identification_algorithm: 'none' + sasl: + mechanism: PLAIN + username: user + password: bitnami + + topics: + - "input-sasl-tls-1" + + handler: + type: "handlers.InferredMemBatch" + sql: SELECT * FROM batch + + sink: + type: kafka + kafka: + brokers: [{{ kafka_brokers|default('localhost:9092') }}] + topic: output-sasl-tls-1 + security_protocol: SASL_SSL + ssl: + ca_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/ca-cert.pem + key_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-key.pem + certificate_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-cert.pem + key_password: testpass + endpoint_identification_algorithm: 'none' + sasl: + mechanism: PLAIN + username: user + password: bitnami diff --git a/dev/kafka-sasl-tls.yml b/dev/kafka-sasl-tls.yml new file mode 100644 index 0000000..adc9cd1 --- /dev/null +++ b/dev/kafka-sasl-tls.yml @@ -0,0 +1,29 @@ +version: '3.7' + +services: + kafka: + image: bitnami/kafka + # container_name: example.kafka.com + # hostname: example.kafka.com + ports: + - "9092:9092" + - "9093:9093" + expose: + - "9093" + env_file: + - ./kafka.env + volumes: + - ./kafka/kafka_server_jaas.conf:/opt/bitnami/kafka/config/kafka_server_jaas.conf + # - ./server.properties:/opt/bitnami/kafka/config/server.properties + - ./kafka/keystore/kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro + - ./kafka/truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro + # - ./producer.properties:/opt/bitnami/kafka/config/producer.properties + # - ./consumer.properties:/opt/bitnami/kafka/config/consumer.properties + networks: + - kafka + +networks: + kafka: +# driver: bridge +# name: kafka +# external: true \ No newline at end of file diff --git a/dev/kafka.env b/dev/kafka.env new file mode 100644 index 0000000..fada15b --- /dev/null +++ b/dev/kafka.env @@ -0,0 +1,28 @@ +KAFKA_CFG_NODE_ID=0 +KAFKA_CFG_PROCESS_ROLES=controller,broker +KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 +KAFKA_CFG_LISTENERS=SASL_SSL://0.0.0.0:9092,CONTROLLER://:9093 +KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL +KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9092 +KAFKA_CLIENT_USERS=user +KAFKA_CLIENT_PASSWORDS=bitnami +KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER +KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN +KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL +KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN +KAFKA_TLS_TYPE=JKS +KAFKA_CONTROLLER_USER=controller_user +KAFKA_CONTROLLER_PASSWORD=bitnami +KAFKA_INTER_BROKER_USER=controller_user +KAFKA_INTER_BROKER_PASSWORD=bitnami +KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf" +KAFKA_CERTIFICATE_PASSWORD=testpass +KAFKA_SSL_KEYSTORE_PASSWORD=testpass +KAFKA_SSL_KEY_PASSWORD=testpass +KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks +KAFKA_CFG_SSL_KEYSTORE_TYPE=PKCS12 +KAFKA_CFG_SSL_KEYSTORE_PASSWORD=testpass +KAFKA_CFG_SSL_KEY_PASSWORD=testpass +KAFKA_CFG_SSL_KEYSTORE_TYPE=PKCS12 +KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.truststore.jks +KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=testpass \ No newline at end of file diff --git a/dev/kafka/certs/ca-cert.pem b/dev/kafka/certs/ca-cert.pem new file mode 100644 index 0000000..f87a4ff --- /dev/null +++ b/dev/kafka/certs/ca-cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUGm/Ovt5PWDQGxu0AwDVOIKVX6YgwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTA2MjIxOTQ5MDJaFw0zNTA2 +MjAxOTQ5MDJaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDcnhgQ3LSvXbX2jT3D21/EeUiZ9hqUTt/c7Som/Ux9 +94/i/n6gQIitGx0fT98zxxJ1aqW2I7LxVrKRzWVdnzr2mnyX5P1S0Ke/K1Sw1pmk +A2pkz3NurC9R0XWdrosTcl1k3H8OuPfIlTS4DGN4R+sgTVKiny4wJenTIjXPU73S +sXw0Ojkke7M+Y68z1P9Go/IrAB2YqnQfHpA8JR8Y4np0+Q1E3QJJ1//Bft2wHsyK +GTZcZbC9jap6gX617yVoafQMtQI6Ka8Ho/3YnHkMmynPzCfcwzhjXS43EWog5o6Y +PidXyKDtOF2xoUZg005wcdPYAkvH/FHTC8QSG8nnyvLpAgMBAAGjUzBRMB0GA1Ud +DgQWBBRmzHAMbyjcaLXwLS/HCcCIeUn/0jAfBgNVHSMEGDAWgBRmzHAMbyjcaLXw +LS/HCcCIeUn/0jAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCD +hv7LLuuhrmW69GWK4Ny6YucGoZKf3sryw5OXGn85DHchvjp+stnCtEHPTZZNCtLZ +khJP0qiBMIf6djcs/q8CVAXVaypjEyzCQOQ3UOsgCQekRXrzedWCsA8VV6ZWaTF7 ++VUSlAYsaSo38DFAA/WtkGTdffEvtEOd989xNxnPqY8CZTJOs5a3UtR4cVdyiPRC +jjyTEFtmlBCnJe98EJB1+7/H5dfggq84KiR9hRayqDBbXAvefxP/5tnTqYHRVLZh +I/y9AYnarSvP0wYJo+5CyCsuPTggDcig877Fkx5jlZHtyAwO/QAsFEKDXl4zQugw +MOdI5pqGrasUstSd2OnB +-----END CERTIFICATE----- diff --git a/dev/kafka/certs/client-cert.pem b/dev/kafka/certs/client-cert.pem new file mode 100644 index 0000000..c75d41e --- /dev/null +++ b/dev/kafka/certs/client-cert.pem @@ -0,0 +1,29 @@ +Bag Attributes + friendlyName: localhost + localKeyID: 54 69 6D 65 20 31 37 35 30 36 32 32 36 34 37 39 34 36 +subject=C=Unknown, ST=Unknown, L=Unknown, O=Unknown, OU=Unknown, CN=Unknown +issuer=C=AU, ST=Some-State, O=Internet Widgits Pty Ltd +-----BEGIN CERTIFICATE----- +MIIEATCCAumgAwIBAgIUTE7K4lwDmcrdpAMtvDe3a5zOgNowDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTA2MjIxOTQ5NDRaFw0zNTA2 +MjAxOTQ5NDRaMGwxEDAOBgNVBAYTB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24x +EDAOBgNVBAcTB1Vua25vd24xEDAOBgNVBAoTB1Vua25vd24xEDAOBgNVBAsTB1Vu +a25vd24xEDAOBgNVBAMTB1Vua25vd24wggGiMA0GCSqGSIb3DQEBAQUAA4IBjwAw +ggGKAoIBgQCyU6rADKn3c2s1dyvyFj1r9k5tmZtq4+OT81rkBiDgXhn5YZJbVcNd +T0BSv+JXdzq3TuQzOH56rtph7xvbHvhlp5UFpuvSKAn21nm1Y0Hihno+t+uZ82Xr +icrTGx7GWZR9vTutid8c4aVmsUqSsHZtUgdJNqsyoMEDYggeuKQikPSvVEZCJamy +ZXJaisgNbObln8e5xoqcrt8Osi9Xy+4Jm5J8X/dWZoQqL7+70aK3gfEJ5y7EAT/P +TC7LWgIJAbQDvyQPEI5cK7nxailDzEzUp36RIrxGMAZdRUgOnzbS4GKBLohfutjs +3K9vhP+Q86QnfAhRoGqBpx4YW4A0AmONr6NfKh6g+G24hKS/hIKaXegBxQckmH3x +O0Oit3X4VeubkSuQHZAWpZwyjxiEWgJEB6kW+sEGrCHJCEk/kZ/MxLfeDNMkBZXM +9PaGG1fwG5PvQLl7aBR0cpF4BQIvX7DinLS3kbdIvvVc1BUSuGDjBUqvE/pW7asr +cqBmyZ4U408CAwEAAaNCMEAwHQYDVR0OBBYEFLdWB6B5qP37YK5lvczApUEKiqOT +MB8GA1UdIwQYMBaAFGbMcAxvKNxotfAtL8cJwIh5Sf/SMA0GCSqGSIb3DQEBCwUA +A4IBAQANIAKY/c7WgUr1dgax7P9RlS6co/4kY3EAfv6Cyr3Qcxg5uXjEZOvsAHLH +ZKqpS27FibSX6ItkkEhVWE2pQSb5aq+Ng/87AHRjEO0bPRIYDKTwTIOxnoESh2UD +wWqfG7mxCY8OebvTALfToyBHfKVAuxp/aIZwbVxgDluBSHG5E59S2pmPwYr10fMJ +q5C/6Ak+4c3BxH9+1o70NQpucE8rcvEgU1f2b2XhrMVoXI0bfCQ/1H5HT9Up9k+p +3zoJOzNPgrWyEomJz28QnQUB2OiA/ObzeaIfwDsttFNa/BgjNjBzOcdNrLSTnc8z +eAuHWLMZ1JDGgW8ogtuVi0qg5mMd +-----END CERTIFICATE----- diff --git a/dev/kafka/certs/client-key.pem b/dev/kafka/certs/client-key.pem new file mode 100644 index 0000000..5c5b2d3 --- /dev/null +++ b/dev/kafka/certs/client-key.pem @@ -0,0 +1,44 @@ +Bag Attributes + friendlyName: localhost + localKeyID: 54 69 6D 65 20 31 37 35 30 36 32 32 36 34 37 39 34 36 +Key Attributes: +-----BEGIN PRIVATE KEY----- +MIIG/gIBADANBgkqhkiG9w0BAQEFAASCBugwggbkAgEAAoIBgQCyU6rADKn3c2s1 +dyvyFj1r9k5tmZtq4+OT81rkBiDgXhn5YZJbVcNdT0BSv+JXdzq3TuQzOH56rtph +7xvbHvhlp5UFpuvSKAn21nm1Y0Hihno+t+uZ82XricrTGx7GWZR9vTutid8c4aVm +sUqSsHZtUgdJNqsyoMEDYggeuKQikPSvVEZCJamyZXJaisgNbObln8e5xoqcrt8O +si9Xy+4Jm5J8X/dWZoQqL7+70aK3gfEJ5y7EAT/PTC7LWgIJAbQDvyQPEI5cK7nx +ailDzEzUp36RIrxGMAZdRUgOnzbS4GKBLohfutjs3K9vhP+Q86QnfAhRoGqBpx4Y +W4A0AmONr6NfKh6g+G24hKS/hIKaXegBxQckmH3xO0Oit3X4VeubkSuQHZAWpZwy +jxiEWgJEB6kW+sEGrCHJCEk/kZ/MxLfeDNMkBZXM9PaGG1fwG5PvQLl7aBR0cpF4 +BQIvX7DinLS3kbdIvvVc1BUSuGDjBUqvE/pW7asrcqBmyZ4U408CAwEAAQKCAYAA +0zwTn59w6xUwwt1VLvrkHFZX2dFBh3r5K2rN2+PYTtHTvedmlS/8DMqxTKmBecz0 +0rzfwJfj5RnpH+QnEORv/5jWURJ/dgXzjsRTWx4RrkVACxdDyx9DWIvmgxIlJ0WD +vlG5HE+kbj5yY55mYDPwhl2yqVZLiRC+XOwiTKNSrYcDrior0nQb70hQV6cNRd39 +QxOaAmuH6pe0WG0OyAN5qUlcUjzxeA/Cw8RaI0O8F/EPPZmJWCZjsrap5Du8Wh+i +RC/Vdy2o6JOTPPiwGp3yh7Wgb150fE8/z7fxbYfDlLK8pphaQ2epONgnRpb2Z8Wy +GDwcvVAHExJN8KhAzc2SXhlDXiHi0V5zlvXvZtZF+qsDLhWAeLUeCrh720mFzn4V +NgvVqV7PDcThFBOKbKz0O1M4Rp48uvHczGohbFItEu7oflwHUKkZY8HbRaAVRDYq +bSPSWBajgV+a/eVWB1p9p1JznT2/gXgV/wMLTvm/EITzoWt/mNyrjJPihqK6c0UC +gcEA9vRXGZjHfEbC8nN+VELQRKITAT/uUZdg0MugppmVwErc7zAVi8YdJJaPRnnS +rZw5a+DbHwLh0Jb3Cj7nupFjBg3YJKRjkql2C6Y2NtjTHZ6d0fmUQ9fpW5/WWvJk +WxV3iNs+c/7ZfS+DR2VxLt2wseEK4/tgHggXTUgI5BT44W8sx3o3JzqAP9q+574r +ymnd3edOYBSxbbyHstv0AvI/Lsb3enfezC9B25v7pdilOL3IxTdI4sqUPk0MRDNh +evANAoHBALjb0H0IK3ZB1XVLAKwQur3bfkRcUIimgv7IvzVb52HbV5Tt+9yfbsym +tDWMXUy3De4Aet2YeJYcyKLBrlzP1N6N9W71JtqY2vsJwMdOkZDzRoJnlf/UvUb/ +t6wTC3OUEZ52yX3OmYMH8GRNdwve/4b+hqFqj4jijBqRdU93w68VKHc8PbmuPUlh +JmG0Qr5u3MH4lqS0Xs5ynBWel12vyMvg2k9j1E4UwK3z628fM+OzptnxsHRSaDf7 +9d4Wwy9tywKBwQCQRKH1lClF0tjkZtAwTW/6CGzt1/lTtQmcMLlDWon/cjyrhv2v +UCeKTmKZG2YWgiORgCTCcx6Uivz7AVDCz1h4GvJgRaDd9x29JHabiAOdVCKCnjkd +gS8UlcXWD7DM/Td9vgc4IHPSDEW3Ge4LIPuujvebxCicosFqJmD7Tb0vhZE1X5KE +2ko+A35vR8uxTjONBSnmO3CD2RW1SYW9iuOaYiYFZ63CvwDMWM2kT3IGOejmPavY +wdvkoYI+/X0/IqkCgcEAl0RoaqfQyMg+X+ir+CEIbmu5+z5/OBLphovGy2cVA+J0 +3I2RV4uvIxAWzuq5Phlc8LC72bD2m/+ZvnU2tQPscOFBQTaiQKZsKphkg7MrMq2f +uP4CpIH1ELAYIFrFOCKl+EHDx4rT24EXmTw6eiBUgKaujE+ifKTFeMgmcozSN+bc +YhWNfO+zfuRcf/79zs7xHljJDKX8Hntydc58llFNwmeQvP43sF0S/kVnFls2HtHX +auh3N/hnB8jJ/J4rwhfZAoHAZO6ZLYGz3S9h2s+vpozzmhFNbqs+tp707j3cRbQV +txqmS4KpKk0Kxe4PTrw+XF0qINLL138HlSL8q+OHb1EnYCZqbJkMlzQPUNLHVD7Y +Lxv1thr0+/mJ6SWlNczyiZXGZ9fg7t6gMLTZt6jxiBTylXUti/kNBwhPvoEBQo37 +bTmH2i/2N0jTuK/4ZyooYGvlT5CJf5obWyRZBfOFj4Dp/60ZCD30ov4Ai2WdmsGK +NHHkAI6rg+yf2XH/c+593QtP +-----END PRIVATE KEY----- diff --git a/dev/kafka/kafka_server_jaas.conf b/dev/kafka/kafka_server_jaas.conf new file mode 100644 index 0000000..6c7c4cf --- /dev/null +++ b/dev/kafka/kafka_server_jaas.conf @@ -0,0 +1,14 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + serviceName="kafka" + username="user" + password="bitnami" + user_kafka="bitnami"; +}; + +ControllerServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + serviceName="kafka" + username="controller_user" + password="bitnami"; +}; \ No newline at end of file diff --git a/dev/kafka/keystore/kafka.keystore.jks b/dev/kafka/keystore/kafka.keystore.jks new file mode 100644 index 0000000..409a00a Binary files /dev/null and b/dev/kafka/keystore/kafka.keystore.jks differ diff --git a/dev/kafka/keystore/kafka.keystore.p12 b/dev/kafka/keystore/kafka.keystore.p12 new file mode 100644 index 0000000..b454dba Binary files /dev/null and b/dev/kafka/keystore/kafka.keystore.p12 differ diff --git a/dev/kafka/script.sh b/dev/kafka/script.sh new file mode 100755 index 0000000..39ff3f9 --- /dev/null +++ b/dev/kafka/script.sh @@ -0,0 +1,207 @@ +#!/usr/bin/env bash + +set -e + +KEYSTORE_FILENAME="kafka.keystore.jks" +VALIDITY_IN_DAYS=3650 +DEFAULT_TRUSTSTORE_FILENAME="kafka.truststore.jks" +TRUSTSTORE_WORKING_DIRECTORY="truststore" +KEYSTORE_WORKING_DIRECTORY="keystore" +CA_CERT_FILE="ca-cert" +KEYSTORE_SIGN_REQUEST="cert-file" +KEYSTORE_SIGN_REQUEST_SRL="ca-cert.srl" +KEYSTORE_SIGNED_CERT="cert-signed" + +function file_exists_and_exit() { + echo "'$1' cannot exist. Move or delete it before" + echo "re-running this script." + exit 1 +} + +if [ -e "$KEYSTORE_WORKING_DIRECTORY" ]; then + file_exists_and_exit $KEYSTORE_WORKING_DIRECTORY +fi + +if [ -e "$CA_CERT_FILE" ]; then + file_exists_and_exit $CA_CERT_FILE +fi + +if [ -e "$KEYSTORE_SIGN_REQUEST" ]; then + file_exists_and_exit $KEYSTORE_SIGN_REQUEST +fi + +if [ -e "$KEYSTORE_SIGN_REQUEST_SRL" ]; then + file_exists_and_exit $KEYSTORE_SIGN_REQUEST_SRL +fi + +if [ -e "$KEYSTORE_SIGNED_CERT" ]; then + file_exists_and_exit $KEYSTORE_SIGNED_CERT +fi + +echo +echo "Welcome to the Kafka SSL keystore and truststore generator script." + +echo +echo "First, do you need to generate a trust store and associated private key," +echo "or do you already have a trust store file and private key?" +echo +echo -n "Do you need to generate a trust store and associated private key? [yn] " +read generate_trust_store + +trust_store_file="" +trust_store_private_key_file="" + +if [ "$generate_trust_store" == "y" ]; then + if [ -e "$TRUSTSTORE_WORKING_DIRECTORY" ]; then + file_exists_and_exit $TRUSTSTORE_WORKING_DIRECTORY + fi + + mkdir $TRUSTSTORE_WORKING_DIRECTORY + echo + echo "OK, we'll generate a trust store and associated private key." + echo + echo "First, the private key." + echo + echo "You will be prompted for:" + echo " - A password for the private key. Remember this." + echo " - Information about you and your company." + echo " - NOTE that the Common Name (CN) is currently not important." + + openssl req -new -x509 -keyout $TRUSTSTORE_WORKING_DIRECTORY/ca-key \ + -out $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE -days $VALIDITY_IN_DAYS + + trust_store_private_key_file="$TRUSTSTORE_WORKING_DIRECTORY/ca-key" + + echo + echo "Two files were created:" + echo " - $TRUSTSTORE_WORKING_DIRECTORY/ca-key -- the private key used later to" + echo " sign certificates" + echo " - $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE -- the certificate that will be" + echo " stored in the trust store in a moment and serve as the certificate" + echo " authority (CA). Once this certificate has been stored in the trust" + echo " store, it will be deleted. It can be retrieved from the trust store via:" + echo " $ keytool -keystore -export -alias CARoot -rfc" + + echo + echo "Now the trust store will be generated from the certificate." + echo + echo "You will be prompted for:" + echo " - the trust store's password (labeled 'keystore'). Remember this" + echo " - a confirmation that you want to import the certificate" + + keytool -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME \ + -alias CARoot -import -file $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE + + trust_store_file="$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME" + + echo + echo "$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME was created." + + # don't need the cert because it's in the trust store. + rm $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE +else + echo + echo -n "Enter the path of the trust store file. " + read -e trust_store_file + + if ! [ -f $trust_store_file ]; then + echo "$trust_store_file isn't a file. Exiting." + exit 1 + fi + + echo -n "Enter the path of the trust store's private key. " + read -e trust_store_private_key_file + + if ! [ -f $trust_store_private_key_file ]; then + echo "$trust_store_private_key_file isn't a file. Exiting." + exit 1 + fi +fi + +echo +echo "Continuing with:" +echo " - trust store file: $trust_store_file" +echo " - trust store private key: $trust_store_private_key_file" + +mkdir $KEYSTORE_WORKING_DIRECTORY + +echo +echo "Now, a keystore will be generated. Each broker and logical client needs its own" +echo "keystore. This script will create only one keystore. Run this script multiple" +echo "times for multiple keystores." +echo +echo "You will be prompted for the following:" +echo " - A keystore password. Remember it." +echo " - Personal information, such as your name." +echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of" +echo " this host. However, at some point, this may change. As such, make the CN" +echo " the FQDN. Some operating systems call the CN prompt 'first / last name'" +echo " - A key password, for the key being generated within the keystore. Remember this." + +# To learn more about CNs and FQDNs, read: +# https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html + +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME \ + -alias localhost -validity $VALIDITY_IN_DAYS -genkey -keyalg RSA + +echo +echo "'$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME' now contains a key pair and a" +echo "self-signed certificate. Again, this keystore can only be used for one broker or" +echo "one logical client. Other brokers or clients need to generate their own keystores." + +echo +echo "Fetching the certificate from the trust store and storing in $CA_CERT_FILE." +echo +echo "You will be prompted for the trust store's password (labeled 'keystore')" + +keytool -keystore $trust_store_file -export -alias CARoot -rfc -file $CA_CERT_FILE + +echo +echo "Now a certificate signing request will be made to the keystore." +echo +echo "You will be prompted for the keystore's password." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost \ + -certreq -file $KEYSTORE_SIGN_REQUEST + +echo +echo "Now the trust store's private key (CA) will sign the keystore's certificate." +echo +echo "You will be prompted for the trust store's private key password." +openssl x509 -req -CA $CA_CERT_FILE -CAkey $trust_store_private_key_file \ + -in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \ + -days $VALIDITY_IN_DAYS -CAcreateserial +# creates $KEYSTORE_SIGN_REQUEST_SRL which is never used or needed. + +echo +echo "Now the CA will be imported into the keystore." +echo +echo "You will be prompted for the keystore's password and a confirmation that you want to" +echo "import the certificate." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias CARoot \ + -import -file $CA_CERT_FILE +rm $CA_CERT_FILE # delete the trust store cert because it's stored in the trust store. + +echo +echo "Now the keystore's signed certificate will be imported back into the keystore." +echo +echo "You will be prompted for the keystore's password." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost -import \ + -file $KEYSTORE_SIGNED_CERT + +echo +echo "All done!" +echo +echo "Delete intermediate files? They are:" +echo " - '$KEYSTORE_SIGN_REQUEST_SRL': CA serial number" +echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request" +echo " (that was fulfilled)" +echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back" +echo " into the keystore" +echo -n "Delete? [yn] " +read delete_intermediate_files + +if [ "$delete_intermediate_files" == "y" ]; then + rm $KEYSTORE_SIGN_REQUEST_SRL + rm $KEYSTORE_SIGN_REQUEST + rm $KEYSTORE_SIGNED_CERT +fi diff --git a/dev/kafka/truststore/ca-key b/dev/kafka/truststore/ca-key new file mode 100644 index 0000000..5658c2e --- /dev/null +++ b/dev/kafka/truststore/ca-key @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFNTBfBgkqhkiG9w0BBQ0wUjAxBgkqhkiG9w0BBQwwJAQQkFO+uriKONsoGWzw +VIEOqgICCAAwDAYIKoZIhvcNAgkFADAdBglghkgBZQMEASoEENA181yKD9wf6Cef +Emn5DKMEggTQ+xvC/rgR4GNu11C4ecVNgmd1cN5k04VOQdgvbrvriXI5dN/f4RsY +SoZtsq2ZJGI1Dlxf5Jk0bDVByeJb6+xpLk8lAE0gtlIlxB6Aeh2zxTDjniK9mEXJ +HyDsPEVySTelkMe+qM8KVDrYf8K7vsWEJ8qtK6EMrlNGaMgcC42IMVkN9Xt1PhZL +YRkQ4FOCluweUBaTKfs9LLnxxMfLyqy5DRVfG0t872WZddRo5i00i5zz59Xk7MMz +CV144wkljFtdhqcNy6k84+67oDQsvPUxWAoer5KtdL6a9GsjvKA+ECstyg9BOnad +GUEgd/tfaRgdNM/Dyuhy5WeQCDsj5LH9NiTEirX77JVhTPVXN3faBEqjMNkasQsg +h7JUcw5Zvl3PIlRwnz9He11Kg/WwAOYsjYhDmWJV0BVJyUa5DkEDEHo8SA1kBy8n +DvROwJJBOisrk/ZCWviV55AQuI91980BMyfCEk8kJCTcL6Nt0i71oT+xipcEE8sz +7VvTquKY7Vzvrf7G7GeMdWcHnF63m2nQOH/0gvUS3Ffq6O1462QbwmdsxqPJBLqN +U3QsorVfAehwUGOQlucHz3QXy0rqRggsAiIGYXQw+RrCWwnUVaeFlmwUEHYd6970 +zA//Xiw+Fq/K/m6JVDwC+RvdqqFKNLK4dCZILHIaYjFKYSyJiGRxK1xHt3ov8e0C +hOaxq+9Nv9Y+KapNwd0dNazlpRdisY+IBm+kQIw5juVqYcfTVjSmV91KugzKb8Qj +JZppjEDIkva1U8yyZR6J4lp7ieEwGA8B08IBhla+Ml662p+UCB2NTISjtJb4wuFB +xdvUVJ+YbpPTy5D6taw510wSYaO5p8lNorsWegaFhm/Ii27DQhMFH0puMNHOialD +qBK8Tk9+fBGvrbk+b9lnIdHI8TafAPKibbziHAn/pzGaEVr8i6aCrtZJTgRM6KpQ +Q9dDaw/iRp8OZxHdhDz1MizX4EPx3OH8I8SVmRxQ7jcURIBBRRhf9sYaiaQm5XHd ++K1+EDwT7Jl85NJzf7EQL+Se8uRg4jjyiSrUAZUWpLg8KCLg6uh96dOlVbhyTrus +EIED7tWAkumjY3FkBMn2ceVJxO8A6i2sl3w80d3KvKGWfyI6qAw6s6syw2fsIEhp +dizjFnL86oXYYWi+Zx9MovOS4lVnVN2D5QDKomaUeTWBt4Z+vrW6dRV4wOKn/1eW +/CrOvoWT/o0tH0XFnARooqMOWcORWymGKuxR1PPyoLoMwzNJW8eVFhyd86D/m0Sd +clauwgUJ944V+5fpAQt1PMUtwsuLjM+LP9lwmlUp4dRHMvFStlkUwrXcv3pYEKBP +gg1wY/keN7nn5di3PqNCFpnIWYzcWuicVrBfRW1CD/T+zjoI7RAjRalkM0ug26/2 +2xDAcURzCuFhj3NuU8hu17lltVB0aUkEwPnpvk+1jL9t2pGvT1WObD3A2GFZ5rQO +zHkt7y4qHgUU0vP10sq42nduHhupO5s6fUsFUNg1Cc93D50hC7WleSC616wt4JW+ +bCvId5868lt+VnH4DRD/1o1bsLHZ9UtdHVENxV1R+Wfy6/PQq1ROM3MD1geD8F0u +le8oHWhjmoA1k2BdKJ/sdpYsJPlwEi/jDjM49eJmwTjTTQ5SWI9ERT0= +-----END ENCRYPTED PRIVATE KEY----- diff --git a/dev/kafka/truststore/kafka.truststore.jks b/dev/kafka/truststore/kafka.truststore.jks new file mode 100644 index 0000000..3591721 Binary files /dev/null and b/dev/kafka/truststore/kafka.truststore.jks differ diff --git a/sqlflow/config.py b/sqlflow/config.py index b54c288..0ceea5c 100644 --- a/sqlflow/config.py +++ b/sqlflow/config.py @@ -10,6 +10,22 @@ from sqlflow import settings, errors +@dataclass +class KafkaSSLConfig: + ca_location: str # Path to the CA certificate + certificate_location: str # Path to the client certificate + key_location: str # Path to the client private key + key_password: Optional[str] = None + endpoint_identification_algorithm: Optional[str] = None # e.g., https, none (default is https) + + +@dataclass +class KafkaSASLConfig: + mechanism: str # SASL mechanism (e.g., PLAIN, SCRAM-SHA-256) + username: str # SASL username + password: str # SASL password + + @dataclass class HMACConfig: header: str # Header name for the HMAC signature @@ -38,6 +54,9 @@ class IcebergSink: class KafkaSink: brokers: [str] topic: str + security_protocol: Optional[str] = None # Security protocol (e.g., PLAINTEXT, SSL, SASL_SSL) + ssl: Optional[KafkaSSLConfig] = None # SSL configuration + sasl: Optional[KafkaSASLConfig] = None # SASL configuration @dataclass @@ -122,12 +141,16 @@ class SQLCommand: sql: str + @dataclass class KafkaSource: brokers: [str] group_id: str auto_offset_reset: str topics: [str] + security_protocol: Optional[str] = None # Security protocol (e.g., PLAINTEXT, SSL, SASL_SSL) + ssl: Optional[KafkaSSLConfig] = None # SSL configuration + sasl: Optional[KafkaSASLConfig] = None # SASL configuration @dataclass @@ -206,11 +229,32 @@ def build_source_config_from_dict(conf) -> Source: ) if source.type == 'kafka': + ssl_config = None + if 'ssl' in conf['kafka']: + ssl_config = KafkaSSLConfig( + ca_location=conf['kafka']['ssl'].get('ca_location'), + certificate_location=conf['kafka']['ssl'].get('certificate_location'), + key_location=conf['kafka']['ssl'].get('key_location'), + key_password=conf['kafka']['ssl'].get('key_password'), + endpoint_identification_algorithm=conf['kafka']['ssl'].get('endpoint_identification_algorithm'), + ) + + sasl_config = None + if 'sasl' in conf['kafka']: + sasl_config = KafkaSASLConfig( + mechanism=conf['kafka']['sasl']['mechanism'], + username=conf['kafka']['sasl']['username'], + password=conf['kafka']['sasl']['password'], + ) + source.kafka = KafkaSource( brokers=conf['kafka']['brokers'], group_id=conf['kafka']['group_id'], auto_offset_reset=conf['kafka']['auto_offset_reset'], topics=conf['kafka']['topics'], + security_protocol=conf['kafka'].get('security_protocol'), + ssl=ssl_config, + sasl=sasl_config, ) elif source.type == 'websocket': @@ -232,26 +276,48 @@ def build_source_config_from_dict(conf) -> Source: return source -def build_sink_config_from_dict(conf) -> Sink: +def build_sink_config_from_dict(raw_conf) -> Sink: sink = Sink( - type=conf['type'], + type=raw_conf['type'], ) - if 'format' in conf: + if 'format' in raw_conf: sink.format = SinkFormat( - type=conf['format']['type'], + type=raw_conf['format']['type'], ) assert sink.format.type in ('parquet',), "unsupported format: {}".format(sink.format.type) if sink.type == 'kafka': + ssl_config = None + if 'ssl' in raw_conf['kafka']: + ssl_config = KafkaSSLConfig( + ca_location=raw_conf['kafka']['ssl'].get('ca_location'), + certificate_location=raw_conf['kafka']['ssl'].get('certificate_location'), + key_location=raw_conf['kafka']['ssl'].get('key_location'), + key_password=raw_conf['kafka']['ssl'].get('key_password'), + endpoint_identification_algorithm=raw_conf['kafka']['ssl'].get('endpoint_identification_algorithm'), + ) + + sasl_config = None + if 'sasl' in raw_conf['kafka']: + sasl_config = KafkaSASLConfig( + mechanism=raw_conf['kafka']['sasl']['mechanism'], + username=raw_conf['kafka']['sasl']['username'], + password=raw_conf['kafka']['sasl']['password'], + ) + sink.kafka = KafkaSink( - brokers=conf['kafka']['brokers'], - topic=conf['kafka']['topic'], + brokers=raw_conf['kafka']['brokers'], + topic=raw_conf['kafka']['topic'], + security_protocol=raw_conf['kafka'].get('security_protocol'), + ssl=ssl_config, + sasl=sasl_config, ) + elif sink.type == 'sqlcommand': sink.sqlcommand = SQLCommandSink( - sql=conf['sqlcommand']['sql'], - substitutions=[SQLCommandSubstitution(**s) for s in conf['sqlcommand'].get('substitutions', ())], + sql=raw_conf['sqlcommand']['sql'], + substitutions=[SQLCommandSubstitution(**s) for s in raw_conf['sqlcommand'].get('substitutions', ())], ) for substitution in sink.sqlcommand.substitutions: assert substitution.type in ('uuid4',), "unsupported substitution type: {}".format(substitution.type) @@ -259,13 +325,13 @@ def build_sink_config_from_dict(conf) -> Sink: pass elif sink.type == 'iceberg': sink.iceberg = IcebergSink( - catalog_name=conf['iceberg']['catalog_name'], - table_name=conf['iceberg']['table_name'], + catalog_name=raw_conf['iceberg']['catalog_name'], + table_name=raw_conf['iceberg']['table_name'], ) elif sink.type == 'clickhouse': sink.clickhouse = ClikhouseSink( - dsn=conf['clickhouse']['dsn'], - table=conf['clickhouse']['table'], + dsn=raw_conf['clickhouse']['dsn'], + table=raw_conf['clickhouse']['table'], ) else: sink.type = 'console' diff --git a/sqlflow/fixtures/__init__.py b/sqlflow/fixtures/__init__.py index 1068512..d51ae81 100644 --- a/sqlflow/fixtures/__init__.py +++ b/sqlflow/fixtures/__init__.py @@ -32,11 +32,34 @@ class KafkaFaker: - def __init__(self, bootstrap_servers, num_messages, topic, fixture=None): + def __init__(self, + bootstrap_servers, + num_messages, + topic, + fixture=None, + security_protocol=None, + sasl_mechanism=None, + sasl_username=None, + sasl_password=None, + ssl_ca_location=None, + ssl_key_location=None, + ssl_certificate_location=None, + ssl_key_password=None, + ssl_endpoint_identification_algorithm=None + ): self.bootstrap_servers = bootstrap_servers self.num_messages = num_messages self.topic = topic self.fixture = fixture + self.security_protocol = security_protocol + self.sasl_mechanism = sasl_mechanism + self.sasl_username = sasl_username + self.sasl_password = sasl_password + self.ssl_ca_location = ssl_ca_location + self.ssl_key_location = ssl_key_location + self.ssl_certificate_location = ssl_certificate_location + self.ssl_key_password = ssl_key_password + self.ssl_endpoint_identification_algorithm = ssl_endpoint_identification_algorithm def publish(self): conf = { @@ -44,6 +67,24 @@ def publish(self): 'client.id': socket.gethostname() } + if self.security_protocol: + conf['security.protocol'] = self.security_protocol + if self.sasl_mechanism: + conf['sasl.mechanism'] = self.sasl_mechanism + if self.sasl_username and self.sasl_password: + conf['sasl.username'] = self.sasl_username + conf['sasl.password'] = self.sasl_password + if self.ssl_ca_location: + conf['ssl.ca.location'] = self.ssl_ca_location + if self.ssl_key_location: + conf['ssl.key.location'] = self.ssl_key_location + if self.ssl_certificate_location: + conf['ssl.certificate.location'] = self.ssl_certificate_location + if self.ssl_key_password: + conf['ssl.key.password'] = self.ssl_key_password + if self.ssl_endpoint_identification_algorithm: + conf['ssl.endpoint.identification.algorithm'] = self.ssl_endpoint_identification_algorithm + producer = Producer(conf) for i in range(self.num_messages): if self.fixture is None: diff --git a/sqlflow/sinks.py b/sqlflow/sinks.py index e1af1bb..c7330cb 100644 --- a/sqlflow/sinks.py +++ b/sqlflow/sinks.py @@ -187,12 +187,34 @@ def flush(self): pass +def new_producer_from_conf(conf): + producer_conf = { + 'bootstrap.servers': ','.join(conf.brokers), + 'client.id': socket.gethostname(), + } + + if conf.security_protocol: + producer_conf['security.protocol'] = conf.security_protocol + + if conf.sasl: + producer_conf['sasl.mechanism'] = conf.sasl.mechanism + producer_conf['sasl.username'] = conf.sasl.username + producer_conf['sasl.password'] = conf.sasl.password + + if conf.ssl: + producer_conf['ssl.ca.location'] = conf.ssl.ca_location + producer_conf['ssl.certificate.location'] = conf.ssl.certificate_location + producer_conf['ssl.key.location'] = conf.ssl.key_location + producer_conf['ssl.key.password'] = conf.ssl.key_password + producer_conf['ssl.endpoint.identification.algorithm'] = conf.ssl.endpoint_identification_algorithm + + return Producer(producer_conf) + + def new_sink_from_conf(sink_conf: config.Sink, conn) -> Sink: if sink_conf.type == 'kafka': - p = Producer({ - 'bootstrap.servers': ','.join(sink_conf.kafka.brokers), - 'client.id': socket.gethostname(), - }) + p = new_producer_from_conf(sink_conf.kafka) + return KafkaSink( topic=sink_conf.kafka.topic, producer=p, diff --git a/sqlflow/sources/__init__.py b/sqlflow/sources/__init__.py index 68b6e79..ec92075 100644 --- a/sqlflow/sources/__init__.py +++ b/sqlflow/sources/__init__.py @@ -7,16 +7,36 @@ from .webhook import WebhookSource, HMACConfig +def new_consumer_from_conf(conf: config.KafkaSource): + kconf = { + 'bootstrap.servers': ','.join(conf.brokers), + 'group.id': conf.group_id, + 'auto.offset.reset': conf.auto_offset_reset, + 'enable.auto.commit': False, + } + + if conf.security_protocol: + kconf['security.protocol'] = conf.security_protocol + + if conf.sasl: + kconf['sasl.mechanism'] = conf.sasl.mechanism + kconf['sasl.username'] = conf.sasl.username + kconf['sasl.password'] = conf.sasl.password + + if conf.ssl: + kconf['ssl.ca.location'] = conf.ssl.ca_location + kconf['ssl.certificate.location'] = conf.ssl.certificate_location + kconf['ssl.key.location'] = conf.ssl.key_location + kconf['ssl.key.password'] = conf.ssl.key_password + kconf['ssl.endpoint.identification.algorithm'] = conf.ssl.endpoint_identification_algorithm + + consumer = Consumer(kconf) + return consumer + + def new_source_from_conf(source_conf: config.Source): if source_conf.type == 'kafka': - kconf = { - 'bootstrap.servers': ','.join(source_conf.kafka.brokers), - 'group.id': source_conf.kafka.group_id, - 'auto.offset.reset': source_conf.kafka.auto_offset_reset, - 'enable.auto.commit': False, - } - - consumer = Consumer(kconf) + consumer = new_consumer_from_conf(source_conf.kafka) return KafkaSource( consumer=consumer, diff --git a/sqlflow/sources/kafka.py b/sqlflow/sources/kafka.py index dd99e95..e394944 100644 --- a/sqlflow/sources/kafka.py +++ b/sqlflow/sources/kafka.py @@ -1,3 +1,4 @@ +import logging import typing from confluent_kafka import KafkaError @@ -5,6 +6,9 @@ from .base import Source, Message, SourceException +logger = logging.getLogger(__name__) + + class KafkaSource(Source): def __init__(self, consumer, diff --git a/sqlflow/static/schemas/config.json b/sqlflow/static/schemas/config.json index a602b5c..607d55d 100644 --- a/sqlflow/static/schemas/config.json +++ b/sqlflow/static/schemas/config.json @@ -278,6 +278,60 @@ "items": { "type": "string" } + }, + "security_protocol": { + "type": "string", + "enum": [ + "SASL_SSL", + "SSL", + "SASL_PLAINTEXT", + "PLAINTEXT" + ] + }, + "ssl": { + "type": "object", + "properties": { + "ca_location": { + "type": "string" + }, + "key_location": { + "type": "string" + }, + "certificate_location": { + "type": "string" + }, + "key_password": { + "type": "string" + }, + "endpoint_identification_algorithm": { + "type": "string" + } + } + }, + "sasl": { + "type": "object", + "properties": { + "mechanism": { + "type": "string", + "enum": [ + "PLAIN", + "SCRAM-SHA-256", + "SCRAM-SHA-512", + "GSSAPI" + ] + }, + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + }, + "required": [ + "mechanism", + "username", + "password" + ] } }, "required": [ diff --git a/tests/test_configs.py b/tests/test_configs.py index 292d3f4..1c4579c 100644 --- a/tests/test_configs.py +++ b/tests/test_configs.py @@ -49,6 +49,7 @@ def setUp(self): 'kafka.mem.iceberg.yml', 'kafka.postgres.join.yml', 'kafka.postgres.sink.yml', + 'kafka.sasl-tls.yml', 'kafka.structured.disk.yml', 'kafka.structured.mem.yml', 'local.parquet.sink.yml',