From 6f31d2aae43cb11a251db04b86e44c5aa0250dcf Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Tue, 29 Nov 2022 11:39:03 -0500 Subject: [PATCH] fix #4201: moving consumeLines out of the clients --- .../client/jdkhttp/JdkHttpClientImpl.java | 10 --- .../client/jetty/JettyHttpClient.java | 24 ------- .../client/okhttp/OkHttpClientImpl.java | 15 ---- .../client/http/OkHttpClientTest.java | 37 ---------- .../kubernetes/client/http/HttpClient.java | 10 --- .../client/http/AbstractAsyncBodyTest.java | 71 ------------------- .../client/http/AbstractInterceptorTest.java | 35 --------- .../client/dsl/internal/WatchHTTPManager.java | 16 ++++- .../dsl/internal/WatchHttpManagerTest.java | 2 +- .../kubernetes/client/mock/WatchTest.java | 54 ++++++++++++-- 10 files changed, 64 insertions(+), 210 deletions(-) diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index 342d2c8bce6..5694fc656df 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -252,16 +252,6 @@ public DerivedClientBuilder newBuilder() { return this.builder.copy(this); } - @Override - public CompletableFuture> consumeLines(HttpRequest request, AsyncBody.Consumer consumer) { - return sendAsync(request, () -> { - AsyncBodySubscriber subscriber = new AsyncBodySubscriber<>(consumer); - BodyHandler handler = BodyHandlers.fromLineSubscriber(subscriber); - BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); - return new HandlerAndAsyncBody<>(handlerAdapter, subscriber); - }).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody)); - } - @Override public CompletableFuture> consumeBytes(HttpRequest request, AsyncBody.Consumer> consumer) { diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java index 74437bdb9f0..05a23aeba90 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -30,7 +30,6 @@ import org.eclipse.jetty.websocket.client.WebSocketClient; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -76,29 +75,6 @@ public DerivedClientBuilder newBuilder() { return builder.copy(this); } - @Override - public CompletableFuture> consumeLines( - HttpRequest originalRequest, AsyncBody.Consumer consumer) { - final var request = toStandardHttpRequest(originalRequest); - final var future = new JettyAsyncResponseListener(request) { - - final StringBuilder builder = new StringBuilder(); - - @Override - protected void onContent(ByteBuffer content) throws Exception { - for (char c : StandardCharsets.UTF_8.decode(content).array()) { - if (c == '\n') { - consumer.consume(builder.toString(), this); - builder.setLength(0); - } else { - builder.append(c); - } - } - } - }.listen(newRequest(request)); - return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer)); - } - @Override public CompletableFuture> consumeBytes( HttpRequest originalRequest, AsyncBody.Consumer> consumer) { diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index a6c25a83c45..bfe4cc50c65 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -214,21 +214,6 @@ public DerivedClientBuilder newBuilder() { return new OkHttpClientBuilderImpl(httpClient.newBuilder(), this.factory, this.config); } - @Override - public CompletableFuture> consumeLines( - HttpRequest request, AsyncBody.Consumer consumer) { - Function handler = s -> new OkHttpAsyncBody(consumer, s) { - @Override - protected String process(BufferedSource source) throws IOException { - // this should probably be strict instead - // when non-strict if no newline is present, this will create a truncated string from - // what is available. However as strict it will be blocking. - return source.readUtf8Line(); - } - }; - return sendAsync(request, handler); - } - @Override public CompletableFuture> consumeBytes( HttpRequest request, AsyncBody.Consumer> consumer) { diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 472891dfb9e..384a7a2138f 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -32,8 +32,6 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -184,41 +182,6 @@ void testAsyncBody() throws Exception { assertEquals(byteCount, consumed.get(10, TimeUnit.SECONDS)); } - @Test - void testConsumeLines() throws Exception { - server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always(); - - ArrayList strings = new ArrayList<>(); - CompletableFuture consumed = new CompletableFuture<>(); - - CompletableFuture> responseFuture = client.getHttpClient().consumeLines( - client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) - .build(), - (value, asyncBody) -> { - strings.add(value); - asyncBody.consume(); - }); - - responseFuture.whenComplete((r, t) -> { - if (t != null) { - consumed.completeExceptionally(t); - } - if (r != null) { - r.body().consume(); - r.body().done().whenComplete((v, ex) -> { - if (ex != null) { - consumed.completeExceptionally(ex); - } else { - consumed.complete(null); - } - }); - } - }); - - consumed.get(10, TimeUnit.SECONDS); - assertEquals(Arrays.asList("hello", "world", "lines"), strings); - } - @DisplayName("Supported response body types") @ParameterizedTest(name = "{index}: {0}") @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index 809c8677c4b..42844d06439 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -179,16 +179,6 @@ default CompletableFuture> sendAsync(HttpRequest request, Cl return HttpResponse.SupportedResponses.from(type).sendAsync(request, this); } - /** - * Send a request and consume the lines of the response body using the same logic as {@link BufferedReader} to - * break up the lines. - * - * @param request the HttpRequest to send - * @param consumer the response body consumer - * @return the future which will be ready after the headers have been read - */ - CompletableFuture> consumeLines(HttpRequest request, AsyncBody.Consumer consumer); - /** * Send a request and consume the bytes of the resulting response body *

diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java index 97420be7c15..1ee1cfd50e4 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java @@ -23,15 +23,10 @@ import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; public abstract class AbstractAsyncBodyTest { @@ -50,72 +45,6 @@ static void afterAll() { protected abstract HttpClient.Factory getHttpClientFactory(); - @Test - @DisplayName("Lines are processed and consumed only after the consume() invocation") - public void consumeLinesProcessedAfterConsume() throws Exception { - try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { - server.expect().withPath("/consume-lines") - .andReturn(200, "This is the response body\n") - .always(); - final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client.consumeLines( - client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), - (value, asyncBody) -> { - responseText.append(value); - asyncBody.consume(); - }) - .get(10L, TimeUnit.SECONDS); - assertThat(responseText).isEmpty(); - asyncBodyResponse.body().consume(); - asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS); - assertThat(responseText).contains("This is the response body"); - } - } - - @Test - @DisplayName("Lines are not processed when cancel() invocation") - public void consumeLinesNotProcessedIfCancelled() throws Exception { - try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { - server.expect().withPath("/cancel") - .andReturn(200, "This would be the response body") - .always(); - final StringBuffer responseText = new StringBuffer(); - final HttpResponse asyncBodyResponse = client - .consumeLines(client.newHttpRequestBuilder() - .uri(server.url("/cancel")).build(), (value, asyncBody) -> { - responseText.append(value); - asyncBody.consume(); - }) - .get(10L, TimeUnit.SECONDS); - asyncBodyResponse.body().cancel(); - asyncBodyResponse.body().consume(); - final CompletableFuture doneFuture = asyncBodyResponse.body().done(); - assertThrows(CancellationException.class, () -> doneFuture.get(10L, TimeUnit.SECONDS)); - assertThat(responseText).isEmpty(); - } - } - - @Test - @DisplayName("Lines are processed completely") - public void consumeLinesProcessesAllLines() throws Exception { - try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { - server.expect().withPath("/consume-lines") - .andReturn(200, "This is the response body\nWith\nMultiple\n lines\n") - .always(); - final List receivedLines = new ArrayList<>(); - final HttpResponse asyncBodyResponse = client.consumeLines( - client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), - (value, asyncBody) -> { - receivedLines.add(value); - asyncBody.consume(); - }) - .get(10L, TimeUnit.SECONDS); - asyncBodyResponse.body().consume(); - asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS); - assertThat(receivedLines).containsExactly("This is the response body", "With", "Multiple", " lines"); - } - } - @Test @DisplayName("Bytes are processed and consumed only after the consume() invocation") public void consumeBytesProcessedAfterConsume() throws Exception { diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index e0eb6e73a3a..a1f837fe692 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -115,41 +115,6 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons } } - @Test - @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines") - public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { - // Given - server.expect().withPath("/intercepted-url").andReturn(200, "This works\n").once(); - final HttpClient.Builder builder = getHttpClientFactory().newBuilder() - .addOrReplaceInterceptor("test", new Interceptor() { - @Override - public CompletableFuture afterFailure(BasicBuilder builder, HttpResponse response) { - builder.uri(URI.create(server.url("/intercepted-url"))); - return CompletableFuture.completedFuture(true); - } - }); - final CompletableFuture result = new CompletableFuture<>(); - // When - try (HttpClient client = builder.build()) { - final HttpResponse asyncR = client.consumeLines( - client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> { - result.complete(s); - ab.consume(); - }) - .get(10L, TimeUnit.SECONDS); - asyncR.body().consume(); - asyncR.body().done().whenComplete((v, t) -> { - if (t != null) { - result.completeExceptionally(t); - } else { - result.complete(null); - } - }); - // Then - assertThat(result.get(10L, TimeUnit.SECONDS)).isEqualTo("This works"); - } - } - @Test @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes") public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index e114495c5f3..7d41dd739ce 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -28,6 +28,8 @@ import java.net.MalformedURLException; import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -66,8 +68,18 @@ public WatchHTTPManager(final HttpClient client, protected synchronized void start(URL url, Map headers) { HttpRequest.Builder builder = client.newHttpRequestBuilder().url(url); headers.forEach(builder::header); - call = client.consumeLines(builder.build(), (s, a) -> { - onMessage(s); + StringBuffer buffer = new StringBuffer(); + call = client.consumeBytes(builder.build(), (b, a) -> { + for (ByteBuffer content : b) { + for (char c : StandardCharsets.UTF_8.decode(content).array()) { + if (c == '\n') { + onMessage(buffer.toString()); + buffer.setLength(0); + } else { + buffer.append(c); + } + } + } a.consume(); }); call.whenComplete((response, t) -> { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java index cc2a8323e1d..9d2d48387f7 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java @@ -47,7 +47,7 @@ void testReconnectOnException() throws MalformedURLException, InterruptedExcepti BaseOperation baseOperation = Mockito.mock(BaseOperation.class); Mockito.when(baseOperation.getNamespacedUrl()).thenReturn(new URL("http://localhost")); CompletableFuture> future = new CompletableFuture<>(); - Mockito.when(client.consumeLines(Mockito.any(), Mockito.any())).thenReturn(future); + Mockito.when(client.consumeBytes(Mockito.any(), Mockito.any())).thenReturn(future); CountDownLatch reconnect = new CountDownLatch(1); WatchHTTPManager> watch = new WatchHTTPManager(client, diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java index 7ffa267a5a8..776dfef7ab3 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java @@ -30,6 +30,7 @@ import io.fabric8.kubernetes.client.dsl.Watchable; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import io.fabric8.kubernetes.client.utils.Serialization; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -37,8 +38,10 @@ import org.junit.jupiter.api.Test; import java.net.HttpURLConnection; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static io.fabric8.kubernetes.client.Watcher.Action.BOOKMARK; import static io.fabric8.kubernetes.client.Watcher.Action.DELETED; @@ -315,11 +318,12 @@ public void onClose(WatcherException cause) { } private static WatchEvent outdatedEvent() { - return new WatchEventBuilder().withType(Watcher.Action.ERROR.name()).withStatusObject( - new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE) - .withMessage( - "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]") - .build()) + return new WatchEventBuilder().withType(Watcher.Action.ERROR.name()) + .withStatusObject( + new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE) + .withMessage( + "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]") + .build()) .build(); } @@ -425,4 +429,44 @@ public void onClose(WatcherException cause) { // ensure that the exception does not inhibit further message processing assertTrue(latch.await(10, TimeUnit.SECONDS)); } + + @Test + void testHttpWatch() throws InterruptedException { + // Given + + // trigger the usage of the http watch + server.expect() + .withPath("/api/v1/namespaces/test/pods?allowWatchBookmarks=true&watch=true") + .andReturn(200, null) + .once(); + + String dummyEvent = Serialization.asJson(new WatchEventBuilder().withType("MODIFIED") + .withObject(new PodBuilder().withNewMetadata().endMetadata().build()) + .build()) + "\n"; + + // build a response that is large enough to span multiple messages + // there's potentially a corner case here with utf multi-byte that is unhandled + // if that happens we'll see an exception from the decode + server.expect() + .withPath("/api/v1/namespaces/test/pods?allowWatchBookmarks=true&watch=true") + .andReturn(200, Collections.nCopies(200, dummyEvent).stream().collect(Collectors.joining())) + .once(); + + CountDownLatch latch = new CountDownLatch(200); + + client.pods().watch(new Watcher() { + + @Override + public void eventReceived(Action action, Pod resource) { + latch.countDown(); + } + + @Override + public void onClose(WatcherException cause) { + } + }); + + // ensure that the exception does not inhibit further message processing + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } }