diff --git a/src/main/java/reactor/netty/channel/FluxReceive.java b/src/main/java/reactor/netty/channel/FluxReceive.java index 83bc4d0ca5..44ec33c820 100644 --- a/src/main/java/reactor/netty/channel/FluxReceive.java +++ b/src/main/java/reactor/netty/channel/FluxReceive.java @@ -178,8 +178,11 @@ final void drainReceiver() { cleanQueue(q); return; } + Throwable ex = inboundError; + if (ex != null) { + cleanQueue(q); + } if (d && getPending() == 0) { - Throwable ex = inboundError; if (ex != null) { parent.listener.onUncaughtException(parent, ex); } @@ -369,15 +372,20 @@ final void onInboundComplete() { } final void onInboundError(Throwable err) { + CoreSubscriber receiver = this.receiver; if (isCancelled() || inboundDone) { if (log.isDebugEnabled()) { log.warn(format(channel, "An exception has been observed post termination"), err); } else if (log.isWarnEnabled()) { log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString()); } + if (receiver == null) { + // When onInboundComplete is invoked, the queue is not cleaned as there is no receiver + Queue q = receiverQueue; + cleanQueue(q); + } return; } - CoreSubscriber receiver = this.receiver; this.inboundDone = true; if(channel.isActive()){ parent.markPersistent(false);