From 91bc3cc9a9234fa169e8245e9b9f99b452dd64eb Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 30 Aug 2021 18:53:36 -0700 Subject: [PATCH] Handle receiveAsync() failures in MultiTopicsConsumer (#11843) --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 997358dcdc121..7588c34716049 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -269,6 +269,10 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { // recursion and stack overflow internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer)); } + }).exceptionally(ex -> { + log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex); + internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS); + return null; }); }