diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 0845a9f25f04..de1f3ca5a600 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,8 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.Operators; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.log.LogDelegateFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -56,6 +58,8 @@ public abstract class AbstractListenerReadPublisher implements Publisher { */ protected static Log rsReadLogger = LogDelegateFactory.getHiddenLog(AbstractListenerReadPublisher.class); + final static DataBuffer EMPTY_BUFFER = DefaultDataBufferFactory.sharedInstance.allocateBuffer(0); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -180,7 +184,7 @@ public final void onError(Throwable ex) { /** * Read and publish data one at a time until there is no more data, no more - * demand, or perhaps we completed in the mean time. + * demand, or perhaps we completed meanwhile. * @return {@code true} if there is more demand; {@code false} if there is * no more demand or we have completed. */ @@ -188,7 +192,12 @@ private boolean readAndPublish() throws IOException { long r; while ((r = this.demand) > 0 && (this.state.get() != State.COMPLETED)) { T data = read(); - if (data != null) { + if (data == EMPTY_BUFFER) { + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "0 bytes read, trying again"); + } + } + else if (data != null) { if (r != Long.MAX_VALUE) { DEMAND_FIELD_UPDATER.addAndGet(this, -1L); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index a84ddc6d6e3d..7183234e1d4a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -236,10 +236,10 @@ AsyncListener getAsyncListener() { /** * Read from the request body InputStream and return a DataBuffer. * Invoked only when {@link ServletInputStream#isReady()} returns "true". - * @return a DataBuffer with data read, or {@link #EOF_BUFFER} if the input - * stream returned -1, or null if 0 bytes were read. + * @return a DataBuffer with data read, or + * {@link AbstractListenerReadPublisher#EMPTY_BUFFER} if 0 bytes were read, + * or {@link #EOF_BUFFER} if the input stream returned -1. */ - @Nullable DataBuffer readFromInputStream() throws IOException { int read = this.request.getInputStream().read(this.buffer); logBytesRead(read); @@ -254,7 +254,7 @@ DataBuffer readFromInputStream() throws IOException { return EOF_BUFFER; } - return null; + return AbstractListenerReadPublisher.EMPTY_BUFFER; } protected final void logBytesRead(int read) {