Skip to content

Commit

Permalink
[RESTEASY-3498] Do not invoke the onComplete callbacks if the onError…
Browse files Browse the repository at this point in the history
… callbacks were invoked. Invoke callbacks when reconnecting and reset the onComplete callbacks to be invoked again.

https://issues.redhat.com/browse/RESTEASY-3498
Signed-off-by: James R. Perkins <jperkins@redhat.com>
  • Loading branch information
jamezp committed Apr 30, 2024
1 parent 74a6bce commit 28123fa
Show file tree
Hide file tree
Showing 6 changed files with 576 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,7 @@ public boolean close(final long timeout, final TimeUnit unit) {
try {
return sseEventSourceScheduler.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
onErrorConsumers.forEach(consumer -> {
consumer.accept(e);
});
runOnErrorConsumers(e);
Thread.currentThread().interrupt();
return false;
}
Expand All @@ -248,6 +246,12 @@ private void runCompleteConsumers() {
}
}

private void runOnErrorConsumers(final Throwable t) {
// Ensure the onComplete callbacks do not get invoked
completeListenersInvoked.set(true);
onErrorConsumers.forEach(onError -> onError.accept(t));
}

private void internalClose() {
if (state.getAndSet(State.CLOSED) == State.CLOSED) {
return;
Expand All @@ -257,7 +261,7 @@ private void internalClose() {
try {
clientResponse.releaseConnection(false);
} catch (IOException e) {
onErrorConsumers.forEach(consumer -> consumer.accept(e));
runOnErrorConsumers(e);
}
}
sseEventSourceScheduler.shutdownNow();
Expand Down Expand Up @@ -398,9 +402,7 @@ private void onConnection() {

private void onUnrecoverableError(Throwable throwable) {
connectedLatch.countDown();
onErrorConsumers.forEach(consumer -> {
consumer.accept(throwable);
});
runOnErrorConsumers(throwable);
internalClose();
}

Expand Down
8 changes: 8 additions & 0 deletions resteasy-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<version.jakarta.annotation.jakarta-annotation-api>2.1.1</version.jakarta.annotation.jakarta-annotation-api>
<version.jakarta.el.el-api>4.0.0</version.jakarta.el.el-api>
<version.jakarta.ejb.ejb-api>4.0.1</version.jakarta.ejb.ejb-api>
<version.jakarta.enterprise.concurrent>3.0.3</version.jakarta.enterprise.concurrent>
<version.jakarta.interceptor.interceptor-api>2.1.0</version.jakarta.interceptor.interceptor-api>
<version.jakarta.jms.jms-api>3.1.0</version.jakarta.jms.jms-api>
<version.jakarta.ws.rs>4.0.0</version.jakarta.ws.rs>
Expand Down Expand Up @@ -309,6 +310,13 @@
<version>${version.jakarta.ws.rs}</version>
</dependency>

<dependency>
<groupId>jakarta.enterprise.concurrent</groupId>
<artifactId>jakarta.enterprise.concurrent-api</artifactId>
<version>${version.jakarta.enterprise.concurrent}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.ejb</groupId>
<artifactId>jakarta.ejb-api</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions testsuite/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>jakarta.enterprise.concurrent</groupId>
<artifactId>jakarta.enterprise.concurrent-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public void testException() throws Exception {
}, t -> {
String s = t.getMessage();
Assertions.assertTrue(s.contains("HTTP 500 Internal Server Error"));
// We need to count down here as well. Per the SseEventSource.register() the onComplete
// callback should not be invoked if the onError callback is invoked.
latch.countDown();
}, latch::countDown);
source.open();
Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
Expand Down

0 comments on commit 28123fa

Please sign in to comment.