Skip to content

Commit

Permalink
Forget to update memory usage when invalid message (#16835)
Browse files Browse the repository at this point in the history
release memory usage when invalid message.
Only need to release memory usage here, no need to release semaphore. Both add testcases.

coauthored by @pengxiangrui127.

- add unit tests for this change

Check the box below or label this PR directly.

Need to update docs?

- [x] `doc-not-needed`
bug fix, no need doc

(cherry picked from commit 57b008a)
  • Loading branch information
shoothzj authored and congbobo184 committed Nov 10, 2022
1 parent b4ce1b7 commit 4f708bd
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
Expand Up @@ -27,7 +27,7 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

Expand All @@ -47,6 +47,31 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 10_000)
public void testProducerInvalidMessageMemoryRelease() throws Exception {
initClientWithMemoryLimit();
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerMemoryLimit")
.sendTimeout(5, TimeUnit.SECONDS)
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
.batchingMaxBytes(10240)
.enableBatching(true)
.create();
this.stopBroker();
try {
Field field = ClientCnx.class.getDeclaredField("maxMessageSize");
field.setAccessible(true);
field.set(producer.getClientCnx(), 8);
producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
Assert.assertEquals(memoryLimitController.currentUsage(), 0);
}
}

@Test(timeOut = 10_000)
public void testProducerTimeoutMemoryRelease() throws Exception {
initClientWithMemoryLimit();
Expand Down
Expand Up @@ -200,6 +200,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
producer.semaphoreRelease(messages.size());
messages.forEach(msg -> producer.client.getMemoryLimitController()
.releaseMemory(msg.getUncompressedSize()));
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
return null;
Expand Down

0 comments on commit 4f708bd

Please sign in to comment.