Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] Handle multiple topic creation for same topic-name in broker #10847

Merged
merged 1 commit into from Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -970,7 +970,7 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
}

private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();
CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.futureWithDeadline(executor());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need change futureWithDeadline() to FutureUtil.futureWithDeadline(executor())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a utility method for Future so, it should be part of FutureUtil


if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1233,8 +1233,16 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing the topic",
topic, FutureUtil.getException(topicFuture));
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
}
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check failed."
Expand Down
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand All @@ -34,10 +36,12 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -51,6 +55,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
Expand All @@ -64,6 +69,7 @@
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand Down Expand Up @@ -997,6 +1003,80 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi
nonPersistentProducer2.close();
}

@Test
public void testCleanupTopic() throws Exception {

final String cluster1 = pulsar1.getConfig().getClusterName();
final String cluster2 = pulsar2.getConfig().getClusterName();
final String namespace = "pulsar/ns-" + System.nanoTime();
final String topicName = "persistent://" + namespace + "/cleanTopic";
final String topicMlName = namespace + "/persistent/cleanTopic";
admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));

PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

long topicLoadTimeoutSeconds = 3;
config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);

ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory()
.getManagedLedgerFactory();
Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
ledgersField.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField
.get(mlFactory);
CompletableFuture<ManagedLedgerImpl> mlFuture = new CompletableFuture<>();
ledgers.put(topicMlName, mlFuture);

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
fail("consumer should fail due to topic loading failure");
} catch (Exception e) {
// Ok
}

CompletableFuture<Optional<Topic>> topicFuture = null;
for (int i = 0; i < 5; i++) {
topicFuture = pulsar1.getBrokerService().getTopics().get(topicName);
if (topicFuture != null) {
break;
}
Thread.sleep(i * 1000);
}

try {
topicFuture.get();
fail("topic creation should fail");
} catch (Exception e) {
// Ok
}

final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
// timeout topic future should be removed from cache
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
1000);

assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
fail("consumer should fail due to topic loading failure");
} catch (Exception e) {
// Ok
}

ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
mlFuture.complete(ml);

Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);

consumer.close();
}
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

}
Expand Up @@ -20,8 +20,10 @@

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -162,4 +164,33 @@ public synchronized Throwable fillInStackTrace() {
return this;
}
}

public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor, Long delay,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between this and addTimeoutHandling? why we need add this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is an existing method just moved to FutureUtil class. futureWithDeadline completes future with exception after the time whereas timeout handling something gets trigger once future is completed with exception.

TimeUnit unit, Exception exp) {
CompletableFuture<T> future = new CompletableFuture<T>();
executor.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(exp);
}
}, delay, unit);
return future;
}

public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor) {
return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS,
new TimeoutException("Future didn't finish within deadline"));
}

public static <T> Optional<Throwable> getException(CompletableFuture<T> future) {
if (future != null && future.isCompletedExceptionally()) {
try {
future.get();
} catch (InterruptedException e) {
return Optional.ofNullable(e);
} catch (ExecutionException e) {
return Optional.ofNullable(e.getCause());
}
}
return Optional.empty();
}
}