diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 024d57ec2459..aa1acf5f7dfe 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -382,8 +382,15 @@ private boolean parseRequestBuffer() @Override public void onCompleted() { + // If we are fill interested, then a read is pending and we must abort + if (isFillInterested()) + { + LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint()); + _channel.abort(new IOException("Pending read in onCompleted")); + } + // Handle connection upgrades - if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) + else if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) { Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE); if (connection != null) @@ -407,6 +414,9 @@ public void onCompleted() } } + // Drive to EOF, EarlyEOF or Error + boolean complete = _input.consumeAll(); + // Finish consuming the request // If we are still expecting if (_channel.isExpecting100Continue()) @@ -415,7 +425,7 @@ public void onCompleted() _parser.close(); } // else abort if we can't consume all - else if (_generator.isPersistent() && !_input.consumeAll()) + else if (_generator.isPersistent() && !complete) { if (LOG.isDebugEnabled()) LOG.debug("unconsumed input {} {}", this, _parser); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 600260de34ad..91efb63e07b0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -720,12 +720,17 @@ public boolean consumeAll() { produceContent(); if (_content == null && _intercepted == null && _inputQ.isEmpty()) + { + _state = EARLY_EOF; + _inputQ.notify(); return false; + } } catch (Throwable e) { LOG.debug(e); _state = new ErrorState(e); + _inputQ.notify(); return false; } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 799fa5fe3395..675d1d1c622f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -29,6 +29,7 @@ import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; import java.util.ResourceBundle; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; @@ -69,7 +70,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable enum State { OPEN, // Open - CLOSE, // Close needed from onWriteCompletion + CLOSE, // Close needed from onWriteComplete CLOSING, // Close in progress after close API called CLOSED // Closed } @@ -308,7 +309,7 @@ else if (_state == State.CLOSE) { // Somebody called close or complete while we were writing. // We can now send a (probably empty) last buffer and then when it completes - // onWriteCompletion will be called again to actually execute the _completeCallback + // onWriteComplete will be called again to actually execute the _completeCallback _state = State.CLOSING; closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; } @@ -411,53 +412,87 @@ public void complete(Callback callback) ByteBuffer content = null; synchronized (_channelState) { - switch (_state) + // First check the API state for any unrecoverable situations + switch (_apiState) { - case CLOSED: - succeeded = true; + case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time + error = new CancellationException("Completed whilst write unready"); break; - case CLOSE: - case CLOSING: - _closedCallback = Callback.combine(_closedCallback, callback); + case PENDING: // an async write is pending and may complete at any time + // If this is not the last write, then we must abort + if (!_channel.getResponse().isContentComplete(_written)) + error = new CancellationException("Completed whilst write pending"); break; - case OPEN: - if (_onError != null) - { - error = _onError; - break; - } + case BLOCKED: // another thread is blocked in a write or a close + error = new CancellationException("Completed whilst write blocked"); + break; - _closedCallback = Callback.combine(_closedCallback, callback); + default: + break; + } - switch (_apiState) - { - case BLOCKING: - // Output is idle blocking state, but we still do an async close - _apiState = ApiState.BLOCKED; - _state = State.CLOSING; - content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - break; + // If we can't complete due to the API state, then abort + if (error != null) + { + _channel.abort(error); + _writeBlocker.fail(error); + _state = State.CLOSED; + } + else + { + // Otherwise check the output state to determine how to complete + switch (_state) + { + case CLOSED: + succeeded = true; + break; - case ASYNC: - case READY: - // Output is idle in async state, so we can do an async close - _apiState = ApiState.PENDING; - _state = State.CLOSING; - content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - break; + case CLOSE: + case CLOSING: + _closedCallback = Callback.combine(_closedCallback, callback); + break; - case BLOCKED: - case UNREADY: - case PENDING: - // An operation is in progress, so we soft close now - _softClose = true; - // then trigger a close from onWriteComplete - _state = State.CLOSE; + case OPEN: + if (_onError != null) + { + error = _onError; break; - } - break; + } + + _closedCallback = Callback.combine(_closedCallback, callback); + + switch (_apiState) + { + case BLOCKING: + // Output is idle blocking state, but we still do an async close + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case ASYNC: + case READY: + // Output is idle in async state, so we can do an async close + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case UNREADY: + case PENDING: + // An operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; + + default: + throw new IllegalStateException(); + } + break; + } } } @@ -1399,7 +1434,7 @@ public void recycle() { _state = State.OPEN; _apiState = ApiState.BLOCKING; - _softClose = false; + _softClose = true; // Stay closed until next request _interceptor = _channel; HttpConfiguration config = _channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 4bda67ae2891..bcd2777a311c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1814,6 +1814,11 @@ public boolean isUserInRole(String role) */ public void setMetaData(org.eclipse.jetty.http.MetaData.Request request) { + if (_metaData == null && _input != null && _channel != null) + { + _input.recycle(); + _channel.getResponse().getHttpOutput().reopen(); + } _metaData = request; setMethod(request.getMethod()); @@ -1895,7 +1900,7 @@ protected void recycle() getHttpChannelState().recycle(); _requestAttributeListeners.clear(); - _input.recycle(); + // Defer _input.recycle() until setMetaData on next request, TODO replace with recycle and reopen in 10 _metaData = null; _originalURI = null; _contextPath = null; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java new file mode 100644 index 000000000000..2ce8800efef8 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -0,0 +1,504 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockingTest +{ + private Server server; + private ServerConnector connector; + private ContextHandler context; + + @BeforeEach + public void setUp() + { + server = new Server(); + connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + context = new ContextHandler("/ctx"); + + HandlerList handlers = new HandlerList(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); + server.setHandler(handlers); + } + + @AfterEach + public void tearDown() throws Exception + { + server.stop(); + } + + @Test + public void testBlockingReadThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(10000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testNormalCompleteThenBlockingRead() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(10000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testStartAsyncThenBlockingReadThenTimeout() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + AsyncContext async = request.startAsync(); + async.setTimeout(100); + + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(10000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(500)); + assertThat(response.getContent(), containsString("AsyncContext timeout")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingReadThenSendError() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.sendError(499); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(10000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(499)); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingWriteThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType("text/plain"); + new Thread(() -> + { + try + { + byte[] data = new byte[16 * 1024]; + Arrays.fill(data, (byte)'X'); + data[data.length - 2] = '\r'; + data[data.length - 1] = '\n'; + OutputStream out = response.getOutputStream(); + started.countDown(); + while (true) + out.write(data); + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on write + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("GET /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("\r\n"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(10000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1)); + + // Read the header + List header = new ArrayList<>(); + while (true) + { + String line = in.readLine(); + if (line.length() == 0) + break; + header.add(line); + } + assertThat(header.get(0), containsString("200 OK")); + + // read one line of content + String content = in.readLine(); + assertThat(content, is("4000")); + content = in.readLine(); + assertThat(content, startsWith("XXXXXXXX")); + + // check that writing thread is stopped by end of request handling + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + + // read until last line + String last = null; + while (true) + { + String line = in.readLine(); + if (line == null) + break; + + last = line; + } + + // last line is not empty chunk, ie abnormal completion + assertThat(last, startsWith("XXXXX")); + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 51d8133c05b0..5ba0df395420 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -493,6 +493,7 @@ public void testContentTypeWithCharacterEncoding() throws Exception assertEquals("foo2/bar2;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf16"); response.setContentType("text/html; charset=utf-8"); @@ -505,6 +506,7 @@ public void testContentTypeWithCharacterEncoding() throws Exception assertEquals("text/xml;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf-16"); response.setContentType("foo/bar"); assertEquals("foo/bar;charset=utf-16", response.getContentType()); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index eea89c8ea248..daad08cc5029 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,10 +51,10 @@ public class SharedBlockingCallback { private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); - private static Throwable IDLE = new ConstantThrowable("IDLE"); - private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); + private static final Throwable IDLE = new ConstantThrowable("IDLE"); + private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); - private static Throwable FAILED = new ConstantThrowable("FAILED"); + private static final Throwable FAILED = new ConstantThrowable("FAILED"); private final ReentrantLock _lock = new ReentrantLock(); private final Condition _idle = _lock.newCondition(); @@ -96,6 +97,26 @@ public Blocker acquire() throws IOException } } + public boolean fail(Throwable cause) + { + Objects.requireNonNull(cause); + _lock.lock(); + try + { + if (_blocker._state == null) + { + _blocker._state = new BlockerFailedException(cause); + _complete.signalAll(); + return true; + } + } + finally + { + _lock.unlock(); + } + return false; + } + protected void notComplete(Blocker blocker) { LOG.warn("Blocker not complete {}", blocker); @@ -165,10 +186,12 @@ else if (cause instanceof BlockerTimeoutException) _state = cause; _complete.signalAll(); } - else if (_state instanceof BlockerTimeoutException) + else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException) { // Failure arrived late, block() already // modified the state, nothing more to do. + if (LOG.isDebugEnabled()) + LOG.debug("Failed after {}", _state); } else { @@ -297,4 +320,12 @@ public String toString() private static class BlockerTimeoutException extends TimeoutException { } + + private static class BlockerFailedException extends Exception + { + public BlockerFailedException(Throwable cause) + { + super(cause); + } + } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java new file mode 100644 index 000000000000..ca14214e9b4d --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java @@ -0,0 +1,145 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockedIOTest extends AbstractTest +{ + @Override + public void init(Transport transport) throws IOException + { + setScenario(new TransportScenario(transport)); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testBlockingReadThenNormalComplete(Transport transport) throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AtomicReference rereadException = new AtomicReference<>(); + + init(transport); + scenario.start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable ex1) + { + readException.set(ex1); + try + { + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + catch (Throwable ex2) + { + rereadException.set(ex2); + } + finally + { + stopped.countDown(); + } + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch ok = new CountDownLatch(2); + scenario.client.newRequest(scenario.newURI()) + .method("POST") + .content(contentProvider) + .onResponseContent((response, content) -> + { + assertThat(BufferUtil.toString(content), containsString("OK")); + ok.countDown(); + }) + .onResponseSuccess(response -> + { + try + { + assertThat(response.getStatus(), is(200)); + stopped.await(10, TimeUnit.SECONDS); + ok.countDown(); + } + catch (Throwable t) + { + t.printStackTrace(); + } + }) + .send(null); + contentProvider.offer(BufferUtil.toBuffer("1")); + + assertTrue(ok.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + assertThat(rereadException.get(), instanceOf(IOException.class)); + } +}