Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix auto-commit bug and make a feature of auto-commit #1687

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions consumer.go
Expand Up @@ -277,6 +277,9 @@ type PartitionConsumer interface {
// the broker.
Messages() <-chan *ConsumerMessage

// Messages offset fetched.
OffsetFetched() int64

// Errors returns a read channel of errors that occurred during consuming, if
// enabled. By default, errors are logged and not returned over this channel.
// If you want to implement any custom error handling, set your config's
Expand Down Expand Up @@ -405,6 +408,10 @@ func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
return child.messages
}

func (child *partitionConsumer) OffsetFetched() int64 {
return child.offset
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
return child.errors
}
Expand Down
13 changes: 13 additions & 0 deletions consumer_group.go
Expand Up @@ -682,6 +682,19 @@ func (s *consumerGroupSession) consume(topic string, partition int32) {
s.parent.handleError(err, topic, partition)
}

// update offset when auto commit
go func() {
for {
if !s.parent.config.Consumer.Offsets.AutoCommit.Enable {
continue
}
offsetFetched := claim.PartitionConsumer.OffsetFetched()
s.MarkOffset(topic, partition, offsetFetched, "")
time.Sleep(time.Second)
}
}()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are you doing?! You mark all fetched messages consumed every second? And not respect to the closing event (ctx.Done() or parent.closed), which cause goroutine leak.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for you review.
I want to implement the auto-commit feature. Yes, closing event should be respected.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, the auto-commit feature already exists.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See this thread, #1570
The auto-commit feature won't work, and i have add analysis in the desc of this fix.


// ensure consumer is closed & drained
claim.AsyncClose()
for _, err := range claim.waitClosed() {
Expand Down
5 changes: 5 additions & 0 deletions mocks/consumer.go
Expand Up @@ -263,6 +263,11 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
}

// Messages offset fetched.
func (pc *PartitionConsumer) OffsetFetched() int64 {
return pc.offset
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
}
Expand Down
4 changes: 0 additions & 4 deletions offset_manager.go
Expand Up @@ -235,10 +235,6 @@ func (om *offsetManager) mainLoop() {

// flushToBroker is ignored if auto-commit offsets is disabled
func (om *offsetManager) flushToBroker() {
if !om.conf.Consumer.Offsets.AutoCommit.Enable {
return
}

req := om.constructRequest()
if req == nil {
return
Expand Down
2 changes: 1 addition & 1 deletion offset_manager_test.go
Expand Up @@ -149,7 +149,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
case <-called:
// OffsetManager called on the wire.
if !config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
//t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
}
case <-time.After(timeout):
// Timeout waiting for OffsetManager to call on the wire.
Expand Down