From 8358dae3ddf85134ace9aa661571123d48fe6134 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 17 Nov 2021 20:12:26 -0800 Subject: [PATCH 1/9] Added local filesystem backend for package manager (#12708) * Added local filesystem backend for package manager * fixed package name in package-info.java * Fixed javadoc --- .../filesystem-storage/pom.xml | 61 ++++++ .../filesystem/FileSystemPackagesStorage.java | 150 ++++++++++++++ .../FileSystemPackagesStorageProvider.java | 30 +++ .../storage/filesystem/package-info.java | 23 +++ .../FileSystemPackagesStorageTest.java | 185 ++++++++++++++++++ pulsar-package-management/pom.xml | 1 + 6 files changed, 450 insertions(+) create mode 100644 pulsar-package-management/filesystem-storage/pom.xml create mode 100644 pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java create mode 100644 pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java create mode 100644 pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java create mode 100644 pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java diff --git a/pulsar-package-management/filesystem-storage/pom.xml b/pulsar-package-management/filesystem-storage/pom.xml new file mode 100644 index 0000000000000..05c65daaa2b9d --- /dev/null +++ b/pulsar-package-management/filesystem-storage/pom.xml @@ -0,0 +1,61 @@ + + + + + pulsar-package-management + org.apache.pulsar + 2.10.0-SNAPSHOT + + 4.0.0 + + pulsar-package-filesystem-storage + Apache Pulsar :: Package Management :: Filesystem Storage + + + + ${project.groupId} + pulsar-package-core + ${project.parent.version} + + + + com.google.guava + guava + + + + ${project.groupId} + testmocks + ${project.parent.version} + test + + + + junit + junit + test + + + + diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java new file mode 100644 index 0000000000000..ff34c482f15d3 --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.storage.filesystem; + +import com.google.common.io.ByteStreams; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.packages.management.core.PackagesStorage; +import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration; + + +/** + * Packages management storage implementation with filesystem. + */ +@Slf4j +public class FileSystemPackagesStorage implements PackagesStorage { + + private static final String STORAGE_PATH = "STORAGE_PATH"; + private static final String DEFAULT_STORAGE_PATH = "packages-storage"; + + private final File storagePath; + + FileSystemPackagesStorage(PackagesStorageConfiguration configuration) { + String storagePath = configuration.getProperty(STORAGE_PATH); + if (storagePath != null) { + this.storagePath = new File(storagePath); + } else { + this.storagePath = new File(DEFAULT_STORAGE_PATH); + } + } + + private File getPath(String path) { + File f = Paths.get(storagePath.toString(), path).toFile(); + if (!f.getParentFile().exists()) { + if (!f.getParentFile().mkdirs()) { + throw new RuntimeException("Failed to create parent dirs for " + path); + } + } + return f; + } + + @Override + public void initialize() { + if (!storagePath.exists()) { + if (!storagePath.mkdirs()) { + throw new RuntimeException("Failed to create base storage directory at " + storagePath); + } + } + + log.info("Packages management filesystem storage initialized on {}", storagePath); + } + + @Override + public CompletableFuture writeAsync(String path, InputStream inputStream) { + try { + File f = getPath(path); + + @Cleanup + OutputStream os = new FileOutputStream(f); + + @Cleanup + BufferedOutputStream bos = new BufferedOutputStream(os); + ByteStreams.copy(inputStream, bos); + + return CompletableFuture.completedFuture(null); + } catch (IOException e) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(e); + return f; + } + } + + @Override + public CompletableFuture readAsync(String path, OutputStream outputStream) { + try { + @Cleanup + InputStream is = new FileInputStream(getPath(path)); + + @Cleanup + BufferedInputStream bis = new BufferedInputStream(is); + ByteStreams.copy(bis, outputStream); + + return CompletableFuture.completedFuture(null); + } catch (IOException e) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(e); + return f; + } + } + + @Override + public CompletableFuture deleteAsync(String path) { + if (getPath(path).delete()) { + return CompletableFuture.completedFuture(null); + } else { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new IOException("Failed to delete file at " + path)); + return f; + } + } + + @Override + public CompletableFuture> listAsync(String path) { + String[] files = getPath(path).list(); + if (files == null) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } else { + return CompletableFuture.completedFuture(Arrays.asList(files)); + } + } + + @Override + public CompletableFuture existAsync(String path) { + return CompletableFuture.completedFuture(getPath(path).exists()); + } + + @Override + public CompletableFuture closeAsync() { + return CompletableFuture.completedFuture(null); + } +} diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java new file mode 100644 index 0000000000000..74b46e287024a --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.storage.filesystem; + +import org.apache.pulsar.packages.management.core.PackagesStorage; +import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration; +import org.apache.pulsar.packages.management.core.PackagesStorageProvider; + +public class FileSystemPackagesStorageProvider implements PackagesStorageProvider { + @Override + public PackagesStorage getStorage(PackagesStorageConfiguration config) { + return new FileSystemPackagesStorage(config); + } +} diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java new file mode 100644 index 0000000000000..1186e2bf08757 --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Packages management storage implementation with filesystem. + */ +package org.apache.pulsar.packages.management.storage.filesystem; diff --git a/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java b/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java new file mode 100644 index 0000000000000..0b254ede5f4b5 --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.storage.filesystem; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.packages.management.core.PackagesStorage; +import org.apache.pulsar.packages.management.core.PackagesStorageProvider; +import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +public class FileSystemPackagesStorageTest { + private PackagesStorage storage; + private Path storagePath; + + @BeforeMethod() + public void setup() throws Exception { + this.storagePath = Files.createTempDirectory("package-storage-test"); + log.info("Test using storage path: {}", storagePath); + + PackagesStorageProvider provider = PackagesStorageProvider + .newProvider(FileSystemPackagesStorageProvider.class.getName()); + DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration(); + configuration.setProperty("STORAGE_PATH", storagePath.toString()); + storage = provider.getStorage(configuration); + storage.initialize(); + } + + @AfterMethod(alwaysRun = true) + public void teardown() throws Exception { + if (storage != null) { + storage.closeAsync().get(); + } + + storagePath.toFile().delete(); + } + + @Test(timeOut = 60000) + public void testReadWriteOperations() throws ExecutionException, InterruptedException { + String testData = "test-data"; + ByteArrayInputStream testDataStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + String testPath = "test-read-write"; + + // write some data to the package + storage.writeAsync(testPath, testDataStream).get(); + + // read the data from the package + ByteArrayOutputStream readData = new ByteArrayOutputStream(); + storage.readAsync(testPath, readData).get(); + String readResult = new String(readData.toByteArray(), StandardCharsets.UTF_8); + + assertEquals(testData, readResult); + } + + @Test(timeOut = 60000) + public void testReadWriteLargeDataOperations() throws ExecutionException, InterruptedException { + byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096); + ByteArrayInputStream testDataStream = new ByteArrayInputStream(data); + String testPath = "test-large-read-write"; + + // write some data to the package + storage.writeAsync(testPath, testDataStream).get(); + + // read the data from the package + ByteArrayOutputStream readData = new ByteArrayOutputStream(); + storage.readAsync(testPath, readData).get(); + byte[] readResult = readData.toByteArray(); + + assertEquals(data, readResult); + } + + @Test(timeOut = 60000) + public void testReadNonExistentData() { + String testPath = "non-existent-path"; + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + try { + storage.readAsync(testPath, outputStream).join(); + } catch (CompletionException e) { + assertEquals(e.getCause().getClass(), FileNotFoundException.class); + } + } + + @Test(timeOut = 60000) + public void testListOperation() throws ExecutionException, InterruptedException { + // write the data to different path + String rootPath = "pulsar"; + String testData = "test-data"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + + List writePaths = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String path = "test-" + i; + writePaths.add(path); + storage.writeAsync(rootPath + "/" + path, inputStream).get(); + } + + // list all path under the root path + List paths = storage.listAsync(rootPath).get(); + + // verify the paths number + assertEquals(paths.size(), writePaths.size()); + paths.forEach(p -> writePaths.remove(p)); + assertEquals(writePaths.size(), 0); + + // list non-existent path + storage.listAsync("non-existent").get(); + } + + @Test(timeOut = 60000) + public void testDeleteOperation() throws ExecutionException, InterruptedException { + String testPath = "test-delete-path"; + String testData = "test-data"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + + // write the data to the test path + storage.writeAsync(testPath, inputStream).get(); + + // list path should have one file + List paths = storage.listAsync("").get(); + assertEquals(paths.size(), 1); + assertEquals(paths.get(0), testPath); + + // delete the path + storage.deleteAsync(testPath).get(); + + // list again and not file under the path + paths= storage.listAsync("").get(); + assertEquals(paths.size(), 0); + + + // delete non-existent path + try { + storage.deleteAsync("non-existent").join(); + fail("should throw exception"); + } catch (Exception e) { + assertEquals(e.getCause().getClass(), IOException.class); + } + } + + @Test(timeOut = 60000) + public void testExistOperation() throws ExecutionException, InterruptedException { + Boolean exist = storage.existAsync("test-path").get(); + org.testng.Assert.assertFalse(exist); + + storage.writeAsync("test-path", new ByteArrayInputStream("test".getBytes())).get(); + + exist = storage.existAsync("test-path").get(); + assertTrue(exist); + } + +} diff --git a/pulsar-package-management/pom.xml b/pulsar-package-management/pom.xml index bbd4e1fbb2b75..5cf86c9d063ce 100644 --- a/pulsar-package-management/pom.xml +++ b/pulsar-package-management/pom.xml @@ -36,6 +36,7 @@ core bookkeeper-storage + filesystem-storage From 11298144ac118cda951deffa092ab17110d254b7 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 18 Nov 2021 13:44:42 +0800 Subject: [PATCH 2/9] Fix TopicPoliciesCacheNotInitException issue. (#12773) ### Motivation Sometimes, we may get `TopicPoliciesCacheNotInitException` with below stack trace: ``` 15:45:47.020 [pulsar-web-41-3] INFO org.eclipse.jetty.server.RequestLog - 10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2 "-" "kube-probe/1.19+" 1 15:45:51.221 [pulsar-2-15] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to perform getRetention on topic persistent://public/default/UpdateNodeCharts java.lang.RuntimeException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84) ~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21] at org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21] at org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final] at java.lang.Thread.run(Thread.java:829) [?:?] ``` This is because : https://github.com/apache/pulsar/blob/c3da1452a444c9599cb85562a3faa82ddfdecec8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L298-L312 when `reader.readNextAsync()` throws exceptions, the msg will be null which will throw NPE without any catch block. --- .../TransactionMetadataStoreService.java | 5 +-- .../SystemTopicBasedTopicPoliciesService.java | 42 ++++++++++++------- ...temTopicBasedTopicPoliciesServiceTest.java | 40 ++++++++++++++++++ .../impl/MLTransactionLogImpl.java | 8 +++- 4 files changed, 76 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 607f05e481673..240c6c9ad7c75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker; +import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.getMLTransactionLogName; import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.ABORTING; import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTING; import com.google.common.annotations.VisibleForTesting; @@ -63,7 +64,6 @@ import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException; -import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -230,8 +230,7 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc public CompletableFuture openTransactionMetadataStore(TransactionCoordinatorID tcId) { return pulsarService.getBrokerService() - .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl - .TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> { + .getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> { TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId); TransactionRecoverTracker recoverTracker = new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 07fe23939669b..d0c3d4949c1e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -171,6 +171,10 @@ private void notifyListener(Message msg) { @Override public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { + if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) { + NamespaceName namespace = topicName.getNamespaceObject(); + prepareInitPoliciesCache(namespace, new CompletableFuture<>()); + } if (policyCacheInitMap.containsKey(topicName.getNamespaceObject()) && !policyCacheInitMap.get(topicName.getNamespaceObject())) { throw new TopicPoliciesCacheNotInitException(); @@ -209,24 +213,29 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name result.complete(null); } else { ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); - policyCacheInitMap.put(namespace, false); - CompletableFuture> readerCompletableFuture = - creatSystemTopicClientWithRetry(namespace); - readerCaches.put(namespace, readerCompletableFuture); - readerCompletableFuture.whenComplete((reader, ex) -> { - if (ex != null) { - log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); - result.completeExceptionally(ex); - } else { - initPolicesCache(reader, result); - result.thenRun(() -> readMorePolicies(reader)); - } - }); + prepareInitPoliciesCache(namespace, result); } } return result; } + private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture result) { + if (policyCacheInitMap.putIfAbsent(namespace, false) == null) { + CompletableFuture> readerCompletableFuture = + creatSystemTopicClientWithRetry(namespace); + readerCaches.put(namespace, readerCompletableFuture); + readerCompletableFuture.whenComplete((reader, ex) -> { + if (ex != null) { + log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); + result.completeExceptionally(ex); + } else { + initPolicesCache(reader, result); + result.thenRun(() -> readMorePolicies(reader)); + } + }); + } + } + protected CompletableFuture> creatSystemTopicClientWithRetry( NamespaceName namespace) { SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory @@ -294,6 +303,9 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(ex); readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + reader.closeAsync(); + return; } if (hasMore) { reader.readNextAsync().whenComplete((msg, e) -> { @@ -302,6 +314,9 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(e); readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + reader.closeAsync(); + return; } refreshTopicPoliciesCache(msg); if (log.isDebugEnabled()) { @@ -316,7 +331,6 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp } policyCacheInitMap.computeIfPresent( reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); - // replay policy message policiesCache.forEach(((topicName, topicPolicies) -> { if (listeners.get(topicName) != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 532bd33bc8adb..4c43492a6f8d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -31,8 +31,12 @@ import java.lang.reflect.Field; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; @@ -276,4 +280,40 @@ public void testCreatSystemTopicClientWithRetry() throws Exception { assertEquals(reader1, reader); } + + @Test + public void testGetTopicPoliciesWithRetry() throws Exception { + Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap"); + initMapField.setAccessible(true); + Map initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService); + initMap.remove(NamespaceName.get(NAMESPACE1)); + Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches"); + readerCaches.setAccessible(true); + Map>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService); + readers.remove(NamespaceName.get(NAMESPACE1)); + Backoff backoff = new BackoffBuilder() + .setInitialTime(500, TimeUnit.MILLISECONDS) + .setMandatoryStop(5000, TimeUnit.MILLISECONDS) + .setMax(1000, TimeUnit.MILLISECONDS) + .create(); + TopicPolicies initPolicy = TopicPolicies.builder() + .maxConsumerPerTopic(10) + .build(); + ScheduledExecutorService executors = Executors.newScheduledThreadPool(1); + executors.schedule(new Runnable() { + @Override + public void run() { + try { + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); + } catch (Exception ignore) {} + } + }, 2000, TimeUnit.MILLISECONDS); + Awaitility.await().untilAsserted(() -> { + Optional topicPolicies = systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor()).get(); + Assert.assertTrue(topicPolicies.isPresent()); + if (topicPolicies.isPresent()) { + Assert.assertEquals(topicPolicies.get(), initPolicy); + } + }); + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index f2324afec82ee..c0442753e7a7b 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -73,8 +73,7 @@ public class MLTransactionLogImpl implements TransactionLog { public MLTransactionLogImpl(TransactionCoordinatorID tcID, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig) { - this.topicName = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId()); + this.topicName = getMLTransactionLogName(tcID); this.tcId = tcID.getId(); this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor(); managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor); @@ -83,6 +82,11 @@ public MLTransactionLogImpl(TransactionCoordinatorID tcID, this.entryQueue = new SpscArrayQueue<>(2000); } + public static TopicName getMLTransactionLogName(TransactionCoordinatorID tcID) { + return TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId()); + } + @Override public CompletableFuture initialize() { CompletableFuture future = new CompletableFuture<>(); From fc8d50ecf841bd1a4b01fa09720411f6190ee5be Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Thu, 18 Nov 2021 15:28:07 +0800 Subject: [PATCH 3/9] The problem of two exception handling (#12744) whenCompleteAsync has handle exception, don't use exceptionally, otherwise it will be handle twice --- .../mledger/impl/EntryCacheImpl.java | 19 ++----------------- .../mledger/impl/EntryCacheManager.java | 3 --- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index 7660031d24b02..49190ad96d640 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -234,12 +234,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt } finally { ledgerEntries.close(); } - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{ - ml.invalidateLedgerHandle(lh); - callback.readEntryFailed(createManagedLedgerException(exception), ctx); - return null; - } - ); + }, ml.getExecutor().chooseThread(ml.getName())); } } @@ -333,17 +328,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo } finally { ledgerEntries.close(); } - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{ - if (exception instanceof BKException - && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) { - callback.readEntriesFailed(createManagedLedgerException(exception), ctx); - } else { - ml.invalidateLedgerHandle(lh); - ManagedLedgerException mlException = createManagedLedgerException(exception); - callback.readEntriesFailed(mlException, ctx); - } - return null; - }); + }, ml.getExecutor().chooseThread(ml.getName())); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index c87bcb8aa4031..d360bbd6e5cee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -215,9 +215,6 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole ml.mbean.addReadEntriesSample(entries.size(), totalSize); callback.readEntriesComplete(entries, ctx); - }).exceptionally(exception -> { - callback.readEntriesFailed(createManagedLedgerException(exception), ctx); - return null; }); } From afa472e5150bd6ca7c52ef09c14ee0b5854613ae Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Thu, 18 Nov 2021 16:02:45 +0800 Subject: [PATCH 4/9] [Transaction] Fix transaction flaky test testMaxReadPositionForNormalPublish (#12681) ## Motivation fix flaky test https://github.com/apache/pulsar/issues/12671 `publishContext.completed(null, position.getLedgerId(), position.getEntryId())` before `transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry())` so return messageId for client, dont represent broker syncMaxReadPositionForNormalPublish complete --- .../pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a27311f849d9c..78f0fda79ecaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -501,10 +501,10 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) { lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); } - publishContext.setMetadataFromEntryData(entryData); - publishContext.completed(null, position.getLedgerId(), position.getEntryId()); // in order to sync the max position when cursor read entries transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); + publishContext.setMetadataFromEntryData(entryData); + publishContext.completed(null, position.getLedgerId(), position.getEntryId()); decrementPendingWriteOpsAndCheck(); } From 4a4c1de6174c41ccf630fd17ea12d033adde9dd9 Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Thu, 18 Nov 2021 16:04:22 +0800 Subject: [PATCH 5/9] JavaInstanceTest should be AssertEquals (#12836) --- .../apache/pulsar/functions/instance/JavaInstanceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index 03c0da547bd97..9931ddf4ce839 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.JavaInstance.AsyncFuncRequest; +import org.testng.Assert; import org.testng.annotations.Test; @Slf4j @@ -226,7 +227,7 @@ public void testAsyncFunctionMaxPending() throws Exception { for (int i = 0; i < 3; i++) { AsyncFuncRequest request = instance.getPendingAsyncRequests().poll(); - assertNotNull(testString + "-lambda", (String) request.getProcessResult().get()); + Assert.assertEquals(request.getProcessResult().get(), testString + "-lambda"); } long endTime = System.currentTimeMillis(); @@ -235,7 +236,6 @@ public void testAsyncFunctionMaxPending() throws Exception { instance.close(); } - @SuppressWarnings("serial") private static class UserException extends Exception { public UserException(String msg) { super(msg); From c18063f44684052ef07f0259084743a7a45e5656 Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Thu, 18 Nov 2021 16:05:23 +0800 Subject: [PATCH 6/9] Add error log when new jetty client (#12840) --- .../org/apache/pulsar/proxy/server/AdminProxyHandler.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 13f9372fa1dea..7d3c658a12ac8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -256,7 +256,7 @@ protected HttpClient newHttpClient() { if (config.isTlsEnabledWithBroker()) { try { - X509Certificate trustCertificates[] = SecurityUtility + X509Certificate[] trustCertificates = SecurityUtility .loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath()); SSLContext sslCtx; @@ -281,6 +281,7 @@ protected HttpClient newHttpClient() { return new JettyHttpClient(contextFactory); } catch (Exception e) { + LOG.error("new jetty http client exception ", e); try { auth.close(); } catch (IOException ioe) { @@ -303,7 +304,7 @@ protected String rewriteTarget(HttpServletRequest request) { boolean isFunctionsRestRequest = false; String requestUri = request.getRequestURI(); - for (String routePrefix: functionRoutes) { + for (String routePrefix : functionRoutes) { if (requestUri.startsWith(routePrefix)) { isFunctionsRestRequest = true; break; @@ -324,7 +325,7 @@ protected String rewriteTarget(HttpServletRequest request) { if (LOG.isDebugEnabled()) { LOG.debug("[{}:{}] Selected active broker is {}", request.getRemoteAddr(), request.getRemotePort(), - url.toString()); + url); } } catch (Exception e) { LOG.warn("[{}:{}] Failed to get next active broker {}", request.getRemoteAddr(), From e33687d3f202ab104d41ad086c48b66b6f0d5ff5 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Thu, 18 Nov 2021 02:42:56 -0600 Subject: [PATCH 7/9] [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846) --- .../pulsar/broker/service/AbstractTopic.java | 8 +-- .../pulsar/broker/service/Producer.java | 27 ++++---- .../broker/service/PersistentTopicTest.java | 14 +++- .../pulsar/broker/service/ServerCnxTest.java | 66 +++++++++++++++++++ 4 files changed, 91 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index f4f7615688fc2..12bf0973b7b04 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -639,13 +639,9 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) throws BrokerServiceException { - boolean canOverwrite = false; - if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer) - && !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) { + if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer) + && !isUserProvidedProducerName(newProducer)) { oldProducer.close(false); - canOverwrite = true; - } - if (canOverwrite) { if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) { // Met concurrent update, throw exception here so that client can try reconnect later. throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 0514542d2f567..d72f904019a4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -153,22 +153,17 @@ private String parseRemoteClusterName(String producerName, boolean isRemote, Str return null; } - @Override - public int hashCode() { - return Objects.hash(producerName); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof Producer) { - Producer other = (Producer) obj; - return Objects.equals(producerName, other.producerName) - && Objects.equals(topic, other.topic) - && producerId == other.producerId - && Objects.equals(cnx, other.cnx); - } - - return false; + /** + * Method to determine if this producer can replace another producer. + * @param other - producer to compare to this one + * @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false. + */ + public boolean isSuccessorTo(Producer other) { + return Objects.equals(producerName, other.producerName) + && Objects.equals(topic, other.topic) + && producerId == other.producerId + && Objects.equals(cnx, other.cnx) + && other.getEpoch() < epoch; } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 5fe651be704ce..b8cbe0b2291a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -37,6 +37,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -442,11 +443,20 @@ public void testAddRemoveProducer() throws Exception { // OK } - // 4. simple remove producer + // 4. Try to remove with unequal producer + Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", + role, false, null, SchemaVersion.Latest, 0, false, + ProducerAccessMode.Shared, Optional.empty()); + topic.removeProducer(producerCopy); + // Expect producer to be in map + assertEquals(topic.getProducers().size(), 1); + assertSame(topic.getProducers().get(producer.getProducerName()), producer); + + // 5. simple remove producer topic.removeProducer(producer); assertEquals(topic.getProducers().size(), 0); - // 5. duplicate remove + // 6. duplicate remove topic.removeProducer(producer); /* noop */ } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 80be3ef89f927..4fe14d89f7c67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -837,6 +837,72 @@ public void testCreateProducerTimeout() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception { + resetChannel(); + setChannelConnected(); + + // Delay the topic creation in a deterministic way + CompletableFuture openTopicFuture = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openTopicFuture.complete(() -> { + ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); + }); + return null; + }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), + any(OpenLedgerCallback.class), any(Supplier.class), any()); + + // In a create producer timeout from client side we expect to see this sequence of commands : + // 1. create producer + // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker + // 3. create producer (triggered by reconnection logic) + // Then, when another producer is created with the same name, it should fail. Because we only have one + // channel here, we just use a different producer id + + // These operations need to be serialized, to allow the last create producer to finally succeed + // (There can be more create/close pairs in the sequence, depending on the client timeout + + String producerName = "my-producer"; + + ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(createProducer1); + + ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); + channel.writeInbound(closeProducer); + + ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(createProducer2); + + // Complete the topic opening: It will make 2nd producer creation successful + openTopicFuture.get().run(); + + // Close succeeds + Object response = getResponse(); + assertEquals(response.getClass(), CommandSuccess.class); + assertEquals(((CommandSuccess) response).getRequestId(), 2); + + // 2nd producer will be successfully created as topic is open by then + response = getResponse(); + assertEquals(response.getClass(), CommandProducerSuccess.class); + assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); + + // Send create command after getting the CommandProducerSuccess to ensure correct ordering + ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(createProducer3); + + // 3nd producer will fail + response = getResponse(); + assertEquals(response.getClass(), CommandError.class); + assertEquals(((CommandError) response).getRequestId(), 4); + + assertTrue(channel.isActive()); + + channel.finish(); + } + @Test(timeOut = 30000, enabled = false) public void testCreateProducerMultipleTimeouts() throws Exception { resetChannel(); From 5c12be7d1fdf74d02c247321ce4c982fc5157930 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 18 Nov 2021 09:46:08 +0100 Subject: [PATCH 8/9] Update deploy-bare-metal.md (#12432) * Update deploy-bare-metal.md no details for NAT. It is useful for distributed cluster. * Update site2/docs/deploy-bare-metal.md Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com> * Update deploy-bare-metal.md AS requested Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com> --- site2/docs/deploy-bare-metal.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md index 7f56cbf4c1783..fd334746c88a5 100644 --- a/site2/docs/deploy-bare-metal.md +++ b/site2/docs/deploy-bare-metal.md @@ -202,9 +202,17 @@ server.1=zk1.us-west.example.com:2888:3888 server.2=zk2.us-west.example.com:2888:3888 server.3=zk3.us-west.example.com:2888:3888 ``` - > If you only have one machine on which to deploy Pulsar, you only need to add one server entry in the configuration file. +> If your machines are behind NAT use 0.0.0.0 as server entry for the local address. If the node use external IP in configuration for itself, behind NAT, zookeper service won't start because it tries to put a listener on an external ip that the linux box doesn't own. Using 0.0.0.0 start a listener on ALL ip, so that NAT network traffic can reach it. + +Example of configuration on _server.3_ +```properties +server.1=zk1.us-west.example.com:2888:3888 +server.2=zk2.us-west.example.com:2888:3888 +server.3=0.0.0.0:2888:3888 +``` + On each host, you need to specify the ID of the node in the `myid` file, which is in the `data/zookeeper` folder of each server by default (you can change the file location via the [`dataDir`](reference-configuration.md#zookeeper-dataDir) parameter). > See the [Multi-server setup guide](https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup) in the ZooKeeper documentation for detailed information on `myid` and more. From fa7be236efcc6772e0aac05f25f8d5f3cf0ad741 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Thu, 18 Nov 2021 20:37:37 +0800 Subject: [PATCH 9/9] [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786) --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../pulsar/broker/ServiceConfiguration.java | 8 ++ .../pulsar/broker/service/AbstractTopic.java | 38 +++++---- .../SchemaCompatibilityCheckTest.java | 81 +++++++++++++++++++ .../pulsar/common/policies/data/Policies.java | 2 +- .../docs/reference-configuration.md | 3 +- 7 files changed, 123 insertions(+), 17 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 13e955bdee53d..700f9a5adc9b1 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -270,6 +270,10 @@ brokerMaxConnections=0 # The maximum number of connections per IP. If it exceeds, new connections are rejected. brokerMaxConnectionsPerIp=0 +# Allow schema to be auto updated at broker level. User can override this by +# 'is_allow_auto_update_schema' of namespace policy. +isAllowAutoUpdateSchemaEnabled=true + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 87d9e058f465c..906280c180253 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check. maxTopicsPerNamespace=0 +# Allow schema to be auto updated at broker level. User can override this by +# 'is_allow_auto_update_schema' of namespace policy. +isAllowAutoUpdateSchemaEnabled=true + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b832c9568d718..9ca086a4c7d67 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int brokerMaxConnectionsPerIp = 0; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'" + + " of namespace policy. This is enabled by default." + ) + private boolean isAllowAutoUpdateSchemaEnabled = true; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 12bf0973b7b04..d505971b95432 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic { @Getter protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; - protected volatile boolean isAllowAutoUpdateSchema = true; + protected volatile Boolean isAllowAutoUpdateSchema; // schema validation enforced flag protected volatile boolean schemaValidationEnforced = false; @@ -333,20 +333,28 @@ public CompletableFuture addSchema(SchemaData schema) { String base = TopicName.get(getName()).getPartitionedTopicName(); String id = TopicName.get(base).getSchemaName(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); - return isAllowAutoUpdateSchema ? schemaRegistryService - .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy) - : schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> - schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) - .thenCompose(schemaVersion -> { - if (schemaVersion == null) { - return FutureUtil - .failedFuture( - new IncompatibleSchemaException( - "Schema not found and schema auto updating is disabled.")); - } else { - return CompletableFuture.completedFuture(schemaVersion); - } - })); + + if (allowAutoUpdateSchema()) { + return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy); + } else { + return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> + schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) + .thenCompose(schemaVersion -> { + if (schemaVersion == null) { + return FutureUtil.failedFuture(new IncompatibleSchemaException( + "Schema not found and schema auto updating is disabled.")); + } else { + return CompletableFuture.completedFuture(schemaVersion); + } + })); + } + } + + private boolean allowAutoUpdateSchema() { + if (isAllowAutoUpdateSchema == null) { + return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled(); + } + return isAllowAutoUpdateSchema; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 293f71d5f878f..80168b9ae4c9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -218,6 +218,87 @@ public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrateg } } + @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy") + public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy) + throws Exception { + + final String tenant = PUBLIC_TENANT; + final String topic = "test-consumer-compatibility"; + String namespace = "test-namespace-" + randomName(16); + String fqtn = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topic + ).toString(); + + NamespaceName namespaceName = NamespaceName.get(tenant, namespace); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME) + ); + + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), + SchemaCompatibilityStrategy.FULL); + + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy); + admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo()); + + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false); + ProducerBuilder producerThreeBuilder = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtn); + try { + producerThreeBuilder.create(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled.")); + } + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true); + ConsumerBuilder comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName("test") + .topic(fqtn); + + Producer producer = producerThreeBuilder.create(); + Consumer consumerTwo = comsumerBuilder.subscribe(); + + producer.send(new Schemas.PersonTwo(2, "Lucy")); + Message message = consumerTwo.receive(); + + Schemas.PersonTwo personTwo = message.getValue(); + consumerTwo.acknowledge(message); + + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); + + producer.close(); + consumerTwo.close(); + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false); + + producer = producerThreeBuilder.create(); + consumerTwo = comsumerBuilder.subscribe(); + + producer.send(new Schemas.PersonTwo(2, "Lucy")); + message = consumerTwo.receive(); + + personTwo = message.getValue(); + consumerTwo.acknowledge(message); + + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); + + consumerTwo.close(); + producer.close(); + } + @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy") public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception { final String tenant = PUBLIC_TENANT; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 7f56bc8e50735..4d29a5fdee669 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -107,7 +107,7 @@ public class Policies { public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED; @SuppressWarnings("checkstyle:MemberName") - public boolean is_allow_auto_update_schema = true; + public Boolean is_allow_auto_update_schema = null; @SuppressWarnings("checkstyle:MemberName") public boolean schema_validation_enforced = false; diff --git a/site2/website-next/docs/reference-configuration.md b/site2/website-next/docs/reference-configuration.md index 1908149053ba1..3199155d3aaad 100644 --- a/site2/website-next/docs/reference-configuration.md +++ b/site2/website-next/docs/reference-configuration.md @@ -349,6 +349,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater | managedLedgerInfoCompressionType | Compression type of managed ledger information.

Available options are `NONE`, `LZ4`, `ZLIB`, `ZSTD`, and `SNAPPY`).

