From d6c0ab36f5ffa77c59d50c9ff23e7555f116a2a9 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 16 Jun 2020 20:46:41 +0300 Subject: [PATCH] fix #1161 Clean the queue when onInboundError/onInboundClose is received There are two use cases for HttpClient: - a channelInactive is received, the response was fully received but there is no subscriber - a error is receive while receiving the response and there is no subscriber --- src/main/java/reactor/netty/channel/FluxReceive.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/reactor/netty/channel/FluxReceive.java b/src/main/java/reactor/netty/channel/FluxReceive.java index 83bc4d0ca5..44ec33c820 100644 --- a/src/main/java/reactor/netty/channel/FluxReceive.java +++ b/src/main/java/reactor/netty/channel/FluxReceive.java @@ -178,8 +178,11 @@ final void drainReceiver() { cleanQueue(q); return; } + Throwable ex = inboundError; + if (ex != null) { + cleanQueue(q); + } if (d && getPending() == 0) { - Throwable ex = inboundError; if (ex != null) { parent.listener.onUncaughtException(parent, ex); } @@ -369,15 +372,20 @@ final void onInboundComplete() { } final void onInboundError(Throwable err) { + CoreSubscriber receiver = this.receiver; if (isCancelled() || inboundDone) { if (log.isDebugEnabled()) { log.warn(format(channel, "An exception has been observed post termination"), err); } else if (log.isWarnEnabled()) { log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString()); } + if (receiver == null) { + // When onInboundComplete is invoked, the queue is not cleaned as there is no receiver + Queue q = receiverQueue; + cleanQueue(q); + } return; } - CoreSubscriber receiver = this.receiver; this.inboundDone = true; if(channel.isActive()){ parent.markPersistent(false);