Skip to content

Commit

Permalink
fix #1161 Clean the queue when onInboundError/onInboundClose is received
Browse files Browse the repository at this point in the history
There are two use cases for HttpClient:
- a channelInactive is received, the response was fully received but there is no subscriber
- a error is receive while receiving the response and there is no subscriber
  • Loading branch information
violetagg committed Jun 16, 2020
1 parent 36e902b commit 9cf8b9e
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/main/java/reactor/netty/channel/FluxReceive.java
Expand Up @@ -178,8 +178,11 @@ final void drainReceiver() {
cleanQueue(q);
return;
}
Throwable ex = inboundError;
if (ex != null) {
cleanQueue(q);
}
if (d && getPending() == 0) {
Throwable ex = inboundError;
if (ex != null) {
parent.listener.onUncaughtException(parent, ex);
}
Expand Down Expand Up @@ -369,15 +372,20 @@ final void onInboundComplete() {
}

final void onInboundError(Throwable err) {
CoreSubscriber<?> receiver = this.receiver;
if (isCancelled() || inboundDone) {
if (log.isDebugEnabled()) {
log.warn(format(channel, "An exception has been observed post termination"), err);
} else if (log.isWarnEnabled()) {
log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString());
}
if (receiver == null) {
// When onInboundComplete is invoked, the queue is not cleaned as there is no receiver
Queue<Object> q = receiverQueue;
cleanQueue(q);
}
return;
}
CoreSubscriber<?> receiver = this.receiver;
this.inboundDone = true;
if(channel.isActive()){
parent.markPersistent(false);
Expand Down

0 comments on commit 9cf8b9e

Please sign in to comment.