Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[websocket] Query parameter "negativeAckRedeliveryDelay" should be effective even if DLQ is disabled #11495

Merged
merged 2 commits into from Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -769,11 +770,11 @@ public void socketPullModeTest() throws Exception {
}
}

@Test(timeOut = 10000)
@Test(timeOut = 20000)
public void nackMessageTest() throws Exception {
final String subscription = "my-sub";
final String dlqTopic = "my-property/my-ns/my-topic10";
final String consumerTopic = "my-property/my-ns/my-topic9";
final String dlqTopic = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID();
final String consumerTopic = "my-property/my-ns/nack-msg-" + UUID.randomUUID();

final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" +
Expand All @@ -784,7 +785,7 @@ public void nackMessageTest() throws Exception {
"/ws/v2/consumer/persistent/" +
consumerTopic + "/" + subscription +
"?deadLetterTopic=" + dlqTopic +
"&maxRedeliverCount=0&subscriptionType=Shared&ackTimeoutMillis=1000&negativeAckRedeliveryDelay=1000";
"&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000";

final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/producer/persistent/" + consumerTopic;
Expand All @@ -794,7 +795,7 @@ public void nackMessageTest() throws Exception {
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);

consumeSocket1.setMessageHandler((id, data) -> {
JsonObject nack = new JsonObject();
Expand Down Expand Up @@ -824,18 +825,70 @@ public void nackMessageTest() throws Exception {

produceSocket.sendMessage(1);

Thread.sleep(500);
// Main topic
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(consumeSocket1.getReceivedMessagesCount(), 2));

//assertEquals(consumeSocket1.getReceivedMessagesCount(), 1);
assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
// DLQ
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(consumeSocket2.getReceivedMessagesCount(), 1));
} finally {
stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
}
}

Thread.sleep(500);
@Test(timeOut = 20000)
public void nackRedeliveryDelayTest() throws Exception {
final String uriBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2";
final String topic = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID();
final String sub = "my-sub";
final int delayTime = 5000;

final String consumerUri = String.format("%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", uriBase,
topic, sub, delayTime);

final String producerUri = String.format("%s/producer/persistent/%s", uriBase, topic);

final WebSocketClient consumeClient = new WebSocketClient();
final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();

final WebSocketClient produceClient = new WebSocketClient();
final SimpleProducerSocket produceSocket = new SimpleProducerSocket(0);

consumeSocket.setMessageHandler((mid, data) -> {
JsonObject nack = new JsonObject();
nack.add("type", new JsonPrimitive("negativeAcknowledge"));
nack.add("messageId", new JsonPrimitive(mid));
return nack.toString();
});

try {
consumeClient.start();
final ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
final Future<Session> consumerFuture = consumeClient.connect(consumeSocket, URI.create(consumerUri),
consumeRequest);
assertTrue(consumerFuture.get().isOpen());

//assertEquals(consumeSocket2.getReceivedMessagesCount(), 1);
assertTrue(consumeSocket1.getReceivedMessagesCount() > 0);
produceClient.start();
final ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
final Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri),
produceRequest);
assertTrue(producerFuture.get().isOpen());

assertEquals(consumeSocket.getReceivedMessagesCount(), 0);

produceSocket.sendMessage(1);

Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 1));

// Nacked message should be redelivered after 5 seconds
Thread.sleep(delayTime);

Awaitility.await().atMost(delayTime - 1000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertEquals(consumeSocket.getReceivedMessagesCount(), 2));
} finally {
stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
stopWebSocketClient(consumeClient, produceClient);
}
}

Expand Down
Expand Up @@ -46,10 +46,16 @@ public class SimpleProducerSocket {
private final CountDownLatch closeLatch;
private Session session;
private final ArrayList<String> producerBuffer;
private final int messagesToSendWhenConnected;

public SimpleProducerSocket() {
this(10);
}

public SimpleProducerSocket(int messagesToSendWhenConnected) {
this.closeLatch = new CountDownLatch(1);
producerBuffer = new ArrayList<>();
this.producerBuffer = new ArrayList<>();
this.messagesToSendWhenConnected = messagesToSendWhenConnected;
}

private static String getTestJsonPayload(int index) throws JsonProcessingException {
Expand All @@ -74,7 +80,7 @@ public void onClose(int statusCode, String reason) {
public void onConnect(Session session) throws Exception {
log.info("Got connect: {}", session);
this.session = session;
sendMessage(10);
sendMessage(this.messagesToSendWhenConnected);
}

public void sendMessage(int totalMsgs) throws Exception {
Expand Down
Expand Up @@ -386,6 +386,11 @@ protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client)
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
}

if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")),
TimeUnit.MILLISECONDS);
}

if (queryParams.containsKey("maxRedeliverCount") || queryParams.containsKey("deadLetterTopic")) {
DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder();
if (queryParams.containsKey("maxRedeliverCount")) {
Expand All @@ -396,9 +401,6 @@ protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client)
if (queryParams.containsKey("deadLetterTopic")) {
dlpBuilder.deadLetterTopic(queryParams.get("deadLetterTopic"));
}
if (queryParams.containsKey("negativeAckRedeliveryDelay")) {
builder.negativeAckRedeliveryDelay(Integer.parseInt(queryParams.get("negativeAckRedeliveryDelay")), TimeUnit.MILLISECONDS);
}
builder.deadLetterPolicy(dlpBuilder.build());
}

Expand Down
2 changes: 1 addition & 1 deletion site2/docs/client-libraries-websocket.md
Expand Up @@ -167,7 +167,7 @@ Key | Type | Required? | Explanation
`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.
`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, it will be redelivered to the DLQ.
`negativeAckRedeliveryDelay` | int | no | When a message is negatively acknowledged, the delay time in milliseconds before the message is redelivered (default: 60000)
massakam marked this conversation as resolved.
Show resolved Hide resolved
`token` | string | no | Authentication token, this is used for the browser javascript client

NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service.
Expand Down