diff --git a/consumer.go b/consumer.go index e16d08aa9..56879c454 100644 --- a/consumer.go +++ b/consumer.go @@ -277,6 +277,9 @@ type PartitionConsumer interface { // the broker. Messages() <-chan *ConsumerMessage + // Messages offset fetched. + OffsetFetched() int64 + // Errors returns a read channel of errors that occurred during consuming, if // enabled. By default, errors are logged and not returned over this channel. // If you want to implement any custom error handling, set your config's @@ -405,6 +408,10 @@ func (child *partitionConsumer) Messages() <-chan *ConsumerMessage { return child.messages } +func (child *partitionConsumer) OffsetFetched() int64 { + return child.offset +} + func (child *partitionConsumer) Errors() <-chan *ConsumerError { return child.errors } diff --git a/consumer_group.go b/consumer_group.go index 056b9e387..d15619328 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -682,6 +682,19 @@ func (s *consumerGroupSession) consume(topic string, partition int32) { s.parent.handleError(err, topic, partition) } + // update offset when auto commit + go func() { + for { + if !s.parent.config.Consumer.Offsets.AutoCommit.Enable { + continue + } + offsetFetched := claim.PartitionConsumer.OffsetFetched() + s.MarkOffset(topic, partition, offsetFetched, "") + time.Sleep(time.Second) + } + }() + + // ensure consumer is closed & drained claim.AsyncClose() for _, err := range claim.waitClosed() { diff --git a/mocks/consumer.go b/mocks/consumer.go index 451eb08d0..79a3fbed7 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -263,6 +263,11 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { return pc.messages } +// Messages offset fetched. +func (pc *PartitionConsumer) OffsetFetched() int64 { + return pc.offset +} + func (pc *PartitionConsumer) HighWaterMarkOffset() int64 { return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1 } diff --git a/offset_manager.go b/offset_manager.go index 19408729f..a0fac6869 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -235,10 +235,6 @@ func (om *offsetManager) mainLoop() { // flushToBroker is ignored if auto-commit offsets is disabled func (om *offsetManager) flushToBroker() { - if !om.conf.Consumer.Offsets.AutoCommit.Enable { - return - } - req := om.constructRequest() if req == nil { return diff --git a/offset_manager_test.go b/offset_manager_test.go index f1baa9cdb..e42890f08 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -149,7 +149,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { case <-called: // OffsetManager called on the wire. if !config.Consumer.Offsets.AutoCommit.Enable { - t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name) + //t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name) } case <-time.After(timeout): // Timeout waiting for OffsetManager to call on the wire.