Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
Expose offset methods on partition consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
dim committed Jan 16, 2018
1 parent baf05b3 commit ff20ee4
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 44 deletions.
18 changes: 9 additions & 9 deletions consumer.go
Expand Up @@ -140,13 +140,13 @@ func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consum
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset+1, metadata)
c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset, metadata)
}

// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
c.subs.Fetch(topic, partition).MarkOffset(offset+1, metadata)
c.subs.Fetch(topic, partition).MarkOffset(offset, metadata)
}

// MarkOffsets marks stashed offsets as processed.
Expand All @@ -156,7 +156,7 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) {
defer s.mu.Unlock()

for tp, info := range s.offsets {
c.subs.Fetch(tp.Topic, tp.Partition).MarkOffset(info.Offset+1, info.Metadata)
c.subs.Fetch(tp.Topic, tp.Partition).MarkOffset(info.Offset, info.Metadata)
delete(s.offsets, tp)
}
}
Expand All @@ -168,13 +168,13 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) {
//
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
c.subs.Fetch(msg.Topic, msg.Partition).ResetOffset(msg.Offset+1, metadata)
c.subs.Fetch(msg.Topic, msg.Partition).ResetOffset(msg.Offset, metadata)
}

// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
c.subs.Fetch(topic, partition).ResetOffset(offset+1, metadata)
c.subs.Fetch(topic, partition).ResetOffset(offset, metadata)
}

// ResetOffsets marks stashed offsets as processed.
Expand All @@ -184,7 +184,7 @@ func (c *Consumer) ResetOffsets(s *OffsetStash) {
defer s.mu.Unlock()

for tp, info := range s.offsets {
c.subs.Fetch(tp.Topic, tp.Partition).ResetOffset(info.Offset+1, info.Metadata)
c.subs.Fetch(tp.Topic, tp.Partition).ResetOffset(info.Offset, info.Metadata)
delete(s.offsets, tp)
}
}
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *Consumer) CommitOffsets() error {
if kerr != sarama.ErrNoError {
err = kerr
} else if state, ok := snap[topicPartition{topic, partition}]; ok {
c.subs.Fetch(topic, partition).MarkCommitted(state.Info.Offset)
c.subs.Fetch(topic, partition).markCommitted(state.Info.Offset)
}
}
}
Expand Down Expand Up @@ -808,9 +808,9 @@ func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32,
// Start partition consumer goroutine
tomb.Go(func(stopper <-chan none) {
if c.client.config.Group.Mode == ConsumerModePartitions {
pc.WaitFor(stopper, c.errors)
pc.waitFor(stopper, c.errors)
} else {
pc.Multiplex(stopper, c.messages, c.errors)
pc.multiplex(stopper, c.messages, c.errors)
}
})

Expand Down
2 changes: 0 additions & 2 deletions consumer_test.go
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/Shopify/sarama"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"math/rand"
"sync"
)

