Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -150,6 +151,34 @@ func (cfg *ConsumerConfig) finalize() error {
if cfg.FetchMinBytes < 0 {
errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative"))
}
if cfg.BrokerMaxReadBytes < 0 {
errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative"))
}
if cfg.MaxPollPartitionBytes > 1<<30 {
cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB")
cfg.MaxPollPartitionBytes = 1 << 30
}
if cfg.BrokerMaxReadBytes > 1<<30 {
cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB")
cfg.BrokerMaxReadBytes = 1 << 30
}
if cfg.MaxPollBytes > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also validate that BrokerMaxReadBytes is greater than or equal to MaxPollBytes, and set it to the bigger value otherwise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that what the code block above does?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really; what I mean is that right now it's possible to set BrokerMaxReadBytes=1 and MaxPollBytes=1000000, and we will accept this configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good catch, added some more validation and tests.

// math.MaxInt32 is 1<<31-1.
if cfg.MaxPollBytes > 1<<30 {
cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB")
cfg.MaxPollBytes = 1 << 30
}
if cfg.BrokerMaxReadBytes == 0 {
cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest")
cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30))
}
if cfg.BrokerMaxReadBytes > 0 && cfg.BrokerMaxReadBytes < cfg.MaxPollBytes {
errs = append(errs, fmt.Errorf(
"kafka: BrokerMaxReadBytes (%d) cannot be less than MaxPollBytes (%d)",
cfg.BrokerMaxReadBytes, cfg.MaxPollBytes,
))
}
}
return errors.Join(errs...)
}

Expand Down
146 changes: 146 additions & 0 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,152 @@ func TestConsumerTopicLogFieldFunc(t *testing.T) {
})
}

// TestConsumerConfigFinalizer verifies that ConsumerConfig.finalize():
//   - clamps MaxPollBytes and MaxPollPartitionBytes to 1GiB,
//   - derives BrokerMaxReadBytes as min(MaxPollBytes*2, 1GiB) when unset,
//   - rejects configurations where an explicit BrokerMaxReadBytes is
//     smaller than MaxPollBytes.
func TestConsumerConfigFinalizer(t *testing.T) {
	proc := apmqueue.ProcessorFunc(func(context.Context, apmqueue.Record) error { return nil })
	ccfg := CommonConfig{
		Brokers: []string{"localhost:9092"},
		Logger:  zapTest(t),
	}
	// Each case sets both MaxPollBytes and MaxPollPartitionBytes to `in`
	// and asserts the values finalize() leaves behind.
	for _, tc := range []struct {
		name           string
		in             int32 // input MaxPollBytes / MaxPollPartitionBytes
		wantPoll       int32 // expected (possibly clamped) poll byte limits
		wantBrokerRead int32 // expected derived BrokerMaxReadBytes
	}{
		{name: "MaxPollBytes set to 1 << 20", in: 1 << 20, wantPoll: 1 << 20, wantBrokerRead: 1 << 21},
		{name: "MaxPollBytes set to 1 << 28", in: 1 << 28, wantPoll: 1 << 28, wantBrokerRead: 1 << 29},
		{name: "MaxPollBytes set to 1 << 29", in: 1 << 29, wantPoll: 1 << 29, wantBrokerRead: 1 << 30},
		// The derived BrokerMaxReadBytes is capped at 1GiB.
		{name: "MaxPollBytes set to 1 << 30", in: 1 << 30, wantPoll: 1 << 30, wantBrokerRead: 1 << 30},
		// Inputs above 1GiB are clamped to 1GiB.
		{name: "MaxPollBytes set to 1 << 31-1", in: 1<<31 - 1, wantPoll: 1 << 30, wantBrokerRead: 1 << 30},
	} {
		t.Run(tc.name, func(t *testing.T) {
			cfg := ConsumerConfig{
				CommonConfig:          ccfg,
				Processor:             proc,
				Topics:                []apmqueue.Topic{"topic"},
				GroupID:               "groupid",
				MaxPollBytes:          tc.in,
				MaxPollPartitionBytes: tc.in,
			}
			require.NoError(t, cfg.finalize())

			// Processor and Logger must have been preserved; clear them so
			// the remaining fields can be compared with a single Equal.
			assert.NotNil(t, cfg.Processor)
			cfg.Processor = nil
			assert.NotNil(t, cfg.Logger)
			cfg.Logger = nil

			assert.Equal(t, ConsumerConfig{
				CommonConfig:          CommonConfig{Brokers: []string{"localhost:9092"}},
				Topics:                []apmqueue.Topic{"topic"},
				GroupID:               "groupid",
				MaxPollBytes:          tc.wantPoll,
				MaxPollPartitionBytes: tc.wantPoll,
				BrokerMaxReadBytes:    tc.wantBrokerRead,
			}, cfg)
		})
	}
	t.Run("BrokerMaxReadBytes is less than MaxPollBytes", func(t *testing.T) {
		cfg := ConsumerConfig{
			CommonConfig:          ccfg,
			Processor:             proc,
			Topics:                []apmqueue.Topic{"topic"},
			GroupID:               "groupid",
			BrokerMaxReadBytes:    1,
			MaxPollBytes:          1<<31 - 1,
			MaxPollPartitionBytes: 1<<31 - 1,
		}
		// MaxPollBytes is clamped to 1GiB (1073741824) before the
		// comparison, hence the clamped value in the error message.
		err := cfg.finalize()
		assert.EqualError(t, err, "kafka: BrokerMaxReadBytes (1) cannot be less than MaxPollBytes (1073741824)")
	})
}

func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {
if cfg.MaxPollWait <= 0 {
// Lower MaxPollWait, ShutdownGracePeriod to speed up execution.
Expand Down
Loading