diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java index 53ea3a55b4..b254883bdd 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -36,7 +36,7 @@ public class JettyWebSocket implements WebSocket, WebSocketListener { private final Condition backPressure; private final AtomicBoolean closed; private boolean moreMessages; - private Session webSocketSession; + private volatile Session webSocketSession; public JettyWebSocket(WebSocket.Listener listener) { this.listener = listener; @@ -115,6 +115,7 @@ public void onWebSocketClose(int statusCode, String reason) { @Override public void onWebSocketConnect(Session session) { + this.webSocketSession = session; listener.onOpen(this); } @@ -133,11 +134,6 @@ public void onWebSocketError(Throwable cause) { listener.onError(this, cause); } - public JettyWebSocket setWebSocketSession(Session webSocketSession) { - this.webSocketSession = webSocketSession; - return this; - } - private void backPressure() { try { lock.lock(); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java index b3f9990b5e..c5a8135bc5 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java @@ -58,7 +58,7 @@ public CompletableFuture buildAsync(WebSocket.Listener listener) { final CompletableFuture future = new CompletableFuture<>(); final var webSocket = new JettyWebSocket(listener); return webSocketClient.connect(webSocket, Objects.requireNonNull(WebSocket.toWebSocketUri(getUri())), cur) - .thenApply(webSocket::setWebSocketSession) + .thenApply(s -> webSocket) .exceptionally(ex -> { if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) { future.completeExceptionally(toHandshakeException((UpgradeException) ex.getCause())); diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java index 75e05f5ca3..83a3510e86 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java @@ -182,7 +182,7 @@ void sendCloseWhenConnectionIsOpen() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.sendClose(1000, "Closing"); @@ -196,7 +196,7 @@ void sendCloseIgnoredWhenConnectionIsClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(false); // When jws.sendClose(1000, "Closing"); @@ -210,7 +210,7 @@ void sendCloseIgnoredWhenAlreadyClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); jws.sendClose(1000, "Closing"); // When @@ -226,7 +226,7 @@ void sendIncreasesQueueSize() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class, RETURNS_DEEP_STUBS); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.send(ByteBuffer.wrap(new byte[] { 1, 3, 3, 7 })); diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 10cd72b423..5d1bf425bf 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -87,7 +87,7 @@ void testWebsocketHandshakeFailure() { assertThrows(WebSocketHandshakeException.class, () -> { try { - startedFuture.get(); + startedFuture.get(10, TimeUnit.SECONDS); } catch (ExecutionException e) { throw e.getCause(); } @@ -144,7 +144,7 @@ public void onMessage(WebSocket webSocket, String text) { assertFalse(latch.await(10, TimeUnit.SECONDS)); assertEquals(1, latch.getCount()); - startedFuture.get().request(); + startedFuture.get(10, TimeUnit.SECONDS).request(); assertTrue(latch.await(10, TimeUnit.SECONDS)); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java index b0becfd643..b8e114e353 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java @@ -106,10 +106,13 @@ public synchronized int read(ByteBuffer arg0) throws IOException { consumeRequested = true; this.asyncBody.consume(); // the consume call may actually trigger result deliver - // if it did, then just start the loop over + // if it did, then just start the loop over or be done if (!consumeRequested) { continue; } + if (done) { + return -1; + } } try { this.wait(); // block until more buffers are delivered