Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ test-release: docker-image
start-backing-services:
docker-compose -f dev/kafka-single.yml up -d

.PHONY: stop-backing-services
stop-backing-services:
docker-compose -f dev/kafka-single.yml down

.PHONY: docker-image
docker-image:
@GIT_HASH=$$(git rev-parse --short HEAD) && \
Expand Down
9 changes: 9 additions & 0 deletions cmd/turbine/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"github.com/turbolytics/turbine/internal/cli"
)

func main() {
cli.Execute()
}
9 changes: 5 additions & 4 deletions dev/config/examples/kafka.structured.mem.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ commands:
);

pipeline:
batch_size: {{ SQLFLOW_BATCH_SIZE|default(1) }}
batch_size: {{ SQLFLOW_BATCH_SIZE|default:500 }}

source:
type: kafka
kafka:
brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}]
brokers: [{{ SQLFLOW_KAFKA_BROKERS|default:'localhost:9092' }}]
group_id: test
auto_offset_reset: earliest
topics:
- "input-structured-mem"
- "input-structured-mem-2"

handler:
type: "handlers.StructuredBatch"
Expand All @@ -26,8 +26,9 @@ pipeline:
sql: |
SELECT
properties.city as city,
1 as city_count
COUNT(*) as count
FROM source
GROUP BY properties.city;

sink:
type: console
39 changes: 39 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module github.com/turbolytics/turbine

go 1.24.0

require (
github.com/marcboeker/go-duckdb v1.8.5
github.com/spf13/cobra v1.9.1
github.com/zeebo/assert v1.3.0
go.opentelemetry.io/otel/metric v1.35.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/apache/arrow-adbc/go/adbc v1.6.0 // indirect
github.com/apache/arrow-go/v18 v18.2.0 // indirect
github.com/confluentinc/confluent-kafka-go v1.9.2 // indirect
github.com/flosch/pongo2/v6 v6.0.0 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/google/flatbuffers v25.2.10+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/tools v0.32.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/protobuf v1.36.6 // indirect
)
313 changes: 313 additions & 0 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/cli/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cli
1 change: 1 addition & 0 deletions internal/cli/dev.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cli
34 changes: 34 additions & 0 deletions internal/cli/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cli

import (
"fmt"
"github.com/spf13/cobra"
"github.com/turbolytics/turbine/internal/cli/run"
"github.com/turbolytics/turbine/internal/cli/tail"
"os"
)

func NewRootCommand() *cobra.Command {
var cmd = &cobra.Command{
Use: "turbine",
Short: "",
Long: ``,
// The run function is called when the command is executed
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("Welcome to turbine!")
},
}

cmd.AddCommand(run.NewCommand())
cmd.AddCommand(tail.NewCommand())

return cmd
}

func Execute() {
cmd := NewRootCommand()
if err := cmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
125 changes: 125 additions & 0 deletions internal/cli/run/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package run

import (
"context"
"fmt"
"github.com/apache/arrow-adbc/go/adbc/drivermgr" // Import the driver manager
"github.com/spf13/cobra"
"github.com/turbolytics/turbine/internal/handlers"
"github.com/turbolytics/turbine/internal/sinks"
"github.com/turbolytics/turbine/internal/sources"
"go.uber.org/zap"
"net/http"
_ "net/http/pprof"
"sync"
"time"

"github.com/turbolytics/turbine/internal/config"
"github.com/turbolytics/turbine/internal/core"
)

func NewCommand() *cobra.Command {
var configPath string

var cmd = &cobra.Command{
Use: "run",
Short: "Run turbine against a stream of data",
RunE: func(cmd *cobra.Command, args []string) error {
logger, _ := zap.NewDevelopment()
defer logger.Sync()
l := logger.Named("turbine.run")

// Start pprof server
go func() {
l.Info("starting pprof server on :6060")
if err := http.ListenAndServe(":6060", nil); err != nil {
l.Error("failed to start pprof server", zap.Error(err))
}
}()

conf, err := config.Load(configPath, map[string]string{})
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}

// Initialize ADBC connection using driver manager
var drv drivermgr.Driver
db, err := drv.NewDatabase(map[string]string{
"driver": "/opt/homebrew/lib/libduckdb.dylib",
"entrypoint": "duckdb_adbc_init",
})
if err != nil {
return fmt.Errorf("failed to initialize DuckDB driver: %w", err)
}

conn, err := db.Open(context.Background())
if err != nil {
return fmt.Errorf("failed to open DuckDB connection: %w", err)
}
defer func() {
if err := conn.Close(); err != nil {
l.Error("failed to close DuckDB connection", zap.Error(err))
}
}()

// Initialize commands
if err := core.InitCommands(conn, conf); err != nil {
return fmt.Errorf("failed to initialize commands: %w", err)
}

src, err := sources.New(
conf.Pipeline.Source,
logger,
)
if err != nil {
return fmt.Errorf("failed to create source: %w", err)
}

sink, err := sinks.New(conf.Pipeline.Sink)
if err != nil {
return fmt.Errorf("failed to create sink: %w", err)
}

handler, err := handlers.New(
conn,
conf.Pipeline.Handler,
logger,
)
if err != nil {
return fmt.Errorf("failed to create handler: %w", err)
}

lock := &sync.Mutex{}
turbine := core.NewTurbine(
src,
handler,
sink,
conf.Pipeline.BatchSize,
time.Duration(conf.Pipeline.FlushIntervalSeconds)*time.Second,
lock,
core.PipelineErrorPolicies{
// Source: conf.Pipeline.Source.Error.Policy,
},
core.WithTurbineLogger(l),
)

go func() {
if err := turbine.StatusLoop(context.Background()); err != nil {
l.Error("failed to start status loop", zap.Error(err))
}
}()

_, err = turbine.ConsumeLoop(context.Background(), 0)
if err != nil {
l.Error("failed to consume loop", zap.Error(err))
return err
}
return nil
},
}

cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to turbine config file (required)")
cmd.MarkFlagRequired("config")

return cmd
}
80 changes: 80 additions & 0 deletions internal/cli/tail/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package tail

import (
"fmt"
"github.com/spf13/cobra"
"github.com/turbolytics/turbine/internal/config"
"github.com/turbolytics/turbine/internal/sources"
"go.uber.org/zap"
"time"
)

func NewCommand() *cobra.Command {
var configPath string

var cmd = &cobra.Command{
Use: "tail",
Short: "Tail a stream of data",
RunE: func(cmd *cobra.Command, args []string) error {
logger, _ := zap.NewDevelopment()
defer logger.Sync()
l := logger.Named("turbine.tail")

conf, err := config.Load(configPath, map[string]string{})
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}

src, err := sources.New(conf.Pipeline.Source, logger)
if err != nil {
return fmt.Errorf("failed to create source: %w", err)
}

if err := src.Start(); err != nil {
return fmt.Errorf("failed to start source: %w", err)
}

stream := src.Stream()
defer func() {
if err := src.Close(); err != nil {
l.Error("failed to close source", zap.Error(err))
}
}()

// Status loop to display total messages processed every 5 seconds
totalMessages := 0
done := make(chan struct{})
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
l.Info("status update", zap.Int("totalMessages", totalMessages))
case <-done:
return
}
}
}()

// Read all messages from the source
for msg := range stream {
totalMessages++
fmt.Printf("Message: %s\n", string(msg.Value()))
if err := src.Commit(); err != nil {
l.Error("failed to commit message", zap.Error(err))
return err
}
}

// Signal the status loop to stop
close(done)
return nil
},
}

cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to turbine config file (required)")
cmd.MarkFlagRequired("config")
return cmd

}
Loading
Loading