Skip to content

Commit

Permalink
Merge pull request #4 from remicalixte/dm/v1.30.1-patched
Browse files Browse the repository at this point in the history
 feat: add option to propagate OffsetOutOfRange error
  • Loading branch information
remicalixte committed Apr 28, 2023
2 parents 1d57ef5 + e3bb312 commit b33682e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
9 changes: 9 additions & 0 deletions config.go
Expand Up @@ -293,6 +293,14 @@ type Config struct {
// coordinator for the group.
UserData []byte
}

// If true, consumer offsets will be automatically reset to configured Initial value
// if the fetched consumer offset is out of range of available offsets. Out of range
// can happen if the data has been deleted from the server, or during situations of
// under-replication where a replica does not have all the data yet. It can be
// dangerous to reset the offset automatically, particularly in the latter case. Defaults
// to true to maintain existing behavior.
ResetInvalidOffsets bool
}

Retry struct {
Expand Down Expand Up @@ -494,6 +502,7 @@ func NewConfig() *Config {
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
c.Consumer.Group.ResetInvalidOffsets = true

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
Expand Down
3 changes: 2 additions & 1 deletion consumer_group.go
Expand Up @@ -883,7 +883,8 @@ type consumerGroupClaim struct {

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
if err == ErrOffsetOutOfRange {

if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets {
offset = sess.parent.config.Consumer.Offsets.Initial
pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
}
Expand Down

0 comments on commit b33682e

Please sign in to comment.