diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8a2d3b9c20..03f82bfca70 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
### 6.4-SNAPSHOT
#### Bugs
+* Fix #4666: fixed okhttp calls not explicitly closing
#### Improvements
diff --git a/httpclient-okhttp/pom.xml b/httpclient-okhttp/pom.xml
index cb28fb813ba..d952e0009bd 100644
--- a/httpclient-okhttp/pom.xml
+++ b/httpclient-okhttp/pom.xml
@@ -79,6 +79,10 @@
mockwebserver
test
+
+ org.mockito
+ mockito-inline
+
org.assertj
assertj-core
diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java
index 82bad1fa40d..006573a3028 100644
--- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java
+++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java
@@ -62,6 +62,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
@@ -80,16 +81,18 @@ static MediaType parseMediaType(String contentType) {
return result;
}
- private abstract class OkHttpAsyncBody implements AsyncBody {
+ abstract static class OkHttpAsyncBody implements AsyncBody {
private final AsyncBody.Consumer consumer;
private final BufferedSource source;
private final CompletableFuture done = new CompletableFuture<>();
private boolean consuming;
private boolean requested;
+ private Executor executor;
- private OkHttpAsyncBody(AsyncBody.Consumer consumer, BufferedSource source) {
+ OkHttpAsyncBody(AsyncBody.Consumer consumer, BufferedSource source, Executor executor) {
this.consumer = consumer;
this.source = source;
+ this.executor = executor;
}
@Override
@@ -103,7 +106,7 @@ public void consume() {
}
try {
// consume should not block a caller, delegate to the dispatcher thread pool
- httpClient.dispatcher().executorService().execute(this::doConsume);
+ executor.execute(this::doConsume);
} catch (Exception e) {
// executor is likely shutdown
Utils.closeQuietly(source);
@@ -125,6 +128,8 @@ private void doConsume() {
T value = process(source);
consumer.consume(value, this);
} else {
+ // even if we've read everything an explicit close is still needed
+ source.close();
done.complete(null);
}
}
@@ -311,7 +316,8 @@ private okhttp3.Request.Builder newRequestBuilder() {
@Override
public CompletableFuture> consumeBytesDirect(StandardHttpRequest request,
Consumer> consumer) {
- Function handler = s -> new OkHttpAsyncBody>(consumer, s) {
+ Function handler = s -> new OkHttpAsyncBody>(consumer, s,
+ this.httpClient.dispatcher().executorService()) {
@Override
protected List process(BufferedSource source) throws IOException {
// read only what is available otherwise okhttp will block trying to read
diff --git a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java
index a867a9f1a99..2a0596b4971 100644
--- a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java
+++ b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java
@@ -17,6 +17,14 @@
import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest;
import io.fabric8.kubernetes.client.http.HttpClient;
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl.OkHttpAsyncBody;
+import okio.BufferedSource;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
@SuppressWarnings("java:S2187")
public class OkHttpAsyncBodyTest extends AbstractAsyncBodyTest {
@@ -24,4 +32,21 @@ public class OkHttpAsyncBodyTest extends AbstractAsyncBodyTest {
protected HttpClient.Factory getHttpClientFactory() {
return new OkHttpClientFactory();
}
+
+ @Test
+ void testClosedWhenExhausted() throws Exception {
+ BufferedSource source = Mockito.mock(BufferedSource.class);
+ Mockito.when(source.exhausted()).thenReturn(true);
+ OkHttpClientImpl.OkHttpAsyncBody> asyncBody = new OkHttpAsyncBody>(null, source,
+ Runnable::run) {
+
+ @Override
+ protected List process(BufferedSource source) throws IOException {
+ return null;
+ }
+ };
+
+ asyncBody.consume();
+ Mockito.verify(source).close();
+ }
}