From 23ad882d095dd62efbbcf7206afb108cf654c1ca Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Mon, 19 Sep 2022 18:19:48 -0400 Subject: [PATCH] fix #4201: generalizing sendAsync support --- .../client/jdkhttp/JdkHttpClientImpl.java | 35 ----- .../client/jetty/JettyHttpClient.java | 34 +---- .../client/jetty/JettyHttpResponse.java | 47 ------ .../client/jetty/JettyAsyncBodyTest.java | 10 ++ .../client/jetty/JettyHttpClientTest.java | 13 -- .../client/jetty/JettyHttpResponseTest.java | 27 ---- .../client/jetty/JettyInterceptorTest.java | 5 + .../client/okhttp/OkHttpClientImpl.java | 24 --- .../client/http/JettyHttpClientTest.java | 5 + .../client/http/OkHttpClientTest.java | 37 +++++ .../client/http/ByteArrayBodyHandler.java | 66 +++++++++ .../kubernetes/client/http/HttpClient.java | 4 +- .../http/HttpClientReadableByteChannel.java | 128 ++++++++++++++++ .../client/http/SendAsyncUtils.java | 138 ++++++++++++++++++ 14 files changed, 393 insertions(+), 180 deletions(-) create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index e68d1110d6b..46f8bc688ba 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -23,17 +23,11 @@ import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocket.Listener; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.net.URI; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; -import java.net.http.HttpResponse.BodySubscriber; -import java.net.http.HttpResponse.BodySubscribers; import java.net.http.WebSocketHandshakeException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Optional; @@ -253,35 +247,6 @@ public CompletableFuture> consumeBytes(HttpRequest reque }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } - @Override - public CompletableFuture> sendAsync(HttpRequest request, Class type) { - return sendAsync(request, () -> new HandlerAndAsyncBody(toBodyHandler(type), null)) - .thenApply(ar -> new JdkHttpResponseImpl<>(ar.response)); - } - - private BodyHandler toBodyHandler(Class type) { - BodyHandler bodyHandler; - if (type == null) { - bodyHandler = (BodyHandler) BodyHandlers.discarding(); - } else if (type == InputStream.class) { - bodyHandler = (BodyHandler) BodyHandlers.ofInputStream(); - } else if (type == String.class) { - bodyHandler = (BodyHandler) BodyHandlers.ofString(); - } else if (type == byte[].class) { - bodyHandler = (BodyHandler) BodyHandlers.ofByteArray(); - } else { - bodyHandler = responseInfo -> { - BodySubscriber upstream = BodyHandlers.ofInputStream().apply(responseInfo); - - BodySubscriber downstream = BodySubscribers.mapping( - upstream, - (InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8)); - return (BodySubscriber) downstream; - }; - } - return bodyHandler; - } - public CompletableFuture> sendAsync(HttpRequest request, Supplier> handlerAndAsyncBodySupplier) { JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request; diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java index e53dfaf2b71..9a56c51926b 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -24,8 +24,6 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.InputStreamRequestContent; import org.eclipse.jetty.client.util.StringRequestContent; import org.eclipse.jetty.websocket.client.WebSocketClient; @@ -40,7 +38,6 @@ import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM; import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN; -import static org.eclipse.jetty.util.BufferUtil.toArray; public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpClient { @@ -74,39 +71,10 @@ public DerivedClientBuilder newBuilder() { return builder.copy(); } - @Override - public CompletableFuture> sendAsync(HttpRequest originalRequest, Class type) { - final var supportedResponse = JettyHttpResponse.SupportedResponse.from(type); - final var request = toStandardHttpRequest(originalRequest); - final CompletableFuture> future = new CompletableFuture<>(); - newRequest(request).send(new BufferingResponseListener() { - - // TODO: Long Term Refactor - This Listener blocks until the full response is read and keeps it in memory. - // Find a way to stream the response body without completing the future - // We need two signals, one when the response is received, and one when the body is completely - // read. - // Should this method be completely replaced by consumeXxx()? - @Override - public void onComplete(Result result) { - future.complete(new JettyHttpResponse<>( - request, result.getResponse(), supportedResponse.process(result.getResponse(), getContent(), type))); - } - }); - return interceptResponse(request.toBuilder(), future, r -> sendAsync(r, type)); - } - @Override public CompletableFuture> consumeLines( HttpRequest originalRequest, BodyConsumer consumer) { - final var request = toStandardHttpRequest(originalRequest); - final var future = new JettyAsyncResponseListener<>(request, consumer) { - - @Override - protected String process(Response response, ByteBuffer content) { - return JettyHttpResponse.SupportedResponse.TEXT.process(response, toArray(content), String.class); - } - }.listen(newRequest(request)); - return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer)); + throw new UnsupportedOperationException("Not supported by the Jetty client"); } @Override diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java index 77a66696778..1c93df29326 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java @@ -17,19 +17,13 @@ import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; -import io.fabric8.kubernetes.client.utils.Utils; import org.eclipse.jetty.client.api.Response; -import java.io.ByteArrayInputStream; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.BiFunction; public class JettyHttpResponse implements HttpResponse { @@ -82,45 +76,4 @@ public Optional> previousResponse() { return Optional.empty(); } - enum SupportedResponse { - - TEXT(String.class, (r, bytes) -> new String(bytes, responseCharset(r))), - INPUT_STREAM(ByteArrayInputStream.class, (r, bytes) -> new ByteArrayInputStream(bytes)), - READER(InputStreamReader.class, (r, bytes) -> new InputStreamReader(new ByteArrayInputStream(bytes), responseCharset(r))), - BYTE_ARRAY(byte[].class, (r, bytes) -> bytes); - - private final Class type; - private final BiFunction processor; - - SupportedResponse(Class type, BiFunction processor) { - this.type = type; - this.processor = processor; - } - - public T process(Response response, byte[] bytes, Class type) { - return type.cast(processor.apply(response, bytes)); - } - - static SupportedResponse from(Class type) { - for (SupportedResponse sr : SupportedResponse.values()) { - if (type.isAssignableFrom(sr.type)) { - return sr; - } - } - throw new IllegalArgumentException("Unsupported response type: " + type.getName()); - } - - private static Charset responseCharset(Response response) { - var responseCharset = StandardCharsets.UTF_8; - final var responseEncoding = response.getHeaders().get("Content-Encoding"); - if (Utils.isNotNullOrEmpty(responseEncoding)) { - try { - responseCharset = Charset.forName(responseEncoding); - } catch (Exception e) { - // ignored - } - } - return responseCharset; - } - } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java index ca45f6bd9fa..747b81ab90e 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java @@ -24,4 +24,14 @@ public class JettyAsyncBodyTest extends AbstractAsyncBodyTest { protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } + + @Override + public void consumeLinesNotProcessedIfCancelled() throws Exception { + // consume lines not supported + } + + @Override + public void consumeLinesProcessedAfterConsume() throws Exception { + // consume lines not supported + } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java index c701fa23fd3..d3afb0f82c8 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java @@ -101,19 +101,6 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep } } - @Test - @DisplayName("sendAsync with unsupported type throws Exception") - void sendAsyncUnsupportedType() { - try (var jettyHttpClient = new JettyHttpClient( - null, httpClient, webSocketClient, Collections.emptyList(), null)) { - // When - final var result = assertThrows(IllegalArgumentException.class, - () -> jettyHttpClient.sendAsync(null, Integer.class)); - // Then - assertThat(result).hasMessage("Unsupported response type: java.lang.Integer"); - } - } - @Test @DisplayName("sendAsync with unsupported HttpRequest throws Exception") void sendAsyncUnsupportedHttpRequest() { diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java index 3bf62334580..f6bc401eac3 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java @@ -17,22 +17,13 @@ import org.eclipse.jetty.client.HttpResponse; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.params.provider.Arguments.arguments; class JettyHttpResponseTest { @@ -56,22 +47,4 @@ void headersHandlesJettyHttpFields() { .containsEntry("Via", Arrays.asList("proxy-1", "proxy-2")); } - @ParameterizedTest(name = "{index}: SupportedResponse: from type ''{0}'' is ''{1}''") - @MethodSource("supportedResponsesInput") - void supportedResponses(Class type, JettyHttpResponse.SupportedResponse supportedResponse) { - // When - final var result = JettyHttpResponse.SupportedResponse.from(type); - // Then - assertThat(result).isEqualTo(supportedResponse); - } - - static Stream supportedResponsesInput() { - return Stream.of( - arguments(String.class, JettyHttpResponse.SupportedResponse.TEXT), - arguments(InputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), - arguments(ByteArrayInputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), - arguments(Reader.class, JettyHttpResponse.SupportedResponse.READER), - arguments(InputStreamReader.class, JettyHttpResponse.SupportedResponse.READER), - arguments(byte[].class, JettyHttpResponse.SupportedResponse.BYTE_ARRAY)); - } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java index afbd6a2bc90..2e23e6eb778 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java @@ -24,4 +24,9 @@ public class JettyInterceptorTest extends AbstractInterceptorTest { protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } + + @Override + public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { + // consume lines not supported + } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index f0d3262ba6e..00341862b05 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -257,30 +257,6 @@ public void onFailure(Call call, IOException e) { return future; } - @Override - public CompletableFuture> sendAsync(HttpRequest request, Class type) { - CompletableFuture> future = new CompletableFuture<>(); - Call call = httpClient.newCall(((OkHttpRequestImpl) request).getRequest()); - call.enqueue(new Callback() { - - @Override - public void onResponse(Call call, Response response) throws IOException { - future.complete(new OkHttpResponseImpl<>(response, type)); - } - - @Override - public void onFailure(Call call, IOException e) { - future.completeExceptionally(e); - } - }); - future.whenComplete((r, t) -> { - if (future.isCancelled()) { - call.cancel(); - } - }); - return future; - } - @Override public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() { return new OkHttpWebSocketImpl.BuilderImpl(this.httpClient); diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java index a8c88e36f44..bd8a90df115 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java @@ -24,4 +24,9 @@ protected HttpClient.Factory getHttpClientFactory() { return new JettyHttpClientFactory(); } + @Override + void testConsumeLines() throws Exception { + // line parsing not yet supported + } + } diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 4ef2f18697c..829f428dc4a 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -31,6 +31,8 @@ import java.io.InputStream; import java.io.Reader; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -178,6 +180,41 @@ void testAsyncBody() throws Exception { assertTrue(consumed.get(5, TimeUnit.SECONDS)); } + @Test + void testConsumeLines() throws Exception { + server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always(); + + ArrayList strings = new ArrayList<>(); + CompletableFuture consumed = new CompletableFuture<>(); + + CompletableFuture> responseFuture = client.getHttpClient().consumeLines( + client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) + .build(), + (value, asyncBody) -> { + strings.add(value); + asyncBody.consume(); + }); + + responseFuture.whenComplete((r, t) -> { + if (t != null) { + consumed.completeExceptionally(t); + } + if (r != null) { + r.body().consume(); + r.body().done().whenComplete((v, ex) -> { + if (ex != null) { + consumed.completeExceptionally(ex); + } else { + consumed.complete(null); + } + }); + } + }); + + consumed.get(5, TimeUnit.SECONDS); + assertEquals(Arrays.asList("hello", "world", "lines"), strings); + } + @DisplayName("Supported response body types") @ParameterizedTest(name = "{index}: {0}") @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java new file mode 100644 index 00000000000..799c62f4070 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; +import io.fabric8.kubernetes.client.http.HttpClient.BodyConsumer; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class ByteArrayBodyHandler implements BodyConsumer> { + + private final LinkedList buffers = new LinkedList<>(); + private final CompletableFuture result = new CompletableFuture(); + + @Override + public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { + this.buffers.addAll(value); + asyncBody.consume(); + } + + protected void onResponse(HttpResponse response) { + AsyncBody asyncBody = response.body(); + asyncBody.done().whenComplete(this::onBodyDone); + asyncBody.consume(); + } + + private synchronized void onBodyDone(Void v, Throwable t) { + if (t != null) { + result.completeExceptionally(t); + } else { + int size = buffers.stream().map(ByteBuffer::remaining).collect(Collectors.summingInt(Integer::intValue)).intValue(); + byte[] res = new byte[size]; + int from = 0; + for (ByteBuffer b : buffers) { + int l = b.remaining(); + b.get(res, from, l); + from += l; + } + result.complete(res); + } + buffers.clear(); + } + + public CompletableFuture getResult() { + return result; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index 88bc7b9177e..e59a40d9473 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -183,7 +183,9 @@ interface BodyConsumer { * @param type one of InputStream, Reader, String, byte[] * @return a CompletableFuture that returns the resulting HttpResponse when complete */ - CompletableFuture> sendAsync(HttpRequest request, Class type); + default CompletableFuture> sendAsync(HttpRequest request, Class type) { + return SendAsyncUtils.sendAsync(request, type, this); + } /** * Send a request and consume the lines of the response body using the same logic as {@link BufferedReader} to diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java new file mode 100644 index 00000000000..430dee3cab1 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; +import io.fabric8.kubernetes.client.http.HttpClient.BodyConsumer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.util.LinkedList; +import java.util.List; + +/** + * Creates a blocking {@link ReadableByteChannel} from a {@link HttpResponse} containing an {@link AsyncBody} + *

