diff --git a/consumer_group.go b/consumer_group.go index 056b9e387..aae6599ca 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -513,6 +513,11 @@ type ConsumerGroupSession interface { // message twice, and your processing should ideally be idempotent. MarkOffset(topic string, partition int32, offset int64, metadata string) + // Commit the offset to the backend + // + // Note: calling Commit performs a blocking synchronous operation. + Commit() + // ResetOffset resets to the provided offset, alongside a metadata string that // represents the state of the partition consumer at that point in time. Reset // acts as a counterpart to MarkOffset, the difference being that it allows to @@ -624,6 +629,10 @@ func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset } } +func (s *consumerGroupSession) Commit() { + s.offsets.Commit() +} + func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { if pom := s.offsets.findPOM(topic, partition); pom != nil { pom.ResetOffset(offset, metadata) diff --git a/offset_manager.go b/offset_manager.go index 19408729f..b4fea8226 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -19,6 +19,10 @@ type OffsetManager interface { // will otherwise leak memory. You must call this after all the // PartitionOffsetManagers are closed. Close() error + + // Commit commits the offsets. This method can be used if AutoCommit.Enable is + // set to false. + Commit() } type offsetManager struct { @@ -58,7 +62,6 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client client: client, conf: conf, group: group, - ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval), poms: make(map[string]map[int32]*partitionOffsetManager), memberID: memberID, @@ -67,7 +70,10 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client closing: make(chan none), closed: make(chan none), } - go withRecover(om.mainLoop) + if conf.Consumer.Offsets.AutoCommit.Enable { + om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval) + go withRecover(om.mainLoop) + } return om, nil } @@ -99,7 +105,9 @@ func (om *offsetManager) Close() error { om.closeOnce.Do(func() { // exit the mainLoop close(om.closing) - <-om.closed + if om.conf.Consumer.Offsets.AutoCommit.Enable { + <-om.closed + } // mark all POMs as closed om.asyncClosePOMs() @@ -225,20 +233,19 @@ func (om *offsetManager) mainLoop() { for { select { case <-om.ticker.C: - om.flushToBroker() - om.releasePOMs(false) + om.Commit() case <-om.closing: return } } } -// flushToBroker is ignored if auto-commit offsets is disabled -func (om *offsetManager) flushToBroker() { - if !om.conf.Consumer.Offsets.AutoCommit.Enable { - return - } +func (om *offsetManager) Commit() { + om.flushToBroker() + om.releasePOMs(false) +} +func (om *offsetManager) flushToBroker() { req := om.constructRequest() if req == nil { return diff --git a/offset_manager_test.go b/offset_manager_test.go index f1baa9cdb..5aa2ee0ff 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -169,6 +169,64 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { } } +func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { + // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false + config := NewConfig() + config.Consumer.Offsets.AutoCommit.Enable = false + + om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + // Wait long enough for the test not to fail.. + timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + called := make(chan none) + handler := func(req *request) (res encoderWithHeader) { + close(called) + return ocResponse + } + coordinator.setHandler(handler) + + // Should not trigger an auto-commit + expected := int64(1) + pom.ResetOffset(expected, "modified_meta") + _, _ = pom.NextOffset() + + select { + case <-called: + // OffsetManager called on the wire. + t.Errorf("Received request when AutoCommit is disabled") + case <-time.After(timeout): + // Timeout waiting for OffsetManager to call on the wire. + // OK + } + + // Setup again to test manual commit + called = make(chan none) + + om.Commit() + + select { + case <-called: + // OffsetManager called on the wire. + // OK + case <-time.After(timeout): + // Timeout waiting for OffsetManager to call on the wire. + t.Errorf("No request received for after waiting for %v", timeout) + } + + // Close up + broker.Close() + coordinator.Close() + + // !! om must be closed before the pom so pom.release() is called before pom.Close() + safeClose(t, om) + safeClose(t, pom) + safeClose(t, testClient) +} + // Test recovery from ErrNotCoordinatorForConsumer // on first fetchInitialOffset call func TestOffsetManagerFetchInitialFail(t *testing.T) {