From eeb2ed27c957772f0d7f437ef6fcbea47c97979d Mon Sep 17 00:00:00 2001 From: lixinyang <84127069+Nicklee007@users.noreply.github.com> Date: Wed, 3 Aug 2022 12:09:13 +0800 Subject: [PATCH] [fix][client]Fix MaxQueueSize semaphore release leak in createOpSendMsg (#16915) (cherry picked from commit d95f6cf366f66bc1e38711bc59cd456c8f53f888) (cherry picked from commit 70f1a03925a7fafb74eba6c538361f01606f0f4f) --- .../pulsar/client/impl/ProducerSemaphoreTest.java | 13 ++++++++++++- .../client/impl/BatchMessageContainerImpl.java | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 9507e9a5e2cb4..2bc81c48f9beb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -66,7 +66,7 @@ public void testProducerSemaphoreInvalidMessage() throws Exception { ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() .topic("testProducerSemaphoreAcquire") .maxPendingMessages(pendingQueueSize) - .enableBatching(false) + .enableBatching(true) .create(); this.stopBroker(); @@ -79,6 +79,17 @@ public void testProducerSemaphoreInvalidMessage() throws Exception { } catch (PulsarClientException.InvalidMessageException ex) { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); } + + producer.conf.setBatchingEnabled(false); + try { + try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { + mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2); + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + } + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } } @Test(timeOut = 30000) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index ce1d361ecbbd3..0a6fedf0ab605 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -185,6 +185,7 @@ public boolean isMultiBatches() { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + producer.semaphoreRelease(messages.size()); messages.forEach(msg -> producer.client.getMemoryLimitController() .releaseMemory(msg.getUncompressedSize())); discard(new PulsarClientException.InvalidMessageException(