From 32dd003db382e6f96acfbf63f25ecb8db0c5152e Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 5 Oct 2022 08:11:43 -0400 Subject: [PATCH] fix #4201: ensuring okhttp is tolerant to multiple consume calls also converting jetty to fully non-blocking and adapting the body delivery for jdk for use with sendAsync logic - we need the AsyncBody to be available immediately --- .../client/jdkhttp/JdkHttpClientImpl.java | 98 +++++++++++-------- .../jetty/JettyAsyncResponseListener.java | 47 +++++---- .../client/okhttp/OkHttpClientImpl.java | 23 +++-- .../client/http/OkHttpClientTest.java | 27 ++--- .../http/HttpClientReadableByteChannel.java | 14 ++- .../client/http/AbstractInterceptorTest.java | 16 ++- 6 files changed, 139 insertions(+), 86 deletions(-) diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index 46f8bc688ba..757ee61593d 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -26,6 +26,8 @@ import java.net.URI; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; +import java.net.http.HttpResponse.BodySubscriber; +import java.net.http.HttpResponse.ResponseInfo; import java.net.http.WebSocketHandshakeException; import java.nio.ByteBuffer; import java.util.List; @@ -33,10 +35,10 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -47,14 +49,56 @@ */ public class JdkHttpClientImpl implements HttpClient { + /** + * Adapts the BodyHandler to immediately complete the body + */ + private static final class BodyHandlerAdapter implements BodyHandler { + private final AsyncBodySubscriber subscriber; + private final BodyHandler handler; + + private BodyHandlerAdapter(AsyncBodySubscriber subscriber, BodyHandler handler) { + this.subscriber = subscriber; + this.handler = handler; + } + + @Override + public BodySubscriber apply(ResponseInfo responseInfo) { + BodySubscriber bodySubscriber = handler.apply(responseInfo); + return new BodySubscriber() { + CompletableFuture cf = CompletableFuture.completedFuture(subscriber); + + @Override + public void onSubscribe(Subscription subscription) { + bodySubscriber.onSubscribe(subscription); + } + + @Override + public void onNext(List item) { + bodySubscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + bodySubscriber.onError(throwable); + } + + @Override + public void onComplete() { + bodySubscriber.onComplete(); + } + + @Override + public CompletionStage getBody() { + return cf; + } + }; + } + } + private final class AsyncBodySubscriber implements Subscriber, AsyncBody { private final BodyConsumer consumer; private CompletableFuture done = new CompletableFuture(); - private final AtomicBoolean subscribed = new AtomicBoolean(); - private volatile Flow.Subscription subscription; - private T initialItem; - private boolean first = true; - private boolean isComplete; + private CompletableFuture subscription = new CompletableFuture<>(); private AsyncBodySubscriber(BodyConsumer consumer) { this.consumer = consumer; @@ -62,25 +106,15 @@ private AsyncBodySubscriber(BodyConsumer consumer) { @Override public void onSubscribe(Subscription subscription) { - if (!subscribed.compareAndSet(false, true)) { + if (this.subscription.isDone()) { subscription.cancel(); return; } - this.subscription = subscription; - // the sendAsync future won't complete unless we do the initial request here - // so in onNext we'll trap the item until we're ready - subscription.request(1); + this.subscription.complete(subscription); } @Override public void onNext(T item) { - synchronized (this) { - if (first) { - this.initialItem = item; - first = false; - return; - } - } try { if (item == null) { done.complete(null); @@ -88,7 +122,7 @@ public void onNext(T item) { consumer.consume(item, this); } } catch (Exception e) { - subscription.cancel(); + subscription.thenAccept(Subscription::cancel); done.completeExceptionally(e); } } @@ -100,10 +134,6 @@ public void onError(Throwable throwable) { @Override public synchronized void onComplete() { - if (initialItem != null) { - this.isComplete = true; - return; - } done.complete(null); } @@ -112,19 +142,7 @@ public synchronized void consume() { if (done.isDone()) { return; } - try { - first = false; - if (initialItem != null) { - T item = initialItem; - initialItem = null; - onNext(item); - } - } finally { - if (isComplete) { - done.complete(null); - } - this.subscription.request(1); - } + this.subscription.thenAccept(s -> s.request(1)); } @Override @@ -134,7 +152,7 @@ public CompletableFuture done() { @Override public void cancel() { - subscription.cancel(); + subscription.thenAccept(Subscription::cancel); done.cancel(false); } @@ -234,7 +252,8 @@ public CompletableFuture> consumeLines(HttpRequest reque return sendAsync(request, () -> { AsyncBodySubscriber subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromLineSubscriber(subscriber); - return new HandlerAndAsyncBody<>(handler, subscriber); + BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); + return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } @@ -243,7 +262,8 @@ public CompletableFuture> consumeBytes(HttpRequest reque return sendAsync(request, () -> { AsyncBodySubscriber> subscriber = new AsyncBodySubscriber<>(consumer); BodyHandler handler = BodyHandlers.fromSubscriber(subscriber); - return new HandlerAndAsyncBody<>(handler, subscriber); + BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); + return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java index 4694179a51c..2ee8ad50c0c 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java @@ -15,16 +15,17 @@ */ package io.fabric8.kubernetes.client.jetty; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.util.Callback; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.function.LongConsumer; public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements HttpClient.AsyncBody { @@ -32,20 +33,29 @@ public abstract class JettyAsyncResponseListener extends Response.Listener.Ad private final HttpClient.BodyConsumer bodyConsumer; private final CompletableFuture> asyncResponse; private final CompletableFuture asyncBodyDone; - private boolean consume; + private CompletableFuture demand = new CompletableFuture<>(); + private boolean initialConsumeCalled; + private Runnable initialConsume; JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer bodyConsumer) { this.httpRequest = httpRequest; this.bodyConsumer = bodyConsumer; asyncResponse = new CompletableFuture<>(); asyncBodyDone = new CompletableFuture<>(); - consume = false; } @Override - public synchronized void consume() { - consume = true; - this.notifyAll(); + public void consume() { + synchronized (this) { + if (!this.initialConsumeCalled) { + this.initialConsumeCalled = true; + if (this.initialConsume != null) { + this.initialConsume.run(); + this.initialConsume = null; + } + } + } + demand.thenAccept(l -> l.accept(1)); } @Override @@ -74,21 +84,20 @@ public CompletableFuture> listen(Request requ } @Override - public void onContent(Response response, ByteBuffer content) { - try { - synchronized (this) { - while (!consume && !asyncBodyDone.isCancelled()) { - this.wait(); - } - } - if (!asyncBodyDone.isCancelled()) { - bodyConsumer.consume(process(response, content), this); + public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) { + synchronized (this) { + if (!initialConsumeCalled) { + // defer until consume is called + this.initialConsume = () -> onContent(response, demand, content, callback); + return; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw KubernetesClientException.launderThrowable(e); + this.demand.complete(demand); + } + try { + bodyConsumer.consume(process(response, content), this); + callback.succeeded(); } catch (Exception e) { - throw KubernetesClientException.launderThrowable(e); + callback.failed(e); } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index 00341862b05..34c5f8987af 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -72,16 +72,21 @@ public void consume() { // consume should not block from a callers perspective try { httpClient.dispatcher().executorService().execute(() -> { - try { - if (!source.exhausted() && !done.isDone()) { - T value = process(source); - consumer.consume(value, this); - } else { - done.complete(null); + // we must serialize multiple consumes as source is not thread-safe + // it would be better to use SerialExecutor, but that would need to move modules, as to + // potentially not hold multiple executor threads + synchronized (source) { + try { + if (!source.exhausted() && !done.isDone()) { + T value = process(source); + consumer.consume(value, this); + } else { + done.complete(null); + } + } catch (Exception e) { + Utils.closeQuietly(source); + done.completeExceptionally(e); } - } catch (Exception e) { - Utils.closeQuietly(source); - done.completeExceptionally(e); } }); } catch (Exception e) { diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 829f428dc4a..ef8c55d9a9a 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -31,12 +31,15 @@ import java.io.InputStream; import java.io.Reader; import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -53,8 +56,8 @@ protected HttpClient.Factory getHttpClientFactory() { return new OkHttpClientFactory(); } - private KubernetesMockServer server; - private KubernetesClient client; + KubernetesMockServer server; + KubernetesClient client; @BeforeEach void setUp() { @@ -148,15 +151,17 @@ public void onMessage(WebSocket webSocket, String text) { @Test void testAsyncBody() throws Exception { - server.expect().withPath("/async").andReturn(200, "hello world").always(); + int byteCount = 20000; + server.expect().withPath("/async").andReturn(200, new String(new byte[byteCount], StandardCharsets.UTF_8)).always(); - CompletableFuture consumed = new CompletableFuture<>(); + CompletableFuture consumed = new CompletableFuture<>(); + AtomicInteger total = new AtomicInteger(); CompletableFuture> responseFuture = client.getHttpClient().consumeBytes( client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) .build(), (value, asyncBody) -> { - consumed.complete(true); + value.stream().map(ByteBuffer::remaining).forEach(total::addAndGet); asyncBody.consume(); }); @@ -169,15 +174,14 @@ void testAsyncBody() throws Exception { r.body().done().whenComplete((v, ex) -> { if (ex != null) { consumed.completeExceptionally(ex); - } - if (v != null) { - consumed.complete(false); + } else { + consumed.complete(total.get()); } }); } }); - assertTrue(consumed.get(5, TimeUnit.SECONDS)); + assertEquals(byteCount, consumed.get(5, TimeUnit.SECONDS)); } @Test @@ -219,14 +223,15 @@ void testConsumeLines() throws Exception { @ParameterizedTest(name = "{index}: {0}") @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) void testSupportedTypes(Class type) throws Exception { - server.expect().withPath("/type").andReturn(200, "hello world").always(); + String value = new String(new byte[16384]); + server.expect().withPath("/type").andReturn(200, value).always(); final HttpResponse result = client.getHttpClient() .sendAsync(client.getHttpClient().newHttpRequestBuilder() .uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(), type) .get(10, TimeUnit.SECONDS); assertThat(result) .satisfies(r -> assertThat(r.body()).isInstanceOf(type)) - .satisfies(r -> assertThat(r.bodyString()).isEqualTo("hello world")); + .satisfies(r -> assertThat(r.bodyString()).isEqualTo(value)); } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java index 430dee3cab1..b0becfd643a 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -40,18 +40,20 @@ public class HttpClientReadableByteChannel implements ReadableByteChannel, BodyC private boolean done; private AsyncBody asyncBody; private ByteBuffer currentBuffer; + private boolean consumeRequested; @Override public synchronized void consume(List value, AsyncBody asyncBody) throws Exception { this.buffers.addAll(value); // could proactively consume based up some byte limit this.notifyAll(); + this.consumeRequested = false; } protected synchronized void onResponse(HttpResponse response) { asyncBody = response.body(); asyncBody.done().whenComplete(this::onBodyDone); - asyncBody.consume(); + asyncBody.consume(); // pre-fetch the first results this.notifyAll(); } @@ -92,7 +94,7 @@ public synchronized int read(ByteBuffer arg0) throws IOException { while (currentBuffer == null || !currentBuffer.hasRemaining()) { if (buffers.isEmpty()) { if (failed != null) { - throw new IOException("closed", failed); + throw new IOException("channel already closed with exception", failed); } if (read > 0) { return read; @@ -100,8 +102,14 @@ public synchronized int read(ByteBuffer arg0) throws IOException { if (done) { return -1; } - if (this.asyncBody != null) { + if (!consumeRequested && this.asyncBody != null) { + consumeRequested = true; this.asyncBody.consume(); + // the consume call may actually trigger result deliver + // if it did, then just start the loop over + if (!consumeRequested) { + continue; + } } try { this.wait(); // block until more buffers are delivered diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index 5ea4f843da0..c7a3eca9400 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -71,7 +71,7 @@ public void before(BasicBuilder builder, HttpHeaders headers) { @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.sendAsync") public void afterHttpFailureReplacesResponseInSendAsync() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override @@ -96,7 +96,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines") public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works\n").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override @@ -115,9 +115,15 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons }) .get(10L, TimeUnit.SECONDS); asyncR.body().consume(); - asyncR.body().done().get(10L, TimeUnit.SECONDS); + asyncR.body().done().whenComplete((v, t) -> { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(null); + } + }); // Then - assertThat(result.get()).isEqualTo("This works"); + assertThat(result.get(10L, TimeUnit.SECONDS)).isEqualTo("This works"); } } @@ -125,7 +131,7 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes") public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception { // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override