From 895dd20a520f525025713691000683d265a3a48d Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Tue, 16 Jan 2018 17:26:07 +0000 Subject: [PATCH 1/2] Expose offset methods on partition consumers --- balancer.go | 4 ---- consumer.go | 48 +++++++++++++++++++++------------------------- partitions.go | 42 ++++++++++++++++------------------------ partitions_test.go | 48 +++++++++++++++++++++++----------------------- 4 files changed, 63 insertions(+), 79 deletions(-) diff --git a/balancer.go b/balancer.go index 0f9b445..3aeaece 100644 --- a/balancer.go +++ b/balancer.go @@ -157,10 +157,6 @@ func (r *balancer) Topic(name string, memberID string) error { } func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { - if r == nil { - return nil - } - res := make(map[string]map[string][]int32, 1) for topic, info := range r.topics { for memberID, partitions := range info.Perform(s) { diff --git a/consumer.go b/consumer.go index 46f199a..e7a67da 100644 --- a/consumer.go +++ b/consumer.go @@ -140,18 +140,16 @@ func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consum // your application crashes. This means that you may end up processing the same // message twice, and your processing should ideally be idempotent. func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { - sub := c.subs.Fetch(msg.Topic, msg.Partition) - if sub != nil { - sub.MarkOffset(msg.Offset+1, metadata) + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { + sub.MarkOffset(msg.Offset, metadata) } } // MarkPartitionOffset marks an offset of the provided topic/partition as processed. // See MarkOffset for additional explanation. func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { - sub := c.subs.Fetch(topic, partition) - if sub != nil { - sub.MarkOffset(offset+1, metadata) + if sub := c.subs.Fetch(topic, partition); sub != nil { + sub.MarkOffset(offset, metadata) } } @@ -162,9 +160,8 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) { defer s.mu.Unlock() for tp, info := range s.offsets { - sub := c.subs.Fetch(tp.Topic, tp.Partition) - if sub != nil { - sub.MarkOffset(info.Offset+1, info.Metadata) + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { + sub.MarkOffset(info.Offset, info.Metadata) } delete(s.offsets, tp) } @@ -177,9 +174,8 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) { // // Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { - sub := c.subs.Fetch(msg.Topic, msg.Partition) - if sub != nil { - sub.ResetOffset(msg.Offset+1, metadata) + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { + sub.ResetOffset(msg.Offset, metadata) } } @@ -188,7 +184,7 @@ func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { sub := c.subs.Fetch(topic, partition) if sub != nil { - sub.ResetOffset(offset+1, metadata) + sub.ResetOffset(offset, metadata) } } @@ -199,9 +195,8 @@ func (c *Consumer) ResetOffsets(s *OffsetStash) { defer s.mu.Unlock() for tp, info := range s.offsets { - sub := c.subs.Fetch(tp.Topic, tp.Partition) - if sub != nil { - sub.ResetOffset(info.Offset+1, info.Metadata) + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { + sub.ResetOffset(info.Offset, info.Metadata) } delete(s.offsets, tp) } @@ -264,9 +259,8 @@ func (c *Consumer) CommitOffsets() error { if kerr != sarama.ErrNoError { err = kerr } else if state, ok := snap[topicPartition{topic, partition}]; ok { - sub := c.subs.Fetch(topic, partition) - if sub != nil { - sub.MarkCommitted(state.Info.Offset) + if sub := c.subs.Fetch(topic, partition); sub != nil { + sub.markCommitted(state.Info.Offset) } } } @@ -706,11 +700,13 @@ func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) { GenerationId: generationID, } - for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { - if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ - Topics: topics, - }); err != nil { - return nil, err + if strategy != nil { + for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { + if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ + Topics: topics, + }); err != nil { + return nil, err + } } } @@ -828,9 +824,9 @@ func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, // Start partition consumer goroutine tomb.Go(func(stopper <-chan none) { if c.client.config.Group.Mode == ConsumerModePartitions { - pc.WaitFor(stopper, c.errors) + pc.waitFor(stopper, c.errors) } else { - pc.Multiplex(stopper, c.messages, c.errors) + pc.multiplex(stopper, c.messages, c.errors) } }) diff --git a/partitions.go b/partitions.go index 987780b..a450ffe 100644 --- a/partitions.go +++ b/partitions.go @@ -19,6 +19,12 @@ type PartitionConsumer interface { // Partition returns the consumed partition Partition() int32 + + // MarkOffset marks the offset of a message as preocessed. + MarkOffset(offset int64, metadata string) + + // ResetOffset resets the offset to a previously processed message. + ResetOffset(offset int64, metadata string) } type partitionConsumer struct { @@ -81,7 +87,7 @@ func (c *partitionConsumer) Close() error { return c.closeErr } -func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) { +func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) { defer close(c.dead) for { @@ -105,7 +111,7 @@ func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) { } } -func (c *partitionConsumer) Multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { +func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { defer close(c.dead) for { @@ -140,11 +146,7 @@ func (c *partitionConsumer) Multiplex(stopper <-chan none, messages chan<- *sara } } -func (c *partitionConsumer) State() partitionState { - if c == nil { - return partitionState{} - } - +func (c *partitionConsumer) getState() partitionState { c.mu.Lock() state := c.state c.mu.Unlock() @@ -152,11 +154,7 @@ func (c *partitionConsumer) State() partitionState { return state } -func (c *partitionConsumer) MarkCommitted(offset int64) { - if c == nil { - return - } - +func (c *partitionConsumer) markCommitted(offset int64) { c.mu.Lock() if offset == c.state.Info.Offset { c.state.Dirty = false @@ -164,28 +162,22 @@ func (c *partitionConsumer) MarkCommitted(offset int64) { c.mu.Unlock() } +// MarkOffset implements PartitionConsumer func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { - if c == nil { - return - } - c.mu.Lock() - if offset > c.state.Info.Offset { - c.state.Info.Offset = offset + if next := offset + 1; next > c.state.Info.Offset { + c.state.Info.Offset = next c.state.Info.Metadata = metadata c.state.Dirty = true } c.mu.Unlock() } +// ResetOffset implements PartitionConsumer func (c *partitionConsumer) ResetOffset(offset int64, metadata string) { - if c == nil { - return - } - c.mu.Lock() - if offset <= c.state.Info.Offset { - c.state.Info.Offset = offset + if next := offset + 1; next <= c.state.Info.Offset { + c.state.Info.Offset = next c.state.Info.Metadata = metadata c.state.Dirty = true } @@ -244,7 +236,7 @@ func (m *partitionMap) Snapshot() map[topicPartition]partitionState { snap := make(map[topicPartition]partitionState, len(m.data)) for tp, pc := range m.data { - snap[tp] = pc.State() + snap[tp] = pc.getState() } return snap } diff --git a/partitions_test.go b/partitions_test.go index 77db918..8fff0fd 100644 --- a/partitions_test.go +++ b/partitions_test.go @@ -21,7 +21,7 @@ var _ = Describe("partitionConsumer", func() { }) It("should set state", func() { - Expect(subject.State()).To(Equal(partitionState{ + Expect(subject.getState()).To(Equal(partitionState{ Info: offsetInfo{2000, "m3ta"}, })) }) @@ -32,64 +32,64 @@ var _ = Describe("partitionConsumer", func() { defer pc.Close() close(pc.dead) - state := pc.State() + state := pc.getState() Expect(state.Info.Offset).To(Equal(int64(-1))) Expect(state.Info.Metadata).To(Equal("m3ta")) }) It("should update state", func() { subject.MarkOffset(2001, "met@") // should set state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, Dirty: true, })) - subject.MarkCommitted(2001) // should reset dirty status - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + subject.markCommitted(2002) // should reset dirty status + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2001, "me7a") // should not update state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2002, "me7a") // should bump state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, Dirty: true, })) // After committing a later offset, try rewinding back to earlier offset with new metadata. subject.ResetOffset(2001, "met@") - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, Dirty: true, })) - subject.MarkCommitted(2001) // should not unset state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + subject.markCommitted(2002) // should not unset state + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2002, "me7a") // should bump state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, Dirty: true, })) - subject.MarkCommitted(2002) - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + subject.markCommitted(2003) + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, })) }) It("should not fail when nil", func() { blank := (*partitionConsumer)(nil) Expect(func() { - _ = blank.State() + _ = blank.getState() blank.MarkOffset(2001, "met@") - blank.MarkCommitted(2001) + blank.markCommitted(2001) }).NotTo(Panic()) }) @@ -135,7 +135,7 @@ var _ = Describe("partitionMap", func() { subject.Store("topic", 0, pc0) subject.Store("topic", 1, pc1) - subject.Fetch("topic", 1).MarkOffset(2001, "met@") + subject.Fetch("topic", 1).MarkOffset(2000, "met@") Expect(subject.Snapshot()).To(Equal(map[topicPartition]partitionState{ {"topic", 0}: {Info: offsetInfo{2000, "m3ta"}, Dirty: false}, From 60c9f446bde5e2656fece3b04cccacc4d2c80cfe Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Wed, 7 Mar 2018 15:53:55 +0000 Subject: [PATCH 2/2] Fix/remove test --- partitions_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/partitions_test.go b/partitions_test.go index 8fff0fd..4065ea5 100644 --- a/partitions_test.go +++ b/partitions_test.go @@ -84,15 +84,6 @@ var _ = Describe("partitionConsumer", func() { })) }) - It("should not fail when nil", func() { - blank := (*partitionConsumer)(nil) - Expect(func() { - _ = blank.getState() - blank.MarkOffset(2001, "met@") - blank.markCommitted(2001) - }).NotTo(Panic()) - }) - }) var _ = Describe("partitionMap", func() {