diff --git a/kafka/log_compacted_consumer.go b/kafka/log_compacted_consumer.go
index 4c1fa20c..9370e454 100644
--- a/kafka/log_compacted_consumer.go
+++ b/kafka/log_compacted_consumer.go
@@ -75,12 +75,13 @@ func (cfg *LogCompactedConfig) finalize() error {
 // record for each key, allowing for efficient storage and retrieval of the most
 // recent state of each key.
 type LogCompactedConsumer struct {
-    client  *kgo.Client
-    logger  *zap.Logger
-    process func(context.Context, *kgo.FetchesRecordIter) error
-    topic   string
-    ctx     context.Context
-    cancel  context.CancelFunc
+    client       *kgo.Client
+    logger       *zap.Logger
+    process      func(context.Context, *kgo.FetchesRecordIter) error
+    topic        string
+    ctx          context.Context
+    cancel       context.CancelFunc
+    fetchMaxWait time.Duration
 
     mu      sync.RWMutex
     started chan struct{}
@@ -117,13 +118,14 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig,
     }
 
     lcc := LogCompactedConsumer{
-        topic:   cfg.Topic,
-        process: cfg.Processor,
-        logger:  cfg.Logger,
-        client:  client,
-        started: make(chan struct{}),
-        stopped: make(chan struct{}),
-        synced:  make(chan struct{}),
+        topic:        cfg.Topic,
+        fetchMaxWait: cfg.FetchMaxWait,
+        process:      cfg.Processor,
+        logger:       cfg.Logger,
+        client:       client,
+        started:      make(chan struct{}),
+        stopped:      make(chan struct{}),
+        synced:       make(chan struct{}),
     }
     lcc.ctx, lcc.cancel = context.WithCancel(context.Background())
     return &lcc, nil
@@ -149,7 +151,14 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error {
     go func() {
         defer close(lcc.stopped)
         close(lcc.started)
-        for {
+        // It's possible that the topic contains no records, in which case the
+        // consumer should be considered synced immediately. Since fetchMaxWait
+        // only works if the topic has records, we need to pass a context with
+        // a timeout to the first fetch.
+        ctx, cancel := context.WithTimeout(lcc.ctx, lcc.fetchMaxWait)
+        defer cancel()
+        lcc.consume(ctx)
+        for { // Business as usual: continue to fetch records until closed.
             select {
             case <-lcc.ctx.Done():
                 return // Exit the goroutine if the context is done.
@@ -167,7 +176,7 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error {
 }
 
 func (lcc *LogCompactedConsumer) consume(ctx context.Context) {
-    fetches := lcc.client.PollRecords(lcc.ctx, -1) // This means all buffered.
+    fetches := lcc.client.PollRecords(ctx, -1) // This means all buffered.
     if fetches.IsClientClosed() {
         lcc.logger.Info("kafka client closed, stopping fetch")
         return
@@ -185,6 +194,15 @@ func (lcc *LogCompactedConsumer) consume(ctx context.Context) {
         )
     })
     if fetches.Empty() {
+        // No records were returned across all partitions; consider the consumer
+        // synced because there is nothing to process to reach the current HWM.
+        select {
+        case <-lcc.synced:
+            // already synced
+        default:
+            lcc.syncDelta.Store(0)
+            close(lcc.synced)
+        }
         return
     }
     lcc.mu.RLock()
@@ -260,6 +278,8 @@ func (lcc *LogCompactedConsumer) Close() error {
 
 // Healthy checks if the consumer is healthy by ensuring that it has had a full
 // sync and that the underlying Kafka client can ping a broker in the cluster.
+// If the first fetch returns no records, the consumer is considered synced
+// immediately. This may only happen when the topic has no records.
 //
 // This function can be used as a readiness probe.
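Note on the empty-fetch path above: closing lcc.synced goes through a select with a default branch so the channel is closed at most once, which matters because closing an already-closed channel panics. This is safe here because consume only ever runs on the single fetch goroutine. A minimal standalone sketch of the idiom (markSynced is an illustrative name, not from this package):

package main

import "fmt"

// markSynced closes synced at most once. If the channel is already closed,
// the receive case fires immediately and close is skipped; otherwise the
// receive would block, so the default branch runs and closes the channel.
// Like the consumer's fetch loop, this is only safe when called from a
// single goroutine (or under a lock).
func markSynced(synced chan struct{}) {
	select {
	case <-synced:
		// Already synced; closing again would panic.
	default:
		close(synced)
	}
}

func main() {
	synced := make(chan struct{})
	markSynced(synced)
	markSynced(synced) // Second call is a no-op.
	fmt.Println("synced channel closed exactly once")
}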
 func (lcc *LogCompactedConsumer) Healthy(ctx context.Context) error {
diff --git a/kafka/log_compacted_consumer_test.go b/kafka/log_compacted_consumer_test.go
index 182f6ccd..83bce1c7 100644
--- a/kafka/log_compacted_consumer_test.go
+++ b/kafka/log_compacted_consumer_test.go
@@ -268,7 +268,7 @@ func TestLogCompactedConsumerSyncBehavior(t *testing.T) {
             ClientID: "sync-test-consumer",
         },
         Topic:        topicName,
-        FetchMaxWait: 10 * time.Millisecond,
+        FetchMaxWait: time.Second,
         Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error {
             for !iter.Done() {
                 _ = iter.Next() // We just count records, don't need content
@@ -414,3 +414,43 @@ func TestLogCompactedConsumerStartStop(t *testing.T) {
 
     assert.Error(t, consumer.Run(ctx), "Run should error after Close")
 }
+
+func TestLogCompactedConsumerHealthyOnEmptyFirstFetch(t *testing.T) {
+    topicName := "empty-first-fetch-topic"
+    cluster := newLogCompactedFakeCluster(t, topicName, 1)
+    var processedRecords int32
+    cfg := LogCompactedConfig{
+        CommonConfig: CommonConfig{
+            Brokers:  cluster.ListenAddrs(),
+            Logger:   zapTest(t),
+            ClientID: "empty-first-fetch-consumer",
+        },
+        Topic:        topicName,
+        FetchMaxWait: 100 * time.Millisecond,
+        MinFetchSize: 0,
+        Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error {
+            for !iter.Done() {
+                _ = iter.Next()
+                atomic.AddInt32(&processedRecords, 1)
+            }
+            return nil
+        },
+    }
+
+    consumer, err := NewLogCompactedConsumer(cfg)
+    require.NoError(t, err)
+    t.Cleanup(func() { consumer.Close() })
+
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+    defer cancel()
+    require.NoError(t, consumer.Run(ctx))
+
+    // With an empty topic, the first fetch should return no records and the
+    // consumer should be considered synced (healthy) without producing anything.
+    require.Eventually(t, func() bool {
+        return consumer.Healthy(ctx) == nil
+    }, time.Second, 50*time.Millisecond)
+
+    // Sanity check: no records were processed.
+    assert.Equal(t, int32(0), atomic.LoadInt32(&processedRecords))
+}
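For reference, a rough sketch of how this change plays out at a call site that uses Healthy as an HTTP readiness probe. The consumer API (LogCompactedConfig, NewLogCompactedConsumer, Run, Healthy, Close) is taken from the diff; the import path, broker address, topic name, and HTTP wiring are illustrative assumptions. With an empty topic, /ready starts returning 200 once the first fetch times out after FetchMaxWait, instead of staying unready until a record is produced:

package main

import (
	"context"
	"net/http"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"

	// Assumed import path for the package in this diff.
	"example.com/internal/kafka"
)

func main() {
	logger, _ := zap.NewProduction()

	consumer, err := kafka.NewLogCompactedConsumer(kafka.LogCompactedConfig{
		CommonConfig: kafka.CommonConfig{
			Brokers:  []string{"localhost:9092"}, // Illustrative address.
			Logger:   logger,
			ClientID: "state-cache",
		},
		Topic:        "state-topic", // Illustrative topic.
		FetchMaxWait: time.Second,
		Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error {
			for !iter.Done() {
				_ = iter.Next() // Apply each record to in-memory state here.
			}
			return nil
		},
	})
	if err != nil {
		logger.Fatal("creating consumer", zap.Error(err))
	}
	defer consumer.Close()

	if err := consumer.Run(context.Background()); err != nil {
		logger.Fatal("starting consumer", zap.Error(err))
	}

	// Readiness probe: Healthy fails until the first full sync. With this
	// change, an empty topic syncs after the first fetch times out, so the
	// probe passes without any records having been produced.
	http.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		if err := consumer.Healthy(r.Context()); err != nil {
			http.Error(w, err.Error(), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
	if err := http.ListenAndServe(":8080", nil); err != nil {
		logger.Fatal("server exited", zap.Error(err))
	}
}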