Skip to content

Commit

Permalink
fix fabric8io#4472: refined withRequestConfig so that a new client is…
Browse files Browse the repository at this point in the history
… not created
  • Loading branch information
shawkins committed Oct 12, 2022
1 parent 177de1c commit 25d7509
Show file tree
Hide file tree
Showing 24 changed files with 451 additions and 424 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,17 @@

import io.fabric8.kubernetes.client.http.BasicBuilder;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpClient.Builder;
import io.fabric8.kubernetes.client.http.HttpHeaders;
import io.fabric8.kubernetes.client.http.Interceptor;
import io.fabric8.kubernetes.client.http.StandardHttpClientBuilder;
import io.fabric8.kubernetes.client.http.TlsVersion;
import io.fabric8.kubernetes.client.internal.SSLUtils;

import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;

/**
* TODO: if there is another implementation that does not support client builder copying, then this needs to be abstracted -
Expand All @@ -48,28 +40,17 @@
*
*/

class JdkHttpClientBuilderImpl implements Builder {
class JdkHttpClientBuilderImpl
extends StandardHttpClientBuilder<JdkHttpClientImpl, JdkHttpClientFactory, JdkHttpClientBuilderImpl> {

LinkedHashMap<String, Interceptor> interceptors = new LinkedHashMap<>();
Duration connectTimeout;
Duration readTimeout;
private SSLContext sslContext;
JdkHttpClientFactory clientFactory;
private String proxyAuthorization;
private InetSocketAddress proxyAddress;
private boolean followRedirects;
private boolean preferHttp11;
private TlsVersion[] tlsVersions;
private java.net.http.HttpClient httpClient;

JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
this.clientFactory = factory;
public JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
super(factory);
}

@Override
public HttpClient build() {
if (httpClient != null) {
return new JdkHttpClientImpl(this, httpClient);
if (client != null) {
return new JdkHttpClientImpl(this, client.getHttpClient(), this.requestConfig);
}
java.net.http.HttpClient.Builder builder = clientFactory.createNewHttpClientBuilder();
if (connectTimeout != null) {
Expand Down Expand Up @@ -104,101 +85,12 @@ public void before(BasicBuilder builder, HttpHeaders headers) {
Arrays.asList(tlsVersions).stream().map(TlsVersion::javaName).toArray(String[]::new)));
}
clientFactory.additionalConfig(builder);
return new JdkHttpClientImpl(this, builder.build());
}

@Override
public JdkHttpClientBuilderImpl readTimeout(long readTimeout, TimeUnit unit) {
if (readTimeout == 0) {
this.readTimeout = null;
} else {
this.readTimeout = Duration.ofNanos(unit.toNanos(readTimeout));
}
return this;
}

@Override
public Builder connectTimeout(long connectTimeout, TimeUnit unit) {
this.connectTimeout = Duration.ofNanos(unit.toNanos(connectTimeout));
return this;
}

@Override
public Builder forStreaming() {
// nothing to do
return this;
}

@Override
public Builder writeTimeout(long timeout, TimeUnit timeoutUnit) {
// nothing to do
return this;
}

@Override
public Builder addOrReplaceInterceptor(String name, Interceptor interceptor) {
if (interceptor == null) {
interceptors.remove(name);
} else {
interceptors.put(name, interceptor);
}
return this;
return new JdkHttpClientImpl(this, builder.build(), null);
}

@Override
public Builder authenticatorNone() {
return this;
}

@Override
public Builder sslContext(KeyManager[] keyManagers, TrustManager[] trustManagers) {
this.sslContext = SSLUtils.sslContext(keyManagers, trustManagers);
return this;
}

@Override
public Builder followAllRedirects() {
this.followRedirects = true;
return this;
}

@Override
public Builder proxyAddress(InetSocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
return this;
}

@Override
public Builder proxyAuthorization(String credentials) {
this.proxyAuthorization = credentials;
return this;
}

@Override
public Builder preferHttp11() {
this.preferHttp11 = true;
return this;
}

@Override
public Builder tlsVersions(TlsVersion... tlsVersions) {
this.tlsVersions = tlsVersions;
return this;
}

public JdkHttpClientBuilderImpl copy(java.net.http.HttpClient httpClient) {
JdkHttpClientBuilderImpl copy = new JdkHttpClientBuilderImpl(this.clientFactory);
copy.connectTimeout = this.connectTimeout;
copy.readTimeout = this.readTimeout;
copy.sslContext = this.sslContext;
copy.interceptors = new LinkedHashMap<>(this.interceptors);
copy.proxyAddress = this.proxyAddress;
copy.proxyAuthorization = this.proxyAuthorization;
copy.tlsVersions = this.tlsVersions;
copy.preferHttp11 = this.preferHttp11;
copy.followRedirects = this.followRedirects;
copy.httpClient = httpClient;
return copy;
protected JdkHttpClientBuilderImpl newInstance(JdkHttpClientFactory clientFactory) {
return new JdkHttpClientBuilderImpl(clientFactory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -214,25 +215,27 @@ public HandlerAndAsyncBody(BodyHandler<T> handler, AsyncBody asyncBody) {

private JdkHttpClientBuilderImpl builder;
private java.net.http.HttpClient httpClient;
private Config config;

public JdkHttpClientImpl(JdkHttpClientBuilderImpl builderImpl, java.net.http.HttpClient httpClient) {
public JdkHttpClientImpl(JdkHttpClientBuilderImpl builderImpl, java.net.http.HttpClient httpClient, Config config) {
this.builder = builderImpl;
this.httpClient = httpClient;
this.config = config;
}

@Override
public void close() {
if (this.httpClient == null) {
return;
}
builder.clientFactory.closeHttpClient(this);
builder.getClientFactory().closeHttpClient(this);
// help with default cleanup, which is based upon garbarge collection
this.httpClient = null;
}

@Override
public DerivedClientBuilder newBuilder() {
return this.builder.copy(getHttpClient());
return this.builder.copy(this);
}

@Override
Expand Down Expand Up @@ -286,8 +289,8 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
JdkHttpRequestImpl.BuilderImpl builderImpl = jdkRequest.newBuilder();
for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(builderImpl, jdkRequest);
for (Interceptor interceptor : builder.getInterceptors().values()) {
Interceptor.useConfig(interceptor, config).before(builderImpl, jdkRequest);
jdkRequest = builderImpl.build();
}

Expand All @@ -296,19 +299,20 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
CompletableFuture<AsyncResponse<T>> cf = this.getHttpClient().sendAsync(builderImpl.build().request,
handlerAndAsyncBody.handler).thenApply(r -> new AsyncResponse<>(r, handlerAndAsyncBody.asyncBody));

for (Interceptor interceptor : builder.interceptors.values()) {
for (Interceptor interceptor : builder.getInterceptors().values()) {
cf = cf.thenCompose(ar -> {
java.net.http.HttpResponse<T> response = ar.response;
if (response != null && !HttpResponse.isSuccessful(response.statusCode())) {
return interceptor.afterFailure(builderImpl, new JdkHttpResponseImpl<>(response)).thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
return Interceptor.useConfig(interceptor, config).afterFailure(builderImpl, new JdkHttpResponseImpl<>(response))
.thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
}
return CompletableFuture.completedFuture(ar);
});
Expand All @@ -324,7 +328,7 @@ public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder()

@Override
public io.fabric8.kubernetes.client.http.HttpRequest.Builder newHttpRequestBuilder() {
return new JdkHttpRequestImpl.BuilderImpl().timeout(this.builder.readTimeout);
return new JdkHttpRequestImpl.BuilderImpl().timeout(this.builder.getReadTimeout());
}

/*
Expand All @@ -344,23 +348,24 @@ public WebSocketResponse(WebSocket w, java.net.http.WebSocketHandshakeException
public CompletableFuture<WebSocket> buildAsync(JdkWebSocketImpl.BuilderImpl webSocketBuilder, Listener listener) {
JdkWebSocketImpl.BuilderImpl copy = webSocketBuilder.copy();

for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
for (Interceptor interceptor : builder.getInterceptors().values()) {
Interceptor.useConfig(interceptor, config).before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
}

CompletableFuture<WebSocket> result = new CompletableFuture<>();

CompletableFuture<WebSocketResponse> cf = internalBuildAsync(copy, listener);

for (Interceptor interceptor : builder.interceptors.values()) {
for (Interceptor interceptor : builder.getInterceptors().values()) {
cf = cf.thenCompose(response -> {
if (response.wshse != null && response.wshse.getResponse() != null) {
return interceptor.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
return Interceptor.useConfig(interceptor, config)
.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
}
return CompletableFuture.completedFuture(response);
});
Expand Down Expand Up @@ -399,8 +404,8 @@ public CompletableFuture<WebSocketResponse> internalBuildAsync(JdkWebSocketImpl.
}
// the Watch logic sets a websocketTimeout as the readTimeout
// TODO: this should probably be made clearer in the docs
if (this.builder.readTimeout != null) {
newBuilder.connectTimeout(this.builder.readTimeout);
if (this.builder.getReadTimeout() != null) {
newBuilder.connectTimeout(this.builder.getReadTimeout());
}

AtomicLong queueSize = new AtomicLong();
Expand Down Expand Up @@ -436,9 +441,4 @@ java.net.http.HttpClient getHttpClient() {
return httpClient;
}

@Override
public Factory getFactory() {
return builder.clientFactory;
}

}

This file was deleted.

0 comments on commit 25d7509

Please sign in to comment.