[fix][broker] Expect msgs after server initiated CloseProducer #19446
Good work @michaeljmarshall!
Good solution
@@ -1622,6 +1625,14 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
        CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());

        if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
            if (recentlyClosedProducers.containsKey(send.getProducerId())) {
Should we compare the "epoch" here? Maybe it is unnecessary, but I am not an expert in this part of the protocol.
The send command does not have the producer's epoch, so we don't have that information in scope.
It could be valuable to discuss ways to improve the protocol for the future, like asking if the send command should have the epoch or some other identifier, but I want a backwards compatible solution that will work by upgrading the broker.
Also, I considered comparing epochs when the `Producer` command is handled in another part of this PR, but I think it complicates the logic more than necessary, so we ignore the value then too.
The primary reason for still using the epoch value is to make sure the scheduled task does not remove the wrong key.
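To illustrate the last point, here is a minimal sketch of an epoch-guarded removal. It is not the code from this PR: the class `ProducerTombstones` and its methods are hypothetical names, and the only library behavior it relies on is `Map.remove(key, value)`, which removes an entry only if it still maps to the given value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only (not the PR's implementation).
// Maps producerId -> epoch for producers the broker recently closed.
final class ProducerTombstones {
    private final Map<Long, Long> recentlyClosed = new HashMap<>();

    // Record that the broker closed this producer at the given epoch.
    // A later close of the same producer id overwrites the older epoch.
    void markClosed(long producerId, long epoch) {
        recentlyClosed.put(producerId, epoch);
    }

    // What a delayed cleanup task would call. The two-argument remove only
    // deletes the entry if it still holds the same epoch, so a stale task
    // cannot delete a newer tombstone that reuses the producer id.
    boolean expire(long producerId, long epoch) {
        return recentlyClosed.remove(producerId, epoch);
    }

    boolean isRecentlyClosed(long producerId) {
        return recentlyClosed.containsKey(producerId);
    }
}
```

In this sketch, if a producer id is closed at epoch 0 and then again at epoch 1, the cleanup scheduled for epoch 0 is a no-op and the epoch 1 tombstone survives until its own cleanup runs.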
/pulsarbot rerun-failure-checks
LGTM
(cherry picked from commit 524288c)
apache#19446)" This reverts commit 524288c.
I think this is a great improvement. However, the change at https://github.com/apache/pulsar/pull/19446/files#diff-1e0e8195fb5ec5a6d79acbc7d859c025a9b711f94e6ab37c94439e99b3202e84R1627-R1635 prevents the client's send message future from being completed. At first, I tried to complete the send message future when the producer no longer exists in the broker, but I found that I also had to deal with the order of pending requests. So, as a quick fix, I pushed a new PR (#21134) to revert the current PR.
Motivation
The intention of #12780 was to close the whole connection when the client sends an unexpected `Send` command. The goal of that change was to make the broker more defensive to prevent incorrect implementations in the client (see #12779) from leading to out of order messages.
However, the change in #12780 was too broad. It closes the connection in a very expected case. Specifically, when the server disconnects a producer due to a load balancing or unloading event, the broker sends the client a `CloseProducer` command. If the broker receives any additional messages for that producer, the broker closes the whole connection. This is an expensive interruption for clients with many producers/consumers.
Because the Pulsar Producer is expected to pipeline `Send` commands, there is no current way to know if the client sent the messages before or after receiving the close producer command, and because the goal of #12780 was to increase stability, I think we should ignore the messages when they are received in these conditions.
I propose that we keep a map of recently closed producers, and use that to limit how long we keep around the producer's tombstone.
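As a rough illustration of the proposal, the sketch below models the three outcomes for an incoming `Send`: process it, ignore it because the producer was recently closed by the broker, or close the connection. All names here (`SendDecisionSketch`, `Action`, `tombstoneTtlMillis`) are hypothetical, and the sketch compares timestamps passed in by the caller instead of scheduling a cleanup task, purely to keep it self-contained; it is not the `ServerCnx` implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the decision made when a Send command arrives.
final class SendDecisionSketch {
    enum Action { PROCESS, IGNORE, CLOSE_CONNECTION }

    private final Set<Long> activeProducers = new HashSet<>();
    // producerId -> time (ms) at which the broker sent CloseProducer.
    private final Map<Long, Long> recentlyClosed = new HashMap<>();
    // Assumed tombstone lifetime; the PR ties this to the keep alive interval.
    private final long tombstoneTtlMillis;

    SendDecisionSketch(long tombstoneTtlMillis) {
        this.tombstoneTtlMillis = tombstoneTtlMillis;
    }

    void addProducer(long producerId) {
        activeProducers.add(producerId);
        recentlyClosed.remove(producerId);
    }

    // Broker-initiated close: drop the producer but remember a tombstone.
    void closeProducerByBroker(long producerId, long nowMillis) {
        activeProducers.remove(producerId);
        recentlyClosed.put(producerId, nowMillis);
    }

    Action onSend(long producerId, long nowMillis) {
        if (activeProducers.contains(producerId)) {
            return Action.PROCESS;
        }
        Long closedAt = recentlyClosed.get(producerId);
        if (closedAt != null && nowMillis - closedAt <= tombstoneTtlMillis) {
            // Likely a pipelined Send that raced with CloseProducer.
            return Action.IGNORE;
        }
        // Unknown producer or expired tombstone: stay defensive.
        recentlyClosed.remove(producerId);
        return Action.CLOSE_CONNECTION;
    }
}
```

The design choice being illustrated is that the tombstone only softens the defensive behavior from #12780 for a bounded window; a `Send` for a producer the broker has never seen, or one that arrives long after the close, still closes the connection.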
For reference, when connections are closed due to this weakness in the implementation, the broker logs:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1625 to 1626 in fd3ce8b
I observed this log line more than 17,000 times in the past 7 days. As such, I plan to cherry pick this to active release branches.
Modifications
- Use a `HashMap` to track recently closed producers. This map is only ever updated on the `ServerCnx`'s event loop. The one downside is that the `HashMap` will box the `long` keys and values. However, it is likely faster than the `ConcurrentLongHashMap` since it does not have any synchronization.
- Ignore `Send` commands if they arrive shortly after the broker sent a `CloseProducer` command.
- [...] `CloseProducer` request, and the keep alive interval should be sufficient for a full round trip from broker to client.
Verifying this change
New tests are added.
Does this pull request potentially affect one of the following parts:
This affects the protocol in a sense, but it does not change the protocol in any negative way.
Documentation
doc-not-needed
Matching PR in forked repository
PR in forked repository: michaeljmarshall#24