-
Notifications
You must be signed in to change notification settings - Fork 5
/
sarama.go
55 lines (46 loc) · 1.55 KB
/
sarama.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package consumer
import (
"github.com/Shopify/sarama"
"github.com/practo/klog/v2"
)
func NewSaramaConsumer() saramaConsumer {
return saramaConsumer{
ready: make(chan bool),
}
}
// saramaConsumer represents a Sarama consumer group consumer
type saramaConsumer struct {
// ready is used to signal the main thread about the readiness
ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c saramaConsumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
klog.V(4).Info("Setting up saramaConsumer")
close(c.ready)
return nil
}
// Cleanup is run at the end of a session,
// once all ConsumeClaim goroutines have exited
func (c saramaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
klog.V(4).Info("Cleaning up saramaConsumer")
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c saramaConsumer) ConsumeClaim(
session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
klog.V(4).Info("Starting to consume messages...")
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
klog.Infof("Message claimed: value = %s, timestamp = %v, topic = %s",
string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
session.Commit()
}
klog.V(4).Info("All messages were consumed, exiting consumerClaim.")
return nil
}