From ed64589d0777be248798e62d7a546f04376d4c36 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Mon, 19 Sep 2022 18:19:48 -0400 Subject: [PATCH 1/5] fix #4201: generalizing sendAsync support --- CHANGELOG.md | 1 + .../client/jdkhttp/JdkHttpClientImpl.java | 35 ----- .../client/jetty/JettyHttpClient.java | 34 +---- .../client/jetty/JettyHttpResponse.java | 47 ------ .../client/jetty/JettyAsyncBodyTest.java | 10 ++ .../client/jetty/JettyHttpClientTest.java | 13 -- .../client/jetty/JettyHttpResponseTest.java | 27 ---- .../client/jetty/JettyInterceptorTest.java | 5 + .../client/okhttp/OkHttpClientImpl.java | 24 --- .../client/http/JettyHttpClientTest.java | 5 + .../client/http/OkHttpClientTest.java | 37 +++++ .../KubernetesCrudDispatcherPostTest.java | 3 +- .../crud/KubernetesCrudDispatcherPutTest.java | 3 +- .../client/http/ByteArrayBodyHandler.java | 66 +++++++++ .../kubernetes/client/http/HttpClient.java | 4 +- .../http/HttpClientReadableByteChannel.java | 128 ++++++++++++++++ .../client/http/SendAsyncUtils.java | 138 ++++++++++++++++++ 17 files changed, 398 insertions(+), 182 deletions(-) create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 263f0ec6e1..58949b31dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * Fix #4396: Provide more error context when @Group/@Version annotations are missing * Fix #4384: The Java generator now supports the generation of specific annotations (min, max, pattern, etc.), as defined by #4348 * Fix #3864: Change ManagedOpenShiftClient OSGi ConfigurationPolicy to REQUIRE +* Fix #4201: Removed sendAsync from the individual http client implementations #### Dependency Upgrade * Fix #4243: Update Tekton pipeline model to v0.39.0 diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index e68d1110d6..46f8bc688b 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -23,17 +23,11 @@ import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocket.Listener; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.net.URI; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; -import java.net.http.HttpResponse.BodySubscriber; -import java.net.http.HttpResponse.BodySubscribers; import java.net.http.WebSocketHandshakeException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Optional; @@ -253,35 +247,6 @@ public CompletableFuture> consumeBytes(HttpRequest reque }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } - @Override - public CompletableFuture> sendAsync(HttpRequest request, Class type) { - return sendAsync(request, () -> new HandlerAndAsyncBody(toBodyHandler(type), null)) - .thenApply(ar -> new JdkHttpResponseImpl<>(ar.response)); - } - - private BodyHandler toBodyHandler(Class type) { - BodyHandler bodyHandler; - if (type == null) { - bodyHandler = (BodyHandler) BodyHandlers.discarding(); - } else if (type == InputStream.class) { - bodyHandler = (BodyHandler) BodyHandlers.ofInputStream(); - } else if (type == String.class) { - bodyHandler = (BodyHandler) BodyHandlers.ofString(); - } else if (type == byte[].class) { - bodyHandler = (BodyHandler) BodyHandlers.ofByteArray(); - } else { - bodyHandler = responseInfo -> { - BodySubscriber upstream = BodyHandlers.ofInputStream().apply(responseInfo); - - BodySubscriber downstream = BodySubscribers.mapping( - upstream, - (InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8)); - return (BodySubscriber) downstream; - }; - } - return bodyHandler; - } - public CompletableFuture> sendAsync(HttpRequest request, Supplier> handlerAndAsyncBodySupplier) { JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request; diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java index e53dfaf2b7..9a56c51926 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -24,8 +24,6 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.InputStreamRequestContent; import org.eclipse.jetty.client.util.StringRequestContent; import org.eclipse.jetty.websocket.client.WebSocketClient; @@ -40,7 +38,6 @@ import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM; import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN; -import static org.eclipse.jetty.util.BufferUtil.toArray; public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpClient { @@ -74,39 +71,10 @@ public DerivedClientBuilder newBuilder() { return builder.copy(); } - @Override - public CompletableFuture> sendAsync(HttpRequest originalRequest, Class type) { - final var supportedResponse = JettyHttpResponse.SupportedResponse.from(type); - final var request = toStandardHttpRequest(originalRequest); - final CompletableFuture> future = new CompletableFuture<>(); - newRequest(request).send(new BufferingResponseListener() { - - // TODO: Long Term Refactor - This Listener blocks until the full response is read and keeps it in memory. - // Find a way to stream the response body without completing the future - // We need two signals, one when the response is received, and one when the body is completely - // read. - // Should this method be completely replaced by consumeXxx()? - @Override - public void onComplete(Result result) { - future.complete(new JettyHttpResponse<>( - request, result.getResponse(), supportedResponse.process(result.getResponse(), getContent(), type))); - } - }); - return interceptResponse(request.toBuilder(), future, r -> sendAsync(r, type)); - } - @Override public CompletableFuture> consumeLines( HttpRequest originalRequest, BodyConsumer consumer) { - final var request = toStandardHttpRequest(originalRequest); - final var future = new JettyAsyncResponseListener<>(request, consumer) { - - @Override - protected String process(Response response, ByteBuffer content) { - return JettyHttpResponse.SupportedResponse.TEXT.process(response, toArray(content), String.class); - } - }.listen(newRequest(request)); - return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer)); + throw new UnsupportedOperationException("Not supported by the Jetty client"); } @Override diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java index 77a6669677..1c93df2932 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java @@ -17,19 +17,13 @@ import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; -import io.fabric8.kubernetes.client.utils.Utils; import org.eclipse.jetty.client.api.Response; -import java.io.ByteArrayInputStream; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.BiFunction; public class JettyHttpResponse implements HttpResponse { @@ -82,45 +76,4 @@ public Optional> previousResponse() { return Optional.empty(); } - enum SupportedResponse { - - TEXT(String.class, (r, bytes) -> new String(bytes, responseCharset(r))), - INPUT_STREAM(ByteArrayInputStream.class, (r, bytes) -> new ByteArrayInputStream(bytes)), - READER(InputStreamReader.class, (r, bytes) -> new InputStreamReader(new ByteArrayInputStream(bytes), responseCharset(r))), - BYTE_ARRAY(byte[].class, (r, bytes) -> bytes); - - private final Class type; - private final BiFunction processor; - - SupportedResponse(Class type, BiFunction processor) { - this.type = type; - this.processor = processor; - } - - public T process(Response response, byte[] bytes, Class type) { - return type.cast(processor.apply(response, bytes)); - } - - static SupportedResponse from(Class type) { - for (SupportedResponse sr : SupportedResponse.values()) { - if (type.isAssignableFrom(sr.type)) { - return sr; - } - } - throw new IllegalArgumentException("Unsupported response type: " + type.getName()); - } - - private static Charset responseCharset(Response response) { - var responseCharset = StandardCharsets.UTF_8; - final var responseEncoding = response.getHeaders().get("Content-Encoding"); - if (Utils.isNotNullOrEmpty(responseEncoding)) { - try { - responseCharset = Charset.forName(responseEncoding); - } catch (Exception e) { - // ignored - } - } - return responseCharset; - } - } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java index ca45f6bd9f..747b81ab90 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java @@ -24,4 +24,14 @@ public class JettyAsyncBodyTest extends AbstractAsyncBodyTest { protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } + + @Override + public void consumeLinesNotProcessedIfCancelled() throws Exception { + // consume lines not supported + } + + @Override + public void consumeLinesProcessedAfterConsume() throws Exception { + // consume lines not supported + } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java index c701fa23fd..d3afb0f82c 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java @@ -101,19 +101,6 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep } } - @Test - @DisplayName("sendAsync with unsupported type throws Exception") - void sendAsyncUnsupportedType() { - try (var jettyHttpClient = new JettyHttpClient( - null, httpClient, webSocketClient, Collections.emptyList(), null)) { - // When - final var result = assertThrows(IllegalArgumentException.class, - () -> jettyHttpClient.sendAsync(null, Integer.class)); - // Then - assertThat(result).hasMessage("Unsupported response type: java.lang.Integer"); - } - } - @Test @DisplayName("sendAsync with unsupported HttpRequest throws Exception") void sendAsyncUnsupportedHttpRequest() { diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java index 3bf6233458..f6bc401eac 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java @@ -17,22 +17,13 @@ import org.eclipse.jetty.client.HttpResponse; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.params.provider.Arguments.arguments; class JettyHttpResponseTest { @@ -56,22 +47,4 @@ void headersHandlesJettyHttpFields() { .containsEntry("Via", Arrays.asList("proxy-1", "proxy-2")); } - @ParameterizedTest(name = "{index}: SupportedResponse: from type ''{0}'' is ''{1}''") - @MethodSource("supportedResponsesInput") - void supportedResponses(Class type, JettyHttpResponse.SupportedResponse supportedResponse) { - // When - final var result = JettyHttpResponse.SupportedResponse.from(type); - // Then - assertThat(result).isEqualTo(supportedResponse); - } - - static Stream supportedResponsesInput() { - return Stream.of( - arguments(String.class, JettyHttpResponse.SupportedResponse.TEXT), - arguments(InputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), - arguments(ByteArrayInputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), - arguments(Reader.class, JettyHttpResponse.SupportedResponse.READER), - arguments(InputStreamReader.class, JettyHttpResponse.SupportedResponse.READER), - arguments(byte[].class, JettyHttpResponse.SupportedResponse.BYTE_ARRAY)); - } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java index afbd6a2bc9..2e23e6eb77 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java @@ -24,4 +24,9 @@ public class JettyInterceptorTest extends AbstractInterceptorTest { protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } + + @Override + public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { + // consume lines not supported + } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index f0d3262ba6..00341862b0 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -257,30 +257,6 @@ public void onFailure(Call call, IOException e) { return future; } - @Override - public CompletableFuture> sendAsync(HttpRequest request, Class type) { - CompletableFuture> future = new CompletableFuture<>(); - Call call = httpClient.newCall(((OkHttpRequestImpl) request).getRequest()); - call.enqueue(new Callback() { - - @Override - public void onResponse(Call call, Response response) throws IOException { - future.complete(new OkHttpResponseImpl<>(response, type)); - } - - @Override - public void onFailure(Call call, IOException e) { - future.completeExceptionally(e); - } - }); - future.whenComplete((r, t) -> { - if (future.isCancelled()) { - call.cancel(); - } - }); - return future; - } - @Override public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() { return new OkHttpWebSocketImpl.BuilderImpl(this.httpClient); diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java index a8c88e36f4..bd8a90df11 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java @@ -24,4 +24,9 @@ protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } + @Override + void testConsumeLines() throws Exception { + // line parsing not yet supported + } + } diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 4ef2f18697..829f428dc4 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -31,6 +31,8 @@ import java.io.InputStream; import java.io.Reader; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -178,6 +180,41 @@ void testAsyncBody() throws Exception { assertTrue(consumed.get(5, TimeUnit.SECONDS)); } + @Test + void testConsumeLines() throws Exception { + server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always(); + + ArrayList strings = new ArrayList<>(); + CompletableFuture consumed = new CompletableFuture<>(); + + CompletableFuture> responseFuture = client.getHttpClient().consumeLines( + client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) + .build(), + (value, asyncBody) -> { + strings.add(value); + asyncBody.consume(); + }); + + responseFuture.whenComplete((r, t) -> { + if (t != null) { + consumed.completeExceptionally(t); + } + if (r != null) { + r.body().consume(); + r.body().done().whenComplete((v, ex) -> { + if (ex != null) { + consumed.completeExceptionally(ex); + } else { + consumed.complete(null); + } + }); + } + }); + + consumed.get(5, TimeUnit.SECONDS); + assertEquals(Arrays.asList("hello", "world", "lines"), strings); + } + @DisplayName("Supported response body types") @ParameterizedTest(name = "{index}: {0}") @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) diff --git a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java index 8416b8422e..4023fed116 100644 --- a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java +++ b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java @@ -178,7 +178,8 @@ void postNamespaceMismatch() throws Exception { final HttpResponse result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS); // Then assertThat(result) - .hasFieldOrPropertyWithValue("response.code", 400) + .extracting(HttpResponse::code).isEqualTo(400); + assertThat(result) .extracting(HttpResponse::body).asString() .contains("the namespace of the object (different) does not match the namespace on the URL (test)"); } diff --git a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java index b92bacdfe0..4033d78789 100644 --- a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java +++ b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java @@ -113,7 +113,8 @@ void putNameMismatch() throws Exception { final HttpResponse result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS); // Then assertThat(result) - .hasFieldOrPropertyWithValue("response.code", 400) + .extracting(HttpResponse::code).isEqualTo(400); + assertThat(result) .extracting(HttpResponse::body).asString() .contains("the name of the object (different) does not match the name on the URL (mismatched-name)"); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java new file mode 100644 index 0000000000..799c62f407 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; +import io.fabric8.kubernetes.client.http.HttpClient.BodyConsumer; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class ByteArrayBodyHandler implements BodyConsumer> { + + private final LinkedList buffers = new LinkedList<>(); + private final CompletableFuture result = new CompletableFuture(); + + @Override + public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { + this.buffers.addAll(value); + asyncBody.consume(); + } + + protected void onResponse(HttpResponse response) { + AsyncBody asyncBody = response.body(); + asyncBody.done().whenComplete(this::onBodyDone); + asyncBody.consume(); + } + + private synchronized void onBodyDone(Void v, Throwable t) { + if (t != null) { + result.completeExceptionally(t); + } else { + int size = buffers.stream().map(ByteBuffer::remaining).collect(Collectors.summingInt(Integer::intValue)).intValue(); + byte[] res = new byte[size]; + int from = 0; + for (ByteBuffer b : buffers) { + int l = b.remaining(); + b.get(res, from, l); + from += l; + } + result.complete(res); + } + buffers.clear(); + } + + public CompletableFuture getResult() { + return result; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index 88bc7b9177..e59a40d947 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -183,7 +183,9 @@ interface BodyConsumer { * @param type one of InputStream, Reader, String, byte[] * @return a CompletableFuture that returns the resulting HttpResponse when complete */ - CompletableFuture> sendAsync(HttpRequest request, Class type); + default CompletableFuture> sendAsync(HttpRequest request, Class type) { + return SendAsyncUtils.sendAsync(request, type, this); + } /** * Send a request and consume the lines of the response body using the same logic as {@link BufferedReader} to diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java new file mode 100644 index 0000000000..430dee3cab --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; +import io.fabric8.kubernetes.client.http.HttpClient.BodyConsumer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.util.LinkedList; +import java.util.List; + +/** + * Creates a blocking {@link ReadableByteChannel} from a {@link HttpResponse} containing an {@link AsyncBody} + *