+ * May be useful eventually to provide a non-blocking channel as well. + */ +public class HttpClientReadableByteChannel implements ReadableByteChannel, BodyConsumer> { + + private final LinkedList buffers = new LinkedList<>(); + private Throwable failed; + private boolean closed; + private boolean done; + private AsyncBody asyncBody; + private ByteBuffer currentBuffer; + + @Override + public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { + this.buffers.addAll(value); + // could proactively consume based up some byte limit + this.notifyAll(); + } + + protected synchronized void onResponse(HttpResponse response) { + asyncBody = response.body(); + asyncBody.done().whenComplete(this::onBodyDone); + asyncBody.consume(); + this.notifyAll(); + } + + private synchronized void onBodyDone(Void v, Throwable t) { + if (t != null) { + failed = t; + } + done = true; + this.notifyAll(); + } + + @Override + public synchronized void close() { + if (this.closed) { + return; + } + if (asyncBody != null) { + asyncBody.cancel(); + } + this.closed = true; + this.notifyAll(); + } + + @Override + public synchronized boolean isOpen() { + return !closed; + } + + @Override + public synchronized int read(ByteBuffer arg0) throws IOException { + if (closed) { + throw new ClosedChannelException(); + } + + int read = 0; + + while (arg0.hasRemaining()) { + while (currentBuffer == null || !currentBuffer.hasRemaining()) { + if (buffers.isEmpty()) { + if (failed != null) { + throw new IOException("closed", failed); + } + if (read > 0) { + return read; + } + if (done) { + return -1; + } + if (this.asyncBody != null) { + this.asyncBody.consume(); + } + try { + this.wait(); // block until more buffers are delivered + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); + throw new ClosedByInterruptException(); + } + } + + currentBuffer = buffers.poll(); + } + + int remaining = Math.min(arg0.remaining(), currentBuffer.remaining()); + for (int i = 0; i < remaining; i++) { + arg0.put(currentBuffer.get()); + } + read += remaining; + } + + return read; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java new file mode 100644 index 00000000000..af73adb93f3 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** + * If we need support other than UTF-8, see jdk.internal.net.http.common.Utils.charsetFrom + */ +class SendAsyncUtils { + + /** + * Allows for changing the body type - there is further redesign that could be done here + */ + static class HttpResponseAdapter implements HttpResponse { + + private final HttpResponse response; + private final T body; + + public HttpResponseAdapter(HttpResponse response, T body) { + this.response = response; + this.body = body; + } + + @Override + public List headers(String key) { + return response.headers(key); + } + + @Override + public boolean isSuccessful() { + return response.isSuccessful(); + } + + @Override + public Map> headers() { + return response.headers(); + } + + @Override + public int code() { + return response.code(); + } + + @Override + public String message() { + return response.message(); + } + + @Override + public HttpRequest request() { + return response.request(); + } + + @Override + public Optional> previousResponse() { + return response.previousResponse(); + } + + @Override + public T body() { + return body; + } + + } + + static CompletableFuture> reader(HttpRequest request, HttpClient client) { + return inputStream(request, client) + .thenApply(res -> new HttpResponseAdapter<>(res, new InputStreamReader(res.body(), StandardCharsets.UTF_8))); + } + + static CompletableFuture> inputStream(HttpRequest request, HttpClient client) { + HttpClientReadableByteChannel byteChannel = new HttpClientReadableByteChannel(); + CompletableFuture> futureResponse = client.consumeBytes(request, byteChannel); + return futureResponse.thenApply(res -> { + byteChannel.onResponse(res); + return new HttpResponseAdapter<>(res, Channels.newInputStream(byteChannel)); + }); + } + + static CompletableFuture> bytes(HttpRequest request, HttpClient client) { + ByteArrayBodyHandler byteArrayBodyHandler = new ByteArrayBodyHandler(); + CompletableFuture> futureResponse = client.consumeBytes(request, byteArrayBodyHandler); + return futureResponse.thenCompose(res -> { + byteArrayBodyHandler.onResponse(res); + return byteArrayBodyHandler.getResult() + .thenApply(b -> new HttpResponseAdapter<>(res, b)); + }); + } + + static CompletableFuture> string(HttpRequest request, HttpClient client) { + return bytes(request, client).thenApply(res -> { + return new HttpResponseAdapter<>(res, + new String(res.body(), StandardCharsets.UTF_8)); + }); + } + + static CompletableFuture sendAsync(HttpRequest request, Class type, HttpClient httpClient) { + if (type == String.class) { + return string(request, httpClient); + } + if (type == byte[].class) { + return bytes(request, httpClient); + } + if (type == Reader.class) { + return reader(request, httpClient); + } + if (type == InputStream.class) { + return inputStream(request, httpClient); + } + throw new AssertionError("unknown type"); + } + +}