Skip to content

Commit

Permalink
fix #4482: consumeBytes for okhttp should not fetch more on consume
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Oct 11, 2022
1 parent 8e89540 commit 6a65483
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -9,6 +9,7 @@
* Fix #4441: corrected patch base handling for the patch methods available from a Resource - resource(item).patch() will be evaluated as resource(latest).patch(item). Also undeprecated patch(item), which is consistent with leaving patch(context, item) undeprecated as well. For consistency with the other operations (such as edit), patch(item) will use the context item as the base when available, or the server side item when not. This means that patch(item) is only the same as resource(item).patch() when the patch(item) is called when the context item is missing or is the same as the latest.
* Fix #4442: TokenRefreshInterceptor doesn't overwrite existing OAuth token with empty string
* Fix #4459: Fixed OSGi startup exceptions while using KubernetesClient/OpenShiftClient
* Fix #4482: Fixing blocking behavior of okhttp log watch
* Fix #4473: Fix regression in backoff interval introduced in #4365

#### Improvements
Expand Down
Expand Up @@ -211,6 +211,9 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest reque
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<String>(consumer, s) {
@Override
protected String process(BufferedSource source) throws IOException {
// this should probably be strict instead
// when non-strict if no newline is present, this will create a truncated string from
// what is available. However as strict it will be blocking.
return source.readUtf8Line();
}
};
Expand All @@ -223,7 +226,9 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<List<ByteBuffer>>(consumer, s) {
@Override
protected List<ByteBuffer> process(BufferedSource source) throws IOException {
return Collections.singletonList(ByteBuffer.wrap(source.readByteArray()));
// read only what is available otherwise okhttp will block trying to read
// a whole fetch size 8k worth
return Collections.singletonList(ByteBuffer.wrap(source.readByteArray(source.buffer().size())));
}
};
return sendAsync(request, handler);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -232,6 +233,12 @@ public void onClose(int i, String s) {
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(closed.get());
assertFalse(failed.get());

// make sure we can get log output before the pod terminates
ByteArrayOutputStream result = new ByteArrayOutputStream();
LogWatch log = client.pods().withName("pod-interactive").tailingLines(10).watchLog(result);
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> result.size() > 0);
log.close();
}

@Test
Expand Down

0 comments on commit 6a65483

Please sign in to comment.