diff --git a/CHANGELOG.md b/CHANGELOG.md index d8a2d3b9c2..03f82bfca7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### 6.4-SNAPSHOT #### Bugs +* Fix #4666: fixed okhttp calls not explicitly closing #### Improvements diff --git a/httpclient-okhttp/pom.xml b/httpclient-okhttp/pom.xml index cb28fb813b..d952e0009b 100644 --- a/httpclient-okhttp/pom.xml +++ b/httpclient-okhttp/pom.xml @@ -79,6 +79,10 @@ mockwebserver test + + org.mockito + mockito-inline + org.assertj assertj-core diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index 82bad1fa40..006573a302 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -62,6 +62,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.function.Function; @@ -80,16 +81,18 @@ static MediaType parseMediaType(String contentType) { return result; } - private abstract class OkHttpAsyncBody implements AsyncBody { + abstract static class OkHttpAsyncBody implements AsyncBody { private final AsyncBody.Consumer consumer; private final BufferedSource source; private final CompletableFuture done = new CompletableFuture<>(); private boolean consuming; private boolean requested; + private Executor executor; - private OkHttpAsyncBody(AsyncBody.Consumer consumer, BufferedSource source) { + OkHttpAsyncBody(AsyncBody.Consumer consumer, BufferedSource source, Executor executor) { this.consumer = consumer; this.source = source; + this.executor = executor; } @Override @@ -103,7 +106,7 @@ public void consume() { } try { // consume should not block a caller, delegate to the dispatcher thread pool - httpClient.dispatcher().executorService().execute(this::doConsume); + executor.execute(this::doConsume); } catch (Exception e) { // executor is likely shutdown Utils.closeQuietly(source); @@ -125,6 +128,8 @@ private void doConsume() { T value = process(source); consumer.consume(value, this); } else { + // even if we've read everything an explicit close is still needed + source.close(); done.complete(null); } } @@ -311,7 +316,8 @@ private okhttp3.Request.Builder newRequestBuilder() { @Override public CompletableFuture> consumeBytesDirect(StandardHttpRequest request, Consumer> consumer) { - Function handler = s -> new OkHttpAsyncBody>(consumer, s) { + Function handler = s -> new OkHttpAsyncBody>(consumer, s, + this.httpClient.dispatcher().executorService()) { @Override protected List process(BufferedSource source) throws IOException { // read only what is available otherwise okhttp will block trying to read diff --git a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java index a867a9f1a9..2a0596b497 100644 --- a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java +++ b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java @@ -17,6 +17,14 @@ import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest; import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl.OkHttpAsyncBody; +import okio.BufferedSource; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; @SuppressWarnings("java:S2187") public class OkHttpAsyncBodyTest extends AbstractAsyncBodyTest { @@ -24,4 +32,21 @@ public class OkHttpAsyncBodyTest extends AbstractAsyncBodyTest { protected HttpClient.Factory getHttpClientFactory() { return new OkHttpClientFactory(); } + + @Test + void testClosedWhenExhausted() throws Exception { + BufferedSource source = Mockito.mock(BufferedSource.class); + Mockito.when(source.exhausted()).thenReturn(true); + OkHttpClientImpl.OkHttpAsyncBody> asyncBody = new OkHttpAsyncBody>(null, source, + Runnable::run) { + + @Override + protected List process(BufferedSource source) throws IOException { + return null; + } + }; + + asyncBody.consume(); + Mockito.verify(source).close(); + } }