From 529f8ba78635e843d1f815002aac152361f66dd0 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 22 Nov 2019 15:55:44 +0000 Subject: [PATCH] Extra isReady-onWritePossible after last write Closes gh-24050 --- .../AbstractListenerWriteProcessor.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 4f05d5e1e091..3f37a9558068 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -62,8 +62,17 @@ public abstract class AbstractListenerWriteProcessor implements Processor void onNext(AbstractListenerWriteProcessor processor, T data) { } @Override public void onComplete(AbstractListenerWriteProcessor processor) { - processor.changeStateToComplete(this); + processor.readyToCompleteAfterLastWrite = true; + processor.changeStateToReceived(this); } }, @@ -352,7 +362,10 @@ public void onComplete(AbstractListenerWriteProcessor processor) { @SuppressWarnings("deprecation") @Override public void onWritePossible(AbstractListenerWriteProcessor processor) { - if (processor.changeState(this, WRITING)) { + if (processor.readyToCompleteAfterLastWrite) { + processor.changeStateToComplete(RECEIVED); + } + else if (processor.changeState(this, WRITING)) { T data = processor.currentData; Assert.state(data != null, "No data"); try { @@ -360,7 +373,8 @@ public void onWritePossible(AbstractListenerWriteProcessor processor) { if (processor.changeState(WRITING, REQUESTED)) { processor.currentData = null; if (processor.subscriberCompleted) { - processor.changeStateToComplete(REQUESTED); + processor.readyToCompleteAfterLastWrite = true; + processor.changeStateToReceived(REQUESTED); } else { processor.writingPaused();