Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh brokers given list of seed brokers #1781

Merged
merged 1 commit into from Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions client.go
Expand Up @@ -56,6 +56,11 @@ type Client interface {
// partition. Offline replicas are replicas which are offline
OfflineReplicas(topic string, partitionID int32) ([]int32, error)

// RefreshBrokers takes a list of addresses to be used as seed brokers.
// Existing broker connections are closed and the updated list of seed brokers
// will be used for the next metadata fetch.
RefreshBrokers(addrs []string) error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how Sarama folks are adding new features, but this looks like it's a breaking change if someone is implementing the interface 🤔

not sure why someone would do that, but the interface is public 🤷

we could add a trick that I saw in the golang code, to add a private method, so no apps could implement the interface

    // A private method to prevent users implementing the
    // interface and so future additions to it will not
    // violate Go 1 compatibility.
    private()

found the blog post: https://blog.golang.org/module-compatibility

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, i'll add a dummy private() method to the interface in another PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😞 this wasn't a good idea at the end :/ the initial intention of private() is if the user needs to use an interface but don't want other people to implement it, and this is not the case for this interface.

// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
Expand Down Expand Up @@ -429,6 +434,30 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
return leader, err
}

func (client *client) RefreshBrokers(addrs []string) error {
if client.Closed() {
return ErrClosedClient
}

client.lock.Lock()
defer client.lock.Unlock()

for _, broker := range client.brokers {
_ = broker.Close()
delete(client.brokers, broker.ID())
}

client.seedBrokers = nil
client.deadSeeds = nil

random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}
justin-chen marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

func (client *client) RefreshMetadata(topics ...string) error {
if client.Closed() {
return ErrClosedClient
Expand Down
31 changes: 31 additions & 0 deletions client_test.go
Expand Up @@ -515,6 +515,37 @@ func TestClientRefreshBehaviour(t *testing.T) {
safeClose(t, client)
}

func TestClientRefreshBrokers(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
initialSeed.Returns(metadataResponse1)

c, err := NewClient([]string{initialSeed.Addr()}, nil)
client := c.(*client)

if err != nil {
t.Fatal(err)
}

if len(client.Brokers()) != 2 {
t.Error("Meta broker is not 2")
}

newSeedBrokers := []string{"localhost:12345"}
_ = client.RefreshBrokers(newSeedBrokers)

if client.seedBrokers[0].addr != newSeedBrokers[0] {
t.Error("Seed broker not updated")
}
if len(client.Brokers()) != 0 {
t.Error("Old brokers not closed")
}
}

func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)
Expand Down