Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
Merge pull request #197 from jerrygb/issue-reset-offsets
Browse files Browse the repository at this point in the history
Allow marking of earlier offsets
  • Loading branch information
dim committed Jan 4, 2018
2 parents 24016d2 + baa95cc commit baf05b3
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 8 deletions.
7 changes: 5 additions & 2 deletions cluster_test.go
Expand Up @@ -74,6 +74,9 @@ var _ = BeforeSuite(func() {
testDataDir("server.properties"),
)

// Remove old test data before starting
Expect(os.RemoveAll(testKafkaData)).NotTo(HaveOccurred())

Expect(os.MkdirAll(testKafkaData, 0777)).To(Succeed())
Expect(testZkCmd.Start()).To(Succeed())
Expect(testKafkaCmd.Start()).To(Succeed())
Expand All @@ -96,7 +99,7 @@ var _ = BeforeSuite(func() {
}, "30s", "1s").Should(Succeed())

// Seed a few messages
Expect(testSeed(1000)).To(Succeed())
Expect(testSeed(1000, testTopics)).To(Succeed())
})

var _ = AfterSuite(func() {
Expand All @@ -123,7 +126,7 @@ func testDataDir(tokens ...string) string {
return filepath.Join(tokens...)
}

func testSeed(n int) error {
func testSeed(n int, testTopics []string) error {
producer, err := sarama.NewSyncProducerFromClient(testClient)
if err != nil {
return err
Expand Down
28 changes: 28 additions & 0 deletions consumer.go
Expand Up @@ -161,6 +161,34 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) {
}
}

// ResetOffset marks the provided message as processed, alongside a metadata
// string that represents the state of the partition consumer at that point in
// time. The metadata string can be used by another consumer to restore that
// state, so it can resume consumption.
//
// Unlike MarkOffset, ResetOffset may rewind to an earlier offset. The value
// handed to the partition consumer is msg.Offset+1.
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
	sub := c.subs.Fetch(msg.Topic, msg.Partition)
	sub.ResetOffset(msg.Offset+1, metadata)
}

// ResetPartitionOffset marks an offset of the provided topic/partition as
// processed, passing offset+1 and the metadata on to the partition consumer.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	next := offset + 1
	sub := c.subs.Fetch(topic, partition)
	sub.ResetOffset(next, metadata)
}

// ResetOffsets marks stashed offsets as processed. Every entry in the stash is
// handed to the matching partition consumer (as stashed offset+1) and then
// removed from the stash; the stash stays locked for the whole call.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetOffsets(s *OffsetStash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for key, stashed := range s.offsets {
		sub := c.subs.Fetch(key.Topic, key.Partition)
		sub.ResetOffset(stashed.Offset+1, stashed.Metadata)
		delete(s.offsets, key)
	}
}

// Subscriptions returns the consumed topics and partitions
func (c *Consumer) Subscriptions() map[string][]int32 {
return c.subs.Info()
Expand Down
37 changes: 31 additions & 6 deletions consumer_test.go
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/Shopify/sarama"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"math/rand"
"sync"
)

var _ = Describe("Consumer", func() {
Expand Down Expand Up @@ -183,6 +185,30 @@ var _ = Describe("Consumer", func() {
}))
})

It("should support manual mark/commit, reset/commit", func() {
// Verifies that an offset committed via Mark can afterwards be rewound via
// Reset and that the rewound value is what ends up committed.
cs, err := newConsumerOf(testGroup, "topic-a")
Expect(err).NotTo(HaveOccurred())
defer cs.Close()

// Wait until this consumer owns all four partitions of topic-a.
subscriptionsOf(cs).Should(Equal(map[string][]int32{
"topic-a": {0, 1, 2, 3}},
))

// Move partitions 1 and 2 forward and commit.
cs.MarkPartitionOffset("topic-a", 1, 3, "")
cs.MarkPartitionOffset("topic-a", 2, 4, "")
Expect(cs.CommitOffsets()).NotTo(HaveOccurred())

// Rewind both partitions by one offset and commit again.
cs.ResetPartitionOffset("topic-a", 1, 2, "")
cs.ResetPartitionOffset("topic-a", 2, 3, "")
Expect(cs.CommitOffsets()).NotTo(HaveOccurred())

// ResetPartitionOffset records offset+1, so the rewound partitions are
// expected at 3 and 4; the untouched partitions 0 and 3 are expected at -1.
offsets, err := cs.fetchOffsets(cs.Subscriptions())
Expect(err).NotTo(HaveOccurred())
Expect(offsets).To(Equal(map[string]map[int32]offsetInfo{
"topic-a": {0: {Offset: -1}, 1: {Offset: 3}, 2: {Offset: 4}, 3: {Offset: -1}},
}))
})

It("should not commit unprocessed offsets", func() {
const groupID = "panicking"

Expand Down Expand Up @@ -281,28 +307,27 @@ var _ = Describe("Consumer", func() {
go consume("D", 200)
go consume("E", 100)
time.Sleep(10 * time.Second) // wait for consumers to subscribe to topics

Expect(testSeed(5000)).NotTo(HaveOccurred())
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 5000))

go consume("F", 300)
go consume("G", 400)
go consume("H", 1000)
go consume("I", 2000)
Expect(testSeed(5000)).NotTo(HaveOccurred())
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 8000))

