Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions kafka/log_compacted_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func (cfg *LogCompactedConfig) finalize() error {
// record for each key, allowing for efficient storage and retrieval of the most
// recent state of each key.
type LogCompactedConsumer struct {
client *kgo.Client
logger *zap.Logger
process func(context.Context, *kgo.FetchesRecordIter) error
topic string
ctx context.Context
cancel context.CancelFunc
client *kgo.Client
logger *zap.Logger
process func(context.Context, *kgo.FetchesRecordIter) error
topic string
ctx context.Context
cancel context.CancelFunc
fetchMaxWait time.Duration

mu sync.RWMutex
started chan struct{}
Expand Down Expand Up @@ -117,13 +118,14 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig,
}

lcc := LogCompactedConsumer{
topic: cfg.Topic,
process: cfg.Processor,
logger: cfg.Logger,
client: client,
started: make(chan struct{}),
stopped: make(chan struct{}),
synced: make(chan struct{}),
topic: cfg.Topic,
fetchMaxWait: cfg.FetchMaxWait,
process: cfg.Processor,
logger: cfg.Logger,
client: client,
started: make(chan struct{}),
stopped: make(chan struct{}),
synced: make(chan struct{}),
}
lcc.ctx, lcc.cancel = context.WithCancel(context.Background())
return &lcc, nil
Expand All @@ -149,7 +151,14 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error {
go func() {
defer close(lcc.stopped)
close(lcc.started)
for {
// It's possible that the topic contains no records, in which case the
// consumer should be considered synced immediately. Since fetchMaxWait
// only works if the topic has records, we need to pass a context with
// a timeout to the first fetch.
ctx, cancel := context.WithTimeout(lcc.ctx, lcc.fetchMaxWait)
defer cancel()
lcc.consume(ctx)
for { // Business as usual: continue to fetch records until closed.
select {
case <-lcc.ctx.Done():
return // Exit the goroutine if the context is done.
Expand All @@ -167,7 +176,7 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error {
}

func (lcc *LogCompactedConsumer) consume(ctx context.Context) {
fetches := lcc.client.PollRecords(lcc.ctx, -1) // This means all buffered.
fetches := lcc.client.PollRecords(ctx, -1) // This means all buffered.
if fetches.IsClientClosed() {
lcc.logger.Info("kafka client closed, stopping fetch")
return
Expand All @@ -185,6 +194,15 @@ func (lcc *LogCompactedConsumer) consume(ctx context.Context) {
)
})
if fetches.Empty() {
// No records were returned across all partitions; consider the consumer
// synced because there is nothing to process to reach the current HWM.
select {
case <-lcc.synced:
// already synced
default:
lcc.syncDelta.Store(0)
close(lcc.synced)
}
return
}
lcc.mu.RLock()
Expand Down Expand Up @@ -260,6 +278,8 @@ func (lcc *LogCompactedConsumer) Close() error {

// Healthy checks if the consumer is healthy by ensuring that it has had a full
// sync and that the underlying Kafka client can ping a broker in the cluster.
// If a fetch returns no records before the consumer has synced, the consumer
// is considered synced immediately. This is expected to happen only when the
// topic has no records.
//
// This function can be used as a readiness probe.
func (lcc *LogCompactedConsumer) Healthy(ctx context.Context) error {
Expand Down
42 changes: 41 additions & 1 deletion kafka/log_compacted_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func TestLogCompactedConsumerSyncBehavior(t *testing.T) {
ClientID: "sync-test-consumer",
},
Topic: topicName,
FetchMaxWait: 10 * time.Millisecond,
FetchMaxWait: time.Second,
Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error {
for !iter.Done() {
_ = iter.Next() // We just count records, don't need content
Expand Down Expand Up @@ -414,3 +414,43 @@ func TestLogCompactedConsumerStartStop(t *testing.T) {

assert.Error(t, consumer.Run(ctx), "Run should error after Close")
}

// TestLogCompactedConsumerHealthyOnEmptyFirstFetch checks that a consumer run
// against a topic nothing was produced to becomes healthy on its own, and that
// the processor is never handed a record along the way.
func TestLogCompactedConsumerHealthyOnEmptyFirstFetch(t *testing.T) {
	const topic = "empty-first-fetch-topic"
	fake := newLogCompactedFakeCluster(t, topic, 1)

	// seen counts every record passed to the processor; it must remain zero.
	var seen int32
	countRecords := func(_ context.Context, it *kgo.FetchesRecordIter) error {
		for !it.Done() {
			it.Next() // Only the count matters, not the record content.
			atomic.AddInt32(&seen, 1)
		}
		return nil
	}

	common := CommonConfig{
		Brokers:  fake.ListenAddrs(),
		Logger:   zapTest(t),
		ClientID: "empty-first-fetch-consumer",
	}
	consumer, err := NewLogCompactedConsumer(LogCompactedConfig{
		CommonConfig: common,
		Topic:        topic,
		FetchMaxWait: 100 * time.Millisecond,
		MinFetchSize: 0,
		Processor:    countRecords,
	})
	require.NoError(t, err)
	// The Close error is deliberately discarded during teardown.
	t.Cleanup(func() { _ = consumer.Close() })

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	require.NoError(t, consumer.Run(ctx))

	// Nothing is ever produced to the topic, so the consumer has to report
	// healthy purely as a result of fetching no records.
	healthy := func() bool {
		return consumer.Healthy(ctx) == nil
	}
	require.Eventually(t, healthy, time.Second, 50*time.Millisecond)

	// Sanity check: the processor never saw a record.
	assert.Equal(t, int32(0), atomic.LoadInt32(&seen))
}