Skip to content

Commit

Permalink
[Broker] Fix Future.join() causing deadlock. (#14469)
Browse files Browse the repository at this point in the history
Master issue #14438

### Motivation

Invoking the ``join()`` method in the async method will cause some deadlock.

### Modifications

- Refactor ``PersistentTopic#tryToDeletePartitionedMetadata`` to pure async.

(cherry picked from commit 65318e8)
  • Loading branch information
mattisonchao authored and gaoran10 committed Mar 1, 2022
1 parent 5047498 commit d87a230
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2278,42 +2278,48 @@ private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
return CompletableFuture.completedFuture(null);
}
TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
try {
PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources();
if (topicName.isPartitioned() && !partitionedTopicResources.partitionedTopicExists(topicName)) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> deleteMetadataFuture = new CompletableFuture<>();
getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
.thenAccept((metadata -> {
// make sure all sub partitions were deleted
for (int i = 0; i < metadata.partitions; i++) {
if (brokerService.getPulsar().getPulsarResources().getTopicResources()
.persistentTopicExists(topicName.getPartition(i)).join()) {
throw new UnsupportedOperationException();
}
}
}))
.thenAccept((res) -> partitionedTopicResources.deletePartitionedTopicAsync(topicName)
.thenAccept((r) -> {
deleteMetadataFuture.complete(null);
}).exceptionally(ex -> {
deleteMetadataFuture.completeExceptionally(ex.getCause());
return null;
}))
.exceptionally((e) -> {
if (!(e.getCause() instanceof UnsupportedOperationException)) {
log.error("delete metadata fail", e);
}
deleteMetadataFuture.complete(null);
return null;
});
return deleteMetadataFuture;
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources();
return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
.thenCompose(partitionedTopicExist -> {
if (!partitionedTopicExist) {
return CompletableFuture.completedFuture(null);
} else {
return getBrokerService()
.fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose((metadata -> {
List<CompletableFuture<Boolean>> persistentTopicExists =
new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {
persistentTopicExists.add(brokerService.getPulsar()
.getPulsarResources().getTopicResources()
.persistentTopicExists(topicName.getPartition(i)));
}
List<CompletableFuture<Boolean>> unmodifiablePersistentTopicExists =
Collections.unmodifiableList(persistentTopicExists);
return FutureUtil.waitForAll(unmodifiablePersistentTopicExists)
.thenCompose(unused -> {
// make sure all sub partitions were deleted after all future complete
Optional<Boolean> anyExistPartition = unmodifiablePersistentTopicExists
.stream()
.map(CompletableFuture::join)
.filter(topicExist -> topicExist)
.findAny();
if (anyExistPartition.isPresent()) {
log.error("[{}] Delete topic metadata failed because"
+ " another partition exist.", topicName);
throw new UnsupportedOperationException(
String.format("Another partition exists for [%s].",
topicName));
} else {
return partitionedTopicResources
.deletePartitionedTopicAsync(topicName);
}
});
}));
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
super.baseSetup();
}

Expand Down Expand Up @@ -617,8 +618,27 @@ public void testGC() throws Exception {

runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}

// write again, the topic will be available
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
producer2.close();

assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

// 6. Test for partitioned topic to delete the partitioned metadata
String topicGc = "persistent://prop/ns-abc/topic-gc";
int partitions = 5;
admin.topics().createPartitionedTopic(topicGc, partitions);
Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
producer3.close();
assertEquals(partitions, pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
TopicName.get(topicGc)).join().partitions);
runGC();
Awaitility.await().untilAsserted(()-> {
assertEquals(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
TopicName.get(topicGc)).join().partitions, 0);
});
}
@Data
@ToString
@EqualsAndHashCode
Expand Down

0 comments on commit d87a230

Please sign in to comment.