diff --git a/Makefile b/Makefile index 0d1da41..6d0a16e 100644 --- a/Makefile +++ b/Makefile @@ -31,6 +31,10 @@ test-release: docker-image start-backing-services: docker-compose -f dev/kafka-single.yml up -d +.PHONY: stop-backing-services +stop-backing-services: + docker-compose -f dev/kafka-single.yml down + .PHONY: docker-image docker-image: @GIT_HASH=$$(git rev-parse --short HEAD) && \ diff --git a/cmd/turbine/main.go b/cmd/turbine/main.go new file mode 100644 index 0000000..ee295b6 --- /dev/null +++ b/cmd/turbine/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "github.com/turbolytics/turbine/internal/cli" +) + +func main() { + cli.Execute() +} diff --git a/dev/config/examples/kafka.structured.mem.yml b/dev/config/examples/kafka.structured.mem.yml index 0f122e5..8429a7c 100644 --- a/dev/config/examples/kafka.structured.mem.yml +++ b/dev/config/examples/kafka.structured.mem.yml @@ -8,16 +8,16 @@ commands: ); pipeline: - batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }} + batch_size: {{ SQLFLOW_BATCH_SIZE|default:500 }} source: type: kafka kafka: - brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}] + brokers: [{{ SQLFLOW_KAFKA_BROKERS|default:'localhost:9092' }}] group_id: test auto_offset_reset: earliest topics: - - "input-structured-mem" + - "input-structured-mem-2" handler: type: "handlers.StructuredBatch" @@ -26,8 +26,9 @@ pipeline: sql: | SELECT properties.city as city, - 1 as city_count + COUNT(*) as count FROM source + GROUP BY properties.city; sink: type: console \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2372e55 --- /dev/null +++ b/go.mod @@ -0,0 +1,39 @@ +module github.com/turbolytics/turbine + +go 1.24.0 + +require ( + github.com/marcboeker/go-duckdb v1.8.5 + github.com/spf13/cobra v1.9.1 + github.com/zeebo/assert v1.3.0 + go.opentelemetry.io/otel/metric v1.35.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/apache/arrow-adbc/go/adbc v1.6.0 // indirect + github.com/apache/arrow-go/v18 v18.2.0 // indirect + github.com/confluentinc/confluent-kafka-go v1.9.2 // indirect + github.com/flosch/pongo2/v6 v6.0.0 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/google/flatbuffers v25.2.10+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/rogpeppe/go-internal v1.9.0 // indirect + github.com/spf13/pflag v1.0.6 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + go.opentelemetry.io/otel v1.35.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect + golang.org/x/mod v0.24.0 // indirect + golang.org/x/sync v0.13.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/tools v0.32.0 // indirect + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + google.golang.org/protobuf v1.36.6 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9e93f1f --- /dev/null +++ b/go.sum @@ -0,0 +1,313 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA= +github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= +github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/apache/arrow-adbc/go/adbc v1.6.0 h1:QhmnpaVOra/zlPHNotTezt5EGzlYrYTSbJymipJInI8= +github.com/apache/arrow-adbc/go/adbc v1.6.0/go.mod h1:63Q8hs4o77b+YHSLxep5UYkC9+dXUdl0s+A8fR/RhFE= +github.com/apache/arrow-go/v18 v18.2.0 h1:QhWqpgZMKfWOniGPhbUxrHohWnooGURqL2R2Gg4SO1Q= +github.com/apache/arrow-go/v18 v18.2.0/go.mod h1:Ic/01WSwGJWRrdAZcxjBZ5hbApNJ28K96jGYaxzzGUc= +github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= +github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q= +github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/flosch/pongo2/v6 v6.0.0 h1:lsGru8IAzHgIAw6H2m4PCyleO58I40ow6apih0WprMU= +github.com/flosch/pongo2/v6 v6.0.0/go.mod h1:CuDpFm47R0uGGE7z13/tTlt1Y6zdxvr2RLT5LJhsHEU= +github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20= +github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= +github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= +github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q= +github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.2.1-0.20190312032427-6f77996f0c42/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8= +github.com/heetch/avro v0.3.1/go.mod h1:4xn38Oz/+hiEUTpbVfGVLfvOg0yKLlRP7Q9+gJJILgA= +github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/invopop/jsonschema v0.4.0/go.mod h1:O9uiLokuu0+MGFlyiaqtWxwqJm41/+8Nj0lD7A36YH0= +github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= +github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= +github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= +github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJMErpKy6A4= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= +github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.10.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/marcboeker/go-duckdb v1.8.5 h1:tkYp+TANippy0DaIOP5OEfBEwbUINqiFqgwMQ44jME0= +github.com/marcboeker/go-duckdb v1.8.5/go.mod h1:6mK7+WQE4P4u5AFLvVBmhFxY5fvhymFptghgJX6B+/8= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= +github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= +github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= +golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw= +golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= +golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/mod v0.24.0 h1:ZfthKaKaT4NrhGVZHO1/WDTwGES4De8KtWO0SIbNJMU= +golang.org/x/mod v0.24.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= +golang.org/x/tools v0.32.0 h1:Q7N1vhpkQv7ybVzLFtTjvQya2ewbwNDZzUgfXGqtMWU= +golang.org/x/tools v0.32.0/go.mod h1:ZxrU41P/wAbZD8EDa6dDCa6XfpkhJ7HFMjHJXfBDu8s= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v1 v1.0.0/go.mod h1:CxwszS/Xz1C49Ucd2i6Zil5UToP1EmyrFhKaMVbg1mk= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/httprequest.v1 v1.2.1/go.mod h1:x2Otw96yda5+8+6ZeWwHIJTFkEHWP/qP8pJOzqEtWPM= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= +gopkg.in/retry.v1 v1.0.3/go.mod h1:FJkXmWiMaAo7xB+xhvDF59zhfjDWyzmyAxiT4dB688g= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/cli/config.go b/internal/cli/config.go new file mode 100644 index 0000000..7f1e458 --- /dev/null +++ b/internal/cli/config.go @@ -0,0 +1 @@ +package cli diff --git a/internal/cli/dev.go b/internal/cli/dev.go new file mode 100644 index 0000000..7f1e458 --- /dev/null +++ b/internal/cli/dev.go @@ -0,0 +1 @@ +package cli diff --git a/internal/cli/root.go b/internal/cli/root.go new file mode 100644 index 0000000..b13c8ad --- /dev/null +++ b/internal/cli/root.go @@ -0,0 +1,34 @@ +package cli + +import ( + "fmt" + "github.com/spf13/cobra" + "github.com/turbolytics/turbine/internal/cli/run" + "github.com/turbolytics/turbine/internal/cli/tail" + "os" +) + +func NewRootCommand() *cobra.Command { + var cmd = &cobra.Command{ + Use: "turbine", + Short: "", + Long: ``, + // The run function is called when the command is executed + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("Welcome to turbine!") + }, + } + + cmd.AddCommand(run.NewCommand()) + cmd.AddCommand(tail.NewCommand()) + + return cmd +} + +func Execute() { + cmd := NewRootCommand() + if err := cmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} diff --git a/internal/cli/run/root.go b/internal/cli/run/root.go new file mode 100644 index 0000000..988473d --- /dev/null +++ b/internal/cli/run/root.go @@ -0,0 +1,125 @@ +package run + +import ( + "context" + "fmt" + "github.com/apache/arrow-adbc/go/adbc/drivermgr" // Import the driver manager + "github.com/spf13/cobra" + "github.com/turbolytics/turbine/internal/handlers" + "github.com/turbolytics/turbine/internal/sinks" + "github.com/turbolytics/turbine/internal/sources" + "go.uber.org/zap" + "net/http" + _ "net/http/pprof" + "sync" + "time" + + "github.com/turbolytics/turbine/internal/config" + "github.com/turbolytics/turbine/internal/core" +) + +func NewCommand() *cobra.Command { + var configPath string + + var cmd = &cobra.Command{ + Use: "run", + Short: "Run turbine against a stream of data", + RunE: func(cmd *cobra.Command, args []string) error { + logger, _ := zap.NewDevelopment() + defer logger.Sync() + l := logger.Named("turbine.run") + + // Start pprof server + go func() { + l.Info("starting pprof server on :6060") + if err := http.ListenAndServe(":6060", nil); err != nil { + l.Error("failed to start pprof server", zap.Error(err)) + } + }() + + conf, err := config.Load(configPath, map[string]string{}) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + // Initialize ADBC connection using driver manager + var drv drivermgr.Driver + db, err := drv.NewDatabase(map[string]string{ + "driver": "/opt/homebrew/lib/libduckdb.dylib", + "entrypoint": "duckdb_adbc_init", + }) + if err != nil { + return fmt.Errorf("failed to initialize DuckDB driver: %w", err) + } + + conn, err := db.Open(context.Background()) + if err != nil { + return fmt.Errorf("failed to open DuckDB connection: %w", err) + } + defer func() { + if err := conn.Close(); err != nil { + l.Error("failed to close DuckDB connection", zap.Error(err)) + } + }() + + // Initialize commands + if err := core.InitCommands(conn, conf); err != nil { + return fmt.Errorf("failed to initialize commands: %w", err) + } + + src, err := sources.New( + conf.Pipeline.Source, + logger, + ) + if err != nil { + return fmt.Errorf("failed to create source: %w", err) + } + + sink, err := sinks.New(conf.Pipeline.Sink) + if err != nil { + return fmt.Errorf("failed to create sink: %w", err) + } + + handler, err := handlers.New( + conn, + conf.Pipeline.Handler, + logger, + ) + if err != nil { + return fmt.Errorf("failed to create handler: %w", err) + } + + lock := &sync.Mutex{} + turbine := core.NewTurbine( + src, + handler, + sink, + conf.Pipeline.BatchSize, + time.Duration(conf.Pipeline.FlushIntervalSeconds)*time.Second, + lock, + core.PipelineErrorPolicies{ + // Source: conf.Pipeline.Source.Error.Policy, + }, + core.WithTurbineLogger(l), + ) + + go func() { + if err := turbine.StatusLoop(context.Background()); err != nil { + l.Error("failed to start status loop", zap.Error(err)) + } + }() + + _, err = turbine.ConsumeLoop(context.Background(), 0) + if err != nil { + l.Error("failed to consume loop", zap.Error(err)) + return err + } + return nil + }, + } + + cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to turbine config file (required)") + cmd.MarkFlagRequired("config") + + return cmd +} diff --git a/internal/cli/tail/root.go b/internal/cli/tail/root.go new file mode 100644 index 0000000..4969531 --- /dev/null +++ b/internal/cli/tail/root.go @@ -0,0 +1,80 @@ +package tail + +import ( + "fmt" + "github.com/spf13/cobra" + "github.com/turbolytics/turbine/internal/config" + "github.com/turbolytics/turbine/internal/sources" + "go.uber.org/zap" + "time" +) + +func NewCommand() *cobra.Command { + var configPath string + + var cmd = &cobra.Command{ + Use: "tail", + Short: "Tail a stream of data", + RunE: func(cmd *cobra.Command, args []string) error { + logger, _ := zap.NewDevelopment() + defer logger.Sync() + l := logger.Named("turbine.tail") + + conf, err := config.Load(configPath, map[string]string{}) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + src, err := sources.New(conf.Pipeline.Source, logger) + if err != nil { + return fmt.Errorf("failed to create source: %w", err) + } + + if err := src.Start(); err != nil { + return fmt.Errorf("failed to start source: %w", err) + } + + stream := src.Stream() + defer func() { + if err := src.Close(); err != nil { + l.Error("failed to close source", zap.Error(err)) + } + }() + + // Status loop to display total messages processed every 5 seconds + totalMessages := 0 + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + l.Info("status update", zap.Int("totalMessages", totalMessages)) + case <-done: + return + } + } + }() + + // Read all messages from the source + for msg := range stream { + totalMessages++ + fmt.Printf("Message: %s\n", string(msg.Value())) + if err := src.Commit(); err != nil { + l.Error("failed to commit message", zap.Error(err)) + return err + } + } + + // Signal the status loop to stop + close(done) + return nil + }, + } + + cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to turbine config file (required)") + cmd.MarkFlagRequired("config") + return cmd + +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..28431ec --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,139 @@ +package config + +type ErrorPolicy string + +const ( + PolicyRaise ErrorPolicy = "RAISE" + PolicyIgnore ErrorPolicy = "IGNORE" +) + +// Error +type Error struct { + Policy ErrorPolicy `yaml:"policy"` +} + +// SinkFormat +type SinkFormat struct { + Type string `yaml:"type"` +} + +// Various Sink Configs +type IcebergSink struct { + CatalogName string `yaml:"catalog_name"` + TableName string `yaml:"table_name"` +} + +type KafkaSink struct { + Brokers []string `yaml:"brokers"` + Topic string `yaml:"topic"` +} + +type ConsoleSink struct{} + +type SQLCommandSubstitution struct { + Var string `yaml:"var"` + Type string `yaml:"type"` +} + +type SQLCommandSink struct { + SQL string `yaml:"sql"` + Substitutions []SQLCommandSubstitution `yaml:"substitutions,omitempty"` +} + +type ClickhouseSink struct { + DSN string `yaml:"dsn"` + Table string `yaml:"table"` +} + +// Unified Sink +type Sink struct { + Type string `yaml:"type"` + Format *SinkFormat `yaml:"format,omitempty"` + Kafka *KafkaSink `yaml:"kafka,omitempty"` + Console *ConsoleSink `yaml:"console,omitempty"` + SQLCommand *SQLCommandSink `yaml:"sqlcommand,omitempty"` + Iceberg *IcebergSink `yaml:"iceberg,omitempty"` + Clickhouse *ClickhouseSink `yaml:"clickhouse,omitempty"` +} + +// Tumbling Window Manager +type TumblingWindow struct { + CollectSQL string `yaml:"collect_closed_windows_sql"` + DeleteSQL string `yaml:"delete_closed_windows_sql"` + PollIntervalSecs int `yaml:"poll_interval_seconds"` +} + +// Table Manager +type TableManager struct { + TumblingWindow *TumblingWindow `yaml:"tumbling_window"` + Sink Sink `yaml:"sink"` +} + +// SQL Tables +type TableSQL struct { + Name string `yaml:"name"` + SQL string `yaml:"sql"` + Manager *TableManager `yaml:"manager,omitempty"` +} + +// Tables +type Tables struct { + SQL []TableSQL `yaml:"sql"` +} + +// UDFs +type UDF struct { + FunctionName string `yaml:"function_name"` + ImportPath string `yaml:"import_path"` +} + +// SQL Command +type SQLCommand struct { + Name string `yaml:"name"` + SQL string `yaml:"sql"` +} + +// Source Types +type KafkaSource struct { + Brokers []string `yaml:"brokers"` + GroupID string `yaml:"group_id"` + AutoOffsetReset string `yaml:"auto_offset_reset"` + Topics []string `yaml:"topics"` +} + +type WebsocketSource struct { + URI string `yaml:"uri"` +} + +// Source +type Source struct { + Type string `yaml:"type"` + Kafka *KafkaSource `yaml:"kafka,omitempty"` + Websocket *WebsocketSource `yaml:"websocket,omitempty"` + Error *Error `yaml:"error,omitempty"` +} + +// Handler +type Handler struct { + Type string `yaml:"type"` + SQL string `yaml:"sql"` + SQLResultsCacheDir string `yaml:"sql_results_cache_dir,omitempty"` + Table string `yaml:"table,omitempty"` +} + +// Pipeline +type Pipeline struct { + Source Source `yaml:"source"` + Handler Handler `yaml:"handler"` + Sink Sink `yaml:"sink"` + BatchSize int `yaml:"batch_size,omitempty"` + FlushIntervalSeconds int `yaml:"flush_interval_seconds,omitempty"` +} + +// Conf +type Conf struct { + Pipeline Pipeline `yaml:"pipeline"` + Tables *Tables `yaml:"tables,omitempty"` + UDFs []UDF `yaml:"udfs,omitempty"` + Commands []SQLCommand `yaml:"commands,omitempty"` +} diff --git a/internal/config/load.go b/internal/config/load.go new file mode 100644 index 0000000..69425fb --- /dev/null +++ b/internal/config/load.go @@ -0,0 +1,58 @@ +package config + +import ( + "fmt" + "github.com/flosch/pongo2/v6" + "gopkg.in/yaml.v3" + "os" + "strings" +) + +func RenderTemplate(path string, overrides map[string]string) ([]byte, error) { + /* + raw, err := os.ReadFile(path) + if err != nil { + return nil, err + } + */ + + tmpl := pongo2.Must(pongo2.FromFile(path)) + + /* + tmpl, err := template.New("config").Parse(string(raw)) + if err != nil { + return nil, err + } + */ + + vars := pongo2.Context{} + for _, v := range os.Environ() { + parts := strings.SplitN(v, "=", 2) + if len(parts) == 2 && strings.HasPrefix(parts[0], "SQLFLOW_") { + vars[parts[0]] = parts[1] + } + } + for k, v := range overrides { + vars[k] = v + } + + out, err := tmpl.Execute(vars) + if err != nil { + return nil, fmt.Errorf("rendering template failed: %w", err) + } + + return []byte(out), nil +} + +func Load(path string, overrides map[string]string) (*Conf, error) { + rendered, err := RenderTemplate(path, overrides) + if err != nil { + return nil, fmt.Errorf("rendering config failed: %w", err) + } + + var conf Conf + if err := yaml.Unmarshal(rendered, &conf); err != nil { + return nil, fmt.Errorf("parsing YAML failed: %w", err) + } + return &conf, nil +} diff --git a/internal/config/load_test.go b/internal/config/load_test.go new file mode 100644 index 0000000..d912156 --- /dev/null +++ b/internal/config/load_test.go @@ -0,0 +1 @@ +package config diff --git a/internal/core/init.go b/internal/core/init.go new file mode 100644 index 0000000..53b726d --- /dev/null +++ b/internal/core/init.go @@ -0,0 +1,36 @@ +package core + +import ( + "context" + "github.com/apache/arrow-adbc/go/adbc" + "github.com/turbolytics/turbine/internal/config" + "go.uber.org/zap" +) + +var logger *zap.Logger + +func init() { + logger, _ = zap.NewDevelopment() +} + +func InitCommands(conn adbc.Connection, c *config.Conf) error { + for _, command := range c.Commands { + logger.Info("Executing command step", zap.String("name", command.Name)) + + stmt, err := conn.NewStatement() + if err != nil { + return err + } + + if err := stmt.SetSqlQuery(command.SQL); err != nil { + return err + } + + _, _, err = stmt.ExecuteQuery(context.Background()) + if err != nil { + return err + } + stmt.Close() + } + return nil +} diff --git a/internal/core/turbine.go b/internal/core/turbine.go new file mode 100644 index 0000000..3a158da --- /dev/null +++ b/internal/core/turbine.go @@ -0,0 +1,297 @@ +package core + +import ( + "context" + "github.com/apache/arrow-go/v18/arrow" + "go.uber.org/zap" + "log" + "sync" + "time" + + "go.opentelemetry.io/otel/metric" +) + +type Source interface { + Start() error + Stream() <-chan Message + Commit() error + Close() error +} + +type Sink interface { + WriteTable(batch arrow.Table) error + Flush() error + Batch() (arrow.Table, error) +} + +type Message interface { + Value() []byte +} + +type Handler interface { + Init(ctx context.Context) error + Write(msg []byte) error + Invoke(ctx context.Context) (arrow.Table, error) +} + +type Stats struct { + mu sync.Mutex + + numMessagesConsumed int + StartTime time.Time + NumErrors int + TotalThroughputPerSecond float64 +} + +func (s *Stats) SetThroughput(throughput float64) { + s.mu.Lock() + defer s.mu.Unlock() + s.TotalThroughputPerSecond = throughput +} + +func (s *Stats) GetThroughput() float64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.TotalThroughputPerSecond +} + +func (s *Stats) SetNumMessagesConsumed(num int) { + s.mu.Lock() + defer s.mu.Unlock() + s.numMessagesConsumed = num +} + +func (s *Stats) MessagesConsumed() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.numMessagesConsumed +} + +func (s *Stats) IncrementMessagesConsumed() { + s.mu.Lock() + defer s.mu.Unlock() + s.numMessagesConsumed++ +} + +type ErrorPolicy int + +const ( + PolicyRaise ErrorPolicy = iota + PolicyIgnore +) + +type PipelineErrorPolicies struct { + Source ErrorPolicy +} + +type Turbine struct { + source Source + sink Sink + handler Handler + batchSize int + flushInterval time.Duration + lock *sync.Mutex + running bool + stats *Stats + errorPolicy PipelineErrorPolicies + + logger *zap.Logger + + // Metrics + messageCounter metric.Int64Counter + errorCounter metric.Int64Counter + sourceReadLatency metric.Float64Histogram + sinkFlushLatency metric.Float64Histogram + sinkFlushNumRows metric.Float64ObservableGauge + sinkFlushCount metric.Int64Counter + batchProcessingLatency metric.Float64Histogram +} + +func WithTurbineLogger(l *zap.Logger) TurbineOption { + return func(t *Turbine) { + t.logger = l + } +} + +type TurbineOption func(turbine *Turbine) + +func NewTurbine( + source Source, + handler Handler, + sink Sink, + batchSize int, + flushInterval time.Duration, + lock *sync.Mutex, + policy PipelineErrorPolicies, + opts ...TurbineOption, +) *Turbine { + t := &Turbine{ + source: source, + sink: sink, + handler: handler, + batchSize: batchSize, + flushInterval: flushInterval, + lock: lock, + running: true, + stats: &Stats{ + StartTime: time.Now().UTC(), + }, + errorPolicy: policy, + + logger: zap.NewNop(), + } + + for _, opt := range opts { + opt(t) + } + + return t +} + +func (t *Turbine) StatusLoop(ctx context.Context) error { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + t.logThroughput() + case <-ctx.Done(): + return nil + } + } +} + +func (t *Turbine) ConsumeLoop(ctx context.Context, maxMsgs int) (stats *Stats, err error) { + log.Println("consumer loop starting") + + if err := t.source.Start(); err != nil { + return nil, err + } + defer func() { + t.logger.Info("closing source from ConsumeLoop", + zap.Bool("running", t.running), + zap.String("here", "here"), + zap.Error(err), + ) + + if err := t.source.Close(); err != nil { + panic(err) + } + }() + + t.stats.StartTime = time.Now().UTC() + t.stats.SetNumMessagesConsumed(0) + if err := t.handler.Init(ctx); err != nil { + return nil, err + } + + numBatchMessages := 0 + + stream := t.source.Stream() + + for t.running { + // t.logger.Debug("top of loop", zap.Bool("running", t.running)) + select { + case <-ctx.Done(): + t.logger.Warn("context done, stopping consumer loop") + t.running = false + break + case msg, ok := <-stream: + if !ok { + t.logger.Warn("stream channel closed") + t.running = false + break + } + + if msg == nil { + t.logger.Debug("received nil message") + continue + } + + t.stats.IncrementMessagesConsumed() + + if err := t.handler.Write(msg.Value()); err != nil { + t.stats.NumErrors++ + t.logger.Error("error writing message", zap.Error(err)) + return nil, err + } + + numBatchMessages++ + if maxMsgs > 0 && t.stats.MessagesConsumed() >= maxMsgs { + t.logger.Info("max messages consumed, stopping consumer loop") + t.running = false + break + } + + if numBatchMessages == t.batchSize { + t.lock.Lock() + batch, err := t.handler.Invoke(ctx) + t.lock.Unlock() + + if err != nil { + t.stats.NumErrors++ + t.logger.Error("error invoking handler", zap.Error(err)) + return nil, err + } + + if err := t.sink.WriteTable(batch); err != nil { + t.stats.NumErrors++ + t.logger.Error("error writing batch to sink", zap.Error(err)) + return nil, err + } + + if err := t.flush(batch); err != nil { + t.stats.NumErrors++ + t.logger.Error("error flushing sink", zap.Error(err)) + return nil, err + } + + if err := t.source.Commit(); err != nil { + t.logger.Error("error committing source", zap.Error(err)) + return nil, err + } + batch.Release() + + if err := t.handler.Init(ctx); err != nil { + t.stats.NumErrors++ + t.logger.Error("error reinitializing handler", zap.Error(err)) + return nil, err + } + + numBatchMessages = 0 + } + } + } + + t.logThroughput() + return t.stats, nil +} + +func (t *Turbine) logThroughput() { + if duration := time.Since(t.stats.StartTime).Seconds(); duration > 0 { + t.stats.SetThroughput(float64(t.stats.MessagesConsumed()) / duration) + } else { + t.stats.SetThroughput(0) + } + + if t.stats.TotalThroughputPerSecond > 0 { + t.logger.Info("throughput", + zap.Int("messages_consumed", t.stats.MessagesConsumed()), + zap.Float64("total_throughput_per_second", t.stats.GetThroughput()), + ) + } else { + t.logger.Info("no messages consumed, throughput is zero") + } +} + +func (t *Turbine) flush(batch arrow.Table) error { + start := time.Now() + if err := t.sink.Flush(); err != nil { + t.stats.NumErrors++ + rows, _ := t.sink.Batch() + log.Printf("flush error: %v, rows: %+v", err, rows) + return err + } + log.Printf("flushed sink with %d rows in %v", batch.NumRows(), time.Since(start)) + return nil +} diff --git a/internal/handlers/init.go b/internal/handlers/init.go new file mode 100644 index 0000000..f50177f --- /dev/null +++ b/internal/handlers/init.go @@ -0,0 +1,49 @@ +package handlers + +import ( + "context" + "fmt" + "github.com/apache/arrow-adbc/go/adbc" + "github.com/turbolytics/turbine/internal/config" + "github.com/turbolytics/turbine/internal/core" + "go.uber.org/zap" +) + +func New(conn adbc.Connection, c config.Handler, l *zap.Logger) (core.Handler, error) { + switch c.Type { + case "handlers.StructuredBatch": + // Create a statement to fetch the schema + stmt, err := conn.NewStatement() + if err != nil { + return nil, fmt.Errorf("failed to create statement: %w", err) + } + defer stmt.Close() + + // Set the query to fetch the schema + if err := stmt.SetSqlQuery(fmt.Sprintf("SELECT * FROM %s LIMIT 0", c.Table)); err != nil { + return nil, fmt.Errorf("failed to set SQL query: %w", err) + } + + // Execute the query to get the schema + reader, _, err := stmt.ExecuteQuery(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer reader.Release() + + // Create the StructuredBatchHandler + h, err := NewStructuredBatchHandler( + conn, + c.SQL, + c.Table, + reader.Schema(), + StructuredBatchWithLogger(l), + ) + if err != nil { + return nil, fmt.Errorf("failed to create StructuredBatchHandler: %w", err) + } + return h, nil + default: + return nil, fmt.Errorf(`handler: %q not supported`, c.Type) + } +} diff --git a/internal/handlers/noop.go b/internal/handlers/noop.go new file mode 100644 index 0000000..464e52b --- /dev/null +++ b/internal/handlers/noop.go @@ -0,0 +1,21 @@ +package handlers + +import ( + "fmt" + "github.com/apache/arrow-go/v18/arrow" +) + +type Noop struct{} + +func (n Noop) Init() error { + return nil +} + +func (n Noop) Write(msg []byte) error { + fmt.Println("Received message:", string(msg)) + return nil +} + +func (n Noop) Invoke() (*arrow.Table, error) { + return nil, nil +} diff --git a/internal/handlers/structured.go b/internal/handlers/structured.go new file mode 100644 index 0000000..63e176b --- /dev/null +++ b/internal/handlers/structured.go @@ -0,0 +1,224 @@ +package handlers + +import ( + "bytes" + "context" + "fmt" + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "go.uber.org/zap" + "runtime/debug" + "sync" +) + +type StructuredBatchHandler struct { + mu sync.Mutex + batch []arrow.Record + + alloc *memory.GoAllocator + conn adbc.Connection + stmt adbc.Statement + logger *zap.Logger + schema *arrow.Schema + sql string + + tableName string +} + +func (h *StructuredBatchHandler) Init(ctx context.Context) error { + h.mu.Lock() + defer h.mu.Unlock() + h.batch = nil + + // Execute truncate table statement + if err := h.stmt.SetSqlQuery(fmt.Sprintf("TRUNCATE TABLE %s;", h.tableName)); err != nil { + return err + } + if _, err := h.stmt.ExecuteUpdate(ctx); err != nil { + return err + } + return nil +} + +func (h *StructuredBatchHandler) Write(r []byte) error { + h.mu.Lock() + defer h.mu.Unlock() + + record, _, err := array.RecordFromJSON( + h.alloc, + h.schema, + bytes.NewReader(r), + array.WithMultipleDocs(), + ) + if err != nil { + return err + } + + h.batch = append(h.batch, record) + return nil +} + +func (h *StructuredBatchHandler) Invoke(ctx context.Context) (arrow.Table, error) { + var records []arrow.Record + defer func() { + if r := recover(); r != nil { + // Log the panic details for debugging + h.logger.Error( + "Panic occurred in Invoke: %v\n", + zap.Error(r.(error)), + ) + + for _, rec := range records { + bs, _ := rec.MarshalJSON() + h.logger.Debug("record", zap.String("record", string(bs))) + } + + for _, rec := range h.batch { + bs, _ := rec.MarshalJSON() + h.logger.Debug("batch_record", zap.String("record", string(bs))) + } + debug.PrintStack() // Print the stack trace for more context + } + }() + // Merge records into one for ingestion + recordReader, err := array.NewRecordReader(h.schema, h.batch) + if err != nil { + return nil, err + } + defer recordReader.Release() + + var allRecords []arrow.Record + for recordReader.Next() { + rec := recordReader.Record() + rec.Retain() // prevent premature GC + allRecords = append(allRecords, rec) + } + + var allColumns []arrow.Array + var totalRows int64 + + // Collect columns and count total rows + for _, rec := range allRecords { + for i := 0; i < int(rec.NumCols()); i++ { + if len(allColumns) <= i { + allColumns = append(allColumns, rec.Column(i)) + } else { + allColumns[i], _ = array.Concatenate([]arrow.Array{allColumns[i], rec.Column(i)}, h.alloc) + } + } + totalRows += rec.NumRows() + } + + // Build the combined record + combined := array.NewRecord(h.schema, allColumns, totalRows) + defer combined.Release() + + stmt, err := h.conn.NewStatement() + if err != nil { + return nil, fmt.Errorf("new statement error: %v", err) + } + defer stmt.Close() + + // Use append mode if schema is known, otherwise create + if h.schema == nil { + if err := stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate); err != nil { + return nil, fmt.Errorf("set option ingest mode create error: %v", err) + } + } else { + if err := stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend); err != nil { + return nil, fmt.Errorf("set option ingest mode append error: %v", err) + } + } + + if err := stmt.SetOption(adbc.OptionKeyIngestTargetTable, h.tableName); err != nil { + return nil, fmt.Errorf("set option target table error: %v", err) + } + + if err := stmt.Bind(ctx, combined); err != nil { + return nil, fmt.Errorf("statement binding arrow record error: %v", err) + } + + if _, err := stmt.ExecuteUpdate(ctx); err != nil { + return nil, fmt.Errorf("execute update error: %w", err) + } + + // Query results back using your stored SQL + queryStmt, err := h.conn.NewStatement() + if err != nil { + return nil, fmt.Errorf("new query statement error: %v", err) + } + defer queryStmt.Close() + + if err := queryStmt.SetSqlQuery(h.sql); err != nil { + return nil, fmt.Errorf("set query error: %v", err) + } + + reader, _, err := queryStmt.ExecuteQuery(ctx) + if err != nil { + return nil, fmt.Errorf("query execution error: %v", err) + } + defer reader.Release() + + for reader.Next() { + rec := reader.Record() + rec.Retain() + records = append(records, rec) + } + + // Clean up batch + for _, rec := range h.batch { + rec.Release() + } + h.batch = nil + + result := array.NewTableFromRecords(reader.Schema(), records) + result.Retain() + + for _, rec := range records { + rec.Release() + } + + return result, nil +} + +type StructuredBatchHandlerOption func(*StructuredBatchHandler) + +func StructuredBatchWithLogger(l *zap.Logger) StructuredBatchHandlerOption { + return func(h *StructuredBatchHandler) { + h.logger = l + } +} + +func NewStructuredBatchHandler( + conn adbc.Connection, + sql string, + tableName string, + schema *arrow.Schema, + opts ...StructuredBatchHandlerOption, +) (*StructuredBatchHandler, error) { + + pool := memory.NewGoAllocator() + + stmt, err := conn.NewStatement() + if err != nil { + return nil, err + } + + s := &StructuredBatchHandler{ + alloc: pool, + conn: conn, + stmt: stmt, + schema: schema, + sql: sql, + tableName: tableName, + logger: zap.NewNop(), + } + + for _, opt := range opts { + opt(s) + } + + return s, nil +} diff --git a/internal/handlers/structured_test.go b/internal/handlers/structured_test.go new file mode 100644 index 0000000..5ae0358 --- /dev/null +++ b/internal/handlers/structured_test.go @@ -0,0 +1,127 @@ +package handlers + +import ( + "context" + "database/sql" + "github.com/apache/arrow-go/v18/arrow" + "github.com/marcboeker/go-duckdb" + _ "github.com/marcboeker/go-duckdb" + "github.com/zeebo/assert" + "testing" +) + +func TestArrowGetSchema(t *testing.T) { + connector, err := duckdb.NewConnector(":memory:", nil) + assert.NoError(t, err) + defer func() { + err := connector.Close() + assert.NoError(t, err) + }() + + db := sql.OpenDB(connector) + defer func() { + err := db.Close() + assert.NoError(t, err) + }() + + // Create a table with 5 columns + createStmt := ` + CREATE TABLE users ( + id INTEGER, + name TEXT, + email TEXT, + age INTEGER, + created_at TIMESTAMP + ); + ` + + _, err = db.Exec(createStmt) + assert.NoError(t, err) + + conn, err := connector.Connect(context.Background()) + assert.NoError(t, err) + defer func() { + err := conn.Close() + assert.NoError(t, err) + }() + + arr, err := duckdb.NewArrowFromConn(conn) + assert.NoError(t, err) + + rdr, err := arr.QueryContext( + context.Background(), + "select * from users", + ) + assert.NoError(t, err) + + schema := rdr.Schema() + assert.Equal(t, 5, len(schema.Fields())) +} + +func TestStructuredBatchHandler_SelectCount(t *testing.T) { + connector, err := duckdb.NewConnector(":memory:", nil) + assert.NoError(t, err) + defer func() { + err := connector.Close() + assert.NoError(t, err) + }() + + db := sql.OpenDB(connector) + defer func() { + err := db.Close() + assert.NoError(t, err) + }() + + // Create a table with 5 columns + createStmt := ` + CREATE TABLE users ( + id INTEGER, + name TEXT, + email TEXT, + age INTEGER, + created_at TIMESTAMP + ); + ` + + _, err = db.Exec(createStmt) + assert.NoError(t, err) + + conn, err := connector.Connect(context.Background()) + assert.NoError(t, err) + defer func() { + err := conn.Close() + assert.NoError(t, err) + }() + + arr, err := duckdb.NewArrowFromConn(conn) + assert.NoError(t, err) + + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int32}, + {Name: "name", Type: arrow.BinaryTypes.String}, + {Name: "email", Type: arrow.BinaryTypes.String}, + {Name: "age", Type: arrow.PrimitiveTypes.Int32}, + {Name: "created_at", Type: arrow.FixedWidthTypes.Timestamp_ms}, + }, + nil) + + h, err := NewStructuredBatchHandler( + arr, + "SELECT * FROM users", + "users", + schema, + ) + assert.NoError(t, err) + + err = h.Init(context.Background()) + assert.NoError(t, err) + + err = h.Write([]byte(`{"id": 1, "name": "test name", "email": "test email", "age": 77, "created_at": "2025-04-10 15:00:00"}`)) + assert.NoError(t, err) + + res, err := h.Invoke(context.Background()) + assert.NoError(t, err) + + assert.Equal(t, 1, res.NumRows()) +} diff --git a/internal/kafka/source.go b/internal/kafka/source.go new file mode 100644 index 0000000..cce7dca --- /dev/null +++ b/internal/kafka/source.go @@ -0,0 +1,157 @@ +package kafka + +import ( + "errors" + "fmt" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/turbolytics/turbine/internal/core" + "go.uber.org/zap" + "sync" + "time" +) + +type Message struct { + value []byte +} + +func (m Message) Value() []byte { + return m.value +} + +type Source struct { + consumer *kafka.Consumer + topics []string + readTimeout time.Duration + streamChan chan core.Message + closeOnce sync.Once + + logger *zap.Logger + + sync.Mutex + lastMessage *kafka.Message +} + +type Option func(*Source) + +func WithReadTimeout(timeout time.Duration) Option { + return func(s *Source) { + s.readTimeout = timeout + } +} + +func WithLogger(logger *zap.Logger) Option { + return func(s *Source) { + l := logger.Named("source.kafka") + s.logger = l + } +} + +func NewSource(consumer *kafka.Consumer, topics []string, opts ...Option) (*Source, error) { + s := &Source{ + consumer: consumer, + topics: topics, + readTimeout: time.Second * 5, + streamChan: make(chan core.Message), + + logger: zap.NewNop(), + } + + for _, opt := range opts { + opt(s) + } + + return s, nil +} + +func (k *Source) SetLastMessage(msg *kafka.Message) { + k.Lock() + defer k.Unlock() + k.lastMessage = msg +} + +func (k *Source) LastMessage() *kafka.Message { + k.Lock() + defer k.Unlock() + return k.lastMessage +} + +func (k *Source) Start() error { + k.logger.Info("starting consumer", zap.String("topics", fmt.Sprintf("%v", k.topics))) + return k.consumer.SubscribeTopics(k.topics, nil) +} + +func (k *Source) Close() error { + k.logger.Info("closing consumer for topics: \n", zap.String("topics", fmt.Sprintf("%v", k.topics))) + k.closeOnce.Do(func() { + k.logger.Error("closing consumer in do once") + close(k.streamChan) + }) + return k.consumer.Close() +} + +func (k *Source) Commit() error { + msg := k.LastMessage() + if msg == nil { + k.logger.Warn("no last message to commit") + return nil + } + + _, err := k.consumer.CommitMessage(msg) + if err != nil { + k.logger.Error("failed to commit offsets", zap.Error(err)) + var kafkaErr kafka.Error + if errors.As(err, &kafkaErr) && kafkaErr.Code() == kafka.ErrNoOffset { + // Handle ErrNoOffset specifically + fmt.Printf("No offset found for topic %s, ignoring commit error\n", k.topics) + return nil + } + } + + return err +} + +func (k *Source) Stream() <-chan core.Message { + k.logger.Info("starting stream") + go func() { + for { + select { + /* + case <-k.stopChan: + k.logger.Info("stopping stream") + k.closeOnce.Do(func() { + close(k.streamChan) + }) + return + + */ + default: + ev := k.consumer.Poll(int(k.readTimeout.Milliseconds())) + if ev == nil { + continue + } + + switch msg := ev.(type) { + case *kafka.Message: + k.SetLastMessage(msg) + k.streamChan <- &Message{value: msg.Value} + case kafka.PartitionEOF: + k.logger.Info("%s reached end at offset %v\n", + zap.String("topic", fmt.Sprintf("%v", k.topics)), + zap.String("message", fmt.Sprintf("%v", msg)), + ) + case kafka.Error: + if msg.Code() == kafka.ErrPartitionEOF { + k.logger.Info("%s reached end at offset\n", + zap.String("topic", fmt.Sprintf("%v", k.topics)), + ) + } else { + k.logger.Error("Kafka error: %v\n", zap.String("error", msg.Error())) + } + default: + // Ignore other types + } + } + } + }() + return k.streamChan +} diff --git a/internal/local/console.go b/internal/local/console.go new file mode 100644 index 0000000..643d920 --- /dev/null +++ b/internal/local/console.go @@ -0,0 +1,60 @@ +package local + +import ( + "encoding/json" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "os" + "sync" +) + +type ConsoleSink struct { + mu sync.Mutex + buf []arrow.Table + encoder *json.Encoder + out *os.File +} + +func NewConsoleSink() *ConsoleSink { + return &ConsoleSink{ + out: os.Stdout, + encoder: json.NewEncoder(os.Stdout), + } +} + +func (s *ConsoleSink) WriteTable(batch arrow.Table) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Retain to prevent premature release + s.buf = append(s.buf, batch) + return nil +} + +func (s *ConsoleSink) Batch() (arrow.Table, error) { + s.mu.Lock() + defer s.mu.Unlock() + + return nil, nil +} + +func (s *ConsoleSink) Flush() error { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.buf) == 0 { + return nil + } + + for _, table := range s.buf { + tr := array.NewTableReader(table, 0) + for tr.Next() { + rec := tr.Record() + if err := s.encoder.Encode(rec); err != nil { + return err + } + } + } + s.buf = nil + return nil +} diff --git a/internal/sinks/init.go b/internal/sinks/init.go new file mode 100644 index 0000000..64f51aa --- /dev/null +++ b/internal/sinks/init.go @@ -0,0 +1,28 @@ +package sinks + +import ( + "github.com/apache/arrow-go/v18/arrow" + "github.com/turbolytics/turbine/internal/config" + "github.com/turbolytics/turbine/internal/core" +) + +type NoopSink struct{} + +func (n *NoopSink) WriteTable(batch arrow.Table) error { + // No operation performed + return nil +} + +func (n *NoopSink) Flush() error { + // No operation performed + return nil +} + +func (n *NoopSink) Batch() (arrow.Table, error) { + // No operation performed, return nil + return nil, nil +} + +func New(sink config.Sink) (core.Sink, error) { + return &NoopSink{}, nil +} diff --git a/internal/sources/init.go b/internal/sources/init.go new file mode 100644 index 0000000..016010c --- /dev/null +++ b/internal/sources/init.go @@ -0,0 +1,38 @@ +package sources + +import ( + "fmt" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/turbolytics/turbine/internal/config" + "github.com/turbolytics/turbine/internal/core" + tkafka "github.com/turbolytics/turbine/internal/kafka" + "go.uber.org/zap" +) + +func New(c config.Source, l *zap.Logger) (core.Source, error) { + switch c.Type { + case "kafka": + l.Info( + "initializing kafka source", + zap.String("topics", fmt.Sprintf("%v", c.Kafka.Topics)), + zap.String("group.id", c.Kafka.GroupID), + zap.String("auto.offset.reset", c.Kafka.AutoOffsetReset), + ) + consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": "localhost:9092", + "group.id": c.Kafka.GroupID, + "auto.offset.reset": c.Kafka.AutoOffsetReset, + "enable.auto.commit": false, + "fetch.wait.max.ms": 10, + }) + k, err := tkafka.NewSource( + consumer, + c.Kafka.Topics, + tkafka.WithLogger(l), + ) + return k, err + + default: + return nil, fmt.Errorf("source: %q not supported", c.Type) + } +}