diff --git a/CHANGELOG.md b/CHANGELOG.md index b3ef5f46c1..1f26b86096 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,8 @@ #### Improvements * Fix #4355: for exec, attach, upload, and copy operations the container id/name will be validated or chosen prior to the remote call. You may also use the kubectl.kubernetes.io/default-container annotation to specify the default container. -* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations. +* Fix #4530: generalizing the Serialization logic to allow for primitive values and clarifying the type expectations. +* Fix #4201: Removed sendAsync from the individual http client implementations #### Dependency Upgrade diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index db0ad225c7..342d2c8bce 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -17,6 +17,7 @@ package io.fabric8.kubernetes.client.jdkhttp; import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; @@ -24,27 +25,23 @@ import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocket.Listener; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.net.URI; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; import java.net.http.HttpResponse.BodySubscriber; -import java.net.http.HttpResponse.BodySubscribers; +import java.net.http.HttpResponse.ResponseInfo; import java.net.http.WebSocketHandshakeException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -55,40 +52,72 @@ */ public class JdkHttpClientImpl implements HttpClient { - private final class AsyncBodySubscriber implements Subscriber, AsyncBody { - private final BodyConsumer consumer; + /** + * Adapts the BodyHandler to immediately complete the body + */ + private static final class BodyHandlerAdapter implements BodyHandler { + private final AsyncBodySubscriber subscriber; + private final BodyHandler handler; + + private BodyHandlerAdapter(AsyncBodySubscriber subscriber, BodyHandler handler) { + this.subscriber = subscriber; + this.handler = handler; + } + + @Override + public BodySubscriber apply(ResponseInfo responseInfo) { + BodySubscriber bodySubscriber = handler.apply(responseInfo); + return new BodySubscriber() { + CompletableFuture cf = CompletableFuture.completedFuture(subscriber); + + @Override + public void onSubscribe(Subscription subscription) { + bodySubscriber.onSubscribe(subscription); + } + + @Override + public void onNext(List item) { + bodySubscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + bodySubscriber.onError(throwable); + } + + @Override + public void onComplete() { + bodySubscriber.onComplete(); + } + + @Override + public CompletionStage getBody() { + return cf; + } + }; + } + } + + private static final class AsyncBodySubscriber implements Subscriber, AsyncBody { + private final AsyncBody.Consumer consumer; private CompletableFuture done = new CompletableFuture(); - private final AtomicBoolean subscribed = new AtomicBoolean(); - private volatile Flow.Subscription subscription; - private T initialItem; - private boolean first = true; - private boolean isComplete; + private CompletableFuture subscription = new CompletableFuture<>(); - private AsyncBodySubscriber(BodyConsumer consumer) { + private AsyncBodySubscriber(AsyncBody.Consumer consumer) { this.consumer = consumer; } @Override public void onSubscribe(Subscription subscription) { - if (!subscribed.compareAndSet(false, true)) { + if (this.subscription.isDone()) { subscription.cancel(); return; } - this.subscription = subscription; - // the sendAsync future won't complete unless we do the initial request here - // so in onNext we'll trap the item until we're ready - subscription.request(1); + this.subscription.complete(subscription); } @Override public void onNext(T item) { - synchronized (this) { - if (first) { - this.initialItem = item; - first = false; - return; - } - } try { if (item == null) { done.complete(null); @@ -96,7 +125,7 @@ public void onNext(T item) { consumer.consume(item, this); } } catch (Exception e) { - subscription.cancel(); + subscription.thenAccept(Subscription::cancel); done.completeExceptionally(e); } } @@ -108,10 +137,6 @@ public void onError(Throwable throwable) { @Override public synchronized void onComplete() { - if (initialItem != null) { - this.isComplete = true; - return; - } done.complete(null); } @@ -120,19 +145,7 @@ public synchronized void consume() { if (done.isDone()) { return; } - try { - first = false; - if (initialItem != null) { - T item = initialItem; - initialItem = null; - onNext(item); - } - } finally { - if (isComplete) { - done.complete(null); - } - this.subscription.request(1); - } + this.subscription.thenAccept(s -> s.request(1)); } @Override @@ -142,7 +155,7 @@ public CompletableFuture done() { @Override public void cancel() { - subscription.cancel(); + subscription.thenAccept(Subscription::cancel); done.cancel(false); } @@ -240,52 +253,26 @@ public DerivedClientBuilder newBuilder() { } @Override - public CompletableFuture> consumeLines(HttpRequest request, BodyConsumer consumer) { + public CompletableFuture> consumeLines(HttpRequest request, AsyncBody.Consumer consumer) { return sendAsync(request, () -> { AsyncBodySubscriber subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromLineSubscriber(subscriber); - return new HandlerAndAsyncBody<>(handler, subscriber); - }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); + BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); + return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); + }).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody)); } @Override - public CompletableFuture> consumeBytes(HttpRequest request, BodyConsumer> consumer) { + public CompletableFuture> consumeBytes(HttpRequest request, + AsyncBody.Consumer> consumer) { return sendAsync(request, () -> { AsyncBodySubscriber> subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromSubscriber(subscriber); - return new HandlerAndAsyncBody<>(handler, subscriber); + BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); + return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } - @Override - public CompletableFuture> sendAsync(HttpRequest request, Class type) { - return sendAsync(request, () -> new HandlerAndAsyncBody(toBodyHandler(type), null)) - .thenApply(ar -> new JdkHttpResponseImpl<>(ar.response)); - } - - private BodyHandler toBodyHandler(Class type) { - BodyHandler bodyHandler; - if (type == null) { - bodyHandler = (BodyHandler) BodyHandlers.discarding(); - } else if (type == InputStream.class) { - bodyHandler = (BodyHandler) BodyHandlers.ofInputStream(); - } else if (type == String.class) { - bodyHandler = (BodyHandler) BodyHandlers.ofString(); - } else if (type == byte[].class) { - bodyHandler = (BodyHandler) BodyHandlers.ofByteArray(); - } else { - bodyHandler = responseInfo -> { - BodySubscriber upstream = BodyHandlers.ofInputStream().apply(responseInfo); - - BodySubscriber downstream = BodySubscribers.mapping( - upstream, - (InputStream is) -> new InputStreamReader(is, StandardCharsets.UTF_8)); - return (BodySubscriber) downstream; - }; - } - return bodyHandler; - } - public CompletableFuture> sendAsync(HttpRequest request, Supplier> handlerAndAsyncBodySupplier) { JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request; diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java index 4694179a51..3d79a28df7 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java @@ -15,37 +15,46 @@ */ package io.fabric8.kubernetes.client.jetty; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.util.Callback; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.function.LongConsumer; -public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements HttpClient.AsyncBody { +public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements AsyncBody { private final HttpRequest httpRequest; - private final HttpClient.BodyConsumer bodyConsumer; - private final CompletableFuture> asyncResponse; + private final CompletableFuture> asyncResponse; private final CompletableFuture asyncBodyDone; - private boolean consume; + private final CompletableFuture demand; + private boolean initialConsumeCalled; + private Runnable initialConsume; - JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer bodyConsumer) { + JettyAsyncResponseListener(HttpRequest httpRequest) { this.httpRequest = httpRequest; - this.bodyConsumer = bodyConsumer; asyncResponse = new CompletableFuture<>(); asyncBodyDone = new CompletableFuture<>(); - consume = false; + demand = new CompletableFuture<>(); } @Override - public synchronized void consume() { - consume = true; - this.notifyAll(); + public void consume() { + synchronized (this) { + if (!this.initialConsumeCalled) { + this.initialConsumeCalled = true; + if (this.initialConsume != null) { + this.initialConsume.run(); + this.initialConsume = null; + } + } + } + demand.thenAccept(l -> l.accept(1)); } @Override @@ -68,29 +77,37 @@ public void onComplete(Result result) { asyncBodyDone.complete(null); } - public CompletableFuture> listen(Request request) { + public CompletableFuture> listen(Request request) { request.send(this); return asyncResponse; } @Override - public void onContent(Response response, ByteBuffer content) { - try { - synchronized (this) { - while (!consume && !asyncBodyDone.isCancelled()) { - this.wait(); - } + public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) { + synchronized (this) { + if (!initialConsumeCalled) { + // defer until consume is called + this.initialConsume = () -> onContent(response, demand, content, callback); + return; } - if (!asyncBodyDone.isCancelled()) { - bodyConsumer.consume(process(response, content), this); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw KubernetesClientException.launderThrowable(e); + this.demand.complete(demand); + } + try { + onContent(content); + callback.succeeded(); } catch (Exception e) { - throw KubernetesClientException.launderThrowable(e); + callback.failed(e); } } - protected abstract T process(Response response, ByteBuffer content); + /** + * Implement to consume the content of the chunked response. + *

