diff --git a/dev/config/examples/kafka.ducklake.yml b/dev/config/examples/kafka.ducklake.yml new file mode 100644 index 0000000..e79a9d5 --- /dev/null +++ b/dev/config/examples/kafka.ducklake.yml @@ -0,0 +1,51 @@ +commands: + - name: install ducklake + sql: | + INSTALL ducklake; + LOAD ducklake; + + - name: attach to ducklake + sql: | + ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake; + USE my_ducklake; + + - name: create events table if not exists + sql: | + CREATE TABLE IF NOT EXISTS my_ducklake.events ( + ip TEXT, + event TEXT, + properties_city TEXT, + properties_country TEXT, + timestamp TIMESTAMP, + type TEXT, + userId TEXT + ); + +pipeline: + batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }} + source: + type: kafka + kafka: + brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}] + group_id: motherduck-sink-1 + auto_offset_reset: earliest + topics: + - "input-user-clicks-ducklake" + + handler: + type: 'handlers.InferredMemBatch' + sql: | + INSERT INTO my_ducklake.events + SELECT + ip, + event, + properties ->> 'city' AS properties_city, + properties ->> 'country' AS properties_country, + CAST(timestamp AS TIMESTAMP) AS timestamp, + type, + userId + FROM batch; + + sink: + type: noop + diff --git a/requirements.txt b/requirements.txt index 8c88868..f92ea5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,7 @@ confluent-kafka==2.3.0 decorator==5.1.1 Deprecated==1.2.15 docker==7.1.0 -duckdb==1.2.2 +duckdb==1.3.1 executing==2.0.1 fastapi==0.115.13 filelock==3.16.1