Skip to content

Commit

Permalink
PIP-45: Migrate OwnershipCache to use MetadataStore (#11012)
Browse files Browse the repository at this point in the history
* PIP-45: Migrate OwnershipCache to use MetadataStore

* Import checkstyle

* Fixed typo

* Addressed comment

* Fixed test

* Test fixes

* One more test fix

* Fixed possible deadlock in the initialization of MLTransactionLog

* Fixed tests
  • Loading branch information
merlimat committed Jul 2, 2021
1 parent 555042a commit 794aa20
Show file tree
Hide file tree
Showing 16 changed files with 361 additions and 613 deletions.
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand All @@ -53,7 +52,6 @@ public class LocalZooKeeperCacheService {

private final ZooKeeperCache cache;

private ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache;
private ZooKeeperManagedLedgerCache managedLedgerListCache;
private ResourceQuotaCache resourceQuotaCache;
private ZooKeeperDataCache<LocalPolicies> policiesCache;
Expand All @@ -68,13 +66,6 @@ public LocalZooKeeperCacheService(ZooKeeperCache cache, ConfigurationCacheServic

initZK();

this.ownerInfoCache = new ZooKeeperDataCache<NamespaceEphemeralData>(cache) {
@Override
public NamespaceEphemeralData deserialize(String path, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, NamespaceEphemeralData.class);
}
};

this.policiesCache = new ZooKeeperDataCache<LocalPolicies>(cache) {
@Override
public LocalPolicies deserialize(String path, byte[] content) throws Exception {
Expand Down Expand Up @@ -239,10 +230,6 @@ public ResourceQuotaCache getResourceQuotaCache() {
return this.resourceQuotaCache;
}

public ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache() {
return this.ownerInfoCache;
}

public ZooKeeperDataCache<LocalPolicies> policiesCache() {
return this.policiesCache;
}
Expand Down
Expand Up @@ -18,13 +18,20 @@
*/
package org.apache.pulsar.broker.namespace;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.Map;
import javax.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;

@Getter
@NoArgsConstructor
@EqualsAndHashCode
@ToString
public class NamespaceEphemeralData {
private String nativeUrl;
private String nativeUrlTls;
Expand All @@ -33,9 +40,6 @@ public class NamespaceEphemeralData {
private boolean disabled;
private Map<String, AdvertisedListener> advertisedListeners;

public NamespaceEphemeralData() {
}

public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls,
boolean disabled) {
this(brokerUrl, brokerUrlTls, httpUrl, httpUrlTls, disabled, null);
Expand All @@ -55,41 +59,11 @@ public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String http
}
}

public String getNativeUrl() {
return nativeUrl;
}

public String getNativeUrlTls() {
return nativeUrlTls;
}

public String getHttpUrl() {
return httpUrl;
}

public String getHttpUrlTls() {
return httpUrlTls;
}

public boolean isDisabled() {
return disabled;
}

public void setDisabled(boolean flag) {
this.disabled = flag;
}

@NotNull
public Map<String, AdvertisedListener> getAdvertisedListeners() {
if (this.advertisedListeners == null) {
return Collections.EMPTY_MAP;
}
return Collections.unmodifiableMap(this.advertisedListeners);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("nativeUrl", nativeUrl).add("httpUrl", httpUrl)
.add("disabled", disabled).add("advertisedListeners", getAdvertisedListeners()).toString();
}
}
Expand Up @@ -220,7 +220,7 @@ public Optional<NamespaceBundle> getBundleIfPresent(TopicName topicName) {
return bundles.map(b -> b.findBundle(topicName));
}

public NamespaceBundle getBundle(TopicName topicName) throws Exception {
public NamespaceBundle getBundle(TopicName topicName) {
return bundleFactory.getBundles(topicName.getNamespaceObject()).findBundle(topicName);
}

Expand Down Expand Up @@ -999,7 +999,7 @@ private boolean isTopicOwned(TopicName topicName) {

public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
return getBundleAsync(topicName)
.thenCompose(bundle -> ownershipCache.checkOwnership(bundle));
.thenApply(ownershipCache::checkOwnership);
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
Expand Down
Expand Up @@ -23,12 +23,16 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EqualsAndHashCode
@ToString
public class OwnedBundle {
private static final Logger LOG = LoggerFactory.getLogger(OwnedBundle.class);

Expand All @@ -38,6 +42,8 @@ public class OwnedBundle {
* {@link #nsLock} is used to protect read/write access to {@link #active} flag and the corresponding code section
* based on {@link #active} flag.
*/
@ToString.Exclude
@EqualsAndHashCode.Exclude
private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock();
private static final int FALSE = 0;
private static final int TRUE = 1;
Expand Down

0 comments on commit 794aa20

Please sign in to comment.