Skip to content

Commit

Permalink
Merge branch 'master' into fix-key-shared-order2
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui committed Jul 13, 2021
2 parents ae48c4a + feb4ff1 commit 62db869
Show file tree
Hide file tree
Showing 129 changed files with 2,104 additions and 590 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-cpp-build-windows.yaml
Expand Up @@ -101,7 +101,7 @@ jobs:
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
cd pulsar-client-cpp && \
cmake --build ./build
cmake --build ./build --config Release
fi
- name: Configure (dynamic library only)
Expand All @@ -126,5 +126,5 @@ jobs:
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
cd pulsar-client-cpp && \
cmake --build ./build-1
cmake --build ./build-1 --config Release
fi
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -88,6 +88,7 @@ components in the Pulsar ecosystem, including connectors, adapters, and other la
Requirements:
* Java [JDK 11](https://adoptopenjdk.net/?variant=openjdk11) or [JDK 8](https://adoptopenjdk.net/?variant=openjdk8)
* Maven 3.6.1+
* zip

Compile and install:

Expand Down
Expand Up @@ -2289,19 +2289,14 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
}

private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
if (config.getRetentionTimeMillis() < 0) {
// Negative retention time equates to infinite retention
return false;
}

long elapsedMs = clock.millis() - ledgerTimestamp;
return elapsedMs > config.getRetentionTimeMillis();
return config.getRetentionTimeMillis() >= 0
&& clock.millis() - ledgerTimestamp > config.getRetentionTimeMillis();
}

private boolean isLedgerRetentionOverSizeQuota() {
private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) {
// Handle the -1 size limit as "infinite" size quota
return config.getRetentionSizeInMB() >= 0
&& TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024;
&& TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte;
}

private boolean isOffloadedNeedsDelete(OffloadContext offload) {
Expand Down Expand Up @@ -2369,33 +2364,56 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
if (log.isDebugEnabled()) {
log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
}

long totalSizeToDelete = 0;
boolean retentionSizeQuotaMet = false;
// skip ledger if retention constraint met
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
// currentLedger can not be deleted
if (ls.getLedgerId() == currentLedger.getId()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
ls.getLedgerId());
}
break;
}
// if truncate, all ledgers besides currentLedger are going to be deleted
if (isTruncate){
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} will be truncated with ts {}",
name, ls.getLedgerId(), ls.getTimestamp());
}
ledgersToDelete.add(ls);
continue;
}

totalSizeToDelete += ls.getSize();
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(totalSizeToDelete);
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
+ "expired: {} -- over-quota: {} -- current-ledger: {}",
name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (ls.getLedgerId() == currentLedger.getId()) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
ls.getLedgerId());
break;
} else if (expired || isTruncate) {
log.debug("[{}] Ledger {} has expired or be truncated, expired is {}, isTruncate is {}, ts {}", name, ls.getLedgerId(), expired, isTruncate, ls.getTimestamp());
ledgersToDelete.add(ls);
} else if (overRetentionQuota || isTruncate) {
log.debug("[{}] Ledger {} is over quota or be truncated, overRetentionQuota is {}, isTruncate is {}", name, ls.getLedgerId(), overRetentionQuota, isTruncate);

if (expired || overRetentionQuota) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} has expired or over quota, expired is: {}, ts: {}, "
+ "overRetentionQuota is: {}, ledge size: {}",
name, ls.getLedgerId(), expired, ls.getTimestamp(), overRetentionQuota, ls.getSize());
}
ledgersToDelete.add(ls);
} else {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
// once retention constraint has been met, skip check
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
}
break;
}
}

for (LedgerInfo ls : ledgers.values()) {
if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) {
log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
Expand Down
Expand Up @@ -44,6 +44,7 @@
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -1970,6 +1971,49 @@ public void testInfiniteRetention() throws Exception {
assertTrue(ml.getTotalSize() > "shortmessage".getBytes().length);
}

@Test
public void testRetentionSize() throws Exception {
final int retentionSizeInMB = 5;
final int totalMessage = 10;

// message size is 1MB
final int messageSize = 1048576;
char[] data = new char[messageSize];
Arrays.fill(data, 'a');
byte [] message = new String(data).getBytes(Encoding);

@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(retentionSizeInMB);
config.setMaxEntriesPerLedger(1);
config.setRetentionTime(1, TimeUnit.HOURS);


ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("retention_size_ledger", config);
ManagedCursor c1 = ml.openCursor("c1");
Position position = null;
for (int i = 0; i < totalMessage; i++) {
position = ml.addEntry(message);
}
// all ledgers are not delete yet since no entry has been acked for c1
assertEquals(ml.getLedgersInfoAsList().size(), totalMessage);

List<Entry> entryList = c1.readEntries(totalMessage);
if (null != position) {
c1.markDelete(position);
}
entryList.forEach(entry -> {
log.info("Read entry position {}:{}", entry.getLedgerId(), entry.getEntryId());
entry.release();
});

Awaitility.await().untilAsserted(() -> {
assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024);
assertEquals(ml.getLedgersInfoAsList().size(), 5);
});
}

