diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 9507e9a5e2cb4..2bc81c48f9beb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -66,7 +66,7 @@ public void testProducerSemaphoreInvalidMessage() throws Exception { ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() .topic("testProducerSemaphoreAcquire") .maxPendingMessages(pendingQueueSize) - .enableBatching(false) + .enableBatching(true) .create(); this.stopBroker(); @@ -79,6 +79,17 @@ public void testProducerSemaphoreInvalidMessage() throws Exception { } catch (PulsarClientException.InvalidMessageException ex) { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); } + + producer.conf.setBatchingEnabled(false); + try { + try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { + mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2); + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + } + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } } @Test(timeOut = 30000) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 0d107aa7ba953..7d95f0963ba22 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -199,6 +199,7 @@ public boolean isMultiBatches() { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + producer.semaphoreRelease(messages.size()); messages.forEach(msg -> producer.client.getMemoryLimitController() .releaseMemory(msg.getUncompressedSize())); discard(new PulsarClientException.InvalidMessageException(