Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 10.0.x Fix #5605 Unblock non container Threads #5953

Merged
merged 38 commits into from Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9cc7be4
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
b3268eb
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
0d85c7d
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
a110fc3
Fix #5605 Unblock non container Threads
gregw Feb 2, 2021
a100d80
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
25cbe65
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
70056e2
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
5f4919c
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
e9315fe
Fix #5605 Unblock non container Threads
gregw Feb 3, 2021
7235e49
Fix #5605 Unblock non container Threads
gregw Feb 4, 2021
096e8b8
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-56…
gregw Feb 4, 2021
39f6f87
Fix #5605 Unblock non container Threads
gregw Feb 4, 2021
ed534b8
Fix #5937
gregw Feb 5, 2021
9611a25
Merge remote-tracking branch 'origin/jetty-9.4.x-5605-wakeup-blocked-…
lorban Feb 8, 2021
d297e9c
unblock readers after consumeAll
lorban Feb 8, 2021
50a379d
align HttpInput and HttpOutput on recycle and reopen
lorban Feb 9, 2021
f81ff17
move fill interested + pending read check out of upgrade
lorban Feb 9, 2021
a29a054
fix leak
lorban Feb 9, 2021
ce29e7c
nits
lorban Feb 9, 2021
12734b1
committee loop rewrite
lorban Feb 9, 2021
3c4713d
nit
lorban Feb 9, 2021
9f2a4f5
Fix #5605 write side
gregw Feb 10, 2021
3c46c53
fix the committee's loop
lorban Feb 10, 2021
03e2789
Merge remote-tracking branch 'origin/jetty-9.4.x-5605-wakeup-blocked-…
lorban Feb 10, 2021
769687f
update from the feedback on the feedback of the feedback from the rev…
gregw Feb 10, 2021
f8bf885
restore the committee's loop
lorban Feb 10, 2021
6d9d548
lock all HttpInput methods to prevents concurrent threads to work on …
lorban Feb 10, 2021
1494a05
rework HttpInput locking
lorban Feb 11, 2021
14108c8
set the read listener only after all checks are done
lorban Feb 11, 2021
e2c710e
updates from review
gregw Feb 11, 2021
59b397b
fix deadlock between HttpInput's lock and BlockingContentProducer's s…
lorban Feb 11, 2021
31eedec
internalize the lock within the content producer
lorban Feb 11, 2021
67b533d
Merge remote-tracking branch 'origin/jetty-9.4.x-5605-wakeup-blocked-…
lorban Feb 12, 2021
8049be5
repro test
lorban Feb 12, 2021
bf9318f
fix HttpOutput.close() hanging forever when client sends TCP RST and …
lorban Feb 12, 2021
4ec51fb
abort when onError() fails
lorban Feb 15, 2021
f70a766
replace semaphore with cond variable + counter
lorban Feb 15, 2021
ace019a
Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-…
lorban Feb 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -14,20 +14,23 @@
package org.eclipse.jetty.server;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Non-blocking {@link ContentProducer} implementation. Calling {@link #nextContent()} will never block
* Non-blocking {@link ContentProducer} implementation. Calling {@link ContentProducer#nextContent()} will never block
* but will return null when there is no available content.
*/
class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);

private final AutoLock _lock = new AutoLock();
private final HttpChannel _httpChannel;
private HttpInput.Interceptor _interceptor;
private HttpInput.Content _rawContent;
Expand All @@ -41,9 +44,16 @@ class AsyncContentProducer implements ContentProducer
_httpChannel = httpChannel;
}

@Override
public AutoLock lock()
{
return _lock.lock();
}

@Override
public void recycle()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("recycling {}", this);
_interceptor = null;
Expand All @@ -57,18 +67,21 @@ public void recycle()
@Override
public HttpInput.Interceptor getInterceptor()
{
assertLocked();
return _interceptor;
}

@Override
public void setInterceptor(HttpInput.Interceptor interceptor)
{
assertLocked();
this._interceptor = interceptor;
}

