From eb2a105fb89a7ba3fdc64e6261f7670dc7cf8e4b Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 12 Nov 2021 12:26:54 +0800 Subject: [PATCH] Optimize isValidPath check in MetadataStore (#12663) ### Motivation 1. There is a logic error in original LocalMemoryMetadataStore#isValidPath with this code line 239 > return !path.equals("/") || !path.endsWith("/"); The "||" operator should be "&&", to forbids path ends with "/" 2. Further more, as we can implement other MetadataStore based on etcd or other external storage system. The path check function should be added to AbstractMetadataStore, and get checked before any operations. ### Modifications 1. Fixed the logic error in isValidPath and moved it to AbstractMetadataStore. 2. Check path before each operation in AbstractMetadataStore. 3. Add MetadataStoreException.InvalidPathException --- .../metadata/api/MetadataStoreException.java | 6 +++ .../metadata/impl/AbstractMetadataStore.java | 32 +++++++++++-- .../impl/LocalMemoryMetadataStore.java | 46 +++++++------------ .../LocalMemoryMetadataStoreTest.java | 31 +++++-------- 4 files changed, 62 insertions(+), 53 deletions(-) rename pulsar-metadata/src/test/java/org/apache/pulsar/metadata/{ => impl}/LocalMemoryMetadataStoreTest.java (75%) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java index 3ec46627ae1488..551c579a57fec3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java @@ -146,6 +146,12 @@ public AlreadyClosedException(String msg) { } } + public static class InvalidPathException extends MetadataStoreException { + public InvalidPathException(String path) { + super("Path(" + path + ") is invalid"); + } + } + public static MetadataStoreException unwrap(Throwable t) { if (t instanceof MetadataStoreException) { return (MetadataStoreException) t; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index ad62d24c40670f..176531bedb585c 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -24,9 +24,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; - import io.netty.util.concurrent.DefaultThreadFactory; - import java.util.EnumSet; import java.util.List; import java.util.Optional; @@ -38,15 +36,14 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; - import java.util.stream.Collectors; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; - +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataSerde; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.Stat; @@ -146,11 +143,17 @@ public MetadataCache getMetadataCache(MetadataSerde serde) { @Override public final CompletableFuture> getChildren(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } return childrenCache.get(path); } @Override public final CompletableFuture exists(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } return existsCache.get(path); } @@ -203,6 +206,9 @@ public void accept(Notification n) { @Override public final CompletableFuture delete(String path, Optional expectedVersion) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } // Ensure caches are invalidated before the operation is confirmed return storeDelete(path, expectedVersion) .thenRun(() -> { @@ -239,6 +245,9 @@ protected abstract CompletableFuture storePut(String path, byte[] data, Op @Override public final CompletableFuture put(String path, byte[] data, Optional optExpectedVersion, EnumSet options) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } // Ensure caches are invalidated before the operation is confirmed return storePut(path, data, optExpectedVersion, options) .thenApply(stat -> { @@ -306,4 +315,17 @@ protected static String parent(String path) { return path.substring(0, idx); } + + /** + * valid path in metadata store should be + * 1. not blank + * 2. starts with '/' + * 3. not ends with '/', except root path "/" + */ + static boolean isValidPath(String path) { + return StringUtils.equals(path, "/") + || StringUtils.isNotBlank(path) + && path.startsWith("/") + && !path.endsWith("/"); + } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 740910daeedfd8..4de2a7b5ae345d 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -35,6 +35,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -90,11 +91,10 @@ public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadata @Override public CompletableFuture> get(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - Value v = map.get(path); if (v != null) { return FutureUtils.value( @@ -109,11 +109,10 @@ public CompletableFuture> get(String path) { @Override public CompletableFuture> getChildrenFromStore(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - String firstKey = path.equals("/") ? path : path + "/"; String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/' @@ -132,13 +131,12 @@ public CompletableFuture> getChildrenFromStore(String path) { @Override public CompletableFuture existsFromStore(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - Value v = map.get(path); - return FutureUtils.value(v != null ? true : false); + return FutureUtils.value(v != null); } } @@ -150,11 +148,10 @@ public CompletableFuture put(String path, byte[] value, Optional exp @Override public CompletableFuture storePut(String path, byte[] data, Optional optExpectedVersion, EnumSet options) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - boolean hasVersion = optExpectedVersion.isPresent(); int expectedVersion = optExpectedVersion.orElse(-1L).intValue(); @@ -203,11 +200,10 @@ public CompletableFuture storePut(String path, byte[] data, Optional @Override public CompletableFuture storeDelete(String path, Optional optExpectedVersion) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - Value value = map.get(path); if (value == null) { return FutureUtils.exception(new NotFoundException("")); @@ -230,12 +226,4 @@ private void notifyParentChildrenChanged(String path) { parent = parent(parent); } } - - private static boolean isValidPath(String path) { - if (path == null || !path.startsWith("/")) { - return false; - } - - return !path.equals("/") || !path.endsWith("/"); - } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java similarity index 75% rename from pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java rename to pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java index 0e7d1a2f27bff7..8e945529912d00 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java @@ -16,35 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.metadata; +package org.apache.pulsar.metadata.impl; -import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletionException; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import lombok.Cleanup; -import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; -import org.apache.pulsar.metadata.api.MetadataStoreException; -import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; -import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; -import org.apache.pulsar.metadata.api.Notification; -import org.apache.pulsar.metadata.api.NotificationType; -import org.apache.pulsar.metadata.api.Stat; import org.awaitility.Awaitility; import org.testng.annotations.Test; @@ -92,4 +74,15 @@ public void testSharedInstance() throws Exception { assertFalse(store1.exists("/test").join()); }); } + + @Test + public void testPathValid() { + assertFalse(AbstractMetadataStore.isValidPath(null)); + assertFalse(AbstractMetadataStore.isValidPath("")); + assertFalse(AbstractMetadataStore.isValidPath(" ")); + assertTrue(AbstractMetadataStore.isValidPath("/")); + assertTrue(AbstractMetadataStore.isValidPath("/test")); + assertFalse(AbstractMetadataStore.isValidPath("/test/")); + assertTrue(AbstractMetadataStore.isValidPath("/test/ABC")); + } }