diff --git a/config.go b/config.go
index 3f86f1fb9..8bff916a2 100644
--- a/config.go
+++ b/config.go
@@ -293,6 +293,14 @@ type Config struct {
 				// coordinator for the group.
 				UserData []byte
 			}
+
+			// If true, consumer offsets will be automatically reset to the configured Initial value
+			// if the fetched consumer offset is out of range of available offsets. Out of range
+			// can happen if the data has been deleted from the server, or during situations of
+			// under-replication where a replica does not have all the data yet. It can be
+			// dangerous to reset the offset automatically, particularly in the latter case.
+			// Defaults to true to maintain existing behavior.
+			ResetInvalidOffsets bool
 		}
 
 		Retry struct {
@@ -494,6 +502,7 @@ func NewConfig() *Config {
 	c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
 	c.Consumer.Group.Rebalance.Retry.Max = 4
 	c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
+	c.Consumer.Group.ResetInvalidOffsets = true
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
diff --git a/consumer_group.go b/consumer_group.go
index b603d1705..5a8d3072c 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -883,7 +883,8 @@ type consumerGroupClaim struct {
 func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
 	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
-	if err == ErrOffsetOutOfRange {
+
+	if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets {
 		offset = sess.parent.config.Consumer.Offsets.Initial
 		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
 	}
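
Usage sketch (illustrative, not part of the diff): a minimal program showing how a caller would opt out of the automatic reset, so that an out-of-range committed offset is reported as ErrOffsetOutOfRange rather than silently rewound to Consumer.Offsets.Initial. It assumes Sarama's public consumer-group API; the broker address, topic, group name, and exampleHandler type are placeholders, and only the two config lines depend on this change.

package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// exampleHandler is a hypothetical no-op ConsumerGroupHandler; the config
// lines in main are the point of this sketch, not the handler.
type exampleHandler struct{}

func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		sess.MarkMessage(msg, "") // mark each message as processed
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()

	// Where consumption starts when no valid committed offset is available.
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	// The knob added by this change: with false, an out-of-range committed
	// offset is surfaced as an error instead of silently jumping to Initial.
	cfg.Consumer.Group.ResetInvalidOffsets = false

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	if err := group.Consume(context.Background(), []string{"example-topic"}, exampleHandler{}); err != nil {
		// With ResetInvalidOffsets=false this may be ErrOffsetOutOfRange.
		log.Fatal(err)
	}
}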