[fix][broker] Part-2: Replicator can not be created successfully due to an orphan replicator in the previous topic owner #21948

Merged 7 commits on Apr 24, 2024
@@ -248,7 +248,7 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) {
}
startProducer();
}).exceptionally(ex -> {
log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
+ " trigger a terminate. Replicator state: {}",
localTopicName, replicatorId, STATE_UPDATER.get(this), ex);
terminate();
@@ -377,9 +377,13 @@ public CompletableFuture<Void> terminate() {
this.producer = null;
// set the cursor as inactive.
disableReplicatorRead();
// release resources.
doReleaseResources();
});
}

protected void doReleaseResources() {}

protected boolean tryChangeStatusToTerminating() {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){
return true;
@@ -468,4 +472,8 @@ protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, Stat
}
return compareSetAndGetState(expect, update);
}

public boolean isTerminated() {
return state == State.Terminating || state == State.Terminated;
}
}
@@ -51,4 +51,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
boolean isConnected();

long getNumberOfEntriesInBacklog();

boolean isTerminated();
}
@@ -450,7 +450,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
long waitTimeMillis = readFailureBackoff.next();

if (exception instanceof CursorAlreadyClosedException) {
log.error("[{}] Error reading entries because replicator is"
log.warn("[{}] Error reading entries because replicator is"
+ " already deleted and cursor is already closed {}, ({})",
replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
@@ -570,7 +570,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
exception.getMessage(), exception);
if (exception instanceof CursorAlreadyClosedException) {
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
terminate();
@@ -698,6 +698,11 @@ public boolean isConnected() {
return producer != null && producer.isConnected();
}

@Override
protected void doReleaseResources() {
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
}

private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);

@VisibleForTesting
@@ -1731,6 +1731,7 @@ public CompletableFuture<Void> checkReplication() {
return deleteForcefully();
}

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
@@ -1769,6 +1770,8 @@ private CompletableFuture<Void> checkShadowReplication() {
if (log.isDebugEnabled()) {
log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
}

removeTerminatedReplicators(shadowReplicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
@@ -1919,19 +1922,30 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
if (replicationClient == null) {
return;
}
-            Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
-                try {
-                    return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                            remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
-                } catch (PulsarServerException e) {
-                    log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
-                }
-                return null;
-            });
-
-            // clean up replicator if startup is failed
-            if (replicator == null) {
-                replicators.removeNullValue(remoteCluster);
-            }
+            lock.readLock().lock();
+            try {
+                if (isClosingOrDeleting) {
+                    // Whether is "transferring" or not, do not create new replicator.
+                    log.info("[{}] Skip to create replicator because this topic is closing."
+                                    + " remote cluster: {}. State of transferring : {}",
+                            topic, remoteCluster, transferring);
+                    return;
+                }
+                Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
+                    try {
+                        return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
+                                remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+                    } catch (PulsarServerException e) {
+                        log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+                    }
+                    return null;
+                });
+                // clean up replicator if startup is failed
+                if (replicator == null) {
+                    replicators.removeNullValue(remoteCluster);
+                }
+            } finally {
+                lock.readLock().unlock();
+            }
});
}
@@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() {
}

private void unfenceTopicToResume() {
-        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
        isFenced = false;
        isClosingOrDeleting = false;
+        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
+        unfenceReplicatorsToResume();
}

private void unfenceReplicatorsToResume() {
checkReplication();
Contributor:
I think this can lead to a loop:
checkReplication -> fail to delete replicated topic -> unfenceTopicToResume -> checkReplication
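A minimal sketch of that cycle, with method names simplified from PersistentTopic (the bodies below are illustrative assumptions, not the actual broker code):

import java.util.concurrent.CompletableFuture;

class LoopSketch {
    volatile boolean isFenced;

    CompletableFuture<Void> checkReplication() {
        // When the local cluster is no longer in the replication list,
        // checkReplication() tries to delete the topic.
        return deleteForcefully().exceptionally(ex -> {
            // The delete-failure path unfences the topic to resume serving...
            unfenceTopicToResume();
            return null;
        });
    }

    void unfenceTopicToResume() {
        isFenced = false;
        // ...and unfencing re-runs the replication check, closing the loop:
        // checkReplication -> delete fails -> unfenceTopicToResume -> checkReplication.
        checkReplication();
    }

    CompletableFuture<Void> deleteForcefully() {
        // Stand-in for the failing delete ("Cursor was already closed" in the logs below).
        return CompletableFuture.failedFuture(new IllegalStateException("Cursor was already closed"));
    }
}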

Contributor:

2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - Deleting topic [persistent://pulsar/global/removeClusterTest/__change_events] because local cluster is not part of  global namespace repl list [r2, r3]
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r2] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r3] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - [persistent://pulsar/global/removeClusterTest/__change_events] Error deleting topic
org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$6.deleteLedgerFailed(PersistentTopic.java:1496) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncDelete$33(ManagedLedgerImpl.java:2950) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2947) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
	at java.base/java.lang.Thread.run(Thread.java:1570) [?:?]
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
Caused by: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2451) ~[?:?]
	at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:56) ~[pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncTruncate(ManagedLedgerImpl.java:4341) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2946) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	... 19 more
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - Deleting topic [persistent://pulsar/global/removeClusterTest/__change_events] because local cluster is not part of  global namespace repl list [r2, r3]
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r2] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:BrokerService] - Successfully delete authentication policies for topic persistent://pulsar/global/removeClusterTest/__change_events
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r3] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:ManagedLedgerImpl] - pulsar/global/removeClusterTest/persistent/__change_events Moving to FencedForDeletion state
2024-04-25T14:57:46,573 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:ManagedLedgerImpl] - [pulsar/global/removeClusterTest/persistent/__change_events] Error truncating ledger for deletion
java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2451) ~[?:?]
	at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:56) ~[pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncTruncate(ManagedLedgerImpl.java:4341) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2946) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
	at java.base/java.lang.Thread.run(Thread.java:1570) [?:?]
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed

Contributor:

I think this also means we need to think about how to skip deleting ledgers when the cursor is closed.

Contributor Author:

It is caused by topic.close being a reentrant method; the issue reproduces through these two concurrent paths:

  • reset namespace-level policies -> checkReplication -> delete topic
  • unload namespace -> unload topic -> close topic

topic.close can run even while the delete task is in progress, which is not correct; the PR #17524 will fix it. Please review it again, thanks.

Contributor:

Can we define an enum of topic states and reject/skip any invalid state transition (fail/skip fast)?

For example, delete should be skipped if the topic is already closed, and vice versa.
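A rough sketch of that suggestion (the enum, fields, and guards below are hypothetical, not an existing Pulsar API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class TopicStateSketch {
    // Hypothetical lifecycle states for a topic.
    enum TopicState { ACTIVE, CLOSING, CLOSED, DELETING, DELETED }

    private final AtomicReference<TopicState> state = new AtomicReference<>(TopicState.ACTIVE);

    CompletableFuture<Void> delete() {
        // Fail/skip fast: only an ACTIVE topic may begin deleting; a concurrent
        // close() or delete() loses the CAS and is skipped instead of racing.
        if (!state.compareAndSet(TopicState.ACTIVE, TopicState.DELETING)) {
            return CompletableFuture.completedFuture(null);
        }
        // ... actual delete work would run here ...
        state.set(TopicState.DELETED);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> close() {
        // Likewise, close() is skipped if a delete is already in progress.
        if (!state.compareAndSet(TopicState.ACTIVE, TopicState.CLOSING)) {
            return CompletableFuture.completedFuture(null);
        }
        // ... actual close work would run here ...
        state.set(TopicState.CLOSED);
        return CompletableFuture.completedFuture(null);
    }
}

With guards like these, close cannot re-enter while a delete is mid-flight, which is the race described above.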

Contributor Author:

Agreed; we will need more than one separate PR to merge the fenced, closed, and deleting states, just as we discussed here.

checkShadowReplication();
}

private void removeTerminatedReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) {
Map<String, Replicator> terminatedReplicators = new HashMap<>();
replicators.forEach((cluster, replicator) -> {
if (replicator.isTerminated()) {
terminatedReplicators.put(cluster, replicator);
}
});
terminatedReplicators.entrySet().forEach(entry -> {
replicators.remove(entry.getKey(), entry.getValue());
});
}

@Override
@@ -20,18 +20,21 @@

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -48,6 +51,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
@@ -486,4 +490,166 @@ public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throw
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}

/**
 * See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
 * Steps:
 * 1. Create the topic; replication is not enabled yet.
 *   - The topic will be loaded into memory.
 * 2. Enable namespace-level replication.
 *   - The broker creates a replicator, and the replicator's internal producer starts.
 *   - We inject an error to make the internal producer fail to connect; after a few seconds, it will retry.
 * 3. Unload the bundle.
 *   - The topic starts to close.
 *   - The replicator will be closed, but it will not close the internal producer, because the producer has not
 *     been created successfully yet.
 *   - We inject a delay into the closing of the "repl.cursor" to make it stuck, so the topic is still
 *     in the process of being closed.
 * 4. The internal producer retries the connection.
 *   - On the next retry, it connects successfully. Since the state of the "repl.cursor" is not "Closed", this
 *     producer will not be closed now.
 * 5. The topic finishes closing.
 *   - Cancel the injected delay on closing the "repl.cursor".
 *   - The topic is wholly closed.
 * 6. Verify: the late-created internal producer gets closed. In other words, no producer is connected
 *    to the remote cluster.
 */
@Test
public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
// 1.Create topic, does not enable replication now.
admin1.namespaces().createNamespace(namespaceName);
admin2.namespaces().createNamespace(namespaceName);
admin1.topics().createNonPartitionedTopic(topicName);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

// We inject an error to make the internal producer fail to connect.
// The delays before successive producer-creation retries are:
// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s...
// If the retry counter is larger than 6, the next creation will be slow enough to let the Replicator close.
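// (Assumed from the sequence above: the delay after the k-th failure is
// 100 ms * 2^(k-1), so after failTimes = 6 failures the next attempt waits
// 3.2s -- long enough for the topic close below to begin first.)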
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch that determines whether the producer is created successfully or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
return originalProducer;
}
log.info("Retry create replicator.producer count: {}", createProducerCounter);
// Release producer and fail callback.
originalProducer.closeAsync();
throw new RuntimeException("mock error");
}
return originalProducer;
});

// 2.Enable namespace level replication.
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
Awaitility.await().untilAsserted(() -> {
assertFalse(persistentTopic.getReplicators().isEmpty());
replicator.set(
(PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
// Since we inject a producer creation error, the replicator cannot start successfully.
assertFalse(replicator.get().isConnected());
});

// We inject a delay into the closing of the "repl.cursor" to make it stuck until the internal
// producer of the replicator has started.
SpyCursor spyCursor =
spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);

// 3. Unload the bundle: call "topic.close(false)".
// Block the new producer from starting until the replicator state changes to Stopped.
// The next attempt of "createProducerSuccessAfterFailTimes" to create the producer will succeed.
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
assertTrue(createProducerCounter.get() >= failTimes);
});
CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
String state = String.valueOf(replicator.get().getState());
log.error("replicator state: {}", state);
assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
});

// 5. Delay closing the cursor until "replicator.producer" has been created successfully.
// The next retry delay for creating "replicator.producer" will be 3.2s.
Thread.sleep(4 * 1000);
log.info("Replicator.state: {}", replicator.get().getState());
cursorCloseSignal.startClose();
cursorCloseSignal.startCallback();
// Wait for topic close successfully.
topicCloseFuture.join();

// 6. Verify there is no orphan producer on the remote cluster.
Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
PersistentTopic persistentTopic2 =
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertEquals(persistentTopic2.getProducers().size(), 0);
Assert.assertFalse(replicator.get().isConnected());
});

// cleanup.
cleanupTopics(namespaceName, () -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1));
admin1.namespaces().deleteNamespace(namespaceName);
admin2.namespaces().deleteNamespace(namespaceName);
}

@Test
public void testUnFenceTopicToReuse() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
// Wait for replicator started.
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
waitReplicatorStarted(topicName);

// Inject an error to make the topic close fail.
final String mockProducerName = UUID.randomUUID().toString();
final org.apache.pulsar.broker.service.Producer mockProducer =
mock(org.apache.pulsar.broker.service.Producer.class);
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
.when(mockProducer).disconnect(any());
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
.when(mockProducer).disconnect();
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
persistentTopic.getProducers().put(mockProducerName, mockProducer);

// Do close.
GeoPersistentReplicator replicator1 =
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
try {
persistentTopic.close(true, false).join();
fail("Expected close fails due to a producer close fails");
} catch (Exception ex) {
log.info("Expected error: {}", ex.getMessage());
}

// The broker will call `topic.unfenceTopicToResume` if closing the clients fails.
// Verify: the replicator will be re-created.
Awaitility.await().untilAsserted(() -> {
assertTrue(producer1.isConnected());
GeoPersistentReplicator replicator2 =
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
assertNotEquals(replicator1, replicator2);
assertFalse(replicator1.isConnected());
assertFalse(replicator1.producer != null && replicator1.producer.isConnected());
assertTrue(replicator2.isConnected());
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
});

// cleanup.
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
}
}
@@ -150,12 +150,16 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
}

protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
-        waitChangeEventsInit(replicatedNamespace);
-        admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1));
-        admin1.namespaces().unload(replicatedNamespace);
+        cleanupTopics(replicatedNamespace, cleanupTopicAction);
    }

+    protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
+        waitChangeEventsInit(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
+        admin1.namespaces().unload(namespace);
        cleanupTopicAction.run();
-        admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
-        waitChangeEventsInit(replicatedNamespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
+        waitChangeEventsInit(namespace);
}

protected void waitChangeEventsInit(String namespace) {
@@ -152,7 +152,7 @@ public Object[][] partitionedTopicProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

-    @Test
+    @Test(priority = Integer.MAX_VALUE)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during