Skip to content

Commit

Permalink
fix fabric8io#4201: generalizing sendAsync support
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 19, 2022
1 parent 66ca429 commit 23ad882
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 180 deletions.
Expand Up @@ -23,17 +23,11 @@
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.net.http.WebSocketHandshakeException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -253,35 +247,6 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
return sendAsync(request, () -> new HandlerAndAsyncBody<T>(toBodyHandler(type), null))
.thenApply(ar -> new JdkHttpResponseImpl<>(ar.response));
}

private <T> BodyHandler<T> toBodyHandler(Class<T> type) {
BodyHandler<T> bodyHandler;
if (type == null) {
bodyHandler = (BodyHandler<T>) BodyHandlers.discarding();
} else if (type == InputStream.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofInputStream();
} else if (type == String.class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofString();
} else if (type == byte[].class) {
bodyHandler = (BodyHandler<T>) BodyHandlers.ofByteArray();
} else {
bodyHandler = responseInfo -> {
BodySubscriber<InputStream> upstream = BodyHandlers.ofInputStream().apply(responseInfo);

BodySubscriber<Reader> downstream = BodySubscribers.mapping(
upstream,
(InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8));
return (BodySubscriber<T>) downstream;
};
}
return bodyHandler;
}

public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
Expand Down
Expand Up @@ -24,8 +24,6 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.websocket.client.WebSocketClient;
Expand All @@ -40,7 +38,6 @@

import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM;
import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN;
import static org.eclipse.jetty.util.BufferUtil.toArray;

public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpClient {

Expand Down Expand Up @@ -74,39 +71,10 @@ public DerivedClientBuilder newBuilder() {
return builder.copy();
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest originalRequest, Class<T> type) {
final var supportedResponse = JettyHttpResponse.SupportedResponse.from(type);
final var request = toStandardHttpRequest(originalRequest);
final CompletableFuture<HttpResponse<T>> future = new CompletableFuture<>();
newRequest(request).send(new BufferingResponseListener() {

// TODO: Long Term Refactor - This Listener blocks until the full response is read and keeps it in memory.
// Find a way to stream the response body without completing the future
// We need two signals, one when the response is received, and one when the body is completely
// read.
// Should this method be completely replaced by consumeXxx()?
@Override
public void onComplete(Result result) {
future.complete(new JettyHttpResponse<>(
request, result.getResponse(), supportedResponse.process(result.getResponse(), getContent(), type)));
}
});
return interceptResponse(request.toBuilder(), future, r -> sendAsync(r, type));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest originalRequest, BodyConsumer<String> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener<>(request, consumer) {

@Override
protected String process(Response response, ByteBuffer content) {
return JettyHttpResponse.SupportedResponse.TEXT.process(response, toArray(content), String.class);
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer));
throw new UnsupportedOperationException("Not supported by the Jetty client");
}

@Override
Expand Down
Expand Up @@ -17,19 +17,13 @@

import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.utils.Utils;
import org.eclipse.jetty.client.api.Response;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

public class JettyHttpResponse<T> implements HttpResponse<T> {

Expand Down Expand Up @@ -82,45 +76,4 @@ public Optional<HttpResponse<?>> previousResponse() {
return Optional.empty();
}

enum SupportedResponse {

TEXT(String.class, (r, bytes) -> new String(bytes, responseCharset(r))),
INPUT_STREAM(ByteArrayInputStream.class, (r, bytes) -> new ByteArrayInputStream(bytes)),
READER(InputStreamReader.class, (r, bytes) -> new InputStreamReader(new ByteArrayInputStream(bytes), responseCharset(r))),
BYTE_ARRAY(byte[].class, (r, bytes) -> bytes);

private final Class<?> type;
private final BiFunction<Response, byte[], Object> processor;

SupportedResponse(Class<?> type, BiFunction<Response, byte[], Object> processor) {
this.type = type;
this.processor = processor;
}

public <T> T process(Response response, byte[] bytes, Class<T> type) {
return type.cast(processor.apply(response, bytes));
}

static SupportedResponse from(Class<?> type) {
for (SupportedResponse sr : SupportedResponse.values()) {
if (type.isAssignableFrom(sr.type)) {
return sr;
}
}
throw new IllegalArgumentException("Unsupported response type: " + type.getName());
}

private static Charset responseCharset(Response response) {
var responseCharset = StandardCharsets.UTF_8;
final var responseEncoding = response.getHeaders().get("Content-Encoding");
if (Utils.isNotNullOrEmpty(responseEncoding)) {
try {
responseCharset = Charset.forName(responseEncoding);
} catch (Exception e) {
// ignored
}
}
return responseCharset;
}
}
}
Expand Up @@ -24,4 +24,14 @@ public class JettyAsyncBodyTest extends AbstractAsyncBodyTest {
protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
public void consumeLinesNotProcessedIfCancelled() throws Exception {
// consume lines not supported
}

@Override
public void consumeLinesProcessedAfterConsume() throws Exception {
// consume lines not supported
}
}
Expand Up @@ -101,19 +101,6 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep
}
}

@Test
@DisplayName("sendAsync with unsupported type throws Exception")
void sendAsyncUnsupportedType() {
try (var jettyHttpClient = new JettyHttpClient(
null, httpClient, webSocketClient, Collections.emptyList(), null)) {
// When
final var result = assertThrows(IllegalArgumentException.class,
() -> jettyHttpClient.sendAsync(null, Integer.class));
// Then
assertThat(result).hasMessage("Unsupported response type: java.lang.Integer");
}
}

@Test
@DisplayName("sendAsync with unsupported HttpRequest throws Exception")
void sendAsyncUnsupportedHttpRequest() {
Expand Down
Expand Up @@ -17,22 +17,13 @@

import org.eclipse.jetty.client.HttpResponse;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;

class JettyHttpResponseTest {

Expand All @@ -56,22 +47,4 @@ void headersHandlesJettyHttpFields() {
.containsEntry("Via", Arrays.asList("proxy-1", "proxy-2"));
}

@ParameterizedTest(name = "{index}: SupportedResponse: from type ''{0}'' is ''{1}''")
@MethodSource("supportedResponsesInput")
void supportedResponses(Class<?> type, JettyHttpResponse.SupportedResponse supportedResponse) {
// When
final var result = JettyHttpResponse.SupportedResponse.from(type);
// Then
assertThat(result).isEqualTo(supportedResponse);
}

static Stream<Arguments> supportedResponsesInput() {
return Stream.of(
arguments(String.class, JettyHttpResponse.SupportedResponse.TEXT),
arguments(InputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM),
arguments(ByteArrayInputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM),
arguments(Reader.class, JettyHttpResponse.SupportedResponse.READER),
arguments(InputStreamReader.class, JettyHttpResponse.SupportedResponse.READER),
arguments(byte[].class, JettyHttpResponse.SupportedResponse.BYTE_ARRAY));
}
}
Expand Up @@ -24,4 +24,9 @@ public class JettyInterceptorTest extends AbstractInterceptorTest {
protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception {
// consume lines not supported
}
}
Expand Up @@ -257,30 +257,6 @@ public void onFailure(Call call, IOException e) {
return future;
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
CompletableFuture<HttpResponse<T>> future = new CompletableFuture<>();
Call call = httpClient.newCall(((OkHttpRequestImpl) request).getRequest());
call.enqueue(new Callback() {

@Override
public void onResponse(Call call, Response response) throws IOException {
future.complete(new OkHttpResponseImpl<>(response, type));
}

@Override
public void onFailure(Call call, IOException e) {
future.completeExceptionally(e);
}
});
future.whenComplete((r, t) -> {
if (future.isCancelled()) {
call.cancel();
}
});
return future;
}

@Override
public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() {
return new OkHttpWebSocketImpl.BuilderImpl(this.httpClient);
Expand Down
Expand Up @@ -24,4 +24,9 @@ protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}

@Override
void testConsumeLines() throws Exception {
// line parsing not yet supported
}

}
Expand Up @@ -31,6 +31,8 @@
import java.io.InputStream;
import java.io.Reader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -178,6 +180,41 @@ void testAsyncBody() throws Exception {
assertTrue(consumed.get(5, TimeUnit.SECONDS));
}

@Test
void testConsumeLines() throws Exception {
server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always();

ArrayList<String> strings = new ArrayList<>();
CompletableFuture<Void> consumed = new CompletableFuture<>();

CompletableFuture<HttpResponse<AsyncBody>> responseFuture = client.getHttpClient().consumeLines(
client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async"))
.build(),
(value, asyncBody) -> {
strings.add(value);
asyncBody.consume();
});

responseFuture.whenComplete((r, t) -> {
if (t != null) {
consumed.completeExceptionally(t);
}
if (r != null) {
r.body().consume();
r.body().done().whenComplete((v, ex) -> {
if (ex != null) {
consumed.completeExceptionally(ex);
} else {
consumed.complete(null);
}
});
}
});

consumed.get(5, TimeUnit.SECONDS);
assertEquals(Arrays.asList("hello", "world", "lines"), strings);
}

@DisplayName("Supported response body types")
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
Expand Down

0 comments on commit 23ad882

Please sign in to comment.