diff --git a/README.md b/README.md index f2dc811..a7a9996 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,11 @@ Redis client with: - Two-level caching (local + distributed) - Automatic cache invalidation via pub/sub +### [kafka_client](https://pkg.go.dev/github.com/poly-workshop/go-webmods/kafka_client) +Kafka client helpers with: +- Reader factory (consumer group or partition) +- Writer factory with configurable batching and acknowledgements + ### [object_storage](https://pkg.go.dev/github.com/poly-workshop/go-webmods/object_storage) Unified object storage interface supporting: - Local filesystem diff --git a/doc.go b/doc.go index 402d65f..9c09c7e 100644 --- a/doc.go +++ b/doc.go @@ -9,6 +9,7 @@ // - app: Core application utilities for configuration, logging, and context management // - gorm_client: Database client factory supporting PostgreSQL and SQLite // - redis_client: Redis client with caching support and cluster mode +// - kafka_client: Kafka client factories for readers and writers // - object_storage: Multi-provider object storage interface (local, MinIO, Volcengine TOS) // - grpc_utils: gRPC middleware and interceptors for logging and request ID tracking // - smtp_mailer: SMTP email sender with TLS support diff --git a/go.mod b/go.mod index fe493c5..f1feb0c 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/lmittmann/tint v1.1.2 github.com/oj-lab/go-webmods v0.1.4 github.com/redis/go-redis/v9 v9.12.1 + github.com/segmentio/kafka-go v0.4.48 github.com/spf13/viper v1.20.1 github.com/testcontainers/testcontainers-go v0.39.0 github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.21 @@ -54,6 +55,7 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/go.sum b/go.sum index d8b29bc..7f08176 100644 --- a/go.sum +++ b/go.sum @@ -126,6 +126,7 @@ github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -208,6 +209,8 @@ github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0 github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM= github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -224,6 +227,8 @@ github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/sagikazarmark/locafero v0.10.0 h1:FM8Cv6j2KqIhM2ZK7HZjm4mpj9NBktLgowT1aN9q5Cc= github.com/sagikazarmark/locafero v0.10.0/go.mod h1:Ieo3EUsjifvQu4NZwV5sPd4dwvu0OCgEQV7vjc9yDjw= +github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs= +github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/shirou/gopsutil/v4 v4.25.6 h1:kLysI2JsKorfaFPcYmcJqbzROzsBWEOAtw6A7dIfqXs= github.com/shirou/gopsutil/v4 v4.25.6/go.mod h1:PfybzyydfZcN+JMMjkF6Zb8Mq1A/VcogFFg7hj50W9c= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -312,6 +317,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -320,6 +326,7 @@ golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2 golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -336,6 +343,9 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -374,8 +384,10 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -384,6 +396,9 @@ golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -394,6 +409,9 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= @@ -407,6 +425,7 @@ golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/kafka-client/doc.go b/kafka-client/doc.go new file mode 100644 index 0000000..30d4983 --- /dev/null +++ b/kafka-client/doc.go @@ -0,0 +1,20 @@ +// Package kafka_client provides factory functions for creating Kafka readers +// and writers using github.com/segmentio/kafka-go. +// +// # Reader +// +// reader := kafka_client.NewReader(kafka_client.ReaderConfig{ +// Brokers: []string{"localhost:9092"}, +// Topic: "example-topic", +// GroupID: "example-group", +// }) +// defer reader.Close() +// +// # Writer +// +// writer := kafka_client.NewWriter(kafka_client.WriterConfig{ +// Brokers: []string{"localhost:9092"}, +// Topic: "example-topic", +// }) +// defer writer.Close() +package kafka_client diff --git a/kafka-client/reader.go b/kafka-client/reader.go new file mode 100644 index 0000000..be36e49 --- /dev/null +++ b/kafka-client/reader.go @@ -0,0 +1,36 @@ +package kafka_client + +import ( + "github.com/segmentio/kafka-go" +) + +type ReaderConfig struct { + Brokers []string + Topic string + GroupID string + Partition int + MinBytes int + MaxBytes int + StartOffset int64 +} + +func NewReader(cfg ReaderConfig) *kafka.Reader { + if len(cfg.Brokers) == 0 { + panic("kafka_client: no kafka brokers configured") + } + if cfg.Topic == "" { + panic("kafka_client: topic is required") + } + + readerCfg := kafka.ReaderConfig{ + Brokers: cfg.Brokers, + Topic: cfg.Topic, + GroupID: cfg.GroupID, + Partition: cfg.Partition, + MinBytes: cfg.MinBytes, + MaxBytes: cfg.MaxBytes, + StartOffset: cfg.StartOffset, + } + + return kafka.NewReader(readerCfg) +} diff --git a/kafka-client/reader_test.go b/kafka-client/reader_test.go new file mode 100644 index 0000000..763fc81 --- /dev/null +++ b/kafka-client/reader_test.go @@ -0,0 +1,64 @@ +package kafka_client + +import ( + "testing" + + "github.com/segmentio/kafka-go" +) + +func TestNewReaderRequiresBrokers(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic when brokers are missing") + } + }() + + NewReader(ReaderConfig{ + Topic: "test-topic", + }) +} + +func TestNewReaderRequiresTopic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic when topic is missing") + } + }() + + NewReader(ReaderConfig{ + Brokers: []string{"localhost:9092"}, + }) +} + +func TestNewReaderAppliesConfig(t *testing.T) { + reader := NewReader(ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "test-topic", + GroupID: "group-1", + MinBytes: 1, + MaxBytes: 10, + StartOffset: kafka.LastOffset, + }) + t.Cleanup(func() { + if err := reader.Close(); err != nil { + t.Logf("failed to close reader: %v", err) + } + }) + + cfg := reader.Config() + if cfg.Topic != "test-topic" { + t.Fatalf("expected topic %q, got %q", "test-topic", cfg.Topic) + } + if cfg.GroupID != "group-1" { + t.Fatalf("expected group %q, got %q", "group-1", cfg.GroupID) + } + if cfg.MinBytes != 1 { + t.Fatalf("expected min bytes %d, got %d", 1, cfg.MinBytes) + } + if cfg.MaxBytes != 10 { + t.Fatalf("expected max bytes %d, got %d", 10, cfg.MaxBytes) + } + if cfg.StartOffset != kafka.LastOffset { + t.Fatalf("expected start offset %d, got %d", kafka.LastOffset, cfg.StartOffset) + } +} diff --git a/kafka-client/writer.go b/kafka-client/writer.go new file mode 100644 index 0000000..a719565 --- /dev/null +++ b/kafka-client/writer.go @@ -0,0 +1,44 @@ +package kafka_client + +import ( + "time" + + "github.com/segmentio/kafka-go" +) + +type WriterConfig struct { + Brokers []string + Topic string + Async bool + RequiredAcks kafka.RequiredAcks + Balancer kafka.Balancer + BatchSize int + BatchBytes int + BatchTimeout time.Duration + CompressionCodec kafka.CompressionCodec + MaxAttempts int +} + +func NewWriter(cfg WriterConfig) *kafka.Writer { + if len(cfg.Brokers) == 0 { + panic("kafka_client: no kafka brokers configured") + } + if cfg.Topic == "" { + panic("kafka_client: topic is required") + } + + writerCfg := kafka.WriterConfig{ + Brokers: cfg.Brokers, + Topic: cfg.Topic, + Async: cfg.Async, + RequiredAcks: int(cfg.RequiredAcks), + Balancer: cfg.Balancer, + BatchSize: cfg.BatchSize, + BatchBytes: cfg.BatchBytes, + BatchTimeout: cfg.BatchTimeout, + CompressionCodec: cfg.CompressionCodec, + MaxAttempts: cfg.MaxAttempts, + } + + return kafka.NewWriter(writerCfg) +} diff --git a/kafka-client/writer_test.go b/kafka-client/writer_test.go new file mode 100644 index 0000000..7f06e28 --- /dev/null +++ b/kafka-client/writer_test.go @@ -0,0 +1,65 @@ +package kafka_client + +import ( + "testing" + "time" + + "github.com/segmentio/kafka-go" +) + +func TestNewWriterRequiresBrokers(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic when brokers are missing") + } + }() + + NewWriter(WriterConfig{ + Topic: "test-topic", + }) +} + +func TestNewWriterRequiresTopic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic when topic is missing") + } + }() + + NewWriter(WriterConfig{ + Brokers: []string{"localhost:9092"}, + }) +} + +func TestNewWriterAppliesConfig(t *testing.T) { + writer := NewWriter(WriterConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "test-topic", + Async: true, + RequiredAcks: kafka.RequireAll, + BatchSize: 2, + BatchTimeout: 2 * time.Second, + }) + t.Cleanup(func() { + if err := writer.Close(); err != nil { + t.Logf("failed to close writer: %v", err) + } + }) + + stats := writer.Stats() + if stats.Topic != "test-topic" { + t.Fatalf("expected topic %q, got %q", "test-topic", stats.Topic) + } + if !stats.Async { + t.Fatal("expected async writer to be true") + } + if stats.RequiredAcks != int64(kafka.RequireAll) { + t.Fatalf("expected required acks %d, got %d", kafka.RequireAll, stats.RequiredAcks) + } + if stats.MaxBatchSize != 2 { + t.Fatalf("expected max batch size %d, got %d", 2, stats.MaxBatchSize) + } + if stats.BatchTimeout != 2*time.Second { + t.Fatalf("expected batch timeout %v, got %v", 2*time.Second, stats.BatchTimeout) + } +}