Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/client/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func StdConfig(key string, optionFunc ...OptionFunc) *Config {
// DefaultConfig 默认配置
func DefaultConfig(key string) *Config {
kfkCfg := sarama.NewConfig()
kfkCfg.Version = sarama.V2_6_0_0 // Explicitly set to match broker

return &Config{
path: "kafka." + key,
Expand Down
4 changes: 4 additions & 0 deletions pkg/client/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (kfk Kafka) NewConsumerGroup(groupID string) (ConsumerGroup, error) {
return sarama.NewConsumerGroupFromClient(groupID, kfk.client)
}

func (kfk Kafka) NewClusterAdmin() (ClusterAdmin, error) {
return sarama.NewClusterAdmin(kfk.cfg.Addrs, kfk.client.Config())
}

// Config returns the Config struct of the client. This struct should not be
// altered after it has been created.
func (kfk Kafka) Config() *ConfigKafka {
Expand Down
76 changes: 76 additions & 0 deletions pkg/client/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"sync/atomic"
"testing"
"time"

"github.com/boxgo/box/pkg/client/kafka"
Expand Down Expand Up @@ -73,3 +74,78 @@ func Example() {
fmt.Println(offset >= 0, partition == 0, atomic.LoadInt32(&cnt) > 0)
// Output: true true true
}

func TestKafka_ConsumerGroupOperations(t *testing.T) {
kfk := kafka.StdConfig("default").Build()

admin, err := kfk.NewClusterAdmin()
if err != nil {
t.Fatal(err)
}

defer func() {
if err := admin.Close(); err != nil {
t.Fatal(err)
}
}()

// Test ListConsumerGroups
t.Run("ListConsumerGroups", func(t *testing.T) {
groups, err := admin.ListConsumerGroups()
if err != nil {
t.Fatal(err)
}

t.Logf("Found %d consumer groups", len(groups))
for groupID, groupType := range groups {
t.Logf("Group: %s, Type: %s", groupID, groupType)
}
})

// Test DescribeConsumerGroups
t.Run("DescribeConsumerGroups", func(t *testing.T) {
// First, get the list of consumer groups
groups, err := admin.ListConsumerGroups()
if err != nil {
t.Fatal(err)
}

if len(groups) == 0 {
t.Skip("No consumer groups found, skipping describe test")
}

// Get the first group ID for testing
var groupIDs []string
for groupID := range groups {
groupIDs = append(groupIDs, groupID)
break // Only test with the first group
}

descriptions, err := admin.DescribeConsumerGroups(groupIDs)
if err != nil {
t.Fatal(err)
}

for _, desc := range descriptions {
t.Logf("Group: %s, State: %s, Protocol: %s", desc.GroupId, desc.State, desc.ProtocolType)
t.Logf("Members: %d", len(desc.Members))
}
})

// Test DeleteConsumerGroup
t.Run("DeleteConsumerGroup", func(t *testing.T) {
// Create a test consumer group first
testGroupID := "wechat_switch_change_v2.cache.push-v2-6d5797558-nhrnz"

// Note: In a real test environment, you would create a consumer group first
// For this test, we'll just test the delete operation on a non-existent group
// which should return an error
err := admin.DeleteConsumerGroup(testGroupID)
if err != nil {
// This is expected for a non-existent group
t.Logf("Delete consumer group returned error (expected): %v", err)
} else {
t.Logf("Successfully deleted consumer group: %s", testGroupID)
}
})
}
Loading