
Why not refresh metadata when got ErrNotLeaderForPartition #2760

Open
ForeverSRC opened this issue Jan 3, 2024 · 3 comments
Labels: bug, needs-investigation (Issues that require followup from maintainers)

Comments

@ForeverSRC commented Jan 3, 2024

We got some ErrNotLeaderForPartition errors after retrying 3 times, and I wonder why the producer doesn't refresh its metadata when it gets that error:

sarama/async_producer.go, lines 1121 to 1126 in d9abf3c:

if bp.parent.conf.Producer.Idempotent {
	err := bp.parent.client.RefreshMetadata(retryTopics...)
	if err != nil {
		Logger.Printf("Failed refreshing metadata because of %v\n", err)
	}
}
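
For context, the retry count and interval mentioned in this thread map onto the producer configuration. A minimal sketch of the relevant knobs (the values shown are sarama's documented defaults; the broker address and topic are placeholders):

```go
package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Retry.Max = 3                          // default: total number of times to retry sending a message
	cfg.Producer.Retry.Backoff = 100 * time.Millisecond // default: wait between retries
	cfg.Producer.Idempotent = false                     // the metadata refresh quoted above is gated on this flag
	cfg.Producer.Return.Errors = true                   // surface ProducerErrors such as ErrNotLeaderForPartition

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg) // placeholder broker
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Drain errors so retries that ultimately fail are observable.
	go func() {
		for e := range producer.Errors() {
			log.Println("produce error:", e)
		}
	}()

	producer.Input() <- &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("hello")}
}
```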

@dnwe (Collaborator) commented Feb 11, 2024

@ForeverSRC thanks for raising, a good question!

Refreshing metadata in the producer retry path was brought in as part of the idempotent producer changes, but it's not clear that this refresh actually needs to be gated to idempotent-only retries.
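
Purely as an illustration of what un-gating that refresh could look like (a sketch against the snippet quoted above, not a reviewed or committed change), the retry path would simply drop the Producer.Idempotent check:

```go
// Hypothetical sketch: refresh metadata for the retried topics on every retry,
// not only when Producer.Idempotent is enabled.
err := bp.parent.client.RefreshMetadata(retryTopics...)
if err != nil {
	Logger.Printf("Failed refreshing metadata because of %v\n", err)
}
```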

@dnwe added the bug and needs-investigation labels on Feb 11, 2024
@dnwe (Collaborator) commented Feb 11, 2024

@ForeverSRC having refreshed my memory of this by re-reading the code, any retriable error for a given broker producer should cause it to 'abandon' that broker and do a metadata refresh, on the assumption that it needs to use a different broker producer because the topic-partition has moved:

sarama/async_producer.go, lines 710 to 724 in 5f63a84:

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}
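
The refresh itself happens when the partition producer picks a new broker producer after that abandon. As a simplified sketch (using sarama's public Client interface rather than the exact internals), the re-selection amounts to:

```go
package example

import "github.com/IBM/sarama"

// reselectLeader is an illustrative stand-in for what the partition producer
// does after abandoning its broker producer: refresh metadata for the topic,
// then look up the (possibly new) leader for the topic-partition.
func reselectLeader(client sarama.Client, topic string, partition int32) (*sarama.Broker, error) {
	if err := client.RefreshMetadata(topic); err != nil {
		return nil, err
	}
	return client.Leader(topic, partition)
}
```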

You'd see the abandon step in the sarama logger output as, e.g.:
13:45:16.866724 producer/leader/my_topic/0 state change to [retrying-1]
13:45:16.866735 producer/leader/my_topic/0 abandoning broker 2
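
Those lines only show up if the package-level sarama.Logger has been pointed at a real logger (by default it discards everything), e.g.:

```go
package main

import (
	"log"
	"os"

	"github.com/IBM/sarama"
)

func main() {
	// Route sarama's debug output to stdout so the state-change and
	// "abandoning broker" lines above become visible.
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Lmicroseconds)

	// ... construct producers/consumers as usual after this point ...
}
```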

@ForeverSRC (Author) commented

In my use case we got three ErrNotLeaderForPartition errors because the retry count is 3 and the retry interval is 100ms, so I'm not sure whether the abandon logic is actually taking effect.
