Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #5605 Unblock non container Threads #5936

Merged
merged 22 commits into from Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9cc7be4
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
b3268eb
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
0d85c7d
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
a110fc3
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
a100d80
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
25cbe65
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
70056e2
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
5f4919c
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
e9315fe
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
7235e49
Fix #5605 Unblock non container Threads
gregw Feb 4, 2021
096e8b8
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-56…
gregw Feb 4, 2021
39f6f87
Fix #5605 Unblock non container Threads
gregw Feb 4, 2021
ed534b8
Fix #5937
gregw Feb 5, 2021
9f2a4f5
Fix #5605 write side
gregw Feb 10, 2021
769687f
update from the feedback on the feedback of the feedback from the rev…
gregw Feb 10, 2021
e2c710e
updates from review
gregw Feb 11, 2021
6b1a8c3
updates from review
gregw Feb 16, 2021
0715846
updates from review
gregw Feb 16, 2021
78ed082
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-56…
gregw Feb 17, 2021
8349614
Merge branch 'jetty-9.4.x' into jetty-9.4.x-5605-wakeup-blocked-threads
gregw Feb 17, 2021
bbf26a3
Merge branch 'jetty-9.4.x' into jetty-9.4.x-5605-wakeup-blocked-threads
gregw Feb 17, 2021
379d069
Merge branch 'jetty-9.4.x' into jetty-9.4.x-5605-wakeup-blocked-threads
gregw Feb 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -376,8 +376,15 @@ private boolean parseRequestBuffer()
@Override
public void onCompleted()
{
// If we are fill interested, then a read is pending and we must abort
if (isFillInterested())
{
LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint());
_channel.abort(new IOException("Pending read in onCompleted"));
}
gregw marked this conversation as resolved.
Show resolved Hide resolved

// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
else if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
Expand All @@ -401,6 +408,9 @@ public void onCompleted()
}
}

// Drive to EOF, EarlyEOF or Error
boolean complete = _input.consumeAll();

// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
Expand All @@ -409,7 +419,7 @@ public void onCompleted()
_parser.close();
}
// else abort if we can't consume all
else if (_generator.isPersistent() && !_input.consumeAll())
else if (_generator.isPersistent() && !complete)
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {} {}", this, _parser);
Expand Down
Expand Up @@ -720,12 +720,17 @@ public boolean consumeAll()
{
produceContent();
if (_content == null && _intercepted == null && _inputQ.isEmpty())
{
_state = EARLY_EOF;
_inputQ.notify();
return false;
}
}
catch (Throwable e)
{
LOG.debug(e);
_state = new ErrorState(e);
_inputQ.notify();
return false;
}
}
Expand Down
115 changes: 75 additions & 40 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Expand Up @@ -29,6 +29,7 @@
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.ResourceBundle;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
enum State
{
OPEN, // Open
CLOSE, // Close needed from onWriteCompletion
CLOSE, // Close needed from onWriteComplete
CLOSING, // Close in progress after close API called
CLOSED // Closed
}
Expand Down Expand Up @@ -308,7 +309,7 @@ else if (_state == State.CLOSE)
{
// Somebody called close or complete while we were writing.
// We can now send a (probably empty) last buffer and then when it completes
// onWriteCompletion will be called again to actually execute the _completeCallback
// onWriteComplete will be called again to actually execute the _completeCallback
_state = State.CLOSING;
closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
}
Expand Down Expand Up @@ -411,53 +412,87 @@ public void complete(Callback callback)
ByteBuffer content = null;
synchronized (_channelState)
{
switch (_state)
// First check the API state for any unrecoverable situations
switch (_apiState)
{
case CLOSED:
succeeded = true;
case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time
error = new CancellationException("Completed whilst write unready");
break;

case CLOSE:
case CLOSING:
_closedCallback = Callback.combine(_closedCallback, callback);
case PENDING: // an async write is pending and may complete at any time
// If this is not the last write, then we must abort
if (!_channel.getResponse().isContentComplete(_written))
error = new CancellationException("Completed whilst write pending");
break;

case OPEN:
if (_onError != null)
{
error = _onError;
break;
}
case BLOCKED: // another thread is blocked in a write or a close
error = new CancellationException("Completed whilst write blocked");
break;

_closedCallback = Callback.combine(_closedCallback, callback);
default:
break;
}

switch (_apiState)
{
case BLOCKING:
// Output is idle blocking state, but we still do an async close
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
// If we can't complete due to the API state, then abort
if (error != null)
{
_channel.abort(error);
_writeBlocker.fail(error);
_state = State.CLOSED;
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
// Otherwise check the output state to determine how to complete
switch (_state)
{
case CLOSED:
succeeded = true;
break;

case ASYNC:
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
case CLOSE:
case CLOSING:
_closedCallback = Callback.combine(_closedCallback, callback);
break;

case BLOCKED:
case UNREADY:
case PENDING:
// An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
case OPEN:
if (_onError != null)
{
error = _onError;
break;
}
break;
}

_closedCallback = Callback.combine(_closedCallback, callback);

switch (_apiState)
{
case BLOCKING:
// Output is idle blocking state, but we still do an async close
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;

case ASYNC:
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;

case UNREADY:
case PENDING:
// An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
break;

default:
throw new IllegalStateException();
}
break;
}
}
}

Expand Down Expand Up @@ -1399,7 +1434,7 @@ public void recycle()
{
_state = State.OPEN;
_apiState = ApiState.BLOCKING;
_softClose = false;
_softClose = true; // Stay closed until next request
_interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
Expand Down
Expand Up @@ -1811,6 +1811,11 @@ public boolean isUserInRole(String role)
*/
public void setMetaData(org.eclipse.jetty.http.MetaData.Request request)
{
if (_metaData == null && _input != null && _channel != null)
{
_input.recycle();
_channel.getResponse().getHttpOutput().reopen();
lorban marked this conversation as resolved.
Show resolved Hide resolved
}
_metaData = request;

setMethod(request.getMethod());
Expand Down Expand Up @@ -1879,7 +1884,7 @@ protected void recycle()

getHttpChannelState().recycle();
_requestAttributeListeners.clear();
_input.recycle();
// Defer _input.recycle() until setMetaData on next request, TODO replace with recycle and reopen in 10
_metaData = null;
_originalURI = null;
_contextPath = null;
Expand Down