From 4f708bd9c76b0ae8ade9679688f998e342b9465f Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Fri, 29 Jul 2022 10:43:50 +0800 Subject: [PATCH] Forget to update memory usage when invalid message (#16835) release memory usage when invalid message. Only need to release memory usage here, no need to release semaphore. Both add testcases. coauthored by @pengxiangrui127. - add unit tests for this change Check the box below or label this PR directly. Need to update docs? - [x] `doc-not-needed` bug fix, no need doc (cherry picked from commit 57b008a411463bce4c26350177dae4346f7b84d2) --- .../client/impl/ProducerMemoryLimitTest.java | 27 ++++++++++++++++++- .../impl/BatchMessageContainerImpl.java | 2 ++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index 77e3ee811a714..741262bafe24a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -27,7 +27,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - +import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @@ -47,6 +47,31 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerInvalidMessageMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxBytes(10240) + .enableBatching(true) + .create(); + this.stopBroker(); + try { + Field field = ClientCnx.class.getDeclaredField("maxMessageSize"); + field.setAccessible(true); + field.set(producer.getClientCnx(), 8); + producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + } + @Test(timeOut = 10_000) public void testProducerTimeoutMemoryRelease() throws Exception { initClientWithMemoryLimit(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index e0ab2d942ca15..b5acc2c30e9c4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -200,6 +200,8 @@ public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); + messages.forEach(msg -> producer.client.getMemoryLimitController() + .releaseMemory(msg.getUncompressedSize())); discard(new PulsarClientException.InvalidMessageException( "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); return null;