Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Jetty HttpClient implementation #4180

Merged
merged 2 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/e2e-httpclient-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
fail-fast: false
matrix:
kubernetes: [v1.24.0,v1.23.3, v1.12.10]
httpclient: [jdk]
httpclient: [jdk,jetty]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down Expand Up @@ -67,7 +67,7 @@ jobs:
fail-fast: false
matrix:
openshift: [v3.11.0, v3.10.0]
httpclient: [jdk]
httpclient: [jdk,jetty]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-snapshots.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,4 @@ jobs:
gpg-private-key: ${{ secrets.SIGNINGKEY }}
gpg-passphrase: SIGNINGPASSWORD
- name: Build and release Java 11 modules
run: ./mvnw ${MAVEN_ARGS} ${RELEASE_MAVEN_ARGS} -pl "httpclient-jdk" clean deploy
run: ./mvnw ${MAVEN_ARGS} ${RELEASE_MAVEN_ARGS} -pl "httpclient-jdk" -pl "httpclient-jetty" clean deploy
24 changes: 23 additions & 1 deletion httpclient-jdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

<artifactId>kubernetes-httpclient-jdk</artifactId>
<packaging>jar</packaging>
<name>Fabric8 :: Kubernetes :: JDK HttpClient</name>
<name>Fabric8 :: Kubernetes :: HttpClient :: JDK</name>

<properties>
<maven.compiler.release>11</maven.compiler.release>
Expand All @@ -50,6 +50,28 @@
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-api</artifactId>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-api</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public Builder preferHttp11() {
}

