From 7acaf46a80f27923e9bf11d3da458a21256290c6 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 5 Dec 2024 16:07:50 +0800 Subject: [PATCH 1/5] kafka consumer: Limit `MaxBrokerReadBytes` Limits the `MaxBrokerReadBytes` and `MaxFetchBytes` to the maximum value that is accepted by `franz-go` library. Signed-off-by: Marc Lopez Rubio --- kafka/consumer.go | 12 ++++ kafka/consumer_test.go | 123 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) diff --git a/kafka/consumer.go b/kafka/consumer.go index ad31b227..b207030d 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -150,6 +150,18 @@ func (cfg *ConsumerConfig) finalize() error { if cfg.FetchMinBytes < 0 { errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative")) } + if cfg.MaxPollBytes > 0 { + // math.MaxInt32 is 1<<31-1. + if cfg.MaxPollBytes > 1<<30 { + cfg.MaxPollBytes = 1 << 30 + } + if cfg.BrokerMaxReadBytes == 0 { + cfg.BrokerMaxReadBytes = cfg.MaxPollBytes * 2 + } + } + if cfg.BrokerMaxReadBytes < 0 || cfg.BrokerMaxReadBytes > 1<<30 { + cfg.BrokerMaxReadBytes = 1 << 30 + } return errors.Join(errs...) } diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index a3ed2f1b..8b24ed7b 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -763,6 +763,129 @@ func TestConsumerTopicLogFieldFunc(t *testing.T) { }) } +func TestConsumerConfigFinalizer(t *testing.T) { + proc := apmqueue.ProcessorFunc(func(context.Context, apmqueue.Record) error { return nil }) + ccfg := CommonConfig{ + Brokers: []string{"localhost:9092"}, + Logger: zapTest(t), + } + t.Run("MaxPollBytes set to 1 << 20", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + BrokerMaxReadBytes: 1 << 21, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 28", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + BrokerMaxReadBytes: 1 << 29, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 29", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 29, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 29, + BrokerMaxReadBytes: 1 << 30, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 30", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, + }, cfg) + }) + t.Run("MaxPollBytes set to 1 << 31-1", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1<<31 - 1, + } + err := cfg.finalize() + require.NoError(t, err) + assert.NotNil(t, cfg.Processor) + cfg.Processor = nil + assert.NotNil(t, cfg.Logger) + cfg.Logger = nil + + assert.Equal(t, ConsumerConfig{ + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, + }, cfg) + }) +} + func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer { if cfg.MaxPollWait <= 0 { // Lower MaxPollWait, ShutdownGracePeriod to speed up execution. From 422f99d0e67ab713ed003a761244269b16a3645c Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Fri, 6 Dec 2024 13:14:29 +0800 Subject: [PATCH 2/5] Address review comments Signed-off-by: Marc Lopez Rubio --- kafka/consumer.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index b207030d..d0f85f8c 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "math" "strings" "sync" "time" @@ -150,18 +151,24 @@ func (cfg *ConsumerConfig) finalize() error { if cfg.FetchMinBytes < 0 { errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative")) } + if cfg.BrokerMaxReadBytes < 0 { + errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative")) + } + if cfg.BrokerMaxReadBytes > 1<<30 { + cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB") + cfg.BrokerMaxReadBytes = 1 << 30 + } if cfg.MaxPollBytes > 0 { // math.MaxInt32 is 1<<31-1. if cfg.MaxPollBytes > 1<<30 { + cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB") cfg.MaxPollBytes = 1 << 30 } if cfg.BrokerMaxReadBytes == 0 { - cfg.BrokerMaxReadBytes = cfg.MaxPollBytes * 2 + cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest") + cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30)) } } - if cfg.BrokerMaxReadBytes < 0 || cfg.BrokerMaxReadBytes > 1<<30 { - cfg.BrokerMaxReadBytes = 1 << 30 - } return errors.Join(errs...) } From 054437553ec91e5d976c308f1a2382b465a4978d Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 9 Dec 2024 11:26:26 +0800 Subject: [PATCH 3/5] Set MaxPollPartitionBytes upper limit Signed-off-by: Marc Lopez Rubio --- kafka/consumer.go | 6 +++ kafka/consumer_test.go | 110 ++++++++++++++++++++++------------------- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index d0f85f8c..2f89ea2b 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -154,6 +154,12 @@ func (cfg *ConsumerConfig) finalize() error { if cfg.BrokerMaxReadBytes < 0 { errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative")) } + if cfg.MaxPollPartitionBytes > 0 { + if cfg.MaxPollPartitionBytes > 1<<30 { + cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB") + cfg.MaxPollPartitionBytes = 1 << 30 + } + } if cfg.BrokerMaxReadBytes > 1<<30 { cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB") cfg.BrokerMaxReadBytes = 1 << 30 diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 8b24ed7b..60775fa7 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -771,11 +771,12 @@ func TestConsumerConfigFinalizer(t *testing.T) { } t.Run("MaxPollBytes set to 1 << 20", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 20, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + MaxPollPartitionBytes: 1 << 20, } err := cfg.finalize() require.NoError(t, err) @@ -785,20 +786,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 20, - BrokerMaxReadBytes: 1 << 21, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + MaxPollPartitionBytes: 1 << 20, + BrokerMaxReadBytes: 1 << 21, }, cfg) }) t.Run("MaxPollBytes set to 1 << 28", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 28, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + MaxPollPartitionBytes: 1 << 28, } err := cfg.finalize() require.NoError(t, err) @@ -808,20 +811,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 28, - BrokerMaxReadBytes: 1 << 29, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + MaxPollPartitionBytes: 1 << 28, + BrokerMaxReadBytes: 1 << 29, }, cfg) }) t.Run("MaxPollBytes set to 1 << 29", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 29, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 29, + MaxPollPartitionBytes: 1 << 29, } err := cfg.finalize() require.NoError(t, err) @@ -831,20 +836,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 29, - BrokerMaxReadBytes: 1 << 30, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 29, + MaxPollPartitionBytes: 1 << 29, + BrokerMaxReadBytes: 1 << 30, }, cfg) }) t.Run("MaxPollBytes set to 1 << 30", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 30, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, } err := cfg.finalize() require.NoError(t, err) @@ -854,20 +861,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 30, - BrokerMaxReadBytes: 1 << 30, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, }, cfg) }) t.Run("MaxPollBytes set to 1 << 31-1", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1<<31 - 1, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1<<31 - 1, + MaxPollPartitionBytes: 1<<31 - 1, } err := cfg.finalize() require.NoError(t, err) @@ -877,11 +886,12 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 30, - BrokerMaxReadBytes: 1 << 30, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, }, cfg) }) } From 7b079ffbcf7a96e60aa019328cb2fe80d8936f04 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 10 Dec 2024 09:15:38 +0800 Subject: [PATCH 4/5] Address review comments Signed-off-by: Marc Lopez Rubio --- kafka/consumer.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 2f89ea2b..30c425ab 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -154,11 +154,9 @@ func (cfg *ConsumerConfig) finalize() error { if cfg.BrokerMaxReadBytes < 0 { errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative")) } - if cfg.MaxPollPartitionBytes > 0 { - if cfg.MaxPollPartitionBytes > 1<<30 { - cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB") - cfg.MaxPollPartitionBytes = 1 << 30 - } + if cfg.MaxPollPartitionBytes > 1<<30 { + cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB") + cfg.MaxPollPartitionBytes = 1 << 30 } if cfg.BrokerMaxReadBytes > 1<<30 { cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB") From 20de88b1f4a85cb471927f67b294e7689905f53a Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 10 Dec 2024 09:30:56 +0800 Subject: [PATCH 5/5] Address review comments Signed-off-by: Marc Lopez Rubio --- kafka/consumer.go | 6 ++++++ kafka/consumer_test.go | 13 +++++++++++++ 2 files changed, 19 insertions(+) diff --git a/kafka/consumer.go b/kafka/consumer.go index 30c425ab..3d603eae 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -172,6 +172,12 @@ func (cfg *ConsumerConfig) finalize() error { cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest") cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30)) } + if cfg.BrokerMaxReadBytes > 0 && cfg.BrokerMaxReadBytes < cfg.MaxPollBytes { + errs = append(errs, fmt.Errorf( + "kafka: BrokerMaxReadBytes (%d) cannot be less than MaxPollBytes (%d)", + cfg.BrokerMaxReadBytes, cfg.MaxPollBytes, + )) + } } return errors.Join(errs...) } diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 60775fa7..eeb08516 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -894,6 +894,19 @@ func TestConsumerConfigFinalizer(t *testing.T) { BrokerMaxReadBytes: 1 << 30, }, cfg) }) + t.Run("BrokerMaxReadBytes is less than MaxPollBytes", func(t *testing.T) { + cfg := ConsumerConfig{ + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + BrokerMaxReadBytes: 1, + MaxPollBytes: 1<<31 - 1, + MaxPollPartitionBytes: 1<<31 - 1, + } + err := cfg.finalize() + assert.EqualError(t, err, "kafka: BrokerMaxReadBytes (1) cannot be less than MaxPollBytes (1073741824)") + }) } func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {