Skip to content

Commit

Permalink
[fix][client]Fix client memory limit currentUsage leak and semaphore …
Browse files Browse the repository at this point in the history
…release duplicated in ProducerImpl (apache#16837)

### Motivation

Fix the client memory limit `currentUsage` leak in `ProducerImpl`. When our Pulsar cluster hit errors and producer sends failed, we found that `currentUsage` stayed at a high value, as if memory had leaked, which slowed down the producer's send rate.
We also found that the producer semaphore is released twice when `createOpSendMsg` throws an exception.

The first code location below releases only the message-count semaphore; it does not release the memory reserved against the memory limit.
**memory limit currentUsage leak point**
https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033

**producer semaphore release duplicated**
https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120

```
After the following exception, the memory limit leak occurred.
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1
        at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287)
        at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826)
        at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889)
        at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369)
        at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816)
        at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848)
        at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

```

### Modifications

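1. Release the batch's memory through `MemoryLimitController` in `failPendingBatchMessages`, alongside the existing semaphore release.
2. Remove the duplicated `semaphoreRelease` calls from the exception handlers in `batchMessageAndSend`.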

### Documentation

- [X] `doc-not-needed`

(cherry picked from commit 955dcd1)
(cherry picked from commit 7c73269)
  • Loading branch information
Nicklee007 authored and nicoloboschi committed Aug 16, 2022
1 parent 94968bd commit bbcbe08
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,35 @@ public void testProducerTimeoutMemoryRelease() throws Exception {

}

/**
 * Checks that the client's {@code MemoryLimitController} reports zero usage once
 * batched sends have timed out.
 *
 * <p>A batching producer is created, the broker is stopped, and two messages are
 * sent. The second send is awaited and must fail with a
 * {@code PulsarClientException.TimeoutException}; any other outcome fails the test.
 */
@Test(timeOut = 10_000)
public void testProducerBatchSendTimeoutMemoryRelease() throws Exception {
    initClientWithMemoryLimit();
    @Cleanup
    ProducerImpl<byte[]> batchingProducer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
            .topic("testProducerMemoryLimit")
            .sendTimeout(5, TimeUnit.SECONDS)
            .maxPendingMessages(0)
            .enableBatching(true)
            .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
            .batchingMaxBytes(12)
            .create();
    // Stop the broker before sending; the sends below are expected to end in a timeout.
    stopBroker();

    final String payloadText = "memory-test";
    try {
        // First message is sent without waiting on its future.
        batchingProducer.newMessage().value(payloadText.getBytes(StandardCharsets.UTF_8)).sendAsync();
        try {
            // Second message is awaited so that its failure surfaces here.
            batchingProducer.newMessage().value(payloadText.getBytes(StandardCharsets.UTF_8)).sendAsync().get();
        } catch (Exception sendFailure) {
            throw PulsarClientException.unwrap(sendFailure);
        }

        // A successful send means the scenario was not set up as intended.
        throw new IllegalStateException("can not reach here");
    } catch (PulsarClientException.TimeoutException expected) {
        final MemoryLimitController limitController =
                ((PulsarClientImpl) pulsarClient).getMemoryLimitController();
        Assert.assertEquals(limitController.currentUsage(), 0);
    }
}

@Test(timeOut = 10_000)
public void testProducerCloseMemoryRelease() throws Exception {
initClientWithMemoryLimit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.any;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
Expand All @@ -33,7 +37,6 @@
import org.testng.annotations.Test;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -229,4 +232,35 @@ public void testEnsureNotBlockOnThePendingQueue() throws Exception {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
Assert.assertFalse(producer.isErrorStat());
}

/**
 * Checks that every producer semaphore permit is available again after a batched
 * send fails.
 *
 * <p>The producer is wrapped in a Mockito spy whose {@code encryptMessage} is stubbed
 * to throw a {@code CryptoException}, and the spy is installed into the producer's
 * batch message container via reflection. With the broker stopped, the blocking
 * {@code send} is expected to fail with a
 * {@code PulsarClientException.TimeoutException}; any other outcome fails the test.
 * Afterwards the semaphore must report exactly {@code pendingQueueSize} available
 * permits, i.e. permits were neither leaked nor released more than once.
 */
@Test(timeOut = 10_000)
public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception {
    final int pendingQueueSize = 10;
    @Cleanup
    ProducerImpl<byte[]> producer =
            (ProducerImpl<byte[]>) pulsarClient.newProducer()
                    .topic("testProducerSemaphoreRelease")
                    .sendTimeout(5, TimeUnit.SECONDS)
                    .maxPendingMessages(pendingQueueSize)
                    .enableBatching(true)
                    .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
                    .batchingMaxBytes(12)
                    .create();
    this.stopBroker();
    try {
        ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
        Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer)
                .encryptMessage(any(), any());

        // Point the batch container at the spy; presumably this is what routes the
        // container's calls to the stubbed encryptMessage (verify against
        // BatchMessageContainerImpl).
        Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
        batchMessageContainerField.setAccessible(true);
        BatchMessageContainerImpl batchMessageContainer =
                (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
        batchMessageContainer.setProducer(spyProducer);
        spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));

        // A successful send means the failure scenario was not triggered.
        throw new IllegalStateException("can not reach here");
    } catch (PulsarClientException.TimeoutException ex) {
        // Compare against the configured queue size rather than a repeated literal so
        // the assertion stays correct if pendingQueueSize is ever changed.
        Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1959,8 +1959,10 @@ private void failPendingBatchMessages(PulsarClientException ex) {
return;
}
final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
final long currentBatchSize = batchMessageContainer.getCurrentBatchSize();
batchMessageContainer.discard(ex);
semaphoreRelease(numMessagesInBatch);
client.getMemoryLimitController().releaseMemory(currentBatchSize);
}

@Override
Expand Down Expand Up @@ -2005,10 +2007,7 @@ private void batchMessageAndSend() {
for (OpSendMsg opSendMsg : opSendMsgs) {
processOpSendMsg(opSendMsg);
}
} catch (PulsarClientException e) {
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
} catch (Throwable t) {
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
}
}
Expand Down

0 comments on commit bbcbe08

Please sign in to comment.