diff --git a/pkg/client/kafka/config.go b/pkg/client/kafka/config.go index 0d092d0..ecc6051 100644 --- a/pkg/client/kafka/config.go +++ b/pkg/client/kafka/config.go @@ -172,6 +172,7 @@ func StdConfig(key string, optionFunc ...OptionFunc) *Config { // DefaultConfig 默认配置 func DefaultConfig(key string) *Config { kfkCfg := sarama.NewConfig() + kfkCfg.Version = sarama.V2_6_0_0 // Explicitly set to match broker return &Config{ path: "kafka." + key, diff --git a/pkg/client/kafka/kafka.go b/pkg/client/kafka/kafka.go index 661a431..f3f9103 100644 --- a/pkg/client/kafka/kafka.go +++ b/pkg/client/kafka/kafka.go @@ -42,6 +42,10 @@ func (kfk Kafka) NewConsumerGroup(groupID string) (ConsumerGroup, error) { return sarama.NewConsumerGroupFromClient(groupID, kfk.client) } +func (kfk Kafka) NewClusterAdmin() (ClusterAdmin, error) { + return sarama.NewClusterAdmin(kfk.cfg.Addrs, kfk.client.Config()) +} + // Config returns the Config struct of the client. This struct should not be // altered after it has been created. func (kfk Kafka) Config() *ConfigKafka { diff --git a/pkg/client/kafka/kafka_test.go b/pkg/client/kafka/kafka_test.go index 66bf77e..a3e48ff 100644 --- a/pkg/client/kafka/kafka_test.go +++ b/pkg/client/kafka/kafka_test.go @@ -5,6 +5,7 @@ import ( "os" "os/signal" "sync/atomic" + "testing" "time" "github.com/boxgo/box/pkg/client/kafka" @@ -73,3 +74,78 @@ func Example() { fmt.Println(offset >= 0, partition == 0, atomic.LoadInt32(&cnt) > 0) // Output: true true true } + +func TestKafka_ConsumerGroupOperations(t *testing.T) { + kfk := kafka.StdConfig("default").Build() + + admin, err := kfk.NewClusterAdmin() + if err != nil { + t.Fatal(err) + } + + defer func() { + if err := admin.Close(); err != nil { + t.Fatal(err) + } + }() + + // Test ListConsumerGroups + t.Run("ListConsumerGroups", func(t *testing.T) { + groups, err := admin.ListConsumerGroups() + if err != nil { + t.Fatal(err) + } + + t.Logf("Found %d consumer groups", len(groups)) + for groupID, groupType := range groups { + t.Logf("Group: %s, Type: %s", groupID, groupType) + } + }) + + // Test DescribeConsumerGroups + t.Run("DescribeConsumerGroups", func(t *testing.T) { + // First, get the list of consumer groups + groups, err := admin.ListConsumerGroups() + if err != nil { + t.Fatal(err) + } + + if len(groups) == 0 { + t.Skip("No consumer groups found, skipping describe test") + } + + // Get the first group ID for testing + var groupIDs []string + for groupID := range groups { + groupIDs = append(groupIDs, groupID) + break // Only test with the first group + } + + descriptions, err := admin.DescribeConsumerGroups(groupIDs) + if err != nil { + t.Fatal(err) + } + + for _, desc := range descriptions { + t.Logf("Group: %s, State: %s, Protocol: %s", desc.GroupId, desc.State, desc.ProtocolType) + t.Logf("Members: %d", len(desc.Members)) + } + }) + + // Test DeleteConsumerGroup + t.Run("DeleteConsumerGroup", func(t *testing.T) { + // Create a test consumer group first + testGroupID := "wechat_switch_change_v2.cache.push-v2-6d5797558-nhrnz" + + // Note: In a real test environment, you would create a consumer group first + // For this test, we'll just test the delete operation on a non-existent group + // which should return an error + err := admin.DeleteConsumerGroup(testGroupID) + if err != nil { + // This is expected for a non-existent group + t.Logf("Delete consumer group returned error (expected): %v", err) + } else { + t.Logf("Successfully deleted consumer group: %s", testGroupID) + } + }) +}