@Test
public void testTimestampOnWorkingLedger() throws Exception {
ManagedLedgerConfig conf = new ManagedLedgerConfig();
Expand Down
Expand Up @@ -27,8 +27,8 @@
import java.util.function.Function;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

/**
* Base class for all configuration resources to access configurations from metadata-store.
Expand All @@ -39,18 +39,18 @@
public class BaseResources<T> {

@Getter
private final MetadataStoreExtended store;
private final MetadataStore store;
@Getter
private final MetadataCache<T> cache;
private int operationTimeoutSec;

public BaseResources(MetadataStoreExtended store, Class<T> clazz, int operationTimeoutSec) {
public BaseResources(MetadataStore store, Class<T> clazz, int operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(clazz);
this.operationTimeoutSec = operationTimeoutSec;
}

public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef, int operationTimeoutSec) {
public BaseResources(MetadataStore store, TypeReference<T> typeRef, int operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(typeRef);
this.operationTimeoutSec = operationTimeoutSec;
Expand Down
Expand Up @@ -22,12 +22,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;

public class BookieResources extends BaseResources<BookiesRackConfiguration> {

public BookieResources(MetadataStoreExtended store, int operationTimeoutSec) {
public BookieResources(MetadataStore store, int operationTimeoutSec) {
super(store, BookiesRackConfiguration.class, operationTimeoutSec);
}

Expand Down
Expand Up @@ -21,20 +21,18 @@
import java.util.HashSet;
import java.util.Set;
import lombok.Getter;

import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class ClusterResources extends BaseResources<ClusterData> {

public static final String CLUSTERS_ROOT = "/admin/clusters";
@Getter
private FailureDomainResources failureDomainResources;

public ClusterResources(MetadataStoreExtended store, int operationTimeoutSec) {
public ClusterResources(MetadataStore store, int operationTimeoutSec) {
super(store, ClusterData.class, operationTimeoutSec);
this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec);
}
Expand All @@ -46,7 +44,7 @@ public Set<String> list() throws MetadataStoreException {
public static class FailureDomainResources extends BaseResources<FailureDomainImpl> {
public static final String FAILURE_DOMAIN = "failureDomain";

public FailureDomainResources(MetadataStoreExtended store, Class<FailureDomainImpl> clazz,
public FailureDomainResources(MetadataStore store, Class<FailureDomainImpl> clazz,
int operationTimeoutSec) {
super(store, clazz, operationTimeoutSec);
}
Expand Down
Expand Up @@ -20,12 +20,11 @@

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;

import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.MetadataStore;

public class DynamicConfigurationResources extends BaseResources<Map<String, String>> {

public DynamicConfigurationResources(MetadataStoreExtended store, int operationTimeoutSec) {
public DynamicConfigurationResources(MetadataStore store, int operationTimeoutSec) {
super(store, new TypeReference<Map<String, String>>() {
}, operationTimeoutSec);
}
Expand Down
Expand Up @@ -18,12 +18,12 @@
*/
package org.apache.pulsar.broker.resources;

import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;

public class LoadManagerReportResources extends BaseResources<LoadManagerReport> {

public LoadManagerReportResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
public LoadManagerReportResources(MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, LoadManagerReport.class, operationTimeoutSec);
}
}
Expand Up @@ -19,11 +19,11 @@
package org.apache.pulsar.broker.resources;

import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.MetadataStore;

public class LocalPoliciesResources extends BaseResources<LocalPolicies> {

public LocalPoliciesResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
public LocalPoliciesResources(MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, LocalPolicies.class, operationTimeoutSec);
}
}
Expand Up @@ -27,24 +27,24 @@
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

@Getter
public class NamespaceResources extends BaseResources<Policies> {
private IsolationPolicyResources isolationPolicies;
private PartitionedTopicResources partitionedTopicResources;
private MetadataStoreExtended configurationStore;
private MetadataStore configurationStore;

public NamespaceResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
}

public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationDataImpl>> {
public IsolationPolicyResources(MetadataStoreExtended store, int operationTimeoutSec) {
public IsolationPolicyResources(MetadataStore store, int operationTimeoutSec) {
super(store, new TypeReference<Map<String, NamespaceIsolationDataImpl>>() {
}, operationTimeoutSec);
}
Expand All @@ -56,7 +56,7 @@ public Optional<NamespaceIsolationPolicies> getPolicies(String path) throws Meta
}

public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
public PartitionedTopicResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
}
}
Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.util.Optional;

import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand All @@ -39,14 +40,15 @@ public class PulsarResources {
private LocalPoliciesResources localPolicies;
private LoadManagerReportResources loadReportResources;
private BookieResources bookieResources;
private TopicResources topicResources;

private Optional<MetadataStoreExtended> localMetadataStore;
private Optional<MetadataStoreExtended> configurationMetadataStore;
private Optional<MetadataStore> localMetadataStore;
private Optional<MetadataStore> configurationMetadataStore;

public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreExtended configurationMetadataStore) {
public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) {
this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC);
}
public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreExtended configurationMetadataStore,
public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
int operationTimeoutSec) {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
Expand All @@ -59,6 +61,7 @@ public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreEx
localPolicies = new LocalPoliciesResources(localMetadataStore, operationTimeoutSec);
loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
bookieResources = new BookieResources(localMetadataStore, operationTimeoutSec);
topicResources = new TopicResources(localMetadataStore);
}
this.localMetadataStore = Optional.ofNullable(localMetadataStore);
this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
Expand Down
Expand Up @@ -19,10 +19,10 @@
package org.apache.pulsar.broker.resources;

import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.MetadataStore;

public class ResourceGroupResources extends BaseResources<ResourceGroup> {
public ResourceGroupResources(MetadataStoreExtended store, int operationTimeoutSec) {
public ResourceGroupResources(MetadataStore store, int operationTimeoutSec) {
super(store, ResourceGroup.class, operationTimeoutSec);
}
}
Expand Up @@ -19,11 +19,10 @@
package org.apache.pulsar.broker.resources;

import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.MetadataStore;

public class TenantResources extends BaseResources<TenantInfo> {
public TenantResources(MetadataStoreExtended store, int operationTimeoutSec) {
public TenantResources(MetadataStore store, int operationTimeoutSec) {
super(store, TenantInfo.class, operationTimeoutSec);
}
}

0 comments on commit 62db869

Please sign in to comment.