Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4482: consumeBytes for okhttp should not fetch more on consume #4483

Merged
merged 1 commit into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -9,6 +9,7 @@
* Fix #4441: corrected patch base handling for the patch methods available from a Resource - resource(item).patch() will be evaluated as resource(latest).patch(item). Also undeprecated patch(item), which is consistent with leaving patch(context, item) undeprecated as well. For consistency with the other operations (such as edit), patch(item) will use the context item as the base when available, or the server side item when not. This means that patch(item) is only the same as resource(item).patch() when the patch(item) is called when the context item is missing or is the same as the latest.
* Fix #4442: TokenRefreshInterceptor doesn't overwrite existing OAuth token with empty string
* Fix #4459: Fixed OSGi startup exceptions while using KubernetesClient/OpenShiftClient
* Fix #4482: Fixing blocking behavior of okhttp log watch
* Fix #4473: Fix regression in backoff interval introduced in #4365

#### Improvements
Expand Down
Expand Up @@ -211,6 +211,9 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest reque
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<String>(consumer, s) {
@Override
protected String process(BufferedSource source) throws IOException {
// this should probably be strict instead
// when non-strict if no newline is present, this will create a truncated string from
// what is available. However as strict it will be blocking.
return source.readUtf8Line();
}
};
Expand All @@ -223,7 +226,9 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<List<ByteBuffer>>(consumer, s) {
@Override
protected List<ByteBuffer> process(BufferedSource source) throws IOException {
return Collections.singletonList(ByteBuffer.wrap(source.readByteArray()));
// read only what is available otherwise okhttp will block trying to read
// a whole fetch size 8k worth
return Collections.singletonList(ByteBuffer.wrap(source.readByteArray(source.buffer().size())));
}
};
return sendAsync(request, handler);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -232,6 +233,12 @@ public void onClose(int i, String s) {
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(closed.get());
assertFalse(failed.get());

// make sure we can get log output before the pod terminates
ByteArrayOutputStream result = new ByteArrayOutputStream();
LogWatch log = client.pods().withName("pod-interactive").tailingLines(10).watchLog(result);
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> result.size() > 0);
log.close();
}

@Test
Expand Down