Skip to content

Commit

Permalink
Merge pull request #5936 from eclipse/jetty-9.4.x-5605-wakeup-blocked…
Browse files Browse the repository at this point in the history
…-threads

Fix #5605 Unblock non container Threads
  • Loading branch information
lorban committed Feb 17, 2021
2 parents 1e364ec + 379d069 commit b3bebca
Show file tree
Hide file tree
Showing 8 changed files with 784 additions and 47 deletions.
Expand Up @@ -382,8 +382,15 @@ private boolean parseRequestBuffer()
@Override
public void onCompleted()
{
// If we are fill interested, then a read is pending and we must abort
if (isFillInterested())
{
LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint());
_channel.abort(new IOException("Pending read in onCompleted"));
}

// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
else if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
Expand All @@ -407,6 +414,9 @@ public void onCompleted()
}
}

// Drive to EOF, EarlyEOF or Error
boolean complete = _input.consumeAll();

// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
Expand All @@ -415,7 +425,7 @@ public void onCompleted()
_parser.close();
}
// else abort if we can't consume all
else if (_generator.isPersistent() && !_input.consumeAll())
else if (_generator.isPersistent() && !complete)
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {} {}", this, _parser);
Expand Down
Expand Up @@ -720,12 +720,17 @@ public boolean consumeAll()
{
produceContent();
if (_content == null && _intercepted == null && _inputQ.isEmpty())
{
_state = EARLY_EOF;
_inputQ.notify();
return false;
}
}
catch (Throwable e)
{
LOG.debug(e);
_state = new ErrorState(e);
_inputQ.notify();
return false;
}
}
Expand Down
115 changes: 75 additions & 40 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Expand Up @@ -29,6 +29,7 @@
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.ResourceBundle;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
enum State
{
OPEN, // Open
CLOSE, // Close needed from onWriteCompletion
CLOSE, // Close needed from onWriteComplete
CLOSING, // Close in progress after close API called
CLOSED // Closed
}
Expand Down Expand Up @@ -308,7 +309,7 @@ else if (_state == State.CLOSE)
{
// Somebody called close or complete while we were writing.
// We can now send a (probably empty) last buffer and then when it completes
// onWriteCompletion will be called again to actually execute the _completeCallback
// onWriteComplete will be called again to actually execute the _completeCallback
_state = State.CLOSING;
closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
}
Expand Down Expand Up @@ -411,53 +412,87 @@ public void complete(Callback callback)
ByteBuffer content = null;
synchronized (_channelState)
{
switch (_state)
// First check the API state for any unrecoverable situations
switch (_apiState)
{
case CLOSED:
succeeded = true;
case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time
error = new CancellationException("Completed whilst write unready");
break;

case CLOSE:
case CLOSING:
_closedCallback = Callback.combine(_closedCallback, callback);
case PENDING: // an async write is pending and may complete at any time
// If this is not the last write, then we must abort
if (!_channel.getResponse().isContentComplete(_written))
error = new CancellationException("Completed whilst write pending");
break;

case OPEN:
if (_onError != null)
{
error = _onError;
break;
}
case BLOCKED: // another thread is blocked in a write or a close
error = new CancellationException("Completed whilst write blocked");
break;

_closedCallback = Callback.combine(_closedCallback, callback);
default:
break;
}

switch (_apiState)
{
case BLOCKING:
// Output is idle blocking state, but we still do an async close
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
// If we can't complete due to the API state, then abort
if (error != null)
{
_channel.abort(error);
_writeBlocker.fail(error);
_state = State.CLOSED;
}
else
{
// Otherwise check the output state to determine how to complete
switch (_state)
{
case CLOSED:
succeeded = true;
break;

case ASYNC:
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
case CLOSE:
case CLOSING:
_closedCallback = Callback.combine(_closedCallback, callback);
break;

case BLOCKED:
case UNREADY:
case PENDING:
// An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
case OPEN:
if (_onError != null)
{
error = _onError;
break;
}
break;
}

_closedCallback = Callback.combine(_closedCallback, callback);

switch (_apiState)
{
case BLOCKING:
// Output is idle blocking state, but we still do an async close
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;

case ASYNC:
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;

case UNREADY:
case PENDING:
// An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
break;

default:
throw new IllegalStateException();
}
break;
}
}
}

Expand Down Expand Up @@ -1399,7 +1434,7 @@ public void recycle()
{
_state = State.OPEN;
_apiState = ApiState.BLOCKING;
_softClose = false;
_softClose = true; // Stay closed until next request
_interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
Expand Down
Expand Up @@ -1814,6 +1814,11 @@ public boolean isUserInRole(String role)
*/
public void setMetaData(org.eclipse.jetty.http.MetaData.Request request)
{
if (_metaData == null && _input != null && _channel != null)
{
_input.recycle();
_channel.getResponse().getHttpOutput().reopen();
}
_metaData = request;

setMethod(request.getMethod());
Expand Down Expand Up @@ -1895,7 +1900,7 @@ protected void recycle()

getHttpChannelState().recycle();
_requestAttributeListeners.clear();
_input.recycle();
// Defer _input.recycle() until setMetaData on next request, TODO replace with recycle and reopen in 10
_metaData = null;
_originalURI = null;
_contextPath = null;
Expand Down

0 comments on commit b3bebca

Please sign in to comment.