diff --git a/kafka/producer.go b/kafka/producer.go index 9219dc05..0f85065f 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -92,6 +92,10 @@ type ProducerConfig struct { // RecordPartitioner is a function that returns the partition to which // a record should be sent. If nil, the default partitioner is used. RecordPartitioner kgo.Partitioner + + // AllowAutoTopicCreation enables topics to be auto created if they do + // not exist when fetching their metadata. + AllowAutoTopicCreation bool } // BatchWriteListener specifies a callback function that is invoked after a batch is @@ -181,6 +185,9 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) { if cfg.RecordPartitioner != nil { opts = append(opts, kgo.RecordPartitioner(cfg.RecordPartitioner)) } + if cfg.AllowAutoTopicCreation { + opts = append(opts, kgo.AllowAutoTopicCreation()) + } client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...) if err != nil { return nil, fmt.Errorf("kafka: failed creating producer: %w", err)