Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner #21948

Merged
merged 7 commits into from Apr 24, 2024

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Jan 22, 2024

Motivation

There is a race condition that makes an orphan replicator in the original owner of a topic, and causes the new owner of the topic can not start a replicator due to org.apache.pulsar.broker.service.BrokerServiceException$NamingException Producer with name 'pulsar.repl.{local_cluster}-->{remote_cluster}' is already connected to topic.

Scenario 1

  • Thread-1: start/restart the producer of the replicator.
  • Thread-2: unloading bundles.

Scenario 2

  • Thread-1: start a new replicator after updated replication_clusters.
  • Thread-2: unloading bundles.

After we solved the scenario 1 by #21946, the current PR is focusing on the scenario 2:

Current PR is focusing on Scenario 2.

Steps of Scenario 2

time thread enable replication thread unload bundle
1 Enabled replication
2 Mark topic as closing
3 Skip replicator.disconnect() because topic.replicators is empty
4 Initialize cursor pulsar.repl
5 Start producer
6 Set replicator.stat --> Starting
7 Create producer success and set replicator.stat --> Started
8 Trigger a readMoreEntries, since there is no entries to read, just pending this request
9 Close cursor pulsar.repl
10 Close managed ledger
11 An orphan replicator is there, and the next topic owner could not start a replicator due to Producer with name 'pulsar.repl.{local_cluster}-->{remote_cluster}' is already connected to topic

Since the scenario is too complex, I can not add a test.

TODO: reproduce the Scenario 2 locally.

Modifications

  • call replicators.disconnect after the managed ledger is closed. It would prevent the new cursor(pulsar.dedup) from being created.
  • topic.close will be done after replicators.disconnect, it can avoid the new replicator on the next owner broker of the topic failing due to creating an internal producer failed org.apache.pulsar.broker.service.BrokerServiceException$NamingException Producer with name 'pulsar.repl.{local_cluster}-->{remote_cluster}' is already connected to topic.
  • After [fix] [broker] Replication stopped due to unload topic failed #21947 the operation replicator.producer.close will no longer fail.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 22, 2024
@poorbarcode poorbarcode self-assigned this Jan 24, 2024
@poorbarcode poorbarcode added type/bug The PR fixed a bug or issue reported a bug release/2.10.6 category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost release/3.0.3 release/2.11.4 release/3.1.3 labels Jan 24, 2024
@poorbarcode poorbarcode added this to the 3.3.0 milestone Jan 24, 2024
@Technoboy- Technoboy- closed this Jan 25, 2024
@Technoboy- Technoboy- reopened this Jan 25, 2024
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we can also fix the issue by involving a read lock to the addReplicationCluster() method in PersistentTopic.java without changing the close topic process. The replicator.disconnect() method will also call the managed cursor API to change the cursor state which doesn't make sense to close the managed ledger before closing the replicator

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9baafcb2e9..db6910eb51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1918,11 +1918,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         return;
                     }
                     Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
+                        lock.readLock().lock();
                         try {
-                            return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                                    remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+                            if (!isClosingOrDeleting) {
+                                return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
+                                        remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+                            }
                         } catch (PulsarServerException e) {
                             log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+                        } finally {
+                            lock.readLock().unlock();
                         }
                         return null;
                     });

@poorbarcode
Copy link
Contributor Author

Rebase master

@poorbarcode
Copy link
Contributor Author

@codelipenghui

It looks like we can also fix the issue by involving a read lock to the addReplicationCluster() method in PersistentTopic.java without changing the close topic process. The replicator.disconnect() method will also call the managed cursor API to change the cursor state which doesn't make sense to close the managed ledger before closing the replicator

Good suggestion, already improved the code.

@codecov-commenter
Copy link

Codecov Report

Attention: Patch coverage is 67.56757% with 12 lines in your changes are missing coverage. Please review.

Project coverage is 73.95%. Comparing base (bbc6224) to head (3d389b1).
Report is 186 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #21948      +/-   ##
============================================
+ Coverage     73.57%   73.95%   +0.38%     
+ Complexity    32624     2744   -29880     
============================================
  Files          1877     1885       +8     
  Lines        139502   140512    +1010     
  Branches      15299    15450     +151     
