Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-392: Allow consumers to fetch from closest replica #1822

Merged
merged 4 commits into from Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions client.go
Expand Up @@ -29,6 +29,9 @@ type Client interface {
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

// Broker returns the active Broker if available for the broker ID.
Broker(brokerID int32) (*Broker, error)

// Topics returns the set of available topics as retrieved from cluster metadata.
Topics() ([]string, error)

Expand Down Expand Up @@ -198,6 +201,17 @@ func (client *client) Brokers() []*Broker {
return brokers
}

func (client *client) Broker(brokerID int32) (*Broker, error) {
client.lock.RLock()
defer client.lock.RUnlock()
broker, ok := client.brokers[brokerID]
if !ok {
return nil, ErrBrokerNotFound
}
_ = broker.Open(client.conf)
return broker, nil
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
Expand Down
36 changes: 36 additions & 0 deletions client_test.go
Expand Up @@ -576,6 +576,42 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
}
}

func TestClientGetBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

broker, err := client.Broker(leader.BrokerID())
if err != nil {
t.Fatal(err)
}

if broker.Addr() != leader.Addr() {
t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr())
}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata(); err != nil {
t.Error(err)
}
broker, err = client.Broker(leader.BrokerID())
if err != ErrBrokerNotFound {
t.Errorf("Expected Broker(brokerID) to return %v found %v", ErrBrokerNotFound, err)
}
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
Expand Down
37 changes: 31 additions & 6 deletions consumer.go
Expand Up @@ -303,6 +303,8 @@ type partitionConsumer struct {
errors chan *ConsumerError
feeder chan *FetchResponse

preferredReadReplica int32

trigger, dying chan none
closeOnce sync.Once
topic string
Expand Down Expand Up @@ -363,18 +365,29 @@ func (child *partitionConsumer) dispatcher() {
close(child.feeder)
}

func (child *partitionConsumer) preferredBroker() (*Broker, error) {
if child.preferredReadReplica >= 0 {
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
return broker, nil
}
}

// if prefered replica cannot be found fallback to leader
return child.consumer.client.Leader(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
return err
}

var leader *Broker
var err error
if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
broker, err := child.preferredBroker()
if err != nil {
return err
}

child.broker = child.consumer.refBrokerConsumer(leader)
child.broker = child.consumer.refBrokerConsumer(broker)

child.broker.input <- child

Expand Down Expand Up @@ -593,6 +606,8 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

consumerBatchSizeMetric.Update(int64(nRecs))

child.preferredReadReplica = block.PreferredReadReplica

if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
if err != nil {
Expand Down Expand Up @@ -823,9 +838,19 @@ func (bc *brokerConsumer) handleResponses() {
result := child.responseResult
child.responseResult = nil

if result == nil {
if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica {
// not an error but needs redispatching to consume from prefered replica
child.trigger <- none{}
delete(bc.subscriptions, child)
}
continue
}

// Discard any replica preference.
child.preferredReadReplica = -1

switch result {
case nil:
// no-op
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
Expand Down