From 43ded5927fb78add69196b46c1edc36bde77af0e Mon Sep 17 00:00:00 2001 From: Zhanpeng Wu Date: Tue, 17 Aug 2021 13:05:31 +0800 Subject: [PATCH] Fix race condition in concurrent schema deletion (#11606) This PR fixes #11605 ### Motivation Concurrently deleting topics with the same schema may cause race condition in broker side. If we do not handle these scenarios correctly we will get unexpected exceptions in broker logs. ### Modifications 1. Add existence checks before schema deletion in `AbstractTopic#deleteSchema`. 2. Add existence checks before actually performing schema storage deletion in `BookkeeperSchemaStorage#deleteSchema`. 3. Ignore `NoNodeException` in `BookkeeperSchemaStorage#deleteSchema`. --- .../pulsar/broker/service/AbstractTopic.java | 3 +- .../schema/BookkeeperSchemaStorage.java | 52 ++++++++++++++++--- .../service/PersistentTopicE2ETest.java | 27 ++++++++++ 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index d388d515d8c7e..4080d5bbec4c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -371,7 +372,7 @@ public CompletableFuture deleteSchema() { String base = TopicName.get(getName()).getPartitionedTopicName(); String id = TopicName.get(base).getSchemaName(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); - return schemaRegistryService.getSchema(id) + return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) .thenCompose(schema -> { if (schema != null) { // It's different from `SchemasResource.deleteSchema` diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index df3f1b8138f57..3fa8e0259dd92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -58,6 +59,7 @@ import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,7 +176,12 @@ BookKeeper getBookKeeper() { @Override public CompletableFuture delete(String key, boolean forcefully) { - return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new); + return deleteSchema(key, forcefully).thenApply(version -> { + if (version == null) { + return null; + } + return new LongSchemaVersion(version); + }); } @Override @@ -369,10 +376,10 @@ private CompletableFuture createNewSchema(String schemaId, byte[] data, by } @NotNull - private CompletableFuture deleteSchema(String schemaId, boolean forceFully) { - return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId)) - .thenCompose(schemaAndVersion -> { - if (!forceFully && isNull(schemaAndVersion)) { + private CompletableFuture deleteSchema(String schemaId, boolean forcefully) { + return (forcefully ? CompletableFuture.completedFuture(null) + : ignoreUnrecoverableBKException(getSchema(schemaId))).thenCompose(schemaAndVersion -> { + if (!forcefully && isNull(schemaAndVersion)) { return completedFuture(null); } else { // The version is only for the compatibility of the current interface @@ -405,9 +412,20 @@ private CompletableFuture deleteSchema(String schemaId, boolean forceFully store.delete(path, Optional.empty()) .thenRun(() -> { future.complete(version); - }).exceptionally(ex1 -> { - future.completeExceptionally(ex1); - return null; + }).exceptionally(zkException -> { + if (zkException.getCause() + instanceof MetadataStoreException.NotFoundException) { + // The znode has been deleted by others. + // In some cases, the program may enter this logic. + // Since the znode is gone, we don’t need to deal with it. + if (log.isDebugEnabled()) { + log.debug("No node for schema path: {}", path); + } + future.complete(null); + } else { + future.completeExceptionally(zkException); + } + return null; }); }); } @@ -681,4 +699,22 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon && rc != BKException.Code.NoSuchEntryException; return new SchemaException(recoverable, message); } + + public static CompletableFuture ignoreUnrecoverableBKException(CompletableFuture source) { + return source.exceptionally(t -> { + if (t.getCause() != null + && (t.getCause() instanceof SchemaException) + && !((SchemaException) t.getCause()).isRecoverable()) { + // Meeting NoSuchLedgerExistsException or NoSuchEntryException when reading schemas in + // bookkeeper. This also means that the data has already been deleted by other operations + // in deleting schema. + if (log.isDebugEnabled()) { + log.debug("Schema data in bookkeeper may be deleted by other operations.", t); + } + return null; + } + // rethrow other cases + throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 576adec09996f..881fe266a004d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -731,6 +732,32 @@ public void testDeleteSchema() throws Exception { assertFalse(topicHasSchema(topicName)); } + @Test + public void testConcurrentlyDeleteSchema() throws Exception { + String topic = "persistent://prop/ns-delete-schema/concurrently-delete-schema-test"; + int partitions = 50; + admin.namespaces().createNamespace("prop/ns-delete-schema", 3); + admin.topics().createPartitionedTopic(topic, partitions); + + Producer producer = pulsarClient + .newProducer(Schema.JSON(Schemas.BytesRecord.class)) + .topic(topic) + .create(); + producer.close(); + + CompletableFuture[] asyncFutures = new CompletableFuture[partitions]; + for (int i = 0; i < partitions; i++) { + asyncFutures[i] = getTopic(TopicName.get(topic).getPartition(i).toString()).get().deleteSchema(); + } + + try { + // delete the schema concurrently, and wait for the end of all operations + CompletableFuture.allOf(asyncFutures).join(); + } catch (Exception e) { + fail("Should not fail"); + } + } + /** * A topic that has retention policy set to non-0, should not be GCed until it has been inactive for at least the * retention time.