From 4ddb792827f891c99751904c9a20b24a08d0c36b Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 11 Aug 2022 22:12:08 +0100 Subject: [PATCH 1/2] Merge pull request #2252 from retailnext/propagate-consumer-group-offset-out-of-range-error feat: add option to propagate OffsetOutOfRange error --- config.go | 9 +++++++++ consumer_group.go | 3 ++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/config.go b/config.go index 3f86f1fb9..8bff916a2 100644 --- a/config.go +++ b/config.go @@ -293,6 +293,14 @@ type Config struct { // coordinator for the group. UserData []byte } + + // If true, consumer offsets will be automatically reset to configured Initial value + // if the fetched consumer offset is out of range of available offsets. Out of range + // can happen if the data has been deleted from the server, or during situations of + // under-replication where a replica does not have all the data yet. It can be + // dangerous to reset the offset automatically, particularly in the latter case. Defaults + // to true to maintain existing behavior. + ResetInvalidOffsets bool } Retry struct { @@ -494,6 +502,7 @@ func NewConfig() *Config { c.Consumer.Group.Rebalance.Timeout = 60 * time.Second c.Consumer.Group.Rebalance.Retry.Max = 4 c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second + c.Consumer.Group.ResetInvalidOffsets = false c.ClientID = defaultClientID c.ChannelBufferSize = 256 diff --git a/consumer_group.go b/consumer_group.go index b603d1705..5a8d3072c 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -883,7 +883,8 @@ type consumerGroupClaim struct { func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) { pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset) - if err == ErrOffsetOutOfRange { + + if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets { offset = sess.parent.config.Consumer.Offsets.Initial pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset) } From e3bb312652da85ef59f3535faafdb73452aabe1a Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 27 Sep 2022 21:47:10 +0100 Subject: [PATCH 2/2] Merge pull request #2345 from Shopify/dnwe/fix-default fix(consumer): default ResetInvalidOffsets to true --- config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.go b/config.go index 8bff916a2..a2e55af04 100644 --- a/config.go +++ b/config.go @@ -502,7 +502,7 @@ func NewConfig() *Config { c.Consumer.Group.Rebalance.Timeout = 60 * time.Second c.Consumer.Group.Rebalance.Retry.Max = 4 c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second - c.Consumer.Group.ResetInvalidOffsets = false + c.Consumer.Group.ResetInvalidOffsets = true c.ClientID = defaultClientID c.ChannelBufferSize = 256