-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconsumer.go
More file actions
94 lines (80 loc) · 2.65 KB
/
consumer.go
File metadata and controls
94 lines (80 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package kafka
import (
"context"
"log"
"strings"
"github.com/Shopify/sarama"
)
// IncomingMessage represents a single consumed message in the form that is
// handed to a ConsumerHandlerFunc.
type IncomingMessage struct {
// Topic is the topic the message was read from.
Topic string
// Value is the message payload converted to a string.
Value string
}
// ConsumerHandlerFunc is the callback invoked to process one message of the
// topic it was registered for. It receives the Consumer's Ctx and the message.
// It has no return value, so it cannot report a failure back to the consumer.
type ConsumerHandlerFunc func(context.Context, IncomingMessage)
// Consumer represents a Sarama consumer group consumer. Fill in the exported
// fields, register at least one handler with AddHandler, then call Run.
type Consumer struct {
// Brokers is a comma-separated list of broker addresses; Run splits it on ",".
Brokers string
// ClientID is copied into the Sarama configuration's ClientID.
ClientID string
// Group is the consumer group name passed to sarama.NewConsumerGroup.
Group string
// Ctx is passed to every Consume call and to every handler invocation.
// Run checks it after each Consume call and returns once it is cancelled.
Ctx context.Context
// client is the consumer group client created by Run.
client sarama.ConsumerGroup
// handlers maps a topic name to the function that processes its messages.
handlers map[string]ConsumerHandlerFunc
}
// Run joins the configured consumer group and dispatches every received
// message to the handler registered for its topic. It blocks until
// consumer.Ctx is cancelled, then closes the group client and returns.
//
// At least one handler must have been registered with AddHandler beforehand;
// otherwise the process exits through log.Fatalln. A failure to create the
// group client, or a Consume error that is not the result of consumer.Ctx
// being cancelled, is reported through log.Panicf.
func (consumer *Consumer) Run() {
	if len(consumer.handlers) == 0 {
		log.Fatalln("Error: No registered handlers. Please call Consumer.AddHandler(..) first")
	}

	// Subscribe to exactly the topics that have a handler.
	topics := make([]string, 0, len(consumer.handlers))
	for topic := range consumer.handlers {
		topics = append(topics, topic)
	}

	version, err := sarama.ParseKafkaVersion("2.1.1")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	config := sarama.NewConfig()
	config.Version = version
	config.ClientID = consumer.ClientID
	config.Consumer.Return.Errors = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	client, err := sarama.NewConsumerGroup(strings.Split(consumer.Brokers, ","), consumer.Group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}
	consumer.client = client

	// Leave the group and release broker connections however Run exits
	// (normal return or panic).
	defer func() {
		if err := client.Close(); err != nil {
			log.Printf("Error closing consumer group client: %v", err)
		}
	}()

	// Consumer.Return.Errors is enabled above, so errors are delivered on the
	// Errors channel instead of being logged by Sarama; read them here so they
	// are not lost. The channel is closed by client.Close, which ends this
	// goroutine.
	go func() {
		for err := range client.Errors() {
			log.Printf("Error from consumer group: %v", err)
		}
	}()

	for {
		// Consume returns whenever the session ends (e.g. on a rebalance), so
		// it has to be called again in a loop to rejoin the group.
		if err := client.Consume(consumer.Ctx, topics, consumer); err != nil {
			// An error caused by cancellation is a normal shutdown, not a failure.
			if consumer.Ctx.Err() != nil {
				return
			}
			log.Panicf("Error from consumer: %v", err)
		}
		// check if context was cancelled, signaling that the consumer should stop
		if consumer.Ctx.Err() != nil {
			return
		}
	}
}
// AddHandler associates handler with topic, replacing any handler that was
// previously registered for the same topic. The handler table is allocated
// lazily on the first registration.
func (consumer *Consumer) AddHandler(topic string, handler ConsumerHandlerFunc) {
	registry := consumer.handlers
	if registry == nil {
		registry = make(map[string]ConsumerHandlerFunc)
		consumer.handlers = registry
	}
	registry[topic] = handler
}
// Setup is the session-start hook: it runs at the beginning of a new session,
// before ConsumeClaim. This consumer has nothing to prepare, so the session
// argument is ignored and nil is always returned.
func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook: it runs at the end of a session, once all
// ConsumeClaim goroutines have exited. This consumer holds no per-session
// state, so the session argument is ignored and nil is always returned.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim runs the consumer loop over the claim's Messages() channel and
// passes each message to the handler registered for the claim's topic. It is
// called in a dedicated goroutine.
//
// The loop ends, returning nil, when the Messages() channel is closed or when
// the session context is done, so the claim is released promptly instead of
// continuing to process messages into a rebalance. A message is marked as
// consumed only after its handler has returned, so a message whose handler
// never finished is not recorded as processed.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// The claim's topic does not change, so resolve the handler once.
	handle := consumer.handlers[claim.Topic()]

	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Channel closed: the claim is over.
				return nil
			}
			handle(consumer.Ctx, IncomingMessage{Topic: message.Topic, Value: string(message.Value)})
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Session is ending; stop consuming.
			return nil
		}
	}
}