Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
add Offset() to PartitionConsumer interface (#221)
Browse files Browse the repository at this point in the history
* add Offset() to PartitionConsumer interface

* rename Offset to InitialOffset
  • Loading branch information
imjustfly authored and dim committed Mar 16, 2018
1 parent af1a350 commit d985926
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions partitions.go
Expand Up @@ -20,6 +20,10 @@ type PartitionConsumer interface {
// Partition returns the consumed partition
Partition() int32

// InitialOffset returns the offset used for creating the PartitionConsumer instance.
// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
InitialOffset() int64

// MarkOffset marks the offset of a message as preocessed.
MarkOffset(offset int64, metadata string)

Expand All @@ -33,8 +37,9 @@ type partitionConsumer struct {
state partitionState
mu sync.Mutex

topic string
partition int32
topic string
partition int32
initialOffset int64

closeOnce sync.Once
closeErr error
Expand All @@ -43,12 +48,14 @@ type partitionConsumer struct {
}

func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
pcm, err := manager.ConsumePartition(topic, partition, info.NextOffset(defaultOffset))
offset := info.NextOffset(defaultOffset)
pcm, err := manager.ConsumePartition(topic, partition, offset)

// Resume from default offset, if requested offset is out-of-range
if err == sarama.ErrOffsetOutOfRange {
info.Offset = -1
pcm, err = manager.ConsumePartition(topic, partition, defaultOffset)
offset = defaultOffset
pcm, err = manager.ConsumePartition(topic, partition, offset)
}
if err != nil {
return nil, err
Expand All @@ -58,8 +65,9 @@ func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32
PartitionConsumer: pcm,
state: partitionState{Info: info},

topic: topic,
partition: partition,
topic: topic,
partition: partition,
initialOffset: offset,

dying: make(chan none),
dead: make(chan none),
Expand All @@ -72,6 +80,9 @@ func (c *partitionConsumer) Topic() string { return c.topic }
// Partition implements PartitionConsumer
func (c *partitionConsumer) Partition() int32 { return c.partition }

// InitialOffset implements PartitionConsumer
func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }

// AsyncClose implements PartitionConsumer
func (c *partitionConsumer) AsyncClose() {
c.closeOnce.Do(func() {
Expand Down

0 comments on commit d985926

Please sign in to comment.