diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java index 3ec46627ae1488..551c579a57fec3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java @@ -146,6 +146,12 @@ public AlreadyClosedException(String msg) { } } + public static class InvalidPathException extends MetadataStoreException { + public InvalidPathException(String path) { + super("Path(" + path + ") is invalid"); + } + } + public static MetadataStoreException unwrap(Throwable t) { if (t instanceof MetadataStoreException) { return (MetadataStoreException) t; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index ad62d24c40670f..176531bedb585c 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -24,9 +24,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; - import io.netty.util.concurrent.DefaultThreadFactory; - import java.util.EnumSet; import java.util.List; import java.util.Optional; @@ -38,15 +36,14 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; - import java.util.stream.Collectors; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; - +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataSerde; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.Stat; @@ -146,11 +143,17 @@ public MetadataCache getMetadataCache(MetadataSerde serde) { @Override public final CompletableFuture> getChildren(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } return childrenCache.get(path); } @Override public final CompletableFuture exists(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } return existsCache.get(path); } @@ -203,6 +206,9 @@ public void accept(Notification n) { @Override public final CompletableFuture delete(String path, Optional expectedVersion) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } // Ensure caches are invalidated before the operation is confirmed return storeDelete(path, expectedVersion) .thenRun(() -> { @@ -239,6 +245,9 @@ protected abstract CompletableFuture storePut(String path, byte[] data, Op @Override public final CompletableFuture put(String path, byte[] data, Optional optExpectedVersion, EnumSet options) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } // Ensure caches are invalidated before the operation is confirmed return storePut(path, data, optExpectedVersion, options) .thenApply(stat -> { @@ -306,4 +315,17 @@ protected static String parent(String path) { return path.substring(0, idx); } + + /** + * valid path in metadata store should be + * 1. not blank + * 2. starts with '/' + * 3. not ends with '/', except root path "/" + */ + static boolean isValidPath(String path) { + return StringUtils.equals(path, "/") + || StringUtils.isNotBlank(path) + && path.startsWith("/") + && !path.endsWith("/"); + } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 740910daeedfd8..4de2a7b5ae345d 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -35,6 +35,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -90,11 +91,10 @@ public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadata @Override public CompletableFuture> get(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - Value v = map.get(path); if (v != null) { return FutureUtils.value( @@ -109,11 +109,10 @@ public CompletableFuture> get(String path) { @Override public CompletableFuture> getChildrenFromStore(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - String firstKey = path.equals("/") ? path : path + "/"; String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/' @@ -132,13 +131,12 @@ public CompletableFuture> getChildrenFromStore(String path) { @Override public CompletableFuture existsFromStore(String path) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - Value v = map.get(path); - return FutureUtils.value(v != null ? true : false); + return FutureUtils.value(v != null); } } @@ -150,11 +148,10 @@ public CompletableFuture put(String path, byte[] value, Optional exp @Override public CompletableFuture storePut(String path, byte[] data, Optional optExpectedVersion, EnumSet options) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - boolean hasVersion = optExpectedVersion.isPresent(); int expectedVersion = optExpectedVersion.orElse(-1L).intValue(); @@ -203,11 +200,10 @@ public CompletableFuture storePut(String path, byte[] data, Optional @Override public CompletableFuture storeDelete(String path, Optional optExpectedVersion) { + if (!isValidPath(path)) { + return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + } synchronized (map) { - if (!isValidPath(path)) { - return FutureUtils.exception(new MetadataStoreException("")); - } - Value value = map.get(path); if (value == null) { return FutureUtils.exception(new NotFoundException("")); @@ -230,12 +226,4 @@ private void notifyParentChildrenChanged(String path) { parent = parent(parent); } } - - private static boolean isValidPath(String path) { - if (path == null || !path.startsWith("/")) { - return false; - } - - return !path.equals("/") || !path.endsWith("/"); - } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java similarity index 75% rename from pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java rename to pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java index 0e7d1a2f27bff7..8e945529912d00 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java @@ -16,35 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.metadata; +package org.apache.pulsar.metadata.impl; -import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletionException; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import lombok.Cleanup; -import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; -import org.apache.pulsar.metadata.api.MetadataStoreException; -import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; -import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; -import org.apache.pulsar.metadata.api.Notification; -import org.apache.pulsar.metadata.api.NotificationType; -import org.apache.pulsar.metadata.api.Stat; import org.awaitility.Awaitility; import org.testng.annotations.Test; @@ -92,4 +74,15 @@ public void testSharedInstance() throws Exception { assertFalse(store1.exists("/test").join()); }); } + + @Test + public void testPathValid() { + assertFalse(AbstractMetadataStore.isValidPath(null)); + assertFalse(AbstractMetadataStore.isValidPath("")); + assertFalse(AbstractMetadataStore.isValidPath(" ")); + assertTrue(AbstractMetadataStore.isValidPath("/")); + assertTrue(AbstractMetadataStore.isValidPath("/test")); + assertFalse(AbstractMetadataStore.isValidPath("/test/")); + assertTrue(AbstractMetadataStore.isValidPath("/test/ABC")); + } }