@Override
public Builder tlsVersions(TlsVersion[] tlsVersions) {
public Builder tlsVersions(TlsVersion... tlsVersions) {
this.tlsVersions = tlsVersions;
return this;
}
Expand All @@ -192,7 +192,6 @@ public JdkHttpClientBuilderImpl copy(java.net.http.HttpClient httpClient) {
copy.readTimeout = this.readTimeout;
copy.sslContext = this.sslContext;
copy.interceptors = new LinkedHashMap<>(this.interceptors);
copy.followRedirects = this.followRedirects;
copy.proxyAddress = this.proxyAddress;
copy.proxyAuthorization = this.proxyAuthorization;
copy.tlsVersions = this.tlsVersions;
Expand All @@ -202,4 +201,4 @@ public JdkHttpClientBuilderImpl copy(java.net.http.HttpClient httpClient) {
return copy;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
* TODO:
Expand All @@ -56,6 +58,9 @@ private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private CompletableFuture<Void> done = new CompletableFuture<Void>();
private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile Flow.Subscription subscription;
private T initialItem;
private boolean first = true;
private boolean isComplete;

private AsyncBodySubscriber(BodyConsumer<T> consumer) {
this.consumer = consumer;
Expand All @@ -68,11 +73,20 @@ public void onSubscribe(Subscription subscription) {
return;
}
this.subscription = subscription;
// the sendAsync future won't complete unless we do the initial request here
// so in onNext we'll trap the item until we're ready
subscription.request(1);
}

@Override
public void onNext(T item) {
synchronized (this) {
if (first) {
this.initialItem = item;
first = false;
return;
}
}
try {
if (item == null) {
done.complete(null);
Expand All @@ -91,13 +105,32 @@ public void onError(Throwable throwable) {
}

@Override
public void onComplete() {
public synchronized void onComplete() {
if (initialItem != null) {
this.isComplete = true;
return;
}
done.complete(null);
}

@Override
public void consume() {
this.subscription.request(1);
public synchronized void consume() {
if (done.isDone()) {
return;
}
try {
first = false;
if (initialItem != null) {
T item = initialItem;
initialItem = null;
onNext(item);
}
} finally {
if (isComplete) {
done.complete(null);
}
this.subscription.request(1);
}
}

@Override
Expand Down Expand Up @@ -132,6 +165,11 @@ public List<String> headers(String key) {
return response.headers().allValues(key);
}

@Override
public Map<String, List<String>> headers() {
return response.headers().map();
}

@Override
public int code() {
return response.statusCode();
Expand All @@ -154,6 +192,26 @@ public Optional<HttpResponse<?>> previousResponse() {

}

static class AsyncResponse<T> {
java.net.http.HttpResponse<T> response;
AsyncBody asyncBody;

public AsyncResponse(java.net.http.HttpResponse<T> response, AsyncBody asyncBody) {
this.response = response;
this.asyncBody = asyncBody;
}
}

static class HandlerAndAsyncBody<T> {
BodyHandler<T> handler;
AsyncBody asyncBody;

public HandlerAndAsyncBody(BodyHandler<T> handler, AsyncBody asyncBody) {
this.handler = handler;
this.asyncBody = asyncBody;
}
}

private JdkHttpClientBuilderImpl builder;
private java.net.http.HttpClient httpClient;

Expand All @@ -179,20 +237,29 @@ public DerivedClientBuilder newBuilder() {

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, BodyConsumer<String> consumer) {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return sendAsync(request, handler).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r, subscriber));
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, BodyConsumer<List<ByteBuffer>> consumer) {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return sendAsync(request, handler).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r, subscriber));
return sendAsync(request, () -> {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
return sendAsync(request, () -> new HandlerAndAsyncBody<T>(toBodyHandler(type), null))
.thenApply(ar -> new JdkHttpResponseImpl<>(ar.response));
}

private <T> BodyHandler<T> toBodyHandler(Class<T> type) {
BodyHandler<T> bodyHandler;
if (type == null) {
bodyHandler = (BodyHandler<T>) BodyHandlers.discarding();
Expand All @@ -212,31 +279,38 @@ public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Cla
return (BodySubscriber<T>) downstream;
};
}
return sendAsync(request, bodyHandler).thenApply(JdkHttpResponseImpl::new);
return bodyHandler;
}

public <T> CompletableFuture<java.net.http.HttpResponse<T>> sendAsync(HttpRequest request, BodyHandler<T> bodyHandler) {
public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
JdkHttpRequestImpl.BuilderImpl builderImpl = jdkRequest.newBuilder();
for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(builderImpl, jdkRequest);
jdkRequest = builderImpl.build();
}

CompletableFuture<java.net.http.HttpResponse<T>> cf = this.getHttpClient().sendAsync(builderImpl.build().request,
bodyHandler);
HandlerAndAsyncBody<T> handlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

CompletableFuture<AsyncResponse<T>> cf = this.getHttpClient().sendAsync(builderImpl.build().request,
handlerAndAsyncBody.handler).thenApply(r -> new AsyncResponse<>(r, handlerAndAsyncBody.asyncBody));

for (Interceptor interceptor : builder.interceptors.values()) {
cf = cf.thenCompose(response -> {
cf = cf.thenCompose(ar -> {
java.net.http.HttpResponse<T> response = ar.response;
if (response != null && !HttpResponse.isSuccessful(response.statusCode())) {
return interceptor.afterFailure(builderImpl, new JdkHttpResponseImpl<>(response)).thenCompose(b -> {
if (b) {
return this.getHttpClient().sendAsync(builderImpl.build().request, bodyHandler);
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(response);
return CompletableFuture.completedFuture(ar);
});
}
return CompletableFuture.completedFuture(response);
return CompletableFuture.completedFuture(ar);
});
}

Expand Down Expand Up @@ -334,13 +408,7 @@ public CompletableFuture<WebSocketResponse> internalBuildAsync(JdkWebSocketImpl.
// use a responseholder to convey both the exception and the websocket
CompletableFuture<WebSocketResponse> response = new CompletableFuture<>();

URI uri = request.uri();
if (uri.getScheme().startsWith("http")) {
// the jdk logic expects a ws uri
// after the https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8245245 it just does the reverse of this
// to convert back to http(s) ...
uri = URI.create("ws" + uri.toString().substring(4));
}
URI uri = WebSocket.toWebSocketUri(request.uri());
newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((w, t) -> {
if (t instanceof CompletionException && t.getCause() != null) {
t = t.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow.Subscriber;

class JdkHttpRequestImpl implements HttpRequest {
import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE;

private static final String CONTENT_TYPE = "Content-Type";
class JdkHttpRequestImpl implements HttpRequest {

public static class BuilderImpl implements Builder {

Expand Down Expand Up @@ -141,6 +142,11 @@ public List<String> headers(String key) {
return request.headers().allValues(key);
}

@Override
public Map<String, List<String>> headers() {
return request.headers().map();
}

@Override
public URI uri() {
return request.uri();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest;
import io.fabric8.kubernetes.client.http.HttpClient;

@SuppressWarnings("java:S2187")
public class JdkHttpClientAsyncBodyTest extends AbstractAsyncBodyTest {
@Override
protected HttpClient.Factory getHttpClientFactory() {
return new JdkHttpClientFactory();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.http.AbstractInterceptorTest;
import io.fabric8.kubernetes.client.http.HttpClient;

@SuppressWarnings("java:S2187")
public class JdkHttpClientInterceptorTest extends AbstractInterceptorTest {
@Override
protected HttpClient.Factory getHttpClientFactory() {
return new JdkHttpClientFactory();
}

}