Skip to content

Commit

Permalink
[websocket] Query parameter "negativeAckRedeliveryDelay" should be ef…
Browse files Browse the repository at this point in the history
…fective even if DLQ is disabled (apache#11495)

### Motivation

On the consumer endpoint of WebSocket API, we can specify the delay time before a message which is negatively acknowledged is redelivered using the query parameter `negativeAckRedeliveryDelay`.

However, this parameter is currently ignored when DLQ is disabled. I think this is an implementation mistake. Users should be able to specify `negativeAckRedeliveryDelay` even if DLQ is disabled.
https://github.com/apache/pulsar/blob/ee202d06548e3c73d70ad52374658ee3507ca809/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L389-L403

Related PR: apache#8249

### Modifications

Fixed `ConsumerHandler` of WebSocket to use the `negativeAckRedeliveryDelay` value specified by the client even if DLQ is disabled. In addition, fixed an inappropriate test code (`ProxyPublishConsumeTest#nackMessageTest()`).
  • Loading branch information
Masahiro Sakamoto authored and ciaocloud committed Oct 16, 2021
1 parent d2e2be8 commit 0ec35fb
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 18 deletions.
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -769,11 +770,11 @@ public void socketPullModeTest() throws Exception {
}
}

@Test(timeOut = 10000)
@Test(timeOut = 20000)
public void nackMessageTest() throws Exception {
final String subscription = "my-sub";
final String dlqTopic = "my-property/my-ns/my-topic10";
final String consumerTopic = "my-property/my-ns/my-topic9";
final String dlqTopic = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID();
final String consumerTopic = "my-property/my-ns/nack-msg-" + UUID.randomUUID();

final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" +
Expand All @@ -784,7 +785,7 @@ public void nackMessageTest() throws Exception {
"/ws/v2/consumer/persistent/" +
consumerTopic + "/" + subscription +
"?deadLetterTopic=" + dlqTopic +
"&maxRedeliverCount=0&subscriptionType=Shared&ackTimeoutMillis=1000&negativeAckRedeliveryDelay=1000";
"&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000";

final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/producer/persistent/" + consumerTopic;
Expand All @@ -794,7 +795,7 @@ public void nackMessageTest() throws Exception {
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);

consumeSocket1.setMessageHandler((id, data) -> {
JsonObject nack = new JsonObject();
Expand Down Expand Up @@ -824,18 +825,70 @@ public void nackMessageTest() throws Exception {

produceSocket.sendMessage(1);

Thread.sleep(500);
// Main topic
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(consumeSocket1.getReceivedMessagesCount(), 2));

//assertEquals(consumeSocket1.getReceivedMessagesCount(), 1);
assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
// DLQ
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(consumeSocket2.getReceivedMessagesCount(), 1));
} finally {
stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
}
}

Thread.sleep(500);
@Test(timeOut = 20000)
public void nackRedeliveryDelayTest() throws Exception {
final String uriBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2";
final String topic = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID();
final String sub = "my-sub";
final int delayTime = 5000;

final String consumerUri = String.format("%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", uriBase,
topic, sub, delayTime);

final String producerUri = String.format("%s/producer/persistent/%s", uriBase, topic);

final WebSocketClient consumeClient = new WebSocketClient();
final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();

final WebSocketClient produceClient = new WebSocketClient();
final SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);

consumeSocket.setMessageHandler((mid, data) -> {
JsonObject nack = new JsonObject();
nack.add("type", new JsonPrimitive("negativeAcknowledge"));
nack.add("messageId", new JsonPrimitive(mid));
return nack.toString();
});

try {
consumeClient.start();
final ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
final Future<Session> consumerFuture = consumeClient.connect(consumeSocket, URI.create(consumerUri),
consumeRequest);
assertTrue(consumerFuture.get().isOpen());

//assertEquals(consumeSocket2.getReceivedMessagesCount(), 1);
assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
produceClient.start();
final ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
final Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri),
produceRequest);
assertTrue(producerFuture.get().isOpen());

assertEquals(consumeSocket.getReceivedMessagesCount(), 0);

produceSocket.sendMessage(1);

Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 1));

// Nacked message should be redelivered after 5 seconds
Thread.sleep(delayTime);

Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 2));
} finally {
stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
stopWebSocketClient(consumeClient, produceClient);
}
}

Expand Down
Expand Up @@ -46,10 +46,16 @@ public class SimpleProducerSocket {
private final CountDownLatch closeLatch;
private Session session;
private final ArrayList<String> producerBuffer;
private final int messagesToSendWhenConnected;

public SimpleProducerSocket() {
this(10);
}

public SimpleProducerSocket(int messagesToSendWhenConnected) {
this.closeLatch = new CountDownLatch(1);
producerBuffer = new ArrayList<>();
this.producerBuffer = new ArrayList<>();
this.messagesToSendWhenConnected = messagesToSendWhenConnected;
}

private static String getTestJsonPayload(int index) throws JsonProcessingException {
Expand All @@ -74,7 +80,7 @@ public void onClose(int statusCode, String reason) {
public void onConnect(Session session) throws Exception {
log.info("Got connect: {}", session);
this.session = session;
sendMessage(10);
sendMessage(this.messagesToSendWhenConnected);
}

public void sendMessage(int totalMsgs) throws Exception {
Expand Down
Expand Up @@ -386,6 +386,11 @@ protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client)
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
}

if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")),
TimeUnit.MILLISECONDS);
}

if (queryParams.containsKey("maxRedeliverCount") || queryParams.containsKey("deadLetterTopic")) {
DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder();
if (queryParams.containsKey("maxRedeliverCount")) {
Expand All @@ -396,9 +401,6 @@ protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client)
if (queryParams.containsKey("deadLetterTopic")) {
dlpBuilder.deadLetterTopic(queryParams.get("deadLetterTopic"));
}
if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")), TimeUnit.MILLISECONDS);
}
builder.deadLetterPolicy(dlpBuilder.build());
}

Expand Down
2 changes: 1 addition & 1 deletion site2/docs/client-libraries-websocket.md
Expand Up @@ -167,7 +167,7 @@ Key | Type | Required? | Explanation
`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.
`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, it will be redelivered to the DLQ.
`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, the delay time before the message is redelivered (in milliseconds). The default value is 60000.
`token` | string | no | Authentication token, this is used for the browser javascript client

NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service.
Expand Down

0 comments on commit 0ec35fb

Please sign in to comment.