Skip to content

Commit

Permalink
Merge pull request #1962 from hanxiaolin/fix-interceptor
Browse files Browse the repository at this point in the history
fix(consumer):  call interceptors when MaxProcessingTime expire
  • Loading branch information
dnwe committed Jun 22, 2021
2 parents 7d06a47 + 4c37463 commit 0676fc2
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,7 @@ feederLoop:
}

for i, msg := range msgs {
for _, interceptor := range child.conf.Consumer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
child.interceptors(msg)
messageSelect:
select {
case <-child.dying:
Expand All @@ -484,6 +482,7 @@ feederLoop:
child.broker.acks.Done()
remainingLoop:
for _, msg = range msgs[i:] {
child.interceptors(msg)
select {
case child.messages <- msg:
case <-child.dying:
Expand Down Expand Up @@ -715,6 +714,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
return messages, nil
}

func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
for _, interceptor := range child.conf.Consumer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
}

type brokerConsumer struct {
consumer *consumer
broker *Broker
Expand Down

0 comments on commit 0676fc2

Please sign in to comment.