Skip to content

Commit

Permalink
fix fabric8io#4201: changing how the jetty session is set
Browse files Browse the repository at this point in the history
adding missing test timeouts
correcting the wait in HttpClientReadableByteChannel
  • Loading branch information
shawkins committed Nov 22, 2022
1 parent c741a07 commit 45d605b
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 14 deletions.
Expand Up @@ -36,7 +36,7 @@ public class JettyWebSocket implements WebSocket, WebSocketListener {
private final Condition backPressure;
private final AtomicBoolean closed;
private boolean moreMessages;
private Session webSocketSession;
private volatile Session webSocketSession;

public JettyWebSocket(WebSocket.Listener listener) {
this.listener = listener;
Expand Down Expand Up @@ -115,6 +115,7 @@ public void onWebSocketClose(int statusCode, String reason) {

@Override
public void onWebSocketConnect(Session session) {
this.webSocketSession = session;
listener.onOpen(this);
}

Expand All @@ -133,11 +134,6 @@ public void onWebSocketError(Throwable cause) {
listener.onError(this, cause);
}

public JettyWebSocket setWebSocketSession(Session webSocketSession) {
this.webSocketSession = webSocketSession;
return this;
}

private void backPressure() {
try {
lock.lock();
Expand Down
Expand Up @@ -58,7 +58,7 @@ public CompletableFuture<WebSocket> buildAsync(WebSocket.Listener listener) {
final CompletableFuture<WebSocket> future = new CompletableFuture<>();
final var webSocket = new JettyWebSocket(listener);
return webSocketClient.connect(webSocket, Objects.requireNonNull(WebSocket.toWebSocketUri(getUri())), cur)
.thenApply(webSocket::setWebSocketSession)
.thenApply(s -> webSocket)
.exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) {
future.completeExceptionally(toHandshakeException((UpgradeException) ex.getCause()));
Expand Down
Expand Up @@ -182,7 +182,7 @@ void sendCloseWhenConnectionIsOpen() {
// Given
final var jws = new JettyWebSocket(new Listener());
final var session = mock(Session.class);
jws.setWebSocketSession(session);
jws.onWebSocketConnect(session);
when(session.isOpen()).thenReturn(true);
// When
jws.sendClose(1000, "Closing");
Expand All @@ -196,7 +196,7 @@ void sendCloseIgnoredWhenConnectionIsClosed() {
// Given
final var jws = new JettyWebSocket(new Listener());
final var session = mock(Session.class);
jws.setWebSocketSession(session);
jws.onWebSocketConnect(session);
when(session.isOpen()).thenReturn(false);
// When
jws.sendClose(1000, "Closing");
Expand All @@ -210,7 +210,7 @@ void sendCloseIgnoredWhenAlreadyClosed() {
// Given
final var jws = new JettyWebSocket(new Listener());
final var session = mock(Session.class);
jws.setWebSocketSession(session);
jws.onWebSocketConnect(session);
when(session.isOpen()).thenReturn(true);
jws.sendClose(1000, "Closing");
// When
Expand All @@ -226,7 +226,7 @@ void sendIncreasesQueueSize() {
// Given
final var jws = new JettyWebSocket(new Listener());
final var session = mock(Session.class, RETURNS_DEEP_STUBS);
jws.setWebSocketSession(session);
jws.onWebSocketConnect(session);
when(session.isOpen()).thenReturn(true);
// When
jws.send(ByteBuffer.wrap(new byte[] { 1, 3, 3, 7 }));
Expand Down
Expand Up @@ -87,7 +87,7 @@ void testWebsocketHandshakeFailure() {

assertThrows(WebSocketHandshakeException.class, () -> {
try {
startedFuture.get();
startedFuture.get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
Expand Down Expand Up @@ -144,7 +144,7 @@ public void onMessage(WebSocket webSocket, String text) {
assertFalse(latch.await(10, TimeUnit.SECONDS));
assertEquals(1, latch.getCount());

startedFuture.get().request();
startedFuture.get(10, TimeUnit.SECONDS).request();

assertTrue(latch.await(10, TimeUnit.SECONDS));
}
Expand Down
Expand Up @@ -106,10 +106,13 @@ public synchronized int read(ByteBuffer arg0) throws IOException {
consumeRequested = true;
this.asyncBody.consume();
// the consume call may actually trigger result deliver
// if it did, then just start the loop over
// if it did, then just start the loop over or be done
if (!consumeRequested) {
continue;
}
if (done) {
return -1;
}
}
try {
this.wait(); // block until more buffers are delivered
Expand Down

0 comments on commit 45d605b

Please sign in to comment.