diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java index e2864950bfb6..72202727e9fa 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java @@ -14,20 +14,23 @@ package org.eclipse.jetty.server; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Non-blocking {@link ContentProducer} implementation. Calling {@link #nextContent()} will never block + * Non-blocking {@link ContentProducer} implementation. Calling {@link ContentProducer#nextContent()} will never block * but will return null when there is no available content. */ class AsyncContentProducer implements ContentProducer { private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class); + private final AutoLock _lock = new AutoLock(); private final HttpChannel _httpChannel; private HttpInput.Interceptor _interceptor; private HttpInput.Content _rawContent; @@ -41,9 +44,16 @@ class AsyncContentProducer implements ContentProducer _httpChannel = httpChannel; } + @Override + public AutoLock lock() + { + return _lock.lock(); + } + @Override public void recycle() { + assertLocked(); if (LOG.isDebugEnabled()) LOG.debug("recycling {}", this); _interceptor = null; @@ -57,18 +67,21 @@ public void recycle() @Override public HttpInput.Interceptor getInterceptor() { + assertLocked(); return _interceptor; } @Override public void setInterceptor(HttpInput.Interceptor interceptor) { + assertLocked(); this._interceptor = interceptor; } @Override public int available() { + assertLocked(); HttpInput.Content content = nextTransformedContent(); int available = content == null ? 0 : content.remaining(); if (LOG.isDebugEnabled()) @@ -79,6 +92,7 @@ public int available() @Override public boolean hasContent() { + assertLocked(); boolean hasContent = _rawContent != null; if (LOG.isDebugEnabled()) LOG.debug("hasContent = {} {}", hasContent, this); @@ -88,6 +102,7 @@ public boolean hasContent() @Override public boolean isError() { + assertLocked(); if (LOG.isDebugEnabled()) LOG.debug("isError = {} {}", _error, this); return _error; @@ -96,6 +111,7 @@ public boolean isError() @Override public void checkMinDataRate() { + assertLocked(); long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate(); if (LOG.isDebugEnabled()) LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this); @@ -127,6 +143,7 @@ public void checkMinDataRate() @Override public long getRawContentArrived() { + assertLocked(); if (LOG.isDebugEnabled()) LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this); return _rawContentArrived; @@ -135,6 +152,7 @@ public long getRawContentArrived() @Override public boolean consumeAll(Throwable x) { + assertLocked(); if (LOG.isDebugEnabled()) LOG.debug("consumeAll [e={}] {}", x, this); failCurrentContent(x); @@ -177,11 +195,16 @@ private void failCurrentContent(Throwable x) _rawContent.failed(x); _rawContent = null; } + + HttpInput.ErrorContent errorContent = new HttpInput.ErrorContent(x); + _transformedContent = errorContent; + _rawContent = errorContent; } @Override public boolean onContentProducible() { + assertLocked(); if (LOG.isDebugEnabled()) LOG.debug("onContentProducible {}", this); return _httpChannel.getState().onReadReady(); @@ -190,6 +213,7 @@ public boolean onContentProducible() @Override public HttpInput.Content nextContent() { + assertLocked(); HttpInput.Content content = nextTransformedContent(); if (LOG.isDebugEnabled()) LOG.debug("nextContent = {} {}", content, this); @@ -201,6 +225,7 @@ public HttpInput.Content nextContent() @Override public void reclaim(HttpInput.Content content) { + assertLocked(); if (LOG.isDebugEnabled()) LOG.debug("reclaim {} {}", content, this); if (_transformedContent == content) @@ -215,6 +240,7 @@ public void reclaim(HttpInput.Content content) @Override public boolean isReady() { + assertLocked(); HttpInput.Content content = nextTransformedContent(); if (content != null) { @@ -274,6 +300,13 @@ private HttpInput.Content nextTransformedContent() { // TODO does EOF need to be passed to the interceptors? + // In case the _rawContent was set by consumeAll(), check the httpChannel + // to see if it has a more precise error. Otherwise, the exact same + // special content will be returned by the httpChannel. + HttpInput.Content refreshedRawContent = produceRawContent(); + if (refreshedRawContent != null) + _rawContent = refreshedRawContent; + _error = _rawContent.getError() != null; if (LOG.isDebugEnabled()) LOG.debug("raw content is special (with error = {}), returning it {}", _error, this); @@ -352,6 +385,12 @@ private HttpInput.Content produceRawContent() return content; } + private void assertLocked() + { + if (!_lock.isHeldByCurrentThread()) + throw new IllegalStateException("ContentProducer must be called within lock scope"); + } + @Override public String toString() { @@ -365,4 +404,53 @@ public String toString() _httpChannel ); } + + LockedSemaphore newLockedSemaphore() + { + return new LockedSemaphore(); + } + + /** + * A semaphore that assumes working under {@link AsyncContentProducer#lock()} scope. + */ + class LockedSemaphore + { + private final Condition _condition; + private int _permits; + + private LockedSemaphore() + { + this._condition = _lock.newCondition(); + } + + void assertLocked() + { + if (!_lock.isHeldByCurrentThread()) + throw new IllegalStateException("LockedSemaphore must be called within lock scope"); + } + + void drainPermits() + { + _permits = 0; + } + + void acquire() throws InterruptedException + { + while (_permits == 0) + _condition.await(); + _permits--; + } + + void release() + { + _permits++; + _condition.signal(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + " permits=" + _permits; + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingContentProducer.java index 186eb48dc8a1..c525de21d4e2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingContentProducer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingContentProducer.java @@ -13,25 +13,31 @@ package org.eclipse.jetty.server; -import java.util.concurrent.Semaphore; - +import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when + * Blocking implementation of {@link ContentProducer}. Calling {@link ContentProducer#nextContent()} will block when * there is no available content but will never return null. */ class BlockingContentProducer implements ContentProducer { private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class); - private final Semaphore _semaphore = new Semaphore(0); private final AsyncContentProducer _asyncContentProducer; + private final AsyncContentProducer.LockedSemaphore _semaphore; BlockingContentProducer(AsyncContentProducer delegate) { _asyncContentProducer = delegate; + _semaphore = _asyncContentProducer.newLockedSemaphore(); + } + + @Override + public AutoLock lock() + { + return _asyncContentProducer.lock(); } @Override @@ -76,7 +82,9 @@ public long getRawContentArrived() @Override public boolean consumeAll(Throwable x) { - return _asyncContentProducer.consumeAll(x); + boolean eof = _asyncContentProducer.consumeAll(x); + _semaphore.release(); + return eof; } @Override @@ -142,6 +150,7 @@ public void setInterceptor(HttpInput.Interceptor interceptor) @Override public boolean onContentProducible() { + _semaphore.assertLocked(); // In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state // DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why // this method always returns false. diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ContentProducer.java index 076619e816da..533a041ca1ad 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ContentProducer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ContentProducer.java @@ -13,6 +13,8 @@ package org.eclipse.jetty.server; +import org.eclipse.jetty.util.thread.AutoLock; + /** * ContentProducer is the bridge between {@link HttpInput} and {@link HttpChannel}. * It wraps a {@link HttpChannel} and uses the {@link HttpChannel#needContent()}, @@ -24,6 +26,13 @@ */ public interface ContentProducer { + /** + * Lock this instance. The lock must be held before any method of this instance's + * method be called, and must be manually released afterward. + * @return the lock that is guarding this instance. + */ + AutoLock lock(); + /** * Reset all internal state and clear any held resources. */ diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 4810a3aa025d..4f4b86fb1b7a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -701,9 +701,20 @@ else if (noStack != null) } if (isCommitted()) + { abort(failure); + } else - _state.onError(failure); + { + try + { + _state.onError(failure); + } + catch (IllegalStateException e) + { + abort(failure); + } + } } /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index b149476b64f1..b4e0f2363e65 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -309,19 +309,24 @@ else if (filled < 0) } /** - * Parse and fill data, looking for content + * Parse and fill data, looking for content. + * We do parse first, and only fill if we're out of bytes to avoid unnecessary system calls. */ void parseAndFillForContent() { // When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method - // doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown() - int filled = Integer.MAX_VALUE; + // doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown(). + + // This loop was designed by a committee and voted by a majority. while (_parser.inContentState()) { - boolean handled = parseRequestBuffer(); - if (handled || filled <= 0) + if (parseRequestBuffer()) + break; + // Re-check the parser state after parsing to avoid filling, + // otherwise fillRequestBuffer() would acquire a ByteBuffer + // that may be leaked. + if (_parser.inContentState() && fillRequestBuffer() <= 0) break; - filled = fillRequestBuffer(); } } @@ -412,9 +417,21 @@ private boolean upgrade() @Override public void onCompleted() { - // Handle connection upgrades. - if (upgrade()) - return; + // If we are fill interested, then a read is pending and we must abort + if (isFillInterested()) + { + LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint()); + abort(new IllegalStateException()); + } + else + { + // Handle connection upgrades. + if (upgrade()) + return; + } + + // Drive to EOF, EarlyEOF or Error + boolean complete = _input.consumeAll(); // Finish consuming the request // If we are still expecting @@ -424,7 +441,7 @@ public void onCompleted() _parser.close(); } // else abort if we can't consume all - else if (_generator.isPersistent() && !_input.consumeAll()) + else if (_generator.isPersistent() && !complete) { if (LOG.isDebugEnabled()) LOG.debug("unconsumed input {} {}", this, _parser); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index a41ad47fe1f7..d7eee8efe7b3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; +import java.util.concurrent.atomic.LongAdder; import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; @@ -23,6 +24,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.Destroyable; +import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +40,10 @@ public class HttpInput extends ServletInputStream implements Runnable private final BlockingContentProducer _blockingContentProducer; private final AsyncContentProducer _asyncContentProducer; private final HttpChannelState _channelState; - private ContentProducer _contentProducer; - private boolean _consumedEof; - private ReadListener _readListener; - private long _contentConsumed; + private final LongAdder _contentConsumed = new LongAdder(); + private volatile ContentProducer _contentProducer; + private volatile boolean _consumedEof; + private volatile ReadListener _readListener; public HttpInput(HttpChannelState state) { @@ -55,11 +57,20 @@ public void recycle() { if (LOG.isDebugEnabled()) LOG.debug("recycle {}", this); - _blockingContentProducer.recycle(); - _contentProducer = _blockingContentProducer; - _consumedEof = false; - _readListener = null; - _contentConsumed = 0; + } + + public void reopen() + { + try (AutoLock lock = _contentProducer.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("reopen {}", this); + _blockingContentProducer.recycle(); + _contentProducer = _blockingContentProducer; + _consumedEof = false; + _readListener = null; + _contentConsumed.reset(); + } } /** @@ -67,7 +78,10 @@ public void recycle() */ public Interceptor getInterceptor() { - return _contentProducer.getInterceptor(); + try (AutoLock lock = _contentProducer.lock()) + { + return _contentProducer.getInterceptor(); + } } /** @@ -77,9 +91,12 @@ public Interceptor getInterceptor() */ public void setInterceptor(Interceptor interceptor) { - if (LOG.isDebugEnabled()) - LOG.debug("setting interceptor to {} on {}", interceptor, this); - _contentProducer.setInterceptor(interceptor); + try (AutoLock lock = _contentProducer.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("setting interceptor to {} on {}", interceptor, this); + _contentProducer.setInterceptor(interceptor); + } } /** @@ -90,60 +107,72 @@ public void setInterceptor(Interceptor interceptor) */ public void addInterceptor(Interceptor interceptor) { - Interceptor currentInterceptor = _contentProducer.getInterceptor(); - if (currentInterceptor == null) - { - if (LOG.isDebugEnabled()) - LOG.debug("adding single interceptor: {} on {}", interceptor, this); - _contentProducer.setInterceptor(interceptor); - } - else + try (AutoLock lock = _contentProducer.lock()) { - ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor); - if (LOG.isDebugEnabled()) - LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this); - _contentProducer.setInterceptor(chainedInterceptor); + Interceptor currentInterceptor = _contentProducer.getInterceptor(); + if (currentInterceptor == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("adding single interceptor: {} on {}", interceptor, this); + _contentProducer.setInterceptor(interceptor); + } + else + { + ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor); + if (LOG.isDebugEnabled()) + LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this); + _contentProducer.setInterceptor(chainedInterceptor); + } } } - public int get(Content content, byte[] bytes, int offset, int length) + private int get(Content content, byte[] bytes, int offset, int length) { int consumed = content.get(bytes, offset, length); - _contentConsumed += consumed; + _contentConsumed.add(consumed); return consumed; } public long getContentConsumed() { - return _contentConsumed; + return _contentConsumed.sum(); } public long getContentReceived() { - return _contentProducer.getRawContentArrived(); + try (AutoLock lock = _contentProducer.lock()) + { + return _contentProducer.getRawContentArrived(); + } } public boolean consumeAll() { - IOException failure = new IOException("Unconsumed content"); - if (LOG.isDebugEnabled()) - LOG.debug("consumeAll {}", this, failure); - boolean atEof = _contentProducer.consumeAll(failure); - if (atEof) - _consumedEof = true; + try (AutoLock lock = _contentProducer.lock()) + { + IOException failure = new IOException("Unconsumed content"); + if (LOG.isDebugEnabled()) + LOG.debug("consumeAll {}", this, failure); + boolean atEof = _contentProducer.consumeAll(failure); + if (atEof) + _consumedEof = true; - if (isFinished()) - return !isError(); + if (isFinished()) + return !isError(); - return false; + return false; + } } public boolean isError() { - boolean error = _contentProducer.isError(); - if (LOG.isDebugEnabled()) - LOG.debug("isError={} {}", error, this); - return error; + try (AutoLock lock = _contentProducer.lock()) + { + boolean error = _contentProducer.isError(); + if (LOG.isDebugEnabled()) + LOG.debug("isError={} {}", error, this); + return error; + } } public boolean isAsync() @@ -167,10 +196,13 @@ public boolean isFinished() @Override public boolean isReady() { - boolean ready = _contentProducer.isReady(); - if (LOG.isDebugEnabled()) - LOG.debug("isReady={} {}", ready, this); - return ready; + try (AutoLock lock = _contentProducer.lock()) + { + boolean ready = _contentProducer.isReady(); + if (LOG.isDebugEnabled()) + LOG.debug("isReady={} {}", ready, this); + return ready; + } } @Override @@ -180,10 +212,10 @@ public void setReadListener(ReadListener readListener) LOG.debug("setting read listener to {} {}", readListener, this); if (_readListener != null) throw new IllegalStateException("ReadListener already set"); - _readListener = Objects.requireNonNull(readListener); //illegal if async not started if (!_channelState.isAsyncStarted()) throw new IllegalStateException("Async not started"); + _readListener = Objects.requireNonNull(readListener); _contentProducer = _asyncContentProducer; // trigger content production @@ -193,59 +225,68 @@ public void setReadListener(ReadListener readListener) public boolean onContentProducible() { - return _contentProducer.onContentProducible(); + try (AutoLock lock = _contentProducer.lock()) + { + return _contentProducer.onContentProducible(); + } } @Override public int read() throws IOException { - int read = read(_oneByteBuffer, 0, 1); - if (read == 0) - throw new IOException("unready read=0"); - return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; + try (AutoLock lock = _contentProducer.lock()) + { + int read = read(_oneByteBuffer, 0, 1); + if (read == 0) + throw new IOException("unready read=0"); + return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; + } } @Override public int read(byte[] b, int off, int len) throws IOException { - // Calculate minimum request rate for DoS protection - _contentProducer.checkMinDataRate(); - - Content content = _contentProducer.nextContent(); - if (content == null) - throw new IllegalStateException("read on unready input"); - if (!content.isSpecial()) + try (AutoLock lock = _contentProducer.lock()) { - int read = get(content, b, off, len); - if (LOG.isDebugEnabled()) - LOG.debug("read produced {} byte(s) {}", read, this); - if (content.isEmpty()) - _contentProducer.reclaim(content); - return read; - } + // Calculate minimum request rate for DoS protection + _contentProducer.checkMinDataRate(); - Throwable error = content.getError(); - if (LOG.isDebugEnabled()) - LOG.debug("read error={} {}", error, this); - if (error != null) - { - if (error instanceof IOException) - throw (IOException)error; - throw new IOException(error); - } + Content content = _contentProducer.nextContent(); + if (content == null) + throw new IllegalStateException("read on unready input"); + if (!content.isSpecial()) + { + int read = get(content, b, off, len); + if (LOG.isDebugEnabled()) + LOG.debug("read produced {} byte(s) {}", read, this); + if (content.isEmpty()) + _contentProducer.reclaim(content); + return read; + } - if (content.isEof()) - { + Throwable error = content.getError(); if (LOG.isDebugEnabled()) - LOG.debug("read at EOF, setting consumed EOF to true {}", this); - _consumedEof = true; - // If EOF do we need to wake for allDataRead callback? - if (onContentProducible()) - scheduleReadListenerNotification(); - return -1; - } + LOG.debug("read error={} {}", error, this); + if (error != null) + { + if (error instanceof IOException) + throw (IOException)error; + throw new IOException(error); + } + + if (content.isEof()) + { + if (LOG.isDebugEnabled()) + LOG.debug("read at EOF, setting consumed EOF to true {}", this); + _consumedEof = true; + // If EOF do we need to wake for allDataRead callback? + if (onContentProducible()) + scheduleReadListenerNotification(); + return -1; + } - throw new AssertionError("no data, no error and not EOF"); + throw new AssertionError("no data, no error and not EOF"); + } } private void scheduleReadListenerNotification() @@ -261,21 +302,27 @@ private void scheduleReadListenerNotification() */ public boolean hasContent() { - // Do not call _contentProducer.available() as it calls HttpChannel.produceContent() - // which is forbidden by this method's contract. - boolean hasContent = _contentProducer.hasContent(); - if (LOG.isDebugEnabled()) - LOG.debug("hasContent={} {}", hasContent, this); - return hasContent; + try (AutoLock lock = _contentProducer.lock()) + { + // Do not call _contentProducer.available() as it calls HttpChannel.produceContent() + // which is forbidden by this method's contract. + boolean hasContent = _contentProducer.hasContent(); + if (LOG.isDebugEnabled()) + LOG.debug("hasContent={} {}", hasContent, this); + return hasContent; + } } @Override public int available() { - int available = _contentProducer.available(); - if (LOG.isDebugEnabled()) - LOG.debug("available={} {}", available, this); - return available; + try (AutoLock lock = _contentProducer.lock()) + { + int available = _contentProducer.available(); + if (LOG.isDebugEnabled()) + LOG.debug("available={} {}", available, this); + return available; + } } /* Runnable */ @@ -287,16 +334,20 @@ public int available() @Override public void run() { - // Call isReady() to make sure that if not ready we register for fill interest. - if (!_contentProducer.isReady()) + Content content; + try (AutoLock lock = _contentProducer.lock()) { + // Call isReady() to make sure that if not ready we register for fill interest. + if (!_contentProducer.isReady()) + { + if (LOG.isDebugEnabled()) + LOG.debug("running but not ready {}", this); + return; + } + content = _contentProducer.nextContent(); if (LOG.isDebugEnabled()) - LOG.debug("running but not ready {}", this); - return; + LOG.debug("running on content {} {}", content, this); } - Content content = _contentProducer.nextContent(); - if (LOG.isDebugEnabled()) - LOG.debug("running on content {} {}", content, this); // This check is needed when a request is started async but no read listener is registered. if (_readListener == null) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index c3b7815d91da..7baeffa84420 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -23,6 +23,8 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; +import java.util.ResourceBundle; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; @@ -61,7 +63,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable enum State { OPEN, // Open - CLOSE, // Close needed from onWriteCompletion + CLOSE, // Close needed from onWriteComplete CLOSING, // Close in progress after close API called CLOSED // Closed } @@ -292,7 +294,7 @@ else if (_state == State.CLOSE) { // Somebody called close or complete while we were writing. // We can now send a (probably empty) last buffer and then when it completes - // onWriteCompletion will be called again to actually execute the _completeCallback + // onWriteComplete will be called again to actually execute the _completeCallback _state = State.CLOSING; closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; } @@ -395,53 +397,87 @@ public void complete(Callback callback) ByteBuffer content = null; try (AutoLock l = _channelState.lock()) { - switch (_state) + // First check the API state for any unrecoverable situations + switch (_apiState) { - case CLOSED: - succeeded = true; + case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time + error = new CancellationException("Completed whilst write unready"); break; - case CLOSE: - case CLOSING: - _closedCallback = Callback.combine(_closedCallback, callback); + case PENDING: // an async write is pending and may complete at any time + // If this is not the last write, then we must abort + if (!_channel.getResponse().isContentComplete(_written)) + error = new CancellationException("Completed whilst write pending"); break; - case OPEN: - if (_onError != null) - { - error = _onError; - break; - } + case BLOCKED: // another thread is blocked in a write or a close + error = new CancellationException("Completed whilst write blocked"); + break; - _closedCallback = Callback.combine(_closedCallback, callback); + default: + break; + } - switch (_apiState) - { - case BLOCKING: - // Output is idle blocking state, but we still do an async close - _apiState = ApiState.BLOCKED; - _state = State.CLOSING; - content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - break; + // If we can't complete due to the API state, then abort + if (error != null) + { + _channel.abort(error); + _writeBlocker.fail(error); + _state = State.CLOSED; + } + else + { + // Otherwise check the output state to determine how to complete + switch (_state) + { + case CLOSED: + succeeded = true; + break; - case ASYNC: - case READY: - // Output is idle in async state, so we can do an async close - _apiState = ApiState.PENDING; - _state = State.CLOSING; - content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - break; + case CLOSE: + case CLOSING: + _closedCallback = Callback.combine(_closedCallback, callback); + break; - case BLOCKED: - case UNREADY: - case PENDING: - // An operation is in progress, so we soft close now - _softClose = true; - // then trigger a close from onWriteComplete - _state = State.CLOSE; + case OPEN: + if (_onError != null) + { + error = _onError; break; - } - break; + } + + _closedCallback = Callback.combine(_closedCallback, callback); + + switch (_apiState) + { + case BLOCKING: + // Output is idle blocking state, but we still do an async close + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case ASYNC: + case READY: + // Output is idle in async state, so we can do an async close + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case UNREADY: + case PENDING: + // An operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; + + default: + throw new IllegalStateException(); + } + break; + } } } @@ -1351,7 +1387,7 @@ public void recycle() { _state = State.OPEN; _apiState = ApiState.BLOCKING; - _softClose = false; + _softClose = true; // Stay closed until next request _interceptor = _channel; HttpConfiguration config = _channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 67b9457ea9f8..a2ff1628bcad 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1681,6 +1681,11 @@ public boolean isUserInRole(String role) */ public void setMetaData(MetaData.Request request) { + if (_metaData == null && _input != null && _channel != null) + { + _input.reopen(); + _channel.getResponse().getHttpOutput().reopen(); + } _metaData = request; _method = request.getMethod(); _httpFields = request.getFields(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java index fe5b816db325..be33871a9891 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java @@ -273,8 +273,11 @@ public GzipBufferCB(ByteBuffer content, boolean complete, Callback callback) @Override protected void onCompleteFailure(Throwable x) { - _deflaterEntry.release(); - _deflaterEntry = null; + if (_deflaterEntry != null) + { + _deflaterEntry.release(); + _deflaterEntry = null; + } super.onCompleteFailure(x); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java index 19bd83da37b6..047a53827551 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java @@ -28,8 +28,8 @@ import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; -import org.eclipse.jetty.util.compression.CompressionPool; import org.eclipse.jetty.util.compression.InflaterPool; +import org.eclipse.jetty.util.thread.AutoLock; import org.hamcrest.core.Is; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -72,8 +72,11 @@ public void testAsyncContentProducerNoInterceptor() throws Exception ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); - assertThat(error, nullValue()); + try (AutoLock lock = contentProducer.lock()) + { + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, nullValue()); + } } @Test @@ -91,8 +94,11 @@ public void testAsyncContentProducerNoInterceptorWithError() throws Exception ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); - assertThat(error, Is.is(expectedError)); + try (AutoLock lock = contentProducer.lock()) + { + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, Is.is(expectedError)); + } } @Test @@ -113,10 +119,13 @@ public void testAsyncContentProducerGzipInterceptor() throws Exception CyclicBarrier barrier = new CyclicBarrier(2); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); - contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + try (AutoLock lock = contentProducer.lock()) + { + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); - assertThat(error, nullValue()); + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, nullValue()); + } } @Test @@ -137,10 +146,13 @@ public void testAsyncContentProducerGzipInterceptorWithTinyBuffers() throws Exce CyclicBarrier barrier = new CyclicBarrier(2); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); - contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); + try (AutoLock lock = contentProducer.lock()) + { + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier); - assertThat(error, nullValue()); + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier); + assertThat(error, nullValue()); + } } @Test @@ -162,10 +174,13 @@ public void testBlockingContentProducerGzipInterceptorWithError() throws Excepti CyclicBarrier barrier = new CyclicBarrier(2); ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); - contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + try (AutoLock lock = contentProducer.lock()) + { + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); - assertThat(error, Is.is(expectedError)); + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, Is.is(expectedError)); + } } private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, ContentProducer contentProducer, int totalContentCount, int readyCount, int notReadyCount, CyclicBarrier barrier) throws InterruptedException, BrokenBarrierException, TimeoutException diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java index 8755dc8c0e73..8cc7ca8b523b 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java @@ -20,13 +20,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.zip.GZIPOutputStream; import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; import org.eclipse.jetty.util.compression.InflaterPool; +import org.eclipse.jetty.util.thread.AutoLock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,13 +63,16 @@ public void testBlockingContentProducerNoInterceptor() final int totalContentBytesCount = countRemaining(buffers); final String originalContentString = asString(buffers); - AtomicReference ref = new AtomicReference<>(); - ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentListener contentListener = new ContentListener(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, contentListener); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); - ref.set(contentProducer); + contentListener.setContentProducer(contentProducer); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); - assertThat(error, nullValue()); + try (AutoLock lock = contentProducer.lock()) + { + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, nullValue()); + } } @Test @@ -83,13 +86,16 @@ public void testBlockingContentProducerNoInterceptorWithError() final String originalContentString = asString(buffers); final Throwable expectedError = new EofException("Early EOF"); - AtomicReference ref = new AtomicReference<>(); - ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentListener contentListener = new ContentListener(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, contentListener); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); - ref.set(contentProducer); + contentListener.setContentProducer(contentProducer); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); - assertThat(error, is(expectedError)); + try (AutoLock lock = contentProducer.lock()) + { + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, is(expectedError)); + } } @Test @@ -107,14 +113,18 @@ public void testBlockingContentProducerGzipInterceptor() buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); - AtomicReference ref = new AtomicReference<>(); - ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentListener contentListener = new ContentListener(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, contentListener); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); - ref.set(contentProducer); - contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + contentListener.setContentProducer(contentProducer); + + try (AutoLock lock = contentProducer.lock()) + { + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); - assertThat(error, nullValue()); + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, nullValue()); + } } @Test @@ -132,14 +142,18 @@ public void testBlockingContentProducerGzipInterceptorWithTinyBuffers() buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); - AtomicReference ref = new AtomicReference<>(); - ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentListener contentListener = new ContentListener(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, contentListener); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); - ref.set(contentProducer); - contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); + contentListener.setContentProducer(contentProducer); + + try (AutoLock lock = contentProducer.lock()) + { + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer); - assertThat(error, nullValue()); + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer); + assertThat(error, nullValue()); + } } @Test @@ -158,14 +172,18 @@ public void testBlockingContentProducerGzipInterceptorWithError() buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); - AtomicReference ref = new AtomicReference<>(); - ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentListener contentListener = new ContentListener(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, contentListener); ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); - ref.set(contentProducer); - contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + contentListener.setContentProducer(contentProducer); + + try (AutoLock lock = contentProducer.lock()) + { + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); - Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); - assertThat(error, is(expectedError)); + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, is(expectedError)); + } } private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, int totalContentCount, ContentProducer contentProducer) @@ -241,9 +259,26 @@ private static ByteBuffer gzipByteBuffer(ByteBuffer uncompressedBuffer) } } - private interface ContentListener + private static class ContentListener { - void onContent(); + private ContentProducer contentProducer; + + private ContentListener() + { + } + + private void onContent() + { + try (AutoLock lock = contentProducer.lock()) + { + contentProducer.onContentProducible(); + } + } + + private void setContentProducer(ContentProducer contentProducer) + { + this.contentProducer = contentProducer; + } } private static class ArrayDelayedHttpChannel extends HttpChannel diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java new file mode 100644 index 000000000000..ac527e8cfa0b --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -0,0 +1,604 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.zip.GZIPOutputStream; +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.hamcrest.Matchers; +import org.hamcrest.core.Is; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockingTest +{ + private Server server; + ServerConnector connector; + private ContextHandler context; + + @BeforeEach + void setUp() + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + + context = new ContextHandler("/ctx"); + + HandlerList handlers = new HandlerList(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); + server.setHandler(handlers); + } + + @AfterEach + void tearDown() throws Exception + { + server.stop(); + } + + @Test + public void testBlockingReadThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingReadAndBlockingWriteGzipped() throws Exception + { + AtomicReference threadRef = new AtomicReference<>(); + CyclicBarrier barrier = new CyclicBarrier(2); + + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + baseRequest.setHandled(true); + final AsyncContext asyncContext = baseRequest.startAsync(); + final ServletOutputStream outputStream = response.getOutputStream(); + final Thread thread = new Thread(() -> + { + try + { + for (int i = 0; i < 5; i++) + { + int b = baseRequest.getHttpInput().read(); + assertThat(b, not(is(-1))); + } + outputStream.write("All read.".getBytes(StandardCharsets.UTF_8)); + barrier.await(); // notify that all bytes were read + baseRequest.getHttpInput().read(); // this read should throw IOException as the client has closed the connection + throw new AssertionError("should have thrown IOException"); + } + catch (Exception e) + { + //throw new RuntimeException(e); + } + finally + { + try + { + outputStream.close(); + } + catch (Exception e2) + { + //e2.printStackTrace(); + } + asyncContext.complete(); + } + }); + threadRef.set(thread); + thread.start(); + barrier.await(); // notify that handler thread has started + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + }; + GzipHandler gzipHandler = new GzipHandler(); + gzipHandler.setMinGzipSize(1); + gzipHandler.setHandler(handler); + context.setHandler(gzipHandler); + // using the GzipHandler is mandatory to reproduce the +// context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + // partial chunked request + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Accept-Encoding: gzip, *\r\n") + .append("Content-Type: test/data\r\n") + .append("Transfer-Encoding: chunked\r\n") + .append("\r\n") + .append("10\r\n") + .append("01234") + ; + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoLinger(true, 0); // send TCP RST upon close instead of FIN + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + barrier.await(); // wait for handler thread to be started + barrier.await(); // wait for all bytes of the request to be read + } + threadRef.get().join(5000); + assertThat("handler thread should not be alive anymore", threadRef.get().isAlive(), is(false)); + } + + @Test + public void testNormalCompleteThenBlockingRead() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testStartAsyncThenBlockingReadThenTimeout() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + AsyncContext async = request.startAsync(); + async.setTimeout(100); + + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(500)); + assertThat(response.getContent(), containsString("AsyncContext timeout")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingReadThenSendError() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.sendError(499); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(499)); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingWriteThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType("text/plain"); + new Thread(() -> + { + try + { + byte[] data = new byte[16 * 1024]; + Arrays.fill(data, (byte)'X'); + data[data.length - 2] = '\r'; + data[data.length - 1] = '\n'; + OutputStream out = response.getOutputStream(); + started.countDown(); + while (true) + out.write(data); + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on write + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("GET /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("\r\n"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1)); + + // Read the header + List header = new ArrayList<>(); + while (true) + { + String line = in.readLine(); + if (line.length() == 0) + break; + header.add(line); + } + assertThat(header.get(0), containsString("200 OK")); + + // read one line of content + String content = in.readLine(); + assertThat(content, is("4000")); + content = in.readLine(); + assertThat(content, startsWith("XXXXXXXX")); + + // check that writing thread is stopped by end of request handling + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + + // read until last line + String last = null; + while (true) + { + String line = in.readLine(); + if (line == null) + break; + + last = line; + } + + // last line is not empty chunk, ie abnormal completion + assertThat(last, startsWith("XXXXX")); + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 22b6c2371876..a6b962c07fdd 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -643,6 +643,7 @@ public void testContentTypeWithCharacterEncoding() throws Exception assertEquals("foo2/bar2;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf16"); response.setContentType("text/html; charset=utf-8"); @@ -655,6 +656,7 @@ public void testContentTypeWithCharacterEncoding() throws Exception assertEquals("text/xml;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf-16"); response.setContentType("foo/bar"); assertEquals("foo/bar;charset=utf-16", response.getContentType()); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index a40aabcb001b..6ea4351dc80b 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -16,6 +16,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; @@ -44,10 +45,10 @@ public class SharedBlockingCallback { private static final Logger LOG = LoggerFactory.getLogger(SharedBlockingCallback.class); - private static Throwable IDLE = new ConstantThrowable("IDLE"); - private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); + private static final Throwable IDLE = new ConstantThrowable("IDLE"); + private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); - private static Throwable FAILED = new ConstantThrowable("FAILED"); + private static final Throwable FAILED = new ConstantThrowable("FAILED"); private final ReentrantLock _lock = new ReentrantLock(); private final Condition _idle = _lock.newCondition(); @@ -76,6 +77,26 @@ public Blocker acquire() throws IOException } } + public boolean fail(Throwable cause) + { + Objects.requireNonNull(cause); + _lock.lock(); + try + { + if (_blocker._state == null) + { + _blocker._state = new BlockerFailedException(cause); + _complete.signalAll(); + return true; + } + } + finally + { + _lock.unlock(); + } + return false; + } + protected void notComplete(Blocker blocker) { LOG.warn("Blocker not complete {}", blocker); @@ -145,10 +166,12 @@ else if (cause instanceof BlockerTimeoutException) _state = cause; _complete.signalAll(); } - else if (_state instanceof BlockerTimeoutException) + else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException) { // Failure arrived late, block() already // modified the state, nothing more to do. + if (LOG.isDebugEnabled()) + LOG.debug("Failed after {}", _state); } else { @@ -261,4 +284,12 @@ public String toString() private static class BlockerTimeoutException extends TimeoutException { } + + private static class BlockerFailedException extends Exception + { + public BlockerFailedException(Throwable cause) + { + super(cause); + } + } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java new file mode 100644 index 000000000000..f85f1e327187 --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java @@ -0,0 +1,140 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockedIOTest extends AbstractTest +{ + @Override + public void init(Transport transport) throws IOException + { + setScenario(new TransportScenario(transport)); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testBlockingReadThenNormalComplete(Transport transport) throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AtomicReference rereadException = new AtomicReference<>(); + + init(transport); + scenario.start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable ex1) + { + readException.set(ex1); + try + { + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + catch (Throwable ex2) + { + rereadException.set(ex2); + } + finally + { + stopped.countDown(); + } + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch ok = new CountDownLatch(2); + scenario.client.newRequest(scenario.newURI()) + .method("POST") + .content(contentProvider) + .onResponseContent((response, content) -> + { + assertThat(BufferUtil.toString(content), containsString("OK")); + ok.countDown(); + }) + .onResponseSuccess(response -> + { + try + { + assertThat(response.getStatus(), is(200)); + stopped.await(10, TimeUnit.SECONDS); + ok.countDown(); + } + catch (Throwable t) + { + t.printStackTrace(); + } + }) + .send(null); + contentProvider.offer(BufferUtil.toBuffer("1")); + + assertTrue(ok.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + assertThat(rereadException.get(), instanceOf(IOException.class)); + } +}