From 7c29e848f7278fc00f166502ccbe10d8fda152e9 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 4 Nov 2025 15:05:20 +0000
Subject: [PATCH 1/3] Initial plan

From 18f93d1f1edb57fd09cc0d07c7b7c87b57299aed Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 4 Nov 2025 15:23:49 +0000
Subject: [PATCH 2/3] Fix consumer lag reporting to handle start and end
 offset errors

- Check lag.Start.Err and lag.End.Err in addition to lag.Err
- Prevent incorrect lag reporting when the start offset has an error but the
  end offset is valid
- Add a test case to verify the fix for the scenario where the start offset
  fails due to segment deletion
- Log warnings for all offset-related errors to aid in monitoring and
  debugging

Co-authored-by: lahsivjar <1427601+lahsivjar@users.noreply.github.com>
---
 kafka/manager.go      |  21 ++++-
 kafka/manager_test.go | 186 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 204 insertions(+), 3 deletions(-)

diff --git a/kafka/manager.go b/kafka/manager.go
index 9da79cdf..cc5900c7 100644
--- a/kafka/manager.go
+++ b/kafka/manager.go
@@ -276,13 +276,28 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
 			continue
 		}
 		for partition, lag := range partitions {
+			// Check for errors in lag calculation, start offset, or end offset.
+			// The franz-go library can set lag to the end offset value when:
+			// - End offset is valid (lag.End.Err == nil)
+			// - Start offset has an error (lag.Start.Err != nil)
+			// - No commit exists (lag.Commit.At == -1)
+			// This results in incorrect lag reporting, so we must check all error fields.
+			var lagErr error
 			if lag.Err != nil {
-				if lag.Err == kerr.UnknownTopicOrPartition {
+				lagErr = lag.Err
+			} else if lag.Start.Err != nil {
+				lagErr = lag.Start.Err
+			} else if lag.End.Err != nil {
+				lagErr = lag.End.Err
+			}
+
+			if lagErr != nil {
+				if lagErr == kerr.UnknownTopicOrPartition {
 					logger.Debug("error getting consumer group lag",
 						zap.String("group", l.Group),
 						zap.String("topic", topic),
 						zap.Int32("partition", partition),
-						zap.Error(lag.Err),
+						zap.Error(lagErr),
 					)
 					continue
 				}
@@ -290,7 +305,7 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
 				zap.String("group", l.Group),
 				zap.String("topic", topic),
 				zap.Int32("partition", partition),
-				zap.Error(lag.Err),
+				zap.Error(lagErr),
 			)
 			continue
 		}
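Read outside the diff context, the error-priority check added above is a small, self-contained rule. The sketch below restates it as a standalone helper for illustration only; it is not part of this patch series. It assumes the lag values come from franz-go's kadm.GroupMemberLag type (whose Err, Start.Err, and End.Err fields are the ones referenced in the hunk), and the helper name pickLagError is hypothetical.

    package kafka

    import "github.com/twmb/franz-go/pkg/kadm"

    // pickLagError mirrors the precedence used in the hunk above: the aggregate
    // lag error wins, then the start-offset error, then the end-offset error.
    // A non-nil result means the computed Lag value cannot be trusted and
    // should not be reported as a metric.
    func pickLagError(lag kadm.GroupMemberLag) error {
    	if lag.Err != nil {
    		return lag.Err
    	}
    	if lag.Start.Err != nil {
    		return lag.Start.Err
    	}
    	if lag.End.Err != nil {
    		return lag.End.Err
    	}
    	return nil
    }

Checking the offsets in this order means a partial failure (for example, a start offset lost to segment deletion) is logged and skipped rather than being reported as a lag equal to the end offset.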
diff --git a/kafka/manager_test.go b/kafka/manager_test.go
index c341c86f..e8d0e1ca 100644
--- a/kafka/manager_test.go
+++ b/kafka/manager_test.go
@@ -899,6 +899,192 @@ func TestUnknownTopicOrPartition(t *testing.T) {
 	}
 }
 
+func TestStartOffsetError(t *testing.T) {
+	// Test case for issue: Consumer lag can report end offset as the consumer
+	// lag if start offset request has an error.
+	// This test verifies that when:
+	// 1. Start offset request fails (e.g., due to segment deletions)
+	// 2. End offset is valid
+	// 3. No commit exists (topic not consumed)
+	// The lag should NOT be reported (since we can't calculate it accurately),
+	// instead of reporting the end offset as lag.
+
+	reader := metric.NewManualReader()
+	mp := metric.NewMeterProvider(metric.WithReader(reader))
+	defer mp.Shutdown(context.Background())
+
+	cluster, commonConfig := newFakeCluster(t)
+	core, observedLogs := observer.New(zapcore.WarnLevel)
+
+	commonConfig.Logger = zap.New(core)
+	commonConfig.MeterProvider = mp
+
+	m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
+	require.NoError(t, err)
+	t.Cleanup(func() { m.Close() })
+
+	registration, err := m.MonitorConsumerLag([]apmqueue.TopicConsumer{
+		{
+			Topic:    "topic",
+			Consumer: "consumer",
+		},
+	})
+	require.NoError(t, err)
+	t.Cleanup(func() { registration.Unregister() })
+
+	// Setup describe groups response with a member assigned
+	cluster.ControlKey(kmsg.DescribeGroups.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+		return &kmsg.DescribeGroupsResponse{
+			Version: req.GetVersion(),
+			Groups: []kmsg.DescribeGroupsResponseGroup{
+				{
+					Group:        "consumer",
+					ProtocolType: "consumer",
+					State:        "Stable",
+					Members: []kmsg.DescribeGroupsResponseGroupMember{{
+						MemberID: "member-1",
+						ClientID: "client-1",
+						MemberAssignment: (&kmsg.ConsumerMemberAssignment{
+							Version: 2,
+							Topics: []kmsg.ConsumerMemberAssignmentTopic{{
+								Topic:      "name_space-topic",
+								Partitions: []int32{0},
+							}},
+						}).AppendTo(nil),
+					}},
+				},
+			},
+		}, nil, true
+	})
+
+	// Setup offset fetch response - no commits (topic not consumed)
+	cluster.ControlKey(kmsg.OffsetFetch.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+		return &kmsg.OffsetFetchResponse{
+			Version: req.GetVersion(),
+			Groups: []kmsg.OffsetFetchResponseGroup{{
+				Group: "consumer",
+				Topics: []kmsg.OffsetFetchResponseGroupTopic{{
+					Topic: "name_space-topic",
+					Partitions: []kmsg.OffsetFetchResponseGroupTopicPartition{{
+						Partition: 0,
+						Offset:    -1, // -1 means no commit
+					}},
+				}},
+			}},
+		}, nil, true
+	})
+
+	// Setup metadata response
+	cluster.ControlKey(kmsg.Metadata.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+		if len(req.(*kmsg.MetadataRequest).Topics) == 0 {
+			return nil, nil, false
+		}
+		cluster.KeepControl()
+		return &kmsg.MetadataResponse{
+			Version: req.GetVersion(),
+			Topics: []kmsg.MetadataResponseTopic{{
+				Topic:      kmsg.StringPtr("name_space-topic"),
+				Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 0}},
+			}},
+		}, nil, true
+	})
+
+	// Setup list offsets response
+	// First call for start offsets (timestamp = -2) - returns error
+	// Second call for end offsets (timestamp = -1) - returns valid offset
+	callCount := 0
+	cluster.ControlKey(kmsg.ListOffsets.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+		cluster.KeepControl()
+		callCount++
+		listReq := req.(*kmsg.ListOffsetsRequest)
+
+		// Check if this is start offset request (timestamp = -2) or end offset request (timestamp = -1)
+		isStartOffsetRequest := false
+		if len(listReq.Topics) > 0 && len(listReq.Topics[0].Partitions) > 0 {
+			isStartOffsetRequest = listReq.Topics[0].Partitions[0].Timestamp == -2
+		}
+
+		if isStartOffsetRequest {
+			// Start offset request fails (e.g., due to segment deletion)
+			return &kmsg.ListOffsetsResponse{
+				Version: listReq.Version,
+				Topics: []kmsg.ListOffsetsResponseTopic{{
+					Topic: "name_space-topic",
+					Partitions: []kmsg.ListOffsetsResponseTopicPartition{{
+						Partition: 0,
+						Offset:    0,
+						ErrorCode: kerr.OffsetOutOfRange.Code, // Error due to segment deletion
+					}},
+				}},
+			}, nil, true
+		}
+
+		// End offset request succeeds with a large offset
+		return &kmsg.ListOffsetsResponse{
+			Version: listReq.Version,
+			Topics: []kmsg.ListOffsetsResponseTopic{{
+				Topic: "name_space-topic",
+				Partitions: []kmsg.ListOffsetsResponseTopicPartition{{
+					Partition: 0,
+					Offset:    10000, // Large offset
+				}},
+			}},
+		}, nil, true
+	})
+
+	rm := metricdata.ResourceMetrics{}
+	err = reader.Collect(context.Background(), &rm)
+	require.NoError(t, err)
+
+	// Check that no lag metric was reported (or if reported, it should not be the end offset)
+	var lagMetrics []metricdata.Metrics
+	for _, sm := range rm.ScopeMetrics {
+		if sm.Scope.Name == "github.com/elastic/apm-queue/kafka" {
+			for _, m := range sm.Metrics {
+				if m.Name == "consumer_group_lag" {
+					lagMetrics = append(lagMetrics, m)
+				}
+			}
+		}
+	}
+
+	// Verify that lag metrics are handled correctly
+	foundIncorrectLag := false
+	metricsCount := 0
+	for _, lagMetric := range lagMetrics {
+		if gauge, ok := lagMetric.Data.(metricdata.Gauge[int64]); ok {
+			for _, dp := range gauge.DataPoints {
+				metricsCount++
+				// The lag should NOT be 10000 (the end offset)
+				// With the bug, it would be reported as 10000
+				// After the fix, it should not be reported at all
+				if dp.Value == 10000 {
+					foundIncorrectLag = true
+				}
+			}
+		}
+	}
+
+	// We should see warning logs about offset errors after the fix
+	matchingLogs := observedLogs.FilterFieldKey("group")
+	actual := matchingLogs.AllUntimed()
+
+	// After the fix is applied, we expect:
+	// 1. Warning log about the start offset error
+	// 2. No lag metric reported (since we can't calculate it accurately)
+	assert.False(t, foundIncorrectLag,
+		"Lag should not be reported as end offset (10000) when start offset has error")
+	assert.Equal(t, 0, metricsCount,
+		"No lag metrics should be reported when start offset has error")
+	assert.Len(t, actual, 1,
+		"Should have exactly one warning log about the offset error")
+
+	if len(actual) > 0 {
+		assert.Contains(t, actual[0].Message, "error getting consumer group lag",
+			"Log message should indicate lag calculation error")
+	}
+}
+
 func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) {
 	cluster, err := kfake.NewCluster(
 		// Just one broker to simplify dealing with sharded requests.

From 12298ea9b305582f0adf3280285525dace6cbc22 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Mon, 10 Nov 2025 21:09:32 +0000
Subject: [PATCH 3/3] make fmt

---
 kafka/manager.go      |  4 ++--
 kafka/manager_test.go | 16 ++++++++--------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/kafka/manager.go b/kafka/manager.go
index cc5900c7..6bdce133 100644
--- a/kafka/manager.go
+++ b/kafka/manager.go
@@ -279,7 +279,7 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
 			// Check for errors in lag calculation, start offset, or end offset.
 			// The franz-go library can set lag to the end offset value when:
 			// - End offset is valid (lag.End.Err == nil)
-			// - Start offset has an error (lag.Start.Err != nil) 
+			// - Start offset has an error (lag.Start.Err != nil)
 			// - No commit exists (lag.Commit.At == -1)
 			// This results in incorrect lag reporting, so we must check all error fields.
 			var lagErr error
@@ -290,7 +290,7 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
 			} else if lag.End.Err != nil {
 				lagErr = lag.End.Err
 			}
-			
+
 			if lagErr != nil {
 				if lagErr == kerr.UnknownTopicOrPartition {
 					logger.Debug("error getting consumer group lag",
diff --git a/kafka/manager_test.go b/kafka/manager_test.go
index e8d0e1ca..67ee05b4 100644
--- a/kafka/manager_test.go
+++ b/kafka/manager_test.go
@@ -908,7 +908,7 @@ func TestStartOffsetError(t *testing.T) {
 	// 3. No commit exists (topic not consumed)
 	// The lag should NOT be reported (since we can't calculate it accurately),
 	// instead of reporting the end offset as lag.
-	
+
 	reader := metric.NewManualReader()
 	mp := metric.NewMeterProvider(metric.WithReader(reader))
 	defer mp.Shutdown(context.Background())
@@ -997,13 +997,13 @@ func TestStartOffsetError(t *testing.T) {
 		cluster.KeepControl()
 		callCount++
 		listReq := req.(*kmsg.ListOffsetsRequest)
-		
+
 		// Check if this is start offset request (timestamp = -2) or end offset request (timestamp = -1)
 		isStartOffsetRequest := false
 		if len(listReq.Topics) > 0 && len(listReq.Topics[0].Partitions) > 0 {
 			isStartOffsetRequest = listReq.Topics[0].Partitions[0].Timestamp == -2
 		}
-		
+
 		if isStartOffsetRequest {
 			// Start offset request fails (e.g., due to segment deletion)
 			return &kmsg.ListOffsetsResponse{
@@ -1018,7 +1018,7 @@ func TestStartOffsetError(t *testing.T) {
 				}},
 			}, nil, true
 		}
-		
+
 		// End offset request succeeds with a large offset
 		return &kmsg.ListOffsetsResponse{
 			Version: listReq.Version,
@@ -1047,7 +1047,7 @@ func TestStartOffsetError(t *testing.T) {
 			}
 		}
 	}
-	
+
 	// Verify that lag metrics are handled correctly
 	foundIncorrectLag := false
 	metricsCount := 0
@@ -1064,11 +1064,11 @@ func TestStartOffsetError(t *testing.T) {
 			}
 		}
 	}
-	
+
 	// We should see warning logs about offset errors after the fix
 	matchingLogs := observedLogs.FilterFieldKey("group")
 	actual := matchingLogs.AllUntimed()
-	
+
 	// After the fix is applied, we expect:
 	// 1. Warning log about the start offset error
 	// 2. No lag metric reported (since we can't calculate it accurately)
@@ -1078,7 +1078,7 @@ func TestStartOffsetError(t *testing.T) {
 		"No lag metrics should be reported when start offset has error")
 	assert.Len(t, actual, 1,
 		"Should have exactly one warning log about the offset error")
-	
+
 	if len(actual) > 0 {
 		assert.Contains(t, actual[0].Message, "error getting consumer group lag",
 			"Log message should indicate lag calculation error")
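The test in patch 2/3 drives the fix end to end through a kfake cluster. The same scenario can also be checked in isolation against the helper sketched after patch 2/3's manager.go diff. The sketch below is again illustrative and not part of the series; TestPickLagError and pickLagError are hypothetical names, and the field layout of kadm.GroupMemberLag and kadm.ListedOffset is assumed from the fields the patch itself references (Commit.At, Start.Err, End.Err).

    package kafka

    import (
    	"testing"

    	"github.com/stretchr/testify/assert"
    	"github.com/twmb/franz-go/pkg/kadm"
    	"github.com/twmb/franz-go/pkg/kerr"
    )

    // TestPickLagError reproduces the reported scenario in isolation: the start
    // offset lookup failed (e.g. OffsetOutOfRange after segment deletion), the
    // end offset is valid, and no commit exists, so the computed lag falls back
    // to the end offset. The helper must surface the start-offset error so the
    // caller skips reporting that value.
    func TestPickLagError(t *testing.T) {
    	lag := kadm.GroupMemberLag{
    		Commit: kadm.Offset{At: -1},                           // no committed offset
    		Start:  kadm.ListedOffset{Err: kerr.OffsetOutOfRange}, // start offset lookup failed
    		End:    kadm.ListedOffset{Offset: 10000},              // end offset is valid
    		Lag:    10000,                                         // end offset surfaced as lag
    	}
    	assert.ErrorIs(t, pickLagError(lag), kerr.OffsetOutOfRange)
    }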