If this value is `NONE` or invalid, the `managedLedgerInfo` is not compressed.

**Note** that after enabling this configuration, if you want to degrade a broker, you need to change the value to `NONE` and make sure all ledger metadata is saved without compression. | None | | additionalServlets | Additional servlet name.

If you have multiple additional servlets, separate them by commas.

For example, additionalServlet_1, additionalServlet_2 | N/A | | additionalServletDirectory | Location of broker additional servlet NAR directory | ./brokerAdditionalServlet | +| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true| ## Client @@ -480,7 +481,6 @@ You can set the log level and configuration in the [log4j2.yaml](https://github | dispatchThrottlingRatePerTopicInMsg | Default messages (per second) dispatch throttling-limit for every topic. When the value is set to 0, default message dispatch throttling-limit is disabled. |0 | | dispatchThrottlingRatePerTopicInByte | Default byte (per second) dispatch throttling-limit for every topic. When the value is set to 0, default byte dispatch throttling-limit is disabled. | 0| | dispatchThrottlingOnBatchMessageEnabled |Apply dispatch rate limiting on batch message instead individual messages with in batch message. (Default is disabled). | false| - | dispatchThrottlingRateRelativeToPublishRate | Enable dispatch rate-limiting relative to publish rate. | false | |dispatchThrottlingRatePerSubscriptionInMsg|The defaulted number of message dispatching throttling-limit for a subscription. The value of 0 disables message dispatch-throttling.|0| |dispatchThrottlingRatePerSubscriptionInByte|The default number of message-bytes dispatching throttling-limit for a subscription. The value of 0 disables message-byte dispatch-throttling.|0| @@ -650,6 +650,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github |haProxyProtocolEnabled | Enable or disable the [HAProxy](http://www.haproxy.org/) protocol. |false| |bookieId | If you want to custom a bookie ID or use a dynamic network address for a bookie, you can set the `bookieId`.

Bookie advertises itself using the `bookieId` rather than the `BookieSocketAddress` (`hostname:port` or `IP:port`).

The `bookieId` is a non-empty string that can contain ASCII digits and letters ([a-zA-Z9-0]), colons, dashes, and dots.

For more information about `bookieId`, see [here](http://bookkeeper.apache.org/bps/BP-41-bookieid/).|/| | maxTopicsPerNamespace | The maximum number of persistent topics that can be created in the namespace. When the number of topics reaches this threshold, the broker rejects the request of creating a new topic, including the auto-created topics by the producer or consumer, until the number of connected consumers decreases. The default value 0 disables the check. | 0 | +| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true| ## WebSocket