Skip to content

Commit

Permalink
Add method to deregister all connected brokers and provide new seedbr…
Browse files Browse the repository at this point in the history
…oker to fetch metadata
  • Loading branch information
justin-chen committed Aug 19, 2020
1 parent 9e4e58d commit 654bd01
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
29 changes: 29 additions & 0 deletions client.go
Expand Up @@ -56,6 +56,11 @@ type Client interface {
// partition. Offline replicas are replicas which are offline
OfflineReplicas(topic string, partitionID int32) ([]int32, error)

// RefreshBrokers takes a list of addresses to be used as seed brokers.
// Existing broker connections are closed and the updated list of seed brokers
// will be used for the next metadata fetch.
RefreshBrokers(addrs []string) error

// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
Expand Down Expand Up @@ -429,6 +434,30 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
return leader, err
}

func (client *client) RefreshBrokers(addrs []string) error {
if client.Closed() {
return ErrClosedClient
}

client.lock.Lock()
defer client.lock.Unlock()

for _, broker := range client.brokers {
_ = broker.Close()
delete(client.brokers, broker.ID())
}

client.seedBrokers = nil
client.deadSeeds = nil

random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}

return nil
}

func (client *client) RefreshMetadata(topics ...string) error {
if client.Closed() {
return ErrClosedClient
Expand Down
31 changes: 31 additions & 0 deletions client_test.go
Expand Up @@ -515,6 +515,37 @@ func TestClientRefreshBehaviour(t *testing.T) {
safeClose(t, client)
}

func TestClientRefreshBrokers(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
initialSeed.Returns(metadataResponse1)

c, err := NewClient([]string{initialSeed.Addr()}, nil)
client := c.(*client)

if err != nil {
t.Fatal(err)
}

if len(client.Brokers()) != 2 {
t.Error("Meta broker is not 2")
}

newSeedBrokers := []string{"localhost:12345"}
_ = client.RefreshBrokers(newSeedBrokers)

if client.seedBrokers[0].addr != newSeedBrokers[0] {
t.Error("Seed broker not updated")
}
if len(client.Brokers()) != 0 {
t.Error("Old brokers not closed")
}
}

func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)
Expand Down

0 comments on commit 654bd01

Please sign in to comment.