diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 575738a991cf7..9c9d4829391ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -970,7 +970,7 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { } private CompletableFuture> createNonPersistentTopic(String topic) { - CompletableFuture> topicFuture = futureWithDeadline(); + CompletableFuture> topicFuture = FutureUtil.futureWithDeadline(executor()); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { @@ -1233,8 +1233,16 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); - addTopicToStatsMaps(topicName, persistentTopic); - topicFuture.complete(Optional.of(persistentTopic)); + if (topicFuture.isCompletedExceptionally()) { + log.warn("{} future is already completed with failure {}, closing the topic", + topic, FutureUtil.getException(topicFuture)); + persistentTopic.stopReplProducers().whenComplete((v, exception) -> { + topics.remove(topic, topicFuture); + }); + } else { + addTopicToStatsMaps(topicName, persistentTopic); + topicFuture.complete(Optional.of(persistentTopic)); + } }).exceptionally((ex) -> { log.warn( "Replication or dedup check failed." diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index e5219d5b655e5..d261e8516ac27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -34,11 +36,13 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.List; +import java.util.Optional; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -52,6 +56,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; @@ -65,6 +70,7 @@ import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -1041,6 +1047,80 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi nonPersistentProducer2.close(); } + @Test + public void testCleanupTopic() throws Exception { + + final String cluster1 = pulsar1.getConfig().getClusterName(); + final String cluster2 = pulsar2.getConfig().getClusterName(); + final String namespace = "pulsar/ns-" + System.nanoTime(); + final String topicName = "persistent://" + namespace + "/cleanTopic"; + final String topicMlName = namespace + "/persistent/cleanTopic"; + admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2)); + + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + long topicLoadTimeoutSeconds = 3; + config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + + ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory() + .getManagedLedgerFactory(); + Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + ledgersField.setAccessible(true); + ConcurrentHashMap> ledgers = (ConcurrentHashMap>) ledgersField + .get(mlFactory); + CompletableFuture mlFuture = new CompletableFuture<>(); + ledgers.put(topicMlName, mlFuture); + + try { + Consumer consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); + fail("consumer should fail due to topic loading failure"); + } catch (Exception e) { + // Ok + } + + CompletableFuture> topicFuture = null; + for (int i = 0; i < 5; i++) { + topicFuture = pulsar1.getBrokerService().getTopics().get(topicName); + if (topicFuture != null) { + break; + } + Thread.sleep(i * 1000); + } + + try { + topicFuture.get(); + fail("topic creation should fail"); + } catch (Exception e) { + // Ok + } + + final CompletableFuture> timedOutTopicFuture = topicFuture; + // timeout topic future should be removed from cache + retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5, + 1000); + + assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName)); + + try { + Consumer consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); + fail("consumer should fail due to topic loading failure"); + } catch (Exception e) { + // Ok + } + + ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2"); + mlFuture.complete(ml); + + Consumer consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") + .subscriptionType(SubscriptionType.Shared).subscribeAsync() + .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS); + + consumer.close(); + } private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 735695062f371..0c3a0c03028c8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -20,8 +20,10 @@ import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -162,4 +164,33 @@ public synchronized Throwable fillInStackTrace() { return this; } } + + public static CompletableFuture futureWithDeadline(ScheduledExecutorService executor, Long delay, + TimeUnit unit, Exception exp) { + CompletableFuture future = new CompletableFuture(); + executor.schedule(() -> { + if (!future.isDone()) { + future.completeExceptionally(exp); + } + }, delay, unit); + return future; + } + + public static CompletableFuture futureWithDeadline(ScheduledExecutorService executor) { + return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS, + new TimeoutException("Future didn't finish within deadline")); + } + + public static Optional getException(CompletableFuture future) { + if (future != null && future.isCompletedExceptionally()) { + try { + future.get(); + } catch (InterruptedException e) { + return Optional.ofNullable(e); + } catch (ExecutionException e) { + return Optional.ofNullable(e.getCause()); + } + } + return Optional.empty(); + } }