From 28af75429c15b23d28e817428d99c8e1c7ab1431 Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Wed, 25 Jun 2025 07:58:58 -0400 Subject: [PATCH 1/2] Tutorial: How to stream to DuckLake refs #136 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 8c88868..f92ea5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,7 @@ confluent-kafka==2.3.0 decorator==5.1.1 Deprecated==1.2.15 docker==7.1.0 -duckdb==1.2.2 +duckdb==1.3.1 executing==2.0.1 fastapi==0.115.13 filelock==3.16.1 From bd1b4e8a4b09ba6283becd514b30362cf093e0bc Mon Sep 17 00:00:00 2001 From: "turbolytics.io" Date: Wed, 25 Jun 2025 16:12:44 -0400 Subject: [PATCH 2/2] ducklake sink example --- dev/config/examples/kafka.ducklake.yml | 51 ++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 dev/config/examples/kafka.ducklake.yml diff --git a/dev/config/examples/kafka.ducklake.yml b/dev/config/examples/kafka.ducklake.yml new file mode 100644 index 0000000..e79a9d5 --- /dev/null +++ b/dev/config/examples/kafka.ducklake.yml @@ -0,0 +1,51 @@ +commands: + - name: install ducklake + sql: | + INSTALL ducklake; + LOAD ducklake; + + - name: attach to ducklake + sql: | + ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake; + USE my_ducklake; + + - name: create events table if not exists + sql: | + CREATE TABLE IF NOT EXISTS my_ducklake.events ( + ip TEXT, + event TEXT, + properties_city TEXT, + properties_country TEXT, + timestamp TIMESTAMP, + type TEXT, + userId TEXT + ); + +pipeline: + batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }} + source: + type: kafka + kafka: + brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}] + group_id: motherduck-sink-1 + auto_offset_reset: earliest + topics: + - "input-user-clicks-ducklake" + + handler: + type: 'handlers.InferredMemBatch' + sql: | + INSERT INTO my_ducklake.events + SELECT + ip, + event, + properties ->> 'city' AS properties_city, + properties ->> 'country' AS properties_country, + CAST(timestamp AS TIMESTAMP) AS timestamp, + type, + userId + FROM batch; + + sink: + type: noop +