diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index 77e3ee811a714..e187b8f3d0ee7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -23,11 +23,14 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @@ -47,6 +50,32 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerInvalidMessageMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxBytes(10240) + .enableBatching(true) + .create(); + this.stopBroker(); + try { + Field field = ClientCnx.class.getDeclaredField("maxMessageSize"); + field.setAccessible(true); + field.set(producer.getClientCnx(), 8); + producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); + throw new IllegalStateException("can not reach here"); + + } catch (PulsarClientException.InvalidMessageException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + } + @Test(timeOut = 10_000) public void testProducerTimeoutMemoryRelease() throws Exception { initClientWithMemoryLimit(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index e0ab2d942ca15..b5acc2c30e9c4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -200,6 +200,8 @@ public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); + messages.forEach(msg -> producer.client.getMemoryLimitController() + .releaseMemory(msg.getUncompressedSize())); discard(new PulsarClientException.InvalidMessageException( "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); return null;