From 51c88b597f6ccb6c2b907672cda90d85cc227822 Mon Sep 17 00:00:00 2001 From: justfly Date: Mon, 5 Feb 2018 12:32:50 +0800 Subject: [PATCH 1/2] add Offset() to PartitionConsumer interface --- partitions.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/partitions.go b/partitions.go index 987780b..65725a8 100644 --- a/partitions.go +++ b/partitions.go @@ -19,6 +19,10 @@ type PartitionConsumer interface { // Partition returns the consumed partition Partition() int32 + + // Offset returns the offset used for creating the PartitionConsumer instance. + // Offset can be a literal offset, or OffsetNewest, or OffsetOldest + Offset() int64 } type partitionConsumer struct { @@ -29,6 +33,7 @@ type partitionConsumer struct { topic string partition int32 + offset int64 closeOnce sync.Once closeErr error @@ -37,12 +42,14 @@ type partitionConsumer struct { } func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) { - pcm, err := manager.ConsumePartition(topic, partition, info.NextOffset(defaultOffset)) + offset := info.NextOffset(defaultOffset) + pcm, err := manager.ConsumePartition(topic, partition, offset) // Resume from default offset, if requested offset is out-of-range if err == sarama.ErrOffsetOutOfRange { info.Offset = -1 - pcm, err = manager.ConsumePartition(topic, partition, defaultOffset) + offset = defaultOffset + pcm, err = manager.ConsumePartition(topic, partition, offset) } if err != nil { return nil, err @@ -54,6 +61,7 @@ func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32 topic: topic, partition: partition, + offset: offset, dying: make(chan none), dead: make(chan none), @@ -66,6 +74,9 @@ func (c *partitionConsumer) Topic() string { return c.topic } // Partition implements PartitionConsumer func (c *partitionConsumer) Partition() int32 { return c.partition } +// Offset implements PartitionConsumer +func (c *partitionConsumer) Offset() int64 { return c.offset } + // AsyncClose implements PartitionConsumer func (c *partitionConsumer) AsyncClose() { c.closeOnce.Do(func() { From 8158acf12093f596c9263f00b0b5ff4765a87168 Mon Sep 17 00:00:00 2001 From: justfly Date: Thu, 8 Mar 2018 16:03:39 +0800 Subject: [PATCH 2/2] rename Offset to InitialOffset --- partitions.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/partitions.go b/partitions.go index 65725a8..28eff52 100644 --- a/partitions.go +++ b/partitions.go @@ -20,9 +20,9 @@ type PartitionConsumer interface { // Partition returns the consumed partition Partition() int32 - // Offset returns the offset used for creating the PartitionConsumer instance. - // Offset can be a literal offset, or OffsetNewest, or OffsetOldest - Offset() int64 + // InitialOffset returns the offset used for creating the PartitionConsumer instance. + // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest + InitialOffset() int64 } type partitionConsumer struct { @@ -31,9 +31,9 @@ type partitionConsumer struct { state partitionState mu sync.Mutex - topic string - partition int32 - offset int64 + topic string + partition int32 + initialOffset int64 closeOnce sync.Once closeErr error @@ -59,9 +59,9 @@ func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32 PartitionConsumer: pcm, state: partitionState{Info: info}, - topic: topic, - partition: partition, - offset: offset, + topic: topic, + partition: partition, + initialOffset: offset, dying: make(chan none), dead: make(chan none), @@ -74,8 +74,8 @@ func (c *partitionConsumer) Topic() string { return c.topic } // Partition implements PartitionConsumer func (c *partitionConsumer) Partition() int32 { return c.partition } -// Offset implements PartitionConsumer -func (c *partitionConsumer) Offset() int64 { return c.offset } +// InitialOffset implements PartitionConsumer +func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset } // AsyncClose implements PartitionConsumer func (c *partitionConsumer) AsyncClose() {