Skip to content

Commit

Permalink
PIP-45: Migrate OwnershipCache to use MetadataStore (apache#11012)
Browse files Browse the repository at this point in the history
* PIP-45: Migrate OwnershipCache to use MetadataStore

* Import checkstyle

* Fixed typo

* Addressed comment

* Fixed test

* Test fixes

* One more test fix

* Fixed possible deadlock in the initialization of MLTransactionLog

* Fixed tests
  • Loading branch information
merlimat authored and ciaocloud committed Oct 16, 2021
1 parent a043fcb commit a0c5e8b
Show file tree
Hide file tree
Showing 16 changed files with 361 additions and 613 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand All @@ -53,7 +52,6 @@ public class LocalZooKeeperCacheService {

private final ZooKeeperCache cache;

private ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache;
private ZooKeeperManagedLedgerCache managedLedgerListCache;
private ResourceQuotaCache resourceQuotaCache;
private ZooKeeperDataCache<LocalPolicies> policiesCache;
Expand All @@ -68,13 +66,6 @@ public LocalZooKeeperCacheService(ZooKeeperCache cache, ConfigurationCacheServic

initZK();

this.ownerInfoCache = new ZooKeeperDataCache<NamespaceEphemeralData>(cache) {
@Override
public NamespaceEphemeralData deserialize(String path, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, NamespaceEphemeralData.class);
}
};

this.policiesCache = new ZooKeeperDataCache<LocalPolicies>(cache) {
@Override
public LocalPolicies deserialize(String path, byte[] content) throws Exception {
Expand Down Expand Up @@ -239,10 +230,6 @@ public ResourceQuotaCache getResourceQuotaCache() {
return this.resourceQuotaCache;
}

public ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache() {
return this.ownerInfoCache;
}

public ZooKeeperDataCache<LocalPolicies> policiesCache() {
return this.policiesCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@
*/
package org.apache.pulsar.broker.namespace;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.Map;
import javax.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;

@Getter
@NoArgsConstructor
@EqualsAndHashCode
@ToString
public class NamespaceEphemeralData {
private String nativeUrl;
private String nativeUrlTls;
Expand All @@ -33,9 +40,6 @@ public class NamespaceEphemeralData {
private boolean disabled;
private Map<String, AdvertisedListener> advertisedListeners;

public NamespaceEphemeralData() {
}

public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls,
boolean disabled) {
this(brokerUrl, brokerUrlTls, httpUrl, httpUrlTls, disabled, null);
Expand All @@ -55,41 +59,11 @@ public NamespaceEphemeralData(String brokerUrl, String brokerUrlTls, String http
}
}

public String getNativeUrl() {
return nativeUrl;
}

public String getNativeUrlTls() {
return nativeUrlTls;
}

public String getHttpUrl() {
return httpUrl;
}

public String getHttpUrlTls() {
return httpUrlTls;
}

public boolean isDisabled() {
return disabled;
}

public void setDisabled(boolean flag) {
this.disabled = flag;
}

@NotNull
public Map<String, AdvertisedListener> getAdvertisedListeners() {
if (this.advertisedListeners == null) {
return Collections.EMPTY_MAP;
}
return Collections.unmodifiableMap(this.advertisedListeners);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("nativeUrl", nativeUrl).add("httpUrl", httpUrl)
.add("disabled", disabled).add("advertisedListeners", getAdvertisedListeners()).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public Optional<NamespaceBundle> getBundleIfPresent(TopicName topicName) {
return bundles.map(b -> b.findBundle(topicName));
}

public NamespaceBundle getBundle(TopicName topicName) throws Exception {
public NamespaceBundle getBundle(TopicName topicName) {
return bundleFactory.getBundles(topicName.getNamespaceObject()).findBundle(topicName);
}

Expand Down Expand Up @@ -999,7 +999,7 @@ private boolean isTopicOwned(TopicName topicName) {

public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
return getBundleAsync(topicName)
.thenCompose(bundle -> ownershipCache.checkOwnership(bundle));
.thenApply(ownershipCache::checkOwnership);
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EqualsAndHashCode
@ToString
public class OwnedBundle {
private static final Logger LOG = LoggerFactory.getLogger(OwnedBundle.class);

Expand All @@ -38,6 +42,8 @@ public class OwnedBundle {
* {@link #nsLock} is used to protect read/write access to {@link #active} flag and the corresponding code section
* based on {@link #active} flag.
*/
@ToString.Exclude
@EqualsAndHashCode.Exclude
private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock();
private static final int FALSE = 0;
private static final int TRUE = 1;
Expand Down

0 comments on commit a0c5e8b

Please sign in to comment.