Skip to content

Commit

Permalink
Optimize isValidPath check in MetadataStore (apache#12663)
Browse files Browse the repository at this point in the history
### Motivation

1. There is a logic error in original LocalMemoryMetadataStore#isValidPath with this code line 239
>  return !path.equals("/") || !path.endsWith("/");

The "||" operator should be "&&", to forbids path ends with "/"

2. Further more, as we can implement other MetadataStore based on etcd or other external storage system. The path check function should be added to AbstractMetadataStore, and get checked before any operations.

### Modifications

1. Fixed the logic error in isValidPath and moved it to AbstractMetadataStore.
2. Check path before each operation in AbstractMetadataStore.
3. Add MetadataStoreException.InvalidPathException
  • Loading branch information
Jason918 authored and eolivelli committed Nov 29, 2021
1 parent 09b5f21 commit eb2a105
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 53 deletions.
Expand Up @@ -146,6 +146,12 @@ public AlreadyClosedException(String msg) {
}
}

public static class InvalidPathException extends MetadataStoreException {
public InvalidPathException(String path) {
super("Path(" + path + ") is invalid");
}
}

public static MetadataStoreException unwrap(Throwable t) {
if (t instanceof MetadataStoreException) {
return (MetadataStoreException) t;
Expand Down
Expand Up @@ -24,9 +24,7 @@
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -38,15 +36,14 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
Expand Down Expand Up @@ -146,11 +143,17 @@ public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {

@Override
public final CompletableFuture<List<String>> getChildren(String path) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
return childrenCache.get(path);
}

@Override
public final CompletableFuture<Boolean> exists(String path) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
return existsCache.get(path);
}

Expand Down Expand Up @@ -203,6 +206,9 @@ public void accept(Notification n) {

@Override
public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
// Ensure caches are invalidated before the operation is confirmed
return storeDelete(path, expectedVersion)
.thenRun(() -> {
Expand Down Expand Up @@ -239,6 +245,9 @@ protected abstract CompletableFuture<Stat> storePut(String path, byte[] data, Op
@Override
public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
// Ensure caches are invalidated before the operation is confirmed
return storePut(path, data, optExpectedVersion, options)
.thenApply(stat -> {
Expand Down Expand Up @@ -306,4 +315,17 @@ protected static String parent(String path) {

return path.substring(0, idx);
}

/**
* valid path in metadata store should be
* 1. not blank
* 2. starts with '/'
* 3. not ends with '/', except root path "/"
*/
static boolean isValidPath(String path) {
return StringUtils.equals(path, "/")
|| StringUtils.isNotBlank(path)
&& path.startsWith("/")
&& !path.endsWith("/");
}
}
Expand Up @@ -35,6 +35,7 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -90,11 +91,10 @@ public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadata

@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

Value v = map.get(path);
if (v != null) {
return FutureUtils.value(
Expand All @@ -109,11 +109,10 @@ public CompletableFuture<Optional<GetResult>> get(String path) {

@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

String firstKey = path.equals("/") ? path : path + "/";
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'

Expand All @@ -132,13 +131,12 @@ public CompletableFuture<List<String>> getChildrenFromStore(String path) {

@Override
public CompletableFuture<Boolean> existsFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

Value v = map.get(path);
return FutureUtils.value(v != null ? true : false);
return FutureUtils.value(v != null);
}
}

Expand All @@ -150,11 +148,10 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> exp
@Override
public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();

Expand Down Expand Up @@ -203,11 +200,10 @@ public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long>

@Override
public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

Value value = map.get(path);
if (value == null) {
return FutureUtils.exception(new NotFoundException(""));
Expand All @@ -230,12 +226,4 @@ private void notifyParentChildrenChanged(String path) {
parent = parent(parent);
}
}

private static boolean isValidPath(String path) {
if (path == null || !path.startsWith("/")) {
return false;
}

return !path.equals("/") || !path.endsWith("/");
}
}
Expand Up @@ -16,35 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata;
package org.apache.pulsar.metadata.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -92,4 +74,15 @@ public void testSharedInstance() throws Exception {
assertFalse(store1.exists("/test").join());
});
}

@Test
public void testPathValid() {
assertFalse(AbstractMetadataStore.isValidPath(null));
assertFalse(AbstractMetadataStore.isValidPath(""));
assertFalse(AbstractMetadataStore.isValidPath(" "));
assertTrue(AbstractMetadataStore.isValidPath("/"));
assertTrue(AbstractMetadataStore.isValidPath("/test"));
assertFalse(AbstractMetadataStore.isValidPath("/test/"));
assertTrue(AbstractMetadataStore.isValidPath("/test/ABC"));
}
}

0 comments on commit eb2a105

Please sign in to comment.