
[improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder #22541

Open · wants to merge 4 commits into master
Conversation

@lhotari (Member) commented Apr 19, 2024

Fixes #22041

Motivation

See #22041. Currently, when using the asynchronous interfaces of the Pulsar Admin client, there's no backpressure in the client itself, and the client will keep opening new connections to the broker to fulfill in-progress requests.
Eventually, the broker will hit the maxHttpServerConnections limit, which is 2048.

It's better to limit the number of connections from a single client. This PR sets the limit to 16 connections per host.
The limit isn't called connectionsPerBroker since admin operations usually target a cluster address.

Modification

  • add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder (see the usage sketch after this list)
  • also change the default connectionMaxIdleSeconds from 60 seconds to 25 seconds
    • some firewall/NAT idle timeouts are as low as 30 seconds, and since the idle check isn't exact, 25 seconds is a safer default that keeps connections from being dropped by firewall/NAT timeouts
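
For illustration, a minimal usage sketch of the new options, assuming the PulsarAdminBuilder method names mirror the setting names in the PR title; the service URL and values are illustrative only:

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;

public class AdminClientExample {
    public static PulsarAdmin buildAdmin() throws PulsarClientException {
        // Sketch only: values are illustrative, not prescriptive.
        return PulsarAdmin.builder()
                .serviceHttpUrl("http://broker.example.com:8080")
                .maxConnectionsPerHost(16)       // limit concurrent connections to a single host
                .connectionMaxIdleSeconds(25)    // close pooled connections idle longer than this
                .build();
    }
}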

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 3.3.0 milestone Apr 19, 2024
@lhotari lhotari self-assigned this Apr 19, 2024
@lhotari lhotari marked this pull request as draft April 19, 2024 13:17
@github-actions bot added the doc label (Your PR contains doc changes, no matter whether the changes are in markdown or code files) Apr 19, 2024
@merlimat (Contributor) left a comment:

LGTM, just a couple of minor suggestions

@@ -47,6 +47,7 @@ public PulsarAdmin build() throws PulsarClientException {

     public PulsarAdminBuilderImpl() {
         this.conf = new ClientConfigurationData();
+        this.conf.setConnectionsPerBroker(16);
A Contributor commented:

Couldn't the default be part of ClientConfigurationData constructor?

@lhotari (Member, Author) replied:

> Couldn't the default be part of ClientConfigurationData constructor?

Not really. ClientConfigurationData is designed for PulsarClient, but it's also used in the PulsarAdmin client. The current PulsarAdminBuilderImpl is a bit of a hack around ClientConfigurationData.

@lhotari lhotari changed the title [improve][client] Add connectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder [improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder Apr 22, 2024
@merlimat merlimat marked this pull request as ready for review April 22, 2024 15:09
@lhotari (Member, Author) commented Apr 22, 2024

The setMaxConnectionsPerHost setting in the async HTTP client doesn't seem to behave as expected. Will check the errors:

  Caused by: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted. Failed reason: Too many connections: 16
  	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
  	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674)
  	at java.base/java.util.concurrent.CompletableFuture.orApplyStage(CompletableFuture.java:1601)
  	at java.base/java.util.concurrent.CompletableFuture.applyToEither(CompletableFuture.java:2261)
  	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.retryOrTimeOut(AsyncHttpConnector.java:275)
  	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:234)

UPDATE: it's necessary to set the acquireFreeChannelTimeout setting in AHC (AsyncHttpClient). Will find a way to set a proper default and make it configurable.
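
For context, a minimal sketch of the AsyncHttpClient configuration knobs involved here; the values are illustrative only and not the defaults this PR settles on, and the setter names are assumed to follow AHC's config builder naming:

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;

public class AhcConfigExample {
    public static AsyncHttpClient buildClient() {
        DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
                .setMaxConnectionsPerHost(16)            // cap concurrent connections to a single host
                .setPooledConnectionIdleTimeout(25_000)  // ms; close pooled connections idle longer than this
                .setAcquireFreeChannelTimeout(60_000)    // ms; wait for a free pooled channel instead of failing fast
                .build();
        return Dsl.asyncHttpClient(config);
    }
}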

@lhotari (Member, Author) commented Apr 23, 2024

There's a problem with backpressure handling for async requests in the Pulsar code base. Since this PR limits the Pulsar Admin client to 16 connections per host, it now surfaces these problems.

Namespace unloading is a good example:

final List<CompletableFuture<Void>> futures = new ArrayList<>();
List<String> boundaries = policies.bundles.getBoundaries();
for (int i = 0; i < boundaries.size() - 1; i++) {
    String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
    try {
        futures.add(pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(
                namespaceName.toString(), bundle));
    } catch (PulsarServerException e) {
        log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName, e);
        throw new RestException(e);
    }
}
return FutureUtil.waitForAll(futures);

All bundles in the namespace are unloaded at once without limiting concurrency.
There was a dev mailing list discussion about backpressure and the Pulsar Admin API implementation in https://lists.apache.org/thread/03w6x9zsgx11mqcp5m4k4n27cyqmp271, but it didn't end up resolving the problem.

a snippet from my email in that thread:

Touching upon the Pulsar Admin REST API: the context for backpressure is a lot different there. Before the PIP-149 async changes, there was explicit backpressure in the REST API implementation. The system could handle a limited amount of work, and it would process downstream work items one-by-one.

With "PIP-149 Making the REST Admin API fully async" (#14365), there are different challenges related to backpressure. It is usually about how to limit the in-progress work in the system. An async system will accept a lot of work compared to the previous solution, and this accepted work will eventually get processed in the async REST API backend even when the clients have already closed the connection and sent a new retry. One possible solution to this issue is to limit incoming requests at the HTTP server level with the features that Jetty provides for limiting concurrency. PRs #14353 and #15637 added this support to Pulsar, though the values might have to be tuned to much lower values to prevent issues in practice.

This is not a complete solution for the REST API backend. It would also be useful to have a solution that cancels downstream requests belonging to incoming HTTP requests that no longer exist because the client stopped waiting for the response. The main downstream requests are towards the metadata store. It might also be necessary to limit the number of outstanding downstream requests, although with batching in the metadata store that might not be an issue.

The solution for the namespace unloading issue is to have a way to limit the outstanding CompletableFutures that are in progress and use that as a way to "backpressure" the sending of new requests. The current approach of sending out all requests and then waiting for the results is problematic since it doesn't use any feedback from the system to adjust the rate. In other words, there's currently no proper backpressure solution for async Pulsar Admin calls within the Pulsar broker.

I'll experiment with some ways to add backpressure to cases where a large number of async calls are triggered and then the results are waited on.
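
As a minimal sketch (not what this PR implements), a Semaphore can bound the number of in-flight futures; submitBounded and maxInFlight are illustrative names, and the task suppliers stand in for any async admin calls:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class BoundedSubmitter {
    // Submits tasks while keeping at most 'maxInFlight' futures outstanding.
    // Blocks the submitting thread when the limit is reached, which provides the feedback loop.
    public static <T> List<CompletableFuture<T>> submitBounded(
            List<Supplier<CompletableFuture<T>>> tasks, int maxInFlight) throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<T>> results = new ArrayList<>(tasks.size());
        for (Supplier<CompletableFuture<T>> task : tasks) {
            permits.acquire();                                  // wait for a free slot
            CompletableFuture<T> future = task.get();           // start the async call
            future.whenComplete((result, error) -> permits.release());
            results.add(future);
        }
        return results;
    }
}

Blocking the submitting thread isn't ideal inside the broker's async code paths, which is one reason a fully non-blocking approach (such as the ConcurrencyReducer mentioned later in this thread) may be preferable.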

@lhotari (Member, Author) commented Apr 23, 2024

Another location without proper backpressure is namespace deletion:

return markDeleteFuture.thenCompose(__ ->
                internalDeleteTopicsAsync(allUserCreatedTopics))
        .thenCompose(ignore ->
                internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
        .thenCompose(ignore ->
                internalDeleteTopicsAsync(allSystemTopics))
        .thenCompose(ignore ->
                internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
        .thenCompose(ignore ->
                internalDeleteTopicsAsync(topicPolicy))
        .thenCompose(ignore ->
                internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));

@lhotari (Member, Author) commented Apr 23, 2024

An example of creating partitions:

protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
    if (!topicName.isPersistent()) {
        return CompletableFuture.completedFuture(null);
    }
    List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
    for (int i = 0; i < numPartitions; i++) {
        futures.add(tryCreatePartitionAsync(i));
    }
    return FutureUtil.waitForAll(futures);
}

This would need backpressure too. Say you create a topic with 100 partitions: the broker might open 100 HTTP connections to create the topic partitions concurrently. This is problematic when the brokers are under heavy load.
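
One hypothetical way to bound this (a sketch only, not what this PR implements) is to create the partitions in fixed-size batches, waiting for each batch before submitting the next; createPartitionsBatched and batchSize are illustrative names, while tryCreatePartitionAsync and FutureUtil.waitForAll are the existing calls from the snippet above:

// Sketch: caps concurrent partition-creation requests at 'batchSize'.
protected CompletableFuture<Void> createPartitionsBatched(int numPartitions, int batchSize) {
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
    for (int start = 0; start < numPartitions; start += batchSize) {
        final int from = start;
        final int to = Math.min(start + batchSize, numPartitions);
        // Each batch only starts after the previous batch has completed.
        chain = chain.thenCompose(ignore -> {
            List<CompletableFuture<Void>> batch = new ArrayList<>();
            for (int i = from; i < to; i++) {
                batch.add(tryCreatePartitionAsync(i));
            }
            return FutureUtil.waitForAll(batch);
        });
    }
    return chain;
}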

@lhotari (Member, Author) commented Apr 23, 2024

Noticed that there's an existing solution for running calls one-by-one, using the following Sequencer:

@ThreadSafe
public static class Sequencer<T> {
    private CompletableFuture<T> sequencerFuture = CompletableFuture.completedFuture(null);
    private final boolean allowExceptionBreakChain;

    public Sequencer(boolean allowExceptionBreakChain) {
        this.allowExceptionBreakChain = allowExceptionBreakChain;
    }

    public static <T> Sequencer<T> create(boolean allowExceptionBreakChain) {
        return new Sequencer<>(allowExceptionBreakChain);
    }

    public static <T> Sequencer<T> create() {
        return new Sequencer<>(false);
    }

    /**
     * @throws NullPointerException NPE when param is null
     */
    public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
        Objects.requireNonNull(newTask);
        if (sequencerFuture.isDone()) {
            if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) {
                return sequencerFuture;
            }
            return sequencerFuture = newTask.get();
        }
        return sequencerFuture = allowExceptionBreakChain
                ? sequencerFuture.thenCompose(__ -> newTask.get())
                : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get());
    }
}
However, I think that ConcurrencyReducer would be a better solution for most use cases.
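
For illustration, a sketch of the ConcurrencyReducer idea, assuming an API along the lines of Spotify's completable-futures ConcurrencyReducer (a create(maxConcurrency, maxQueueSize) factory and an add() method); admin, namespace, and bundle are placeholders, and the details may differ from whatever Pulsar ends up using:

import com.spotify.futures.ConcurrencyReducer;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdmin;

public class ConcurrencyReducerExample {
    // At most 16 admin calls run concurrently; up to 1000 more can wait in the queue.
    // The values are illustrative only.
    private final ConcurrencyReducer<Void> reducer = ConcurrencyReducer.create(16, 1000);

    public CompletableFuture<Void> unloadBundleBounded(PulsarAdmin admin, String namespace, String bundle) {
        // The call is only started once a concurrency slot is available.
        return reducer.add(() -> admin.namespaces().unloadNamespaceBundleAsync(namespace, bundle));
    }
}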

@lhotari (Member, Author) commented Apr 23, 2024

Another challenge is cancelling work that is queued in the system but no longer awaited by any client.
Newer Jersey clients have support for this; I noticed commit eclipse-ee4j/jersey@9602806 in Jersey.
When the system is overloaded, request processing might be so slow that clients time out and retry their requests.
This adds more work to the system unless there's a solution that cancels the timed-out tasks, which is why addressing this is also an important part of the solution.

Labels: doc (Your PR contains doc changes, no matter whether the changes are in markdown or code files), ready-to-test
Projects: None yet
Development: Successfully merging this pull request may close these issues:
  • Set limits for number of opened HTTP connections for Pulsar Admin client
3 participants