diff --git a/consumer.go b/consumer.go index 3c0fda5..e59d89c 100644 --- a/consumer.go +++ b/consumer.go @@ -140,13 +140,13 @@ func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consum // your application crashes. This means that you may end up processing the same // message twice, and your processing should ideally be idempotent. func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { - c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset+1, metadata) + c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset, metadata) } // MarkPartitionOffset marks an offset of the provided topic/partition as processed. // See MarkOffset for additional explanation. func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { - c.subs.Fetch(topic, partition).MarkOffset(offset+1, metadata) + c.subs.Fetch(topic, partition).MarkOffset(offset, metadata) } // MarkOffsets marks stashed offsets as processed. @@ -156,7 +156,7 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) { defer s.mu.Unlock() for tp, info := range s.offsets { - c.subs.Fetch(tp.Topic, tp.Partition).MarkOffset(info.Offset+1, info.Metadata) + c.subs.Fetch(tp.Topic, tp.Partition).MarkOffset(info.Offset, info.Metadata) delete(s.offsets, tp) } } @@ -168,13 +168,13 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) { // // Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { - c.subs.Fetch(msg.Topic, msg.Partition).ResetOffset(msg.Offset+1, metadata) + c.subs.Fetch(msg.Topic, msg.Partition).ResetOffset(msg.Offset, metadata) } // ResetPartitionOffset marks an offset of the provided topic/partition as processed. // See ResetOffset for additional explanation. func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { - c.subs.Fetch(topic, partition).ResetOffset(offset+1, metadata) + c.subs.Fetch(topic, partition).ResetOffset(offset, metadata) } // ResetOffsets marks stashed offsets as processed. @@ -184,7 +184,7 @@ func (c *Consumer) ResetOffsets(s *OffsetStash) { defer s.mu.Unlock() for tp, info := range s.offsets { - c.subs.Fetch(tp.Topic, tp.Partition).ResetOffset(info.Offset+1, info.Metadata) + c.subs.Fetch(tp.Topic, tp.Partition).ResetOffset(info.Offset, info.Metadata) delete(s.offsets, tp) } } @@ -246,7 +246,7 @@ func (c *Consumer) CommitOffsets() error { if kerr != sarama.ErrNoError { err = kerr } else if state, ok := snap[topicPartition{topic, partition}]; ok { - c.subs.Fetch(topic, partition).MarkCommitted(state.Info.Offset) + c.subs.Fetch(topic, partition).markCommitted(state.Info.Offset) } } } @@ -808,9 +808,9 @@ func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, // Start partition consumer goroutine tomb.Go(func(stopper <-chan none) { if c.client.config.Group.Mode == ConsumerModePartitions { - pc.WaitFor(stopper, c.errors) + pc.waitFor(stopper, c.errors) } else { - pc.Multiplex(stopper, c.messages, c.errors) + pc.multiplex(stopper, c.messages, c.errors) } }) diff --git a/consumer_test.go b/consumer_test.go index 36ee447..7d4970c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -9,8 +9,6 @@ import ( "github.com/Shopify/sarama" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "math/rand" - "sync" ) var _ = Describe("Consumer", func() { diff --git a/partitions.go b/partitions.go index 0f106a2..838e45f 100644 --- a/partitions.go +++ b/partitions.go @@ -31,6 +31,12 @@ type PartitionConsumer interface { // Partition returns the consumed partition Partition() int32 + + // MarkOffset marks the offset of a message as preocessed. + MarkOffset(offset int64, metadata string) + + // ResetOffset resets the offset to a previously processed message. + ResetOffset(offset int64, metadata string) } type partitionConsumer struct { @@ -76,7 +82,7 @@ func (c *partitionConsumer) Topic() string { return c.topic } // Partition implements PartitionConsumer func (c *partitionConsumer) Partition() int32 { return c.partition } -func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) { +func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) { defer close(c.dead) for { @@ -100,7 +106,7 @@ func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) { } } -func (c *partitionConsumer) Multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { +func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { defer close(c.dead) for { @@ -144,7 +150,7 @@ func (c *partitionConsumer) Close() (err error) { return err } -func (c *partitionConsumer) State() partitionState { +func (c *partitionConsumer) getState() partitionState { if c == nil { return partitionState{} } @@ -156,7 +162,7 @@ func (c *partitionConsumer) State() partitionState { return state } -func (c *partitionConsumer) MarkCommitted(offset int64) { +func (c *partitionConsumer) markCommitted(offset int64) { if c == nil { return } @@ -168,28 +174,30 @@ func (c *partitionConsumer) MarkCommitted(offset int64) { c.mu.Unlock() } +// MarkOffset implements PartitionConsumer func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { if c == nil { return } c.mu.Lock() - if offset > c.state.Info.Offset { - c.state.Info.Offset = offset + if next := offset + 1; next > c.state.Info.Offset { + c.state.Info.Offset = next c.state.Info.Metadata = metadata c.state.Dirty = true } c.mu.Unlock() } +// ResetOffset implements PartitionConsumer func (c *partitionConsumer) ResetOffset(offset int64, metadata string) { if c == nil { return } c.mu.Lock() - if offset <= c.state.Info.Offset { - c.state.Info.Offset = offset + if next := offset + 1; next <= c.state.Info.Offset { + c.state.Info.Offset = next c.state.Info.Metadata = metadata c.state.Dirty = true } @@ -248,7 +256,7 @@ func (m *partitionMap) Snapshot() map[topicPartition]partitionState { snap := make(map[topicPartition]partitionState, len(m.data)) for tp, pc := range m.data { - snap[tp] = pc.State() + snap[tp] = pc.getState() } return snap } diff --git a/partitions_test.go b/partitions_test.go index 77db918..8fff0fd 100644 --- a/partitions_test.go +++ b/partitions_test.go @@ -21,7 +21,7 @@ var _ = Describe("partitionConsumer", func() { }) It("should set state", func() { - Expect(subject.State()).To(Equal(partitionState{ + Expect(subject.getState()).To(Equal(partitionState{ Info: offsetInfo{2000, "m3ta"}, })) }) @@ -32,64 +32,64 @@ var _ = Describe("partitionConsumer", func() { defer pc.Close() close(pc.dead) - state := pc.State() + state := pc.getState() Expect(state.Info.Offset).To(Equal(int64(-1))) Expect(state.Info.Metadata).To(Equal("m3ta")) }) It("should update state", func() { subject.MarkOffset(2001, "met@") // should set state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, Dirty: true, })) - subject.MarkCommitted(2001) // should reset dirty status - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + subject.markCommitted(2002) // should reset dirty status + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2001, "me7a") // should not update state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2002, "me7a") // should bump state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, Dirty: true, })) // After committing a later offset, try rewinding back to earlier offset with new metadata. subject.ResetOffset(2001, "met@") - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, Dirty: true, })) - subject.MarkCommitted(2001) // should not unset state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + subject.markCommitted(2002) // should not unset state + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2002, "me7a") // should bump state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, Dirty: true, })) - subject.MarkCommitted(2002) - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + subject.markCommitted(2003) + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, })) }) It("should not fail when nil", func() { blank := (*partitionConsumer)(nil) Expect(func() { - _ = blank.State() + _ = blank.getState() blank.MarkOffset(2001, "met@") - blank.MarkCommitted(2001) + blank.markCommitted(2001) }).NotTo(Panic()) }) @@ -135,7 +135,7 @@ var _ = Describe("partitionMap", func() { subject.Store("topic", 0, pc0) subject.Store("topic", 1, pc1) - subject.Fetch("topic", 1).MarkOffset(2001, "met@") + subject.Fetch("topic", 1).MarkOffset(2000, "met@") Expect(subject.Snapshot()).To(Equal(map[topicPartition]partitionState{ {"topic", 0}: {Info: offsetInfo{2000, "m3ta"}, Dirty: false},