From 57b008a411463bce4c26350177dae4346f7b84d2 Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Fri, 29 Jul 2022 10:43:50 +0800 Subject: [PATCH] Forget to update memory usage when invalid message (#16835) ### Modifications release memory usage when invalid message. Only need to release memory usage here, no need to release semaphore. Both add testcases. coauthored by @pengxiangrui127. ### Verifying this change - add unit tests for this change ### Documentation Check the box below or label this PR directly. Need to update docs? - [x] `doc-not-needed` bug fix, no need doc --- .../client/impl/ProducerMemoryLimitTest.java | 27 +++++++++++++++++++ .../client/impl/ProducerSemaphoreTest.java | 26 ++++++++++++++++++ .../impl/BatchMessageContainerImpl.java | 2 ++ 3 files changed, 55 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index 264ec306413cd..0856dfc88b24b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -23,6 +23,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -47,6 +49,31 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerInvalidMessageMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxBytes(10240) + .enableBatching(true) + .create(); + this.stopBroker(); + try { + try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { + mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8); + producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); + } + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + } + @Test(timeOut = 10_000) public void testProducerTimeoutMemoryRelease() throws Exception { initClientWithMemoryLimit(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 78fc659a205b2..cc7b601e42a41 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -25,12 +25,15 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.util.FutureUtil; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -52,6 +55,29 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerSemaphoreInvalidMessage() throws Exception { + final int pendingQueueSize = 100; + + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerSemaphoreAcquire") + .maxPendingMessages(pendingQueueSize) + .enableBatching(false) + .create(); + + this.stopBroker(); + try { + try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { + mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2); + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + } + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } + } + @Test(timeOut = 30000) public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientException, ExecutionException, InterruptedException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 02fb491d09dec..0d107aa7ba953 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -199,6 +199,8 @@ public boolean isMultiBatches() { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + messages.forEach(msg -> producer.client.getMemoryLimitController() + .releaseMemory(msg.getUncompressedSize())); discard(new PulsarClientException.InvalidMessageException( "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); return null;