Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 9.4.x unconsumed content2 #6163

Merged
merged 7 commits into from Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -295,6 +295,14 @@ else if (filled < 0)
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("{} caught exception {}", this, _channel.getState(), x);
BufferUtil.clear(_requestBuffer);
releaseRequestBuffer();
close();
}
gregw marked this conversation as resolved.
Show resolved Hide resolved
finally
{
setCurrentConnection(last);
Expand Down Expand Up @@ -324,10 +332,7 @@ protected boolean fillAndParseForContent()
private int fillRequestBuffer()
{
if (_contentBufferReferences.get() > 0)
{
LOG.warn("{} fill with unconsumed content!", this);
return 0;
}
throw new IllegalStateException("fill with unconsumed content on " + this);

if (BufferUtil.isEmpty(_requestBuffer))
{
Expand All @@ -354,11 +359,13 @@ private int fillRequestBuffer()
}
catch (IOException e)
{
LOG.debug(e);
if (LOG.isDebugEnabled())
LOG.debug(e);
_parser.atEOF();
return -1;
}
}

return 0;
}

Expand Down
170 changes: 95 additions & 75 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
Expand Up @@ -235,7 +235,7 @@ public int available()
{
produceContent();
}
catch (IOException e)
catch (Throwable e)
{
woken = failed(e);
}
Expand Down Expand Up @@ -390,7 +390,7 @@ protected Content nextContent() throws IOException
*
* @return Content or null
*/
protected Content nextNonSentinelContent()
protected Content nextNonSentinelContent() throws IOException
{
while (true)
{
Expand All @@ -416,7 +416,7 @@ protected Content nextNonSentinelContent()
* @return the content or EOF or null if none available.
* @throws IOException if retrieving the content fails
*/
protected Content produceNextContext() throws IOException
protected Content produceNextContent() throws IOException
{
Content content = nextInterceptedContent();
if (content == null && !isFinished())
Expand All @@ -433,7 +433,7 @@ protected Content produceNextContext() throws IOException
*
* @return Content with remaining, a {@link SentinelContent}, or null
*/
protected Content nextInterceptedContent()
protected Content nextInterceptedContent() throws IOException
{
// If we have a chunk produced by interception
if (_intercepted != null)
Expand All @@ -458,9 +458,10 @@ protected Content nextInterceptedContent()
// Are we intercepting?
if (_interceptor != null)
{
// Intercept the current content (may be called several
// times for the same content
_intercepted = _interceptor.readFrom(_content);
// Intercept the current content.
// The interceptor may be called several
// times for the same content.
_intercepted = intercept(_content);

// If interception produced new content
if (_intercepted != null && _intercepted != _content)
Expand Down Expand Up @@ -492,6 +493,24 @@ protected Content nextInterceptedContent()
return null;
}

private Content intercept(Content content) throws IOException
{
try
{
return _interceptor.readFrom(content);
}
catch (Throwable x)
{
IOException failure = new IOException("Bad content", x);
content.failed(failure);
HttpChannel channel = _channelState.getHttpChannel();
Response response = channel.getResponse();
if (response.isCommitted())
channel.abort(failure);
throw failure;
}
}

private void consume(Content content)
{
if (!isError() && content instanceof EofContent)
Expand Down Expand Up @@ -529,21 +548,6 @@ protected int get(Content content, byte[] buffer, int offset, int length)
return l;
}

/**
* Consumes the given content. Calls the content succeeded if all content consumed.
*
* @param content the content to consume
* @param length the number of bytes to consume
*/
protected void skip(Content content, int length)
{
int l = content.skip(length);

_contentConsumed += l;
if (l > 0 && content.isEmpty())
nextNonSentinelContent(); // hungry succeed
}

/**
* Blocks until some content or some end-of-file event arrives.
*
Expand Down Expand Up @@ -620,10 +624,19 @@ public boolean addContent(Content content)
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);

if (nextInterceptedContent() != null)
return wakeup();
else
return false;
try
{
if (nextInterceptedContent() != null)
return wakeup();
else
return false;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("", x);
return failed(x);
}
}
}
}
Expand Down Expand Up @@ -686,6 +699,7 @@ public boolean eof()
* Consume all available content without blocking.
* Raw content is counted in the {@link #getContentReceived()} statistics, but
* is not intercepted nor counted in the {@link #getContentConsumed()} statistics
*
* @return True if EOF was reached, false otherwise.
*/
public boolean consumeAll()
Expand Down Expand Up @@ -765,37 +779,39 @@ public boolean isFinished()
@Override
public boolean isReady()
{
try
synchronized (_inputQ)
{
synchronized (_inputQ)
try
{
if (_listener == null)
return true;
if (_state instanceof EOFState)
return true;
if (_waitingForContent)
return false;
if (produceNextContext() != null)
if (produceNextContent() != null)
return true;
_channelState.onReadUnready();
_waitingForContent = true;
return false;
}
catch (Throwable e)
{
if (LOG.isDebugEnabled())
LOG.debug("", e);
failed(e);
return true;
}
return false;
}
catch (IOException e)
{
LOG.ignore(e);
return true;
}
}

@Override
public void setReadListener(ReadListener readListener)
{
boolean woken = false;
try
synchronized (_inputQ)
{
synchronized (_inputQ)
try
{
if (_listener != null)
throw new IllegalStateException("ReadListener already set");
Expand All @@ -808,7 +824,7 @@ public void setReadListener(ReadListener readListener)
}
else
{
Content content = produceNextContext();
Content content = produceNextContent();
if (content != null)
{
_state = ASYNC;
Expand All @@ -827,10 +843,13 @@ else if (_state == EOF)
}
}
}
}
catch (IOException e)
{
throw new RuntimeIOException(e);
catch (Throwable e)
{
if (LOG.isDebugEnabled())
LOG.debug("", e);
failed(e);
woken = _channelState.onReadReady();
}
}

if (woken)
Expand Down Expand Up @@ -895,49 +914,49 @@ private boolean wakeup()
@Override
public void run()
{
final ReadListener listener;
Throwable error;
ReadListener listener = null;
Throwable error = null;
boolean aeof = false;

synchronized (_inputQ)
try
{
listener = _listener;

if (_state == EOF)
return;

if (_state == AEOF)
synchronized (_inputQ)
{
_state = EOF;
aeof = true;
}
listener = _listener;

error = _state.getError();

if (!aeof && error == null)
{
Content content = nextInterceptedContent();
if (content == null)
if (_state == EOF)
return;

// Consume a directly received EOF without first calling onDataAvailable
// So -1 will never be read and only onAddDataRread or onError will be called
if (content instanceof EofContent)
if (_state == AEOF)
{
consume(content);
if (_state == EARLY_EOF)
error = _state.getError();
else if (_state == AEOF)
_state = EOF;
aeof = true;
}

error = _state.getError();

if (!aeof && error == null)
{
Content content = nextInterceptedContent();
if (content == null)
return;

// Consume a directly received EOF without first calling onDataAvailable
// So -1 will never be read and only onAddDataRread or onError will be called
if (content instanceof EofContent)
{
aeof = true;
_state = EOF;
consume(content);
if (_state == EARLY_EOF)
error = _state.getError();
else if (_state == AEOF)
{
aeof = true;
_state = EOF;
}
}
}
}
}

try
{
if (error != null)
{
// TODO is this necessary to add here?
Expand All @@ -958,7 +977,8 @@ else if (aeof)
catch (Throwable e)
{
LOG.warn(e.toString());
LOG.debug(e);
if (LOG.isDebugEnabled())
LOG.debug("", e);
try
{
if (aeof || error == null)
Expand Down Expand Up @@ -1106,7 +1126,7 @@ protected static class EOFState extends State
{
}

protected class ErrorState extends EOFState
protected static class ErrorState extends EOFState
{
final Throwable _error;

Expand Down Expand Up @@ -1155,7 +1175,7 @@ public String toString()
protected static final State ASYNC = new State()
{
@Override
public int noContent() throws IOException
public int noContent()
{
return 0;
}
Expand Down