From bea4eb1f6e803e436cde621fec54abe065969a54 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Thu, 17 Jun 2021 23:07:34 -0700 Subject: [PATCH] [pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847) When the broker takes a longer time to load the topic and times out before completing the topic future, then the broker keeps multiple topics opened , doesn't clean up timed out topic, fail to create replicator producer on successfully created topic with error: `repl-producer is already connected to topic`, builds replication backlog. ``` 19:16:10.107 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger myProp/global/myNs/persistent/myTopic : 9:17:10.953 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [myProp/global/myNs/persistent/myTopic] Successfully initialize managed ledger : 19:17:10.065 [pulsar-io-23-30] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.196.133.62:47278] Failed to create topic persistent://myProp/global/myNs/myTopic, producerId=382 : 19:17:10.954 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator : 19:17:10.955 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled : 19:17:51.532 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled : 19:17:51.530 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator : 07:25:51.377 [pulsar-io-23-5] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://myProp/global/myNs/myTopic] [pulsar.repl.west1] Failed to create producer: Producer with name 'pulsarrepl.west1' is already connected to topic ``` - Stopped replicator for failed and timed-out topic - Clean up failed topic - Successfully create replicator producer for the topic and avoid creating replication backlog (cherry picked from commit 1447e6b1061babedc08901c44f16164bb4c4e2df) --- .../pulsar/broker/service/BrokerService.java | 14 +++- .../pulsar/broker/service/ReplicatorTest.java | 81 +++++++++++++++++++ .../apache/pulsar/common/util/FutureUtil.java | 34 ++++++++ 3 files changed, 126 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9527128dae246..9376d85a8d0c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -885,7 +885,7 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { } private CompletableFuture> createNonPersistentTopic(String topic) { - CompletableFuture> topicFuture = futureWithDeadline(); + CompletableFuture> topicFuture = FutureUtil.futureWithDeadline(executor()); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { @@ -1118,8 +1118,16 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); - addTopicToStatsMaps(topicName, persistentTopic); - topicFuture.complete(Optional.of(persistentTopic)); + if (topicFuture.isCompletedExceptionally()) { + log.warn("{} future is already completed with failure {}, closing the topic", + topic, FutureUtil.getException(topicFuture)); + persistentTopic.stopReplProducers().whenComplete((v, exception) -> { + topics.remove(topic, topicFuture); + }); + } else { + addTopicToStatsMaps(topicName, persistentTopic); + topicFuture.complete(Optional.of(persistentTopic)); + } }).exceptionally((ex) -> { log.warn( "Replication or dedup check failed. Removing topic from topics list {}, {}", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index fc58dac2202ea..26819c8f1d002 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -34,11 +36,13 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.List; +import java.util.Optional; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -52,6 +56,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; @@ -64,6 +69,7 @@ import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -985,6 +991,81 @@ public void testUpdateGlobalTopicPartition() throws Exception { client1.close(); client2.close(); } + + @Test + public void testCleanupTopic() throws Exception { + + final String cluster1 = pulsar1.getConfig().getClusterName(); + final String cluster2 = pulsar2.getConfig().getClusterName(); + final String namespace = "pulsar/ns-" + System.nanoTime(); + final String topicName = "persistent://" + namespace + "/cleanTopic"; + final String topicMlName = namespace + "/persistent/cleanTopic"; + admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2)); + + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + long topicLoadTimeoutSeconds = 3; + config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + + ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory() + .getManagedLedgerFactory(); + Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + ledgersField.setAccessible(true); + ConcurrentHashMap> ledgers = (ConcurrentHashMap>) ledgersField + .get(mlFactory); + CompletableFuture mlFuture = new CompletableFuture<>(); + ledgers.put(topicMlName, mlFuture); + + try { + Consumer consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); + fail("consumer should fail due to topic loading failure"); + } catch (Exception e) { + // Ok + } + + CompletableFuture> topicFuture = null; + for (int i = 0; i < 5; i++) { + topicFuture = pulsar1.getBrokerService().getTopics().get(topicName); + if (topicFuture != null) { + break; + } + Thread.sleep(i * 1000); + } + + try { + topicFuture.get(); + fail("topic creation should fail"); + } catch (Exception e) { + // Ok + } + + final CompletableFuture> timedOutTopicFuture = topicFuture; + // timeout topic future should be removed from cache + retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5, + 1000); + + assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName)); + + try { + Consumer consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); + fail("consumer should fail due to topic loading failure"); + } catch (Exception e) { + // Ok + } + + ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2"); + mlFuture.complete(ml); + + Consumer consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") + .subscriptionType(SubscriptionType.Shared).subscribeAsync() + .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS); + + consumer.close(); + } private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index b86ee1098e535..779eddda90438 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -19,8 +19,13 @@ package org.apache.pulsar.common.util; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * This class is aimed at simplifying work with {@code CompletableFuture}. @@ -50,4 +55,33 @@ public static Throwable unwrapCompletionException(Throwable t) { return t; } } + + public static CompletableFuture futureWithDeadline(ScheduledExecutorService executor, Long delay, + TimeUnit unit, Exception exp) { + CompletableFuture future = new CompletableFuture(); + executor.schedule(() -> { + if (!future.isDone()) { + future.completeExceptionally(exp); + } + }, delay, unit); + return future; + } + + public static CompletableFuture futureWithDeadline(ScheduledExecutorService executor) { + return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS, + new TimeoutException("Future didn't finish within deadline")); + } + + public static Optional getException(CompletableFuture future) { + if (future != null && future.isCompletedExceptionally()) { + try { + future.get(); + } catch (InterruptedException e) { + return Optional.ofNullable(e); + } catch (ExecutionException e) { + return Optional.ofNullable(e.getCause()); + } + } + return Optional.empty(); + } }