Skip to content

Commit

Permalink
Shutdown sarama consumer cleanly (#987)
Browse files Browse the repository at this point in the history
- Shutdown all partitions before shutting down the sarama consumer. This sidesteps bsm/sarama-cluster#255 and ensures that the shutdown completes in a reasonable timeframe.
- Wait for PartitionConsumer shutdown before consuming messages
- Use Sarama's PartitionConsumer mock instead of relying on our own because it is richer and well tested.
  • Loading branch information
vprithvi committed Aug 23, 2018
1 parent 75af597 commit afefdf1
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 139 deletions.
72 changes: 39 additions & 33 deletions cmd/ingester/app/consumer/consumer.go
Expand Up @@ -42,55 +42,58 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

close chan struct{}
isClosed sync.WaitGroup
partitionIDToState map[int32]*consumerState
}

type consumerState struct {
wg sync.WaitGroup
partitionConsumer sc.PartitionConsumer
}

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
return &Consumer{
metricsFactory: params.Factory,
logger: params.Logger,
close: make(chan struct{}, 1),
isClosed: sync.WaitGroup{},
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
metricsFactory: params.Factory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
partitionIDToState: make(map[int32]*consumerState),
}, nil
}

// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.isClosed.Add(1)
c.logger.Info("Starting main loop")
go c.mainLoop()
go func() {
c.logger.Info("Starting main loop")
for pc := range c.internalConsumer.Partitions() {
if p, ok := c.partitionIDToState[pc.Partition()]; ok {
// This is a guard against simultaneously draining messages
// from the last time the partition was assigned and
// processing new messages for the same partition, which may lead
// to the cleanup process not completing
p.wg.Wait()
}
c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())
}
}()
}

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.internalConsumer.Close()
}

func (c *Consumer) mainLoop() {
for {
select {
case pc := <-c.internalConsumer.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
for _, p := range c.partitionIDToState {
c.closePartition(p.partitionConsumer)
p.wg.Wait()
}
c.logger.Info("Closing parent consumer")
return c.internalConsumer.Close()
}

func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler")
defer c.isClosed.Done()
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionIDToState[pc.Partition()].wg.Add(1)
defer c.partitionIDToState[pc.Partition()].wg.Done()
defer c.closePartition(pc)

msgMetrics := c.newMsgMetrics(pc.Partition())
Expand All @@ -109,6 +112,7 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {

msgProcessor.Process(&saramaMessageWrapper{msg})
}
c.logger.Info("Finished handling messages", zap.Int32("partition", pc.Partition()))
}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
Expand All @@ -118,12 +122,14 @@ func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
}

func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler")
defer c.isClosed.Done()
c.logger.Info("Starting error handler", zap.Int32("partition", partition))
c.partitionIDToState[partition].wg.Add(1)
defer c.partitionIDToState[partition].wg.Done()

errMetrics := c.newErrMetrics(partition)
for err := range errChan {
errMetrics.errCounter.Inc(1)
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
c.logger.Info("Finished handling errors", zap.Int32("partition", partition))
}

0 comments on commit afefdf1

Please sign in to comment.