From d518a7d8c81e2cf7484942c37836219b3669240d Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Fri, 1 Apr 2022 17:36:14 +0100 Subject: [PATCH] AbstractListenerReadPublisher continues after 0 bytes If we read 0 bytes, e.g. chunked encoding markup read but not the actual data within it, don't stop reading since the server may or may not consider it necessary to call onDataAvailable again. Instead, we keep on reading, and although isReady likely returns false on the next iteration, it eliminates ambiguity and ensures the server will call onDataAvailable when more data arrives. Closes gh-28241 --- .../reactive/AbstractListenerReadPublisher.java | 15 ++++++++++++--- .../server/reactive/ServletServerHttpRequest.java | 10 +++++----- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 0845a9f25f04..de1f3ca5a600 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,8 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.Operators; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.log.LogDelegateFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -56,6 +58,8 @@ public abstract class AbstractListenerReadPublisher implements Publisher { */ protected static Log rsReadLogger = LogDelegateFactory.getHiddenLog(AbstractListenerReadPublisher.class); + final static DataBuffer EMPTY_BUFFER = DefaultDataBufferFactory.sharedInstance.allocateBuffer(0); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -180,7 +184,7 @@ public final void onError(Throwable ex) { /** * Read and publish data one at a time until there is no more data, no more - * demand, or perhaps we completed in the mean time. + * demand, or perhaps we completed meanwhile. * @return {@code true} if there is more demand; {@code false} if there is * no more demand or we have completed. */ @@ -188,7 +192,12 @@ private boolean readAndPublish() throws IOException { long r; while ((r = this.demand) > 0 && (this.state.get() != State.COMPLETED)) { T data = read(); - if (data != null) { + if (data == EMPTY_BUFFER) { + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "0 bytes read, trying again"); + } + } + else if (data != null) { if (r != Long.MAX_VALUE) { DEMAND_FIELD_UPDATER.addAndGet(this, -1L); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index a84ddc6d6e3d..7183234e1d4a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -236,10 +236,10 @@ AsyncListener getAsyncListener() { /** * Read from the request body InputStream and return a DataBuffer. * Invoked only when {@link ServletInputStream#isReady()} returns "true". - * @return a DataBuffer with data read, or {@link #EOF_BUFFER} if the input - * stream returned -1, or null if 0 bytes were read. + * @return a DataBuffer with data read, or + * {@link AbstractListenerReadPublisher#EMPTY_BUFFER} if 0 bytes were read, + * or {@link #EOF_BUFFER} if the input stream returned -1. */ - @Nullable DataBuffer readFromInputStream() throws IOException { int read = this.request.getInputStream().read(this.buffer); logBytesRead(read); @@ -254,7 +254,7 @@ DataBuffer readFromInputStream() throws IOException { return EOF_BUFFER; } - return null; + return AbstractListenerReadPublisher.EMPTY_BUFFER; } protected final void logBytesRead(int read) {