diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java index c3fd143016010..e9a348625857c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java @@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarServerException; -import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -53,7 +52,6 @@ public class LocalZooKeeperCacheService { private final ZooKeeperCache cache; - private ZooKeeperDataCache ownerInfoCache; private ZooKeeperManagedLedgerCache managedLedgerListCache; private ResourceQuotaCache resourceQuotaCache; private ZooKeeperDataCache policiesCache; @@ -68,13 +66,6 @@ public LocalZooKeeperCacheService(ZooKeeperCache cache, ConfigurationCacheServic initZK(); - this.ownerInfoCache = new ZooKeeperDataCache(cache) { - @Override - public NamespaceEphemeralData deserialize(String path, byte[] content) throws Exception { - return ObjectMapperFactory.getThreadLocal().readValue(content, NamespaceEphemeralData.class); - } - }; - this.policiesCache = new ZooKeeperDataCache(cache) { @Override public LocalPolicies deserialize(String path, byte[] content) throws Exception { @@ -239,10 +230,6 @@ public ResourceQuotaCache getResourceQuotaCache() { return this.resourceQuotaCache; } - public ZooKeeperDataCache ownerInfoCache() { - return this.ownerInfoCache; - } - public ZooKeeperDataCache policiesCache() { return this.policiesCache; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java index d37e6856f791d..e6acbaf6582c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java @@ -18,13 +18,20 @@ */ package org.apache.pulsar.broker.namespace; -import com.google.common.base.MoreObjects; import com.google.common.collect.Maps; import java.util.Collections; import java.util.Map; import javax.validation.constraints.NotNull; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; +@Getter +@NoArgsConstructor +@EqualsAndHashCode +@ToString public class NamespaceEphemeralData { private String nativeUrl; private String nativeUrlTls; @@ -33,9 +40,6 @@ public class NamespaceEphemeralData { private boolean disabled; private Map advertisedListeners; - public NamespaceEphemeralData() { - } - public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls, boolean disabled) { this(brokerUrl, brokerUrlTls, httpUrl, httpUrlTls, disabled, null); @@ -55,30 +59,6 @@ public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String http } } - public String getNativeUrl() { - return nativeUrl; - } - - public String getNativeUrlTls() { - return nativeUrlTls; - } - - public String getHttpUrl() { - return httpUrl; - } - - public String getHttpUrlTls() { - return httpUrlTls; - } - - public boolean isDisabled() { - return disabled; - } - - public void setDisabled(boolean flag) { - this.disabled = flag; - } - @NotNull public Map getAdvertisedListeners() { if (this.advertisedListeners == null) { @@ -86,10 +66,4 @@ public Map getAdvertisedListeners() { } return Collections.unmodifiableMap(this.advertisedListeners); } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("nativeUrl", nativeUrl).add("httpUrl", httpUrl) - .add("disabled", disabled).add("advertisedListeners", getAdvertisedListeners()).toString(); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index a4164746eef09..fd6c74ec64bc8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -220,7 +220,7 @@ public Optional getBundleIfPresent(TopicName topicName) { return bundles.map(b -> b.findBundle(topicName)); } - public NamespaceBundle getBundle(TopicName topicName) throws Exception { + public NamespaceBundle getBundle(TopicName topicName) { return bundleFactory.getBundles(topicName.getNamespaceObject()).findBundle(topicName); } @@ -999,7 +999,7 @@ private boolean isTopicOwned(TopicName topicName) { public CompletableFuture checkTopicOwnership(TopicName topicName) { return getBundleAsync(topicName) - .thenCompose(bundle -> ownershipCache.checkOwnership(bundle)); + .thenApply(ownershipCache::checkOwnership); } public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java index 4f93057eeca37..1ff6db9149103 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java @@ -23,12 +23,16 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; +import lombok.EqualsAndHashCode; +import lombok.ToString; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@EqualsAndHashCode +@ToString public class OwnedBundle { private static final Logger LOG = LoggerFactory.getLogger(OwnedBundle.class); @@ -38,6 +42,8 @@ public class OwnedBundle { * {@link #nsLock} is used to protect read/write access to {@link #active} flag and the corresponding code section * based on {@link #active} flag. */ + @ToString.Exclude + @EqualsAndHashCode.Exclude private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock(); private static final int FALSE = 0; private static final int TRUE = 1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 116397a94d499..fe7f572865688 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.namespace; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -29,31 +27,23 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.coordination.LockManager; +import org.apache.pulsar.metadata.api.coordination.ResourceLock; import org.apache.pulsar.stats.CacheMetricsCollector; -import org.apache.pulsar.zookeeper.ZooKeeperCache; -import org.apache.pulsar.zookeeper.ZooKeeperDataCache; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This class provides a cache service for all the service unit ownership among the brokers. It provide a cache service - * as well as ZooKeeper read/write functions for a) lookup of a service unit ownership to a broker; b) take ownership of - * a service unit by the local broker - * - * + * as well as MetadataStore read/write functions for a) lookup of a service unit ownership to a broker; b) take + * ownership of a service unit by the local broker */ public class OwnershipCache { @@ -79,26 +69,14 @@ public class OwnershipCache { */ private final NamespaceEphemeralData selfOwnerInfoDisabled; - /** - * Service unit ownership cache of ZooKeeper data of ephemeral nodes showing all known ownership of - * service unit to active brokers. - */ - private final ZooKeeperDataCache ownershipReadOnlyCache; + private final LockManager lockManager; - /** - * The loading cache of locally owned NamespaceBundle objects. - */ - private final AsyncLoadingCache ownedBundlesCache; + private final Map locallyAcquiredLocks; /** - * The ObjectMapper to deserialize/serialize JSON objects. - */ - private final ObjectMapper jsonMapper = ObjectMapperFactory.create(); - - /** - * The ZooKeeperCache connecting to the local ZooKeeper. + * The loading cache of locally owned NamespaceBundle objects. */ - private final ZooKeeperCache localZkCache; + private final AsyncLoadingCache ownedBundlesCache; /** * The NamespaceBundleFactory to construct NamespaceBundles. @@ -112,40 +90,20 @@ public class OwnershipCache { private final PulsarService pulsar; - private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader { + private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader { - @SuppressWarnings("deprecation") @Override - public CompletableFuture asyncLoad(String namespaceBundleZNode, Executor executor) { - if (LOG.isDebugEnabled()) { - LOG.debug("Acquiring zk lock on namespace {}", namespaceBundleZNode); - } - - byte[] znodeContent; - try { - znodeContent = jsonMapper.writeValueAsBytes(selfOwnerInfo); - } catch (JsonProcessingException e) { - // Failed to serialize to JSON - return FutureUtil.failedFuture(e); - } - - CompletableFuture future = new CompletableFuture<>(); - ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> { - if (rc == KeeperException.Code.OK.intValue()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode); - } - ownershipReadOnlyCache.invalidate(namespaceBundleZNode); - future.complete(new OwnedBundle( - ServiceUnitUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory))); - } else { - // Failed to acquire lock - future.completeExceptionally(KeeperException.create(rc)); - } - }, null); - - return future; + public CompletableFuture asyncLoad(NamespaceBundle namespaceBundle, Executor executor) { + return lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo) + .thenApply(rl -> { + locallyAcquiredLocks.put(namespaceBundle, rl); + rl.getLockExpiredFuture() + .thenRun(() -> { + ownedBundlesCache.synchronous().invalidate(namespaceBundle); + namespaceService.onNamespaceBundleUnload(namespaceBundle); + }); + return new OwnedBundle(namespaceBundle); + }); } } @@ -167,8 +125,8 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners()); this.bundleFactory = bundleFactory; - this.localZkCache = pulsar.getLocalZkCache(); - this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache(); + this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class); + this.locallyAcquiredLocks = new ConcurrentHashMap<>(); // ownedBundlesCache contains all namespaces that are owned by the local broker this.ownedBundlesCache = Caffeine.newBuilder() .executor(MoreExecutors.directExecutor()) @@ -177,37 +135,14 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory CacheMetricsCollector.CAFFEINE.addCache("owned-bundles", this.ownedBundlesCache); } - private CompletableFuture>> resolveOwnership(String path) { - return ownershipReadOnlyCache.getWithStatAsync(path).thenApply(optionalOwnerDataWithStat -> { - if (optionalOwnerDataWithStat.isPresent()) { - Map.Entry ownerDataWithStat = optionalOwnerDataWithStat.get(); - Stat stat = ownerDataWithStat.getValue(); - if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) { - LOG.info("Successfully reestablish ownership of {}", path); - OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitUtils.suBundleFromPath(path, bundleFactory)); - if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) { - ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle)); - } - ownershipReadOnlyCache.invalidate(path); - } - } - return optionalOwnerDataWithStat; - }); - } - /** * Check whether this broker owns given namespace bundle. * * @param bundle namespace bundle * @return future that will complete with check result */ - public CompletableFuture checkOwnership(NamespaceBundle bundle) { - OwnedBundle ownedBundle = getOwnedBundle(bundle); - if (ownedBundle != null) { - return CompletableFuture.completedFuture(true); - } - String bundlePath = ServiceUnitUtils.path(bundle); - return resolveOwnership(bundlePath).thenApply(Optional::isPresent); + public boolean checkOwnership(NamespaceBundle bundle) { + return getOwnedBundle(bundle) != null; } /** @@ -220,9 +155,7 @@ public CompletableFuture checkOwnership(NamespaceBundle bundle) { * throws exception if no ownership info is found */ public CompletableFuture> getOwnerAsync(NamespaceBundle suName) { - String path = ServiceUnitUtils.path(suName); - - CompletableFuture ownedBundleFuture = ownedBundlesCache.getIfPresent(path); + CompletableFuture ownedBundleFuture = ownedBundlesCache.getIfPresent(suName); if (ownedBundleFuture != null) { // Either we're the owners or we're trying to become the owner. return ownedBundleFuture.thenApply(serviceUnit -> { @@ -232,7 +165,8 @@ public CompletableFuture> getOwnerAsync(Namespa } // If we're not the owner, we need to check if anybody else is - return resolveOwnership(path).thenApply(optional -> optional.map(Map.Entry::getKey)); + String path = ServiceUnitUtils.path(suName); + return lockManager.readLock(path); } /** @@ -244,59 +178,21 @@ public CompletableFuture> getOwnerAsync(Namespa * @throws Exception */ public CompletableFuture tryAcquiringOwnership(NamespaceBundle bundle) throws Exception { - String path = ServiceUnitUtils.path(bundle); - - CompletableFuture future = new CompletableFuture<>(); - if (!refreshSelfOwnerInfo()) { - future.completeExceptionally( + return FutureUtil.failedFuture( new RuntimeException("Namespace service does not ready for acquiring ownership")); - return future; } LOG.info("Trying to acquire ownership of {}", bundle); - // Doing a get() on the ownedBundlesCache will trigger an async ZK write to acquire the lock over the + // Doing a get() on the ownedBundlesCache will trigger an async metatada write to acquire the lock over the // service unit - ownedBundlesCache.get(path).thenAccept(namespaceBundle -> { - LOG.info("Successfully acquired ownership of {}", path); + return ownedBundlesCache.get(bundle) + .thenApply(namespaceBundle -> { + LOG.info("Successfully acquired ownership of {}", namespaceBundle); namespaceService.onNamespaceBundleOwned(bundle); - future.complete(selfOwnerInfo); - }).exceptionally(exception -> { - // Failed to acquire ownership - if (exception instanceof CompletionException - && exception.getCause() instanceof KeeperException.NodeExistsException) { - resolveOwnership(path).thenAccept(optionalOwnerDataWithStat -> { - if (optionalOwnerDataWithStat.isPresent()) { - Map.Entry ownerDataWithStat = optionalOwnerDataWithStat.get(); - NamespaceEphemeralData ownerData = ownerDataWithStat.getKey(); - Stat stat = ownerDataWithStat.getValue(); - if (stat.getEphemeralOwner() != localZkCache.getZooKeeper().getSessionId()) { - LOG.info("Failed to acquire ownership of {} -- Already owned by broker {}", - path, ownerData); - } - future.complete(ownerData); - } else { - // Strange scenario: we couldn't create a z-node because it was already existing, but when we - // try to read it, it's not there anymore - LOG.info("Failed to acquire ownership of {} -- Already owned by unknown broker", path); - future.completeExceptionally(exception); - } - }).exceptionally(ex -> { - LOG.warn("Failed to check ownership of {}: {}", bundle, ex.getMessage(), ex); - future.completeExceptionally(exception); - return null; - }); - } else { - // Other ZK error, bailing out for now - LOG.warn("Failed to acquire ownership of {}: {}", bundle, exception.getMessage(), exception); - future.completeExceptionally(exception); - } - - return null; + return selfOwnerInfo; }); - - return future; } /** @@ -304,23 +200,13 @@ public CompletableFuture tryAcquiringOwnership(Namespace * */ public CompletableFuture removeOwnership(NamespaceBundle bundle) { - CompletableFuture result = new CompletableFuture<>(); - String key = ServiceUnitUtils.path(bundle); - localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> { - // Invalidate cache even in error since this operation may succeed in server side. - ownedBundlesCache.synchronous().invalidate(key); - ownershipReadOnlyCache.invalidate(key); - namespaceService.onNamespaceBundleUnload(bundle); - if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) { - LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc)); - result.complete(null); - } else { - LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key, - KeeperException.Code.get(rc)); - result.completeExceptionally(KeeperException.create(rc)); - } - }, null); - return result; + ResourceLock lock = locallyAcquiredLocks.get(bundle); + if (lock == null) { + // We don't own the specified bundle anymore + return CompletableFuture.completedFuture(null); + } + + return lock.release(); } /** @@ -347,7 +233,7 @@ public CompletableFuture removeOwnership(NamespaceBundles bundles) { * * @return a map of owned ServiceUnit objects */ - public Map getOwnedBundles() { + public Map getOwnedBundles() { return this.ownedBundlesCache.synchronous().asMap(); } @@ -369,7 +255,8 @@ public boolean isNamespaceBundleOwned(NamespaceBundle bundle) { * @return */ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) { - CompletableFuture future = ownedBundlesCache.getIfPresent(ServiceUnitUtils.path(bundle)); + CompletableFuture future = ownedBundlesCache.getIfPresent(bundle); + if (future != null && future.isDone() && !future.isCompletedExceptionally()) { return future.join(); } else { @@ -384,35 +271,15 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) { * @throws Exception */ public CompletableFuture disableOwnership(NamespaceBundle bundle) { - String path = ServiceUnitUtils.path(bundle); - CompletableFuture future = new CompletableFuture<>(); - - updateBundleState(bundle, false) - .thenRun(() -> { - byte[] value; - try { - value = jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled); - } catch (JsonProcessingException e) { - future.completeExceptionally(e); - return; + return updateBundleState(bundle, false) + .thenCompose(__ -> { + ResourceLock lock = locallyAcquiredLocks.get(bundle); + if (lock == null) { + return CompletableFuture.completedFuture(null); + } else { + return lock.updateValue(selfOwnerInfoDisabled); } - - localZkCache.getZooKeeper().setData(path, value, -1, (rc, path1, ctx, stat) -> { - if (rc == KeeperException.Code.OK.intValue()) { - ownershipReadOnlyCache.invalidate(path1); - future.complete(null); - } else { - future.completeExceptionally(KeeperException.create(rc)); - } - }, null); - }) - .exceptionally(ex -> { - LOG.warn("Failed to update state on namespace bundle {}: {}", bundle, ex.getMessage(), ex); - future.completeExceptionally(ex); - return null; }); - - return future; } /** @@ -422,9 +289,8 @@ public CompletableFuture disableOwnership(NamespaceBundle bundle) { * @throws Exception */ public CompletableFuture updateBundleState(NamespaceBundle bundle, boolean isActive) { - String path = ServiceUnitUtils.path(bundle); // Disable owned instance in local cache - CompletableFuture f = ownedBundlesCache.getIfPresent(path); + CompletableFuture f = ownedBundlesCache.getIfPresent(bundle); if (f != null && f.isDone() && !f.isCompletedExceptionally()) { return f.thenAccept(ob -> ob.setActive(isActive)); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index e2133f6345fb4..55279138e8d2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -383,16 +383,15 @@ public CompletableFuture checkSchemaCompatibleForConsumer(SchemaData schem @Override public CompletableFuture> addProducer(Producer producer, - CompletableFuture producerQueuedFuture) { + CompletableFuture producerQueuedFuture) { checkArgument(producer.getTopic() == this); - CompletableFuture> future = new CompletableFuture<>(); - - incrementTopicEpochIfNeeded(producer, producerQueuedFuture) - .thenAccept(producerEpoch -> { + return brokerService.checkTopicNsOwnership(getName()) + .thenCompose(__ -> + incrementTopicEpochIfNeeded(producer, producerQueuedFuture)) + .thenCompose(producerEpoch -> { lock.writeLock().lock(); try { - brokerService.checkTopicNsOwnership(getName()); checkTopicFenced(); if (isTerminated()) { log.warn("[{}] Attempting to add producer to a terminated topic", topic); @@ -406,18 +405,13 @@ public CompletableFuture> addProducer(Producer producer, USAGE_COUNT_UPDATER.get(this)); } - future.complete(producerEpoch); - } catch (Throwable e) { - future.completeExceptionally(e); + return CompletableFuture.completedFuture(producerEpoch); + } catch (BrokerServiceException e) { + return FutureUtil.failedFuture(e); } finally { lock.writeLock().unlock(); } - }).exceptionally(ex -> { - future.completeExceptionally(ex); - return null; }); - - return future; } protected CompletableFuture> incrementTopicEpochIfNeeded(Producer producer, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1e5061831bcd0..e390dd15b5521 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -56,7 +56,6 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -108,7 +107,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; -import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; @@ -1151,29 +1149,25 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return topicFuture; } - checkTopicNsOwnershipAsync(topic).whenComplete((ignored, throwable) -> { - if (throwable != null) { - topicFuture.completeExceptionally(throwable); - return; - } - - final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); + checkTopicNsOwnership(topic) + .thenRun(() -> { + final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); - if (topicLoadSemaphore.tryAcquire()) { - createPersistentTopic(topic, createIfMissing, topicFuture); - topicFuture.handle((persistentTopic, ex) -> { - // release permit and process pending topic - topicLoadSemaphore.release(); - createPendingLoadTopic(); - return null; + if (topicLoadSemaphore.tryAcquire()) { + createPersistentTopic(topic, createIfMissing, topicFuture); + topicFuture.handle((persistentTopic, ex) -> { + // release permit and process pending topic + topicLoadSemaphore.release(); + createPendingLoadTopic(); + return null; + }); + } else { + pendingTopicLoadingQueue.add(new ImmutablePair<>(topic, topicFuture)); + if (log.isDebugEnabled()) { + log.debug("topic-loading for {} added into pending queue", topic); + } + } }); - } else { - pendingTopicLoadingQueue.add(new ImmutablePair<>(topic, topicFuture)); - if (log.isDebugEnabled()) { - log.debug("topic-loading for {} added into pending queue", topic); - } - } - }); return topicFuture; } @@ -1627,36 +1621,21 @@ public boolean isTopicNsOwnedByBroker(TopicName topicName) { return false; } - public CompletableFuture checkTopicNsOwnershipAsync(final String topic) { + public CompletableFuture checkTopicNsOwnership(final String topic) { TopicName topicName = TopicName.get(topic); - CompletableFuture checkFuture = new CompletableFuture<>(); - pulsar.getNamespaceService().checkTopicOwnership(topicName).whenComplete((ownedByThisInstance, throwable) -> { - if (throwable != null) { - log.debug("Failed to check the ownership of the topic: {}", topicName, throwable); - checkFuture.completeExceptionally(new ServerMetadataException(throwable)); - } else if (!ownedByThisInstance) { - String msg = String.format("Namespace bundle for topic (%s) not served by this instance. " - + "Please redo the lookup. Request is denied: namespace=%s", topic, topicName.getNamespace()); - log.warn(msg); - checkFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } else { - checkFuture.complete(null); - } - }); - return checkFuture; - } - public void checkTopicNsOwnership(final String topic) throws BrokerServiceException { - try { - checkTopicNsOwnershipAsync(topic).join(); - } catch (CompletionException ex) { - if (ex.getCause() instanceof BrokerServiceException) { - throw (BrokerServiceException) ex.getCause(); - } - throw new BrokerServiceException(ex.getCause()); - } catch (Exception ex) { - throw new BrokerServiceException(ex); - } + return pulsar.getNamespaceService().checkTopicOwnership(topicName) + .thenCompose(ownedByThisInstance -> { + if (ownedByThisInstance) { + return CompletableFuture.completedFuture(null); + } else { + String msg = String.format("Namespace bundle for topic (%s) not served by this instance. " + + "Please redo the lookup. Request is denied: namespace=%s", topic, + topicName.getNamespace()); + log.warn(msg); + return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg)); + } + }); } public CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 464cd8e6417cc..9fd4ce5e8d761 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -228,92 +228,89 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs long resetStartMessageBackInSec, boolean replicateSubscriptionState, KeySharedMeta keySharedMeta) { - final CompletableFuture future = new CompletableFuture<>(); + return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { + final CompletableFuture future = new CompletableFuture<>(); - try { - brokerService.checkTopicNsOwnership(getName()); - } catch (Exception e) { - future.completeExceptionally(e); - return future; - } - if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { - if (log.isDebugEnabled()) { - log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); + if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); + } + future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message")); + return future; } - future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message")); - return future; - } - - if (subscriptionName.startsWith(replicatorPrefix)) { - log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); - future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted")); - return future; - } - if (readCompacted) { - future.completeExceptionally(new NotAllowedException("readCompacted only valid on persistent topics")); - return future; - } + if (subscriptionName.startsWith(replicatorPrefix)) { + log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); + future.completeExceptionally( + new NamingException("Subscription with reserved subscription name attempted")); + return future; + } - lock.readLock().lock(); - try { - if (isFenced) { - log.warn("[{}] Attempting to subscribe to a fenced topic", topic); - future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable")); + if (readCompacted) { + future.completeExceptionally(new NotAllowedException("readCompacted only valid on persistent topics")); return future; } - handleConsumerAdded(subscriptionName, consumerName); - } finally { - lock.readLock().unlock(); - } + lock.readLock().lock(); + try { + if (isFenced) { + log.warn("[{}] Attempting to subscribe to a fenced topic", topic); + future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable")); + return future; + } - NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, - name -> new NonPersistentSubscription(this, subscriptionName)); - Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, - cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta); - addConsumerToSubscription(subscription, consumer).thenRun(() -> { - if (!cnx.isActive()) { - try { - consumer.close(); - } catch (BrokerServiceException e) { - if (e instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, - consumerName); - } else if (e instanceof SubscriptionBusyException) { - log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); - } + handleConsumerAdded(subscriptionName, consumerName); + } finally { + lock.readLock().unlock(); + } - decrementUsageCount(); - future.completeExceptionally(e); - return; + NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, + name -> new NonPersistentSubscription(this, subscriptionName)); + Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, + cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta); + addConsumerToSubscription(subscription, consumer).thenRun(() -> { + if (!cnx.isActive()) { + try { + consumer.close(); + } catch (BrokerServiceException e) { + if (e instanceof ConsumerBusyException) { + log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, + consumerName); + } else if (e instanceof SubscriptionBusyException) { + log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); + } + + decrementUsageCount(); + future.completeExceptionally(e); + return; + } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, + consumer.consumerName(), currentUsageCount()); + } + future.completeExceptionally( + new BrokerServiceException("Connection was closed while the opening the cursor ")); + } else { + log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); + future.complete(consumer); } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, - consumer.consumerName(), currentUsageCount()); + }).exceptionally(e -> { + Throwable throwable = e.getCause(); + if (throwable instanceof ConsumerBusyException) { + log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, + consumerName); + } else if (throwable instanceof SubscriptionBusyException) { + log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); } - future.completeExceptionally( - new BrokerServiceException("Connection was closed while the opening the cursor ")); - } else { - log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); - future.complete(consumer); - } - }).exceptionally(e -> { - Throwable throwable = e.getCause(); - if (throwable instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, - consumerName); - } else if (throwable instanceof SubscriptionBusyException) { - log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); - } - decrementUsageCount(); - future.completeExceptionally(throwable); - return null; - }); + decrementUsageCount(); + future.completeExceptionally(throwable); + return null; + }); - return future; + return future; + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0e863494fccae..4ed047053f05e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -626,168 +626,146 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs Map metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean replicatedSubscriptionState, + boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta) { - - final CompletableFuture future = new CompletableFuture<>(); - - try { - brokerService.checkTopicNsOwnership(getName()); - } catch (Exception e) { - future.completeExceptionally(e); - return future; - } - if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) { - future.completeExceptionally( - new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions")); - return future; + return FutureUtil.failedFuture(new NotAllowedException( + "readCompacted only allowed on failover or exclusive subscriptions")); } - if (replicatedSubscriptionState - && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { - log.warn("Replicated Subscription is disabled by broker."); - replicatedSubscriptionState = false; - } - - if (subType == SubType.Key_Shared - && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) { - future.completeExceptionally( - new NotAllowedException("Key_Shared subscription is disabled by broker.") - ); - return future; - } + return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { + boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; - try { - if (!topic.endsWith(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME) - && !checkSubscriptionTypesEnable(subType)) { - future.completeExceptionally( - new NotAllowedException("Topic[{" + topic + "}] don't support " - + subType.name() + " sub type!")); - return future; + if (replicatedSubscriptionState + && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { + log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); + replicatedSubscriptionState = false; } - } catch (Exception e) { - future.completeExceptionally(e); - return future; - } - if (isBlank(subscriptionName)) { - if (log.isDebugEnabled()) { - log.debug("[{}] Empty subscription name", topic); + if (subType == SubType.Key_Shared + && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) { + return FutureUtil.failedFuture( + new NotAllowedException("Key_Shared subscription is disabled by broker.")); } - future.completeExceptionally(new NamingException("Empty subscription name")); - return future; - } - if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { - if (log.isDebugEnabled()) { - log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); + try { + if (!topic.endsWith(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME) + && !checkSubscriptionTypesEnable(subType)) { + return FutureUtil.failedFuture( + new NotAllowedException("Topic[{" + topic + "}] doesn't support " + + subType.name() + " sub type!")); + } + } catch (Exception e) { + return FutureUtil.failedFuture(e); } - future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message")); - return future; - } - if (subscriptionName.startsWith(replicatorPrefix) || subscriptionName.equals(DEDUPLICATION_CURSOR_NAME)) { - log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); - future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted")); - return future; - } + if (isBlank(subscriptionName)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Empty subscription name", topic); + } + return FutureUtil.failedFuture(new NamingException("Empty subscription name")); + } - if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) { - SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier( - cnx.clientAddress().toString().split(":")[0], consumerName, consumerId); - if (subscribeRateLimiter.isPresent() && !subscribeRateLimiter.get().subscribeAvailable(consumer) - || !subscribeRateLimiter.get().tryAcquire(consumer)) { - log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}", - topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(), - subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer)); - future.completeExceptionally( - new NotAllowedException("Subscribe limited by subscribe rate limit per consumer.")); - return future; + if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); + } + return FutureUtil.failedFuture( + new UnsupportedVersionException("Consumer doesn't support batch-message")); + } + + if (subscriptionName.startsWith(replicatorPrefix) + || subscriptionName.equals(DEDUPLICATION_CURSOR_NAME)) { + log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); + return FutureUtil.failedFuture( + new NamingException("Subscription with reserved subscription name attempted")); + } + + if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) { + SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier( + cnx.clientAddress().toString().split(":")[0], consumerName, consumerId); + if (subscribeRateLimiter.isPresent() && !subscribeRateLimiter.get().subscribeAvailable(consumer) + || !subscribeRateLimiter.get().tryAcquire(consumer)) { + log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}", + topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(), + subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer)); + return FutureUtil.failedFuture( + new NotAllowedException("Subscribe limited by subscribe rate limit per consumer.")); + } } - } + lock.readLock().lock(); + try { + if (isFenced) { + log.warn("[{}] Attempting to subscribe to a fenced topic", topic); + return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable")); + } + handleConsumerAdded(subscriptionName, consumerName); + } finally { + lock.readLock().unlock(); + } + + CompletableFuture subscriptionFuture = isDurable ? // + getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, + replicatedSubscriptionState) + : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, + startMessageRollbackDurationSec); + + int maxUnackedMessages = isDurable + ? getMaxUnackedMessagesOnConsumer() + : 0; + + CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { + Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, + consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, + readCompacted, initialPosition, keySharedMeta); + return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { + checkBackloggedCursors(); + if (!cnx.isActive()) { + try { + consumer.close(); + } catch (BrokerServiceException e) { + if (e instanceof ConsumerBusyException) { + log.warn("[{}][{}] Consumer {} {} already connected", + topic, subscriptionName, consumerId, consumerName); + } else if (e instanceof SubscriptionBusyException) { + log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); + } - lock.readLock().lock(); - try { - if (isFenced) { - log.warn("[{}] Attempting to subscribe to a fenced topic", topic); - future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable")); - return future; - } - handleConsumerAdded(subscriptionName, consumerName); - } finally { - lock.readLock().unlock(); - } - - CompletableFuture subscriptionFuture = isDurable ? // - getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionState) - : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, - startMessageRollbackDurationSec); - - int maxUnackedMessages = isDurable - ? getMaxUnackedMessagesOnConsumer() - : 0; - - subscriptionFuture.thenAccept(subscription -> { - Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, - maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, - readCompacted, initialPosition, keySharedMeta); - addConsumerToSubscription(subscription, consumer).thenAccept(v -> { - checkBackloggedCursors(); - if (!cnx.isActive()) { - try { - consumer.close(); - } catch (BrokerServiceException e) { - if (e instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", - topic, subscriptionName, consumerId, consumerName); - } else if (e instanceof SubscriptionBusyException) { - log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); + decrementUsageCount(); + return FutureUtil.failedFuture(e); + } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, + consumer.consumerName(), currentUsageCount()); } decrementUsageCount(); - future.completeExceptionally(e); - return; - } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, - consumer.consumerName(), currentUsageCount()); + return FutureUtil.failedFuture( + new BrokerServiceException("Connection was closed while the opening the cursor ")); + } else { + checkReplicatedSubscriptionControllerState(); + log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); + return CompletableFuture.completedFuture(consumer); } + }); + }); - decrementUsageCount(); - future.completeExceptionally( - new BrokerServiceException("Connection was closed while the opening the cursor ")); - } else { - checkReplicatedSubscriptionControllerState(); - log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); - future.complete(consumer); - } - }).exceptionally(e -> { - if (e.getCause() instanceof ConsumerBusyException) { + future.exceptionally(ex -> { + decrementUsageCount(); + + if (ex.getCause() instanceof ConsumerBusyException) { log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, consumerName); - } else if (e.getCause() instanceof SubscriptionBusyException) { - log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); + } else if (ex.getCause() instanceof SubscriptionBusyException) { + log.warn("[{}][{}] {}", topic, subscriptionName, ex.getMessage()); + } else { + log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex); } - - decrementUsageCount(); - future.completeExceptionally(e); return null; }); - }).exceptionally(ex -> { - log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex); - decrementUsageCount(); - if (ex.getCause() instanceof NotAllowedException) { - future.completeExceptionally(ex.getCause()); - } else { - future.completeExceptionally(new PersistenceException(ex)); - } - return null; + return future; }); - - return future; } public void updateUnackedMessagesAppliedOnSubscription(Policies policies) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index a62ea936ee412..c1c98332f9e1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -192,7 +192,7 @@ public CompletableFuture getBundlesAsync(NamespaceName nsname) return bundlesCache.get(nsname); } - public NamespaceBundles getBundles(NamespaceName nsname) throws Exception { + public NamespaceBundles getBundles(NamespaceName nsname) { return bundlesCache.synchronous().get(nsname); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 1f840b1177e1e..afae4764a613b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -805,7 +805,7 @@ public boolean matches(NamespaceBundle bundle) { return bundle.getNamespaceObject().equals(testNs); } })); - doReturn(Optional.of(new NamespaceEphemeralData())).when(nsSvc) + doReturn(Optional.of(mock(NamespaceEphemeralData.class))).when(nsSvc) .getOwner(Mockito.argThat(new ArgumentMatcher() { @Override public boolean matches(NamespaceBundle bundle) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index b9a89938c40f7..6c8585380b7e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -21,11 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.doNothing; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -33,37 +31,30 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.hash.Hashing; - import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - +import lombok.Cleanup; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.policies.data.LocalPolicies; -import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import org.apache.pulsar.zookeeper.LocalZooKeeperCache; -import org.apache.pulsar.zookeeper.ZooKeeperCache; -import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; import org.apache.pulsar.zookeeper.ZookeeperServerTest; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooKeeper; -import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -77,8 +68,6 @@ public class OwnershipCacheTest { private PulsarService pulsar; private ServiceConfiguration config; private String selfBrokerUrl; - private ZooKeeperCache zkCache; - private LocalZooKeeperCacheService localCache; private NamespaceBundleFactory bundleFactory; private NamespaceService nsService; private BrokerService brokerService; @@ -86,6 +75,7 @@ public class OwnershipCacheTest { private ZooKeeper zkc; private MetadataStoreExtended store; private MetadataStoreExtended otherStore; + private CoordinationService coordinationService; private MockZooKeeper mockZkc; private ZookeeperServerTest zookeeperServer; @@ -101,28 +91,24 @@ public void setup() throws Exception { store = MetadataStoreExtended.create(zookeeperServer.getHostPort(), MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build()); + coordinationService = new CoordinationServiceImpl(store); otherStore = MetadataStoreExtended.create(zookeeperServer.getHostPort(), MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build()); mockZkc = MockZooKeeper.newInstance(); zkc = new ZooKeeper(zookeeperServer.getHostPort(), 5000, null); - zkCache = new LocalZooKeeperCache(zkc, 30, executor); - localCache = spy(new LocalZooKeeperCacheService(zkCache, null)); - ZooKeeperDataCache poilciesCache = mock(ZooKeeperDataCache.class); - when(pulsar.getLocalZkCacheService()).thenReturn(localCache); - when(localCache.policiesCache()).thenReturn(poilciesCache); when(pulsar.getConfigurationMetadataStore()).thenReturn(store); - doNothing().when(poilciesCache).registerListener(any()); when(pulsar.getLocalMetadataStore()).thenReturn(store); + when(pulsar.getConfigurationMetadataStore()).thenReturn(store); + when(pulsar.getCoordinationService()).thenReturn(coordinationService); bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); nsService = mock(NamespaceService.class); brokerService = mock(BrokerService.class); - doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any(), anyBoolean(), anyInt(), any()); + doReturn(CompletableFuture.completedFuture(1)).when(brokerService) + .unloadServiceUnit(any(), anyBoolean(), anyLong(), any()); - doReturn(zkCache).when(pulsar).getLocalZkCache(); - doReturn(localCache).when(pulsar).getLocalZkCacheService(); doReturn(config).when(pulsar).getConfiguration(); doReturn(nsService).when(pulsar).getNamespaceService(); doReturn(Optional.of(port)).when(config).getBrokerServicePort(); @@ -134,7 +120,6 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void teardown() throws Exception { executor.shutdownNow(); - zkCache.stop(); zkc.close(); store.close(); otherStore.close(); @@ -180,25 +165,33 @@ public void testGetOrSetOwner() throws Exception { // this would disable the ownership doReturn(cache).when(nsService).getOwnershipCache(); nsObj.handleUnloadRequest(pulsar, 5, TimeUnit.SECONDS).join(); - Thread.sleep(1000); // case 3: some other broker owned the namespace, getOrSetOwner() should return other broker's URL // The only chance that we lost an already existing ephemeral node is when the broker dies or unload has // succeeded in both cases, the ownerInfoCache will be updated (i.e. invalidated the entry) - localCache.ownerInfoCache().invalidate(ServiceUnitUtils.path(testFullBundle)); - - pulsar.getLocalMetadataStore().put(ServiceUnitUtils.path(testFullBundle), + @Cleanup + MetadataStoreExtended otherStore = MetadataStoreExtended.create(zookeeperServer.getHostPort(), + MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build()); + otherStore.put(ServiceUnitUtils.path(testFullBundle), ObjectMapperFactory.getThreadLocal().writeValueAsBytes( new NamespaceEphemeralData("pulsar://otherhost:8881", "pulsar://otherhost:8884", "http://localhost:8080", "https://localhost:4443", false)), - Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).join(); - data1 = cache.tryAcquiringOwnership(testFullBundle).get(); + Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)) + .join(); + + try { + cache.tryAcquiringOwnership(testFullBundle).get(); + fail("Should fail to acquire"); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class); + } + + data1 = cache.getOwnerAsync(testFullBundle).join().get(); assertEquals(data1.getNativeUrl(), "pulsar://otherhost:8881"); assertEquals(data1.getNativeUrlTls(), "pulsar://otherhost:8884"); assertFalse(data1.isDisabled()); - } @Test @@ -209,7 +202,10 @@ public void testGetOwner() throws Exception { assertFalse(cache.getOwnerAsync(testBundle).get().isPresent()); // case 2: someone owns the namespace - pulsar.getLocalMetadataStore().put(ServiceUnitUtils.path(testBundle), + @Cleanup + MetadataStoreExtended otherStore = MetadataStoreExtended.create(zookeeperServer.getHostPort(), + MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build()); + otherStore.put(ServiceUnitUtils.path(testBundle), ObjectMapperFactory.getThreadLocal().writeValueAsBytes( new NamespaceEphemeralData("pulsar://otherhost:8881", "pulsar://otherhost:8884", @@ -218,7 +214,14 @@ public void testGetOwner() throws Exception { Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).join(); // try to acquire, which will load the read-only cache - NamespaceEphemeralData data1 = cache.tryAcquiringOwnership(testBundle).get(); + try { + cache.tryAcquiringOwnership(testBundle).get(); + fail("Should fail to acquire"); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class); + } + + NamespaceEphemeralData data1 = cache.getOwnerAsync(testBundle).join().get(); assertEquals(data1.getNativeUrl(), "pulsar://otherhost:8881"); assertEquals(data1.getNativeUrlTls(), "pulsar://otherhost:8884"); @@ -227,18 +230,9 @@ public void testGetOwner() throws Exception { NamespaceEphemeralData readOnlyData = cache.getOwnerAsync(testBundle).get().get(); assertEquals(data1, readOnlyData); - AtomicReference zkSession = Whitebox.getInternalState(zkCache, "zkSession"); - ZooKeeper zooKeeper = zkSession.get(); - zkSession.set(mockZkc); - mockZkc.failConditional(KeeperException.Code.NONODE, (op, path) -> { - return op == MockZooKeeper.Op.GET - && path.equals("/namespace/pulsar/test/ns-none/0x00000000_0xffffffff"); - }); - Optional res = cache .getOwnerAsync(bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-none"))).get(); assertFalse(res.isPresent()); - zkSession.set(zooKeeper); } @Test @@ -270,10 +264,16 @@ public void testGetOwnedServiceUnit() throws Exception { // OK for not owned namespace } - Thread.sleep(500); - // try to acquire, which will load the read-only cache - NamespaceEphemeralData data1 = cache.tryAcquiringOwnership(testBundle).get(); + try { + cache.tryAcquiringOwnership(testBundle).get(); + fail("Should fail to acquire"); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class); + } + + NamespaceEphemeralData data1 = cache.getOwnerAsync(testBundle).join().get(); + assertEquals(data1.getNativeUrl(), "pulsar://otherhost:8881"); assertEquals(data1.getNativeUrlTls(), "pulsar://otherhost:8884"); assertFalse(data1.isDisabled()); @@ -286,8 +286,7 @@ public void testGetOwnedServiceUnit() throws Exception { // case 3: this broker owns the namespace // delete the ephemeral node by others otherStore.delete(ServiceUnitUtils.path(testBundle), Optional.empty()).join(); - // force to read directly from ZK - localCache.ownerInfoCache().invalidate(ServiceUnitUtils.path(testBundle)); + data1 = cache.tryAcquiringOwnership(testBundle).get(); assertEquals(data1.getNativeUrl(), selfBrokerUrl); assertFalse(data1.isDisabled()); @@ -317,7 +316,14 @@ public void testGetOwnedServiceUnits() throws Exception { Thread.sleep(500); // try to acquire, which will load the read-only cache - NamespaceEphemeralData data1 = cache.tryAcquiringOwnership(testBundle).get(); + try { + cache.tryAcquiringOwnership(testBundle).get(); + fail("Should fail to acquire"); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class); + } + + NamespaceEphemeralData data1 = cache.getOwnerAsync(testBundle).join().get(); assertEquals(data1.getNativeUrl(), "pulsar://otherhost:8881"); assertEquals(data1.getNativeUrlTls(), "pulsar://otherhost:8884"); assertFalse(data1.isDisabled()); @@ -325,8 +331,6 @@ public void testGetOwnedServiceUnits() throws Exception { // case 3: this broker owns the namespace // delete the ephemeral node by others otherStore.delete(ServiceUnitUtils.path(testBundle), Optional.empty()).join(); - // force to read directly from ZK - localCache.ownerInfoCache().invalidate(ServiceUnitUtils.path(testBundle)); data1 = cache.tryAcquiringOwnership(testBundle).get(); assertEquals(data1.getNativeUrl(), selfBrokerUrl); assertFalse(data1.isDisabled()); @@ -375,10 +379,6 @@ public void testReestablishOwnership() throws Exception { assertNotNull(cache.getOwnedBundle(testFullBundle)); // invalidate cache, reestablish ownership through query ownership - cache.invalidateLocalOwnerCache(); - localCache.ownerInfoCache().invalidate(testFullBundlePath); - assertNull(cache.getOwnedBundle(testFullBundle)); - assertNull(localCache.ownerInfoCache().getDataIfPresent(testFullBundlePath)); NamespaceEphemeralData data2 = cache.getOwnerAsync(testFullBundle).get().get(); assertEquals(data2.getNativeUrl(), selfBrokerUrl); assertFalse(data2.isDisabled()); @@ -386,20 +386,13 @@ public void testReestablishOwnership() throws Exception { // invalidate cache, reestablish ownership through acquire ownership cache.invalidateLocalOwnerCache(); - localCache.ownerInfoCache().invalidate(testFullBundlePath); assertNull(cache.getOwnedBundle(testFullBundle)); - assertNull(localCache.ownerInfoCache().getDataIfPresent(testFullBundlePath)); NamespaceEphemeralData data3 = cache.tryAcquiringOwnership(testFullBundle).get(); assertEquals(data3.getNativeUrl(), selfBrokerUrl); assertFalse(data3.isDisabled()); assertNotNull(cache.getOwnedBundle(testFullBundle)); - // invalidate cache, reestablish ownership through check ownership - cache.invalidateLocalOwnerCache(); - localCache.ownerInfoCache().invalidate(testFullBundlePath); - assertNull(cache.getOwnedBundle(testFullBundle)); - assertNull(localCache.ownerInfoCache().getDataIfPresent(testFullBundlePath)); - assertTrue(cache.checkOwnership(testFullBundle).join()); + assertTrue(cache.checkOwnership(testFullBundle)); assertEquals(data2.getNativeUrl(), selfBrokerUrl); assertFalse(data2.isDisabled()); assertNotNull(cache.getOwnedBundle(testFullBundle)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java index a4099dfee070c..9078159237b3b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.resourcegroup; import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.service.BrokerService; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +@Slf4j public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase { @BeforeClass @Override @@ -109,16 +111,9 @@ private void testProduceConsumeUsageOnRG(String topicString) throws Exception { Producer producer = null; Consumer consumer = null; - this.pulsar.getBrokerService().getOrCreateTopic(topicString); - - try { - producer = pulsarClient.newProducer() - .topic(topicString) - .create(); - } catch (PulsarClientException p) { - final String errMesg = String.format("Got exception while building producer: ex={}", p.getMessage()); - Assert.assertTrue(false, errMesg); - } + producer = pulsarClient.newProducer() + .topic(topicString) + .create(); try { consumer = pulsarClient.newConsumer() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java index 6c5aba91b917d..60e3b2c6621dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java @@ -145,8 +145,6 @@ public void testConsumedLedgersTrimNoSubscriptions() throws Exception { // the lastMessageId is still on the previous ledger restartBroker(); // force load topic - Awaitility.await().ignoreExceptions().untilAsserted(() - -> assertNotNull(pulsar.getBrokerService().getTopicIfExists(topicName).get(3, TimeUnit.SECONDS).get())); pulsar.getAdminClient().topics().getStats(topicName); MessageId messageIdAfterRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName); LOG.info("lastmessageid " + messageIdAfterRestart); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index cb0122941e705..5fe18f761ccb8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -26,6 +26,7 @@ import org.awaitility.Awaitility; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.matches; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -114,6 +115,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.ZooKeeper; @@ -1731,9 +1733,8 @@ public void testTopicIsNotReady() throws Exception { assertEquals(((CommandSuccess) response1).getRequestId(), 1); // Force the checkTopicNsOwnership method to throw ServiceUnitNotReadyException - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new ServiceUnitNotReadyException("Service unit is not ready")); - doReturn(future).when(brokerService).checkTopicNsOwnershipAsync(anyString()); + doReturn(FutureUtil.failedFuture(new ServiceUnitNotReadyException("Service unit is not ready"))) + .when(brokerService).checkTopicNsOwnership(anyString()); // 2nd subscribe command when the service unit is not ready ByteBuf clientCommand2 = Commands.newSubscribe(successTopicName, successSubName, 2 /* consumer id */, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index d55e1e7efe321..9c3b58594f5b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -254,11 +254,12 @@ public void testReestablishOwnershipAfterInvalidateCache() throws Exception { Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); OwnershipCache ownershipCache1 = pulsar1.getNamespaceService().getOwnershipCache(); - AsyncLoadingCache ownedBundlesCache1 = Whitebox.getInternalState(ownershipCache1, "ownedBundlesCache"); + AsyncLoadingCache ownedBundlesCache1 = Whitebox.getInternalState(ownershipCache1, "ownedBundlesCache"); leaderAuthorizedBroker.setValue(null); - ownedBundlesCache1.synchronous().invalidate(ServiceUnitUtils.path(namespaceBundle)); + Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); + ownedBundlesCache1.synchronous().invalidate(namespaceBundle); Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle)); // pulsar1 is still owner in zk. @@ -266,27 +267,6 @@ public void testReestablishOwnershipAfterInvalidateCache() throws Exception { Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); - - // Reestablish ownership through lookup ownership. - Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle)); - Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); - Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); - - // Reestablish ownership through check ownership. - ownedBundlesCache1.synchronous().invalidate(ServiceUnitUtils.path(namespaceBundle)); - ownershipCache1.checkOwnership(namespaceBundle).join(); - Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); - - // Reestablish ownership through load topic. - ownedBundlesCache1.synchronous().invalidate(ServiceUnitUtils.path(namespaceBundle)); - pulsar1.getBrokerService().getTopic(topic1, true).join(); - Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); - pulsar1.getBrokerService().deleteTopic(topic1, true).join(); - - // Reestablish ownership through web. - ownedBundlesCache1.synchronous().invalidate(ServiceUnitUtils.path(namespaceBundle)); - pulsarAdmins[0].topics().createNonPartitionedTopic(topic1); - Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); } @Test