diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 4f05d5e1e091..3f37a9558068 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -62,8 +62,17 @@ public abstract class AbstractListenerWriteProcessor implements Processor void onNext(AbstractListenerWriteProcessor processor, T data) { } @Override public void onComplete(AbstractListenerWriteProcessor processor) { - processor.changeStateToComplete(this); + processor.readyToCompleteAfterLastWrite = true; + processor.changeStateToReceived(this); } }, @@ -352,7 +362,10 @@ public void onComplete(AbstractListenerWriteProcessor processor) { @SuppressWarnings("deprecation") @Override public void onWritePossible(AbstractListenerWriteProcessor processor) { - if (processor.changeState(this, WRITING)) { + if (processor.readyToCompleteAfterLastWrite) { + processor.changeStateToComplete(RECEIVED); + } + else if (processor.changeState(this, WRITING)) { T data = processor.currentData; Assert.state(data != null, "No data"); try { @@ -360,7 +373,8 @@ public void onWritePossible(AbstractListenerWriteProcessor processor) { if (processor.changeState(WRITING, REQUESTED)) { processor.currentData = null; if (processor.subscriberCompleted) { - processor.changeStateToComplete(REQUESTED); + processor.readyToCompleteAfterLastWrite = true; + processor.changeStateToReceived(REQUESTED); } else { processor.writingPaused();