diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java index 53ea3a55b45..b254883bdd4 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -36,7 +36,7 @@ public class JettyWebSocket implements WebSocket, WebSocketListener { private final Condition backPressure; private final AtomicBoolean closed; private boolean moreMessages; - private Session webSocketSession; + private volatile Session webSocketSession; public JettyWebSocket(WebSocket.Listener listener) { this.listener = listener; @@ -115,6 +115,7 @@ public void onWebSocketClose(int statusCode, String reason) { @Override public void onWebSocketConnect(Session session) { + this.webSocketSession = session; listener.onOpen(this); } @@ -133,11 +134,6 @@ public void onWebSocketError(Throwable cause) { listener.onError(this, cause); } - public JettyWebSocket setWebSocketSession(Session webSocketSession) { - this.webSocketSession = webSocketSession; - return this; - } - private void backPressure() { try { lock.lock(); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java index b3f9990b5e2..c5a8135bc51 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java @@ -58,7 +58,7 @@ public CompletableFuture buildAsync(WebSocket.Listener listener) { final CompletableFuture future = new CompletableFuture<>(); final var webSocket = new JettyWebSocket(listener); return webSocketClient.connect(webSocket, Objects.requireNonNull(WebSocket.toWebSocketUri(getUri())), cur) - .thenApply(webSocket::setWebSocketSession) + .thenApply(s -> webSocket) .exceptionally(ex -> { if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) { future.completeExceptionally(toHandshakeException((UpgradeException) ex.getCause())); diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java index 75e05f5ca3a..83a3510e869 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java @@ -182,7 +182,7 @@ void sendCloseWhenConnectionIsOpen() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.sendClose(1000, "Closing"); @@ -196,7 +196,7 @@ void sendCloseIgnoredWhenConnectionIsClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(false); // When jws.sendClose(1000, "Closing"); @@ -210,7 +210,7 @@ void sendCloseIgnoredWhenAlreadyClosed() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); jws.sendClose(1000, "Closing"); // When @@ -226,7 +226,7 @@ void sendIncreasesQueueSize() { // Given final var jws = new JettyWebSocket(new Listener()); final var session = mock(Session.class, RETURNS_DEEP_STUBS); - jws.setWebSocketSession(session); + jws.onWebSocketConnect(session); when(session.isOpen()).thenReturn(true); // When jws.send(ByteBuffer.wrap(new byte[] { 1, 3, 3, 7 })); diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 10cd72b4238..5d1bf425bfc 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -87,7 +87,7 @@ void testWebsocketHandshakeFailure() { assertThrows(WebSocketHandshakeException.class, () -> { try { - startedFuture.get(); + startedFuture.get(10, TimeUnit.SECONDS); } catch (ExecutionException e) { throw e.getCause(); } @@ -144,7 +144,7 @@ public void onMessage(WebSocket webSocket, String text) { assertFalse(latch.await(10, TimeUnit.SECONDS)); assertEquals(1, latch.getCount()); - startedFuture.get().request(); + startedFuture.get(10, TimeUnit.SECONDS).request(); assertTrue(latch.await(10, TimeUnit.SECONDS)); }