21 changes: 18 additions & 3 deletions kafka/manager.go
@@ -276,21 +276,36 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
 					continue
 				}
 				for partition, lag := range partitions {
+					// Check for errors in lag calculation, start offset, or end offset.
+					// The franz-go library can set lag to the end offset value when:
+					// - End offset is valid (lag.End.Err == nil)
+					// - Start offset has an error (lag.Start.Err != nil)
+					// - No commit exists (lag.Commit.At == -1)
+					// This results in incorrect lag reporting, so we must check all error fields.
+					var lagErr error
 					if lag.Err != nil {
-						if lag.Err == kerr.UnknownTopicOrPartition {
+						lagErr = lag.Err
+					} else if lag.Start.Err != nil {
+						lagErr = lag.Start.Err
+					} else if lag.End.Err != nil {
+						lagErr = lag.End.Err
+					}
+
+					if lagErr != nil {
+						if lagErr == kerr.UnknownTopicOrPartition {
 							logger.Debug("error getting consumer group lag",
 								zap.String("group", l.Group),
 								zap.String("topic", topic),
 								zap.Int32("partition", partition),
-								zap.Error(lag.Err),
+								zap.Error(lagErr),
 							)
 							continue
 						}
 						logger.Warn("error getting consumer group lag",
 							zap.String("group", l.Group),
 							zap.String("topic", topic),
 							zap.Int32("partition", partition),
-							zap.Error(lag.Err),
+							zap.Error(lagErr),
 						)
 						continue
 					}
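The comment block in the hunk above describes the franz-go behavior being guarded against: with no committed offset and a failed start-offset lookup, the computed lag can equal the end offset while lag.Err stays nil. A minimal sketch of that failure shape, with field names assumed from franz-go's kadm package rather than copied from this repository:

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kerr"
)

func main() {
	// Hypothetical lag entry mirroring the scenario above (assumed kadm field names):
	lag := kadm.GroupMemberLag{
		Commit: kadm.Offset{At: -1},                           // no committed offset for the partition
		Start:  kadm.ListedOffset{Err: kerr.OffsetOutOfRange}, // start offset lookup failed (e.g. segment deleted)
		End:    kadm.ListedOffset{Offset: 10000},              // end offset is valid
		Lag:    10000,                                         // end offset surfaces as the "lag"
	}
	// Checking only lag.Err would accept this as a valid measurement;
	// the change above also inspects Start.Err and End.Err before reporting.
	fmt.Println(lag.Err == nil, lag.Start.Err != nil) // true true
}

With the fix, the Start.Err in this shape is collapsed into lagErr and logged as a warning instead of being exported as a lag metric.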
186 changes: 186 additions & 0 deletions kafka/manager_test.go
@@ -899,6 +899,192 @@ func TestUnknownTopicOrPartition(t *testing.T) {
}
}

func TestStartOffsetError(t *testing.T) {
// Regression test for the issue where the end offset can be reported as the
// consumer lag if the start offset request has an error.
// This test verifies that when:
// 1. The start offset request fails (e.g. due to segment deletions)
// 2. The end offset is valid
// 3. No commit exists (the topic has not been consumed)
// the lag is NOT reported (since it cannot be calculated accurately),
// rather than being reported as the end offset.

reader := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(reader))
defer mp.Shutdown(context.Background())

cluster, commonConfig := newFakeCluster(t)
core, observedLogs := observer.New(zapcore.WarnLevel)

commonConfig.Logger = zap.New(core)
commonConfig.MeterProvider = mp

m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
require.NoError(t, err)
t.Cleanup(func() { m.Close() })

registration, err := m.MonitorConsumerLag([]apmqueue.TopicConsumer{
{
Topic: "topic",
Consumer: "consumer",
},
})
require.NoError(t, err)
t.Cleanup(func() { registration.Unregister() })

// Setup describe groups response with a member assigned
cluster.ControlKey(kmsg.DescribeGroups.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.DescribeGroupsResponse{
Version: req.GetVersion(),
Groups: []kmsg.DescribeGroupsResponseGroup{
{
Group: "consumer",
ProtocolType: "consumer",
State: "Stable",
Members: []kmsg.DescribeGroupsResponseGroupMember{{
MemberID: "member-1",
ClientID: "client-1",
MemberAssignment: (&kmsg.ConsumerMemberAssignment{
Version: 2,
Topics: []kmsg.ConsumerMemberAssignmentTopic{{
Topic: "name_space-topic",
Partitions: []int32{0},
}},
}).AppendTo(nil),
}},
},
},
}, nil, true
})

// Setup offset fetch response - no commits (topic not consumed)
cluster.ControlKey(kmsg.OffsetFetch.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.OffsetFetchResponse{
Version: req.GetVersion(),
Groups: []kmsg.OffsetFetchResponseGroup{{
Group: "consumer",
Topics: []kmsg.OffsetFetchResponseGroupTopic{{
Topic: "name_space-topic",
Partitions: []kmsg.OffsetFetchResponseGroupTopicPartition{{
Partition: 0,
Offset: -1, // -1 means no commit
}},
}},
}},
}, nil, true
})

// Setup metadata response
cluster.ControlKey(kmsg.Metadata.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
if len(req.(*kmsg.MetadataRequest).Topics) == 0 {
return nil, nil, false
}
cluster.KeepControl()
return &kmsg.MetadataResponse{
Version: req.GetVersion(),
Topics: []kmsg.MetadataResponseTopic{{
Topic: kmsg.StringPtr("name_space-topic"),
Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 0}},
}},
}, nil, true
})

// Setup list offsets response.
// The start-offset request uses timestamp -2 (earliest) and returns an error;
// the end-offset request uses timestamp -1 (latest) and returns a valid offset.
// The handler distinguishes the two by timestamp rather than by call order.
callCount := 0
cluster.ControlKey(kmsg.ListOffsets.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()
callCount++
listReq := req.(*kmsg.ListOffsetsRequest)

// Check if this is start offset request (timestamp = -2) or end offset request (timestamp = -1)
isStartOffsetRequest := false
if len(listReq.Topics) > 0 && len(listReq.Topics[0].Partitions) > 0 {
isStartOffsetRequest = listReq.Topics[0].Partitions[0].Timestamp == -2
}

if isStartOffsetRequest {
// Start offset request fails (e.g., due to segment deletion)
return &kmsg.ListOffsetsResponse{
Version: listReq.Version,
Topics: []kmsg.ListOffsetsResponseTopic{{
Topic: "name_space-topic",
Partitions: []kmsg.ListOffsetsResponseTopicPartition{{
Partition: 0,
Offset: 0,
ErrorCode: kerr.OffsetOutOfRange.Code, // Error due to segment deletion
}},
}},
}, nil, true
}

// End offset request succeeds with a large offset
return &kmsg.ListOffsetsResponse{
Version: listReq.Version,
Topics: []kmsg.ListOffsetsResponseTopic{{
Topic: "name_space-topic",
Partitions: []kmsg.ListOffsetsResponseTopicPartition{{
Partition: 0,
Offset: 10000, // Large offset
}},
}},
}, nil, true
})

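// Collecting the metrics is what drives the lag lookup: MonitorConsumerLag
// registers a callback-based instrument, so the kfake handlers above are hit
// during reader.Collect (see the note at the end of this diff).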
rm := metricdata.ResourceMetrics{}
err = reader.Collect(context.Background(), &rm)
require.NoError(t, err)

// Check that no lag metric was reported (or if reported, it should not be the end offset)
var lagMetrics []metricdata.Metrics
for _, sm := range rm.ScopeMetrics {
if sm.Scope.Name == "github.com/elastic/apm-queue/kafka" {
for _, m := range sm.Metrics {
if m.Name == "consumer_group_lag" {
lagMetrics = append(lagMetrics, m)
}
}
}
}

// Verify that lag metrics are handled correctly
foundIncorrectLag := false
metricsCount := 0
for _, lagMetric := range lagMetrics {
if gauge, ok := lagMetric.Data.(metricdata.Gauge[int64]); ok {
for _, dp := range gauge.DataPoints {
metricsCount++
// The lag should NOT be 10000 (the end offset)
// With the bug, it would be reported as 10000
// After the fix, it should not be reported at all
if dp.Value == 10000 {
foundIncorrectLag = true
}
}
}
}

// We should see warning logs about offset errors after the fix
matchingLogs := observedLogs.FilterFieldKey("group")
actual := matchingLogs.AllUntimed()

// After the fix is applied, we expect:
// 1. Warning log about the start offset error
// 2. No lag metric reported (since we can't calculate it accurately)
assert.False(t, foundIncorrectLag,
"Lag should not be reported as end offset (10000) when start offset has error")
assert.Equal(t, 0, metricsCount,
"No lag metrics should be reported when start offset has error")
assert.Len(t, actual, 1,
"Should have exactly one warning log about the offset error")

if len(actual) > 0 {
assert.Contains(t, actual[0].Message, "error getting consumer group lag",
"Log message should indicate lag calculation error")
}
}

func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) {
cluster, err := kfake.NewCluster(
// Just one broker to simplify dealing with sharded requests.
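A note on why the single reader.Collect call in TestStartOffsetError is enough to exercise the kfake handlers: MonitorConsumerLag returns a registration with an Unregister method, i.e. it registers a callback-based (observable) instrument, and the callback that queries group lag runs at collection time. A generic sketch of that OpenTelemetry SDK pattern, independent of apm-queue's internals (the meter and instrument names here are illustrative):

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	reader := sdkmetric.NewManualReader()
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	defer mp.Shutdown(context.Background())

	meter := mp.Meter("example")
	lagGauge, _ := meter.Int64ObservableGauge("consumer_group_lag")

	// The callback only runs when the reader collects; this is where a lag
	// monitor would issue its DescribeGroups/OffsetFetch/ListOffsets requests.
	reg, _ := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(lagGauge, 42)
		return nil
	}, lagGauge)
	defer reg.Unregister()

	var rm metricdata.ResourceMetrics
	_ = reader.Collect(context.Background(), &rm) // triggers the callback
	fmt.Println(len(rm.ScopeMetrics))             // 1
}

In the test, the absence of consumer_group_lag data points after Collect is therefore direct evidence that the callback skipped the partition whose start offset errored.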