Skip to content

Commit

Permalink
Add method to deregister all connected brokers and provide new seedbr…
Browse files Browse the repository at this point in the history
…oker to fetch metadata
  • Loading branch information
justin-chen committed Aug 18, 2020
1 parent 9e4e58d commit 8605256
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions client.go
Expand Up @@ -56,6 +56,8 @@ type Client interface {
// partition. Offline replicas are replicas which are offline
OfflineReplicas(topic string, partitionID int32) ([]int32, error)

RefreshBrokersWithNewSeedBrokers(seedBrokers []*Broker) error

// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
Expand Down Expand Up @@ -429,6 +431,23 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
return leader, err
}

func (client *client) RefreshBrokersWithNewSeedBrokers(seedBrokers []*Broker) error {
if client.Closed() {
return ErrClosedClient
}

client.lock.Lock()
defer client.lock.Unlock()

for _, broker := range client.brokers {
_ = broker.Close()
delete(client.brokers, broker.ID())
}
client.seedBrokers = seedBrokers
client.deadSeeds = nil
return nil
}

func (client *client) RefreshMetadata(topics ...string) error {
if client.Closed() {
return ErrClosedClient
Expand Down

0 comments on commit 8605256

Please sign in to comment.