Skip to content

Commit

Permalink
KIP-392: Allow consumers to fetch from closest replica (#1822)
Browse files Browse the repository at this point in the history
Support for [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) in Sarama consumers.

This is a continuation of #1696 with changes mentioned [here](#1696 (comment)).

Co-authored-by: Deepak.Neralla <Deepak.Neralla@mixpanel.com>
  • Loading branch information
danp and dneralla committed Nov 6, 2020
1 parent 8970705 commit 22788cc
Show file tree
Hide file tree
Showing 8 changed files with 405 additions and 40 deletions.
14 changes: 14 additions & 0 deletions client.go
Expand Up @@ -29,6 +29,9 @@ type Client interface {
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

// Broker returns the active Broker if available for the broker ID.
Broker(brokerID int32) (*Broker, error)

// Topics returns the set of available topics as retrieved from cluster metadata.
Topics() ([]string, error)

Expand Down Expand Up @@ -198,6 +201,17 @@ func (client *client) Brokers() []*Broker {
return brokers
}

func (client *client) Broker(brokerID int32) (*Broker, error) {
client.lock.RLock()
defer client.lock.RUnlock()
broker, ok := client.brokers[brokerID]
if !ok {
return nil, ErrBrokerNotFound
}
_ = broker.Open(client.conf)
return broker, nil
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
Expand Down
36 changes: 36 additions & 0 deletions client_test.go
Expand Up @@ -576,6 +576,42 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
}
}

func TestClientGetBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

broker, err := client.Broker(leader.BrokerID())
if err != nil {
t.Fatal(err)
}

if broker.Addr() != leader.Addr() {
t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr())
}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata(); err != nil {
t.Error(err)
}
broker, err = client.Broker(leader.BrokerID())
if err != ErrBrokerNotFound {
t.Errorf("Expected Broker(brokerID) to return %v found %v", ErrBrokerNotFound, err)
}
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
Expand Down
37 changes: 31 additions & 6 deletions consumer.go
Expand Up @@ -303,6 +303,8 @@ type partitionConsumer struct {
errors chan *ConsumerError
feeder chan *FetchResponse

preferredReadReplica int32

trigger, dying chan none
closeOnce sync.Once
topic string
Expand Down Expand Up @@ -363,18 +365,29 @@ func (child *partitionConsumer) dispatcher() {
close(child.feeder)
}

func (child *partitionConsumer) preferredBroker() (*Broker, error) {
if child.preferredReadReplica >= 0 {
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
return broker, nil
}
}

// if prefered replica cannot be found fallback to leader
return child.consumer.client.Leader(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
return err
}

var leader *Broker
var err error
if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
broker, err := child.preferredBroker()
if err != nil {
return err
}

child.broker = child.consumer.refBrokerConsumer(leader)
child.broker = child.consumer.refBrokerConsumer(broker)

child.broker.input <- child

Expand Down Expand Up @@ -593,6 +606,8 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

consumerBatchSizeMetric.Update(int64(nRecs))

child.preferredReadReplica = block.PreferredReadReplica

if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
if err != nil {
Expand Down Expand Up @@ -823,9 +838,19 @@ func (bc *brokerConsumer) handleResponses() {
result := child.responseResult
child.responseResult = nil

if result == nil {
if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica {
// not an error but needs redispatching to consume from prefered replica
child.trigger <- none{}
delete(bc.subscriptions, child)
}
continue
}

// Discard any replica preference.
child.preferredReadReplica = -1

switch result {
case nil:
// no-op
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
Expand Down

0 comments on commit 22788cc

Please sign in to comment.