Skip to content

Commit

Permalink
improving how httpclients can be configured (#4490)
Browse files Browse the repository at this point in the history
* fix #4472: refined withRequestConfig so that a new client is not created

* fix #4471: kubernetesclientbuilder method for the httpclient.builder

* fix #4471: still need to apply config even with default factory
  • Loading branch information
shawkins committed Oct 18, 2022
1 parent 0394a0e commit 366ede8
Show file tree
Hide file tree
Showing 33 changed files with 582 additions and 463 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -16,6 +16,7 @@
* Fix #4426: [java-generator] Encode an `AnyType` instead of an Object if `x-kubernetes-preserve-unknown-fields` is present and the type is null.

#### Improvements
* Fix #4471: Adding KubernetesClientBuilder.withHttpClientBuilderConsumer to further customize the HttpClient for any implementation.
* Fix #4348: Introduce specific annotations for the generators
* Refactor #4441: refactoring `TokenRefreshInterceptor`
* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.
Expand Down
Expand Up @@ -18,25 +18,17 @@

import io.fabric8.kubernetes.client.http.BasicBuilder;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpClient.Builder;
import io.fabric8.kubernetes.client.http.HttpHeaders;
import io.fabric8.kubernetes.client.http.Interceptor;
import io.fabric8.kubernetes.client.http.StandardHttpClientBuilder;
import io.fabric8.kubernetes.client.http.TlsVersion;
import io.fabric8.kubernetes.client.internal.SSLUtils;

import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;

/**
* TODO: if there is another implementation that does not support client builder copying, then this needs to be abstracted -
Expand All @@ -48,28 +40,17 @@
*
*/

class JdkHttpClientBuilderImpl implements Builder {
class JdkHttpClientBuilderImpl
extends StandardHttpClientBuilder<JdkHttpClientImpl, JdkHttpClientFactory, JdkHttpClientBuilderImpl> {

LinkedHashMap<String, Interceptor> interceptors = new LinkedHashMap<>();
Duration connectTimeout;
Duration readTimeout;
private SSLContext sslContext;
JdkHttpClientFactory clientFactory;
private String proxyAuthorization;
private InetSocketAddress proxyAddress;
private boolean followRedirects;
private boolean preferHttp11;
private TlsVersion[] tlsVersions;
private java.net.http.HttpClient httpClient;

JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
this.clientFactory = factory;
public JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
super(factory);
}

@Override
public HttpClient build() {
if (httpClient != null) {
return new JdkHttpClientImpl(this, httpClient);
if (client != null) {
return new JdkHttpClientImpl(this, client.getHttpClient(), this.requestConfig);
}
java.net.http.HttpClient.Builder builder = clientFactory.createNewHttpClientBuilder();
if (connectTimeout != null) {
Expand Down Expand Up @@ -104,101 +85,12 @@ public void before(BasicBuilder builder, HttpHeaders headers) {
Arrays.asList(tlsVersions).stream().map(TlsVersion::javaName).toArray(String[]::new)));
}
clientFactory.additionalConfig(builder);
return new JdkHttpClientImpl(this, builder.build());
}

@Override
public JdkHttpClientBuilderImpl readTimeout(long readTimeout, TimeUnit unit) {
if (readTimeout == 0) {
this.readTimeout = null;
} else {
this.readTimeout = Duration.ofNanos(unit.toNanos(readTimeout));
}
return this;
}

@Override
public Builder connectTimeout(long connectTimeout, TimeUnit unit) {
this.connectTimeout = Duration.ofNanos(unit.toNanos(connectTimeout));
return this;
}

@Override
public Builder forStreaming() {
// nothing to do
return this;
}

@Override
public Builder writeTimeout(long timeout, TimeUnit timeoutUnit) {
// nothing to do
return this;
}

@Override
public Builder addOrReplaceInterceptor(String name, Interceptor interceptor) {
if (interceptor == null) {
interceptors.remove(name);
} else {
interceptors.put(name, interceptor);
}
return this;
return new JdkHttpClientImpl(this, builder.build(), null);
}

@Override
public Builder authenticatorNone() {
return this;
}

@Override
public Builder sslContext(KeyManager[] keyManagers, TrustManager[] trustManagers) {
this.sslContext = SSLUtils.sslContext(keyManagers, trustManagers);
return this;
}

@Override
public Builder followAllRedirects() {
this.followRedirects = true;
return this;
}

@Override
public Builder proxyAddress(InetSocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
return this;
}

@Override
public Builder proxyAuthorization(String credentials) {
this.proxyAuthorization = credentials;
return this;
}

@Override
public Builder preferHttp11() {
this.preferHttp11 = true;
return this;
}

@Override
public Builder tlsVersions(TlsVersion... tlsVersions) {
this.tlsVersions = tlsVersions;
return this;
}

public JdkHttpClientBuilderImpl copy(java.net.http.HttpClient httpClient) {
JdkHttpClientBuilderImpl copy = new JdkHttpClientBuilderImpl(this.clientFactory);
copy.connectTimeout = this.connectTimeout;
copy.readTimeout = this.readTimeout;
copy.sslContext = this.sslContext;
copy.interceptors = new LinkedHashMap<>(this.interceptors);
copy.proxyAddress = this.proxyAddress;
copy.proxyAuthorization = this.proxyAuthorization;
copy.tlsVersions = this.tlsVersions;
copy.preferHttp11 = this.preferHttp11;
copy.followRedirects = this.followRedirects;
copy.httpClient = httpClient;
return copy;
protected JdkHttpClientBuilderImpl newInstance(JdkHttpClientFactory clientFactory) {
return new JdkHttpClientBuilderImpl(clientFactory);
}

}
Expand Up @@ -18,7 +18,6 @@

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.utils.HttpClientUtils;
import io.fabric8.kubernetes.client.utils.Utils;

import java.util.concurrent.Executor;
Expand Down Expand Up @@ -46,15 +45,6 @@ public void shutdownNow() {

}

@Override
public HttpClient createHttpClient(Config config) {
JdkHttpClientBuilderImpl builderWrapper = newBuilder();

HttpClientUtils.applyCommonConfiguration(config, builderWrapper, this);

return builderWrapper.build();
}

@Override
public JdkHttpClientBuilderImpl newBuilder() {
return new JdkHttpClientBuilderImpl(this);
Expand Down
Expand Up @@ -16,6 +16,7 @@

package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -214,25 +215,27 @@ public HandlerAndAsyncBody(BodyHandler<T> handler, AsyncBody asyncBody) {

private JdkHttpClientBuilderImpl builder;
private java.net.http.HttpClient httpClient;
private Config config;

public JdkHttpClientImpl(JdkHttpClientBuilderImpl builderImpl, java.net.http.HttpClient httpClient) {
public JdkHttpClientImpl(JdkHttpClientBuilderImpl builderImpl, java.net.http.HttpClient httpClient, Config config) {
this.builder = builderImpl;
this.httpClient = httpClient;
this.config = config;
}

@Override
public void close() {
if (this.httpClient == null) {
return;
}
builder.clientFactory.closeHttpClient(this);
builder.getClientFactory().closeHttpClient(this);
// help with default cleanup, which is based upon garbarge collection
this.httpClient = null;
}

@Override
public DerivedClientBuilder newBuilder() {
return this.builder.copy(getHttpClient());
return this.builder.copy(this);
}

@Override
Expand Down Expand Up @@ -286,8 +289,8 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
JdkHttpRequestImpl.BuilderImpl builderImpl = jdkRequest.newBuilder();
for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(builderImpl, jdkRequest);
for (Interceptor interceptor : builder.getInterceptors().values()) {
Interceptor.useConfig(interceptor, config).before(builderImpl, jdkRequest);
jdkRequest = builderImpl.build();
}

Expand All @@ -296,19 +299,20 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
CompletableFuture<AsyncResponse<T>> cf = this.getHttpClient().sendAsync(builderImpl.build().request,
handlerAndAsyncBody.handler).thenApply(r -> new AsyncResponse<>(r, handlerAndAsyncBody.asyncBody));

for (Interceptor interceptor : builder.interceptors.values()) {
for (Interceptor interceptor : builder.getInterceptors().values()) {
cf = cf.thenCompose(ar -> {
java.net.http.HttpResponse<T> response = ar.response;
if (response != null && !HttpResponse.isSuccessful(response.statusCode())) {
return interceptor.afterFailure(builderImpl, new JdkHttpResponseImpl<>(response)).thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
return Interceptor.useConfig(interceptor, config).afterFailure(builderImpl, new JdkHttpResponseImpl<>(response))
.thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
}
return CompletableFuture.completedFuture(ar);
});
Expand All @@ -324,7 +328,7 @@ public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder()

@Override
public io.fabric8.kubernetes.client.http.HttpRequest.Builder newHttpRequestBuilder() {
return new JdkHttpRequestImpl.BuilderImpl().timeout(this.builder.readTimeout);
return new JdkHttpRequestImpl.BuilderImpl().timeout(this.builder.getReadTimeout());
}

/*
Expand All @@ -344,23 +348,24 @@ public WebSocketResponse(WebSocket w, java.net.http.WebSocketHandshakeException
public CompletableFuture<WebSocket> buildAsync(JdkWebSocketImpl.BuilderImpl webSocketBuilder, Listener listener) {
JdkWebSocketImpl.BuilderImpl copy = webSocketBuilder.copy();

for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
for (Interceptor interceptor : builder.getInterceptors().values()) {
Interceptor.useConfig(interceptor, config).before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
}

CompletableFuture<WebSocket> result = new CompletableFuture<>();

CompletableFuture<WebSocketResponse> cf = internalBuildAsync(copy, listener);

for (Interceptor interceptor : builder.interceptors.values()) {
for (Interceptor interceptor : builder.getInterceptors().values()) {
cf = cf.thenCompose(response -> {
if (response.wshse != null && response.wshse.getResponse() != null) {
return interceptor.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
return Interceptor.useConfig(interceptor, config)
.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
}
return CompletableFuture.completedFuture(response);
});
Expand Down Expand Up @@ -399,8 +404,8 @@ public CompletableFuture<WebSocketResponse> internalBuildAsync(JdkWebSocketImpl.
}
// the Watch logic sets a websocketTimeout as the readTimeout
// TODO: this should probably be made clearer in the docs
if (this.builder.readTimeout != null) {
newBuilder.connectTimeout(this.builder.readTimeout);
if (this.builder.getReadTimeout() != null) {
newBuilder.connectTimeout(this.builder.getReadTimeout());
}

AtomicLong queueSize = new AtomicLong();
Expand Down Expand Up @@ -436,9 +441,4 @@ java.net.http.HttpClient getHttpClient() {
return httpClient;
}

@Override
public Factory getFactory() {
return builder.clientFactory;
}

}

0 comments on commit 366ede8

Please sign in to comment.