go consume("J", 1000)
Expect(testSeed(5000)).NotTo(HaveOccurred())
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 9000))

go consume("K", 1000)
go consume("L", 3000)
Expect(testSeed(5000)).NotTo(HaveOccurred())
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 12000))

go consume("M", 1000)
Expect(testSeed(5000)).NotTo(HaveOccurred())
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred())
Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 15000))

close(acc)
Expand Down
20 changes: 20 additions & 0 deletions offsets.go
Expand Up @@ -36,6 +36,26 @@ func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset
}
}

// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
//
// Unlike MarkPartitionOffset, ResetPartitionOffset supports earlier offsets: an
// existing entry is overwritten (offset and metadata) only when the new offset
// is less than or equal to the stashed one. When nothing has been stashed for
// the topic/partition yet, the offset is always stored.
func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := topicPartition{Topic: topic, Partition: partition}
	// Use the comma-ok lookup to tell "no entry" apart from a stashed offset
	// of 0; comparing against the zero value would silently drop every
	// positive offset for a topic/partition that has not been stashed yet.
	if info, ok := s.offsets[key]; !ok || offset <= info.Offset {
		info.Offset = offset
		info.Metadata = metadata
		s.offsets[key] = info
	}
}

// ResetOffset stashes the offset of the provided message under the message's
// topic and partition by delegating to ResetPartitionOffset.
// See ResetPartitionOffset for explanation.
func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
	topic, partition, offset := msg.Topic, msg.Partition, msg.Offset
	s.ResetPartitionOffset(topic, partition, offset, metadata)
}

// Offsets returns the latest stashed offsets by topic-partition
func (s *OffsetStash) Offsets() map[string]int64 {
s.mu.Lock()
Expand Down
40 changes: 40 additions & 0 deletions offsets_test.go
Expand Up @@ -44,4 +44,44 @@ var _ = Describe("OffsetStash", func() {
))
})

It("should reset", func() {
// Walks a stash through mark and reset calls and checks the stored entry
// after every step.
Expect(subject.offsets).To(HaveLen(0))

// Marking offset 0 creates the entry for partition 0.
subject.MarkPartitionOffset("topic", 0, 0, "m3ta")
Expect(subject.offsets).To(HaveLen(1))
Expect(subject.offsets).To(HaveKeyWithValue(
topicPartition{Topic: "topic", Partition: 0},
offsetInfo{Offset: 0, Metadata: "m3ta"},
))

// Marking a later offset moves the entry forward.
subject.MarkPartitionOffset("topic", 0, 200, "m3ta")
Expect(subject.offsets).To(HaveLen(1))
Expect(subject.offsets).To(HaveKeyWithValue(
topicPartition{Topic: "topic", Partition: 0},
offsetInfo{Offset: 200, Metadata: "m3ta"},
))

// Resetting to an earlier offset rewinds the entry and replaces its metadata.
subject.ResetPartitionOffset("topic", 0, 199, "m3t@")
Expect(subject.offsets).To(HaveLen(1))
Expect(subject.offsets).To(HaveKeyWithValue(
topicPartition{Topic: "topic", Partition: 0},
offsetInfo{Offset: 199, Metadata: "m3t@"},
))

// A second partition gets its own entry.
subject.MarkPartitionOffset("topic", 1, 300, "")
Expect(subject.offsets).To(HaveLen(2))
Expect(subject.offsets).To(HaveKeyWithValue(
topicPartition{Topic: "topic", Partition: 1},
offsetInfo{Offset: 300, Metadata: ""},
))

// Resetting partition 1 rewinds only that partition's entry.
subject.ResetPartitionOffset("topic", 1, 200, "m3t@")
Expect(subject.offsets).To(HaveLen(2))
Expect(subject.offsets).To(HaveKeyWithValue(
topicPartition{Topic: "topic", Partition: 1},
offsetInfo{Offset: 200, Metadata: "m3t@"},
))

})

})
14 changes: 14 additions & 0 deletions partitions.go
Expand Up @@ -182,6 +182,20 @@ func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
c.mu.Unlock()
}

// ResetOffset rewinds the tracked offset of the partition consumer. The new
// offset and metadata are stored, and the state flagged dirty, only when
// offset is less than or equal to the currently tracked offset; later offsets
// are ignored. Calling it on a nil receiver is a no-op.
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
	if c == nil {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Only ever move backwards (or stay in place).
	if offset > c.state.Info.Offset {
		return
	}

	info := &c.state.Info
	info.Offset = offset
	info.Metadata = metadata
	c.state.Dirty = true
}

// --------------------------------------------------------------------

type partitionState struct {
Expand Down
17 changes: 17 additions & 0 deletions partitions_test.go
Expand Up @@ -60,11 +60,28 @@ var _ = Describe("partitionConsumer", func() {
Dirty: true,
}))

// After committing a later offset, try rewinding back to earlier offset with new metadata.
subject.ResetOffset(2001, "met@")
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
Dirty: true,
}))

subject.MarkCommitted(2001) // should not unset state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
}))

subject.MarkOffset(2002, "me7a") // should bump state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
Dirty: true,
}))

subject.MarkCommitted(2002)
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
}))
})

It("should not fail when nil", func() {
Expand Down

0 comments on commit baf05b3

Please sign in to comment.