[draft] [fix] [broker] Revert #19446 to fix the send message future never complete #21134

Closed

Contributor

@poorbarcode poorbarcode commented Sep 5, 2023

Motivation

Before #19446

If producer sending and topic unloading are executed at the same time, there are two scenarios:

  • The Broker receives the send command before unloading the topic
    • The Broker responds with success after writing, even if the Producer is already marked as closed.
    • The client marks the send command as finished, even if the connection has been transferred to another one.
  • The Broker receives the send command after unloading the topic
    • The Broker closes the socket.
    • The client marks all pending requests as failed after the socket is closed.

After #19446, scenario 2 changed

  • The Broker receives the send command after unloading the topic
    • The Broker discards this send command and sends no response.
    • The send command does not finish until it times out.
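The difference between the two behaviors in scenario 2 can be sketched in isolation. This is a minimal illustration only, assuming a hypothetical `PendingSendsSketch` class that tracks pending sends with `CompletableFuture`; it is not the real `ClientCnx` or `ProducerImpl` code, and the method names are invented for the example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client connection; NOT the real Pulsar ClientCnx.
class PendingSendsSketch {
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a send. If nothing else completes the future, it fails with a
    // TimeoutException only once timeoutMillis has elapsed.
    CompletableFuture<String> send(long sequenceId, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<String>()
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        pending.put(sequenceId, future);
        return future;
    }

    // The broker acknowledged the write: complete the matching future.
    void onReceipt(long sequenceId) {
        CompletableFuture<String> future = pending.remove(sequenceId);
        if (future != null) {
            future.complete("ack-" + sequenceId);
        }
    }

    // Behavior before #19446: the socket is closed, so every pending send fails at once.
    void onSocketClosed() {
        pending.values().forEach(f ->
                f.completeExceptionally(new IllegalStateException("connection closed")));
        pending.clear();
    }
}
```

If the broker silently discards a command, neither `onReceipt` nor `onSocketClosed` is called in this sketch, so the caller only learns about the failure when the timeout fires.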

We received more than 20,000 errors in three hours.

[Pulsar][Producer]Asynchronous message sending failed:
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer service-TestProducer-02586a can not send message to the topic persistent://public/default/test-partition-2 within given timeout : createdAt 22.195 seconds ago, firstSentAt 7484.289 seconds ago, lastSentAt 7484.289 seconds ago, retryCount 0
	at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1386)
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1931)
	at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889)
	at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1476)
	at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1921)
	at org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1900)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
	at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:955)

Modifications

Revert #19446


Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 5, 2023
@poorbarcode poorbarcode self-assigned this Sep 5, 2023
@poorbarcode poorbarcode added this to the 3.2.0 milestone Sep 5, 2023
@poorbarcode poorbarcode added the type/bug The PR fixed a bug or issue reported a bug label Sep 5, 2023
@michaeljmarshall
Member

After #19446, scenario 2 changed

* The Broker receives the send command after unloading the topic
  
  * Broker discards this send command and responds nothing.
  * The send command will not finish until the timeout

What version of the Java client are you using? The client is supposed to resend all unack'd messages upon creating a new producer. Are you seeing the client resend the messages?

@michaeljmarshall
Member

org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer service-TestProducer-02586a can not send message to the topic persistent://public/default/test-partition-2 within given timeout : createdAt 22.195 seconds ago, firstSentAt 7484.289 seconds ago, lastSentAt 7484.289 seconds ago, retryCount 0

If you test the producer with an infinite send timeout, do the producers get into a stuck state? If the bug is in the recentlyClosedProducers logic, I would expect that to be the case.

@poorbarcode
Contributor Author

poorbarcode commented Sep 5, 2023

@michaeljmarshall

What version of the Java client are you using?

It should be 2.9.x or 2.10.x. I need to reconfirm it with my colleague tomorrow.

The client is supposed to resend all unack'd messages upon creating a new producer. Are you seeing the client resend the messages?

Do you mean the resend by the method ProducerImpl.recoverProcessOpSendMsgFrom?[1]

If yes[1], there are two issues:

Issue-1: When the message is sent again, the producer is also removed by the new broker, for example:

| time | producer-1 | broker-1 | broker-2 | broker-3 |
| --- | --- | --- | --- | --- |
| 1 | | topic owner | | |
| 2 | send msg to broker-1 | remove producer-1 due to unloading topic | | |
| 3 | | | topic owner | |
| 4 | resend msg to broker-2 and remove msg from the pending ops queue | | remove producer-1 due to unloading topic | |
| 5 | | | | topic owner |
| 6 | (Highlight) the msg won't be resend because the pending ops queue is empty | | | |

Issue-2: The timeout of the first request and the resend complete at the same time (maybe this issue existed before #19446):

| time | producer-1 | broker-1 | broker-2 |
| --- | --- | --- | --- |
| 1 | | topic owner | |
| 2 | async send [msg1, msg2, msg3, msg4] to broker-1 | remove producer-1 due to unloading topic | |
| 3 | | | topic owner |
| 4 | since the request is still in the clientCnx.pendingRequests, it will be triggered timeout | | responds success to the client |
| 5 | msg1 was triggered timeout in the thread scheduled thread (a random thread in eventLoopGroup) | | |
| 6 | msg2 was completed in the thread io thread | | |
| 7 | msg3 was triggered timeout in the thread scheduled thread (a random thread in eventLoopGroup) | | |
| 8 | msg4 was completed in the thread io thread | | |

Then msg1 and msg3 are reported as timed out (in fact, they were sent successfully), while msg2 and msg4 are reported as sent successfully. Users will be confused by this case.
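Why the user-visible result depends on which thread gets there first can be shown with plain `CompletableFuture` semantics: a future completes exactly once, and later completion attempts are ignored. The sketch below is an assumption-laden illustration; `SendOutcomeRaceSketch` and `outcome` are hypothetical names, and the real `ProducerImpl` callbacks may be wired differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of "first completion wins"; not Pulsar code.
class SendOutcomeRaceSketch {
    // Returns what the user would observe for one message, depending on
    // whether the timeout task or the broker receipt reaches the future first.
    static String outcome(boolean timeoutFirst) {
        CompletableFuture<String> sendFuture = new CompletableFuture<>();
        if (timeoutFirst) {
            sendFuture.completeExceptionally(new TimeoutException("send timed out"));
            sendFuture.complete("msg-id"); // ignored: the future is already completed
        } else {
            sendFuture.complete("msg-id");
            sendFuture.completeExceptionally(new TimeoutException("send timed out")); // ignored
        }
        return sendFuture.isCompletedExceptionally() ? "timeout" : "success";
    }
}
```

In this model a message that the broker persisted can still surface as a timeout to the caller if the timeout path completes the future first.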

If you test the producer with an infinite send timeout, do the producers get into a stuck state? If the bug is in the recentlyClosedProducers logic, I would expect that to be the case.

If we hit issue-1 above, I think the producer will get stuck.

@michaeljmarshall
Member

I am working from my memory here, but I'm pretty sure this is the way it ought to work.

the msg won't be resend because the pending ops queue is empty

Messages are resent from the pendingMessages iterator. Note that the pending ops queue (assuming you mean the msgIterator) is built as pendingMessages.iterator(). This is necessary because any time we resend messages, the resend must start from the earliest unacked message in order to retain message order. As far as I can tell, messages are not actually removed from pendingMessages until they are ack'd, timed out, or get an error from the broker.
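The resend behavior described here can be modeled with a small queue. This is a simplified sketch under stated assumptions: `PendingQueueSketch`, `resendAll`, and `ack` are hypothetical names, sequence ids stand in for `OpSendMsg` entries, and the real `ProducerImpl` logic has more states than shown.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a pending-message queue; not the real ProducerImpl.
class PendingQueueSketch {
    private final Deque<Long> pendingMessages = new ArrayDeque<>();

    void enqueue(long sequenceId) {
        pendingMessages.addLast(sequenceId);
    }

    // Resending iterates from the earliest unacked message to preserve order.
    // Iterating does not remove anything, so a second reconnect resends again.
    List<Long> resendAll() {
        List<Long> resent = new ArrayList<>();
        for (Long sequenceId : pendingMessages) {
            resent.add(sequenceId);
        }
        return resent;
    }

    // Only an ack (or, in the real client, a timeout or broker error) removes entries.
    void ack(long sequenceId) {
        while (!pendingMessages.isEmpty() && pendingMessages.peekFirst() <= sequenceId) {
            pendingMessages.pollFirst();
        }
    }
}
```

Under this model, a producer that is removed by a second broker during the resend still has the full backlog available for the next reconnect.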

Then msg1 and msg3 were timeout(in fact, they were successful), but msg2 and msg4 were sent successful. In this case, what should the user do?

I am not sure that I follow the events listed in this table. What is the purpose of referenced in the table?

It's relevant to point out that any time we have a send timeout, it is possible for messages to persist with "holes" and therefore out of order.

Also, note that there is synchronization on the ProducerImpl.this object that is intended to prevent certain race conditions.

@michaeljmarshall
Member

If hit the issue-1 above, I think the producer will get stuck.

Is it possible to set up a test to show that this is true?

@poorbarcode
Contributor Author

poorbarcode commented Sep 5, 2023

@michaeljmarshall

As far as I can tell, messages are not actually removed from pendingMessages until they are ack'd, timed out, or get an error from the broker.

After rechecking the code, you are right: messages are not removed in the method ProducerImpl.recoverProcessOpSendMsgFrom.

Is it possible to set up a test to show that this is true?

I also think this is a good way to confirm the issue, but it is hard to reproduce. I tend to think you're right: these messages will be resent, and a response will be received if there is enough time.

Also, note that there is synchronization on the ProducerImpl.this object that is intended to prevent certain race conditions.

Yes, the [timeout task](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1989) and `handle send receipt` acquire the same lock, `ProducerImpl.this`. However, they acquire and release the lock each time they process a message, so the scenario below might occur:

- `msg1 send` times out: the timeout task acquires and releases the lock, and the user gets a timeout error for `msg1`.
- A response to `msg2 send` is received from the broker, and the user gets a success for `msg2`.
- `msg3 send` times out: the timeout task acquires and releases the lock, and the user gets a timeout error for `msg3`.
- A response to `msg4 send` is received from the broker, and the user gets a success for `msg4`.

ProducerImpl.failPendingMessages marks all pending messages as timed out while holding the lock once, so you are right, the scenario above will not occur.
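The effect of failing the whole queue under one lock acquisition can be sketched as follows. This is an illustrative model only: `TimeoutLockSketch` and its fields are hypothetical, strings stand in for messages, and the real `ProducerImpl` differs in detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the locking pattern; not the real ProducerImpl.
class TimeoutLockSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    final List<String> results = new ArrayList<>();

    synchronized void send(String msg) {
        pending.addLast(msg);
    }

    // Drains the whole queue inside ONE synchronized block, so no receipt
    // handler can run between the timeout of msg1 and the timeout of msg3.
    void failPendingMessages() {
        synchronized (this) {
            String msg;
            while ((msg = pending.pollFirst()) != null) {
                results.add(msg + ":timeout");
            }
        }
    }

    // Takes the same monitor; a receipt for an already-failed message is a no-op.
    void onReceipt(String msg) {
        synchronized (this) {
            if (pending.remove(msg)) {
                results.add(msg + ":success");
            }
        }
    }
}
```

With this structure a late receipt cannot produce the alternating timeout/success pattern: every message that was pending when the timeout fired gets the same outcome.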

@poorbarcode poorbarcode changed the title [fix] [broker] Revert #19446 to fix the send message future never complete [draft] [fix] [broker] Revert #19446 to fix the send message future never complete Sep 5, 2023
@poorbarcode
Contributor Author

@michaeljmarshall

Thanks for your explanation. I'll sum it up tomorrow.


3 participants