From 1f76d0de94eda94b91648f6235acde6ebd02d43a Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Wed, 4 Aug 2021 14:10:38 +0900 Subject: [PATCH] [websocket] Query parameter "negativeAckRedeliveryDelay" should be effective even if DLQ is disabled (#11495) ### Motivation On the consumer endpoint of WebSocket API, we can specify the delay time before a message which is negatively acknowledged is redelivered using the query parameter `negativeAckRedeliveryDelay`. However, this parameter is currently ignored when DLQ is disabled. I think this is an implementation mistake. Users should be able to specify `negativeAckRedeliveryDelay` even if DLQ is disabled. https://github.com/apache/pulsar/blob/ee202d06548e3c73d70ad52374658ee3507ca809/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L389-L403 Related PR: https://github.com/apache/pulsar/pull/8249 ### Modifications Fixed `ConsumerHandler` of WebSocket to use the `negativeAckRedeliveryDelay` value specified by the client even if DLQ is disabled. In addition, fixed an inappropriate test code (`ProxyPublishConsumeTest#nackMessageTest()`). --- .../proxy/ProxyPublishConsumeTest.java | 77 ++++++++++++++++--- .../websocket/proxy/SimpleProducerSocket.java | 10 ++- .../pulsar/websocket/ConsumerHandler.java | 8 +- site2/docs/client-libraries-websocket.md | 2 +- 4 files changed, 79 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 2019a9ebd9a33..941e410cd30a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -769,11 +770,11 @@ public void socketPullModeTest() throws Exception { } } - @Test(timeOut = 10000) + @Test(timeOut = 20000) public void nackMessageTest() throws Exception { final String subscription = "my-sub"; - final String dlqTopic = "my-property/my-ns/my-topic10"; - final String consumerTopic = "my-property/my-ns/my-topic9"; + final String dlqTopic = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID(); + final String consumerTopic = "my-property/my-ns/nack-msg-" + UUID.randomUUID(); final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + @@ -784,7 +785,7 @@ public void nackMessageTest() throws Exception { "/ws/v2/consumer/persistent/" + consumerTopic + "/" + subscription + "?deadLetterTopic=" + dlqTopic + - "&maxRedeliverCount=0&subscriptionType=Shared&ackTimeoutMillis=1000&negativeAckRedeliveryDelay=1000"; + "&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000"; final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/" + consumerTopic; @@ -794,7 +795,7 @@ public void nackMessageTest() throws Exception { WebSocketClient consumeClient2 = new WebSocketClient(); SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket(); WebSocketClient produceClient = new WebSocketClient(); - SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(0); consumeSocket1.setMessageHandler((id, data) -> { JsonObject nack = new JsonObject(); @@ -824,18 +825,70 @@ public void nackMessageTest() throws Exception { produceSocket.sendMessage(1); - Thread.sleep(500); + // Main topic + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(consumeSocket1.getReceivedMessagesCount(), 2)); - //assertEquals(consumeSocket1.getReceivedMessagesCount(), 1); - assertTrue(consumeSocket1.getReceivedMessagesCount() > 0); + // DLQ + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(consumeSocket2.getReceivedMessagesCount(), 1)); + } finally { + stopWebSocketClient(consumeClient1, consumeClient2, produceClient); + } + } - Thread.sleep(500); + @Test(timeOut = 20000) + public void nackRedeliveryDelayTest() throws Exception { + final String uriBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2"; + final String topic = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID(); + final String sub = "my-sub"; + final int delayTime = 5000; + + final String consumerUri = String.format("%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", uriBase, + topic, sub, delayTime); + + final String producerUri = String.format("%s/producer/persistent/%s", uriBase, topic); + + final WebSocketClient consumeClient = new WebSocketClient(); + final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + + final WebSocketClient produceClient = new WebSocketClient(); + final SimpleProducerSocket produceSocket = new SimpleProducerSocket(0); + + consumeSocket.setMessageHandler((mid, data) -> { + JsonObject nack = new JsonObject(); + nack.add("type", new JsonPrimitive("negativeAcknowledge")); + nack.add("messageId", new JsonPrimitive(mid)); + return nack.toString(); + }); + + try { + consumeClient.start(); + final ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + final Future consumerFuture = consumeClient.connect(consumeSocket, URI.create(consumerUri), + consumeRequest); + assertTrue(consumerFuture.get().isOpen()); - //assertEquals(consumeSocket2.getReceivedMessagesCount(), 1); - assertTrue(consumeSocket1.getReceivedMessagesCount() > 0); + produceClient.start(); + final ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + final Future producerFuture = produceClient.connect(produceSocket, URI.create(producerUri), + produceRequest); + assertTrue(producerFuture.get().isOpen()); + assertEquals(consumeSocket.getReceivedMessagesCount(), 0); + + produceSocket.sendMessage(1); + + Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 1)); + + // Nacked message should be redelivered after 5 seconds + Thread.sleep(delayTime); + + Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 2)); } finally { - stopWebSocketClient(consumeClient1, consumeClient2, produceClient); + stopWebSocketClient(consumeClient, produceClient); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java index 149e46ba80558..0d15e56d56c82 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java @@ -46,10 +46,16 @@ public class SimpleProducerSocket { private final CountDownLatch closeLatch; private Session session; private final ArrayList producerBuffer; + private final int messagesToSendWhenConnected; public SimpleProducerSocket() { + this(10); + } + + public SimpleProducerSocket(int messagesToSendWhenConnected) { this.closeLatch = new CountDownLatch(1); - producerBuffer = new ArrayList<>(); + this.producerBuffer = new ArrayList<>(); + this.messagesToSendWhenConnected = messagesToSendWhenConnected; } private static String getTestJsonPayload(int index) throws JsonProcessingException { @@ -74,7 +80,7 @@ public void onClose(int statusCode, String reason) { public void onConnect(Session session) throws Exception { log.info("Got connect: {}", session); this.session = session; - sendMessage(10); + sendMessage(this.messagesToSendWhenConnected); } public void sendMessage(int totalMsgs) throws Exception { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 525ea0f6bf2f4..a1c76d26fd2c4 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -386,6 +386,11 @@ protected ConsumerBuilder getConsumerConfiguration(PulsarClient client) builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel"))); } + if (queryParams.containsKey("negativeAckRedeliveryDelay")) { + builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")), + TimeUnit.MILLISECONDS); + } + if (queryParams.containsKey("maxRedeliverCount") || queryParams.containsKey("deadLetterTopic")) { DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder(); if (queryParams.containsKey("maxRedeliverCount")) { @@ -396,9 +401,6 @@ protected ConsumerBuilder getConsumerConfiguration(PulsarClient client) if (queryParams.containsKey("deadLetterTopic")) { dlpBuilder.deadLetterTopic(queryParams.get("deadLetterTopic")); } - if (queryParams.containsKey("negativeAckRedeliveryDelay")) { - builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")), TimeUnit.MILLISECONDS); - } builder.deadLetterPolicy(dlpBuilder.build()); } diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md index d887e452ba030..4af897997f5bc 100644 --- a/site2/docs/client-libraries-websocket.md +++ b/site2/docs/client-libraries-websocket.md @@ -167,7 +167,7 @@ Key | Type | Required? | Explanation `maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature. `deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature. `pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below. -`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, it will be redelivered to the DLQ. +`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, the delay time before the message is redelivered (in milliseconds). The default value is 60000. `token` | string | no | Authentication token, this is used for the browser javascript client NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service.