var _ = Describe("Consumer", func() {
Expand Down
26 changes: 17 additions & 9 deletions partitions.go
Expand Up @@ -31,6 +31,12 @@ type PartitionConsumer interface {

// Partition returns the consumed partition
Partition() int32

// MarkOffset marks the offset of a message as preocessed.
MarkOffset(offset int64, metadata string)

// ResetOffset resets the offset to a previously processed message.
ResetOffset(offset int64, metadata string)
}

type partitionConsumer struct {
Expand Down Expand Up @@ -76,7 +82,7 @@ func (c *partitionConsumer) Topic() string { return c.topic }
// Partition implements PartitionConsumer
func (c *partitionConsumer) Partition() int32 { return c.partition }

func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) {
func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
defer close(c.dead)

for {
Expand All @@ -100,7 +106,7 @@ func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) {
}
}

func (c *partitionConsumer) Multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
defer close(c.dead)

for {
Expand Down Expand Up @@ -144,7 +150,7 @@ func (c *partitionConsumer) Close() (err error) {
return err
}

func (c *partitionConsumer) State() partitionState {
func (c *partitionConsumer) getState() partitionState {
if c == nil {
return partitionState{}
}
Expand All @@ -156,7 +162,7 @@ func (c *partitionConsumer) State() partitionState {
return state
}

func (c *partitionConsumer) MarkCommitted(offset int64) {
func (c *partitionConsumer) markCommitted(offset int64) {
if c == nil {
return
}
Expand All @@ -168,28 +174,30 @@ func (c *partitionConsumer) MarkCommitted(offset int64) {
c.mu.Unlock()
}

// MarkOffset implements PartitionConsumer
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
if c == nil {
return
}

c.mu.Lock()
if offset > c.state.Info.Offset {
c.state.Info.Offset = offset
if next := offset + 1; next > c.state.Info.Offset {
c.state.Info.Offset = next
c.state.Info.Metadata = metadata
c.state.Dirty = true
}
c.mu.Unlock()
}

// ResetOffset implements PartitionConsumer
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
if c == nil {
return
}

c.mu.Lock()
if offset <= c.state.Info.Offset {
c.state.Info.Offset = offset
if next := offset + 1; next <= c.state.Info.Offset {
c.state.Info.Offset = next
c.state.Info.Metadata = metadata
c.state.Dirty = true
}
Expand Down Expand Up @@ -248,7 +256,7 @@ func (m *partitionMap) Snapshot() map[topicPartition]partitionState {

snap := make(map[topicPartition]partitionState, len(m.data))
for tp, pc := range m.data {
snap[tp] = pc.State()
snap[tp] = pc.getState()
}
return snap
}
Expand Down
48 changes: 24 additions & 24 deletions partitions_test.go
Expand Up @@ -21,7 +21,7 @@ var _ = Describe("partitionConsumer", func() {
})

It("should set state", func() {
Expect(subject.State()).To(Equal(partitionState{
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2000, "m3ta"},
}))
})
Expand All @@ -32,64 +32,64 @@ var _ = Describe("partitionConsumer", func() {
defer pc.Close()
close(pc.dead)

state := pc.State()
state := pc.getState()
Expect(state.Info.Offset).To(Equal(int64(-1)))
Expect(state.Info.Metadata).To(Equal("m3ta"))
})

It("should update state", func() {
subject.MarkOffset(2001, "met@") // should set state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
Dirty: true,
}))

subject.MarkCommitted(2001) // should reset dirty status
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
subject.markCommitted(2002) // should reset dirty status
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
}))

subject.MarkOffset(2001, "me7a") // should not update state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
}))

subject.MarkOffset(2002, "me7a") // should bump state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2003, "me7a"},
Dirty: true,
}))

// After committing a later offset, try rewinding back to earlier offset with new metadata.
subject.ResetOffset(2001, "met@")
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
Dirty: true,
}))

subject.MarkCommitted(2001) // should not unset state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
subject.markCommitted(2002) // should not unset state
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
}))

subject.MarkOffset(2002, "me7a") // should bump state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2003, "me7a"},
Dirty: true,
}))

subject.MarkCommitted(2002)
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
subject.markCommitted(2003)
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2003, "me7a"},
}))
})

It("should not fail when nil", func() {
blank := (*partitionConsumer)(nil)
Expect(func() {
_ = blank.State()
_ = blank.getState()
blank.MarkOffset(2001, "met@")
blank.MarkCommitted(2001)
blank.markCommitted(2001)
}).NotTo(Panic())
})

Expand Down Expand Up @@ -135,7 +135,7 @@ var _ = Describe("partitionMap", func() {

subject.Store("topic", 0, pc0)
subject.Store("topic", 1, pc1)
subject.Fetch("topic", 1).MarkOffset(2001, "met@")
subject.Fetch("topic", 1).MarkOffset(2000, "met@")

Expect(subject.Snapshot()).To(Equal(map[topicPartition]partitionState{
{"topic", 0}: {Info: offsetInfo{2000, "m3ta"}, Dirty: false},
Expand Down

0 comments on commit ff20ee4

Please sign in to comment.