Skip to content

Commit

Permalink
fix: bug with parent context cancelling and handle pr review
Browse files Browse the repository at this point in the history
  • Loading branch information
oss92 committed Mar 29, 2021
1 parent 37f3d8b commit 46882bc
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions component/kafka/group/component.go
Expand Up @@ -187,45 +187,49 @@ func (c *Component) processing(ctx context.Context) error {
c.batchTimeout, c.commitSync)

client, err := sarama.NewConsumerGroup(c.brokers, c.group, c.saramaConfig)
componentError = err
if err != nil {
componentError = err
log.Errorf("error creating consumer group client for kafka component: %v", err)
}

if client != nil {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// `Consume` should be called inside a loop, when a
// server-side re-balance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, c.topics, handler); err != nil {
err := client.Consume(ctx, c.topics, handler)
componentError = err
if err != nil {
log.Errorf("error from kafka consumer: %v", err)
}

// check if context was cancelled or deadline exceeded, signaling that the consumer should stop
if ctx.Err() != nil {
log.Infof("kafka component terminating: context cancelled or deadline exceeded")
break
}

err = client.Close()
if err != nil {
log.Errorf("error closing kafka consumer: %v", err)
}
}

consumerErrorsInc(c.name)

// check if context was cancelled or deadline exceeded, signaling that the consumer should stop
if ctx.Err() != nil {
log.Infof("kafka component terminating: context cancelled or deadline exceeded")
break
}

if c.retries > 0 {
if handler.processedMessages {
i = 0
componentError = nil
}
log.Errorf("failed run, retry %d/%d with %v wait: %v", i, c.retries, c.retryWait, err)
log.Errorf("failed run, retry %d/%d with %v wait: %v", i, c.retries, c.retryWait, componentError)
time.Sleep(c.retryWait)
}

// If there is no component error which is a result of not being able to initialize the consumer
// then the handler errored while processing a message. This faulty message is then the reason
// behind the component failure.
if i == retries && componentError == nil {
componentError = handler.err
componentError = fmt.Errorf("message processing failure exhausted %d retries: %w", i, handler.err)
}
}
return componentError
Expand Down

0 comments on commit 46882bc

Please sign in to comment.