From 7119b6d2c8514c867a7645bf7ba320ace74daad7 Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Sat, 21 Jun 2025 19:15:23 -0400 Subject: [PATCH 1/7] trying --- Makefile | 4 ++++ dev/kafka-single.yml | 38 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 0d1da41..3061824 100644 --- a/Makefile +++ b/Makefile @@ -31,6 +31,10 @@ test-release: docker-image start-backing-services: docker-compose -f dev/kafka-single.yml up -d +.PHONY: stop-backing-services +stop-backing-services: + docker-compose -f dev/kafka-single.yml down --remove-orphans + .PHONY: docker-image docker-image: @GIT_HASH=$$(git rev-parse --short HEAD) && \ diff --git a/dev/kafka-single.yml b/dev/kafka-single.yml index 78b31e1..adaa69e 100644 --- a/dev/kafka-single.yml +++ b/dev/kafka-single.yml @@ -20,10 +20,44 @@ services: - "9092:9092" - "29092:29092" - "9999:9999" + - "9093:9093" + volumes: + - ./kafka/secrets:/etc/kafka/secrets environment: - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: | + INTERNAL://kafka1:19092, + EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092, + DOCKER://host.docker.internal:29092, + SSL://kafka1:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: | + INTERNAL:PLAINTEXT, + EXTERNAL:PLAINTEXT, + DOCKER:PLAINTEXT, + SSL:SSL KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_LISTENERS: | + INTERNAL://0.0.0.0:19092, + EXTERNAL://0.0.0.0:9092, + DOCKER://0.0.0.0:29092, + SSL://0.0.0.0:9093 + + # SSL configuration + KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks + KAFKA_SSL_KEYSTORE_CREDENTIALS: keystore_creds + KAFKA_SSL_KEY_CREDENTIALS: key_creds + KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks + KAFKA_SSL_TRUSTSTORE_CREDENTIALS: truststore_creds + + # Keystore/truststore passwords (via credentials file or use *_PASSWORD env vars instead) + KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "" # disable hostname verification (useful in dev) + KAFKA_SSL_CLIENT_AUTH: none # change to 'required' if using mTLS + + KAFKA_SASL_MECHANISM: PLAIN + KAFKA_SASL_JAAS_CONFIG: | + org.apache.kafka.common.security.plain.PlainLoginModule required + username="testuser" + password="testpassword"; + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" From 5eb8f8815dbb351446b0f937a3d3c25556ea626b Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Sun, 22 Jun 2025 05:56:27 -0400 Subject: [PATCH 2/7] Kafka Sasl/TLS config --- dev/config/examples/kafka.sasl.yml | 24 +++++++++++++++++++ dev/kafka-single.yml | 38 ++---------------------------- dev/kafka/kafka_server_jaas.conf | 5 ++++ sqlflow/config.py | 36 ++++++++++++++++++++++++++++ sqlflow/sources/__init__.py | 7 ++++++ sqlflow/sources/kafka.py | 4 ++++ 6 files changed, 78 insertions(+), 36 deletions(-) create mode 100644 dev/config/examples/kafka.sasl.yml create mode 100644 dev/kafka/kafka_server_jaas.conf diff --git a/dev/config/examples/kafka.sasl.yml b/dev/config/examples/kafka.sasl.yml new file mode 100644 index 0000000..1f66f6f --- /dev/null +++ b/dev/config/examples/kafka.sasl.yml @@ -0,0 +1,24 @@ +pipeline: + batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }} + + source: + type: kafka + kafka: + brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('127.0.0.1:9094') }}] + group_id: test + auto_offset_reset: earliest + security_protocol: SASL_PLAINTEXT + sasl: + mechanism: PLAIN + username: testuser + password: testpassword + + topics: + - "input-sasl-1" + + handler: + type: "handlers.InferredMemBatch" + sql: SELECT * FROM batch + + sink: + type: console diff --git a/dev/kafka-single.yml b/dev/kafka-single.yml index adaa69e..78b31e1 100644 --- a/dev/kafka-single.yml +++ b/dev/kafka-single.yml @@ -20,44 +20,10 @@ services: - "9092:9092" - "29092:29092" - "9999:9999" - - "9093:9093" - volumes: - - ./kafka/secrets:/etc/kafka/secrets environment: - KAFKA_ADVERTISED_LISTENERS: | - INTERNAL://kafka1:19092, - EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092, - DOCKER://host.docker.internal:29092, - SSL://kafka1:9093 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: | - INTERNAL:PLAINTEXT, - EXTERNAL:PLAINTEXT, - DOCKER:PLAINTEXT, - SSL:SSL + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL - KAFKA_LISTENERS: | - INTERNAL://0.0.0.0:19092, - EXTERNAL://0.0.0.0:9092, - DOCKER://0.0.0.0:29092, - SSL://0.0.0.0:9093 - - # SSL configuration - KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks - KAFKA_SSL_KEYSTORE_CREDENTIALS: keystore_creds - KAFKA_SSL_KEY_CREDENTIALS: key_creds - KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks - KAFKA_SSL_TRUSTSTORE_CREDENTIALS: truststore_creds - - # Keystore/truststore passwords (via credentials file or use *_PASSWORD env vars instead) - KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "" # disable hostname verification (useful in dev) - KAFKA_SSL_CLIENT_AUTH: none # change to 'required' if using mTLS - - KAFKA_SASL_MECHANISM: PLAIN - KAFKA_SASL_JAAS_CONFIG: | - org.apache.kafka.common.security.plain.PlainLoginModule required - username="testuser" - password="testpassword"; - KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" diff --git a/dev/kafka/kafka_server_jaas.conf b/dev/kafka/kafka_server_jaas.conf new file mode 100644 index 0000000..ca3e9b7 --- /dev/null +++ b/dev/kafka/kafka_server_jaas.conf @@ -0,0 +1,5 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="testuser" + password="testpassword"; +}; \ No newline at end of file diff --git a/sqlflow/config.py b/sqlflow/config.py index b54c288..c418707 100644 --- a/sqlflow/config.py +++ b/sqlflow/config.py @@ -122,12 +122,29 @@ class SQLCommand: sql: str +@dataclass +class KafkaSSLConfig: + ca_location: str # Path to the CA certificate + certificate_location: str # Path to the client certificate + key_location: str # Path to the client private key + + +@dataclass +class KafkaSASLConfig: + mechanism: str # SASL mechanism (e.g., PLAIN, SCRAM-SHA-256) + username: str # SASL username + password: str # SASL password + + @dataclass class KafkaSource: brokers: [str] group_id: str auto_offset_reset: str topics: [str] + security_protocol: Optional[str] = None # Security protocol (e.g., PLAINTEXT, SSL, SASL_SSL) + ssl: Optional[KafkaSSLConfig] = None # SSL configuration + sasl: Optional[KafkaSASLConfig] = None # SASL configuration @dataclass @@ -206,11 +223,30 @@ def build_source_config_from_dict(conf) -> Source: ) if source.type == 'kafka': + ssl_config = None + if 'ssl' in conf['kafka']: + ssl_config = KafkaSSLConfig( + ca_location=conf['kafka']['ssl']['ca_location'], + certificate_location=conf['kafka']['ssl']['certificate_location'], + key_location=conf['kafka']['ssl']['key_location'], + ) + + sasl_config = None + if 'sasl' in conf['kafka']: + sasl_config = KafkaSASLConfig( + mechanism=conf['kafka']['sasl']['mechanism'], + username=conf['kafka']['sasl']['username'], + password=conf['kafka']['sasl']['password'], + ) + source.kafka = KafkaSource( brokers=conf['kafka']['brokers'], group_id=conf['kafka']['group_id'], auto_offset_reset=conf['kafka']['auto_offset_reset'], topics=conf['kafka']['topics'], + security_protocol=conf['kafka'].get('security_protocol'), + ssl=ssl_config, + sasl=sasl_config, ) elif source.type == 'websocket': diff --git a/sqlflow/sources/__init__.py b/sqlflow/sources/__init__.py index 68b6e79..8fdc75c 100644 --- a/sqlflow/sources/__init__.py +++ b/sqlflow/sources/__init__.py @@ -15,6 +15,13 @@ def new_source_from_conf(source_conf: config.Source): 'auto.offset.reset': source_conf.kafka.auto_offset_reset, 'enable.auto.commit': False, } + import ipdb; ipdb.set_trace(); + + if source_conf.kafka.security_protocol == 'SASL_PLAINTEXT': + kconf['security.protocol'] = 'SASL_PLAINTEXT' + kconf['sasl.mechanism'] = source_conf.kafka.sasl.mechanism + kconf['sasl.username'] = source_conf.kafka.sasl.username + kconf['sasl.password'] = source_conf.kafka.sasl.password consumer = Consumer(kconf) diff --git a/sqlflow/sources/kafka.py b/sqlflow/sources/kafka.py index dd99e95..e394944 100644 --- a/sqlflow/sources/kafka.py +++ b/sqlflow/sources/kafka.py @@ -1,3 +1,4 @@ +import logging import typing from confluent_kafka import KafkaError @@ -5,6 +6,9 @@ from .base import Source, Message, SourceException +logger = logging.getLogger(__name__) + + class KafkaSource(Source): def __init__(self, consumer, From 3766b98543c0d3304a1b3b37b5c53e665f0cfeff Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Sun, 22 Jun 2025 16:56:47 -0400 Subject: [PATCH 3/7] Kafka local SSL SASL config --- dev/config/examples/kafka.sasl-tls.yml | 30 ++++ dev/config/examples/kafka.sasl.yml | 24 --- dev/kafka-sasl-tls.yml | 29 +++ dev/kafka.env | 28 +++ dev/kafka/certs/ca-cert.pem | 21 +++ dev/kafka/certs/client-cert.pem | 29 +++ dev/kafka/certs/client-key.pem | 44 +++++ dev/kafka/kafka_server_jaas.conf | 13 +- dev/kafka/keystore/kafka.keystore.jks | Bin 0 -> 5470 bytes dev/kafka/keystore/kafka.keystore.p12 | Bin 0 -> 5470 bytes dev/kafka/script.sh | 207 ++++++++++++++++++++++ dev/kafka/truststore/ca-key | 30 ++++ dev/kafka/truststore/kafka.truststore.jks | Bin 0 -> 1254 bytes sqlflow/config.py | 10 +- sqlflow/sources/__init__.py | 19 ++ 15 files changed, 455 insertions(+), 29 deletions(-) create mode 100644 dev/config/examples/kafka.sasl-tls.yml delete mode 100644 dev/config/examples/kafka.sasl.yml create mode 100644 dev/kafka-sasl-tls.yml create mode 100644 dev/kafka.env create mode 100644 dev/kafka/certs/ca-cert.pem create mode 100644 dev/kafka/certs/client-cert.pem create mode 100644 dev/kafka/certs/client-key.pem create mode 100644 dev/kafka/keystore/kafka.keystore.jks create mode 100644 dev/kafka/keystore/kafka.keystore.p12 create mode 100755 dev/kafka/script.sh create mode 100644 dev/kafka/truststore/ca-key create mode 100644 dev/kafka/truststore/kafka.truststore.jks diff --git a/dev/config/examples/kafka.sasl-tls.yml b/dev/config/examples/kafka.sasl-tls.yml new file mode 100644 index 0000000..b25f64e --- /dev/null +++ b/dev/config/examples/kafka.sasl-tls.yml @@ -0,0 +1,30 @@ +pipeline: + batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }} + + source: + type: kafka + kafka: + brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}] + group_id: test + auto_offset_reset: earliest + security_protocol: SASL_SSL + ssl: + ca_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/ca-cert.pem + key_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-key.pem + certificate_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-cert.pem + key_password: testpass + endpoint_identification_algorithm: 'none' + sasl: + mechanism: PLAIN + username: user + password: bitnami + + topics: + - "input-sasl-tls-1" + + handler: + type: "handlers.InferredMemBatch" + sql: SELECT * FROM batch + + sink: + type: console diff --git a/dev/config/examples/kafka.sasl.yml b/dev/config/examples/kafka.sasl.yml deleted file mode 100644 index 1f66f6f..0000000 --- a/dev/config/examples/kafka.sasl.yml +++ /dev/null @@ -1,24 +0,0 @@ -pipeline: - batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }} - - source: - type: kafka - kafka: - brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('127.0.0.1:9094') }}] - group_id: test - auto_offset_reset: earliest - security_protocol: SASL_PLAINTEXT - sasl: - mechanism: PLAIN - username: testuser - password: testpassword - - topics: - - "input-sasl-1" - - handler: - type: "handlers.InferredMemBatch" - sql: SELECT * FROM batch - - sink: - type: console diff --git a/dev/kafka-sasl-tls.yml b/dev/kafka-sasl-tls.yml new file mode 100644 index 0000000..adc9cd1 --- /dev/null +++ b/dev/kafka-sasl-tls.yml @@ -0,0 +1,29 @@ +version: '3.7' + +services: + kafka: + image: bitnami/kafka + # container_name: example.kafka.com + # hostname: example.kafka.com + ports: + - "9092:9092" + - "9093:9093" + expose: + - "9093" + env_file: + - ./kafka.env + volumes: + - ./kafka/kafka_server_jaas.conf:/opt/bitnami/kafka/config/kafka_server_jaas.conf + # - ./server.properties:/opt/bitnami/kafka/config/server.properties + - ./kafka/keystore/kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro + - ./kafka/truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro + # - ./producer.properties:/opt/bitnami/kafka/config/producer.properties + # - ./consumer.properties:/opt/bitnami/kafka/config/consumer.properties + networks: + - kafka + +networks: + kafka: +# driver: bridge +# name: kafka +# external: true \ No newline at end of file diff --git a/dev/kafka.env b/dev/kafka.env new file mode 100644 index 0000000..fada15b --- /dev/null +++ b/dev/kafka.env @@ -0,0 +1,28 @@ +KAFKA_CFG_NODE_ID=0 +KAFKA_CFG_PROCESS_ROLES=controller,broker +KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 +KAFKA_CFG_LISTENERS=SASL_SSL://0.0.0.0:9092,CONTROLLER://:9093 +KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL +KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9092 +KAFKA_CLIENT_USERS=user +KAFKA_CLIENT_PASSWORDS=bitnami +KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER +KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN +KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL +KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN +KAFKA_TLS_TYPE=JKS +KAFKA_CONTROLLER_USER=controller_user +KAFKA_CONTROLLER_PASSWORD=bitnami +KAFKA_INTER_BROKER_USER=controller_user +KAFKA_INTER_BROKER_PASSWORD=bitnami +KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf" +KAFKA_CERTIFICATE_PASSWORD=testpass +KAFKA_SSL_KEYSTORE_PASSWORD=testpass +KAFKA_SSL_KEY_PASSWORD=testpass +KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks +KAFKA_CFG_SSL_KEYSTORE_TYPE=PKCS12 +KAFKA_CFG_SSL_KEYSTORE_PASSWORD=testpass +KAFKA_CFG_SSL_KEY_PASSWORD=testpass +KAFKA_CFG_SSL_KEYSTORE_TYPE=PKCS12 +KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.truststore.jks +KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=testpass \ No newline at end of file diff --git a/dev/kafka/certs/ca-cert.pem b/dev/kafka/certs/ca-cert.pem new file mode 100644 index 0000000..f87a4ff --- /dev/null +++ b/dev/kafka/certs/ca-cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUGm/Ovt5PWDQGxu0AwDVOIKVX6YgwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTA2MjIxOTQ5MDJaFw0zNTA2 +MjAxOTQ5MDJaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDcnhgQ3LSvXbX2jT3D21/EeUiZ9hqUTt/c7Som/Ux9 +94/i/n6gQIitGx0fT98zxxJ1aqW2I7LxVrKRzWVdnzr2mnyX5P1S0Ke/K1Sw1pmk +A2pkz3NurC9R0XWdrosTcl1k3H8OuPfIlTS4DGN4R+sgTVKiny4wJenTIjXPU73S +sXw0Ojkke7M+Y68z1P9Go/IrAB2YqnQfHpA8JR8Y4np0+Q1E3QJJ1//Bft2wHsyK +GTZcZbC9jap6gX617yVoafQMtQI6Ka8Ho/3YnHkMmynPzCfcwzhjXS43EWog5o6Y +PidXyKDtOF2xoUZg005wcdPYAkvH/FHTC8QSG8nnyvLpAgMBAAGjUzBRMB0GA1Ud +DgQWBBRmzHAMbyjcaLXwLS/HCcCIeUn/0jAfBgNVHSMEGDAWgBRmzHAMbyjcaLXw +LS/HCcCIeUn/0jAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCD +hv7LLuuhrmW69GWK4Ny6YucGoZKf3sryw5OXGn85DHchvjp+stnCtEHPTZZNCtLZ +khJP0qiBMIf6djcs/q8CVAXVaypjEyzCQOQ3UOsgCQekRXrzedWCsA8VV6ZWaTF7 ++VUSlAYsaSo38DFAA/WtkGTdffEvtEOd989xNxnPqY8CZTJOs5a3UtR4cVdyiPRC +jjyTEFtmlBCnJe98EJB1+7/H5dfggq84KiR9hRayqDBbXAvefxP/5tnTqYHRVLZh +I/y9AYnarSvP0wYJo+5CyCsuPTggDcig877Fkx5jlZHtyAwO/QAsFEKDXl4zQugw +MOdI5pqGrasUstSd2OnB +-----END CERTIFICATE----- diff --git a/dev/kafka/certs/client-cert.pem b/dev/kafka/certs/client-cert.pem new file mode 100644 index 0000000..c75d41e --- /dev/null +++ b/dev/kafka/certs/client-cert.pem @@ -0,0 +1,29 @@ +Bag Attributes + friendlyName: localhost + localKeyID: 54 69 6D 65 20 31 37 35 30 36 32 32 36 34 37 39 34 36 +subject=C=Unknown, ST=Unknown, L=Unknown, O=Unknown, OU=Unknown, CN=Unknown +issuer=C=AU, ST=Some-State, O=Internet Widgits Pty Ltd +-----BEGIN CERTIFICATE----- +MIIEATCCAumgAwIBAgIUTE7K4lwDmcrdpAMtvDe3a5zOgNowDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTA2MjIxOTQ5NDRaFw0zNTA2 +MjAxOTQ5NDRaMGwxEDAOBgNVBAYTB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24x +EDAOBgNVBAcTB1Vua25vd24xEDAOBgNVBAoTB1Vua25vd24xEDAOBgNVBAsTB1Vu +a25vd24xEDAOBgNVBAMTB1Vua25vd24wggGiMA0GCSqGSIb3DQEBAQUAA4IBjwAw +ggGKAoIBgQCyU6rADKn3c2s1dyvyFj1r9k5tmZtq4+OT81rkBiDgXhn5YZJbVcNd +T0BSv+JXdzq3TuQzOH56rtph7xvbHvhlp5UFpuvSKAn21nm1Y0Hihno+t+uZ82Xr +icrTGx7GWZR9vTutid8c4aVmsUqSsHZtUgdJNqsyoMEDYggeuKQikPSvVEZCJamy +ZXJaisgNbObln8e5xoqcrt8Osi9Xy+4Jm5J8X/dWZoQqL7+70aK3gfEJ5y7EAT/P +TC7LWgIJAbQDvyQPEI5cK7nxailDzEzUp36RIrxGMAZdRUgOnzbS4GKBLohfutjs +3K9vhP+Q86QnfAhRoGqBpx4YW4A0AmONr6NfKh6g+G24hKS/hIKaXegBxQckmH3x +O0Oit3X4VeubkSuQHZAWpZwyjxiEWgJEB6kW+sEGrCHJCEk/kZ/MxLfeDNMkBZXM +9PaGG1fwG5PvQLl7aBR0cpF4BQIvX7DinLS3kbdIvvVc1BUSuGDjBUqvE/pW7asr +cqBmyZ4U408CAwEAAaNCMEAwHQYDVR0OBBYEFLdWB6B5qP37YK5lvczApUEKiqOT +MB8GA1UdIwQYMBaAFGbMcAxvKNxotfAtL8cJwIh5Sf/SMA0GCSqGSIb3DQEBCwUA +A4IBAQANIAKY/c7WgUr1dgax7P9RlS6co/4kY3EAfv6Cyr3Qcxg5uXjEZOvsAHLH +ZKqpS27FibSX6ItkkEhVWE2pQSb5aq+Ng/87AHRjEO0bPRIYDKTwTIOxnoESh2UD +wWqfG7mxCY8OebvTALfToyBHfKVAuxp/aIZwbVxgDluBSHG5E59S2pmPwYr10fMJ +q5C/6Ak+4c3BxH9+1o70NQpucE8rcvEgU1f2b2XhrMVoXI0bfCQ/1H5HT9Up9k+p +3zoJOzNPgrWyEomJz28QnQUB2OiA/ObzeaIfwDsttFNa/BgjNjBzOcdNrLSTnc8z +eAuHWLMZ1JDGgW8ogtuVi0qg5mMd +-----END CERTIFICATE----- diff --git a/dev/kafka/certs/client-key.pem b/dev/kafka/certs/client-key.pem new file mode 100644 index 0000000..5c5b2d3 --- /dev/null +++ b/dev/kafka/certs/client-key.pem @@ -0,0 +1,44 @@ +Bag Attributes + friendlyName: localhost + localKeyID: 54 69 6D 65 20 31 37 35 30 36 32 32 36 34 37 39 34 36 +Key Attributes: +-----BEGIN PRIVATE KEY----- +MIIG/gIBADANBgkqhkiG9w0BAQEFAASCBugwggbkAgEAAoIBgQCyU6rADKn3c2s1 +dyvyFj1r9k5tmZtq4+OT81rkBiDgXhn5YZJbVcNdT0BSv+JXdzq3TuQzOH56rtph +7xvbHvhlp5UFpuvSKAn21nm1Y0Hihno+t+uZ82XricrTGx7GWZR9vTutid8c4aVm +sUqSsHZtUgdJNqsyoMEDYggeuKQikPSvVEZCJamyZXJaisgNbObln8e5xoqcrt8O +si9Xy+4Jm5J8X/dWZoQqL7+70aK3gfEJ5y7EAT/PTC7LWgIJAbQDvyQPEI5cK7nx +ailDzEzUp36RIrxGMAZdRUgOnzbS4GKBLohfutjs3K9vhP+Q86QnfAhRoGqBpx4Y +W4A0AmONr6NfKh6g+G24hKS/hIKaXegBxQckmH3xO0Oit3X4VeubkSuQHZAWpZwy +jxiEWgJEB6kW+sEGrCHJCEk/kZ/MxLfeDNMkBZXM9PaGG1fwG5PvQLl7aBR0cpF4 +BQIvX7DinLS3kbdIvvVc1BUSuGDjBUqvE/pW7asrcqBmyZ4U408CAwEAAQKCAYAA +0zwTn59w6xUwwt1VLvrkHFZX2dFBh3r5K2rN2+PYTtHTvedmlS/8DMqxTKmBecz0 +0rzfwJfj5RnpH+QnEORv/5jWURJ/dgXzjsRTWx4RrkVACxdDyx9DWIvmgxIlJ0WD +vlG5HE+kbj5yY55mYDPwhl2yqVZLiRC+XOwiTKNSrYcDrior0nQb70hQV6cNRd39 +QxOaAmuH6pe0WG0OyAN5qUlcUjzxeA/Cw8RaI0O8F/EPPZmJWCZjsrap5Du8Wh+i +RC/Vdy2o6JOTPPiwGp3yh7Wgb150fE8/z7fxbYfDlLK8pphaQ2epONgnRpb2Z8Wy +GDwcvVAHExJN8KhAzc2SXhlDXiHi0V5zlvXvZtZF+qsDLhWAeLUeCrh720mFzn4V +NgvVqV7PDcThFBOKbKz0O1M4Rp48uvHczGohbFItEu7oflwHUKkZY8HbRaAVRDYq +bSPSWBajgV+a/eVWB1p9p1JznT2/gXgV/wMLTvm/EITzoWt/mNyrjJPihqK6c0UC +gcEA9vRXGZjHfEbC8nN+VELQRKITAT/uUZdg0MugppmVwErc7zAVi8YdJJaPRnnS +rZw5a+DbHwLh0Jb3Cj7nupFjBg3YJKRjkql2C6Y2NtjTHZ6d0fmUQ9fpW5/WWvJk +WxV3iNs+c/7ZfS+DR2VxLt2wseEK4/tgHggXTUgI5BT44W8sx3o3JzqAP9q+574r +ymnd3edOYBSxbbyHstv0AvI/Lsb3enfezC9B25v7pdilOL3IxTdI4sqUPk0MRDNh +evANAoHBALjb0H0IK3ZB1XVLAKwQur3bfkRcUIimgv7IvzVb52HbV5Tt+9yfbsym +tDWMXUy3De4Aet2YeJYcyKLBrlzP1N6N9W71JtqY2vsJwMdOkZDzRoJnlf/UvUb/ +t6wTC3OUEZ52yX3OmYMH8GRNdwve/4b+hqFqj4jijBqRdU93w68VKHc8PbmuPUlh +JmG0Qr5u3MH4lqS0Xs5ynBWel12vyMvg2k9j1E4UwK3z628fM+OzptnxsHRSaDf7 +9d4Wwy9tywKBwQCQRKH1lClF0tjkZtAwTW/6CGzt1/lTtQmcMLlDWon/cjyrhv2v +UCeKTmKZG2YWgiORgCTCcx6Uivz7AVDCz1h4GvJgRaDd9x29JHabiAOdVCKCnjkd +gS8UlcXWD7DM/Td9vgc4IHPSDEW3Ge4LIPuujvebxCicosFqJmD7Tb0vhZE1X5KE +2ko+A35vR8uxTjONBSnmO3CD2RW1SYW9iuOaYiYFZ63CvwDMWM2kT3IGOejmPavY +wdvkoYI+/X0/IqkCgcEAl0RoaqfQyMg+X+ir+CEIbmu5+z5/OBLphovGy2cVA+J0 +3I2RV4uvIxAWzuq5Phlc8LC72bD2m/+ZvnU2tQPscOFBQTaiQKZsKphkg7MrMq2f +uP4CpIH1ELAYIFrFOCKl+EHDx4rT24EXmTw6eiBUgKaujE+ifKTFeMgmcozSN+bc +YhWNfO+zfuRcf/79zs7xHljJDKX8Hntydc58llFNwmeQvP43sF0S/kVnFls2HtHX +auh3N/hnB8jJ/J4rwhfZAoHAZO6ZLYGz3S9h2s+vpozzmhFNbqs+tp707j3cRbQV +txqmS4KpKk0Kxe4PTrw+XF0qINLL138HlSL8q+OHb1EnYCZqbJkMlzQPUNLHVD7Y +Lxv1thr0+/mJ6SWlNczyiZXGZ9fg7t6gMLTZt6jxiBTylXUti/kNBwhPvoEBQo37 +bTmH2i/2N0jTuK/4ZyooYGvlT5CJf5obWyRZBfOFj4Dp/60ZCD30ov4Ai2WdmsGK +NHHkAI6rg+yf2XH/c+593QtP +-----END PRIVATE KEY----- diff --git a/dev/kafka/kafka_server_jaas.conf b/dev/kafka/kafka_server_jaas.conf index ca3e9b7..6c7c4cf 100644 --- a/dev/kafka/kafka_server_jaas.conf +++ b/dev/kafka/kafka_server_jaas.conf @@ -1,5 +1,14 @@ KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required - username="testuser" - password="testpassword"; + serviceName="kafka" + username="user" + password="bitnami" + user_kafka="bitnami"; +}; + +ControllerServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + serviceName="kafka" + username="controller_user" + password="bitnami"; }; \ No newline at end of file diff --git a/dev/kafka/keystore/kafka.keystore.jks b/dev/kafka/keystore/kafka.keystore.jks new file mode 100644 index 0000000000000000000000000000000000000000..409a00a69c4ce50ec6868a2c7adaaea88baf25ee GIT binary patch literal 5470 zcma)gWmFUZ(=IICwSbf?At|whbf+{dCEX=09Rf>9N+TW1Qqr9ZA|Tz}-Ki+BfL!mn z_q^Y|zrOFsoSAth&is33h8Ie11VBaOg_5IV;&4PMN8J*jVxbm7fxpqAz-wM8@SGQl zb@Ts)u=dfRSlj>L^?$OAiT8i2@NrQAg;1=}e_#-=-G3@D2zcM|3jddj&&!Dpyb;MN za(efE4zDIB|B;aKW1PIKJ^;W<3_u0(;$vd}pN;?=3>02EOdN|SWmH=;{C#y z$cL@$`MkK=io-48oqvN6d!TYM_@PpY!C*^8ntu*LAp{c&e@`Q!9&@a$A%4~RY>9&f znH#KApiNhicjuUhPY=^Ldw`j{$ytuj&3k8MVUT>Aepk88;c_8`dFTv#y$el3lzVgvNS2kjI zs%ZJ_c8~kzygP~mLTts;1w)k~SU^Kms);{r8YnR&q@r`5!aaw6zJ9XjSC-q<;#jbI zX*P~b`q;1wpLymJ=nH8lMdFdX{?6xPtgScHrv1U|)B2vZ-gevjgM-N~rn--T*zb3f z!w~|6%HHcfaHoELg4F0w@Ah`&Jv?zFY<*2?YURwJf!VIp<5Q5f`Cwqa#66jRy;;og z2>6e)yZyXTyB$v!e>wVdlr07QNAdOF2)5JrGqHX|(dGh!Jx~(-ly-{|gDQU!Wqq7k zh);DTt3+=>vjym8}hKA!U>6;{`m2$;1p+k!{vS@Q)NQr5qat=tQ>`3BWT^ zVV~m}<85<3)WsubkLZ$gra?!pmI0g7(#SSx_&BXMOb5O`H&}pky3bMq5x$aT6x0`2 zSSG=jzEjTdhl~&Wu=*HeqDza4xwO~L!=h)YFZ8?xvG0iKj``-{@i8Mn0@C|8`?6ek zt#V>}%K2jKLuaE5w)QX!TR4H#d4>y<66RZ9iGs1!^0}<|q=Se4i>EwV^JXCh|L;2l z`rI0sQS5W(2FBcGvr`cXmR-@pk^xr`I_VwHRwBbQ#<+271Kd!8YW3jXPx`Ce#T*%x zA8XpD_CjkH=xtw+drO7Jvs#T|C&ojCVI7G3G%se=+8Efnn!lYI%z|bGWmVS3`l#sQ zg%<86hq%guG(WYJm@0?z*K2LnX2GL(37SrKxO9XtOfH&QEUS>0drx^9`Lxr%npSNm zC2N%UqqLl|t?Dem<)RH<3IBkJ=mXGoslk({dl8wU;M`7s4cj5e<_Xnu0%+oygdU67ybp~hd^#N6Zf;h{ond*7b~K!6b9pW zLSsI7kguD}{nZ#j(rH2#+B%`Qd^55KnzR}M4BngOAam28`^}bsedUS)C77O|KQ+X zg*fY1GSuRAj0;U)wM3L?L8RbrgV(ni$Z9BtBmISxW29r7eTV9X&D@gteSp5zZd&PiUbFsM~bMy2yeblzr&e<TI4{lLp`IXCt07z*+WeO3F=tz)W5Ctr}F~rkXp(6QF%c+mLivPL4^U z=(2>0h>JuV-WR{7q>BBss0(HYCK;|s4jgy$5&HUCz9N|!o>JiFr7h6?qTM`!kSF)p zYiv3s`D6?K>7*mr2h)B{7qJWMtDA=EEl+zKziOy*DNgf~4xB$WmTh^pym_E}58 z*-BJ2{@*V0NYlS$9@-ETmPRf80iV9u6})@7=D@)JoNLJb!+K?V^t8P^UU{9Uw)s|f zc0ml=ushE5CnBcE&LcWnL1U%b|9*3XXn#%jx4q_`j0&}Y<)4-Q(4*7)xqZlsXb16M z(?V-~pJkQ-5(M7Kg${$^y>_WZ3*zUdy}8Low$tL=_o%mKtbg%zVd{9*Os@cvlTRi4 z#IeNY3He~Eg6G(^lAIFq${IXMNs!OG(l%(4Gn8$<{B2wS<4XgziO-}nfPQd4IjO1m zMMnKj!{ER{(clpl0ZE`Z=IXV9dBqhd|2k}_8o?XTT{I);EtzmL<}ve$bV%tuI6O!$jyb}koMW!Z{`ZKQw4gkZ2s+5cXRT}JC- z%dtuc{cJ?~0T0$BG^nS39?30jHSwialB;F>sE^M#dOpQ|dK7DKk%WaM#w-#_&bZ$f z0-1>zSr)qM!uMP1@W8aRT1u}zsl~w;Jg_n!#FP6WiD8~yL&P5S`$TPP>NIjc&pLo1 z@rG28LGwp?mJ)+PbflgZ+@hw_9_`Hnt_K7?a&L3pCj3MI&CMspZxY|Z9xoR)cvBS5<9~)V2nwgtA59kY;3OO2v)(& zBKzL4cKf__gSBN3~t3*%ycNmrn zVMvjBa=h%X%xKeN8TN*?%w8!PjjGq|k7Bt!GQHix(PmjkC=9M9XmxL1s}Qz2)1*ww zHsuPuuPlQ#9*YC>sKJ>sr>D9o*UJK_Z40z=Xuv|psqDj7VWNP?@fZ72$)8>YggAwv zf0^(u;~6KOv(!ZGb?S0rjmyVl7T|Pyu%7wQyH&3{6gE`r75EC1n44bJ;LHWMqp(E6 znA?7)6-$S;?)_r`PiV9ZWN?Sye^P-^bU{Qt-!$R$FSx6kwBgmUm+W`2k(*hv39L4Z zZ67;G;$nN}ZA1&DcC*l{2{Wtu3D7t+&wkU}l)J}o8GP^mLd2>HUfOoMCLm{Zz>!q4 z`PlC|D#%1FlOA;00#^)bT+C-JHua(*20Gr=^hY(H3A~cYJ@d3=LVr#ES;zLl{6uuE z!(ur!p_)W5T&F>o(&P&WF`dI7R<|lz2|cOjKDw-!P-Z!7=QwJC47?()e6!NE>ORx6 zB{l5*bXw7|EqlO5Rla#BndV;hAisR%#Fvw0OEfEKRJ@hah7^JgNyX8B`cz35X%)C? z=cYAa=rDHrX6VBEFI<@iFH}PU?rb@%RL~isPid~=PnRA~b0+0unu~04jxO7MZ!TMA zCeTymqdo8|;IDD8e_7WPUrwmAuT1qq$q#m- z`*|3$oN!)PcKr6%l9ZB#2nEk5w#6dYTKwZ)aIEkz(=yTYz9o-tosglB3g(fn^KI0E zTHkl`Q!780KHH=)<(KumJ|aSuA9IH(2bHS2=?Ty}D-CQ|$c-i7GPx*dA+B6`M~F*R zhG{S{fQdhGy5X6x<;hBUArzG1dTe~?LFnb^un>6M{-%<95`n^_y9O8rE0wNe9z#XGO8Y6wuA~(piy6@QmYa{y5ziRfN_a~&1!$H^j z%NODscc3+oDv1eYtNyhk{XJv*wS(1^)kRoyC-%5cQ|&hnRSCN}!%=Kv{4t11l6Y#! zOhn^bn{TA|b+5I=2^-3#jGV+1|KL zQhkR>#l|@bDn-ko5E4xNeNW;>8mtl_%P?W(o$ms(1;tIkgToc;Qy?OIlIKQAVSZy&YsaE|_5%=X3(tMwY)IbR7}-5JKrpz-tlyC+9rk zirZE9xE#eI@64ZMjA09eG#J5ylQ1hJYiwN!<@v%w&E34PHSHqP6akRGSig1hO_XAI z(%*^i3=Ay7nrdAMb2-!sbn>conK|x+Zmvv5#gPGz8oIR@E4aji)eG;lCdc$&`Z(2T zigI_3YaIb=2vFebBqXP2kyPfEoDFhg!vrF;)~>Dx=7k|v>{sq1h^w)PS;=_Oe% z$Vu)tUK>zOCCpu%g9yftXfNNx-w3k^2b3JvHA|l~~7i(YlB|XN5a&_3xm4NH`c8Yd$ z;7X!YPKq|RS1!^w10nbQoCEkZ3TrcV&*mbcQGMw}!+_&}oZHNk!zA9xx~gK7)Y5D0 zo`%BC(o5i*eINAn5=$opVH&dK(9Ly=!?x!v%SYk*3ShtYnQJadBxtzj)#hO(w4U-q z{<2z>m=6208R5hb5^}}Xtp*UU-Lz~KMaz`$>3Kq2R#7O8NoVtX4!t$L3$`_7t6Y>U zNckqCc5y>61R-I}Q2OZ%wuYNp3~n;Lwh&Y(!)59-Z38t}^9d?H{%xs{?5k`7NvoF| zFCXudn&lr!V5{Rex3&fxC6=g524l+V=3*SVQx6@(gc2nm*AK<58rFGm zNO^^~Udo^y)14500oTvxI(6e`#u~BH0fjbja<~5e@08Bj-bIl(^e1E6=f1f!23)L4zqW}N^ literal 0 HcmV?d00001 diff --git a/dev/kafka/keystore/kafka.keystore.p12 b/dev/kafka/keystore/kafka.keystore.p12 new file mode 100644 index 0000000000000000000000000000000000000000..b454dbae1bd435614072908b446e1a23484a58ab GIT binary patch literal 5470 zcma)gWmFUZ(=NTl0!xF^B1rE7(jnaoNH<8g(%s!ir!?%+A>Flf2-1jjiAzcf*L&_g z?|1L7@B1-lW}ca6&is4M3^;Et?@xl6F!T(BuV0H}Bd_QxV z55Vwgn+q@AsRj+s7U>NMG&B|v8Y(>)h=u)sCZgeBqJU|zaLghVP_580P}wj*h^6Sa zmt$~e1)e$V{z=PiMYlN$nh(J9ryUY9LUGvvz6iJz>?oT=wi#E5rveLGyBsD8xt(@4u&f zE%@UyC~fVcaoJ2n zIul%;rkZApDA>SU?J4A>kJhii5|*Bv_@vag22Wx79_ZE&h`1aIBJ;8a7}(g|zITB9 zzA1Q6jvX61{z1k^8IZ4XVdIM7dl>lY8bx%(19o?Uz9Hyaqj#s=iNS(kHGWnbROTQg z)&J$lgo#-%YSVp(#32hA`ELLBgMx;me|!wr6U?%Oc6fE?ROU`R)E`*UDM1r2#Iky9y@ab!nR))BhBycBc>BxP8@lnbtnA;RpvR9gl2yE9o}qsh9XXtQeXj<+A~pEn$n#W}6@L_8xp1Z(Wg>jBwq}qC(i~z* zuJI~aFEm6T8rqI!{(jK<$WMDG_Q^huuX^29C|=;Od}I>&5zn(TUtyEy>F20sk=d%{D=VJ~^565OW zf>R~R(NBnGo%!I;611AfXH^I|53ph2-IsW0)7ie-fvt*tl&kDS7YBOnW@>VC&=?{g zSnR|O)Y9!ZwvNZvIGwJp)7kHDLzT*R57b?wi)Jfw@?0-Z$9kL0rC9K=hIxLi9OG=C z3TGZ2IzjVK@%<{FG^Y7lox_E)?*#_?6!*({@hKsLwLO(SK>U1=#M_N1W{YgLS}+|y32gHopsCGS;W zJ3vKK5h+b0Z{6956JpmA`?w2o5s~fZm$a|+PVd!#k6B3hLyd2K*m^!LyKWtHNFH^0 zdwN%@wsJ{#OIa=7-aQlDb>c!^nipiHp zcAIZ5eh|uOx4duX^I>$#;ICtlMRS)vz7@_YsGBF6+|s3Ba2JS>cDk=?BY(W5U4piD z8;)OK8E&~w!Gz=P3LJG-<-A>J@Vvh5Mw%OjMmF`GT!_n=KXw3tdc1((q$Z@@5v*ITQ4#imoOT6@qKF$tu_|`R$PyXF76qP7t%NfVVo` zBLn_s`PZ5BO>;|Mxn<={W;ry%ic-qHo5^@g`o!!Z+Gd?zYCFy>eN*C+o!@CN>kNwh zgOs7Cx9vbJ2#7PEgjXIqPD^Q&(3CSWFAZ#~WaY`|AMHRjBHLLNm)>t34RHBhe3mj@ zM8#w0J}aQlbk}6J&-aKPE-$R#3b`D%s=D?S>6rnR7A~i>q5x=nD6HzPNisZSk$GDO z+QM-TG)>X)c`8^^D#t0DH~Y9+uydl|&n`Fz2Pb|2`lFekz0|p8H)sW;Ik9V0zh8nZ zrLGA<@q=9P%l)pX@SwlYN0@Ycro6hv5wgrTO*vin;G=IoCtWP6eZk4RI6frx_qG%X zD`Uj>*#n}kNd-t;SlzyLrvXK)pqGZHV*1+Q&32HnMoGI~`(rBaOHr?8bC<$=Xjs;( z=2t(@OMNJvNpZG-NC$wi^SkRxrE-+>O=ulkn2S7CS?{<+Ke6GhH0*-a%4k0w_%@1{ zRErB-;QNql?2HRuF^N>|7Z0dhRMtYvd>mAmArO2F!ulpJpO_t!@f4$vus@JdH2DxF zxE}fobxmEf`j0VnpiBv3atED6!+(YJ%4S+OX|5$N#dvl zbO{P_FLN^*L*8bOF^_Ujc#2qi>q`?K&G4a+;>IVH{eFg9qwjgp`&T#Sz5?#RA3rpSS%NxuLlKzb#}CQVe1(J0~l8h`?JgKMxN- zpMVe_KR6V4{+~Gjtin*>_CM$c8Y;>^xAb2X)c;QGt8OvE&gg5>tZ&?@QD;j+b9!g6 z|0iNU_eGide1b1LD39DZlpAL`I234>fCdqldk8mT`-EX--bQ`03FqpO^=x?qdOy{r z-aERix5u2?iw&pY&h+~`5#T!GU#7fVsj7ixorM89cf8q8-!JVSIO#+3u>yQ9$<+aP z9R$Bp7gxd*M3PDc)0V_HsA5rh-Xh4|K|koMJN=5ws2rtX)3RL2&c7VTBZC!#)4FT> z1=&kIiIUPMyN2FE@mAVJO@CR_#-Endha}>oCwu#k^?iKoaPrjKk^9Zht`qTTRjl+&> zx89gjEZ*K<8Kr!alYA#;=S_%gmE*Zmn9j4qe&gr4%hJv~c>+ZTMIcCFQf})_$L;?7 z$cZ~+Sc(W(9i~4Him=5T=5c_vdxN5<_7MJD5-zLnA)Ug6}2X9F# zszUpP$^mL#PYzSA7J;z9%eKw?r)+CGBE_0=YhA*CCH1e4&D6eq_qq8^(z3<;pLF{a zXp?rXy0VjIsB?tZkRUp$*yt=$sHTk3EBYtaSAOV8Uz)2xHhUZQsYD5u??@VdtUYI~ zj6frfHJbT8g(tr_Rcbae1%LR`QDPufs?AY9Ib0g(_jmB;5szEvtawbLKf9b}@fnyM z1-3@vLg`GL5-Ka0te57Mvw2_U$roWKj70Qgh z`X;hY1o9q@x&Xf4vLPbeRsQPzDYKL6Kxjt>yWz0b9^vNC^lX!=vOC!yE*^zthVR@~ zr3B|&okEs@u%(lS{-W;OS>67|DF%R4YK1mc5|(GH(C1sd;ywf`rmi zf!=dFQOUjwtZwH&F;9F#;ciPorV8bVN)?=X20c9d2>SINP1WZ$x53#pi5+Zg4V^bO z?{UMWUA?@M;Yl=mxh@@d(`zq!IGDfbw%VZa2lH4jM@Noex+p{a)edBsdHrsyc3!8; z=ujr%(7($5p`j0Dk!YhIwxUp3V5{cvRX|2@B0n%q&Ap6*+A7($BFF1EL&#=|IUS|X zs3lWiU^SVk@xxan;_>3g&cHTgW+rf0Gp&{^6;P(k5=Wf6y4XMJbb31_Ek~hp@T=$j zqU4TBKh7B>VHTiS3s-@|YXhG2^q##0t1ISO7nXE9=mL&BoF;A=6`?$a&~S@e#6r%J z$W#QJxL|cl6Zy)We-^RxfdIDtI{zzkv$>xZN_5W~8dH77HHsWe8{{Tq!V%$T{kAOi zfdnh-h^+@3sa-`6)&{Cb9ev5Lh|X*@KPn^85#@E!a}vx~Uv4LVdA&wAVG{EL7npZL z#sBs~*_wzmH+aC#Ame6st(TP)$-~0mG?`l3 z-(do+e77d#ra@)UFMrxuQWj!lhwwyJg}dVr%BM}sj@QHF#~y2Id?&cEw}tEQAW6XU z-YzcLf`~Lx_W7MrNtWfucFFNkE(R=eDwYzmnJESIqkFsYhpLGi<}lU-hV|@0#Y|K;JeSSNDnJ3a0zYiy2G~Aj89q%HL8yydU(ZSZe)WiyqJ14 zC>G1KY`WDCpX!Dub{iLmi% z;(YS<=%zi?mlqS^sL9L3MZ4a(b=zlxx!Tw#!G5OS*a1Whp@YQnPoJx@GlIvsO47A# z!ufnU*`U~2`5L3)LhbNf&vay+p?~J&P+MJ#Trl@xe2Db7m$wH$MVdthM3hLVXCz_r zc5NaSd2)D#e1KM+3R7C@3SLY=6d7ljRLuOMCgxL3wy#XTi|~O&Xm;28>(TK&A%Vb1 zMm)VXS8`Y8ANz97%nsV2`~qG|MTxgIA^Evta4a4ojL(E$IK{f+d1%wd|ISR9@uLdf zj+n$X_OngDh_Y2hUhC&7vd#GDkXcxmmDn>#JCXS_+^Dc?Wd!5=dPYM7{NeeQi-asE zQlc^oR)wNxmIR^)N9`|5+{K4h^^VyWUrSUsFSJVRD2>+JE6nHV9;yWo$DL}6_p0Tb zKn~uTLVcMx_thNPtXfz}Q+B&|%-a02#gf;(OFQ?8n*vzL?1qH+9@cKT3oLJ~Bc3@z zif3fOsC86de)|c|;6Cot6KV)f|1PAF(5sSB#o0BeQ7J39(0|z{cIYaLxY+A*d6U;a zdVC0WnDw`FuhbL3HIfYWx^Fa?1)glpF_+~IRX`#hK zo}yP@ZT{q{q=EI~(%Px1VL~8a-D*VeUd@y%Md{Ge@Jdqqd7f;#Ts=UBnS~NY^fO1im`G!Xi0^td0_^DFdu{kHcfSd5#!RdE=bgC!jTgDp0#hPov-Zr5s zdbIb-#H*;Jm$G5uPmf_*c84F+NsTm--rveEVVx#(rqIqqbG?L$k$O`ptUH>D%AD*W za9B}b+~({w`X}CaD~S)JC^_HX%^x!Q;47o(wAH#?!EHP$W6obxN4+-qEVWdApl?%* zNiZ6~k;%vzh_D9Fatl|_#7`HAPBE8SpI;_gt-O0Cx|!n1n&Yr+%A{}ybI0`5Tlge+ z=QVZA*JWDlQRjfqUEbF@M%-GVU-Br(U;pcKF%%2@gzZpSh)yT{-nO4V%Js8}a-tVu z638(KrChUdR34)(k3Ju?(ET-xjyyGl=8YQ*PvHGyWG=sSrfv^sP*pD4eoP0j5|b)i8@{N-Tiae z#0bi!qw7sz4oh~ElGvxIH$q%iwCRKcIu7(;0$Rg_srK_1_* -export -alias CARoot -rfc" + + echo + echo "Now the trust store will be generated from the certificate." + echo + echo "You will be prompted for:" + echo " - the trust store's password (labeled 'keystore'). Remember this" + echo " - a confirmation that you want to import the certificate" + + keytool -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME \ + -alias CARoot -import -file $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE + + trust_store_file="$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME" + + echo + echo "$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME was created." + + # don't need the cert because it's in the trust store. + rm $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE +else + echo + echo -n "Enter the path of the trust store file. " + read -e trust_store_file + + if ! [ -f $trust_store_file ]; then + echo "$trust_store_file isn't a file. Exiting." + exit 1 + fi + + echo -n "Enter the path of the trust store's private key. " + read -e trust_store_private_key_file + + if ! [ -f $trust_store_private_key_file ]; then + echo "$trust_store_private_key_file isn't a file. Exiting." + exit 1 + fi +fi + +echo +echo "Continuing with:" +echo " - trust store file: $trust_store_file" +echo " - trust store private key: $trust_store_private_key_file" + +mkdir $KEYSTORE_WORKING_DIRECTORY + +echo +echo "Now, a keystore will be generated. Each broker and logical client needs its own" +echo "keystore. This script will create only one keystore. Run this script multiple" +echo "times for multiple keystores." +echo +echo "You will be prompted for the following:" +echo " - A keystore password. Remember it." +echo " - Personal information, such as your name." +echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of" +echo " this host. However, at some point, this may change. As such, make the CN" +echo " the FQDN. Some operating systems call the CN prompt 'first / last name'" +echo " - A key password, for the key being generated within the keystore. Remember this." + +# To learn more about CNs and FQDNs, read: +# https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html + +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME \ + -alias localhost -validity $VALIDITY_IN_DAYS -genkey -keyalg RSA + +echo +echo "'$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME' now contains a key pair and a" +echo "self-signed certificate. Again, this keystore can only be used for one broker or" +echo "one logical client. Other brokers or clients need to generate their own keystores." + +echo +echo "Fetching the certificate from the trust store and storing in $CA_CERT_FILE." +echo +echo "You will be prompted for the trust store's password (labeled 'keystore')" + +keytool -keystore $trust_store_file -export -alias CARoot -rfc -file $CA_CERT_FILE + +echo +echo "Now a certificate signing request will be made to the keystore." +echo +echo "You will be prompted for the keystore's password." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost \ + -certreq -file $KEYSTORE_SIGN_REQUEST + +echo +echo "Now the trust store's private key (CA) will sign the keystore's certificate." +echo +echo "You will be prompted for the trust store's private key password." +openssl x509 -req -CA $CA_CERT_FILE -CAkey $trust_store_private_key_file \ + -in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \ + -days $VALIDITY_IN_DAYS -CAcreateserial +# creates $KEYSTORE_SIGN_REQUEST_SRL which is never used or needed. + +echo +echo "Now the CA will be imported into the keystore." +echo +echo "You will be prompted for the keystore's password and a confirmation that you want to" +echo "import the certificate." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias CARoot \ + -import -file $CA_CERT_FILE +rm $CA_CERT_FILE # delete the trust store cert because it's stored in the trust store. + +echo +echo "Now the keystore's signed certificate will be imported back into the keystore." +echo +echo "You will be prompted for the keystore's password." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost -import \ + -file $KEYSTORE_SIGNED_CERT + +echo +echo "All done!" +echo +echo "Delete intermediate files? They are:" +echo " - '$KEYSTORE_SIGN_REQUEST_SRL': CA serial number" +echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request" +echo " (that was fulfilled)" +echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back" +echo " into the keystore" +echo -n "Delete? [yn] " +read delete_intermediate_files + +if [ "$delete_intermediate_files" == "y" ]; then + rm $KEYSTORE_SIGN_REQUEST_SRL + rm $KEYSTORE_SIGN_REQUEST + rm $KEYSTORE_SIGNED_CERT +fi diff --git a/dev/kafka/truststore/ca-key b/dev/kafka/truststore/ca-key new file mode 100644 index 0000000..5658c2e --- /dev/null +++ b/dev/kafka/truststore/ca-key @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFNTBfBgkqhkiG9w0BBQ0wUjAxBgkqhkiG9w0BBQwwJAQQkFO+uriKONsoGWzw +VIEOqgICCAAwDAYIKoZIhvcNAgkFADAdBglghkgBZQMEASoEENA181yKD9wf6Cef +Emn5DKMEggTQ+xvC/rgR4GNu11C4ecVNgmd1cN5k04VOQdgvbrvriXI5dN/f4RsY +SoZtsq2ZJGI1Dlxf5Jk0bDVByeJb6+xpLk8lAE0gtlIlxB6Aeh2zxTDjniK9mEXJ +HyDsPEVySTelkMe+qM8KVDrYf8K7vsWEJ8qtK6EMrlNGaMgcC42IMVkN9Xt1PhZL +YRkQ4FOCluweUBaTKfs9LLnxxMfLyqy5DRVfG0t872WZddRo5i00i5zz59Xk7MMz +CV144wkljFtdhqcNy6k84+67oDQsvPUxWAoer5KtdL6a9GsjvKA+ECstyg9BOnad +GUEgd/tfaRgdNM/Dyuhy5WeQCDsj5LH9NiTEirX77JVhTPVXN3faBEqjMNkasQsg +h7JUcw5Zvl3PIlRwnz9He11Kg/WwAOYsjYhDmWJV0BVJyUa5DkEDEHo8SA1kBy8n +DvROwJJBOisrk/ZCWviV55AQuI91980BMyfCEk8kJCTcL6Nt0i71oT+xipcEE8sz +7VvTquKY7Vzvrf7G7GeMdWcHnF63m2nQOH/0gvUS3Ffq6O1462QbwmdsxqPJBLqN +U3QsorVfAehwUGOQlucHz3QXy0rqRggsAiIGYXQw+RrCWwnUVaeFlmwUEHYd6970 +zA//Xiw+Fq/K/m6JVDwC+RvdqqFKNLK4dCZILHIaYjFKYSyJiGRxK1xHt3ov8e0C +hOaxq+9Nv9Y+KapNwd0dNazlpRdisY+IBm+kQIw5juVqYcfTVjSmV91KugzKb8Qj +JZppjEDIkva1U8yyZR6J4lp7ieEwGA8B08IBhla+Ml662p+UCB2NTISjtJb4wuFB +xdvUVJ+YbpPTy5D6taw510wSYaO5p8lNorsWegaFhm/Ii27DQhMFH0puMNHOialD +qBK8Tk9+fBGvrbk+b9lnIdHI8TafAPKibbziHAn/pzGaEVr8i6aCrtZJTgRM6KpQ +Q9dDaw/iRp8OZxHdhDz1MizX4EPx3OH8I8SVmRxQ7jcURIBBRRhf9sYaiaQm5XHd ++K1+EDwT7Jl85NJzf7EQL+Se8uRg4jjyiSrUAZUWpLg8KCLg6uh96dOlVbhyTrus +EIED7tWAkumjY3FkBMn2ceVJxO8A6i2sl3w80d3KvKGWfyI6qAw6s6syw2fsIEhp +dizjFnL86oXYYWi+Zx9MovOS4lVnVN2D5QDKomaUeTWBt4Z+vrW6dRV4wOKn/1eW +/CrOvoWT/o0tH0XFnARooqMOWcORWymGKuxR1PPyoLoMwzNJW8eVFhyd86D/m0Sd +clauwgUJ944V+5fpAQt1PMUtwsuLjM+LP9lwmlUp4dRHMvFStlkUwrXcv3pYEKBP +gg1wY/keN7nn5di3PqNCFpnIWYzcWuicVrBfRW1CD/T+zjoI7RAjRalkM0ug26/2 +2xDAcURzCuFhj3NuU8hu17lltVB0aUkEwPnpvk+1jL9t2pGvT1WObD3A2GFZ5rQO +zHkt7y4qHgUU0vP10sq42nduHhupO5s6fUsFUNg1Cc93D50hC7WleSC616wt4JW+ +bCvId5868lt+VnH4DRD/1o1bsLHZ9UtdHVENxV1R+Wfy6/PQq1ROM3MD1geD8F0u +le8oHWhjmoA1k2BdKJ/sdpYsJPlwEi/jDjM49eJmwTjTTQ5SWI9ERT0= +-----END ENCRYPTED PRIVATE KEY----- diff --git a/dev/kafka/truststore/kafka.truststore.jks b/dev/kafka/truststore/kafka.truststore.jks new file mode 100644 index 0000000000000000000000000000000000000000..3591721d9221215bf828f5bbadb258f08230a22d GIT binary patch literal 1254 zcmV&LNQU+thDZTr0|Wso1Q0BRL>xE@_|g<%wLB5Lww8c`1K@X*`NXME@6Cj&7+VwzfMG8& zIx?ZgT%N}6XD)zIvxhoU9~TsH*d!~*ExC%sgtkk zd;hu*z1u8J*L2-v$P?WE@;M?44$>&pXc}WQ&n(jMsu_&;9~kII5dSee$SGo{WLpeY zhm(H`9tHK+Bi{h_cu5Fc_*vvS-EdGnPaBlFoUhM7Wbq*cOhrSDJwA5SyLepX68V#^ z7cvYhC0or9(iYfIJs9ADBUApiVKm_w=)njb$HX-kS67zK-s5l)%Bjc$&*xzGLZnZ- z{F^f-9_1ejl!{8L<%I1x2g38+&m+nLvK7>ccsK=l**V+P#zSxniGA7eePc_1MPK8B zZ7k;!l?J9#70#~ONG|_*vpu0NKU4R+Svl1kdq}O`y3i^a7l&#yo;u?b#aVU^9MheUp%tiNVp(fOnds_9c)uVTZE$nrq6M6K)HW*f-4r;Cuhm5bm z-giGKdDLsk53nZs&Lf;GeK|1I2%5(eO;VzTdZ6A-0;8ygj=N+iEqzj*9?2#|1o)5W zn*Nks5!+P`H3j}f^VhOu0=8+^ACj0K)%WQ!y=_}OCEr7aZxX$M^eG;sy1l}rnxChJ zC-{MfazfKIE>tWojgJ4-0qv#A0aXwu(d51`y?XJKYC{WZ5p#DJWN!VnmZUT4QD+Fo+TqgFr0sz@ z1#-fvp1xo_MTlw=C*5Xqbl&n~vT;C{>Tk~Z`e81Q!u(ni@iZ0@S*V5n=C~#N^HUij zk(hd_(*ycVPl6y0bn~@2@vFdujsE`j9DARe+O3`v1|XYQy3;UCFflL<1_@w>NC9O7 z1OfpC00bawd?LZ&?Vii_T;eivFlvta=Zj} Source: ssl_config = None if 'ssl' in conf['kafka']: ssl_config = KafkaSSLConfig( - ca_location=conf['kafka']['ssl']['ca_location'], - certificate_location=conf['kafka']['ssl']['certificate_location'], - key_location=conf['kafka']['ssl']['key_location'], + ca_location=conf['kafka']['ssl'].get('ca_location'), + certificate_location=conf['kafka']['ssl'].get('certificate_location'), + key_location=conf['kafka']['ssl'].get('key_location'), + key_password=conf['kafka']['ssl'].get('key_password'), + endpoint_identification_algorithm=conf['kafka']['ssl'].get('endpoint_identification_algorithm'), ) sasl_config = None diff --git a/sqlflow/sources/__init__.py b/sqlflow/sources/__init__.py index 8fdc75c..237189d 100644 --- a/sqlflow/sources/__init__.py +++ b/sqlflow/sources/__init__.py @@ -23,6 +23,25 @@ def new_source_from_conf(source_conf: config.Source): kconf['sasl.username'] = source_conf.kafka.sasl.username kconf['sasl.password'] = source_conf.kafka.sasl.password + if source_conf.kafka.security_protocol == 'SSL': + kconf['security.protocol'] = 'SSL' + kconf['ssl.ca.location'] = source_conf.kafka.ssl.ca_location + kconf['ssl.certificate.location'] = source_conf.kafka.ssl.certificate_location + kconf['ssl.key.location'] = source_conf.kafka.ssl.key_location + kconf['ssl.key.password'] = source_conf.kafka.ssl.key_password + kconf['ssl.endpoint.identification.algorithm'] = source_conf.kafka.ssl.endpoint_identification_algorithm + + if source_conf.kafka.security_protocol == 'SASL_SSL': + kconf['security.protocol'] = 'SASL_SSL' + kconf['sasl.mechanism'] = source_conf.kafka.sasl.mechanism + kconf['sasl.username'] = source_conf.kafka.sasl.username + kconf['sasl.password'] = source_conf.kafka.sasl.password + kconf['ssl.ca.location'] = source_conf.kafka.ssl.ca_location + kconf['ssl.certificate.location'] = source_conf.kafka.ssl.certificate_location + kconf['ssl.key.location'] = source_conf.kafka.ssl.key_location + kconf['ssl.key.password'] = source_conf.kafka.ssl.key_password + kconf['ssl.endpoint.identification.algorithm'] = source_conf.kafka.ssl.endpoint_identification_algorithm + consumer = Consumer(kconf) return KafkaSource( From eae7011f5b749aa882adb0476abe14c6fcdf174d Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Sun, 22 Jun 2025 17:20:55 -0400 Subject: [PATCH 4/7] removes pdb --- cmd/publish-test-data.py | 23 +++++++++++++++++-- sqlflow/fixtures/__init__.py | 43 +++++++++++++++++++++++++++++++++++- sqlflow/sources/__init__.py | 1 - 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/cmd/publish-test-data.py b/cmd/publish-test-data.py index 04b92d2..7aa6d7c 100644 --- a/cmd/publish-test-data.py +++ b/cmd/publish-test-data.py @@ -11,12 +11,31 @@ @click.option('--num-messages', default=1001, type=int) @click.option('--topic', default='topic-1') @click.option('--fixture', default=None) -def main(num_messages, topic, fixture): +@click.option('--bootstrap-servers', default='localhost:9092', help='Kafka bootstrap servers') +@click.option('--security-protocol', default=None, help='Kafka security protocol') +@click.option('--sasl-mechanism', default=None, help='SASL mechanism') +@click.option('--sasl-username', default=None, help='SASL username') +@click.option('--sasl-password', default=None, help='SASL password') +@click.option('--ssl-ca-location', default=None, help='Path to CA certificate') +@click.option('--ssl-key-location', default=None, help='Path to client key') +@click.option('--ssl-certificate-location', default=None, help='Path to client certificate') +@click.option('--ssl-key-password', default=None, help='Password for client key') +@click.option('--ssl-endpoint-identification-algorithm', default='none', help='SSL endpoint identification algorithm') +def main(num_messages, topic, fixture, bootstrap_servers, security_protocol, sasl_mechanism, sasl_username, sasl_password, ssl_ca_location, ssl_key_location, ssl_certificate_location, ssl_key_password, ssl_endpoint_identification_algorithm): kf = KafkaFaker( - bootstrap_servers='localhost:9092', + bootstrap_servers=bootstrap_servers, num_messages=num_messages, topic=topic, fixture=fixture, + security_protocol=security_protocol, + sasl_mechanism=sasl_mechanism, + sasl_username=sasl_username, + sasl_password=sasl_password, + ssl_ca_location=ssl_ca_location, + ssl_key_location=ssl_key_location, + ssl_certificate_location=ssl_certificate_location, + ssl_key_password=ssl_key_password, + ssl_endpoint_identification_algorithm=ssl_endpoint_identification_algorithm ) kf.publish() diff --git a/sqlflow/fixtures/__init__.py b/sqlflow/fixtures/__init__.py index 1068512..d51ae81 100644 --- a/sqlflow/fixtures/__init__.py +++ b/sqlflow/fixtures/__init__.py @@ -32,11 +32,34 @@ class KafkaFaker: - def __init__(self, bootstrap_servers, num_messages, topic, fixture=None): + def __init__(self, + bootstrap_servers, + num_messages, + topic, + fixture=None, + security_protocol=None, + sasl_mechanism=None, + sasl_username=None, + sasl_password=None, + ssl_ca_location=None, + ssl_key_location=None, + ssl_certificate_location=None, + ssl_key_password=None, + ssl_endpoint_identification_algorithm=None + ): self.bootstrap_servers = bootstrap_servers self.num_messages = num_messages self.topic = topic self.fixture = fixture + self.security_protocol = security_protocol + self.sasl_mechanism = sasl_mechanism + self.sasl_username = sasl_username + self.sasl_password = sasl_password + self.ssl_ca_location = ssl_ca_location + self.ssl_key_location = ssl_key_location + self.ssl_certificate_location = ssl_certificate_location + self.ssl_key_password = ssl_key_password + self.ssl_endpoint_identification_algorithm = ssl_endpoint_identification_algorithm def publish(self): conf = { @@ -44,6 +67,24 @@ def publish(self): 'client.id': socket.gethostname() } + if self.security_protocol: + conf['security.protocol'] = self.security_protocol + if self.sasl_mechanism: + conf['sasl.mechanism'] = self.sasl_mechanism + if self.sasl_username and self.sasl_password: + conf['sasl.username'] = self.sasl_username + conf['sasl.password'] = self.sasl_password + if self.ssl_ca_location: + conf['ssl.ca.location'] = self.ssl_ca_location + if self.ssl_key_location: + conf['ssl.key.location'] = self.ssl_key_location + if self.ssl_certificate_location: + conf['ssl.certificate.location'] = self.ssl_certificate_location + if self.ssl_key_password: + conf['ssl.key.password'] = self.ssl_key_password + if self.ssl_endpoint_identification_algorithm: + conf['ssl.endpoint.identification.algorithm'] = self.ssl_endpoint_identification_algorithm + producer = Producer(conf) for i in range(self.num_messages): if self.fixture is None: diff --git a/sqlflow/sources/__init__.py b/sqlflow/sources/__init__.py index 237189d..96cf15f 100644 --- a/sqlflow/sources/__init__.py +++ b/sqlflow/sources/__init__.py @@ -15,7 +15,6 @@ def new_source_from_conf(source_conf: config.Source): 'auto.offset.reset': source_conf.kafka.auto_offset_reset, 'enable.auto.commit': False, } - import ipdb; ipdb.set_trace(); if source_conf.kafka.security_protocol == 'SASL_PLAINTEXT': kconf['security.protocol'] = 'SASL_PLAINTEXT' From ca2b84cfa7b2d6d7742df6499f54be90512f1b07 Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Sun, 22 Jun 2025 19:57:09 -0400 Subject: [PATCH 5/7] valide schema updated, and factorced the consumer create --- sqlflow/sources/__init__.py | 57 ++++++++++++++---------------- sqlflow/static/schemas/config.json | 54 ++++++++++++++++++++++++++++ tests/test_configs.py | 1 + 3 files changed, 81 insertions(+), 31 deletions(-) diff --git a/sqlflow/sources/__init__.py b/sqlflow/sources/__init__.py index 96cf15f..ec92075 100644 --- a/sqlflow/sources/__init__.py +++ b/sqlflow/sources/__init__.py @@ -7,41 +7,36 @@ from .webhook import WebhookSource, HMACConfig -def new_source_from_conf(source_conf: config.Source): - if source_conf.type == 'kafka': - kconf = { - 'bootstrap.servers': ','.join(source_conf.kafka.brokers), - 'group.id': source_conf.kafka.group_id, - 'auto.offset.reset': source_conf.kafka.auto_offset_reset, - 'enable.auto.commit': False, - } +def new_consumer_from_conf(conf: config.KafkaSource): + kconf = { + 'bootstrap.servers': ','.join(conf.brokers), + 'group.id': conf.group_id, + 'auto.offset.reset': conf.auto_offset_reset, + 'enable.auto.commit': False, + } + + if conf.security_protocol: + kconf['security.protocol'] = conf.security_protocol - if source_conf.kafka.security_protocol == 'SASL_PLAINTEXT': - kconf['security.protocol'] = 'SASL_PLAINTEXT' - kconf['sasl.mechanism'] = source_conf.kafka.sasl.mechanism - kconf['sasl.username'] = source_conf.kafka.sasl.username - kconf['sasl.password'] = source_conf.kafka.sasl.password + if conf.sasl: + kconf['sasl.mechanism'] = conf.sasl.mechanism + kconf['sasl.username'] = conf.sasl.username + kconf['sasl.password'] = conf.sasl.password - if source_conf.kafka.security_protocol == 'SSL': - kconf['security.protocol'] = 'SSL' - kconf['ssl.ca.location'] = source_conf.kafka.ssl.ca_location - kconf['ssl.certificate.location'] = source_conf.kafka.ssl.certificate_location - kconf['ssl.key.location'] = source_conf.kafka.ssl.key_location - kconf['ssl.key.password'] = source_conf.kafka.ssl.key_password - kconf['ssl.endpoint.identification.algorithm'] = source_conf.kafka.ssl.endpoint_identification_algorithm + if conf.ssl: + kconf['ssl.ca.location'] = conf.ssl.ca_location + kconf['ssl.certificate.location'] = conf.ssl.certificate_location + kconf['ssl.key.location'] = conf.ssl.key_location + kconf['ssl.key.password'] = conf.ssl.key_password + kconf['ssl.endpoint.identification.algorithm'] = conf.ssl.endpoint_identification_algorithm - if source_conf.kafka.security_protocol == 'SASL_SSL': - kconf['security.protocol'] = 'SASL_SSL' - kconf['sasl.mechanism'] = source_conf.kafka.sasl.mechanism - kconf['sasl.username'] = source_conf.kafka.sasl.username - kconf['sasl.password'] = source_conf.kafka.sasl.password - kconf['ssl.ca.location'] = source_conf.kafka.ssl.ca_location - kconf['ssl.certificate.location'] = source_conf.kafka.ssl.certificate_location - kconf['ssl.key.location'] = source_conf.kafka.ssl.key_location - kconf['ssl.key.password'] = source_conf.kafka.ssl.key_password - kconf['ssl.endpoint.identification.algorithm'] = source_conf.kafka.ssl.endpoint_identification_algorithm + consumer = Consumer(kconf) + return consumer - consumer = Consumer(kconf) + +def new_source_from_conf(source_conf: config.Source): + if source_conf.type == 'kafka': + consumer = new_consumer_from_conf(source_conf.kafka) return KafkaSource( consumer=consumer, diff --git a/sqlflow/static/schemas/config.json b/sqlflow/static/schemas/config.json index a602b5c..607d55d 100644 --- a/sqlflow/static/schemas/config.json +++ b/sqlflow/static/schemas/config.json @@ -278,6 +278,60 @@ "items": { "type": "string" } + }, + "security_protocol": { + "type": "string", + "enum": [ + "SASL_SSL", + "SSL", + "SASL_PLAINTEXT", + "PLAINTEXT" + ] + }, + "ssl": { + "type": "object", + "properties": { + "ca_location": { + "type": "string" + }, + "key_location": { + "type": "string" + }, + "certificate_location": { + "type": "string" + }, + "key_password": { + "type": "string" + }, + "endpoint_identification_algorithm": { + "type": "string" + } + } + }, + "sasl": { + "type": "object", + "properties": { + "mechanism": { + "type": "string", + "enum": [ + "PLAIN", + "SCRAM-SHA-256", + "SCRAM-SHA-512", + "GSSAPI" + ] + }, + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + }, + "required": [ + "mechanism", + "username", + "password" + ] } }, "required": [ diff --git a/tests/test_configs.py b/tests/test_configs.py index 292d3f4..1c4579c 100644 --- a/tests/test_configs.py +++ b/tests/test_configs.py @@ -49,6 +49,7 @@ def setUp(self): 'kafka.mem.iceberg.yml', 'kafka.postgres.join.yml', 'kafka.postgres.sink.yml', + 'kafka.sasl-tls.yml', 'kafka.structured.disk.yml', 'kafka.structured.mem.yml', 'local.parquet.sink.yml', From 3ba5160dd08059b78cd4c915bb42c19a29e64f71 Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Mon, 23 Jun 2025 06:39:03 -0400 Subject: [PATCH 6/7] adds kafka auth to kafka sink --- sqlflow/config.py | 80 +++++++++++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/sqlflow/config.py b/sqlflow/config.py index f22c6d9..0ceea5c 100644 --- a/sqlflow/config.py +++ b/sqlflow/config.py @@ -10,6 +10,22 @@ from sqlflow import settings, errors +@dataclass +class KafkaSSLConfig: + ca_location: str # Path to the CA certificate + certificate_location: str # Path to the client certificate + key_location: str # Path to the client private key + key_password: Optional[str] = None + endpoint_identification_algorithm: Optional[str] = None # e.g., https, none (default is https) + + +@dataclass +class KafkaSASLConfig: + mechanism: str # SASL mechanism (e.g., PLAIN, SCRAM-SHA-256) + username: str # SASL username + password: str # SASL password + + @dataclass class HMACConfig: header: str # Header name for the HMAC signature @@ -38,6 +54,9 @@ class IcebergSink: class KafkaSink: brokers: [str] topic: str + security_protocol: Optional[str] = None # Security protocol (e.g., PLAINTEXT, SSL, SASL_SSL) + ssl: Optional[KafkaSSLConfig] = None # SSL configuration + sasl: Optional[KafkaSASLConfig] = None # SASL configuration @dataclass @@ -122,21 +141,6 @@ class SQLCommand: sql: str -@dataclass -class KafkaSSLConfig: - ca_location: str # Path to the CA certificate - certificate_location: str # Path to the client certificate - key_location: str # Path to the client private key - key_password: Optional[str] = None - endpoint_identification_algorithm: Optional[str] = None # e.g., https, none (default is https) - - -@dataclass -class KafkaSASLConfig: - mechanism: str # SASL mechanism (e.g., PLAIN, SCRAM-SHA-256) - username: str # SASL username - password: str # SASL password - @dataclass class KafkaSource: @@ -272,26 +276,48 @@ def build_source_config_from_dict(conf) -> Source: return source -def build_sink_config_from_dict(conf) -> Sink: +def build_sink_config_from_dict(raw_conf) -> Sink: sink = Sink( - type=conf['type'], + type=raw_conf['type'], ) - if 'format' in conf: + if 'format' in raw_conf: sink.format = SinkFormat( - type=conf['format']['type'], + type=raw_conf['format']['type'], ) assert sink.format.type in ('parquet',), "unsupported format: {}".format(sink.format.type) if sink.type == 'kafka': + ssl_config = None + if 'ssl' in raw_conf['kafka']: + ssl_config = KafkaSSLConfig( + ca_location=raw_conf['kafka']['ssl'].get('ca_location'), + certificate_location=raw_conf['kafka']['ssl'].get('certificate_location'), + key_location=raw_conf['kafka']['ssl'].get('key_location'), + key_password=raw_conf['kafka']['ssl'].get('key_password'), + endpoint_identification_algorithm=raw_conf['kafka']['ssl'].get('endpoint_identification_algorithm'), + ) + + sasl_config = None + if 'sasl' in raw_conf['kafka']: + sasl_config = KafkaSASLConfig( + mechanism=raw_conf['kafka']['sasl']['mechanism'], + username=raw_conf['kafka']['sasl']['username'], + password=raw_conf['kafka']['sasl']['password'], + ) + sink.kafka = KafkaSink( - brokers=conf['kafka']['brokers'], - topic=conf['kafka']['topic'], + brokers=raw_conf['kafka']['brokers'], + topic=raw_conf['kafka']['topic'], + security_protocol=raw_conf['kafka'].get('security_protocol'), + ssl=ssl_config, + sasl=sasl_config, ) + elif sink.type == 'sqlcommand': sink.sqlcommand = SQLCommandSink( - sql=conf['sqlcommand']['sql'], - substitutions=[SQLCommandSubstitution(**s) for s in conf['sqlcommand'].get('substitutions', ())], + sql=raw_conf['sqlcommand']['sql'], + substitutions=[SQLCommandSubstitution(**s) for s in raw_conf['sqlcommand'].get('substitutions', ())], ) for substitution in sink.sqlcommand.substitutions: assert substitution.type in ('uuid4',), "unsupported substitution type: {}".format(substitution.type) @@ -299,13 +325,13 @@ def build_sink_config_from_dict(conf) -> Sink: pass elif sink.type == 'iceberg': sink.iceberg = IcebergSink( - catalog_name=conf['iceberg']['catalog_name'], - table_name=conf['iceberg']['table_name'], + catalog_name=raw_conf['iceberg']['catalog_name'], + table_name=raw_conf['iceberg']['table_name'], ) elif sink.type == 'clickhouse': sink.clickhouse = ClikhouseSink( - dsn=conf['clickhouse']['dsn'], - table=conf['clickhouse']['table'], + dsn=raw_conf['clickhouse']['dsn'], + table=raw_conf['clickhouse']['table'], ) else: sink.type = 'console' From b7769f3a7f2ae43f41e5cc31c578e1bbcc323709 Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Tue, 24 Jun 2025 06:50:09 -0400 Subject: [PATCH 7/7] kafka sink with auth --- benchmark/kafka-motherduck.sh | 2 ++ dev/config/examples/kafka.motherduck.yml | 16 ++++++------- dev/config/examples/kafka.sasl-tls.yml | 16 ++++++++++++- sqlflow/sinks.py | 30 ++++++++++++++++++++---- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/benchmark/kafka-motherduck.sh b/benchmark/kafka-motherduck.sh index 4bc3fa9..87db9dd 100755 --- a/benchmark/kafka-motherduck.sh +++ b/benchmark/kafka-motherduck.sh @@ -5,6 +5,8 @@ echo "starting benchmark for kafka.motherduck.yml with $NUM_MESSAGES messages" # delete kafka topic if exists docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --delete --topic input-user-clicks-motherduck || true +docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --create --topic input-user-clicks-motherduck --partitions 10 --replication-factor 1 + # Publish benchmark data set python3 cmd/publish-test-data.py --num-messages=$NUM_MESSAGES --topic="input-user-clicks-motherduck" diff --git a/dev/config/examples/kafka.motherduck.yml b/dev/config/examples/kafka.motherduck.yml index 47792eb..4dd613e 100644 --- a/dev/config/examples/kafka.motherduck.yml +++ b/dev/config/examples/kafka.motherduck.yml @@ -20,15 +20,15 @@ pipeline: handler: type: 'handlers.InferredMemBatch' sql: | - INSERT INTO my_db.issues + INSERT INTO my_db.events SELECT - action, - issue ->> 'id' AS issue_id, - issue ->> 'title' AS issue_title, - issue ->> 'number' AS issue_number, - issue ->> 'user' ->> 'login' AS issue_user_login, - issue ->> 'repository' ->> 'name' AS issue_repo_name, - issue ->> 'repository' ->> 'id' AS issue_repo_id + ip, + event, + properties ->> 'city' AS properties_city, + properties ->> 'country' AS properties_country, + CAST(timestamp AS TIMESTAMP) AS timestamp, + type, + userId FROM batch; sink: diff --git a/dev/config/examples/kafka.sasl-tls.yml b/dev/config/examples/kafka.sasl-tls.yml index b25f64e..e870753 100644 --- a/dev/config/examples/kafka.sasl-tls.yml +++ b/dev/config/examples/kafka.sasl-tls.yml @@ -27,4 +27,18 @@ pipeline: sql: SELECT * FROM batch sink: - type: console + type: kafka + kafka: + brokers: [{{ kafka_brokers|default('localhost:9092') }}] + topic: output-sasl-tls-1 + security_protocol: SASL_SSL + ssl: + ca_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/ca-cert.pem + key_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-key.pem + certificate_location: {{ SQLFLOW_ROOT_DIR }}/dev/kafka/certs/client-cert.pem + key_password: testpass + endpoint_identification_algorithm: 'none' + sasl: + mechanism: PLAIN + username: user + password: bitnami diff --git a/sqlflow/sinks.py b/sqlflow/sinks.py index e1af1bb..c7330cb 100644 --- a/sqlflow/sinks.py +++ b/sqlflow/sinks.py @@ -187,12 +187,34 @@ def flush(self): pass +def new_producer_from_conf(conf): + producer_conf = { + 'bootstrap.servers': ','.join(conf.brokers), + 'client.id': socket.gethostname(), + } + + if conf.security_protocol: + producer_conf['security.protocol'] = conf.security_protocol + + if conf.sasl: + producer_conf['sasl.mechanism'] = conf.sasl.mechanism + producer_conf['sasl.username'] = conf.sasl.username + producer_conf['sasl.password'] = conf.sasl.password + + if conf.ssl: + producer_conf['ssl.ca.location'] = conf.ssl.ca_location + producer_conf['ssl.certificate.location'] = conf.ssl.certificate_location + producer_conf['ssl.key.location'] = conf.ssl.key_location + producer_conf['ssl.key.password'] = conf.ssl.key_password + producer_conf['ssl.endpoint.identification.algorithm'] = conf.ssl.endpoint_identification_algorithm + + return Producer(producer_conf) + + def new_sink_from_conf(sink_conf: config.Sink, conn) -> Sink: if sink_conf.type == 'kafka': - p = Producer({ - 'bootstrap.servers': ','.join(sink_conf.kafka.brokers), - 'client.id': socket.gethostname(), - }) + p = new_producer_from_conf(sink_conf.kafka) + return KafkaSink( topic=sink_conf.kafka.topic, producer=p,