forked from bsm/sarama-cluster
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathconsumer.go
More file actions
2211 lines (1973 loc) · 78.8 KB
/
consumer.go
File metadata and controls
2211 lines (1973 loc) · 78.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
A simple kafka consumer-group client
Copyright 2016 MistSys
*/
package consumer
import (
"bytes"
"encoding/json"
"fmt"
"log"
"sort"
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/mistsys/sarama-consumer/roundrobin"
)
const logging = true // set to true to see log messages
const debug = false // set to true to see debug messages
const per_msg_debug = false // set to true to see per-message debug messages
// low level logging function. Replace it with your own if desired before making any calls to the rest of the API
var Logf func(fmt string, args ...interface{}) = log.Printf
// logf logs a printf style message if log is enabled
func logf(fmt string, args ...interface{}) {
if logging {
Logf(fmt, args...)
}
}
// dbgf logs a printf style message to somewhere reasonable if debug is enabled, and as efficiently as it can does nothing with any side effects if debug is disabled
func dbgf(fmt string, args ...interface{}) {
if debug {
Logf(fmt, args...)
}
}
// msgf is similar to dbgf but used for per-message debug messages. since these are so numerous there's a separate compile-time flag to compile these out
// in addition, since the go 1.7 compiler isn't eliminating the call sufficiently to avoid allocating the []interface{} needed by "args...interface{}",
// we pass the pointer to the msg. That works and doesn't show up in the heap gc'ed trash profile (pprof -alloc_space)
func msgf(fmt string, msg *sarama.ConsumerMessage) {
if per_msg_debug {
Logf(fmt, msg.Topic, msg.Partition, msg.Offset)
}
}
// minimum kafka API version required. Use this when constructing the sarama.Client's sarama.Config.MinVersion
var MinVersion = sarama.V0_9_0_0
// Error holds the errors generated by this package
type Error struct {
Err error // underlying error
Context string // description of the context surrounding the error
Consumer Consumer // nil, or Consumer which produced the error
Topic string // "", or the topic which had the error
Partition int32 // -1, or the partition which had the error
cl *client
}
func (err *Error) Error() string {
if err.Topic != "" {
if err.Partition != -1 {
return fmt.Sprintf("consumer-group %q: Error %s, topic %q, partition %d: %s", err.cl.group_name, err.Context, err.Topic, err.Partition, err.Err)
}
return fmt.Sprintf("consumer-group %q: Error %s, topic %q: %s", err.cl.group_name, err.Context, err.Topic, err.Err)
}
return fmt.Sprintf("consumer-group %q: Error %s: %s", err.cl.group_name, err.Context, err.Err)
}
// Config is the configuration of a Client. Typically you'd create a default configuration with
// NewConfig, modify any fields of interest, and pass it to NewClient. Once passed to NewClient the
// Config must not be modified. (doing so leads to data races, and may caused bugs as well).
//
// In addition to this config, consumer's code also looks at the sarama.Config of the sarama.Client
// supplied to NewClient, especially at the Consumer.Offsets settings, Version, Metadata.Retry.Backoff,
// Metadata.RefreshFrequency and ChannelBufferSize.
type Config struct {
Session struct {
// The allowed session timeout for registered consumers (defaults to 30s).
// Must be within the allowed server range.
Timeout time.Duration
}
Rebalance struct {
// The allowed rebalance timeout for registered consumers (defaults to 30s).
// Must be within the allowed server range. Only functions if sarama.Config.Version >= 0.10.1
// Otherwise Session.Timeout is used for rebalancing too.
Timeout time.Duration
}
Heartbeat struct {
// Interval between each heartbeat (defaults to 3s). It should be no more
// than 1/3rd of the Group.Session.Timout setting
Interval time.Duration
}
// the partitioner used to map partitions to consumer group members (defaults to a round-robin partitioner)
Partitioner Partitioner
// OffsetOutOfRange is the handler for sarama.ErrOffsetOutOfRange errors (defaults to sarama.OffsetNewest,nil),
// and messages older than MaxMessageAge if MaxMessageAge is set.
// Implementations must return the new starting offset in the partition, or an error. The sarama.Client is included
// for convenience, since handling this might involve querying the partition's current offsets.
OffsetOutOfRange OffsetOutOfRange
// StartingOffset is a hook to allow modifying the starting offset when a Consumer begins to consume
// a partition. (defaults to returning the last committed offset). Some consumers might want to jump
// ahead to fresh messages. The sarama.Client is included for convenience, since handling this might involve
// looking up a partition's offset by time. When no committed offset could be found -1 (sarama.OffsetNewest)
// is passed in. An implementation might want to return client.Config().Consumer.Offsets.Initial in that case.
StartingOffset StartingOffset
// SidechannelOffset is a kafka topic used to exchange partition offsets between dying and rebalancing consumers.
// It defaults to "sarama-consumer-sidechannel-offsets". If SidechannelTopic is "" then this feature is disabled,
// and consumers can rewind as much as Config.Rebalance.Timeout + sarama.Config.Offset.CommitInterval
// when a partition is reassigned. That's always possible (kafka only promises at-least-once), but in high frequency
// topics rewinding the default 30 seconds creates a measureable burst).
// We can't comit offsets normally during a rebalance because at that point in time we still belong to the old generation,
// but the broker belongs to the new generation. Hence this side channel.
SidechannelTopic string
// AssignmentNotification is an optional callback to inform the client code whenever the client gets a new
// partition assignment.
AssignmentNotification AssignmentNotification
// InOrderDone disables extra processing which permits Done() to be called out of ordera.
// If InOrderDone is true then Done() does not have to be be called for every message.
// The caller can wait and call Done() once to indicate that processing is complete for
// all messages up to and including the argument message in the argument's partition.
// This is not the historical behavior, so it is disabled by default.
InOrderDone bool
// NoMessages disables fetching and receiving sarama.ConsumerMessage. So the consumer group participation is performed,
// but fetching the kafka messages is left to the caller's code (presumably using sarama.Broker.Fetch or similar low level API)
// NoMessages requires InOrderDone, since without seeing the messages ingress the group consumer cannot keep track of which
// haven't been completed.
NoMessages bool
// PartitionStartNotification is an optional callback to inform client code of the (partition,offset) at which we've
// started consuming (or, if NoMessages, at which we think the caller should start consuming)
PartitionStartNotification PartitionStartNotification
// MaxMessageAge is a optional function mapping a topic to the maximum lag the consumer should try and maintain in that topic. If the consumer
// lags more than MaxMessageAge (as compared with the sarama.ConsumerMessage.Timestamp) it declares an OffsetOutOfRange condition, and restarts
// where OffsetOutOfRange() indicates. (by default that's OffsetNewest, which might not be what you want, so override OffsetOutOfRange too)
// Note that this is not absolute, since there's always some lag, and pipelining, and in a low message frequency
// partition MaxMessageAge might not make sense. Returning 0 disables this functionality.
MaxMessageAge func(topic string) time.Duration
}
// types of the functions in the Config
type StartingOffset func(topic string, partition int32, committed_offset int64, client sarama.Client) (offset int64, err error)
type OffsetOutOfRange func(topic string, partition int32, client sarama.Client) (offset int64, err error)
type AssignmentNotification func(assignments map[string][]int32) // assignments is a map from topic -> list of partitions
type PartitionStartNotification func(topic string, partition int32, offset int64) // position at which we're going to start consuming from the partition
// default implementation of Config.OffsetOutOfRange jumps to the current head of the partition.
func DefaultOffsetOutOfRange(topic string, partition int32, client sarama.Client) (int64, error) {
return sarama.OffsetNewest, nil
}
// default implementation of Config.StartingOffset starts at the committed offset, or at sarama.Config.Consumer.Offsets.Initial
// if there is no committed offset.
func DefaultStartingOffset(topic string, partition int32, offset int64, client sarama.Client) (int64, error) {
if offset == sarama.OffsetNewest {
offset = client.Config().Consumer.Offsets.Initial
}
return offset, nil
}
// NewConfig constructs a default configuration.
func NewConfig() *Config {
cfg := &Config{}
cfg.Session.Timeout = 30 * time.Second
cfg.Rebalance.Timeout = 30 * time.Second
cfg.Heartbeat.Interval = 3 * time.Second
cfg.Partitioner = roundrobin.RoundRobin
cfg.OffsetOutOfRange = DefaultOffsetOutOfRange
cfg.StartingOffset = DefaultStartingOffset
cfg.SidechannelTopic = "sarama-consumer-sidechannel-offsets"
return cfg
}
/*
NewClient creates a new consumer group client on top of an existing
sarama.Client.
After this call the contents of config should be treated as read-only.
config can be nil if the defaults are acceptable.
The consumer group name is used to match this client with other
instances running elsewhere, but connected to the same cluster
of kafka brokers and using the same consumer group name.
The supplied sarama.Client should have been constructed with a sarama.Config
where sarama.Config.Version is >= consumer.MinVersion, and if full handling of
ErrOffsetOutOfRange is desired, sarama.Config.Consumer.Return.Errors = true.
In addition, this package uses the settings in sarama.Config.Consumer.Offsets
and sarama.Config.Metadata.RefreshFrequency
*/
func NewClient(group_name string, config *Config, sarama_client sarama.Client) (Client, error) {
// sanity check
if config.NoMessages && !config.InOrderDone {
return nil, fmt.Errorf("invalid sarama-consumer.Config: .NoMessages requires .InOrderDone")
}
cl := &client{
client: sarama_client,
config: config,
group_name: group_name,
errors: make(chan error),
closed: make(chan struct{}),
add_consumers: make(chan add_consumers),
rem_consumer: make(chan *consumer),
sidechannel_commit: make(chan map[string][]SidechannelOffset),
}
// start the client's manager goroutine
rc := make(chan error)
cl.wg.Add(1)
go cl.run(rc)
return cl, <-rc
}
/*
Client is a kafaka client belonging to a consumer group. It is created by NewClient.
*/
type Client interface {
// Consume returns a consumer of the given topic
Consume(topic string) (Consumer, error)
// ConsumeMany starts consuming many topics at once. It is much more efficient than calling Consume
// repeatedly because kafka brokers serialize joining topics
ConsumeMany(topics []string) ([]Consumer, error)
// Close closes the client. It must be called to shutdown
// the client. It cleans up any unclosed topic Consumers created by this Client.
// It does NOT close the inner sarama.Client.
// Calling twice is NOT supported.
Close()
// Errors returns a channel which can (should) be monitored
// for errors. callers should probably log or otherwise report
// the returned errors. The channel closes when the client
// is closed.
Errors() <-chan error
// TODO have a Status() method for debug/logging? Or is Errors() enough?
}
/*
Consumer is a consumer of a topic.
Messages from any partition assigned to this client arrive on the
channel returned by Messages.
Every message read from the Messages channel should be eventually passed
to Done, or have its topic/partition/offset passed to MarkUpTo.
Calling Done is the signal that that one message has been consumed (possibly
out of receive order).
*/
type Consumer interface {
// Messages returns the channel of messages arriving from kafka. It always
// returns the same result, so it is safe to call once and store the result.
// Every message read from the channel should be passed to Done when processing
// of the message is complete.
// It is not necessary to call Done in the same order as messages are received
// from this channel.
Messages() <-chan *sarama.ConsumerMessage
// Done indicates the processing of the message is complete, and its offset can
// be committed to kafka. Calling Done twice with the same message, or with a
// garbage message, can cause trouble.
// Calling Done on message out of order is supported, and the consumer keeps
// track of the correct offset to commit to kafka.
Done(*sarama.ConsumerMessage)
// AsyncClose terminates the consumer cleanly. Callers can continue to read from
// Messages channel until it is closed, or not, as they wish.
// Calling Client.Close() performs a AsyncClose() on any remaining consumers.
// Calling AsyncClose multiple times is permitted. Only the first call has any effect.
// Never calling AsyncClose is also permitted. Client.Close() implies Consumer.AsyncClose.
AsyncClose()
// Close terminates the consumer and waits for it to be finished committing the current
// offsets to kafka. Calling twice happens to work at the moment, but let's not encourage it.
Close()
}
/*
Partitioner maps partitions to consumer group members.
When the user wants control over the partitioning they should set
Config.Partitioner to their implementation of Partitioner.
*/
type Partitioner interface {
// name this partitioner (used for log messages)
Name() string
// PrepareJoin prepares a JoinGroupRequest given the topics supplied.
// The simplest implementation would be something like
// join_req.AddGroupProtocolMetadata("<partitioner name>", &sarama.ConsumerGroupMemberMetadata{ Version: 1, Topics: topics, })
PrepareJoin(join_req *sarama.JoinGroupRequest, topics []string, current_assignments map[string][]int32)
// Partition performs the partitioning. Given the requested
// memberships from the JoinGroupResponse, it adds the results
// to the SyncGroupRequest. Returning an error cancels everything.
// The sarama.Client supplied to NewClient is included for convenince,
// since performing the partitioning probably requires looking at each
// topic's metadata, especially its list of partitions.
Partition(*sarama.SyncGroupRequest, *sarama.JoinGroupResponse, sarama.Client) error
// ParseSync parses the SyncGroupResponse and returns the map of topics
// to partitions assigned to this client, or an error if the information
// is not parsable.
ParseSync(*sarama.SyncGroupResponse) (map[string][]int32, error)
}
// client implements the Client interface
type client struct {
client sarama.Client // the sarama client from which we were constructed
config *Config // our configuration (read-only)
group_name string // the client-group name
errors chan error // channel over which asynchronous errors are reported
closed chan struct{} // channel which is closed to cause the client to shutdown
wg sync.WaitGroup // waitgroup which is done when the client is shutdown
add_consumers chan add_consumers // command channel used to add new consumers
rem_consumer chan *consumer // command channel used to remove an existing consumer
sidechannel_commit chan map[string][]SidechannelOffset // command channel used to commit to the sidechannel
}
// Errors returns the channel over which asynchronous errors are observed.
func (cl *client) Errors() <-chan error { return cl.errors }
// add_consumers are the messages sent over the client.add_consumers channel
type add_consumers struct {
cons []*consumer
reply chan<- error
}
func (cl *client) Consume(topic string) (Consumer, error) {
sarama_consumer, err := sarama.NewConsumerFromClient(cl.client)
if err != nil {
return nil, cl.makeError("Consume sarama.NewConsumerFromClient", err)
}
chanbufsize := cl.client.Config().ChannelBufferSize // give ourselves some capacity once I know it runs right without any (capacity hides bugs :-)
con := &consumer{
cl: cl,
consumer: sarama_consumer,
topic: topic,
in_order_done: cl.config.InOrderDone,
messages: make(chan *sarama.ConsumerMessage, chanbufsize),
closed: make(chan struct{}),
exited: make(chan struct{}),
assignments: make(chan *assignment, 1),
commit_reqs: make(chan commit_req),
done: make(chan *sarama.ConsumerMessage, chanbufsize),
}
if cl.config.MaxMessageAge != nil {
con.max_age = cl.config.MaxMessageAge(topic)
}
if !con.in_order_done {
con.premessages = make(chan *sarama.ConsumerMessage, chanbufsize)
}
if !con.cl.config.NoMessages {
con.restart_partitions = make(chan *partition)
}
reply := make(chan error)
cl.add_consumers <- add_consumers{[]*consumer{con}, reply}
err = <-reply
if err != nil {
// if an error is returned then it is up to us to close the sarama.Consumer
_ = sarama_consumer.Close() // we already have an error to return. a 2nd one is too much
return nil, err
}
return con, nil
}
func (cl *client) ConsumeMany(topics []string) ([]Consumer, error) {
sarama_consumer, err := sarama.NewConsumerFromClient(cl.client)
if err != nil {
return nil, cl.makeError("ConsumeMany sarama.NewConsumerFromClient", err)
}
chanbufsize := cl.client.Config().ChannelBufferSize // give ourselves some capacity once I know it runs right without any (capacity hides bugs :-)
consumers := make([]*consumer, len(topics))
for i, topic := range topics {
consumers[i] = &consumer{
cl: cl,
consumer: sarama_consumer,
topic: topic,
in_order_done: cl.config.InOrderDone,
messages: make(chan *sarama.ConsumerMessage, chanbufsize),
closed: make(chan struct{}),
exited: make(chan struct{}),
assignments: make(chan *assignment, 1),
commit_reqs: make(chan commit_req),
restart_partitions: make(chan *partition),
done: make(chan *sarama.ConsumerMessage, chanbufsize),
}
if cl.config.MaxMessageAge != nil {
consumers[i].max_age = cl.config.MaxMessageAge(topic)
}
if !consumers[i].in_order_done {
consumers[i].premessages = make(chan *sarama.ConsumerMessage, chanbufsize)
}
}
reply := make(chan error)
cl.add_consumers <- add_consumers{consumers, reply}
err = <-reply
if err != nil {
// if an error is returned then it is up to us to close the sarama.Consumer
_ = sarama_consumer.Close() // we already have an error to return. a 2nd one is too much
return nil, err
}
cons := make([]Consumer, len(consumers))
for i := range consumers {
cons[i] = consumers[i]
}
return cons, nil
}
// Close shutsdown the client and any remaining Consumers.
func (cl *client) Close() {
// signal to cl.run() that it should exit
dbgf("Close client of consumer-group %q", cl.group_name)
close(cl.closed)
// and wait for the shutdown to be complete
cl.wg.Wait()
}
// run is a long lived goroutine which manages this client's membership in the consumer group.
func (cl *client) run(early_rc chan<- error) {
defer cl.wg.Done()
var member_id string // our group member id, assigned to us by kafka when we first make contact
consumers := make(map[string]*consumer) // map of topic -> consumer
var assignments map[string][]int32 // nil, or our currently assigned partitions (map of topic -> list of partitions)
var wg sync.WaitGroup // waitgroup used to wait for all consumers to exit
defer dbgf("consumer-group %q client exiting", cl.group_name)
// add a consumer
add := func(add add_consumers) {
dbgf("client.run add(%d topics)", len(add.cons))
// first make sure we aren't already consuming any of these topics
for _, con := range add.cons {
if _, ok := consumers[con.topic]; ok {
// topic already is being consumed. the way the standard kafka 0.9 group coordination works you cannot consume twice with the
// same client. If you want to consume the same topic twice, use two Clients.
add.reply <- cl.makeError("Consume", fmt.Errorf("topic %q is already being consumed", con.topic))
return
}
}
// then start them all at once, so that the next time we send a join we contain all these topics
for _, con := range add.cons {
dbgf("client.run add(topic %q)", con.topic)
consumers[con.topic] = con
wg.Add(1)
go con.run(&wg)
}
add.reply <- nil
}
// remove a consumer
rem := func(con *consumer) {
dbgf("client.run rem(topic %q)", con.topic)
existing_con := consumers[con.topic]
if existing_con == con {
delete(consumers, con.topic)
delete(assignments, con.topic) // forget about the topic's partition assignment
} // else it's some old consumer and we've already removed it
// and let the consumer shutdown
close(con.assignments)
close(con.commit_reqs)
}
// shutdown the consumers. waits until they are all stopped. only call once and return afterwards, since it makes assumptions that hold only when it is used like that
shutdown := func() {
dbgf("client.run shutdown")
// shutdown the remaining consumers
for _, con := range consumers {
con.AsyncClose()
}
// and consume any last rem_consumer messages from them
go func() {
wg.Wait()
close(cl.rem_consumer)
}()
for con := range cl.rem_consumer {
rem(con)
}
wg.Wait()
// and shutdown the errors channel
close(cl.errors)
}
// if enabled, subscribe to the side-channel topic on the appropriate partition
var sidechannel_queries chan sidechannel_query // nil, or command channel used to request offsets from sidechannel
if topic := cl.config.SidechannelTopic; topic != "" {
sidechannel_queries = make(chan sidechannel_query)
ready := make(chan error)
go cl.sidechannel_consumer(topic, sidechannel_queries, ready)
// want and log errors until sidechannel subscription is ready, since we want to capture the sidechannel msgs we will trigger by our join-group request
for err := range ready {
err = cl.makeError(fmt.Sprintf("consuming SidechannelTopic %q", topic), err)
if early_rc != nil {
early_rc <- err
return
}
cl.deliverError("", err)
}
} // else leave sidechannel_queries nil
// always start the producer, even if it is just a dummy routine that drains and throws away msgs in cl.sidechannel_commit
cl.wg.Add(1)
go cl.sidechannel_producer(cl.config.SidechannelTopic)
// commitToSidechannel trys to send the partition offsets to the SidechannelTopic
commitToSidechannel := func() {
dbgf("commitToSidechannel()")
if cl.config.SidechannelTopic == "" {
// no side channel is configured
return
}
// gather up the offsets to commit
var wg sync.WaitGroup
resp := make(chan commit_resp)
for _, con := range consumers {
wg.Add(1)
con.commit_reqs <- commit_req{resp, &wg}
}
go func(resp chan commit_resp, wg *sync.WaitGroup) {
wg.Wait()
close(resp)
}(resp, &wg)
var offsets = make(map[string][]SidechannelOffset)
for r := range resp {
if r.offset != sarama.OffsetNewest && r.offset != sarama.OffsetOldest {
dbgf("commit (%q, %d, %d)", r.topic, r.partition, r.offset)
offsets[r.topic] = append(offsets[r.topic], SidechannelOffset{
Partition: r.partition,
Offset: r.offset,
})
} // else don't commit these special offsets
}
select {
case cl.sidechannel_commit <- offsets:
// good
case <-cl.closed:
return
}
}
// start the commit timer
var commit_timer <-chan time.Time
clconfig := cl.client.Config()
commit_interval := clconfig.Consumer.Offsets.AutoCommit.Interval
if commit_interval == 0 && clconfig.Consumer.Offsets.CommitInterval > 0 {
// use the legacy option
commit_interval = clconfig.Consumer.Offsets.CommitInterval
}
if commit_interval > 0 {
commit_ticker := time.NewTicker(commit_interval)
commit_timer = commit_ticker.C
defer commit_ticker.Stop()
} // else don't commit periodically (we still commit when closing down)
pause := false
refresh := false // refresh the coordinating broker (after an I/O error or a ErrNotCoordinatorForConsumer)
reopen := false // reopen coordinating broker (after an I/O error)
var coor *sarama.Broker // nil, or coordinating broker
// loop rejoining the group each time the group reforms
join_loop:
for {
if pause {
delay := cl.client.Config().Metadata.Retry.Backoff // TODO should we increase timeouts?
dbgf("pausing %v", delay)
// pause before continuing, so we don't fail continuously too fast
timeout := time.After(delay)
pause_loop:
for {
select {
case <-timeout:
break pause_loop
case <-cl.closed:
// shutdown the remaining consumers
shutdown()
return
case a := <-cl.add_consumers:
add(a)
case r := <-cl.rem_consumer:
rem(r)
case <-commit_timer:
commitToSidechannel()
}
}
pause = false
}
if reopen {
if coor != nil {
dbgf("closing and reopening connection to coordinator %d %s", coor.ID(), coor.Addr())
if ok, err := coor.Connected(); ok {
err = coor.Close()
if err != nil {
cl.deliverError(fmt.Sprintf("Close()ing coordinating broker %d %s", coor.ID(), coor.Addr()), err)
}
} else if err != nil {
// remote the earlier error
cl.deliverError(fmt.Sprintf("past Open() of coordinating broker %d %s", coor.ID(), coor.Addr()), err)
}
err := coor.Open(cl.client.Config())
if err != nil {
cl.deliverError(fmt.Sprintf("re-Open()ing coordinating broker %d %s", coor.ID(), coor.Addr()), err)
}
// coor.Open() is asynchronous. We'll continue without waiting (without doing an coor.Connected() call)
// because coor might not even be our coordinator anymore (and might not exist)
}
reopen = false
// after reopening (successfully or not), always refresh the coordinator
refresh = true
}
if refresh {
dbgf("refreshing coordinating broker")
// refresh the group coordinator (because sarama caches the result, and the cache must be manually refreshed by us when we decide an invalidate might be needed)
err := cl.client.RefreshCoordinator(cl.group_name)
if err != nil {
err = cl.makeError("refreshing coordinating broker", err)
if early_rc != nil {
early_rc <- err
return
}
cl.deliverError("", err)
pause = true
continue join_loop
}
refresh = false
}
// make contact with the kafka broker coordinating this group
// NOTE: sarama keeps the result cached, so we aren't taking a round trip to the kafka brokers very time
// (then again we need to manage sarama's cache too)
var err error
coor, err = cl.client.Coordinator(cl.group_name)
if err != nil {
err = cl.makeError("contacting coordinating broker "+coor.Addr(), err)
if early_rc != nil {
early_rc <- err
return
}
cl.deliverError("", err)
pause = true
refresh = true
continue join_loop
}
dbgf("Coordinator %v %v", coor.ID(), coor.Addr())
// make sure we are connected to the broker
if ok, err := coor.Connected(); !ok {
err = cl.makeError("connecting coordinating broker "+coor.Addr(), err)
if early_rc != nil {
early_rc <- err
return
}
cl.deliverError("", err)
pause = true
reopen = true
continue join_loop
}
// join the group
jreq := &sarama.JoinGroupRequest{
GroupId: cl.group_name,
SessionTimeout: int32(cl.config.Session.Timeout / time.Millisecond),
MemberId: member_id,
ProtocolType: "consumer", // we implement the standard kafka 0.9 consumer protocol metadata
}
num_partitions := make(map[string]int, len(consumers))
{ // prepare the join request
var topics = make([]string, 0, len(consumers))
var current_assignments = make(map[string][]int32, len(consumers))
for topic := range consumers {
topics = append(topics, topic)
if a := assignments[topic]; len(a) != 0 { // omit any topics for which we are not assigned a partition
current_assignments[topic] = a
}
// and keep track of the # of partitions we saw before we joined
partitions, err := cl.client.Partitions(topic)
if err != nil {
cl.deliverError(fmt.Sprintf("looking up partitions of topic %q", topic), err)
} else {
num_partitions[topic] = len(partitions)
}
}
logf("consumer %q proposing partitioner %q", cl.group_name, cl.config.Partitioner.Name())
cl.config.Partitioner.PrepareJoin(jreq, topics, current_assignments)
}
// send and wait for join response while still committing to the side channel, since the JoinGroupResponse doesn't arrive until the broker is sure it has gathered them all
var jresp *sarama.JoinGroupResponse
done := make(chan struct{})
go func(jreq *sarama.JoinGroupRequest) {
dbgf("sending JoinGroupRequest %v", jreq)
jresp, err = coor.JoinGroup(jreq)
dbgf("received JoinGroupResponse %v, %v", jresp, err)
close(done)
}(jreq)
wait_for_jresp:
for {
select {
case <-done:
break wait_for_jresp
case <-commit_timer:
commitToSidechannel()
}
}
if err != nil {
// some I/O error happened; we should reopen and refresh the current coordinator
reopen = true
} else if jresp.Err != 0 {
switch jresp.Err {
case sarama.ErrNotCoordinatorForConsumer, sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrRebalanceInProgress:
refresh = true // the broker is no longer the coordinator. we should refresh the current coordinator
case sarama.ErrUnknownMemberId:
member_id = "" // the coordinator no longer knows who we are; have it assign us a new member id
}
err = jresp.Err
}
if err != nil {
switch err {
case sarama.ErrRebalanceInProgress:
// The "error" whenever the kafka consumer group starts a new generation is correct, expected, and normal
logf("new consumer group %q generation forming (discovered while joining group): %v", cl.group_name, err)
default:
err = cl.makeError("joining group", err)
// if it is still early (the 1st iteration of this loop) then return the error and bail out
if early_rc != nil {
early_rc <- err
return
}
cl.deliverError("", err)
}
pause = true
continue join_loop
}
// we managed to get a successfull join-group response. that is far enough that basic communication is functioning
// and we can declare that our early_rc is success and release the caller to NewClient
if early_rc != nil {
early_rc <- nil
early_rc = nil
}
// save our member_id for next time we join, and the new generation id
member_id = jresp.MemberId
generation_id := jresp.GenerationId
logf("consumer %q joining generation %d as member %q", cl.group_name, generation_id, member_id)
// prepare a sync request
sreq := &sarama.SyncGroupRequest{
GroupId: cl.group_name,
GenerationId: generation_id,
MemberId: member_id,
}
// we have been chosen as the leader then we have to map the partitions
if jresp.LeaderId == member_id {
dbgf("leader is we; partitioning using partitioner %s", cl.config.Partitioner.Name())
err := cl.config.Partitioner.Partition(sreq, jresp, cl.client)
if err != nil {
cl.deliverError("partitioning", err)
// and rejoin (thus aborting this generation) since we can't partition it as needed
pause = true
continue join_loop
}
}
// send SyncGroup
var sresp *sarama.SyncGroupResponse
done = make(chan struct{})
go func(sreq *sarama.SyncGroupRequest) {
dbgf("sending SyncGroupRequest %v", sreq)
sresp, err = coor.SyncGroup(sreq)
dbgf("received SyncGroupResponse %v, %v", sresp, err)
close(done)
}(sreq)
wait_for_sresp:
for {
select {
case <-done:
break wait_for_sresp
case <-commit_timer:
commitToSidechannel()
}
}
if err != nil {
reopen = true
} else if sresp.Err != 0 {
switch sresp.Err {
case sarama.ErrNotCoordinatorForConsumer, sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrRebalanceInProgress:
refresh = true // the broker is no longer the coordinator. we should refresh the current coordinator
case sarama.ErrUnknownMemberId:
member_id = "" // the coordinator no longer knows who we are; have it assign us a new member id
}
err = sresp.Err
}
if err != nil {
switch err {
case sarama.ErrRebalanceInProgress:
// The "error" whenever the kafka consumer group starts a new generation is correct, expected, and normal
logf("new consumer group %q generation forming (discovered while synchronizing group): %v", cl.group_name, err)
default:
cl.deliverError("synchronizing group", err)
}
pause = true
continue join_loop
}
new_assignments, err := cl.config.Partitioner.ParseSync(sresp)
if err != nil {
cl.deliverError("decoding member assignments", err)
pause = true
continue join_loop
}
// keep track of which and how many partitions we are assigned
assignments = new_assignments
num_assigned_partitions := 0
for _, parts := range assignments {
num_assigned_partitions += len(parts)
}
logf("consumer %q assigned %d partitions; assignment: %v", cl.group_name, num_assigned_partitions, assignments)
if cl.config.AssignmentNotification != nil {
// keep users from thinking they can alter assignments in the callback by making a deep copy
acopy := make(map[string][]int32, len(assignments))
n := 0
for _, v := range assignments {
n += len(v)
}
parts := make([]int32, 0, n) // space for all the partitions slices (better for gc and for perf)
for k, v := range assignments {
n := len(parts)
parts := append(parts, v...)
acopy[k] = parts[n:len(parts):len(parts)]
}
cl.config.AssignmentNotification(acopy)
}
// save and distribute the new assignments to our topic consumers
a := &assignment{
generation_id: generation_id,
coordinator: coor,
member_id: member_id,
assignments: assignments,
sidechannel_queries: sidechannel_queries,
}
for _, con := range consumers {
// con.assignments has a capacity of 1. the chan is either empty, or contains a stale assignment we can remove and replace
select {
case con.assignments <- a:
// a is delivered
case <-con.assignments:
// we've cleared out the stale assignment. since we've the only code which writes to this channel we now know we have room
con.assignments <- a
}
}
// start the heartbeat timer
heartbeat_timer := time.After(cl.config.Heartbeat.Interval)
// and the metadata check timer
var metadata_timer <-chan time.Time
if clconfig.Metadata.RefreshFrequency > 0 {
metadata_timer = time.After(clconfig.Metadata.RefreshFrequency)
}
// and loop, sending heartbeats until something happens and we need to rejoin (or exit)
for {
select {
case <-cl.closed:
// cl.Close() has been called; time to exit
// shutdown any remaining consumers (causing them to sync their final offsets)
shutdown()
// and nicely leave the consumer group
req := &sarama.LeaveGroupRequest{
GroupId: cl.group_name,
MemberId: member_id,
}
dbgf("sending LeaveGroupRequest %v", req)
resp, err := coor.LeaveGroup(req)
dbgf("received LeaveGroupResponse %v, %v", resp, err)
// note: we don't bother with the full error handling code, since we're exiting anyway
if err == nil && resp.Err != 0 {
err = resp.Err
}
if err != nil {
cl.deliverError("leaving group", err)
}
// and we're done
return
case <-heartbeat_timer:
// send a heartbeat
req := &sarama.HeartbeatRequest{
GroupId: cl.group_name,
MemberId: member_id,
GenerationId: generation_id,
}
dbgf("sending HeartbeatRequest %v", req)
resp, err := coor.Heartbeat(req)
dbgf("received HeartbeatResponse %v, %v", resp, err)
if err != nil {
reopen = true
} else if resp.Err != 0 {
switch resp.Err {
case sarama.ErrNotCoordinatorForConsumer, sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrRebalanceInProgress:
refresh = true // the broker is no longer the coordinator. we should refresh the current coordinator
case sarama.ErrUnknownMemberId:
member_id = "" // the coordinator no longer knows who we are; have it assign us a new member id
}
err = resp.Err
}
if err != nil {
switch err {
case sarama.ErrRebalanceInProgress, sarama.ErrIllegalGeneration:
// The "error" whenever the kafka consumer group starts a new generation is correct, expected, and normal
logf("consumer group %q at %v is rebalancing: %v; rejoining new generation", cl.group_name, coor.Addr(), err)
default:
cl.deliverError("heartbeating with "+coor.Addr(), err)
}
// we've got heartbeat troubles of one kind or another; disconnect and reconnect
continue join_loop
}
// and start the next heartbeat only after we get the response to this one
// that way when the network or the broker are slow we back off.
heartbeat_timer = time.After(cl.config.Heartbeat.Interval)
case <-commit_timer:
ocreq := &sarama.OffsetCommitRequest{
ConsumerGroup: cl.group_name,
ConsumerGroupGeneration: generation_id,
ConsumerID: member_id,
RetentionTime: int64(clconfig.Consumer.Offsets.Retention / time.Millisecond),
Version: 2, // kafka 0.9.0 version, with RetentionTime
}
if clconfig.Consumer.Offsets.Retention == 0 { // note that this and the rounding math above means that if you wanted a retention time of 0 millseconds you could set Config.Offsets.RetentionTime to something < 1 ms, like 1 nanosecond
ocreq.RetentionTime = -1 // use broker's value
}
var wg sync.WaitGroup
resp := make(chan commit_resp, num_assigned_partitions) // allocating room for the responses helps the code run smoothly
for _, con := range consumers {
wg.Add(1)
con.commit_reqs <- commit_req{resp, &wg}
}
go func(resp chan commit_resp, wg *sync.WaitGroup) {
wg.Wait()
close(resp)