Skip to content

Commit

Permalink
Merge pull request #1781 from Shopify/reconnect-with-seed-brokers
Browse files Browse the repository at this point in the history
Refresh brokers given list of seed brokers
  • Loading branch information
justin-chen committed Aug 19, 2020
2 parents 9e4e58d + 654bd01 commit a1c698e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
29 changes: 29 additions & 0 deletions client.go
Expand Up @@ -56,6 +56,11 @@ type Client interface {
// partition. Offline replicas are replicas which are offline
OfflineReplicas(topic string, partitionID int32) ([]int32, error)

// RefreshBrokers takes a list of addresses to be used as seed brokers.
// Existing broker connections are closed and the updated list of seed brokers
// will be used for the next metadata fetch.
RefreshBrokers(addrs []string) error

// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
Expand Down Expand Up @@ -429,6 +434,30 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
return leader, err
}

func (client *client) RefreshBrokers(addrs []string) error {
if client.Closed() {
return ErrClosedClient
}

client.lock.Lock()
defer client.lock.Unlock()

for _, broker := range client.brokers {
_ = broker.Close()
delete(client.brokers, broker.ID())
}

client.seedBrokers = nil
client.deadSeeds = nil

random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}

return nil
}

func (client *client) RefreshMetadata(topics ...string) error {
if client.Closed() {
return ErrClosedClient
Expand Down
31 changes: 31 additions & 0 deletions client_test.go
Expand Up @@ -515,6 +515,37 @@ func TestClientRefreshBehaviour(t *testing.T) {
safeClose(t, client)
}

func TestClientRefreshBrokers(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
initialSeed.Returns(metadataResponse1)

c, err := NewClient([]string{initialSeed.Addr()}, nil)
client := c.(*client)

if err != nil {
t.Fatal(err)
}

if len(client.Brokers()) != 2 {
t.Error("Meta broker is not 2")
}

newSeedBrokers := []string{"localhost:12345"}
_ = client.RefreshBrokers(newSeedBrokers)

if client.seedBrokers[0].addr != newSeedBrokers[0] {
t.Error("Seed broker not updated")
}
if len(client.Brokers()) != 0 {
t.Error("Old brokers not closed")
}
}

func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)
Expand Down

0 comments on commit a1c698e

Please sign in to comment.