Skip to content

Commit

Permalink
Merge pull request #8 from manusa/refactor/http-clients
Browse files Browse the repository at this point in the history
review: HttpClient implementation refactor
  • Loading branch information
shawkins committed Nov 29, 2022
2 parents 45d605b + 3356764 commit d2c2ae9
Show file tree
Hide file tree
Showing 24 changed files with 508 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -97,12 +98,12 @@ public CompletionStage<AsyncBody> getBody() {
}
}

private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final BodyConsumer<T> consumer;
private static final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final AsyncBody.Consumer<T> consumer;
private CompletableFuture<Void> done = new CompletableFuture<Void>();
private CompletableFuture<Flow.Subscription> subscription = new CompletableFuture<>();

private AsyncBodySubscriber(BodyConsumer<T> consumer) {
private AsyncBodySubscriber(AsyncBody.Consumer<T> consumer) {
this.consumer = consumer;
}

Expand Down Expand Up @@ -252,17 +253,18 @@ public DerivedClientBuilder newBuilder() {
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, BodyConsumer<String> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, BodyConsumer<List<ByteBuffer>> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request,
AsyncBody.Consumer<List<ByteBuffer>> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.fabric8.kubernetes.client.jetty;

import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import org.eclipse.jetty.client.api.Request;
Expand All @@ -27,21 +27,20 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

public abstract class JettyAsyncResponseListener<T> extends Response.Listener.Adapter implements HttpClient.AsyncBody {
public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements AsyncBody {

private final HttpRequest httpRequest;
private final HttpClient.BodyConsumer<T> bodyConsumer;
private final CompletableFuture<HttpResponse<HttpClient.AsyncBody>> asyncResponse;
private final CompletableFuture<HttpResponse<AsyncBody>> asyncResponse;
private final CompletableFuture<Void> asyncBodyDone;
private CompletableFuture<LongConsumer> demand = new CompletableFuture<>();
private final CompletableFuture<LongConsumer> demand;
private boolean initialConsumeCalled;
private Runnable initialConsume;

JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer<T> bodyConsumer) {
JettyAsyncResponseListener(HttpRequest httpRequest) {
this.httpRequest = httpRequest;
this.bodyConsumer = bodyConsumer;
asyncResponse = new CompletableFuture<>();
asyncBodyDone = new CompletableFuture<>();
demand = new CompletableFuture<>();
}

@Override
Expand Down Expand Up @@ -78,7 +77,7 @@ public void onComplete(Result result) {
asyncBodyDone.complete(null);
}

public CompletableFuture<HttpResponse<HttpClient.AsyncBody>> listen(Request request) {
public CompletableFuture<HttpResponse<AsyncBody>> listen(Request request) {
request.send(this);
return asyncResponse;
}
Expand All @@ -94,20 +93,21 @@ public void onContent(Response response, LongConsumer demand, ByteBuffer content
this.demand.complete(demand);
}
try {
// we must clone as the buffer can be reused after the call to succeeded
bodyConsumer.consume(process(response, clone(content)), this);
onContent(content);
callback.succeeded();
} catch (Exception e) {
callback.failed(e);
}
}

protected abstract T process(Response response, ByteBuffer content);
/**
* Implement to consume the content of the chunked response.
* <p>
* Each chunk will be passed <b>in order</b> to this function (<code>onContent{callback.succeeded}</code>)
*
* @param content the ByteBuffer containing a chunk of the response.
* @throws Exception in case the downstream consumer throws an exception.
*/
protected abstract void onContent(ByteBuffer content) throws Exception;

public static ByteBuffer clone(ByteBuffer original) {
ByteBuffer clone = ByteBuffer.allocate(original.remaining());
clone.put(original);
clone.flip();
return clone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,28 @@

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.Interceptor;
import io.fabric8.kubernetes.client.http.StandardHttpRequest;
import io.fabric8.kubernetes.client.http.WebSocket;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static io.fabric8.kubernetes.client.http.BufferUtil.copy;
import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM;
import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN;

Expand All @@ -47,7 +49,7 @@ public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpCl
private final Collection<Interceptor> interceptors;
private final JettyHttpClientBuilder builder;
private final JettyHttpClientFactory factory;
private Config config;
private final Config config;

public JettyHttpClient(JettyHttpClientBuilder builder, HttpClient httpClient, WebSocketClient webSocketClient,
Collection<Interceptor> interceptors, JettyHttpClientFactory jettyHttpClientFactory, Config config) {
Expand Down Expand Up @@ -76,19 +78,37 @@ public DerivedClientBuilder newBuilder() {

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest originalRequest, BodyConsumer<String> consumer) {
throw new UnsupportedOperationException("Not supported by the Jetty client");
HttpRequest originalRequest, AsyncBody.Consumer<String> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener(request) {

final StringBuilder builder = new StringBuilder();

@Override
protected void onContent(ByteBuffer content) throws Exception {
for (char c : StandardCharsets.UTF_8.decode(content).array()) {
if (c == '\n') {
consumer.consume(builder.toString(), this);
builder.setLength(0);
} else {
builder.append(c);
}
}
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest originalRequest, BodyConsumer<List<ByteBuffer>> consumer) {
HttpRequest originalRequest, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener<>(request, consumer) {
final var future = new JettyAsyncResponseListener(request) {

@Override
protected List<ByteBuffer> process(Response response, ByteBuffer content) {
return Collections.singletonList(content);
protected void onContent(ByteBuffer content) throws Exception {
// we must clone as the buffer can be reused by the byte consumer
consumer.consume(Collections.singletonList(copy(content)), this);
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeBytes(r, consumer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,4 @@ public class JettyAsyncBodyTest extends AbstractAsyncBodyTest {
protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
public void consumeLinesNotProcessedIfCancelled() throws Exception {
// consume lines not supported
}

@Override
public void consumeLinesProcessedAfterConsume() throws Exception {
// consume lines not supported
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,4 @@ public class JettyInterceptorTest extends AbstractInterceptorTest {
protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception {
// consume lines not supported
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -51,9 +52,6 @@ public class OkHttpClientImpl implements HttpClient {
static final Map<String, MediaType> MEDIA_TYPES = new ConcurrentHashMap<>();

public static final MediaType JSON = parseMediaType("application/json");
public static final MediaType JSON_PATCH = parseMediaType("application/json-patch+json");
public static final MediaType STRATEGIC_MERGE_JSON_PATCH = parseMediaType("application/strategic-merge-patch+json");
public static final MediaType JSON_MERGE_PATCH = parseMediaType("application/merge-patch+json");

static MediaType parseMediaType(String contentType) {
MediaType result = MediaType.parse(contentType);
Expand All @@ -62,11 +60,11 @@ static MediaType parseMediaType(String contentType) {
}

private abstract class OkHttpAsyncBody<T> implements AsyncBody {
private final BodyConsumer<T> consumer;
private final AsyncBody.Consumer<T> consumer;
private final BufferedSource source;
private final CompletableFuture<Void> done = new CompletableFuture<>();

private OkHttpAsyncBody(BodyConsumer<T> consumer, BufferedSource source) {
private OkHttpAsyncBody(AsyncBody.Consumer<T> consumer, BufferedSource source) {
this.consumer = consumer;
this.source = source;
}
Expand Down Expand Up @@ -217,8 +215,8 @@ public DerivedClientBuilder newBuilder() {
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request,
BodyConsumer<String> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest request, AsyncBody.Consumer<String> consumer) {
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<String>(consumer, s) {
@Override
protected String process(BufferedSource source) throws IOException {
Expand All @@ -232,8 +230,8 @@ protected String process(BufferedSource source) throws IOException {
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request,
BodyConsumer<List<ByteBuffer>> consumer) {
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest request, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<List<ByteBuffer>>(consumer, s) {
@Override
protected List<ByteBuffer> process(BufferedSource source) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,4 @@ protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
void testConsumeLines() throws Exception {
// line parsing not yet supported
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.fabric8.kubernetes.client.http;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
Expand All @@ -42,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -222,7 +222,7 @@ void testConsumeLines() throws Exception {
@DisplayName("Supported response body types")
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
void testSupportedTypes(Class<?> type) throws Exception {
void supportedResponseBodyTypes(Class<?> type) throws Exception {
String value = new String(new byte[16384]);
server.expect().withPath("/type").andReturn(200, value).always();
final HttpResponse<?> result = client.getHttpClient()
Expand All @@ -234,4 +234,14 @@ void testSupportedTypes(Class<?> type) throws Exception {
.satisfies(r -> assertThat(r.bodyString()).isEqualTo(value));
}

@Test
void supportedResponseTypeWithInvalid() {
final HttpClient httpClient = client.getHttpClient();
final HttpRequest request = httpClient.newHttpRequestBuilder()
.uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build();
assertThatIllegalArgumentException()
.isThrownBy(() -> httpClient.sendAsync(request, Boolean.class))
.withMessageStartingWith("Unsupported response type: ")
.withMessageContaining("Boolean");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ void postNamespaceMismatch() throws Exception {
final HttpResponse<String> result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS);
// Then
assertThat(result)
.extracting(HttpResponse::code).isEqualTo(400);
assertThat(result)
.returns(400, HttpResponse::code)
.extracting(HttpResponse::body).asString()
.contains("the namespace of the object (different) does not match the namespace on the URL (test)");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ void putNameMismatch() throws Exception {
final HttpResponse<String> result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS);
// Then
assertThat(result)
.extracting(HttpResponse::code).isEqualTo(400);
assertThat(result)
.returns(400, HttpResponse::code)
.extracting(HttpResponse::body).asString()
.contains("the name of the object (different) does not match the name on the URL (mismatched-name)");
}
Expand Down

0 comments on commit d2c2ae9

Please sign in to comment.