Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type ProducerConfig struct {
// RecordPartitioner is a function that returns the partition to which
// a record should be sent. If nil, the default partitioner is used.
RecordPartitioner kgo.Partitioner

// AllowAutoTopicCreation enables topics to be auto created if they do
// not exist when fetching their metadata.
AllowAutoTopicCreation bool
}

// BatchWriteListener specifies a callback function that is invoked after a batch is
Expand Down Expand Up @@ -181,6 +185,9 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) {
if cfg.RecordPartitioner != nil {
opts = append(opts, kgo.RecordPartitioner(cfg.RecordPartitioner))
}
if cfg.AllowAutoTopicCreation {
opts = append(opts, kgo.AllowAutoTopicCreation())
}
client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
if err != nil {
return nil, fmt.Errorf("kafka: failed creating producer: %w", err)
Expand Down
Loading