diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 0c7f4b137e7919..15d1baecca1000 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; import lombok.Cleanup; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -36,8 +38,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Test(groups = "broker-impl") public class ProducerSemaphoreTest extends ProducerConsumerBase { @@ -55,6 +55,39 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerSemaphoreInvalidMessage() throws Exception { + final int pendingQueueSize = 100; + + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerSemaphoreAcquire") + .maxPendingMessages(pendingQueueSize) + .enableBatching(true) + .create(); + + this.stopBroker(); + + Field maxMessageSizeFiled = ClientCnx.class.getDeclaredField("maxMessageSize"); + maxMessageSizeFiled.setAccessible(true); + maxMessageSizeFiled.set(null, 2); + + try { + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + Assert.fail("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } + + producer.conf.setBatchingEnabled(false); + try { + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + Assert.fail("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } + } + @Test(timeOut = 30000) public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientException, ExecutionException, InterruptedException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 996875a7131fa3..e0ab2d942ca152 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -199,6 +199,7 @@ public boolean isMultiBatches() { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + producer.semaphoreRelease(messages.size()); discard(new PulsarClientException.InvalidMessageException( "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); return null;