============================================
+ Hits         102638   103916    +1278     
+ Misses        28908    28563     -345     
- Partials       7956     8033      +77     
Flag Coverage Δ
inttests 26.98% <21.62%> (+2.39%) ⬆️
systests 24.47% <10.81%> (+0.15%) ⬆️
unittests 73.24% <67.56%> (+0.39%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...a/org/apache/pulsar/broker/service/Replicator.java 0.00% <ø> (ø)
...roker/service/persistent/PersistentReplicator.java 66.21% <75.00%> (-2.66%) ⬇️
...ache/pulsar/broker/service/AbstractReplicator.java 62.06% <50.00%> (-22.94%) ⬇️
...sar/broker/service/persistent/PersistentTopic.java 78.46% <68.96%> (+<0.01%) ⬆️

... and 262 files with indirect coverage changes

@poorbarcode poorbarcode merged commit b774666 into apache:master Apr 24, 2024
50 checks passed
@lhotari
Copy link
Member

lhotari commented Apr 25, 2024

@poorbarcode I wonder if these changes somehow cause OOME problems with org.apache.pulsar.broker.service.ReplicatorSubscriptionTest#testWriteMarkerTaskOfReplicateSubscriptions, please see
#22586 . Thanks

@lhotari
Copy link
Member

lhotari commented Apr 25, 2024

@poorbarcode I can confirm that this PR is causing problems which most likely lead to OOMEs. Reported as #22588.
To reproduce:

git clone https://github.com/apache/pulsar
cd pulsar
# checkout the merge commit for #21948
git checkout b774666
mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true
mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker "-Dtest=org.apache.pulsar.broker.service.ReplicatorGlobalNSTest#testRemoveLocalClusterOnGlobalNamespace" | tee test_$(git rev-parse --short HEAD).log

then checkout d475655 which is the commit before b774666 (#21948). you can see that the behavior is very different:

git checkout d475655
mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true
mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker "-Dtest=org.apache.pulsar.broker.service.ReplicatorGlobalNSTest#testRemoveLocalClusterOnGlobalNamespace" | tee test_$(git rev-parse --short HEAD).log

}

private void unfenceReplicatorsToResume() {
checkReplication();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can lead to a loop
checkReplication -> fail to delete replicated topic -> unfenceTopicToResume ->checkReplication

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - Deleting topic [persistent://pulsar/global/removeClusterTest/__change_events] because local cluster is not part of  global namespace repl list [r2, r3]
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r2] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r3] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - [persistent://pulsar/global/removeClusterTest/__change_events] Error deleting topic
org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$6.deleteLedgerFailed(PersistentTopic.java:1496) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncDelete$33(ManagedLedgerImpl.java:2950) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2947) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
	at java.base/java.lang.Thread.run(Thread.java:1570) [?:?]
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
Caused by: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2451) ~[?:?]
	at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:56) ~[pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncTruncate(ManagedLedgerImpl.java:4341) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2946) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	... 19 more
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - Deleting topic [persistent://pulsar/global/removeClusterTest/__change_events] because local cluster is not part of  global namespace repl list [r2, r3]
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r2] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:BrokerService] - Successfully delete authentication policies for topic persistent://pulsar/global/removeClusterTest/__change_events
2024-04-25T14:57:46,573 - INFO  - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r3] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:ManagedLedgerImpl] - pulsar/global/removeClusterTest/persistent/__change_events Moving to FencedForDeletion state
2024-04-25T14:57:46,573 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:ManagedLedgerImpl] - [pulsar/global/removeClusterTest/persistent/__change_events] Error truncating ledger for deletion
java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2451) ~[?:?]
	at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:56) ~[pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncTruncate(ManagedLedgerImpl.java:4341) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2946) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.17.0.jar:4.17.0]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
	at java.base/java.lang.Thread.run(Thread.java:1570) [?:?]
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this also means we need to think about how to skip deleting ledgers when the cursor is closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is caused by the method topic.close is a reentrant method, the issue steps are like the following

  • reset namespace level policies -> checkReplication -> delete topic
  • unload namespace -> unload topic -> close topic

The method topic.close can be run even if the deleting task is in-progress, it is not correct, the PR #17524 will fix it, please review it again, thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define enum state for topic and reject/skip any invalid state transition (fail/skip fast)?

For example, delete should skip if closed, and vice-versa.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with you, we need more than one separate PRs to merge the states fenced, close, deleting, just like we discussed here

poorbarcode added a commit that referenced this pull request May 7, 2024
… an orphan replicator in the previous topic owner (#21948)

(cherry picked from commit b774666)
poorbarcode added a commit that referenced this pull request May 7, 2024
… an orphan replicator in the previous topic owner (#21948)

(cherry picked from commit b774666)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request May 13, 2024
… an orphan replicator in the previous topic owner (apache#21948)

(cherry picked from commit b774666)
(cherry picked from commit 6038bbf)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request May 16, 2024
… an orphan replicator in the previous topic owner (apache#21948)

(cherry picked from commit b774666)
(cherry picked from commit 6038bbf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost cherry-picked/branch-3.0 cherry-picked/branch-3.2 doc-not-needed Your PR changes do not impact docs ready-to-test release/2.11.4 release/3.0.3 release/3.2.3 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants