diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index 264ec306413cd..0856dfc88b24b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -23,6 +23,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -47,6 +49,31 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerInvalidMessageMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxBytes(10240) + .enableBatching(true) + .create(); + this.stopBroker(); + try { + try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { + mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8); + producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); + } + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + } + @Test(timeOut = 10_000) public void testProducerTimeoutMemoryRelease() throws Exception { initClientWithMemoryLimit(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 78fc659a205b2..cc7b601e42a41 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -25,12 +25,15 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.util.FutureUtil; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -52,6 +55,29 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerSemaphoreInvalidMessage() throws Exception { + final int pendingQueueSize = 100; + + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerSemaphoreAcquire") + .maxPendingMessages(pendingQueueSize) + .enableBatching(false) + .create(); + + this.stopBroker(); + try { + try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { + mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2); + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + } + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } + } + @Test(timeOut = 30000) public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientException, ExecutionException, InterruptedException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index afd3f808f8e55..ce1d361ecbbd3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -185,6 +185,8 @@ public boolean isMultiBatches() { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + messages.forEach(msg -> producer.client.getMemoryLimitController() + .releaseMemory(msg.getUncompressedSize())); discard(new PulsarClientException.InvalidMessageException( "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); return null;