From 77dfcbad8253bcef4ed9299ab600b4f9112c42fc Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 5 Nov 2025 09:10:16 +0800 Subject: [PATCH] lcc: Sync when first fetch returns no records If the first fetch returns no records, the consumer is considered synced immediately. This may only happen when the topic has no records. Signed-off-by: Marc Lopez Rubio --- kafka/log_compacted_consumer.go | 50 +++++++++++++++++++--------- kafka/log_compacted_consumer_test.go | 42 ++++++++++++++++++++++- 2 files changed, 76 insertions(+), 16 deletions(-) diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go index 4c1fa20c..9370e454 100644 --- a/kafka/log_compacted_consumer.go +++ b/kafka/log_compacted_consumer.go @@ -75,12 +75,13 @@ func (cfg *LogCompactedConfig) finalize() error { // record for each key, allowing for efficient storage and retrieval of the most // recent state of each key. type LogCompactedConsumer struct { - client *kgo.Client - logger *zap.Logger - process func(context.Context, *kgo.FetchesRecordIter) error - topic string - ctx context.Context - cancel context.CancelFunc + client *kgo.Client + logger *zap.Logger + process func(context.Context, *kgo.FetchesRecordIter) error + topic string + ctx context.Context + cancel context.CancelFunc + fetchMaxWait time.Duration mu sync.RWMutex started chan struct{} @@ -117,13 +118,14 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig, } lcc := LogCompactedConsumer{ - topic: cfg.Topic, - process: cfg.Processor, - logger: cfg.Logger, - client: client, - started: make(chan struct{}), - stopped: make(chan struct{}), - synced: make(chan struct{}), + topic: cfg.Topic, + fetchMaxWait: cfg.FetchMaxWait, + process: cfg.Processor, + logger: cfg.Logger, + client: client, + started: make(chan struct{}), + stopped: make(chan struct{}), + synced: make(chan struct{}), } lcc.ctx, lcc.cancel = context.WithCancel(context.Background()) return &lcc, nil @@ -149,7 +151,14 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error { go func() { defer close(lcc.stopped) close(lcc.started) - for { + // It's possible that the topic contains no records, in which case the + // consumer should be considered synced immediately. Since fetchMaxWait + // only works if the topic has records, we need to pass a context with + // a timeout to the first fetch. + ctx, cancel := context.WithTimeout(lcc.ctx, lcc.fetchMaxWait) + defer cancel() + lcc.consume(ctx) + for { // Business as usual: continue to fetch records until closed. select { case <-lcc.ctx.Done(): return // Exit the goroutine if the context is done. @@ -167,7 +176,7 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error { } func (lcc *LogCompactedConsumer) consume(ctx context.Context) { - fetches := lcc.client.PollRecords(lcc.ctx, -1) // This means all buffered. + fetches := lcc.client.PollRecords(ctx, -1) // This means all buffered. if fetches.IsClientClosed() { lcc.logger.Info("kafka client closed, stopping fetch") return @@ -185,6 +194,15 @@ func (lcc *LogCompactedConsumer) consume(ctx context.Context) { ) }) if fetches.Empty() { + // No records were returned across all partitions; consider the consumer + // synced because there is nothing to process to reach the current HWM. + select { + case <-lcc.synced: + // already synced + default: + lcc.syncDelta.Store(0) + close(lcc.synced) + } return } lcc.mu.RLock() @@ -260,6 +278,8 @@ func (lcc *LogCompactedConsumer) Close() error { // Healthy checks if the consumer is healthy by ensuring that it has had a full // sync and that the underlying Kafka client can ping a broker in the cluster. +// If the first fetch returns no records, the consumer is considered synced +// immediately. This may only happen when the topic has no records. // // This function can be used as a readiness probe. func (lcc *LogCompactedConsumer) Healthy(ctx context.Context) error { diff --git a/kafka/log_compacted_consumer_test.go b/kafka/log_compacted_consumer_test.go index 182f6ccd..83bce1c7 100644 --- a/kafka/log_compacted_consumer_test.go +++ b/kafka/log_compacted_consumer_test.go @@ -268,7 +268,7 @@ func TestLogCompactedConsumerSyncBehavior(t *testing.T) { ClientID: "sync-test-consumer", }, Topic: topicName, - FetchMaxWait: 10 * time.Millisecond, + FetchMaxWait: time.Second, Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error { for !iter.Done() { _ = iter.Next() // We just count records, don't need content @@ -414,3 +414,43 @@ func TestLogCompactedConsumerStartStop(t *testing.T) { assert.Error(t, consumer.Run(ctx), "Run should error after Close") } + +func TestLogCompactedConsumerHealthyOnEmptyFirstFetch(t *testing.T) { + topicName := "empty-first-fetch-topic" + cluster := newLogCompactedFakeCluster(t, topicName, 1) + var processedRecords int32 + cfg := LogCompactedConfig{ + CommonConfig: CommonConfig{ + Brokers: cluster.ListenAddrs(), + Logger: zapTest(t), + ClientID: "empty-first-fetch-consumer", + }, + Topic: topicName, + FetchMaxWait: 100 * time.Millisecond, + MinFetchSize: 0, + Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error { + for !iter.Done() { + _ = iter.Next() + atomic.AddInt32(&processedRecords, 1) + } + return nil + }, + } + + consumer, err := NewLogCompactedConsumer(cfg) + require.NoError(t, err) + t.Cleanup(func() { consumer.Close() }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + require.NoError(t, consumer.Run(ctx)) + + // With an empty topic, the first fetch should return no records and the + // consumer should be considered synced (healthy) without producing anything. + require.Eventually(t, func() bool { + return consumer.Healthy(ctx) == nil + }, time.Second, 50*time.Millisecond) + + // Sanity check: no records were processed. + assert.Equal(t, int32(0), atomic.LoadInt32(&processedRecords)) +}