Skip to content

Commit

Permalink
fix #4472: refined withRequestConfig so that a new client is not created
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Oct 11, 2022
1 parent 177de1c commit 1bb8a34
Show file tree
Hide file tree
Showing 18 changed files with 152 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -214,6 +215,7 @@ public HandlerAndAsyncBody(BodyHandler<T> handler, AsyncBody asyncBody) {

private JdkHttpClientBuilderImpl builder;
private java.net.http.HttpClient httpClient;
private Config config;

public JdkHttpClientImpl(JdkHttpClientBuilderImpl builderImpl, java.net.http.HttpClient httpClient) {
this.builder = builderImpl;
Expand Down Expand Up @@ -287,7 +289,7 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
JdkHttpRequestImpl.BuilderImpl builderImpl = jdkRequest.newBuilder();
for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(builderImpl, jdkRequest);
Interceptor.useConfig(interceptor, config).before(builderImpl, jdkRequest);
jdkRequest = builderImpl.build();
}

Expand All @@ -300,15 +302,16 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
cf = cf.thenCompose(ar -> {
java.net.http.HttpResponse<T> response = ar.response;
if (response != null && !HttpResponse.isSuccessful(response.statusCode())) {
return interceptor.afterFailure(builderImpl, new JdkHttpResponseImpl<>(response)).thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
return Interceptor.useConfig(interceptor, config).afterFailure(builderImpl, new JdkHttpResponseImpl<>(response))
.thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
}
return CompletableFuture.completedFuture(ar);
});
Expand Down Expand Up @@ -345,7 +348,7 @@ public CompletableFuture<WebSocket> buildAsync(JdkWebSocketImpl.BuilderImpl webS
JdkWebSocketImpl.BuilderImpl copy = webSocketBuilder.copy();

for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
Interceptor.useConfig(interceptor, config).before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
}

CompletableFuture<WebSocket> result = new CompletableFuture<>();
Expand All @@ -355,12 +358,13 @@ public CompletableFuture<WebSocket> buildAsync(JdkWebSocketImpl.BuilderImpl webS
for (Interceptor interceptor : builder.interceptors.values()) {
cf = cf.thenCompose(response -> {
if (response.wshse != null && response.wshse.getResponse() != null) {
return interceptor.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
return Interceptor.useConfig(interceptor, config)
.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
}
return CompletableFuture.completedFuture(response);
});
Expand Down Expand Up @@ -437,8 +441,10 @@ java.net.http.HttpClient getHttpClient() {
}

@Override
public Factory getFactory() {
return builder.clientFactory;
public HttpClient withRequestConfig(Config config) {
JdkHttpClientImpl impl = new JdkHttpClientImpl(this.builder, this.httpClient);
impl.config = config;
return impl;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.jetty;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -49,6 +50,7 @@ public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpCl
private final Collection<Interceptor> interceptors;
private final JettyHttpClientBuilder builder;
private final JettyHttpClientFactory factory;
private Config config;

public JettyHttpClient(JettyHttpClientBuilder builder, HttpClient httpClient, WebSocketClient webSocketClient,
Collection<Interceptor> interceptors, JettyHttpClientFactory jettyHttpClientFactory) {
Expand Down Expand Up @@ -133,7 +135,6 @@ public HttpRequest.Builder newHttpRequestBuilder() {
return new StandardHttpRequest.Builder();
}

@Override
public Factory getFactory() {
return factory;
}
Expand All @@ -145,7 +146,7 @@ private Request newRequest(StandardHttpRequest originalRequest) {
throw KubernetesClientException.launderThrowable(e);
}
final var requestBuilder = originalRequest.toBuilder();
interceptors.forEach(i -> i.before(requestBuilder, originalRequest));
interceptors.forEach(i -> Interceptor.useConfig(i, config).before(requestBuilder, originalRequest));
final var request = requestBuilder.build();

var jettyRequest = jetty.newRequest(request.uri()).method(request.method());
Expand All @@ -167,7 +168,7 @@ private <T> CompletableFuture<HttpResponse<T>> interceptResponse(
for (var interceptor : interceptors) {
originalResponse = originalResponse.thenCompose(r -> {
if (!r.isSuccessful()) {
return interceptor.afterFailure(builder, r)
return Interceptor.useConfig(interceptor, config).afterFailure(builder, r)
.thenCompose(b -> {
if (Boolean.TRUE.equals(b)) {
return function.apply(builder.build());
Expand Down Expand Up @@ -195,4 +196,11 @@ HttpClient getJetty() {
WebSocketClient getJettyWs() {
return jettyWs;
}

@Override
public io.fabric8.kubernetes.client.http.HttpClient withRequestConfig(Config config) {
JettyHttpClient client = builder.copy().build();
client.config = config;
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public JettyHttpClientBuilder preferHttp11() {
return this;
}

public Builder copy() {
public JettyHttpClientBuilder copy() {
final var ret = new JettyHttpClientBuilder(factory);
ret.sharedHttpClient = sharedHttpClient;
ret.sharedWebSocketClient = sharedWebSocketClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.fabric8.kubernetes.client.okhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.HttpClient.Builder;
import io.fabric8.kubernetes.client.http.TlsVersion;
Expand Down Expand Up @@ -61,13 +62,16 @@ static final class InteceptorAdapter implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request.Builder requestBuilder = chain.request().newBuilder();
Config config = chain.request().tag(Config.class);
OkHttpRequestImpl.BuilderImpl builderImpl = new OkHttpRequestImpl.BuilderImpl(requestBuilder);
interceptor.before(new OkHttpRequestImpl.BuilderImpl(requestBuilder), new OkHttpRequestImpl(chain.request()));
io.fabric8.kubernetes.client.http.Interceptor.useConfig(interceptor, config)
.before(new OkHttpRequestImpl.BuilderImpl(requestBuilder), new OkHttpRequestImpl(chain.request()));
Response response = chain.proceed(requestBuilder.build());
if (!response.isSuccessful()) {
// for okhttp this token refresh will be blocking
try {
boolean call = interceptor.afterFailure(builderImpl, new OkHttpResponseImpl<>(response, InputStream.class)).get();
boolean call = io.fabric8.kubernetes.client.http.Interceptor.useConfig(interceptor, config)
.afterFailure(builderImpl, new OkHttpResponseImpl<>(response, InputStream.class)).get();
if (call) {
response.close();
return chain.proceed(requestBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.fabric8.kubernetes.client.okhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand All @@ -26,6 +27,7 @@
import okhttp3.Dispatcher;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSource;
Expand Down Expand Up @@ -176,6 +178,8 @@ public Map<String, List<String>> headers() {
private final okhttp3.OkHttpClient httpClient;
private final OkHttpClientFactory factory;

private Config config;

public OkHttpClientImpl(OkHttpClient httpClient, OkHttpClientFactory factory) {
this.httpClient = httpClient;
this.factory = factory;
Expand Down Expand Up @@ -288,7 +292,7 @@ public void onFailure(Call call, IOException e) {

@Override
public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() {
return new OkHttpWebSocketImpl.BuilderImpl(this.httpClient);
return new OkHttpWebSocketImpl.BuilderImpl(this.httpClient, newRequestBuilder());
}

public okhttp3.OkHttpClient getOkHttpClient() {
Expand All @@ -297,12 +301,18 @@ public okhttp3.OkHttpClient getOkHttpClient() {

@Override
public HttpRequest.Builder newHttpRequestBuilder() {
return new OkHttpRequestImpl.BuilderImpl();
return new OkHttpRequestImpl.BuilderImpl(newRequestBuilder());
}

private okhttp3.Request.Builder newRequestBuilder() {
return new Request.Builder().tag(Config.class, config);
}

@Override
public Factory getFactory() {
return this.factory;
public HttpClient withRequestConfig(Config config) {
OkHttpClientImpl result = new OkHttpClientImpl(httpClient, factory);
result.config = config;
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ static class BuilderImpl implements Builder {

private final Request.Builder builder;

public BuilderImpl() {
this(new Request.Builder());
}

public BuilderImpl(Request.Builder builder) {
this.builder = builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ static class BuilderImpl implements WebSocket.Builder {
private Request.Builder builder = new Request.Builder();
private OkHttpClient httpClient;

public BuilderImpl(OkHttpClient httpClient) {
public BuilderImpl(OkHttpClient httpClient, okhttp3.Request.Builder builder) {
this.httpClient = httpClient;
this.builder = builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru

this.requestConfig = new RequestConfig(username, password, oauthToken, watchReconnectLimit, watchReconnectInterval,
connectionTimeout, rollingTimeout, requestTimeout, scaleTimeout, loggingInterval, websocketTimeout,
websocketPingInterval, maxConcurrentRequests, maxConcurrentRequestsPerHost, oauthTokenProvider,
websocketPingInterval, oauthTokenProvider,
requestRetryBackoffLimit, requestRetryBackoffInterval, uploadConnectionTimeout, uploadRequestTimeout);
this.requestConfig.setImpersonateUsername(impersonateUsername);
this.requestConfig.setImpersonateGroups(impersonateGroups);
Expand All @@ -383,6 +383,8 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru
//We set the masterUrl because it's needed by ensureHttps
this.masterUrl = masterUrl;
this.masterUrl = ensureEndsWithSlash(ensureHttps(masterUrl, this));
this.maxConcurrentRequests = maxConcurrentRequests;
this.maxConcurrentRequestsPerHost = maxConcurrentRequestsPerHost;
}

public static void configFromSysPropsOrEnvVars(Config config) {
Expand Down Expand Up @@ -1268,19 +1270,19 @@ public void setWebsocketPingInterval(long websocketPingInterval) {
}

public int getMaxConcurrentRequests() {
return getRequestConfig().getMaxConcurrentRequests();
return maxConcurrentRequests;
}

public void setMaxConcurrentRequests(int maxConcurrentRequests) {
this.requestConfig.setMaxConcurrentRequests(maxConcurrentRequests);
this.maxConcurrentRequests = maxConcurrentRequests;
}

public int getMaxConcurrentRequestsPerHost() {
return getRequestConfig().getMaxConcurrentRequestsPerHost();
return maxConcurrentRequestsPerHost;
}

public void setMaxConcurrentRequestsPerHost(int maxConcurrentRequestsPerHost) {
this.requestConfig.setMaxConcurrentRequestsPerHost(maxConcurrentRequestsPerHost);
this.maxConcurrentRequestsPerHost = maxConcurrentRequestsPerHost;
}

@JsonProperty("proxyUsername")
Expand Down Expand Up @@ -1347,11 +1349,11 @@ public String getKeyStoreFile() {

@JsonIgnore
public OAuthTokenProvider getOauthTokenProvider() {
return oauthTokenProvider;
return this.getRequestConfig().getOauthTokenProvider();
}

public void setOauthTokenProvider(OAuthTokenProvider oauthTokenProvider) {
this.oauthTokenProvider = oauthTokenProvider;
this.requestConfig.setOauthTokenProvider(oauthTokenProvider);
}

@JsonProperty("customHeaders")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.Map;

import static io.fabric8.kubernetes.client.Config.DEFAULT_LOGGING_INTERVAL;
import static io.fabric8.kubernetes.client.Config.DEFAULT_MAX_CONCURRENT_REQUESTS;
import static io.fabric8.kubernetes.client.Config.DEFAULT_MAX_CONCURRENT_REQUESTS_PER_HOST;
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_ROLLING_TIMEOUT;
Expand Down Expand Up @@ -59,8 +57,6 @@ public class RequestConfig {
private int loggingInterval = DEFAULT_LOGGING_INTERVAL;
private long websocketTimeout = DEFAULT_WEBSOCKET_TIMEOUT;
private long websocketPingInterval = DEFAULT_WEBSOCKET_PING_INTERVAL;
private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
private int maxConcurrentRequestsPerHost = DEFAULT_MAX_CONCURRENT_REQUESTS_PER_HOST;

RequestConfig() {
}
Expand Down Expand Up @@ -90,7 +86,7 @@ public RequestConfig(String username, String password, String oauthToken,
int maxConcurrentRequests, int maxConcurrentRequestsPerHost) {
this(username, password, oauthToken, watchReconnectLimit, watchReconnectInterval, connectionTimeout, rollingTimeout,
requestTimeout, scaleTimeout, loggingInterval,
websocketTimeout, websocketPingInterval, maxConcurrentRequests, maxConcurrentRequestsPerHost, null,
websocketTimeout, websocketPingInterval, null,
DEFAULT_REQUEST_RETRY_BACKOFFLIMIT, DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL,
DEFAULT_UPLOAD_CONNECTION_TIMEOUT, DEFAULT_UPLOAD_REQUEST_TIMEOUT);
}
Expand All @@ -100,7 +96,7 @@ public RequestConfig(String username, String password, String oauthToken,
int watchReconnectLimit, int watchReconnectInterval,
int connectionTimeout, long rollingTimeout, int requestTimeout, long scaleTimeout, int loggingInterval,
long websocketTimeout, long websocketPingInterval,
int maxConcurrentRequests, int maxConcurrentRequestsPerHost, OAuthTokenProvider oauthTokenProvider,
OAuthTokenProvider oauthTokenProvider,
int requestRetryBackoffLimit, int requestRetryBackoffInterval, int uploadConnectionTimeout, int uploadRequestTimeout) {
this.username = username;
this.oauthToken = oauthToken;
Expand All @@ -114,8 +110,6 @@ public RequestConfig(String username, String password, String oauthToken,
this.websocketTimeout = websocketTimeout;
this.loggingInterval = loggingInterval;
this.websocketPingInterval = websocketPingInterval;
this.maxConcurrentRequests = maxConcurrentRequests;
this.maxConcurrentRequestsPerHost = maxConcurrentRequestsPerHost;
this.oauthTokenProvider = oauthTokenProvider;
this.requestRetryBackoffLimit = requestRetryBackoffLimit;
this.requestRetryBackoffInterval = requestRetryBackoffInterval;
Expand Down Expand Up @@ -262,22 +256,6 @@ public void setWebsocketPingInterval(long websocketPingInterval) {
this.websocketPingInterval = websocketPingInterval;
}

public int getMaxConcurrentRequests() {
return maxConcurrentRequests;
}

public void setMaxConcurrentRequests(int maxConcurrentRequests) {
this.maxConcurrentRequests = maxConcurrentRequests;
}

public int getMaxConcurrentRequestsPerHost() {
return maxConcurrentRequestsPerHost;
}

public void setMaxConcurrentRequestsPerHost(int maxConcurrentRequestsPerHost) {
this.maxConcurrentRequestsPerHost = maxConcurrentRequestsPerHost;
}

public void setImpersonateUsername(String impersonateUsername) {
this.impersonateUsername = impersonateUsername;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ public WithRequestCallable(C client, RequestConfig requestConfig) {

@Override
public <O> O call(Function<C, O> function) {
try (C newClient = (C) this.client.newClient(requestConfig).adapt(this.client.getClass())) {
return function.apply(newClient);
}
C newClient = (C) this.client.newClient(requestConfig).adapt(this.client.getClass());
return function.apply(newClient);
}
}

0 comments on commit 1bb8a34

Please sign in to comment.