@Override
public int available()
{
assertLocked();
HttpInput.Content content = nextTransformedContent();
int available = content == null ? 0 : content.remaining();
if (LOG.isDebugEnabled())
Expand All @@ -79,6 +92,7 @@ public int available()
@Override
public boolean hasContent()
{
assertLocked();
boolean hasContent = _rawContent != null;
if (LOG.isDebugEnabled())
LOG.debug("hasContent = {} {}", hasContent, this);
Expand All @@ -88,6 +102,7 @@ public boolean hasContent()
@Override
public boolean isError()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("isError = {} {}", _error, this);
return _error;
Expand All @@ -96,6 +111,7 @@ public boolean isError()
@Override
public void checkMinDataRate()
{
assertLocked();
long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate();
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this);
Expand Down Expand Up @@ -127,6 +143,7 @@ public void checkMinDataRate()
@Override
public long getRawContentArrived()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this);
return _rawContentArrived;
Expand All @@ -135,6 +152,7 @@ public long getRawContentArrived()
@Override
public boolean consumeAll(Throwable x)
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("consumeAll [e={}] {}", x, this);
failCurrentContent(x);
Expand Down Expand Up @@ -177,11 +195,16 @@ private void failCurrentContent(Throwable x)
_rawContent.failed(x);
_rawContent = null;
}

HttpInput.ErrorContent errorContent = new HttpInput.ErrorContent(x);
_transformedContent = errorContent;
_rawContent = errorContent;
}

@Override
public boolean onContentProducible()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("onContentProducible {}", this);
return _httpChannel.getState().onReadReady();
Expand All @@ -190,6 +213,7 @@ public boolean onContentProducible()
@Override
public HttpInput.Content nextContent()
{
assertLocked();
HttpInput.Content content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("nextContent = {} {}", content, this);
Expand All @@ -201,6 +225,7 @@ public HttpInput.Content nextContent()
@Override
public void reclaim(HttpInput.Content content)
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("reclaim {} {}", content, this);
if (_transformedContent == content)
Expand All @@ -215,6 +240,7 @@ public void reclaim(HttpInput.Content content)
@Override
public boolean isReady()
{
assertLocked();
HttpInput.Content content = nextTransformedContent();
if (content != null)
{
Expand Down Expand Up @@ -274,6 +300,13 @@ private HttpInput.Content nextTransformedContent()
{
// TODO does EOF need to be passed to the interceptors?

// In case the _rawContent was set by consumeAll(), check the httpChannel
// to see if it has a more precise error. Otherwise, the exact same
// special content will be returned by the httpChannel.
HttpInput.Content refreshedRawContent = produceRawContent();
if (refreshedRawContent != null)
_rawContent = refreshedRawContent;

_error = _rawContent.getError() != null;
if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it {}", _error, this);
Expand Down Expand Up @@ -352,6 +385,12 @@ private HttpInput.Content produceRawContent()
return content;
}

private void assertLocked()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException("ContentProducer must be called within lock scope");
}

@Override
public String toString()
{
Expand All @@ -365,4 +404,53 @@ public String toString()
_httpChannel
);
}

LockedSemaphore newLockedSemaphore()
{
return new LockedSemaphore();
}

/**
* A semaphore that assumes working under {@link AsyncContentProducer#lock()} scope.
*/
class LockedSemaphore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a useful class. Surprised it is not already available as a common extension of Condition.

{
private final Condition _condition;
private int _permits;

private LockedSemaphore()
{
this._condition = _lock.newCondition();
}

void assertLocked()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException("LockedSemaphore must be called within lock scope");
}

void drainPermits()
{
_permits = 0;
}

void acquire() throws InterruptedException
{
while (_permits == 0)
_condition.await();
_permits--;
}

void release()
{
_permits++;
_condition.signal();
}

@Override
public String toString()
{
return getClass().getSimpleName() + " permits=" + _permits;
}
}
}
Expand Up @@ -13,25 +13,31 @@