+ * Each chunk will be passed in order to this function (onContent{callback.succeeded}) + * + * @param content the ByteBuffer containing a chunk of the response. + * @throws Exception in case the downstream consumer throws an exception. + */ + protected abstract void onContent(ByteBuffer content) throws Exception; + } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java index dcf215952a..74437bdb9f 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import io.fabric8.kubernetes.client.http.Interceptor; @@ -24,14 +25,12 @@ import io.fabric8.kubernetes.client.http.WebSocket; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.InputStreamRequestContent; import org.eclipse.jetty.client.util.StringRequestContent; import org.eclipse.jetty.websocket.client.WebSocketClient; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -39,9 +38,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static io.fabric8.kubernetes.client.http.BufferUtil.copy; import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM; import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN; -import static org.eclipse.jetty.util.BufferUtil.toArray; public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpClient { @@ -50,7 +49,7 @@ public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpCl private final Collection interceptors; private final JettyHttpClientBuilder builder; private final JettyHttpClientFactory factory; - private Config config; + private final Config config; public JettyHttpClient(JettyHttpClientBuilder builder, HttpClient httpClient, WebSocketClient webSocketClient, Collection interceptors, JettyHttpClientFactory jettyHttpClientFactory, Config config) { @@ -77,36 +76,24 @@ public DerivedClientBuilder newBuilder() { return builder.copy(this); } - @Override - public CompletableFuture> sendAsync(HttpRequest originalRequest, Class type) { - final var supportedResponse = JettyHttpResponse.SupportedResponse.from(type); - final var request = toStandardHttpRequest(originalRequest); - final CompletableFuture> future = new CompletableFuture<>(); - newRequest(request).send(new BufferingResponseListener() { - - // TODO: Long Term Refactor - This Listener blocks until the full response is read and keeps it in memory. - // Find a way to stream the response body without completing the future - // We need two signals, one when the response is received, and one when the body is completely - // read. - // Should this method be completely replaced by consumeXxx()? - @Override - public void onComplete(Result result) { - future.complete(new JettyHttpResponse<>( - request, result.getResponse(), supportedResponse.process(result.getResponse(), getContent(), type))); - } - }); - return interceptResponse(request.toBuilder(), future, r -> sendAsync(r, type)); - } - @Override public CompletableFuture> consumeLines( - HttpRequest originalRequest, BodyConsumer consumer) { + HttpRequest originalRequest, AsyncBody.Consumer consumer) { final var request = toStandardHttpRequest(originalRequest); - final var future = new JettyAsyncResponseListener<>(request, consumer) { + final var future = new JettyAsyncResponseListener(request) { + + final StringBuilder builder = new StringBuilder(); @Override - protected String process(Response response, ByteBuffer content) { - return JettyHttpResponse.SupportedResponse.TEXT.process(response, toArray(content), String.class); + protected void onContent(ByteBuffer content) throws Exception { + for (char c : StandardCharsets.UTF_8.decode(content).array()) { + if (c == '\n') { + consumer.consume(builder.toString(), this); + builder.setLength(0); + } else { + builder.append(c); + } + } } }.listen(newRequest(request)); return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer)); @@ -114,13 +101,14 @@ protected String process(Response response, ByteBuffer content) { @Override public CompletableFuture> consumeBytes( - HttpRequest originalRequest, BodyConsumer> consumer) { + HttpRequest originalRequest, AsyncBody.Consumer> consumer) { final var request = toStandardHttpRequest(originalRequest); - final var future = new JettyAsyncResponseListener<>(request, consumer) { + final var future = new JettyAsyncResponseListener(request) { @Override - protected List process(Response response, ByteBuffer content) { - return Collections.singletonList(content); + protected void onContent(ByteBuffer content) throws Exception { + // we must clone as the buffer can be reused by the byte consumer + consumer.consume(Collections.singletonList(copy(content)), this); } }.listen(newRequest(request)); return interceptResponse(request.toBuilder(), future, r -> consumeBytes(r, consumer)); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java index 77a6669677..1c93df2932 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java @@ -17,19 +17,13 @@ import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; -import io.fabric8.kubernetes.client.utils.Utils; import org.eclipse.jetty.client.api.Response; -import java.io.ByteArrayInputStream; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.BiFunction; public class JettyHttpResponse implements HttpResponse { @@ -82,45 +76,4 @@ public Optional> previousResponse() { return Optional.empty(); } - enum SupportedResponse { - - TEXT(String.class, (r, bytes) -> new String(bytes, responseCharset(r))), - INPUT_STREAM(ByteArrayInputStream.class, (r, bytes) -> new ByteArrayInputStream(bytes)), - READER(InputStreamReader.class, (r, bytes) -> new InputStreamReader(new ByteArrayInputStream(bytes), responseCharset(r))), - BYTE_ARRAY(byte[].class, (r, bytes) -> bytes); - - private final Class type; - private final BiFunction processor; - - SupportedResponse(Class type, BiFunction processor) { - this.type = type; - this.processor = processor; - } - - public T process(Response response, byte[] bytes, Class type) { - return type.cast(processor.apply(response, bytes)); - } - - static SupportedResponse from(Class type) { - for (SupportedResponse sr : SupportedResponse.values()) { - if (type.isAssignableFrom(sr.type)) { - return sr; - } - } - throw new IllegalArgumentException("Unsupported response type: " + type.getName()); - } - - private static Charset responseCharset(Response response) { - var responseCharset = StandardCharsets.UTF_8; - final var responseEncoding = response.getHeaders().get("Content-Encoding"); - if (Utils.isNotNullOrEmpty(responseEncoding)) { - try { - responseCharset = Charset.forName(responseEncoding); - } catch (Exception e) { - // ignored - } - } - return responseCharset; - } - } } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java index 53ea3a55b4..b254883bdd 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -36,7 +36,7 @@ public class JettyWebSocket implements WebSocket, WebSocketListener { private final Condition backPressure; private final AtomicBoolean closed; private boolean moreMessages; - private Session webSocketSession; + private volatile Session webSocketSession; public JettyWebSocket(WebSocket.Listener listener) { this.listener = listener; @@ -115,6 +115,7 @@ public void onWebSocketClose(int statusCode, String reason) { @Override public void onWebSocketConnect(Session session) { + this.webSocketSession = session; listener.onOpen(this); } @@ -133,11 +134,6 @@ public void onWebSocketError(Throwable cause) { listener.onError(this, cause); } - public JettyWebSocket setWebSocketSession(Session webSocketSession) { - this.webSocketSession = webSocketSession; - return this; - } - private void backPressure() { try { lock.lock(); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java index b3f9990b5e..c5a8135bc5 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java @@ -58,7 +58,7 @@ public CompletableFuture buildAsync(WebSocket.Listener listener) { final CompletableFuture future = new CompletableFuture<>(); final var webSocket = new JettyWebSocket(listener); return webSocketClient.connect(webSocket, Objects.requireNonNull(WebSocket.toWebSocketUri(getUri())), cur) - .thenApply(webSocket::setWebSocketSession) + .thenApply(s -> webSocket) .exceptionally(ex -> { if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) { future.completeExceptionally(toHandshakeException((UpgradeException) ex.getCause())); diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java index fdf80f1b63..824396d1fe 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java @@ -101,19 +101,6 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep } } - @Test - @DisplayName("sendAsync with unsupported type throws Exception") - void sendAsyncUnsupportedType() { - try (var jettyHttpClient = new JettyHttpClient( - null, httpClient, webSocketClient, Collections.emptyList(), null, null)) { - // When - final var result = assertThrows(IllegalArgumentException.class, - () -> jettyHttpClient.sendAsync(null, Integer.class)); - // Then - assertThat(result).hasMessage("Unsupported response type: java.lang.Integer"); - } - } - @Test @DisplayName("sendAsync with unsupported HttpRequest throws Exception") void sendAsyncUnsupportedHttpRequest() { diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java index 3bf6233458..f6bc401eac 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java @@ -17,22 +17,13 @@ import org.eclipse.jetty.client.HttpResponse; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.params.provider.Arguments.arguments; class JettyHttpResponseTest { @@ -56,22 +47,4 @@ void headersHandlesJettyHttpFields() { .containsEntry("Via", Arrays.asList("proxy-1", "proxy-2")); } - @ParameterizedTest(name = "{index}: SupportedResponse: from type ''{0}'' is ''{1}''") - @MethodSource("supportedResponsesInput") - void supportedResponses(Class type, JettyHttpResponse.SupportedResponse supportedResponse) { - // When - final var result = JettyHttpResponse.SupportedResponse.from(type); - // Then - assertThat(result).isEqualTo(supportedResponse); - } - - static Stream supportedResponsesInput() { - return Stream.of( - arguments(String.class, JettyHttpResponse.SupportedResponse.TEXT), - arguments(InputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), - arguments(ByteArrayInputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), - arguments(Reader.class, JettyHttpResponse.SupportedResponse.READER), - arguments(InputStreamReader.class, JettyHttpResponse.SupportedResponse.READER), - arguments(byte[].class, JettyHttpResponse.SupportedResponse.BYTE_ARRAY)); - } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java index 75e05f5ca3..83a3510e86 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java @@ -182,7 +182,7 @@ void sendCloseWhenConnectionIsOpen() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.sendClose(1000, "Closing"); @@ -196,7 +196,7 @@ void sendCloseIgnoredWhenConnectionIsClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(false); // When jws.sendClose(1000, "Closing"); @@ -210,7 +210,7 @@ void sendCloseIgnoredWhenAlreadyClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); jws.sendClose(1000, "Closing"); // When @@ -226,7 +226,7 @@ void sendIncreasesQueueSize() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class, RETURNS_DEEP_STUBS); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.send(ByteBuffer.wrap(new byte[] { 1, 3, 3, 7 })); diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index cf803cf656..a6c25a83c4 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -18,6 +18,7 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; @@ -51,9 +52,6 @@ public class OkHttpClientImpl implements HttpClient { static final Map MEDIA_TYPES = new ConcurrentHashMap<>(); public static final MediaType JSON = parseMediaType("application/json"); - public static final MediaType JSON_PATCH = parseMediaType("application/json-patch+json"); - public static final MediaType STRATEGIC_MERGE_JSON_PATCH = parseMediaType("application/strategic-merge-patch+json"); - public static final MediaType JSON_MERGE_PATCH = parseMediaType("application/merge-patch+json"); static MediaType parseMediaType(String contentType) { MediaType result = MediaType.parse(contentType); @@ -62,11 +60,11 @@ static MediaType parseMediaType(String contentType) { } private abstract class OkHttpAsyncBody implements AsyncBody { - private final BodyConsumer consumer; + private final AsyncBody.Consumer consumer; private final BufferedSource source; private final CompletableFuture done = new CompletableFuture<>(); - private OkHttpAsyncBody(BodyConsumer consumer, BufferedSource source) { + private OkHttpAsyncBody(AsyncBody.Consumer consumer, BufferedSource source) { this.consumer = consumer; this.source = source; } @@ -76,16 +74,21 @@ public void consume() { // consume should not block from a callers perspective try { httpClient.dispatcher().executorService().execute(() -> { - try { - if (!source.exhausted() && !done.isDone()) { - T value = process(source); - consumer.consume(value, this); - } else { - done.complete(null); + // we must serialize multiple consumes as source is not thread-safe + // it would be better to use SerialExecutor, but that would need to move modules, as to + // potentially not hold multiple executor threads + synchronized (source) { + try { + if (!source.exhausted() && !done.isDone()) { + T value = process(source); + consumer.consume(value, this); + } else { + done.complete(null); + } + } catch (Exception e) { + Utils.closeQuietly(source); + done.completeExceptionally(e); } - } catch (Exception e) { - Utils.closeQuietly(source); - done.completeExceptionally(e); } }); } catch (Exception e) { @@ -212,8 +215,8 @@ public DerivedClientBuilder newBuilder() { } @Override - public CompletableFuture> consumeLines(HttpRequest request, - BodyConsumer consumer) { + public CompletableFuture> consumeLines( + HttpRequest request, AsyncBody.Consumer consumer) { Function handler = s -> new OkHttpAsyncBody(consumer, s) { @Override protected String process(BufferedSource source) throws IOException { @@ -227,8 +230,8 @@ protected String process(BufferedSource source) throws IOException { } @Override - public CompletableFuture> consumeBytes(HttpRequest request, - BodyConsumer> consumer) { + public CompletableFuture> consumeBytes( + HttpRequest request, AsyncBody.Consumer> consumer) { Function handler = s -> new OkHttpAsyncBody>(consumer, s) { @Override protected List process(BufferedSource source) throws IOException { @@ -274,30 +277,6 @@ public void onFailure(Call call, IOException e) { return future; } - @Override - public CompletableFuture> sendAsync(HttpRequest request, Class type) { - CompletableFuture> future = new CompletableFuture<>(); - Call call = httpClient.newCall(((OkHttpRequestImpl) request).getRequest()); - call.enqueue(new Callback() { - - @Override - public void onResponse(Call call, Response response) throws IOException { - future.complete(new OkHttpResponseImpl<>(response, type)); - } - - @Override - public void onFailure(Call call, IOException e) { - future.completeExceptionally(e); - } - }); - future.whenComplete((r, t) -> { - if (future.isCancelled()) { - call.cancel(); - } - }); - return future; - } - @Override public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() { return new OkHttpWebSocketImpl.BuilderImpl(this.httpClient, newRequestBuilder()); diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 41419d8b40..472891dfb9 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -17,7 +17,6 @@ package io.fabric8.kubernetes.client.http; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; import io.fabric8.kubernetes.client.http.WebSocket.Listener; import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; @@ -31,12 +30,18 @@ import java.io.InputStream; import java.io.Reader; import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -51,8 +56,8 @@ protected HttpClient.Factory getHttpClientFactory() { return new OkHttpClientFactory(); } - private KubernetesMockServer server; - private KubernetesClient client; + KubernetesMockServer server; + KubernetesClient client; @BeforeEach void setUp() { @@ -82,7 +87,7 @@ void testWebsocketHandshakeFailure() { assertThrows(WebSocketHandshakeException.class, () -> { try { - startedFuture.get(); + startedFuture.get(10, TimeUnit.SECONDS); } catch (ExecutionException e) { throw e.getCause(); } @@ -139,22 +144,24 @@ public void onMessage(WebSocket webSocket, String text) { assertFalse(latch.await(10, TimeUnit.SECONDS)); assertEquals(1, latch.getCount()); - startedFuture.get().request(); + startedFuture.get(10, TimeUnit.SECONDS).request(); assertTrue(latch.await(10, TimeUnit.SECONDS)); } @Test void testAsyncBody() throws Exception { - server.expect().withPath("/async").andReturn(200, "hello world").always(); + int byteCount = 20000; + server.expect().withPath("/async").andReturn(200, new String(new byte[byteCount], StandardCharsets.UTF_8)).always(); - CompletableFuture consumed = new CompletableFuture<>(); + CompletableFuture consumed = new CompletableFuture<>(); + AtomicInteger total = new AtomicInteger(); CompletableFuture> responseFuture = client.getHttpClient().consumeBytes( client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) .build(), (value, asyncBody) -> { - consumed.complete(true); + value.stream().map(ByteBuffer::remaining).forEach(total::addAndGet); asyncBody.consume(); }); @@ -167,29 +174,74 @@ void testAsyncBody() throws Exception { r.body().done().whenComplete((v, ex) -> { if (ex != null) { consumed.completeExceptionally(ex); + } else { + consumed.complete(total.get()); } - if (v != null) { - consumed.complete(false); + }); + } + }); + + assertEquals(byteCount, consumed.get(10, TimeUnit.SECONDS)); + } + + @Test + void testConsumeLines() throws Exception { + server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always(); + + ArrayList strings = new ArrayList<>(); + CompletableFuture consumed = new CompletableFuture<>(); + + CompletableFuture> responseFuture = client.getHttpClient().consumeLines( + client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) + .build(), + (value, asyncBody) -> { + strings.add(value); + asyncBody.consume(); + }); + + responseFuture.whenComplete((r, t) -> { + if (t != null) { + consumed.completeExceptionally(t); + } + if (r != null) { + r.body().consume(); + r.body().done().whenComplete((v, ex) -> { + if (ex != null) { + consumed.completeExceptionally(ex); + } else { + consumed.complete(null); } }); } }); - assertTrue(consumed.get(10, TimeUnit.SECONDS)); + consumed.get(10, TimeUnit.SECONDS); + assertEquals(Arrays.asList("hello", "world", "lines"), strings); } @DisplayName("Supported response body types") @ParameterizedTest(name = "{index}: {0}") @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) - void testSupportedTypes(Class type) throws Exception { - server.expect().withPath("/type").andReturn(200, "hello world").always(); + void supportedResponseBodyTypes(Class type) throws Exception { + String value = new String(new byte[16384]); + server.expect().withPath("/type").andReturn(200, value).always(); final HttpResponse result = client.getHttpClient() .sendAsync(client.getHttpClient().newHttpRequestBuilder() .uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(), type) .get(10, TimeUnit.SECONDS); assertThat(result) .satisfies(r -> assertThat(r.body()).isInstanceOf(type)) - .satisfies(r -> assertThat(r.bodyString()).isEqualTo("hello world")); + .satisfies(r -> assertThat(r.bodyString()).isEqualTo(value)); } + @Test + void supportedResponseTypeWithInvalid() { + final HttpClient httpClient = client.getHttpClient(); + final HttpRequest request = httpClient.newHttpRequestBuilder() + .uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(); + assertThatIllegalArgumentException() + .isThrownBy(() -> httpClient.sendAsync(request, Boolean.class)) + .withMessageStartingWith("Unsupported response type: ") + .withMessageContaining("Boolean"); + } } diff --git a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java index 8416b8422e..2eb831892e 100644 --- a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java +++ b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPostTest.java @@ -178,7 +178,7 @@ void postNamespaceMismatch() throws Exception { final HttpResponse result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS); // Then assertThat(result) - .hasFieldOrPropertyWithValue("response.code", 400) + .returns(400, HttpResponse::code) .extracting(HttpResponse::body).asString() .contains("the namespace of the object (different) does not match the namespace on the URL (test)"); } diff --git a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java index b92bacdfe0..da8b2caa4e 100644 --- a/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java +++ b/junit/kubernetes-server-mock/src/test/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherPutTest.java @@ -113,7 +113,7 @@ void putNameMismatch() throws Exception { final HttpResponse result = httpClient.sendAsync(request, String.class).get(10, TimeUnit.SECONDS); // Then assertThat(result) - .hasFieldOrPropertyWithValue("response.code", 400) + .returns(400, HttpResponse::code) .extracting(HttpResponse::body).asString() .contains("the name of the object (different) does not match the name on the URL (mismatched-name)"); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java new file mode 100644 index 0000000000..3b6c1f3123 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import java.util.concurrent.CompletableFuture; + +/** + * A simplified java.util.concurrent.Flow.Subscription and a future tracking done. + *
+ * The body should be consumed until the end or cancelled. + */ +public interface AsyncBody { + /** + * Called to deliver results to the {@link AsyncBody.Consumer} + */ + void consume(); + + /** + * Tracks the completion of the body. + * + * @return the future + */ + CompletableFuture done(); + + void cancel(); + + /** + * A functional interface for consuming async result bodies + */ + @FunctionalInterface + interface Consumer { + void consume(T value, AsyncBody asyncBody) throws Exception; + + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java new file mode 100644 index 0000000000..bfc8d3fa04 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +public class BufferUtil { + + private BufferUtil() { + // utils class + } + + /** + * Convert a ByteBuffer to a byte array. + * + * @param buffer The buffer to convert in flush mode. The buffer is not altered. + * @return An array of bytes duplicated from the buffer. + */ + public static byte[] toArray(ByteBuffer buffer) { + if (buffer.hasArray()) { + byte[] array = buffer.array(); + int from = buffer.arrayOffset() + buffer.position(); + return Arrays.copyOfRange(array, from, from + buffer.remaining()); + } else { + byte[] to = new byte[buffer.remaining()]; + buffer.slice().get(to); + return to; + } + } + + public static byte[] toArray(Collection buffers) { + final List arrays = buffers.stream().map(BufferUtil::toArray).collect(Collectors.toList()); + final byte[] ret = new byte[arrays.stream().mapToInt(a -> a.length).sum()]; + int offset = 0; + for (byte[] array : arrays) { + System.arraycopy(array, 0, ret, offset, array.length); + offset += array.length; + } + return ret; + } + + /** + * Deep copy/clone of a ByteBuffer. + * + * @param buffer The buffer to copy. + * @return A copy of the provided buffer. + */ + public static ByteBuffer copy(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + final int position = buffer.position(); + ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) + : ByteBuffer.allocate(buffer.remaining()); + clone.put(buffer); + clone.flip(); + buffer.position(position); + return clone; + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java new file mode 100644 index 0000000000..bc41c6c603 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/ByteArrayBodyHandler.java @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static io.fabric8.kubernetes.client.http.BufferUtil.toArray; + +public class ByteArrayBodyHandler implements AsyncBody.Consumer> { + + private final LinkedList buffers = new LinkedList<>(); + private final CompletableFuture result = new CompletableFuture<>(); + + @Override + public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { + this.buffers.addAll(value); + asyncBody.consume(); + } + + protected void onResponse(HttpResponse response) { + AsyncBody asyncBody = response.body(); + asyncBody.done().whenComplete(this::onBodyDone); + asyncBody.consume(); + } + + private synchronized void onBodyDone(Void v, Throwable t) { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(toArray(buffers)); + } + buffers.clear(); + } + + public CompletableFuture getResult() { + return result; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index ff17fce708..809c8677c4 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -150,35 +150,6 @@ interface Builder extends DerivedClientBuilder { Builder preferHttp11(); } - /** - * A simplified java.util.concurrent.Flow.Subscription and a future tracking done. - *
- * The body should be consumed until the end or cancelled. - */ - interface AsyncBody { - /** - * Called to deliver results to the {@link BodyConsumer} - */ - void consume(); - - /** - * Tracks the completion of the body. - * - * @return the future - */ - CompletableFuture done(); - - void cancel(); - } - - /** - * A functional interface for consuming async result bodies - */ - @FunctionalInterface - interface BodyConsumer { - void consume(T value, AsyncBody asyncBody) throws Exception; - } - @Override void close(); @@ -204,7 +175,9 @@ interface BodyConsumer { * @param type one of InputStream, Reader, String, byte[] * @return a CompletableFuture that returns the resulting HttpResponse when complete */ - CompletableFuture> sendAsync(HttpRequest request, Class type); + default CompletableFuture> sendAsync(HttpRequest request, Class type) { + return HttpResponse.SupportedResponses.from(type).sendAsync(request, this); + } /** * Send a request and consume the lines of the response body using the same logic as {@link BufferedReader} to @@ -214,16 +187,18 @@ interface BodyConsumer { * @param consumer the response body consumer * @return the future which will be ready after the headers have been read */ - CompletableFuture> consumeLines(HttpRequest request, BodyConsumer consumer); + CompletableFuture> consumeLines(HttpRequest request, AsyncBody.Consumer consumer); /** * Send a request and consume the bytes of the resulting response body + *

+ * HtttpClient implementations will provide ByteBuffers that may be held directly. * * @param request the HttpRequest to send * @param consumer the response body consumer * @return the future which will be ready after the headers have been read */ - CompletableFuture> consumeBytes(HttpRequest request, BodyConsumer> consumer); + CompletableFuture> consumeBytes(HttpRequest request, AsyncBody.Consumer> consumer); WebSocket.Builder newWebSocketBuilder(); diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java new file mode 100644 index 0000000000..391fbae921 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.util.LinkedList; +import java.util.List; + +/** + * Creates a blocking {@link ReadableByteChannel} from a {@link HttpResponse} containing an {@link AsyncBody} + *

+ * May be useful eventually to provide a non-blocking channel as well. + */ +public class HttpClientReadableByteChannel implements ReadableByteChannel, AsyncBody.Consumer> { + + private final LinkedList buffers = new LinkedList<>(); + private Throwable failed; + private boolean closed; + private boolean done; + private AsyncBody asyncBody; + private ByteBuffer currentBuffer; + private boolean consumeRequested; + + @Override + public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { + this.buffers.addAll(value); + // could proactively consume based up some byte limit + this.notifyAll(); + this.consumeRequested = false; + } + + protected synchronized void onResponse(HttpResponse response) { + asyncBody = response.body(); + asyncBody.done().whenComplete(this::onBodyDone); + asyncBody.consume(); // pre-fetch the first results + this.notifyAll(); + } + + private synchronized void onBodyDone(Void v, Throwable t) { + if (t != null) { + failed = t; + } + done = true; + this.notifyAll(); + } + + @Override + public synchronized void close() { + if (this.closed) { + return; + } + if (asyncBody != null) { + asyncBody.cancel(); + } + this.closed = true; + this.notifyAll(); + } + + @Override + public synchronized boolean isOpen() { + return !closed; + } + + @Override + public synchronized int read(ByteBuffer arg0) throws IOException { + if (closed) { + throw new ClosedChannelException(); + } + + int read = 0; + + while (arg0.hasRemaining()) { + while (currentBuffer == null || !currentBuffer.hasRemaining()) { + if (buffers.isEmpty()) { + if (failed != null) { + throw new IOException("channel already closed with exception", failed); + } + if (read > 0) { + return read; + } + if (done) { + return -1; + } + if (!consumeRequested && this.asyncBody != null) { + consumeRequested = true; + this.asyncBody.consume(); + // the consume call may actually trigger result deliver + // if it did, then just start the loop over or be done + if (!consumeRequested) { + continue; + } + if (done) { + return -1; + } + } + try { + this.wait(); // block until more buffers are delivered + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); + throw new ClosedByInterruptException(); + } + } + + currentBuffer = buffers.poll(); + } + + int remaining = Math.min(arg0.remaining(), currentBuffer.remaining()); + for (int i = 0; i < remaining; i++) { + arg0.put(currentBuffer.get()); + } + read += remaining; + } + + return read; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java index 921fe9e15c..c7d91b9dfc 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponse.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.Reader; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static java.nio.charset.StandardCharsets.UTF_8; @@ -38,7 +39,7 @@ static boolean isSuccessful(int code) { /** * Returns the HTTP status code. - * + * * @return the HTTP status code. */ int code(); @@ -63,29 +64,71 @@ default String message() { * @throws IOException in case there's an I/O problem */ default String bodyString() throws IOException { - Object body = body(); - if (body == null) { + final Object body = body(); + final SupportedResponses supportedResponse = SupportedResponses.from(body); + if (supportedResponse == null) { return ""; } - if (body instanceof String) { - return (String) body; - } - if (body instanceof Reader) { - return IOHelpers.readFully((Reader) body); - } - if (body instanceof byte[]) { - return new String((byte[]) body, UTF_8); - } - return IOHelpers.readFully((InputStream) body, UTF_8); + return supportedResponse.asString(body); } /** * The original {@link HttpRequest} that initiated this response. - * + * * @return the HTTP request. */ HttpRequest request(); Optional> previousResponse(); + enum SupportedResponses { + TEXT(String.class, Object::toString, SendAsyncUtils::string), + INPUT_STREAM(InputStream.class, body -> IOHelpers.readFully((InputStream) body, UTF_8), SendAsyncUtils::inputStream), + READER(Reader.class, body -> IOHelpers.readFully((Reader) body), SendAsyncUtils::reader), + BYTE_ARRAY(byte[].class, body -> new String((byte[]) body, UTF_8), SendAsyncUtils::bytes); + + private final Class type; + private final ToString toString; + private final Async async; + + SupportedResponses(Class type, ToString toString, Async async) { + this.type = type; + this.toString = toString; + this.async = async; + } + + private String asString(Object body) throws IOException { + return toString.toString(body); + } + + CompletableFuture> sendAsync(HttpRequest request, HttpClient client) { + return ((Async) async).sendAsync(request, client); + } + + public static SupportedResponses from(Object object) { + if (object == null) { + return null; + } + return from(object.getClass()); + } + + public static SupportedResponses from(Class type) { + for (SupportedResponses sr : SupportedResponses.values()) { + if (sr.type.isAssignableFrom(type)) { + return sr; + } + } + throw new IllegalArgumentException("Unsupported response type: " + type.getName()); + } + + @FunctionalInterface + interface ToString { + String toString(Object body) throws IOException; + } + + @FunctionalInterface + interface Async { + CompletableFuture> sendAsync(HttpRequest request, HttpClient client); + } + } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponseAdapter.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponseAdapter.java new file mode 100644 index 0000000000..cdd617f3de --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpResponseAdapter.java @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * HttpResponse implementation wrapping an existing HttpResponse with a processed body. + *

+ * Allows for changing the body type - there is further redesign that could be done here + * + * @param the type of the body. + */ +class HttpResponseAdapter implements HttpResponse { + + private final HttpResponse response; + private final T body; + + public HttpResponseAdapter(HttpResponse response, T body) { + this.response = response; + this.body = body; + } + + @Override + public List headers(String key) { + return response.headers(key); + } + + @Override + public boolean isSuccessful() { + return response.isSuccessful(); + } + + @Override + public Map> headers() { + return response.headers(); + } + + @Override + public int code() { + return response.code(); + } + + @Override + public String message() { + return response.message(); + } + + @Override + public HttpRequest request() { + return response.request(); + } + + @Override + public Optional> previousResponse() { + return response.previousResponse(); + } + + @Override + public T body() { + return body; + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java new file mode 100644 index 0000000000..487ab30a83 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/SendAsyncUtils.java @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.http; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; + +/** + * If we need support other than UTF-8, see jdk.internal.net.http.common.Utils.charsetFrom + */ +class SendAsyncUtils { + + private SendAsyncUtils() { + // just utils + } + + static CompletableFuture> reader(HttpRequest request, HttpClient client) { + return inputStream(request, client) + .thenApply(res -> new HttpResponseAdapter<>(res, new InputStreamReader(res.body(), StandardCharsets.UTF_8))); + } + + static CompletableFuture> inputStream(HttpRequest request, HttpClient client) { + HttpClientReadableByteChannel byteChannel = new HttpClientReadableByteChannel(); + CompletableFuture> futureResponse = client.consumeBytes(request, byteChannel); + return futureResponse.thenApply(res -> { + byteChannel.onResponse(res); + return new HttpResponseAdapter<>(res, Channels.newInputStream(byteChannel)); + }); + } + + static CompletableFuture> bytes(HttpRequest request, HttpClient client) { + ByteArrayBodyHandler byteArrayBodyHandler = new ByteArrayBodyHandler(); + CompletableFuture> futureResponse = client.consumeBytes(request, byteArrayBodyHandler); + return futureResponse.thenCompose(res -> { + byteArrayBodyHandler.onResponse(res); + return byteArrayBodyHandler.getResult() + .thenApply(b -> new HttpResponseAdapter<>(res, b)); + }); + } + + static CompletableFuture> string(HttpRequest request, HttpClient client) { + return bytes(request, client) + .thenApply(res -> new HttpResponseAdapter<>(res, new String(res.body(), StandardCharsets.UTF_8))); + } + +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java index a55e868e60..97420be7c1 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java @@ -23,6 +23,8 @@ import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -56,7 +58,7 @@ public void consumeLinesProcessedAfterConsume() throws Exception { .andReturn(200, "This is the response body\n") .always(); final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client.consumeLines( + final HttpResponse asyncBodyResponse = client.consumeLines( client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), (value, asyncBody) -> { responseText.append(value); @@ -78,7 +80,7 @@ public void consumeLinesNotProcessedIfCancelled() throws Exception { .andReturn(200, "This would be the response body") .always(); final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client + final HttpResponse asyncBodyResponse = client .consumeLines(client.newHttpRequestBuilder() .uri(server.url("/cancel")).build(), (value, asyncBody) -> { responseText.append(value); @@ -93,15 +95,36 @@ public void consumeLinesNotProcessedIfCancelled() throws Exception { } } + @Test + @DisplayName("Lines are processed completely") + public void consumeLinesProcessesAllLines() throws Exception { + try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { + server.expect().withPath("/consume-lines") + .andReturn(200, "This is the response body\nWith\nMultiple\n lines\n") + .always(); + final List receivedLines = new ArrayList<>(); + final HttpResponse asyncBodyResponse = client.consumeLines( + client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), + (value, asyncBody) -> { + receivedLines.add(value); + asyncBody.consume(); + }) + .get(10L, TimeUnit.SECONDS); + asyncBodyResponse.body().consume(); + asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS); + assertThat(receivedLines).containsExactly("This is the response body", "With", "Multiple", " lines"); + } + } + @Test @DisplayName("Bytes are processed and consumed only after the consume() invocation") - public void consumeByteBufferLinesProcessedAfterConsume() throws Exception { + public void consumeBytesProcessedAfterConsume() throws Exception { try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { server.expect().withPath("/consume-bytes") .andReturn(200, "This is the response body as bytes") .always(); final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client.consumeBytes( + final HttpResponse asyncBodyResponse = client.consumeBytes( client.newHttpRequestBuilder().uri(server.url("/consume-bytes")).build(), (value, asyncBody) -> { responseText.append(value.stream().map(StandardCharsets.UTF_8::decode) diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index e380a93a9a..e0eb6e73a3 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -94,7 +94,7 @@ public void before(BasicBuilder builder, HttpHeaders headers) { @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.sendAsync") public void afterHttpFailureReplacesResponseInSendAsync() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override @@ -119,7 +119,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines") public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works\n").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override @@ -131,16 +131,22 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons final CompletableFuture result = new CompletableFuture<>(); // When try (HttpClient client = builder.build()) { - final HttpResponse asyncR = client.consumeLines( + final HttpResponse asyncR = client.consumeLines( client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> { result.complete(s); ab.consume(); }) .get(10L, TimeUnit.SECONDS); asyncR.body().consume(); - asyncR.body().done().get(10L, TimeUnit.SECONDS); + asyncR.body().done().whenComplete((v, t) -> { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(null); + } + }); // Then - assertThat(result.get()).isEqualTo("This works"); + assertThat(result.get(10L, TimeUnit.SECONDS)).isEqualTo("This works"); } } @@ -148,7 +154,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes") public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override @@ -160,7 +166,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons final CompletableFuture result = new CompletableFuture<>(); // When try (HttpClient client = builder.build()) { - final HttpResponse asyncR = client.consumeBytes( + final HttpResponse asyncR = client.consumeBytes( client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> { result.complete(StandardCharsets.UTF_8.decode(s.iterator().next()).toString()); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/BufferUtilTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/BufferUtilTest.java new file mode 100644 index 0000000000..2ef6342198 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/BufferUtilTest.java @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +class BufferUtilTest { + + @Test + void toArrayFromBBWithArray() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + final byte[] result = BufferUtil.toArray(buffer); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello"); + } + + @Test + void toArrayFromBB() { + final ByteBuffer buffer = ByteBuffer.allocateDirect(5); + buffer.put((byte) 104); + buffer.put((byte) 101); + buffer.put((byte) 108); + buffer.put((byte) 108); + buffer.put((byte) 111); + buffer.rewind(); + final byte[] result = BufferUtil.toArray(buffer); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello"); + } + + @Test + void toArrayFromBBInPosition() { + final ByteBuffer buffer = ByteBuffer.allocateDirect(12); + buffer.position(7); + buffer.put((byte) 104); + buffer.put((byte) 101); + buffer.put((byte) 108); + buffer.put((byte) 108); + buffer.put((byte) 111); + buffer.position(7); + final byte[] result = BufferUtil.toArray(buffer); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello"); + } + + @Test + void toArrayFromMultipleBB() { + final ByteBuffer hello = ByteBuffer.allocateDirect(12); + hello.position(7); + hello.put((byte) 104); + hello.put((byte) 101); + hello.put((byte) 108); + hello.put((byte) 108); + hello.put((byte) 111); + hello.position(7); + final ByteBuffer space = ByteBuffer.allocateDirect(1); + space.put((byte) 32); + space.rewind(); + final ByteBuffer world = ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)); + final byte[] result = BufferUtil.toArray(Arrays.asList(hello, space, world)); + assertThat(result).asString(StandardCharsets.UTF_8).isEqualTo("hello world"); + } + + @Test + void copyReturnsADifferentInstance() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + final ByteBuffer result = BufferUtil.copy(buffer); + assertThat(result) + .isNotSameAs(buffer) + .extracting(BufferUtil::toArray).asInstanceOf(InstanceOfAssertFactories.BYTE_ARRAY) + .asString(StandardCharsets.UTF_8) + .isEqualTo("hello"); + } + + @Test + void copyReturnsInstanceWithSameContent() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + final ByteBuffer result = BufferUtil.copy(buffer); + assertThat(result) + .extracting(BufferUtil::toArray) + .asInstanceOf(InstanceOfAssertFactories.BYTE_ARRAY) + .asString(StandardCharsets.UTF_8) + .isEqualTo("hello"); + } + + @Test + void copyReturnsInstanceWithSameContentRange() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + buffer.position(2); + final ByteBuffer result = BufferUtil.copy(buffer); + assertThat(result) + .extracting(BufferUtil::toArray) + .asInstanceOf(InstanceOfAssertFactories.BYTE_ARRAY) + .asString(StandardCharsets.UTF_8) + .isEqualTo("llo"); + } + + @Test + void copyReturnsInstancePreservingPositionOfOriginal() { + final ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); + buffer.position(2); + BufferUtil.copy(buffer); + assertThat(buffer) + .extracting(ByteBuffer::position) + .isEqualTo(2); + } + + @Test + void copyWithNullReturnsNull() { + assertThat(BufferUtil.copy(null)).isNull(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index 82aad07290..5f3b08ed3a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; @@ -45,7 +46,7 @@ public class LogWatchCallback implements LogWatch, AutoCloseable { private volatile InputStream output; private final AtomicBoolean closed = new AtomicBoolean(false); - private volatile Optional asyncBody = Optional.empty(); + private volatile Optional asyncBody = Optional.empty(); private final SerialExecutor serialExecutor; public LogWatchCallback(OutputStream out, Executor executor) { @@ -65,7 +66,7 @@ private void cleanUp() { if (!closed.compareAndSet(false, true)) { return; } - asyncBody.ifPresent(HttpClient.AsyncBody::cancel); + asyncBody.ifPresent(AsyncBody::cancel); serialExecutor.shutdownNow(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 3bfd38c0ef..e114495c5f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -19,8 +19,8 @@ import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import org.slf4j.Logger; diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java index 0143ccdde2..cc2a8323e1 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java @@ -20,8 +20,8 @@ import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient; -import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; import io.fabric8.kubernetes.client.http.HttpClient.DerivedClientBuilder; import io.fabric8.kubernetes.client.http.HttpResponse; import org.junit.jupiter.api.Test;