Skip to content

Commit

Permalink
AbstractListenerReadPublisher continues after 0 bytes
Browse files Browse the repository at this point in the history
If we read 0 bytes, e.g. chunked encoding markup read but not the
actual data within it, don't stop reading since the server may or
may not consider it necessary to call onDataAvailable again.
Instead, we keep on reading, and although isReady likely returns
false on the next iteration, it eliminates ambiguity and ensures
the server will call onDataAvailable when more data arrives.

Closes gh-28241
  • Loading branch information
rstoyanchev committed Apr 1, 2022
1 parent 24cd3c1 commit d518a7d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,8 @@
import org.reactivestreams.Subscription;
import reactor.core.publisher.Operators;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.log.LogDelegateFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -56,6 +58,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
*/
protected static Log rsReadLogger = LogDelegateFactory.getHiddenLog(AbstractListenerReadPublisher.class);

final static DataBuffer EMPTY_BUFFER = DefaultDataBufferFactory.sharedInstance.allocateBuffer(0);


private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);

Expand Down Expand Up @@ -180,15 +184,20 @@ public final void onError(Throwable ex) {

/**
* Read and publish data one at a time until there is no more data, no more
* demand, or perhaps we completed in the mean time.
* demand, or perhaps we completed meanwhile.
* @return {@code true} if there is more demand; {@code false} if there is
* no more demand or we have completed.
*/
private boolean readAndPublish() throws IOException {
long r;
while ((r = this.demand) > 0 && (this.state.get() != State.COMPLETED)) {
T data = read();
if (data != null) {
if (data == EMPTY_BUFFER) {
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "0 bytes read, trying again");
}
}
else if (data != null) {
if (r != Long.MAX_VALUE) {
DEMAND_FIELD_UPDATER.addAndGet(this, -1L);
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -236,10 +236,10 @@ AsyncListener getAsyncListener() {
/**
* Read from the request body InputStream and return a DataBuffer.
* Invoked only when {@link ServletInputStream#isReady()} returns "true".
* @return a DataBuffer with data read, or {@link #EOF_BUFFER} if the input
* stream returned -1, or null if 0 bytes were read.
* @return a DataBuffer with data read, or
* {@link AbstractListenerReadPublisher#EMPTY_BUFFER} if 0 bytes were read,
* or {@link #EOF_BUFFER} if the input stream returned -1.
*/
@Nullable
DataBuffer readFromInputStream() throws IOException {
int read = this.request.getInputStream().read(this.buffer);
logBytesRead(read);
Expand All @@ -254,7 +254,7 @@ DataBuffer readFromInputStream() throws IOException {
return EOF_BUFFER;
}

return null;
return AbstractListenerReadPublisher.EMPTY_BUFFER;
}

protected final void logBytesRead(int read) {
Expand Down

0 comments on commit d518a7d

Please sign in to comment.