diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 936528ad8b7e2..41052de646efb 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -425,7 +425,7 @@ The Apache Software License, Version 2.0
- org.apache.httpcomponents-httpclient-4.5.5.jar
- org.apache.httpcomponents-httpcore-4.4.9.jar
* AirCompressor
- - io.airlift-aircompressor-0.20.jar
+ - io.airlift-aircompressor-0.16.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.12.1.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
diff --git a/pom.xml b/pom.xml
index 40d045ec0dc10..718ef2d81df4f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,7 @@ flexible messaging model and an intuitive client API.
0.14.0
5.3.2
1.9.13
- 0.20
+ 0.16
2.12.1
1.78
3.6
@@ -598,7 +598,7 @@ flexible messaging model and an intuitive client API.
jna
${jna.version}
-
+
com.github.docker-java
docker-java-core
@@ -787,14 +787,6 @@ flexible messaging model and an intuitive client API.
${typetools.version}
-
- io.grpc
- grpc-bom
- ${grpc.version}
- pom
- import
-
-
io.grpc
grpc-all
@@ -806,7 +798,25 @@ flexible messaging model and an intuitive client API.
-
+
+
+ io.grpc
+ grpc-core
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-protobuf-lite
+ ${grpc.version}
+
+
com.google.protobuf
protobuf-bom
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 40fa540b9c02f..29d1001e4bdd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1190,139 +1190,119 @@ public CompletableFuture getManagedLedgerConfig(TopicName t
Optional policies = Optional.empty();
Optional localPolicies = Optional.empty();
- PersistencePolicies tmpPersistencePolicies = null;
- RetentionPolicies tmpRetentionPolicies = null;
- OffloadPolicies tmpTopicLevelOffloadPolicies = null;
+ PersistencePolicies persistencePolicies = null;
+ RetentionPolicies retentionPolicies = null;
+ OffloadPolicies topicLevelOffloadPolicies = null;
if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
try {
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
- tmpPersistencePolicies = topicPolicies.getPersistence();
- tmpRetentionPolicies = topicPolicies.getRetentionPolicies();
- tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+ persistencePolicies = topicPolicies.getPersistence();
+ retentionPolicies = topicPolicies.getRetentionPolicies();
+ topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName);
}
}
- final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies;
- final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies;
- final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies;
-
-
- CompletableFuture> policiesFuture = pulsar
- .getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
- namespace.toString()));
- String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
- CompletableFuture> localPoliciesFuture =
- pulsar().getLocalZkCacheService().policiesCache().getAsync(path);
-
- policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> {
- PersistencePolicies persistencePolicies = finalPersistencePolicies;
- RetentionPolicies retentionPolicies = finalRetentionPolicies;
- OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p -> p.persistence).orElseGet(
- () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
- serviceConfig.getManagedLedgerDefaultWriteQuorum(),
- serviceConfig.getManagedLedgerDefaultAckQuorum(),
- serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
+ try {
+ policies = pulsar
+ .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
+ namespace.toString()));
+ String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+ localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
+ } catch (Throwable t) {
+ // Ignoring since if we don't have policies, we fallback on the default
+ log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
+ future.completeExceptionally(t);
+ return;
+ }
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
- () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
- serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p -> p.persistence).orElseGet(
+ () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+ serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+ serviceConfig.getManagedLedgerDefaultAckQuorum(),
+ serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
- managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
- managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
- ZkIsolatedBookieEnsemblePlacementPolicy.class);
- Map properties = Maps.newHashMap();
- properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
- properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
- managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
- managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
- managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
- managedLedgerConfig.setMaxUnackedRangesToPersist(
- serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
- managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
- serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
- managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
- managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
- serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
- managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
- managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
- managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
- serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
- managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
- managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
+ () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+ serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
+
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+ managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+ managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig
- .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
- managedLedgerConfig.setLedgerRolloverTimeout(
- serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
- managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
- managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
- managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
- managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-
- OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
- OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
- topicLevelOffloadPolicies,
- OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
- pulsar().createManagedLedgerOffloader(offloadPolicies);
- managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- future.completeExceptionally(e);
- return null;
- }
- } else {
- //If the topic level policy is null, use the namespace level
- managedLedgerConfig.setLedgerOffloader(
- pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
+ Map properties = Maps.newHashMap();
+ properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
+ properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+ managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+ managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+ managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+ managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+ managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+ managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+ serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+ managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+ managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+ serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+ managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+ managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ managedLedgerConfig
+ .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+ managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+ managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+ managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+ managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+ managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+
+ OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+ OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
+ topicLevelOffloadPolicies,
+ OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
+ managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ future.completeExceptionally(e);
+ return;
}
+ } else {
+ //If the topic level policy is null, use the namespace level
+ managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
- serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(
- serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-
-
- future.complete(managedLedgerConfig);
- return null;
- }).exceptionally(ex -> {
- log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex);
- future.completeExceptionally(ex);
- return null;
- });
+ managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));
return future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c2fd32a44fdd4..a5613d439a5b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,7 +157,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@SuppressWarnings("unused")
private volatile long usageCount = 0;
-
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
@@ -1156,72 +1155,68 @@ public CompletableFuture checkReplication() {
log.debug("[{}] Checking replication status", name);
}
- return brokerService.pulsar().getConfigurationCache().policiesCache()
- .getAsync(AdminResource.path(POLICIES, name.getNamespace()))
- .thenCompose(optPolicies -> {
- if (!optPolicies.isPresent()) {
- return FutureUtil.failedFuture(
- new ServerMetadataException("Namespace not found: " + name.getNamespace()));
- }
-
- Policies policies = optPolicies.get();
+ Policies policies = null;
+ try {
+ policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, name.getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
+ } catch (Exception e) {
+ CompletableFuture future = new CompletableFuture<>();
+ future.completeExceptionally(new ServerMetadataException(e));
+ return future;
+ }
+ //Ignore current broker's config for messageTTL for replication.
+ final int newMessageTTLinSeconds;
+ try {
+ newMessageTTLinSeconds = getMessageTTL();
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(new ServerMetadataException(e));
+ }
- //Ignore current broker's config for messageTTL for replication.
- final int newMessageTTLinSeconds;
- try {
- newMessageTTLinSeconds = getMessageTTL();
- } catch (Exception e) {
- return FutureUtil.failedFuture(new ServerMetadataException(e));
- }
+ Set configuredClusters;
+ if (policies.replication_clusters != null) {
+ configuredClusters = Sets.newTreeSet(policies.replication_clusters);
+ } else {
+ configuredClusters = Collections.emptySet();
+ }
- Set configuredClusters;
- if (policies.replication_clusters != null) {
- configuredClusters = Sets.newTreeSet(policies.replication_clusters);
- } else {
- configuredClusters = Collections.emptySet();
- }
+ String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
- String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-
- // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
- // because pulsar
- // doesn't serve global topic without local repl-cluster configured.
- if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
- log.info(
- "Deleting topic [{}] because local cluster is not part of global namespace repl list "
- + "{}",
- topic, configuredClusters);
- return deleteForcefully();
- }
+ // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar
+ // doesn't serve global topic without local repl-cluster configured.
+ if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
+ log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
+ topic, configuredClusters);
+ return deleteForcefully();
+ }
- List> futures = Lists.newArrayList();
+ List> futures = Lists.newArrayList();
- // Check for missing replicators
- for (String cluster : configuredClusters) {
- if (cluster.equals(localCluster)) {
- continue;
- }
+ // Check for missing replicators
+ for (String cluster : configuredClusters) {
+ if (cluster.equals(localCluster)) {
+ continue;
+ }
- if (!replicators.containsKey(cluster)) {
- futures.add(startReplicator(cluster));
- }
- }
+ if (!replicators.containsKey(cluster)) {
+ futures.add(startReplicator(cluster));
+ }
+ }
- // Check for replicators to be stopped
- replicators.forEach((cluster, replicator) -> {
- // Update message TTL
- ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+ // Check for replicators to be stopped
+ replicators.forEach((cluster, replicator) -> {
+ // Update message TTL
+ ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
- if (!cluster.equals(localCluster)) {
- if (!configuredClusters.contains(cluster)) {
- futures.add(removeReplicator(cluster));
- }
- }
+ if (!cluster.equals(localCluster)) {
+ if (!configuredClusters.contains(cluster)) {
+ futures.add(removeReplicator(cluster));
+ }
+ }
- });
+ });
- return FutureUtil.waitForAll(futures);
- });
+ return FutureUtil.waitForAll(futures);
}
@Override
@@ -2480,10 +2475,8 @@ private int getMessageTTL() throws Exception {
TopicName name = TopicName.get(topic);
TopicPolicies topicPolicies = getTopicPolicies(name);
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace()));
- if (policies == null) {
- throw new KeeperException.NoNodeException();
- }
+ .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
return topicPolicies.getMessageTTLInSeconds();
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 6ae0e52f961d9..3c73284e7ed4a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -529,10 +529,8 @@ protected void handleLookup(CommandLookupTopic lookup) {
ClientConfigurationData createClientConfiguration()
throws PulsarClientException.UnsupportedAuthenticationException {
ClientConfigurationData initialConf = new ClientConfigurationData();
+ initialConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
- initialConf.setServiceUrl(
- proxyConfig.isTlsEnabledWithBroker() ? service.getServiceUrlTls() : service.getServiceUrl());
-
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
index 8c07e4b42d797..5f533e37d3594 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -18,12 +18,8 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.annotations.Test;
public class ProxyConnectionTest {
@@ -39,24 +35,4 @@ public void testMatchesHostAndPort() {
assertFalse(ProxyConnection
.matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234"));
}
- @Test
- public void testCreateClientConfiguration() {
- ProxyConfiguration proxyConfiguration = new ProxyConfiguration();
- proxyConfiguration.setTlsEnabledWithBroker(true);
- String proxyUrlTls = "pulsar+ssl://proxy:6651";
- String proxyUrl = "pulsar://proxy:6650";
-
- ProxyService proxyService = mock(ProxyService.class);
- doReturn(proxyConfiguration).when(proxyService).getConfiguration();
- doReturn(proxyUrlTls).when(proxyService).getServiceUrlTls();
- doReturn(proxyUrl).when(proxyService).getServiceUrl();
-
- ProxyConnection proxyConnection = new ProxyConnection(proxyService, null);
- ClientConfigurationData clientConfiguration = proxyConnection.createClientConfiguration();
- assertEquals(clientConfiguration.getServiceUrl(), proxyUrlTls);
-
- proxyConfiguration.setTlsEnabledWithBroker(false);
- clientConfiguration = proxyConnection.createClientConfiguration();
- assertEquals(clientConfiguration.getServiceUrl(), proxyUrl);
- }
}
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 906cad724359e..a52dda264ba59 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -273,7 +273,7 @@ The Apache Software License, Version 2.0
* CGLIB Nodep
- cglib-nodep-3.3.0.jar
* Airlift
- - aircompressor-0.20.jar
+ - aircompressor-0.16.jar
- airline-0.8.jar
- bootstrap-0.199.jar
- bootstrap-0.195.jar