package org.eclipse.jetty.server;

import java.util.concurrent.Semaphore;

import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when
* Blocking implementation of {@link ContentProducer}. Calling {@link ContentProducer#nextContent()} will block when
* there is no available content but will never return null.
*/
class BlockingContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class);

private final Semaphore _semaphore = new Semaphore(0);
private final AsyncContentProducer _asyncContentProducer;
private final AsyncContentProducer.LockedSemaphore _semaphore;

BlockingContentProducer(AsyncContentProducer delegate)
{
_asyncContentProducer = delegate;
_semaphore = _asyncContentProducer.newLockedSemaphore();
}

@Override
public AutoLock lock()
{
return _asyncContentProducer.lock();
}

@Override
Expand Down Expand Up @@ -76,7 +82,9 @@ public long getRawContentArrived()
@Override
public boolean consumeAll(Throwable x)
{
return _asyncContentProducer.consumeAll(x);
boolean eof = _asyncContentProducer.consumeAll(x);
_semaphore.release();
return eof;
}

@Override
Expand Down Expand Up @@ -142,6 +150,7 @@ public void setInterceptor(HttpInput.Interceptor interceptor)
@Override
public boolean onContentProducible()
{
_semaphore.assertLocked();
// In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state
// DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why
// this method always returns false.
Expand Down
Expand Up @@ -13,6 +13,8 @@

package org.eclipse.jetty.server;

import org.eclipse.jetty.util.thread.AutoLock;

/**
* ContentProducer is the bridge between {@link HttpInput} and {@link HttpChannel}.
* It wraps a {@link HttpChannel} and uses the {@link HttpChannel#needContent()},
Expand All @@ -24,6 +26,13 @@
*/
public interface ContentProducer
{
/**
* Lock this instance. The lock must be held before any method of this instance's
* method be called, and must be manually released afterward.
* @return the lock that is guarding this instance.
*/
AutoLock lock();

/**
* Reset all internal state and clear any held resources.
*/
Expand Down
Expand Up @@ -701,9 +701,20 @@ else if (noStack != null)
}

if (isCommitted())
{
abort(failure);
}
else
_state.onError(failure);
{
try
{
_state.onError(failure);
}
catch (IllegalStateException e)
{
abort(failure);
}
}
}

/**
Expand Down
Expand Up @@ -309,19 +309,24 @@ else if (filled < 0)
}

/**
* Parse and fill data, looking for content
* Parse and fill data, looking for content.
* We do parse first, and only fill if we're out of bytes to avoid unnecessary system calls.
*/
void parseAndFillForContent()
{
// When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown()
int filled = Integer.MAX_VALUE;
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown().

// This loop was designed by a committee and voted by a majority.
while (_parser.inContentState())
{
boolean handled = parseRequestBuffer();
if (handled || filled <= 0)
if (parseRequestBuffer())
break;
// Re-check the parser state after parsing to avoid filling,
// otherwise fillRequestBuffer() would acquire a ByteBuffer
// that may be leaked.
if (_parser.inContentState() && fillRequestBuffer() <= 0)
break;
filled = fillRequestBuffer();
}
}

Expand Down Expand Up @@ -412,9 +417,21 @@ private boolean upgrade()
@Override
public void onCompleted()
{
// Handle connection upgrades.
if (upgrade())
return;
// If we are fill interested, then a read is pending and we must abort
if (isFillInterested())
{
LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint());
abort(new IllegalStateException());
}
else
{
// Handle connection upgrades.
if (upgrade())
return;
}

// Drive to EOF, EarlyEOF or Error
boolean complete = _input.consumeAll();

// Finish consuming the request
// If we are still expecting
Expand All @@ -424,7 +441,7 @@ public void onCompleted()
_parser.close();
}
// else abort if we can't consume all
else if (_generator.isPersistent() && !_input.consumeAll())
else if (_generator.isPersistent() && !complete)
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {} {}", this, _parser);
Expand Down