Skip to content

Commit

Permalink
fix fabric8io#4201: moving consumeLines out of the clients
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Nov 29, 2022
1 parent 009974f commit 57b905d
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 211 deletions.
Expand Up @@ -252,16 +252,6 @@ public DerivedClientBuilder newBuilder() {
return this.builder.copy(this);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request,
AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -76,29 +75,6 @@ public DerivedClientBuilder newBuilder() {
return builder.copy(this);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest originalRequest, AsyncBody.Consumer<String> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener(request) {

final StringBuilder builder = new StringBuilder();

@Override
protected void onContent(ByteBuffer content) throws Exception {
for (char c : StandardCharsets.UTF_8.decode(content).array()) {
if (c == '\n') {
consumer.consume(builder.toString(), this);
builder.setLength(0);
} else {
builder.append(c);
}
}
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest originalRequest, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Expand Up @@ -214,21 +214,6 @@ public DerivedClientBuilder newBuilder() {
return new OkHttpClientBuilderImpl(httpClient.newBuilder(), this.factory, this.config);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest request, AsyncBody.Consumer<String> consumer) {
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<String>(consumer, s) {
@Override
protected String process(BufferedSource source) throws IOException {
// this should probably be strict instead
// when non-strict if no newline is present, this will create a truncated string from
// what is available. However as strict it will be blocking.
return source.readUtf8Line();
}
};
return sendAsync(request, handler);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest request, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Expand Up @@ -32,8 +32,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -184,41 +182,6 @@ void testAsyncBody() throws Exception {
assertEquals(byteCount, consumed.get(10, TimeUnit.SECONDS));
}

@Test
void testConsumeLines() throws Exception {
server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always();

ArrayList<String> strings = new ArrayList<>();
CompletableFuture<Void> consumed = new CompletableFuture<>();

CompletableFuture<HttpResponse<AsyncBody>> responseFuture = client.getHttpClient().consumeLines(
client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async"))
.build(),
(value, asyncBody) -> {
strings.add(value);
asyncBody.consume();
});

responseFuture.whenComplete((r, t) -> {
if (t != null) {
consumed.completeExceptionally(t);
}
if (r != null) {
r.body().consume();
r.body().done().whenComplete((v, ex) -> {
if (ex != null) {
consumed.completeExceptionally(ex);
} else {
consumed.complete(null);
}
});
}
});

consumed.get(10, TimeUnit.SECONDS);
assertEquals(Arrays.asList("hello", "world", "lines"), strings);
}

@DisplayName("Supported response body types")
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
Expand Down
Expand Up @@ -20,7 +20,6 @@
import io.fabric8.kubernetes.client.RequestConfig;
import io.fabric8.kubernetes.client.utils.HttpClientUtils;

import java.io.BufferedReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -179,16 +178,6 @@ default <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Cl
return HttpResponse.SupportedResponses.from(type).sendAsync(request, this);
}

/**
* Send a request and consume the lines of the response body using the same logic as {@link BufferedReader} to
* break up the lines.
*
* @param request the HttpRequest to send
* @param consumer the response body consumer
* @return the future which will be ready after the headers have been read
*/
CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer);

/**
* Send a request and consume the bytes of the resulting response body
* <p>
Expand Down
Expand Up @@ -23,15 +23,10 @@

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

public abstract class AbstractAsyncBodyTest {

Expand All @@ -50,72 +45,6 @@ static void afterAll() {

protected abstract HttpClient.Factory getHttpClientFactory();

@Test
@DisplayName("Lines are processed and consumed only after the consume() invocation")
public void consumeLinesProcessedAfterConsume() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/consume-lines")
.andReturn(200, "This is the response body\n")
.always();
final StringBuffer responseText = new StringBuffer();
final HttpResponse<AsyncBody> asyncBodyResponse = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(),
(value, asyncBody) -> {
responseText.append(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
assertThat(responseText).isEmpty();
asyncBodyResponse.body().consume();
asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS);
assertThat(responseText).contains("This is the response body");
}
}

@Test
@DisplayName("Lines are not processed when cancel() invocation")
public void consumeLinesNotProcessedIfCancelled() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/cancel")
.andReturn(200, "This would be the response body")
.always();
final StringBuffer responseText = new StringBuffer();
final HttpResponse<AsyncBody> asyncBodyResponse = client
.consumeLines(client.newHttpRequestBuilder()
.uri(server.url("/cancel")).build(), (value, asyncBody) -> {
responseText.append(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncBodyResponse.body().cancel();
asyncBodyResponse.body().consume();
final CompletableFuture<Void> doneFuture = asyncBodyResponse.body().done();
assertThrows(CancellationException.class, () -> doneFuture.get(10L, TimeUnit.SECONDS));
assertThat(responseText).isEmpty();
}
}

@Test
@DisplayName("Lines are processed completely")
public void consumeLinesProcessesAllLines() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/consume-lines")
.andReturn(200, "This is the response body\nWith\nMultiple\n lines\n")
.always();
final List<String> receivedLines = new ArrayList<>();
final HttpResponse<AsyncBody> asyncBodyResponse = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(),
(value, asyncBody) -> {
receivedLines.add(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncBodyResponse.body().consume();
asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS);
assertThat(receivedLines).containsExactly("This is the response body", "With", "Multiple", " lines");
}
}

@Test
@DisplayName("Bytes are processed and consumed only after the consume() invocation")
public void consumeBytesProcessedAfterConsume() throws Exception {
Expand Down
Expand Up @@ -115,41 +115,6 @@ public CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpRespons
}
}

@Test
@DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines")
public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception {
// Given
server.expect().withPath("/intercepted-url").andReturn(200, "This works\n").once();
final HttpClient.Builder builder = getHttpClientFactory().newBuilder()
.addOrReplaceInterceptor("test", new Interceptor() {
@Override
public CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpResponse<?> response) {
builder.uri(URI.create(server.url("/intercepted-url")));
return CompletableFuture.completedFuture(true);
}
});
final CompletableFuture<String> result = new CompletableFuture<>();
// When
try (HttpClient client = builder.build()) {
final HttpResponse<AsyncBody> asyncR = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> {
result.complete(s);
ab.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncR.body().consume();
asyncR.body().done().whenComplete((v, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else {
result.complete(null);
}
});
// Then
assertThat(result.get(10L, TimeUnit.SECONDS)).isEqualTo("This works");
}
}

@Test
@DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes")
public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception {
Expand Down
Expand Up @@ -28,6 +28,8 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -66,8 +68,18 @@ public WatchHTTPManager(final HttpClient client,
protected synchronized void start(URL url, Map<String, String> headers) {
HttpRequest.Builder builder = client.newHttpRequestBuilder().url(url);
headers.forEach(builder::header);
call = client.consumeLines(builder.build(), (s, a) -> {
onMessage(s);
StringBuffer buffer = new StringBuffer();
call = client.consumeBytes(builder.build(), (b, a) -> {
for (ByteBuffer content : b) {
for (char c : StandardCharsets.UTF_8.decode(content).array()) {
if (c == '\n') {
onMessage(buffer.toString());
buffer.setLength(0);
} else {
buffer.append(c);
}
}
}
a.consume();
});
call.whenComplete((response, t) -> {
Expand Down
Expand Up @@ -47,7 +47,7 @@ void testReconnectOnException() throws MalformedURLException, InterruptedExcepti
BaseOperation baseOperation = Mockito.mock(BaseOperation.class);
Mockito.when(baseOperation.getNamespacedUrl()).thenReturn(new URL("http://localhost"));
CompletableFuture<HttpResponse<AsyncBody>> future = new CompletableFuture<>();
Mockito.when(client.consumeLines(Mockito.any(), Mockito.any())).thenReturn(future);
Mockito.when(client.consumeBytes(Mockito.any(), Mockito.any())).thenReturn(future);

CountDownLatch reconnect = new CountDownLatch(1);
WatchHTTPManager<HasMetadata, KubernetesResourceList<HasMetadata>> watch = new WatchHTTPManager(client,
Expand Down

0 comments on commit 57b905d

Please sign in to comment.