+ * May be useful eventually to provide a non-blocking channel as well. + */ +public class HttpClientReadableByteChannel implements ReadableByteChannel, BodyConsumer> { + + private final LinkedList buffers = new LinkedList<>(); + private Throwable failed; + private boolean closed; + private boolean done; + private AsyncBody asyncBody; + private ByteBuffer currentBuffer; + + @Override + public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { + this.buffers.addAll(value); + // could proactively consume based up some byte limit + this.notifyAll(); + } + + protected synchronized void onResponse(HttpResponse response) { + asyncBody = response.body(); + asyncBody.done().whenComplete(this::onBodyDone); + asyncBody.consume(); + this.notifyAll(); + } + + private synchronized void onBodyDone(Void v, Throwable t) { + if (t != null) { + failed = t; + } + done = true; + this.notifyAll(); + } + + @Override + public synchronized void close() { + if (this.closed) { + return; + } + if (asyncBody != null) { + asyncBody.cancel(); + } + this.closed = true; + this.notifyAll(); + } + + @Override + public synchronized boolean isOpen() { + return !closed; + } + + @Override + public synchronized int read(ByteBuffer arg0) throws IOException { + if (closed) { + throw new ClosedChannelException(); + } + + int read = 0; + + while (arg0.hasRemaining()) { + while (currentBuffer == null || !currentBuffer.hasRemaining()) { + if (buffers.isEmpty()) { + if (failed != null) { + throw new IOException("closed", failed); + } + if (read > 0) { + return read; + } + if (done) { + return -1; + } + if (this.asyncBody != null) { + this.asyncBody.consume(); + } + try { + this.wait(); // block until more buffers are delivered + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); + throw new ClosedByInterruptException(); + } + } + + currentBuffer = buffers.poll(); + } + + int remaining = Math.min(arg0.remaining(), currentBuffer.remaining()); + for (int i = 0; i < remaining; i++) { + arg0.put(currentBuffer.get()); + } + read += remaining; + } + + return read; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java new file mode 100644 index 0000000000..af73adb93f --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** + * If we need support other than UTF-8, see jdk.internal.net.http.common.Utils.charsetFrom + */ +class SendAsyncUtils { + + /** + * Allows for changing the body type - there is further redesign that could be done here + */ + static class HttpResponseAdapter implements HttpResponse { + + private final HttpResponse response; + private final T body; + + public HttpResponseAdapter(HttpResponse response, T body) { + this.response = response; + this.body = body; + } + + @Override + public List headers(String key) { + return response.headers(key); + } + + @Override + public boolean isSuccessful() { + return response.isSuccessful(); + } + + @Override + public Map> headers() { + return response.headers(); + } + + @Override + public int code() { + return response.code(); + } + + @Override + public String message() { + return response.message(); + } + + @Override + public HttpRequest request() { + return response.request(); + } + + @Override + public Optional> previousResponse() { + return response.previousResponse(); + } + + @Override + public T body() { + return body; + } + + } + + static CompletableFuture> reader(HttpRequest request, HttpClient client) { + return inputStream(request, client) + .thenApply(res -> new HttpResponseAdapter<>(res, new InputStreamReader(res.body(), StandardCharsets.UTF_8))); + } + + static CompletableFuture> inputStream(HttpRequest request, HttpClient client) { + HttpClientReadableByteChannel byteChannel = new HttpClientReadableByteChannel(); + CompletableFuture> futureResponse = client.consumeBytes(request, byteChannel); + return futureResponse.thenApply(res -> { + byteChannel.onResponse(res); + return new HttpResponseAdapter<>(res, Channels.newInputStream(byteChannel)); + }); + } + + static CompletableFuture> bytes(HttpRequest request, HttpClient client) { + ByteArrayBodyHandler byteArrayBodyHandler = new ByteArrayBodyHandler(); + CompletableFuture> futureResponse = client.consumeBytes(request, byteArrayBodyHandler); + return futureResponse.thenCompose(res -> { + byteArrayBodyHandler.onResponse(res); + return byteArrayBodyHandler.getResult() + .thenApply(b -> new HttpResponseAdapter<>(res, b)); + }); + } + + static CompletableFuture> string(HttpRequest request, HttpClient client) { + return bytes(request, client).thenApply(res -> { + return new HttpResponseAdapter<>(res, + new String(res.body(), StandardCharsets.UTF_8)); + }); + } + + static CompletableFuture sendAsync(HttpRequest request, Class type, HttpClient httpClient) { + if (type == String.class) { + return string(request, httpClient); + } + if (type == byte[].class) { + return bytes(request, httpClient); + } + if (type == Reader.class) { + return reader(request, httpClient); + } + if (type == InputStream.class) { + return inputStream(request, httpClient); + } + throw new AssertionError("unknown type"); + } + +} From 130d42a38642d471715b3963f2c5095938d5fefd Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 5 Oct 2022 08:11:43 -0400 Subject: [PATCH 2/5] fix #4201: ensuring okhttp is tolerant to multiple consume calls also converting jetty to fully non-blocking and adapting the body delivery for jdk for use with sendAsync logic - we need the AsyncBody to be available immediately --- .../client/jdkhttp/JdkHttpClientImpl.java | 98 +++++++++++-------- .../jetty/JettyAsyncResponseListener.java | 47 +++++---- .../client/okhttp/OkHttpClientImpl.java | 23 +++-- .../client/http/OkHttpClientTest.java | 27 ++--- .../client/http/ByteArrayBodyHandler.java | 2 +- .../http/HttpClientReadableByteChannel.java | 14 ++- .../client/http/SendAsyncUtils.java | 11 ++- .../client/http/AbstractInterceptorTest.java | 16 ++- 8 files changed, 147 insertions(+), 91 deletions(-) diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index 46f8bc688b..757ee61593 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -26,6 +26,8 @@ import java.net.URI; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; +import java.net.http.HttpResponse.BodySubscriber; +import java.net.http.HttpResponse.ResponseInfo; import java.net.http.WebSocketHandshakeException; import java.nio.ByteBuffer; import java.util.List; @@ -33,10 +35,10 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -47,14 +49,56 @@ */ public class JdkHttpClientImpl implements HttpClient { + /** + * Adapts the BodyHandler to immediately complete the body + */ + private static final class BodyHandlerAdapter implements BodyHandler { + private final AsyncBodySubscriber subscriber; + private final BodyHandler handler; + + private BodyHandlerAdapter(AsyncBodySubscriber subscriber, BodyHandler handler) { + this.subscriber = subscriber; + this.handler = handler; + } + + @Override + public BodySubscriber apply(ResponseInfo responseInfo) { + BodySubscriber bodySubscriber = handler.apply(responseInfo); + return new BodySubscriber() { + CompletableFuture cf = CompletableFuture.completedFuture(subscriber); + + @Override + public void onSubscribe(Subscription subscription) { + bodySubscriber.onSubscribe(subscription); + } + + @Override + public void onNext(List item) { + bodySubscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + bodySubscriber.onError(throwable); + } + + @Override + public void onComplete() { + bodySubscriber.onComplete(); + } + + @Override + public CompletionStage getBody() { + return cf; + } + }; + } + } + private final class AsyncBodySubscriber implements Subscriber, AsyncBody { private final BodyConsumer consumer; private CompletableFuture done = new CompletableFuture(); - private final AtomicBoolean subscribed = new AtomicBoolean(); - private volatile Flow.Subscription subscription; - private T initialItem; - private boolean first = true; - private boolean isComplete; + private CompletableFuture subscription = new CompletableFuture<>(); private AsyncBodySubscriber(BodyConsumer consumer) { this.consumer = consumer; @@ -62,25 +106,15 @@ private AsyncBodySubscriber(BodyConsumer consumer) { @Override public void onSubscribe(Subscription subscription) { - if (!subscribed.compareAndSet(false, true)) { + if (this.subscription.isDone()) { subscription.cancel(); return; } - this.subscription = subscription; - // the sendAsync future won't complete unless we do the initial request here - // so in onNext we'll trap the item until we're ready - subscription.request(1); + this.subscription.complete(subscription); } @Override public void onNext(T item) { - synchronized (this) { - if (first) { - this.initialItem = item; - first = false; - return; - } - } try { if (item == null) { done.complete(null); @@ -88,7 +122,7 @@ public void onNext(T item) { consumer.consume(item, this); } } catch (Exception e) { - subscription.cancel(); + subscription.thenAccept(Subscription::cancel); done.completeExceptionally(e); } } @@ -100,10 +134,6 @@ public void onError(Throwable throwable) { @Override public synchronized void onComplete() { - if (initialItem != null) { - this.isComplete = true; - return; - } done.complete(null); } @@ -112,19 +142,7 @@ public synchronized void consume() { if (done.isDone()) { return; } - try { - first = false; - if (initialItem != null) { - T item = initialItem; - initialItem = null; - onNext(item); - } - } finally { - if (isComplete) { - done.complete(null); - } - this.subscription.request(1); - } + this.subscription.thenAccept(s -> s.request(1)); } @Override @@ -134,7 +152,7 @@ public CompletableFuture done() { @Override public void cancel() { - subscription.cancel(); + subscription.thenAccept(Subscription::cancel); done.cancel(false); } @@ -234,7 +252,8 @@ public CompletableFuture> consumeLines(HttpRequest reque return sendAsync(request, () -> { AsyncBodySubscriber subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromLineSubscriber(subscriber); - return new HandlerAndAsyncBody<>(handler, subscriber); + BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); + return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } @@ -243,7 +262,8 @@ public CompletableFuture> consumeBytes(HttpRequest reque return sendAsync(request, () -> { AsyncBodySubscriber> subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromSubscriber(subscriber); - return new HandlerAndAsyncBody<>(handler, subscriber); + BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); + return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java index 4694179a51..2ee8ad50c0 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java @@ -15,16 +15,17 @@ */ package io.fabric8.kubernetes.client.jetty; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.util.Callback; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.function.LongConsumer; public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements HttpClient.AsyncBody { @@ -32,20 +33,29 @@ public abstract class JettyAsyncResponseListener extends Response.Listener.Ad private final HttpClient.BodyConsumer bodyConsumer; private final CompletableFuture> asyncResponse; private final CompletableFuture asyncBodyDone; - private boolean consume; + private CompletableFuture demand = new CompletableFuture<>(); + private boolean initialConsumeCalled; + private Runnable initialConsume; JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer bodyConsumer) { this.httpRequest = httpRequest; this.bodyConsumer = bodyConsumer; asyncResponse = new CompletableFuture<>(); asyncBodyDone = new CompletableFuture<>(); - consume = false; } @Override - public synchronized void consume() { - consume = true; - this.notifyAll(); + public void consume() { + synchronized (this) { + if (!this.initialConsumeCalled) { + this.initialConsumeCalled = true; + if (this.initialConsume != null) { + this.initialConsume.run(); + this.initialConsume = null; + } + } + } + demand.thenAccept(l -> l.accept(1)); } @Override @@ -74,21 +84,20 @@ public CompletableFuture> listen(Request requ } @Override - public void onContent(Response response, ByteBuffer content) { - try { - synchronized (this) { - while (!consume && !asyncBodyDone.isCancelled()) { - this.wait(); - } - } - if (!asyncBodyDone.isCancelled()) { - bodyConsumer.consume(process(response, content), this); + public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) { + synchronized (this) { + if (!initialConsumeCalled) { + // defer until consume is called + this.initialConsume = () -> onContent(response, demand, content, callback); + return; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw KubernetesClientException.launderThrowable(e); + this.demand.complete(demand); + } + try { + bodyConsumer.consume(process(response, content), this); + callback.succeeded(); } catch (Exception e) { - throw KubernetesClientException.launderThrowable(e); + callback.failed(e); } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index 00341862b0..34c5f8987a 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -72,16 +72,21 @@ public void consume() { // consume should not block from a callers perspective try { httpClient.dispatcher().executorService().execute(() -> { - try { - if (!source.exhausted() && !done.isDone()) { - T value = process(source); - consumer.consume(value, this); - } else { - done.complete(null); + // we must serialize multiple consumes as source is not thread-safe + // it would be better to use SerialExecutor, but that would need to move modules, as to + // potentially not hold multiple executor threads + synchronized (source) { + try { + if (!source.exhausted() && !done.isDone()) { + T value = process(source); + consumer.consume(value, this); + } else { + done.complete(null); + } + } catch (Exception e) { + Utils.closeQuietly(source); + done.completeExceptionally(e); } - } catch (Exception e) { - Utils.closeQuietly(source); - done.completeExceptionally(e); } }); } catch (Exception e) { diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 829f428dc4..ef8c55d9a9 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -31,12 +31,15 @@ import java.io.InputStream; import java.io.Reader; import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -53,8 +56,8 @@ protected HttpClient.Factory getHttpClientFactory() { return new OkHttpClientFactory(); } - private KubernetesMockServer server; - private KubernetesClient client; + KubernetesMockServer server; + KubernetesClient client; @BeforeEach void setUp() { @@ -148,15 +151,17 @@ public void onMessage(WebSocket webSocket, String text) { @Test void testAsyncBody() throws Exception { - server.expect().withPath("/async").andReturn(200, "hello world").always(); + int byteCount = 20000; + server.expect().withPath("/async").andReturn(200, new String(new byte[byteCount], StandardCharsets.UTF_8)).always(); - CompletableFuture consumed = new CompletableFuture<>(); + CompletableFuture consumed = new CompletableFuture<>(); + AtomicInteger total = new AtomicInteger(); CompletableFuture> responseFuture = client.getHttpClient().consumeBytes( client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) .build(), (value, asyncBody) -> { - consumed.complete(true); + value.stream().map(ByteBuffer::remaining).forEach(total::addAndGet); asyncBody.consume(); }); @@ -169,15 +174,14 @@ void testAsyncBody() throws Exception { r.body().done().whenComplete((v, ex) -> { if (ex != null) { consumed.completeExceptionally(ex); - } - if (v != null) { - consumed.complete(false); + } else { + consumed.complete(total.get()); } }); } }); - assertTrue(consumed.get(5, TimeUnit.SECONDS)); + assertEquals(byteCount, consumed.get(5, TimeUnit.SECONDS)); } @Test @@ -219,14 +223,15 @@ void testConsumeLines() throws Exception { @ParameterizedTest(name = "{index}: {0}") @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) void testSupportedTypes(Class type) throws Exception { - server.expect().withPath("/type").andReturn(200, "hello world").always(); + String value = new String(new byte[16384]); + server.expect().withPath("/type").andReturn(200, value).always(); final HttpResponse result = client.getHttpClient() .sendAsync(client.getHttpClient().newHttpRequestBuilder() .uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(), type) .get(10, TimeUnit.SECONDS); assertThat(result) .satisfies(r -> assertThat(r.body()).isInstanceOf(type)) - .satisfies(r -> assertThat(r.bodyString()).isEqualTo("hello world")); + .satisfies(r -> assertThat(r.bodyString()).isEqualTo(value)); } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java index 799c62f407..dea486073c 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java @@ -28,7 +28,7 @@ public class ByteArrayBodyHandler implements BodyConsumer> { private final LinkedList buffers = new LinkedList<>(); - private final CompletableFuture result = new CompletableFuture(); + private final CompletableFuture result = new CompletableFuture<>(); @Override public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java index 430dee3cab..b0becfd643 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -40,18 +40,20 @@ public class HttpClientReadableByteChannel implements ReadableByteChannel, BodyC private boolean done; private AsyncBody asyncBody; private ByteBuffer currentBuffer; + private boolean consumeRequested; @Override public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { this.buffers.addAll(value); // could proactively consume based up some byte limit this.notifyAll(); + this.consumeRequested = false; } protected synchronized void onResponse(HttpResponse response) { asyncBody = response.body(); asyncBody.done().whenComplete(this::onBodyDone); - asyncBody.consume(); + asyncBody.consume(); // pre-fetch the first results this.notifyAll(); } @@ -92,7 +94,7 @@ public synchronized int read(ByteBuffer arg0) throws IOException { while (currentBuffer == null || !currentBuffer.hasRemaining()) { if (buffers.isEmpty()) { if (failed != null) { - throw new IOException("closed", failed); + throw new IOException("channel already closed with exception", failed); } if (read > 0) { return read; @@ -100,8 +102,14 @@ public synchronized int read(ByteBuffer arg0) throws IOException { if (done) { return -1; } - if (this.asyncBody != null) { + if (!consumeRequested && this.asyncBody != null) { + consumeRequested = true; this.asyncBody.consume(); + // the consume call may actually trigger result deliver + // if it did, then just start the loop over + if (!consumeRequested) { + continue; + } } try { this.wait(); // block until more buffers are delivered diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java index af73adb93f..d27322fd7e 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java @@ -33,6 +33,10 @@ */ class SendAsyncUtils { + private SendAsyncUtils() { + // just utils + } + /** * Allows for changing the body type - there is further redesign that could be done here */ @@ -113,12 +117,11 @@ static CompletableFuture> bytes(HttpRequest request, HttpCl } static CompletableFuture> string(HttpRequest request, HttpClient client) { - return bytes(request, client).thenApply(res -> { - return new HttpResponseAdapter<>(res, - new String(res.body(), StandardCharsets.UTF_8)); - }); + return bytes(request, client) + .thenApply(res -> new HttpResponseAdapter<>(res, new String(res.body(), StandardCharsets.UTF_8))); } + @SuppressWarnings("rawtypes") static CompletableFuture sendAsync(HttpRequest request, Class type, HttpClient httpClient) { if (type == String.class) { return string(request, httpClient); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index 5ea4f843da..c7a3eca940 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -71,7 +71,7 @@ public void before(BasicBuilder builder, HttpHeaders headers) { @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.sendAsync") public void afterHttpFailureReplacesResponseInSendAsync() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override @@ -96,7 +96,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines") public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works\n").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override @@ -115,9 +115,15 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons }) .get(10L, TimeUnit.SECONDS); asyncR.body().consume(); - asyncR.body().done().get(10L, TimeUnit.SECONDS); + asyncR.body().done().whenComplete((v, t) -> { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(null); + } + }); // Then - assertThat(result.get()).isEqualTo("This works"); + assertThat(result.get(10L, TimeUnit.SECONDS)).isEqualTo("This works"); } } @@ -125,7 +131,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes") public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override From 9156cdef27e1747ba8062821f3deb9457f604fc3 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Tue, 18 Oct 2022 10:57:29 -0400 Subject: [PATCH 3/5] fix #4201: jetty's bytebuffers may be immediately re-used --- .../client/jetty/JettyAsyncResponseListener.java | 10 +++++++++- .../io/fabric8/kubernetes/client/http/HttpClient.java | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java index 2ee8ad50c0..ba09dc235e 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java @@ -94,7 +94,8 @@ public void onContent(Response response, LongConsumer demand, ByteBuffer content this.demand.complete(demand); } try { - bodyConsumer.consume(process(response, content), this); + // we must clone as the buffer can be reused after the call to succeeded + bodyConsumer.consume(process(response, clone(content)), this); callback.succeeded(); } catch (Exception e) { callback.failed(e); @@ -102,4 +103,11 @@ public void onContent(Response response, LongConsumer demand, ByteBuffer content } protected abstract T process(Response response, ByteBuffer content); + + public static ByteBuffer clone(ByteBuffer original) { + ByteBuffer clone = ByteBuffer.allocate(original.remaining()); + clone.put(original); + clone.flip(); + return clone; + } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index e59a40d947..b540f07cf9 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -199,6 +199,8 @@ default CompletableFuture> sendAsync(HttpRequest request, Cl /** * Send a request and consume the bytes of the resulting response body + *

+ * HtttpClient implementations will provide ByteBuffers that may be held directly. * * @param request the HttpRequest to send * @param consumer the response body consumer From 45d605bcbcd9b424ba2940a6f9aa6d20de27a754 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Mon, 21 Nov 2022 22:39:18 -0500 Subject: [PATCH 4/5] fix #4201: changing how the jetty session is set adding missing test timeouts correcting the wait in HttpClientReadableByteChannel --- .../fabric8/kubernetes/client/jetty/JettyWebSocket.java | 8 ++------ .../kubernetes/client/jetty/JettyWebSocketBuilder.java | 2 +- .../kubernetes/client/jetty/JettyWebSocketTest.java | 8 ++++---- .../fabric8/kubernetes/client/http/OkHttpClientTest.java | 4 ++-- .../client/http/HttpClientReadableByteChannel.java | 5 ++++- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java index 53ea3a55b4..b254883bdd 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -36,7 +36,7 @@ public class JettyWebSocket implements WebSocket, WebSocketListener { private final Condition backPressure; private final AtomicBoolean closed; private boolean moreMessages; - private Session webSocketSession; + private volatile Session webSocketSession; public JettyWebSocket(WebSocket.Listener listener) { this.listener = listener; @@ -115,6 +115,7 @@ public void onWebSocketClose(int statusCode, String reason) { @Override public void onWebSocketConnect(Session session) { + this.webSocketSession = session; listener.onOpen(this); } @@ -133,11 +134,6 @@ public void onWebSocketError(Throwable cause) { listener.onError(this, cause); } - public JettyWebSocket setWebSocketSession(Session webSocketSession) { - this.webSocketSession = webSocketSession; - return this; - } - private void backPressure() { try { lock.lock(); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java index b3f9990b5e..c5a8135bc5 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java @@ -58,7 +58,7 @@ public CompletableFuture buildAsync(WebSocket.Listener listener) { final CompletableFuture future = new CompletableFuture<>(); final var webSocket = new JettyWebSocket(listener); return webSocketClient.connect(webSocket, Objects.requireNonNull(WebSocket.toWebSocketUri(getUri())), cur) - .thenApply(webSocket::setWebSocketSession) + .thenApply(s -> webSocket) .exceptionally(ex -> { if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) { future.completeExceptionally(toHandshakeException((UpgradeException) ex.getCause())); diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java index 75e05f5ca3..83a3510e86 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java @@ -182,7 +182,7 @@ void sendCloseWhenConnectionIsOpen() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.sendClose(1000, "Closing"); @@ -196,7 +196,7 @@ void sendCloseIgnoredWhenConnectionIsClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(false); // When jws.sendClose(1000, "Closing"); @@ -210,7 +210,7 @@ void sendCloseIgnoredWhenAlreadyClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); jws.sendClose(1000, "Closing"); // When @@ -226,7 +226,7 @@ void sendIncreasesQueueSize() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class, RETURNS_DEEP_STUBS); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.send(ByteBuffer.wrap(new byte[] { 1, 3, 3, 7 })); diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 10cd72b423..5d1bf425bf 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -87,7 +87,7 @@ void testWebsocketHandshakeFailure() { assertThrows(WebSocketHandshakeException.class, () -> { try { - startedFuture.get(); + startedFuture.get(10, TimeUnit.SECONDS); } catch (ExecutionException e) { throw e.getCause(); } @@ -144,7 +144,7 @@ public void onMessage(WebSocket webSocket, String text) { assertFalse(latch.await(10, TimeUnit.SECONDS)); assertEquals(1, latch.getCount()); - startedFuture.get().request(); + startedFuture.get(10, TimeUnit.SECONDS).request(); assertTrue(latch.await(10, TimeUnit.SECONDS)); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java index b0becfd643..b8e114e353 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -106,10 +106,13 @@ public synchronized int read(ByteBuffer arg0) throws IOException { consumeRequested = true; this.asyncBody.consume(); // the consume call may actually trigger result deliver - // if it did, then just start the loop over + // if it did, then just start the loop over or be done if (!consumeRequested) { continue; } + if (done) { + return -1; + } } try { this.wait(); // block until more buffers are delivered From 3356764e74d2959e23f1ae19d33e67ce79932ca1 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Tue, 29 Nov 2022 13:02:22 +0100 Subject: [PATCH 5/5] review: HttpClient implementation refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - implementation of JettyHttpClient consumeLines - additional tests - centralization of HttpResponse body/async consumption logic - uncalled-for nitpicks 😅😇 Signed-off-by: Marc Nuri --- .../client/jdkhttp/JdkHttpClientImpl.java | 14 +- .../jetty/JettyAsyncResponseListener.java | 34 ++--- .../client/jetty/JettyHttpClient.java | 36 +++-- .../client/jetty/JettyAsyncBodyTest.java | 10 -- .../client/jetty/JettyInterceptorTest.java | 5 - .../client/okhttp/OkHttpClientImpl.java | 16 +-- .../client/http/JettyHttpClientTest.java | 5 - .../client/http/OkHttpClientTest.java | 14 +- .../KubernetesCrudDispatcherPostTest.java | 3 +- .../crud/KubernetesCrudDispatcherPutTest.java | 3 +- .../kubernetes/client/http/AsyncBody.java | 49 +++++++ .../kubernetes/client/http/BufferUtil.java | 77 +++++++++++ .../client/http/ByteArrayBodyHandler.java | 18 +-- .../kubernetes/client/http/HttpClient.java | 35 +---- .../http/HttpClientReadableByteChannel.java | 5 +- .../kubernetes/client/http/HttpResponse.java | 71 ++++++++-- .../client/http/HttpResponseAdapter.java | 80 +++++++++++ .../client/http/SendAsyncUtils.java | 77 ----------- .../client/http/AbstractAsyncBodyTest.java | 31 ++++- .../client/http/AbstractInterceptorTest.java | 4 +- .../client/http/BufferUtilTest.java | 129 ++++++++++++++++++ .../client/dsl/internal/LogWatchCallback.java | 5 +- .../client/dsl/internal/WatchHTTPManager.java | 2 +- .../dsl/internal/WatchHttpManagerTest.java | 2 +- 24 files changed, 508 insertions(+), 217 deletions(-) create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponseAdapter.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/BufferUtilTest.java diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index 097e875824..342d2c8bce 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -17,6 +17,7 @@ package io.fabric8.kubernetes.client.jdkhttp; import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; @@ -97,12 +98,12 @@ public CompletionStage getBody() { } } - private final class AsyncBodySubscriber implements Subscriber, AsyncBody { - private final BodyConsumer consumer; + private static final class AsyncBodySubscriber implements Subscriber, AsyncBody { + private final AsyncBody.Consumer consumer; private CompletableFuture done = new CompletableFuture(); private CompletableFuture subscription = new CompletableFuture<>(); - private AsyncBodySubscriber(BodyConsumer consumer) { + private AsyncBodySubscriber(AsyncBody.Consumer consumer) { this.consumer = consumer; } @@ -252,17 +253,18 @@ public DerivedClientBuilder newBuilder() { } @Override - public CompletableFuture> consumeLines(HttpRequest request, BodyConsumer consumer) { + public CompletableFuture> consumeLines(HttpRequest request, AsyncBody.Consumer consumer) { return sendAsync(request, () -> { AsyncBodySubscriber subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromLineSubscriber(subscriber); BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); - }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); + }).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody)); } @Override - public CompletableFuture> consumeBytes(HttpRequest request, BodyConsumer> consumer) { + public CompletableFuture> consumeBytes(HttpRequest request, + AsyncBody.Consumer> consumer) { return sendAsync(request, () -> { AsyncBodySubscriber> subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromSubscriber(subscriber); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java index ba09dc235e..3d79a28df7 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java @@ -15,7 +15,7 @@ */ package io.fabric8.kubernetes.client.jetty; -import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import org.eclipse.jetty.client.api.Request; @@ -27,21 +27,20 @@ import java.util.concurrent.CompletableFuture; import java.util.function.LongConsumer; -public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements HttpClient.AsyncBody { +public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements AsyncBody { private final HttpRequest httpRequest; - private final HttpClient.BodyConsumer bodyConsumer; - private final CompletableFuture> asyncResponse; + private final CompletableFuture> asyncResponse; private final CompletableFuture asyncBodyDone; - private CompletableFuture demand = new CompletableFuture<>(); + private final CompletableFuture demand; private boolean initialConsumeCalled; private Runnable initialConsume; - JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer bodyConsumer) { + JettyAsyncResponseListener(HttpRequest httpRequest) { this.httpRequest = httpRequest; - this.bodyConsumer = bodyConsumer; asyncResponse = new CompletableFuture<>(); asyncBodyDone = new CompletableFuture<>(); + demand = new CompletableFuture<>(); } @Override @@ -78,7 +77,7 @@ public void onComplete(Result result) { asyncBodyDone.complete(null); } - public CompletableFuture> listen(Request request) { + public CompletableFuture> listen(Request request) { request.send(this); return asyncResponse; } @@ -94,20 +93,21 @@ public void onContent(Response response, LongConsumer demand, ByteBuffer content this.demand.complete(demand); } try { - // we must clone as the buffer can be reused after the call to succeeded - bodyConsumer.consume(process(response, clone(content)), this); + onContent(content); callback.succeeded(); } catch (Exception e) { callback.failed(e); } } - protected abstract T process(Response response, ByteBuffer content); + /** + * Implement to consume the content of the chunked response. + *

+ * Each chunk will be passed in order to this function (onContent{callback.succeeded}) + * + * @param content the ByteBuffer containing a chunk of the response. + * @throws Exception in case the downstream consumer throws an exception. + */ + protected abstract void onContent(ByteBuffer content) throws Exception; - public static ByteBuffer clone(ByteBuffer original) { - ByteBuffer clone = ByteBuffer.allocate(original.remaining()); - clone.put(original); - clone.flip(); - return clone; - } } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java index 2365733a78..74437bdb9f 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import io.fabric8.kubernetes.client.http.Interceptor; @@ -24,12 +25,12 @@ import io.fabric8.kubernetes.client.http.WebSocket; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.InputStreamRequestContent; import org.eclipse.jetty.client.util.StringRequestContent; import org.eclipse.jetty.websocket.client.WebSocketClient; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -37,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static io.fabric8.kubernetes.client.http.BufferUtil.copy; import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM; import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN; @@ -47,7 +49,7 @@ public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpCl private final Collection interceptors; private final JettyHttpClientBuilder builder; private final JettyHttpClientFactory factory; - private Config config; + private final Config config; public JettyHttpClient(JettyHttpClientBuilder builder, HttpClient httpClient, WebSocketClient webSocketClient, Collection interceptors, JettyHttpClientFactory jettyHttpClientFactory, Config config) { @@ -76,19 +78,37 @@ public DerivedClientBuilder newBuilder() { @Override public CompletableFuture> consumeLines( - HttpRequest originalRequest, BodyConsumer consumer) { - throw new UnsupportedOperationException("Not supported by the Jetty client"); + HttpRequest originalRequest, AsyncBody.Consumer consumer) { + final var request = toStandardHttpRequest(originalRequest); + final var future = new JettyAsyncResponseListener(request) { + + final StringBuilder builder = new StringBuilder(); + + @Override + protected void onContent(ByteBuffer content) throws Exception { + for (char c : StandardCharsets.UTF_8.decode(content).array()) { + if (c == '\n') { + consumer.consume(builder.toString(), this); + builder.setLength(0); + } else { + builder.append(c); + } + } + } + }.listen(newRequest(request)); + return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer)); } @Override public CompletableFuture> consumeBytes( - HttpRequest originalRequest, BodyConsumer> consumer) { + HttpRequest originalRequest, AsyncBody.Consumer> consumer) { final var request = toStandardHttpRequest(originalRequest); - final var future = new JettyAsyncResponseListener<>(request, consumer) { + final var future = new JettyAsyncResponseListener(request) { @Override - protected List process(Response response, ByteBuffer content) { - return Collections.singletonList(content); + protected void onContent(ByteBuffer content) throws Exception { + // we must clone as the buffer can be reused by the byte consumer + consumer.consume(Collections.singletonList(copy(content)), this); } }.listen(newRequest(request)); return interceptResponse(request.toBuilder(), future, r -> consumeBytes(r, consumer)); diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java index 747b81ab90..ca45f6bd9f 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java @@ -24,14 +24,4 @@ public class JettyAsyncBodyTest extends AbstractAsyncBodyTest { protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } - - @Override - public void consumeLinesNotProcessedIfCancelled() throws Exception { - // consume lines not supported - } - - @Override - public void consumeLinesProcessedAfterConsume() throws Exception { - // consume lines not supported - } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java index 2e23e6eb77..afbd6a2bc9 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java @@ -24,9 +24,4 @@ public class JettyInterceptorTest extends AbstractInterceptorTest { protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } - - @Override - public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { - // consume lines not supported - } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index 1c03f1b343..a6c25a83c4 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -18,6 +18,7 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; @@ -51,9 +52,6 @@ public class OkHttpClientImpl implements HttpClient { static final Map MEDIA_TYPES = new ConcurrentHashMap<>(); public static final MediaType JSON = parseMediaType("application/json"); - public static final MediaType JSON_PATCH = parseMediaType("application/json-patch+json"); - public static final MediaType STRATEGIC_MERGE_JSON_PATCH = parseMediaType("application/strategic-merge-patch+json"); - public static final MediaType JSON_MERGE_PATCH = parseMediaType("application/merge-patch+json"); static MediaType parseMediaType(String contentType) { MediaType result = MediaType.parse(contentType); @@ -62,11 +60,11 @@ static MediaType parseMediaType(String contentType) { } private abstract class OkHttpAsyncBody implements AsyncBody { - private final BodyConsumer consumer; + private final AsyncBody.Consumer consumer; private final BufferedSource source; private final CompletableFuture done = new CompletableFuture<>(); - private OkHttpAsyncBody(BodyConsumer consumer, BufferedSource source) { + private OkHttpAsyncBody(AsyncBody.Consumer consumer, BufferedSource source) { this.consumer = consumer; this.source = source; } @@ -217,8 +215,8 @@ public DerivedClientBuilder newBuilder() { } @Override - public CompletableFuture> consumeLines(HttpRequest request, - BodyConsumer consumer) { + public CompletableFuture> consumeLines( + HttpRequest request, AsyncBody.Consumer consumer) { Function handler = s -> new OkHttpAsyncBody(consumer, s) { @Override protected String process(BufferedSource source) throws IOException { @@ -232,8 +230,8 @@ protected String process(BufferedSource source) throws IOException { } @Override - public CompletableFuture> consumeBytes(HttpRequest request, - BodyConsumer> consumer) { + public CompletableFuture> consumeBytes( + HttpRequest request, AsyncBody.Consumer> consumer) { Function handler = s -> new OkHttpAsyncBody>(consumer, s) { @Override protected List process(BufferedSource source) throws IOException { diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java index bd8a90df11..a8c88e36f4 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java @@ -24,9 +24,4 @@ protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } - @Override - void testConsumeLines() throws Exception { - // line parsing not yet supported - } - } diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 5d1bf425bf..472891dfb9 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -17,7 +17,6 @@ package io.fabric8.kubernetes.client.http; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; import io.fabric8.kubernetes.client.http.WebSocket.Listener; import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; @@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -222,7 +222,7 @@ void testConsumeLines() throws Exception { @DisplayName("Supported response body types") @ParameterizedTest(name = "{index}: {0}") @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) - void testSupportedTypes(Class type) throws Exception { + void supportedResponseBodyTypes(Class type) throws Exception { String value = new String(new byte[16384]); server.expect().withPath("/type").andReturn(200, value).always(); final HttpResponse result = client.getHttpClient() @@ -234,4 +234,14 @@ void testSupportedTypes(Class type) throws Exception { .satisfies(r -> assertThat(r.bodyString()).isEqualTo(value)); } + @Test + void supportedResponseTypeWithInvalid() { + final HttpClient httpClient = client.getHttpClient(); + final HttpRequest request = httpClient.newHttpRequestBuilder() + .uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(); + assertThatIllegalArgumentException() + .isThrownBy(() -> httpClient.sendAsync(request, Boolean.class)) + .withMessageStartingWith("Unsupported response type: ") + .withMessageContaining("Boolean"); + } } diff --git a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java index 4023fed116..2eb831892e 100644 --- a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java +++ b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java @@ -178,8 +178,7 @@ void postNamespaceMismatch() throws Exception { final HttpResponse result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS); // Then assertThat(result) - .extracting(HttpResponse::code).isEqualTo(400); - assertThat(result) + .returns(400, HttpResponse::code) .extracting(HttpResponse::body).asString() .contains("the namespace of the object (different) does not match the namespace on the URL (test)"); } diff --git a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java index 4033d78789..da8b2caa4e 100644 --- a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java +++ b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java @@ -113,8 +113,7 @@ void putNameMismatch() throws Exception { final HttpResponse result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS); // Then assertThat(result) - .extracting(HttpResponse::code).isEqualTo(400); - assertThat(result) + .returns(400, HttpResponse::code) .extracting(HttpResponse::body).asString() .contains("the name of the object (different) does not match the name on the URL (mismatched-name)"); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java new file mode 100644 index 0000000000..3b6c1f3123 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import java.util.concurrent.CompletableFuture; + +/** + * A simplified java.util.concurrent.Flow.Subscription and a future tracking done. + *
+ * The body should be consumed until the end or cancelled. + */ +public interface AsyncBody { + /** + * Called to deliver results to the {@link AsyncBody.Consumer} + */ + void consume(); + + /** + * Tracks the completion of the body. + * + * @return the future + */ + CompletableFuture done(); + + void cancel(); + + /** + * A functional interface for consuming async result bodies + */ + @FunctionalInterface + interface Consumer { + void consume(T value, AsyncBody asyncBody) throws Exception; + + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java new file mode 100644 index 0000000000..bfc8d3fa04 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +public class BufferUtil { + + private BufferUtil() { + // utils class + } + + /** + * Convert a ByteBuffer to a byte array. + * + * @param buffer The buffer to convert in flush mode. The buffer is not altered. + * @return An array of bytes duplicated from the buffer. + */ + public static byte[] toArray(ByteBuffer buffer) { + if (buffer.hasArray()) { + byte[] array = buffer.array(); + int from = buffer.arrayOffset() + buffer.position(); + return Arrays.copyOfRange(array, from, from + buffer.remaining()); + } else { + byte[] to = new byte[buffer.remaining()]; + buffer.slice().get(to); + return to; + } + } + + public static byte[] toArray(Collection buffers) { + final List arrays = buffers.stream().map(BufferUtil::toArray).collect(Collectors.toList()); + final byte[] ret = new byte[arrays.stream().mapToInt(a -> a.length).sum()]; + int offset = 0; + for (byte[] array : arrays) { + System.arraycopy(array, 0, ret, offset, array.length); + offset += array.length; + } + return ret; + } + + /** + * Deep copy/clone of a ByteBuffer. + * + * @param buffer The buffer to copy. + * @return A copy of the provided buffer. + */ + public static ByteBuffer copy(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + final int position = buffer.position(); + ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) + : ByteBuffer.allocate(buffer.remaining()); + clone.put(buffer); + clone.flip(); + buffer.position(position); + return clone; + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java index dea486073c..bc41c6c603 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java @@ -16,16 +16,14 @@ package io.fabric8.kubernetes.client.http; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; -import io.fabric8.kubernetes.client.http.HttpClient.BodyConsumer; - import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -public class ByteArrayBodyHandler implements BodyConsumer> { +import static io.fabric8.kubernetes.client.http.BufferUtil.toArray; + +public class ByteArrayBodyHandler implements AsyncBody.Consumer> { private final LinkedList buffers = new LinkedList<>(); private final CompletableFuture result = new CompletableFuture<>(); @@ -46,15 +44,7 @@ private synchronized void onBodyDone(Void v, Throwable t) { if (t != null) { result.completeExceptionally(t); } else { - int size = buffers.stream().map(ByteBuffer::remaining).collect(Collectors.summingInt(Integer::intValue)).intValue(); - byte[] res = new byte[size]; - int from = 0; - for (ByteBuffer b : buffers) { - int l = b.remaining(); - b.get(res, from, l); - from += l; - } - result.complete(res); + result.complete(toArray(buffers)); } buffers.clear(); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index 45227f99f7..809c8677c4 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -150,35 +150,6 @@ interface Builder extends DerivedClientBuilder { Builder preferHttp11(); } - /** - * A simplified java.util.concurrent.Flow.Subscription and a future tracking done. - *
- * The body should be consumed until the end or cancelled. - */ - interface AsyncBody { - /** - * Called to deliver results to the {@link BodyConsumer} - */ - void consume(); - - /** - * Tracks the completion of the body. - * - * @return the future - */ - CompletableFuture done(); - - void cancel(); - } - - /** - * A functional interface for consuming async result bodies - */ - @FunctionalInterface - interface BodyConsumer { - void consume(T value, AsyncBody asyncBody) throws Exception; - } - @Override void close(); @@ -205,7 +176,7 @@ interface BodyConsumer { * @return a CompletableFuture that returns the resulting HttpResponse when complete */ default CompletableFuture> sendAsync(HttpRequest request, Class type) { - return SendAsyncUtils.sendAsync(request, type, this); + return HttpResponse.SupportedResponses.from(type).sendAsync(request, this); } /** @@ -216,7 +187,7 @@ default CompletableFuture> sendAsync(HttpRequest request, Cl * @param consumer the response body consumer * @return the future which will be ready after the headers have been read */ - CompletableFuture> consumeLines(HttpRequest request, BodyConsumer consumer); + CompletableFuture> consumeLines(HttpRequest request, AsyncBody.Consumer consumer); /** * Send a request and consume the bytes of the resulting response body @@ -227,7 +198,7 @@ default CompletableFuture> sendAsync(HttpRequest request, Cl * @param consumer the response body consumer * @return the future which will be ready after the headers have been read */ - CompletableFuture> consumeBytes(HttpRequest request, BodyConsumer> consumer); + CompletableFuture> consumeBytes(HttpRequest request, AsyncBody.Consumer> consumer); WebSocket.Builder newWebSocketBuilder(); diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java index b8e114e353..391fbae921 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -16,9 +16,6 @@ package io.fabric8.kubernetes.client.http; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; -import io.fabric8.kubernetes.client.http.HttpClient.BodyConsumer; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; @@ -32,7 +29,7 @@ *

* May be useful eventually to provide a non-blocking channel as well. */ -public class HttpClientReadableByteChannel implements ReadableByteChannel, BodyConsumer> { +public class HttpClientReadableByteChannel implements ReadableByteChannel, AsyncBody.Consumer> { private final LinkedList buffers = new LinkedList<>(); private Throwable failed; diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java index 921fe9e15c..c7d91b9dfc 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.Reader; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static java.nio.charset.StandardCharsets.UTF_8; @@ -38,7 +39,7 @@ static boolean isSuccessful(int code) { /** * Returns the HTTP status code. - * + * * @return the HTTP status code. */ int code(); @@ -63,29 +64,71 @@ default String message() { * @throws IOException in case there's an I/O problem */ default String bodyString() throws IOException { - Object body = body(); - if (body == null) { + final Object body = body(); + final SupportedResponses supportedResponse = SupportedResponses.from(body); + if (supportedResponse == null) { return ""; } - if (body instanceof String) { - return (String) body; - } - if (body instanceof Reader) { - return IOHelpers.readFully((Reader) body); - } - if (body instanceof byte[]) { - return new String((byte[]) body, UTF_8); - } - return IOHelpers.readFully((InputStream) body, UTF_8); + return supportedResponse.asString(body); } /** * The original {@link HttpRequest} that initiated this response. - * + * * @return the HTTP request. */ HttpRequest request(); Optional> previousResponse(); + enum SupportedResponses { + TEXT(String.class, Object::toString, SendAsyncUtils::string), + INPUT_STREAM(InputStream.class, body -> IOHelpers.readFully((InputStream) body, UTF_8), SendAsyncUtils::inputStream), + READER(Reader.class, body -> IOHelpers.readFully((Reader) body), SendAsyncUtils::reader), + BYTE_ARRAY(byte[].class, body -> new String((byte[]) body, UTF_8), SendAsyncUtils::bytes); + + private final Class type; + private final ToString toString; + private final Async async; + + SupportedResponses(Class type, ToString toString, Async async) { + this.type = type; + this.toString = toString; + this.async = async; + } + + private String asString(Object body) throws IOException { + return toString.toString(body); + } + + CompletableFuture> sendAsync(HttpRequest request, HttpClient client) { + return ((Async) async).sendAsync(request, client); + } + + public static SupportedResponses from(Object object) { + if (object == null) { + return null; + } + return from(object.getClass()); + } + + public static SupportedResponses from(Class type) { + for (SupportedResponses sr : SupportedResponses.values()) { + if (sr.type.isAssignableFrom(type)) { + return sr; + } + } + throw new IllegalArgumentException("Unsupported response type: " + type.getName()); + } + + @FunctionalInterface + interface ToString { + String toString(Object body) throws IOException; + } + + @FunctionalInterface + interface Async { + CompletableFuture> sendAsync(HttpRequest request, HttpClient client); + } + } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponseAdapter.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponseAdapter.java new file mode 100644 index 0000000000..cdd617f3de --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponseAdapter.java @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * HttpResponse implementation wrapping an existing HttpResponse with a processed body. + *

+ * Allows for changing the body type - there is further redesign that could be done here + * + * @param the type of the body. + */ +class HttpResponseAdapter implements HttpResponse { + + private final HttpResponse response; + private final T body; + + public HttpResponseAdapter(HttpResponse response, T body) { + this.response = response; + this.body = body; + } + + @Override + public List headers(String key) { + return response.headers(key); + } + + @Override + public boolean isSuccessful() { + return response.isSuccessful(); + } + + @Override + public Map> headers() { + return response.headers(); + } + + @Override + public int code() { + return response.code(); + } + + @Override + public String message() { + return response.message(); + } + + @Override + public HttpRequest request() { + return response.request(); + } + + @Override + public Optional> previousResponse() { + return response.previousResponse(); + } + + @Override + public T body() { + return body; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java index d27322fd7e..487ab30a83 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java @@ -16,16 +16,11 @@ package io.fabric8.kubernetes.client.http; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; - import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; /** @@ -37,61 +32,6 @@ private SendAsyncUtils() { // just utils } - /** - * Allows for changing the body type - there is further redesign that could be done here - */ - static class HttpResponseAdapter implements HttpResponse { - - private final HttpResponse response; - private final T body; - - public HttpResponseAdapter(HttpResponse response, T body) { - this.response = response; - this.body = body; - } - - @Override - public List headers(String key) { - return response.headers(key); - } - - @Override - public boolean isSuccessful() { - return response.isSuccessful(); - } - - @Override - public Map> headers() { - return response.headers(); - } - - @Override - public int code() { - return response.code(); - } - - @Override - public String message() { - return response.message(); - } - - @Override - public HttpRequest request() { - return response.request(); - } - - @Override - public Optional> previousResponse() { - return response.previousResponse(); - } - - @Override - public T body() { - return body; - } - - } - static CompletableFuture> reader(HttpRequest request, HttpClient client) { return inputStream(request, client) .thenApply(res -> new HttpResponseAdapter<>(res, new InputStreamReader(res.body(), StandardCharsets.UTF_8))); @@ -121,21 +61,4 @@ static CompletableFuture> string(HttpRequest request, HttpC .thenApply(res -> new HttpResponseAdapter<>(res, new String(res.body(), StandardCharsets.UTF_8))); } - @SuppressWarnings("rawtypes") - static CompletableFuture sendAsync(HttpRequest request, Class type, HttpClient httpClient) { - if (type == String.class) { - return string(request, httpClient); - } - if (type == byte[].class) { - return bytes(request, httpClient); - } - if (type == Reader.class) { - return reader(request, httpClient); - } - if (type == InputStream.class) { - return inputStream(request, httpClient); - } - throw new AssertionError("unknown type"); - } - } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java index a55e868e60..97420be7c1 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java @@ -23,6 +23,8 @@ import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -56,7 +58,7 @@ public void consumeLinesProcessedAfterConsume() throws Exception { .andReturn(200, "This is the response body\n") .always(); final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client.consumeLines( + final HttpResponse asyncBodyResponse = client.consumeLines( client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), (value, asyncBody) -> { responseText.append(value); @@ -78,7 +80,7 @@ public void consumeLinesNotProcessedIfCancelled() throws Exception { .andReturn(200, "This would be the response body") .always(); final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client + final HttpResponse asyncBodyResponse = client .consumeLines(client.newHttpRequestBuilder() .uri(server.url("/cancel")).build(), (value, asyncBody) -> { responseText.append(value); @@ -93,15 +95,36 @@ public void consumeLinesNotProcessedIfCancelled() throws Exception { } } + @Test + @DisplayName("Lines are processed completely") + public void consumeLinesProcessesAllLines() throws Exception { + try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { + server.expect().withPath("/consume-lines") + .andReturn(200, "This is the response body\nWith\nMultiple\n lines\n") + .always(); + final List receivedLines = new ArrayList<>(); + final HttpResponse asyncBodyResponse = client.consumeLines( + client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), + (value, asyncBody) -> { + receivedLines.add(value); + asyncBody.consume(); + }) + .get(10L, TimeUnit.SECONDS); + asyncBodyResponse.body().consume(); + asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS); + assertThat(receivedLines).containsExactly("This is the response body", "With", "Multiple", " lines"); + } + } + @Test @DisplayName("Bytes are processed and consumed only after the consume() invocation") - public void consumeByteBufferLinesProcessedAfterConsume() throws Exception { + public void consumeBytesProcessedAfterConsume() throws Exception { try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { server.expect().withPath("/consume-bytes") .andReturn(200, "This is the response body as bytes") .always(); final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client.consumeBytes( + final HttpResponse asyncBodyResponse = client.consumeBytes( client.newHttpRequestBuilder().uri(server.url("/consume-bytes")).build(), (value, asyncBody) -> { responseText.append(value.stream().map(StandardCharsets.UTF_8::decode) diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index ab001e3865..e0eb6e73a3 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -131,7 +131,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons final CompletableFuture result = new CompletableFuture<>(); // When try (HttpClient client = builder.build()) { - final HttpResponse asyncR = client.consumeLines( + final HttpResponse asyncR = client.consumeLines( client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> { result.complete(s); ab.consume(); @@ -166,7 +166,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons final CompletableFuture result = new CompletableFuture<>(); // When try (HttpClient client = builder.build()) { - final HttpResponse asyncR = client.consumeBytes( + final HttpResponse asyncR = client.consumeBytes( client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> { result.complete(StandardCharsets.UTF_8.decode(s.iterator().next()).toString()); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/BufferUtilTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/BufferUtilTest.java new file mode 100644 index 0000000000..2ef6342198 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/BufferUtilTest.java @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +class BufferUtilTest { + + @Test + void toArrayFromBBWithArray() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + final byte[] result = BufferUtil.toArray(buffer); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello"); + } + + @Test + void toArrayFromBB() { + final ByteBuffer buffer = ByteBuffer.allocateDirect(5); + buffer.put((byte) 104); + buffer.put((byte) 101); + buffer.put((byte) 108); + buffer.put((byte) 108); + buffer.put((byte) 111); + buffer.rewind(); + final byte[] result = BufferUtil.toArray(buffer); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello"); + } + + @Test + void toArrayFromBBInPosition() { + final ByteBuffer buffer = ByteBuffer.allocateDirect(12); + buffer.position(7); + buffer.put((byte) 104); + buffer.put((byte) 101); + buffer.put((byte) 108); + buffer.put((byte) 108); + buffer.put((byte) 111); + buffer.position(7); + final byte[] result = BufferUtil.toArray(buffer); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello"); + } + + @Test + void toArrayFromMultipleBB() { + final ByteBuffer hello = ByteBuffer.allocateDirect(12); + hello.position(7); + hello.put((byte) 104); + hello.put((byte) 101); + hello.put((byte) 108); + hello.put((byte) 108); + hello.put((byte) 111); + hello.position(7); + final ByteBuffer space = ByteBuffer.allocateDirect(1); + space.put((byte) 32); + space.rewind(); + final ByteBuffer world = ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)); + final byte[] result = BufferUtil.toArray(Arrays.asList(hello, space, world)); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello world"); + } + + @Test + void copyReturnsADifferentInstance() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + final ByteBuffer result = BufferUtil.copy(buffer); + assertThat(result) + .isNotSameAs(buffer) + .extracting(BufferUtil::toArray).asInstanceOf(InstanceOfAssertFactories.BYTE_ARRAY) + .asString(StandardCharsets.UTF_8) + .isEqualTo("hello"); + } + + @Test + void copyReturnsInstanceWithSameContent() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + final ByteBuffer result = BufferUtil.copy(buffer); + assertThat(result) + .extracting(BufferUtil::toArray) + .asInstanceOf(InstanceOfAssertFactories.BYTE_ARRAY) + .asString(StandardCharsets.UTF_8) + .isEqualTo("hello"); + } + + @Test + void copyReturnsInstanceWithSameContentRange() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + buffer.position(2); + final ByteBuffer result = BufferUtil.copy(buffer); + assertThat(result) + .extracting(BufferUtil::toArray) + .asInstanceOf(InstanceOfAssertFactories.BYTE_ARRAY) + .asString(StandardCharsets.UTF_8) + .isEqualTo("llo"); + } + + @Test + void copyReturnsInstancePreservingPositionOfOriginal() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + buffer.position(2); + BufferUtil.copy(buffer); + assertThat(buffer) + .extracting(ByteBuffer::position) + .isEqualTo(2); + } + + @Test + void copyWithNullReturnsNull() { + assertThat(BufferUtil.copy(null)).isNull(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index 82aad07290..5f3b08ed3a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; @@ -45,7 +46,7 @@ public class LogWatchCallback implements LogWatch, AutoCloseable { private volatile InputStream output; private final AtomicBoolean closed = new AtomicBoolean(false); - private volatile Optional asyncBody = Optional.empty(); + private volatile Optional asyncBody = Optional.empty(); private final SerialExecutor serialExecutor; public LogWatchCallback(OutputStream out, Executor executor) { @@ -65,7 +66,7 @@ private void cleanUp() { if (!closed.compareAndSet(false, true)) { return; } - asyncBody.ifPresent(HttpClient.AsyncBody::cancel); + asyncBody.ifPresent(AsyncBody::cancel); serialExecutor.shutdownNow(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 3bfd38c0ef..e114495c5f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -19,8 +19,8 @@ import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import org.slf4j.Logger; diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java index 0143ccdde2..cc2a8323e1 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java @@ -20,8 +20,8 @@ import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient.DerivedClientBuilder; import io.fabric8.kubernetes.client.http.HttpResponse; import org.junit.jupiter.api.Test;