[Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
* Revert "[fix][proxy] Fix client service url (#16834)"

This reverts commit 10b4e99.

* Revert "[Build] Use grpc-bom to align grpc library versions (#15234)"

This reverts commit 99c93d2.

* Revert "upgrade aircompressor to 0.20 (#11790)"

This reverts commit 5ad16b6.

* Revert "[Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)"

This reverts commit 32fe228.

* Revert changes of PersistentTopic#getMessageTTL in #12339.

Co-authored-by: JiangHaiting <janghaiting@apache.org>
Jason918 and JiangHaiting committed Jul 31, 2022
1 parent 25a1098 commit d966c1a
Showing 7 changed files with 173 additions and 216 deletions.
2 changes: 1 addition & 1 deletion distribution/server/src/assemble/LICENSE.bin.txt
@@ -425,7 +425,7 @@ The Apache Software License, Version 2.0
- org.apache.httpcomponents-httpclient-4.5.5.jar
- org.apache.httpcomponents-httpcore-4.4.9.jar
* AirCompressor
-   - io.airlift-aircompressor-0.20.jar
+   - io.airlift-aircompressor-0.16.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.12.1.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
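For context, aircompressor is the airlift compression library listed in the server distribution LICENSE above; this revert only pins it back to 0.16. Below is a minimal LZ4 round-trip through its Compressor/Decompressor interfaces, shown purely as an illustration of the dependency being pinned, not code from this commit.

import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Lz4RoundTrip {
    public static void main(String[] args) {
        byte[] payload = "hello pulsar".getBytes(StandardCharsets.UTF_8);

        // Compress into a buffer sized by maxCompressedLength().
        Lz4Compressor compressor = new Lz4Compressor();
        byte[] compressed = new byte[compressor.maxCompressedLength(payload.length)];
        int compressedSize = compressor.compress(payload, 0, payload.length, compressed, 0, compressed.length);

        // Decompress back into a buffer of the original size and compare.
        Lz4Decompressor decompressor = new Lz4Decompressor();
        byte[] restored = new byte[payload.length];
        int restoredSize = decompressor.decompress(compressed, 0, compressedSize, restored, 0, restored.length);

        System.out.println(Arrays.equals(payload, Arrays.copyOf(restored, restoredSize)));
    }
}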
32 changes: 21 additions & 11 deletions pom.xml
@@ -157,7 +157,7 @@ flexible messaging model and an intuitive client API.</description>
<prometheus-jmx.version>0.14.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
- <aircompressor.version>0.20</aircompressor.version>
+ <aircompressor.version>0.16</aircompressor.version>
<asynchttpclient.version>2.12.1</asynchttpclient.version>
<jcommander.version>1.78</jcommander.version>
<commons-lang3.version>3.6</commons-lang3.version>
@@ -598,7 +598,7 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
@@ -787,14 +787,6 @@ flexible messaging model and an intuitive client API.</description>
<version>${typetools.version}</version>
</dependency>

- <dependency>
-   <groupId>io.grpc</groupId>
-   <artifactId>grpc-bom</artifactId>
-   <version>${grpc.version}</version>
-   <type>pom</type>
-   <scope>import</scope>
- </dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
@@ -806,7 +798,25 @@ flexible messaging model and an intuitive client API.</description>
</exclusion>
</exclusions>
</dependency>


+ <dependency>
+   <groupId>io.grpc</groupId>
+   <artifactId>grpc-core</artifactId>
+   <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+   <groupId>io.grpc</groupId>
+   <artifactId>grpc-stub</artifactId>
+   <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+   <groupId>io.grpc</groupId>
+   <artifactId>grpc-protobuf-lite</artifactId>
+   <version>${grpc.version}</version>
+ </dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-bom</artifactId>
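The hunk below is the revert of the #12484 deadlock fix: it drops the asynchronous policiesFuture.thenCombine(...) composition and restores the earlier blocking reads from the policy caches inside getManagedLedgerConfig. As a minimal sketch of the two shapes, using hypothetical cache and config types rather than Pulsar's actual classes:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the broker's ZooKeeper-backed policy caches; not Pulsar's real API.
class ManagedLedgerConfigSketch {

    interface PolicyCache<T> {
        Optional<T> get(String path) throws Exception;           // blocking read, may wait on ZooKeeper
        CompletableFuture<Optional<T>> getAsync(String path);    // non-blocking read
    }

    record Policies(int ensembleSize) {}
    record LocalPolicies(String bookieAffinityGroup) {}
    record Config(int ensembleSize, String bookieAffinityGroup) {}

    // Shape restored by this revert: blocking reads in a try/catch, defaults when policies are absent,
    // and the returned future completed (or failed) at the end.
    static CompletableFuture<Config> blockingStyle(PolicyCache<Policies> policiesCache,
                                                   PolicyCache<LocalPolicies> localPoliciesCache,
                                                   String path, int defaultEnsemble) {
        CompletableFuture<Config> future = new CompletableFuture<>();
        try {
            Optional<Policies> policies = policiesCache.get(path);
            Optional<LocalPolicies> localPolicies = localPoliciesCache.get(path);
            future.complete(new Config(
                    policies.map(Policies::ensembleSize).orElse(defaultEnsemble),
                    localPolicies.map(LocalPolicies::bookieAffinityGroup).orElse(null)));
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    // Shape being reverted (introduced by the deadlock fix): compose the two async reads with
    // thenCombine so no thread blocks on the metadata cache.
    static CompletableFuture<Config> asyncStyle(PolicyCache<Policies> policiesCache,
                                                PolicyCache<LocalPolicies> localPoliciesCache,
                                                String path, int defaultEnsemble) {
        return policiesCache.getAsync(path)
                .thenCombine(localPoliciesCache.getAsync(path), (policies, localPolicies) ->
                        new Config(policies.map(Policies::ensembleSize).orElse(defaultEnsemble),
                                localPolicies.map(LocalPolicies::bookieAffinityGroup).orElse(null)));
    }
}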
@@ -1190,139 +1190,119 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
Optional<Policies> policies = Optional.empty();
Optional<LocalPolicies> localPolicies = Optional.empty();

PersistencePolicies tmpPersistencePolicies = null;
RetentionPolicies tmpRetentionPolicies = null;
OffloadPolicies tmpTopicLevelOffloadPolicies = null;
PersistencePolicies persistencePolicies = null;
RetentionPolicies retentionPolicies = null;
OffloadPolicies topicLevelOffloadPolicies = null;

if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
try {
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
tmpPersistencePolicies = topicPolicies.getPersistence();
tmpRetentionPolicies = topicPolicies.getRetentionPolicies();
tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
persistencePolicies = topicPolicies.getPersistence();
retentionPolicies = topicPolicies.getRetentionPolicies();
topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName);
}
}

final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies;
final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies;
final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies;


CompletableFuture<Optional<Policies>> policiesFuture = pulsar
.getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
namespace.toString()));
String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
pulsar().getLocalZkCacheService().policiesCache().getAsync(path);

policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> {
PersistencePolicies persistencePolicies = finalPersistencePolicies;
RetentionPolicies retentionPolicies = finalRetentionPolicies;
OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;

if (persistencePolicies == null) {
persistencePolicies = policies.map(p -> p.persistence).orElseGet(
() -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
serviceConfig.getManagedLedgerDefaultAckQuorum(),
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
}
try {
policies = pulsar
.getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
namespace.toString()));
String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
future.completeExceptionally(t);
return;
}

if (retentionPolicies == null) {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}
if (persistencePolicies == null) {
persistencePolicies = policies.map(p -> p.persistence).orElseGet(
() -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
serviceConfig.getManagedLedgerDefaultAckQuorum(),
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
}

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig
.setBookKeeperEnsemblePlacementPolicyClassName(
ZkIsolatedBookieEnsemblePlacementPolicy.class);
Map<String, Object> properties = Maps.newHashMap();
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());

managedLedgerConfig.setMaxUnackedRangesToPersist(
serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
TimeUnit.MINUTES);
managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
TimeUnit.MINUTES);
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());

managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
if (retentionPolicies == null) {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());

managedLedgerConfig.setLedgerRolloverTimeout(
serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());

OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
topicLevelOffloadPolicies,
OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader =
pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
} catch (PulsarServerException e) {
future.completeExceptionally(e);
return null;
}
} else {
//If the topic level policy is null, use the namespace level
managedLedgerConfig.setLedgerOffloader(
pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
.setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
Map<String, Object> properties = Maps.newHashMap();
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());

managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
TimeUnit.MINUTES);
managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
TimeUnit.MINUTES);
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());

managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
managedLedgerConfig
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());

managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());

OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
topicLevelOffloadPolicies,
OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
} catch (PulsarServerException e) {
future.completeExceptionally(e);
return;
}
} else {
//If the topic level policy is null, use the namespace level
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}

managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
managedLedgerConfig.setNewEntriesCheckDelayInMillis(
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());


future.complete(managedLedgerConfig);
return null;
}).exceptionally(ex -> {
log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex);
future.completeExceptionally(ex);
return null;
});
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());


future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));

return future;
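The restored body keeps the usual precedence when the managed ledger config is assembled: topic-level policies (when topic-level policies are enabled) win, then the namespace policies read from the cache, then the broker's serviceConfig defaults, with offload settings merged the same way through OffloadPolicies.mergeConfiguration. A condensed sketch of that fallback chain follows, with hypothetical types standing in for the real policy classes.

import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical illustration of the topic -> namespace -> broker-default precedence applied above;
// not Pulsar's actual API.
class PolicyPrecedence {

    static <T> T resolve(T topicLevel, Optional<T> namespaceLevel, Supplier<T> brokerDefault) {
        if (topicLevel != null) {
            return topicLevel;                          // topic-level policy wins when present
        }
        return namespaceLevel.orElseGet(brokerDefault); // otherwise namespace policy, else broker default
    }

    record Retention(int timeMinutes, int sizeMb) {}

    public static void main(String[] args) {
        Retention topic = null;                                           // no topic-level retention set
        Optional<Retention> namespace = Optional.of(new Retention(60, 512));
        Retention effective = resolve(topic, namespace, () -> new Retention(0, -1));
        System.out.println(effective);                                    // Retention[timeMinutes=60, sizeMb=512]
    }
}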
