From 9cc7be4842e96b995a29c2ab0c897969887f9568 Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 2 Feb 2021 14:03:38 +0100 Subject: [PATCH 01/17] Fix #5605 Unblock non container Threads Ensure that HttpInput is always closed to EOF, EarlyEOF or Error, so that non container threads doing blocking reads will not block forever, even if late. Delay recycling of HttpInput until next request is received. --- .../eclipse/jetty/io/AbstractEndPoint.java | 7 + .../java/org/eclipse/jetty/io/EndPoint.java | 3 + .../org/eclipse/jetty/io/FillInterest.java | 189 +++++++-- .../eclipse/jetty/server/HttpConnection.java | 5 +- .../org/eclipse/jetty/server/HttpInput.java | 14 + .../jetty/server/ProxyConnectionFactory.java | 7 + .../org/eclipse/jetty/server/Request.java | 4 +- .../eclipse/jetty/server/BlockingTest.java | 398 ++++++++++++++++++ 8 files changed, 590 insertions(+), 37 deletions(-) create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 99237f853a78..c6635f732b0b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -362,6 +363,12 @@ public void fillInterested(Callback callback) _fillInterest.register(callback); } + @Override + public Throwable cancelFillInterest(Supplier cancellation) + { + return _fillInterest.cancel(cancellation); + } + @Override public boolean tryFillInterested(Callback callback) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 16776dacbd32..99d4f3def3ab 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; +import java.util.function.Supplier; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; @@ -223,6 +224,8 @@ public interface EndPoint extends Closeable */ boolean isFillInterested(); + Throwable cancelFillInterest(Supplier cancellation); + /** *

Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either * all the data has been flushed or an error occurs.

diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index fa7fc5fc88cc..967282768f87 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadPendingException; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -42,6 +44,38 @@ protected FillInterest() { } + /** + * Cancel a fill interest registration. + * + * If there was a registration, then any {@link #fillable()}, {@link #onClose()} or {@link #onFail(Throwable)} + * calls are remembered and passed to the next registration. + * Since any actions resulting from a call to {@link #needsFillInterest()} cannot be unwound, a subsequent call to + * register will not call {@link #needsFillInterest()} again if it has already been called an no callback received. + * @param cancellation A supplier of the cancellation Throwable to use if there is an existing registration. If the + * suppler or the supplied Throwable is null, then a new {@link CancellationException} is used. + * @return The Throwable used to cancel an existing registration or null if there was no registration to cancel. + */ + public Throwable cancel(Supplier cancellation) + { + Cancelled cancelled = new Cancelled(); + while (true) + { + Callback callback = _interested.get(); + if (callback == null || callback instanceof Cancelled) + return null; + if (_interested.compareAndSet(callback, cancelled)) + { + Throwable cause = cancellation == null ? null : cancellation.get(); + if (cause == null) + cause = new CancellationException(); + if (LOG.isDebugEnabled()) + LOG.debug("cancelled {} {}",this, callback, cause); + callback.failed(cause); + return cause; + } + } + } + /** * Call to register interest in a callback when a read is possible. * The callback will be called either immediately if {@link #needsFillInterest()} @@ -68,26 +102,63 @@ public void register(Callback callback) throws ReadPendingException * @return true if the register succeeded */ public boolean tryRegister(Callback callback) + { + return register(callback, null); + } + + /** + * Call to register interest in a callback when a read is possible. + * The callback will be called either immediately if {@link #needsFillInterest()} + * returns true or eventually once {@link #fillable()} is called. + * + * @param callback the callback to register + * @param cancellation A supplier of a {@link Throwable}, which if not null will be used to fail any existing registration + * @return true if the register succeeded + */ + public boolean register(Callback callback, Supplier cancellation) { if (callback == null) throw new IllegalArgumentException(); - if (!_interested.compareAndSet(null, callback)) - return false; + while (true) + { + Callback existing = _interested.get(); - if (LOG.isDebugEnabled()) - LOG.debug("interested {}", this); + if (existing != null && !(existing instanceof Cancelled) && cancellation == null) + return false; - try - { - needsFillInterest(); - } - catch (Throwable e) - { - onFail(e); - } + if (existing == callback) + return true; - return true; + if (_interested.compareAndSet(existing, callback)) + { + if (LOG.isDebugEnabled()) + LOG.debug("interested {}->{}", existing, this); + if (existing == null) + { + try + { + needsFillInterest(); + } + catch (Throwable e) + { + onFail(e); + } + } + else if (existing instanceof Cancelled) + { + ((Cancelled)existing).apply(callback); + } + else + { + Throwable cause = cancellation.get(); + if (cause == null) + cause = new CancellationException(); + existing.failed(cause); + } + return true; + } + } } /** @@ -97,17 +168,19 @@ public boolean tryRegister(Callback callback) */ public boolean fillable() { - if (LOG.isDebugEnabled()) - LOG.debug("fillable {}", this); - Callback callback = _interested.get(); - if (callback != null && _interested.compareAndSet(callback, null)) + while (true) { - callback.succeeded(); - return true; + Callback callback = _interested.get(); + if (callback == null) + return false; + if (_interested.compareAndSet(callback, null)) + { + if (LOG.isDebugEnabled()) + LOG.debug("fillable {} {}",this, callback); + callback.succeeded(); + return true; + } } - if (LOG.isDebugEnabled()) - LOG.debug("{} lost race {}", this, callback); - return false; } /** @@ -115,7 +188,8 @@ public boolean fillable() */ public boolean isInterested() { - return _interested.get() != null; + Callback callback = _interested.get(); + return callback != null && !(callback instanceof Cancelled); } public InvocationType getCallbackInvocationType() @@ -132,24 +206,37 @@ public InvocationType getCallbackInvocationType() */ public boolean onFail(Throwable cause) { - if (LOG.isDebugEnabled()) - LOG.debug("onFail " + this, cause); - Callback callback = _interested.get(); - if (callback != null && _interested.compareAndSet(callback, null)) + while (true) { - callback.failed(cause); - return true; + Callback callback = _interested.get(); + if (callback == null) + return false; + if (_interested.compareAndSet(callback, null)) + { + if (LOG.isDebugEnabled()) + LOG.debug("onFail {} {}",this, callback, cause); + callback.failed(cause); + return true; + } } - return false; } public void onClose() { - if (LOG.isDebugEnabled()) - LOG.debug("onClose {}", this); - Callback callback = _interested.get(); - if (callback != null && _interested.compareAndSet(callback, null)) - callback.failed(new ClosedChannelException()); + while (true) + { + Callback callback = _interested.get(); + if (callback == null) + return; + if (_interested.compareAndSet(callback, null)) + { + ClosedChannelException cause = new ClosedChannelException(); + if (LOG.isDebugEnabled()) + LOG.debug("onFail {} {}",this, callback, cause); + callback.failed(cause); + return; + } + } } @Override @@ -171,4 +258,36 @@ public String toStateString() * @throws IOException if unable to fulfill interest in fill */ protected abstract void needsFillInterest() throws IOException; + + private static class Cancelled implements Callback + { + private final AtomicReference _result = new AtomicReference<>(); + + @Override + public void succeeded() + { + _result.compareAndSet(null, Boolean.TRUE); + } + + @Override + public void failed(Throwable x) + { + _result.compareAndSet(null, x == null ? new Exception() : x); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + + void apply(Callback callback) + { + Object result = _result.get(); + if (result == Boolean.TRUE) + callback.succeeded(); + else if (result instanceof Throwable) + callback.failed((Throwable)result); + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 3ae8bbe4bc67..7b503913b5bb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -376,6 +376,9 @@ private boolean parseRequestBuffer() @Override public void onCompleted() { + boolean complete = _input.consumeAll(); + getEndPoint().cancelFillInterest(_input::getError); + // Handle connection upgrades if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) { @@ -409,7 +412,7 @@ public void onCompleted() _parser.close(); } // else abort if we can't consume all - else if (_generator.isPersistent() && !_input.consumeAll()) + else if (_generator.isPersistent() && !complete) { if (LOG.isDebugEnabled()) LOG.debug("unconsumed input {} {}", this, _parser); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 600260de34ad..724132c3e303 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -720,12 +720,17 @@ public boolean consumeAll() { produceContent(); if (_content == null && _intercepted == null && _inputQ.isEmpty()) + { + _state = EARLY_EOF; + _inputQ.notify(); return false; + } } catch (Throwable e) { LOG.debug(e); _state = new ErrorState(e); + _inputQ.notify(); return false; } } @@ -740,6 +745,15 @@ public boolean isError() } } + public Throwable getError() + { + synchronized (_inputQ) + { + Throwable error = _state instanceof ErrorState ? ((ErrorState)_state)._error : null; + return error == null ? new IOException() : error; + } + } + public boolean isAsync() { synchronized (_inputQ) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index 71cd03c274db..3938c0f272a6 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -27,6 +27,7 @@ import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; import java.nio.charset.StandardCharsets; +import java.util.function.Supplier; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.Connection; @@ -805,6 +806,12 @@ public void fillInterested(Callback callback) throws ReadPendingException _endp.fillInterested(callback); } + @Override + public Throwable cancelFillInterest(Supplier cancellation) + { + return _endp.cancelFillInterest(cancellation); + } + @Override public boolean flush(ByteBuffer... buffer) throws IOException { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index d1c27ef2a72b..3cc4946c2d2e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1811,6 +1811,8 @@ public boolean isUserInRole(String role) */ public void setMetaData(org.eclipse.jetty.http.MetaData.Request request) { + if (_metaData == null) + _input.recycle(); _metaData = request; setMethod(request.getMethod()); @@ -1879,7 +1881,7 @@ protected void recycle() getHttpChannelState().recycle(); _requestAttributeListeners.clear(); - _input.recycle(); + // Defer _input.recycle() until setMetaData on next request, so that late readers will fail _metaData = null; _originalURI = null; _contextPath = null; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java new file mode 100644 index 000000000000..62605a29dee5 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -0,0 +1,398 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockingTest +{ + private Server server; + ServerConnector connector; + private ContextHandler context; + + @BeforeEach + void setUp() + { + server = new Server(); + connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + context = new ContextHandler("/ctx"); + + HandlerList handlers = new HandlerList(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); + server.setHandler(handlers); + } + + @AfterEach + void tearDown() throws Exception + { + server.stop(); + } + + @Test + public void testBlockingReadThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testNormalCompleteThenBlockingRead() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testStartAsyncThenBlockingReadThenTimeout() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + AsyncContext async = request.startAsync(); + async.setTimeout(100); + + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(500)); + assertThat(response.getContent(), containsString("AsyncContext timeout")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingReadThenSendError() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.sendError(499); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(499)); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } +} From b3268eb3b584666a50dca08dad80893d1a69620d Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 2 Feb 2021 16:33:32 +0100 Subject: [PATCH 02/17] Fix #5605 Unblock non container Threads Ensure that HttpInput is always closed to EOF, EarlyEOF or Error, so that non container threads doing blocking reads will not block forever, even if late. Delay recycling of HttpInput until next request is received. --- .../eclipse/jetty/websocket/common/io/MockEndPoint.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java index b57c0f9bff9e..8a541ea10aee 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; +import java.util.function.Supplier; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; @@ -116,6 +117,12 @@ public void fillInterested(Callback callback) throws ReadPendingException throw new UnsupportedOperationException(NOT_SUPPORTED); } + @Override + public Throwable cancelFillInterest(Supplier cancellation) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + @Override public boolean tryFillInterested(Callback callback) { From 0d85c7d2200fd5c495205234907cb9aad70c14de Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 2 Feb 2021 17:49:17 +0100 Subject: [PATCH 03/17] Fix #5605 Unblock non container Threads test and fixes for the write side. --- .../eclipse/jetty/server/HttpConnection.java | 4 +- .../org/eclipse/jetty/server/HttpOutput.java | 12 +- .../eclipse/jetty/server/BlockingTest.java | 106 ++++++++++++++++++ .../jetty/util/SharedBlockingCallback.java | 39 ++++++- 4 files changed, 155 insertions(+), 6 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 7b503913b5bb..78f96cf9ccbd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -377,7 +377,9 @@ private boolean parseRequestBuffer() public void onCompleted() { boolean complete = _input.consumeAll(); - getEndPoint().cancelFillInterest(_input::getError); + Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); + if (LOG.isDebugEnabled()) + LOG.debug("cancelled {}", this, cancelled); // Handle connection upgrades if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 799fa5fe3395..143931091b7c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -29,6 +29,7 @@ import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; import java.util.ResourceBundle; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; @@ -449,6 +450,15 @@ public void complete(Callback callback) break; case BLOCKED: + CancellationException cancelled = new CancellationException(); + if (_writeBlocker.fail(cancelled)) + _channel.abort(cancelled); + // An operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; + case UNREADY: case PENDING: // An operation is in progress, so we soft close now @@ -1399,7 +1409,7 @@ public void recycle() { _state = State.OPEN; _apiState = ApiState.BLOCKING; - _softClose = false; + _softClose = true; // Stay closed until next request _interceptor = _channel; HttpConfiguration config = _channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java index 62605a29dee5..9a2f20b38287 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -18,10 +18,15 @@ package org.eclipse.jetty.server; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -44,6 +49,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -395,4 +401,104 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques assertThat(readException.get(), instanceOf(IOException.class)); } } + + @Test + public void testBlockingWriteThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType("text/plain"); + new Thread(() -> + { + try + { + byte[] data = new byte[16 * 1024]; + Arrays.fill(data, (byte)'X'); + data[data.length - 2] = '\r'; + data[data.length - 1] = '\n'; + OutputStream out = response.getOutputStream(); + started.countDown(); + while (true) + out.write(data); + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on write + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("GET /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("\r\n"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1)); + + // Read the header + List header = new ArrayList<>(); + while (true) + { + String line = in.readLine(); + if (line.length() == 0) + break; + header.add(line); + } + assertThat(header.get(0), containsString("200 OK")); + + // read one line of content + String content = in.readLine(); + assertThat(content, is("4000")); + content = in.readLine(); + assertThat(content, startsWith("XXXXXXXX")); + + // check that writing thread is stopped by end of request handling + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + + // read until last line + String last = null; + while (true) + { + String line = in.readLine(); + if (line == null) + break; + + last = line; + } + + // last line is not empty chunk, ie abnormal completion + assertThat(last, startsWith("XXXXX")); + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index eea89c8ea248..daad08cc5029 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,10 +51,10 @@ public class SharedBlockingCallback { private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); - private static Throwable IDLE = new ConstantThrowable("IDLE"); - private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); + private static final Throwable IDLE = new ConstantThrowable("IDLE"); + private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); - private static Throwable FAILED = new ConstantThrowable("FAILED"); + private static final Throwable FAILED = new ConstantThrowable("FAILED"); private final ReentrantLock _lock = new ReentrantLock(); private final Condition _idle = _lock.newCondition(); @@ -96,6 +97,26 @@ public Blocker acquire() throws IOException } } + public boolean fail(Throwable cause) + { + Objects.requireNonNull(cause); + _lock.lock(); + try + { + if (_blocker._state == null) + { + _blocker._state = new BlockerFailedException(cause); + _complete.signalAll(); + return true; + } + } + finally + { + _lock.unlock(); + } + return false; + } + protected void notComplete(Blocker blocker) { LOG.warn("Blocker not complete {}", blocker); @@ -165,10 +186,12 @@ else if (cause instanceof BlockerTimeoutException) _state = cause; _complete.signalAll(); } - else if (_state instanceof BlockerTimeoutException) + else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException) { // Failure arrived late, block() already // modified the state, nothing more to do. + if (LOG.isDebugEnabled()) + LOG.debug("Failed after {}", _state); } else { @@ -297,4 +320,12 @@ public String toString() private static class BlockerTimeoutException extends TimeoutException { } + + private static class BlockerFailedException extends Exception + { + public BlockerFailedException(Throwable cause) + { + super(cause); + } + } } From a110fc3468041892b20590095f7d2bb0557186dc Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 2 Feb 2021 17:55:15 +0100 Subject: [PATCH 04/17] Fix #5605 Unblock non container Threads test and fixes for the write side. --- .../src/main/java/org/eclipse/jetty/server/Request.java | 3 +++ .../src/test/java/org/eclipse/jetty/server/ResponseTest.java | 2 ++ 2 files changed, 5 insertions(+) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 3cc4946c2d2e..bdb455694a78 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1812,7 +1812,10 @@ public boolean isUserInRole(String role) public void setMetaData(org.eclipse.jetty.http.MetaData.Request request) { if (_metaData == null) + { _input.recycle(); + _channel.getResponse().getHttpOutput().reopen(); + } _metaData = request; setMethod(request.getMethod()); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 51d8133c05b0..5ba0df395420 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -493,6 +493,7 @@ public void testContentTypeWithCharacterEncoding() throws Exception assertEquals("foo2/bar2;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf16"); response.setContentType("text/html; charset=utf-8"); @@ -505,6 +506,7 @@ public void testContentTypeWithCharacterEncoding() throws Exception assertEquals("text/xml;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf-16"); response.setContentType("foo/bar"); assertEquals("foo/bar;charset=utf-16", response.getContentType()); From a100d80d26dd4d97b8ad2bab5e32fff190c46feb Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 3 Feb 2021 11:03:02 +0100 Subject: [PATCH 05/17] Fix #5605 Unblock non container Threads Don't consumeAll before upgrade --- .../org/eclipse/jetty/server/HttpConnection.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 78f96cf9ccbd..9b7bd630a326 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -376,17 +376,15 @@ private boolean parseRequestBuffer() @Override public void onCompleted() { - boolean complete = _input.consumeAll(); - Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); - if (LOG.isDebugEnabled()) - LOG.debug("cancelled {}", this, cancelled); - // Handle connection upgrades if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) { Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE); if (connection != null) { + Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); + if (LOG.isDebugEnabled()) + LOG.debug("cancelled {}", this, cancelled); if (LOG.isDebugEnabled()) LOG.debug("Upgrade from {} to {}", this, connection); _channel.getState().upgrade(); @@ -406,6 +404,11 @@ public void onCompleted() } } + boolean complete = _input.consumeAll(); + Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); + if (LOG.isDebugEnabled()) + LOG.debug("cancelled {}", this, cancelled); + // Finish consuming the request // If we are still expecting if (_channel.isExpecting100Continue()) From 25cbe652a0bdd811289858dc0548a75faffc48cb Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 3 Feb 2021 11:27:25 +0100 Subject: [PATCH 06/17] Fix #5605 Unblock non container Threads reorder --- .../src/main/java/org/eclipse/jetty/server/HttpOutput.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 143931091b7c..248b85caccd0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -450,11 +450,12 @@ public void complete(Callback callback) break; case BLOCKED: + // An operation is in progress, so we soft close now + _softClose = true; + // then cancel the operation CancellationException cancelled = new CancellationException(); if (_writeBlocker.fail(cancelled)) _channel.abort(cancelled); - // An operation is in progress, so we soft close now - _softClose = true; // then trigger a close from onWriteComplete _state = State.CLOSE; break; From 70056e2c690b6f92ca0d396123989ad2074a370b Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 3 Feb 2021 13:40:06 +0100 Subject: [PATCH 07/17] Fix #5605 Unblock non container Threads fix test --- .../src/main/java/org/eclipse/jetty/server/Request.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index bdb455694a78..5eb360f376c5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1811,7 +1811,7 @@ public boolean isUserInRole(String role) */ public void setMetaData(org.eclipse.jetty.http.MetaData.Request request) { - if (_metaData == null) + if (_metaData == null && _input != null && _channel != null) { _input.recycle(); _channel.getResponse().getHttpOutput().reopen(); From 5f4919c45aa3bb6d232bbed479282b25da6dce8d Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 3 Feb 2021 14:04:29 +0100 Subject: [PATCH 08/17] Fix #5605 Unblock non container Threads cleanup debug --- .../main/java/org/eclipse/jetty/server/HttpConnection.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 9b7bd630a326..758df92020f8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -384,9 +384,7 @@ public void onCompleted() { Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); if (LOG.isDebugEnabled()) - LOG.debug("cancelled {}", this, cancelled); - if (LOG.isDebugEnabled()) - LOG.debug("Upgrade from {} to {}", this, connection); + LOG.debug("Upgrade from {} to {}", this, connection, cancelled); _channel.getState().upgrade(); getEndPoint().upgrade(connection); _channel.recycle(); From e9315fe51fbec49df96ba1e812c86653384698ae Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 3 Feb 2021 17:27:25 +0100 Subject: [PATCH 09/17] Fix #5605 Unblock non container Threads revert fillInterest cancellation and just abort connection instead. tested for all transports --- .../eclipse/jetty/io/AbstractEndPoint.java | 7 - .../java/org/eclipse/jetty/io/EndPoint.java | 3 - .../org/eclipse/jetty/io/FillInterest.java | 189 ++++-------------- .../eclipse/jetty/server/HttpConnection.java | 35 ++-- .../jetty/server/ProxyConnectionFactory.java | 7 - .../websocket/common/io/MockEndPoint.java | 7 - .../jetty/http/client/BlockedIOTest.java | 144 +++++++++++++ 7 files changed, 198 insertions(+), 194 deletions(-) create mode 100644 tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index c6635f732b0b..99237f853a78 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -363,12 +362,6 @@ public void fillInterested(Callback callback) _fillInterest.register(callback); } - @Override - public Throwable cancelFillInterest(Supplier cancellation) - { - return _fillInterest.cancel(cancellation); - } - @Override public boolean tryFillInterested(Callback callback) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 99d4f3def3ab..16776dacbd32 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; -import java.util.function.Supplier; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; @@ -224,8 +223,6 @@ public interface EndPoint extends Closeable */ boolean isFillInterested(); - Throwable cancelFillInterest(Supplier cancellation); - /** *

Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either * all the data has been flushed or an error occurs.

diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index 967282768f87..fa7fc5fc88cc 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadPendingException; -import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -44,38 +42,6 @@ protected FillInterest() { } - /** - * Cancel a fill interest registration. - * - * If there was a registration, then any {@link #fillable()}, {@link #onClose()} or {@link #onFail(Throwable)} - * calls are remembered and passed to the next registration. - * Since any actions resulting from a call to {@link #needsFillInterest()} cannot be unwound, a subsequent call to - * register will not call {@link #needsFillInterest()} again if it has already been called an no callback received. - * @param cancellation A supplier of the cancellation Throwable to use if there is an existing registration. If the - * suppler or the supplied Throwable is null, then a new {@link CancellationException} is used. - * @return The Throwable used to cancel an existing registration or null if there was no registration to cancel. - */ - public Throwable cancel(Supplier cancellation) - { - Cancelled cancelled = new Cancelled(); - while (true) - { - Callback callback = _interested.get(); - if (callback == null || callback instanceof Cancelled) - return null; - if (_interested.compareAndSet(callback, cancelled)) - { - Throwable cause = cancellation == null ? null : cancellation.get(); - if (cause == null) - cause = new CancellationException(); - if (LOG.isDebugEnabled()) - LOG.debug("cancelled {} {}",this, callback, cause); - callback.failed(cause); - return cause; - } - } - } - /** * Call to register interest in a callback when a read is possible. * The callback will be called either immediately if {@link #needsFillInterest()} @@ -102,63 +68,26 @@ public void register(Callback callback) throws ReadPendingException * @return true if the register succeeded */ public boolean tryRegister(Callback callback) - { - return register(callback, null); - } - - /** - * Call to register interest in a callback when a read is possible. - * The callback will be called either immediately if {@link #needsFillInterest()} - * returns true or eventually once {@link #fillable()} is called. - * - * @param callback the callback to register - * @param cancellation A supplier of a {@link Throwable}, which if not null will be used to fail any existing registration - * @return true if the register succeeded - */ - public boolean register(Callback callback, Supplier cancellation) { if (callback == null) throw new IllegalArgumentException(); - while (true) - { - Callback existing = _interested.get(); - - if (existing != null && !(existing instanceof Cancelled) && cancellation == null) - return false; + if (!_interested.compareAndSet(null, callback)) + return false; - if (existing == callback) - return true; + if (LOG.isDebugEnabled()) + LOG.debug("interested {}", this); - if (_interested.compareAndSet(existing, callback)) - { - if (LOG.isDebugEnabled()) - LOG.debug("interested {}->{}", existing, this); - if (existing == null) - { - try - { - needsFillInterest(); - } - catch (Throwable e) - { - onFail(e); - } - } - else if (existing instanceof Cancelled) - { - ((Cancelled)existing).apply(callback); - } - else - { - Throwable cause = cancellation.get(); - if (cause == null) - cause = new CancellationException(); - existing.failed(cause); - } - return true; - } + try + { + needsFillInterest(); + } + catch (Throwable e) + { + onFail(e); } + + return true; } /** @@ -168,19 +97,17 @@ else if (existing instanceof Cancelled) */ public boolean fillable() { - while (true) + if (LOG.isDebugEnabled()) + LOG.debug("fillable {}", this); + Callback callback = _interested.get(); + if (callback != null && _interested.compareAndSet(callback, null)) { - Callback callback = _interested.get(); - if (callback == null) - return false; - if (_interested.compareAndSet(callback, null)) - { - if (LOG.isDebugEnabled()) - LOG.debug("fillable {} {}",this, callback); - callback.succeeded(); - return true; - } + callback.succeeded(); + return true; } + if (LOG.isDebugEnabled()) + LOG.debug("{} lost race {}", this, callback); + return false; } /** @@ -188,8 +115,7 @@ public boolean fillable() */ public boolean isInterested() { - Callback callback = _interested.get(); - return callback != null && !(callback instanceof Cancelled); + return _interested.get() != null; } public InvocationType getCallbackInvocationType() @@ -206,37 +132,24 @@ public InvocationType getCallbackInvocationType() */ public boolean onFail(Throwable cause) { - while (true) + if (LOG.isDebugEnabled()) + LOG.debug("onFail " + this, cause); + Callback callback = _interested.get(); + if (callback != null && _interested.compareAndSet(callback, null)) { - Callback callback = _interested.get(); - if (callback == null) - return false; - if (_interested.compareAndSet(callback, null)) - { - if (LOG.isDebugEnabled()) - LOG.debug("onFail {} {}",this, callback, cause); - callback.failed(cause); - return true; - } + callback.failed(cause); + return true; } + return false; } public void onClose() { - while (true) - { - Callback callback = _interested.get(); - if (callback == null) - return; - if (_interested.compareAndSet(callback, null)) - { - ClosedChannelException cause = new ClosedChannelException(); - if (LOG.isDebugEnabled()) - LOG.debug("onFail {} {}",this, callback, cause); - callback.failed(cause); - return; - } - } + if (LOG.isDebugEnabled()) + LOG.debug("onClose {}", this); + Callback callback = _interested.get(); + if (callback != null && _interested.compareAndSet(callback, null)) + callback.failed(new ClosedChannelException()); } @Override @@ -258,36 +171,4 @@ public String toStateString() * @throws IOException if unable to fulfill interest in fill */ protected abstract void needsFillInterest() throws IOException; - - private static class Cancelled implements Callback - { - private final AtomicReference _result = new AtomicReference<>(); - - @Override - public void succeeded() - { - _result.compareAndSet(null, Boolean.TRUE); - } - - @Override - public void failed(Throwable x) - { - _result.compareAndSet(null, x == null ? new Exception() : x); - } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - - void apply(Callback callback) - { - Object result = _result.get(); - if (result == Boolean.TRUE) - callback.succeeded(); - else if (result instanceof Throwable) - callback.failed((Throwable)result); - } - } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 758df92020f8..47caad8e43c0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -382,30 +382,33 @@ public void onCompleted() Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE); if (connection != null) { - Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); - if (LOG.isDebugEnabled()) - LOG.debug("Upgrade from {} to {}", this, connection, cancelled); - _channel.getState().upgrade(); - getEndPoint().upgrade(connection); - _channel.recycle(); - _parser.reset(); - _generator.reset(); - if (_contentBufferReferences.get() == 0) - releaseRequestBuffer(); + if (isFillInterested()) + abort(new IllegalStateException()); else { - LOG.warn("{} lingering content references?!?!", this); - _requestBuffer = null; // Not returned to pool! - _contentBufferReferences.set(0); + if (LOG.isDebugEnabled()) + LOG.debug("Upgrade from {} to {}", this, connection); + _channel.getState().upgrade(); + getEndPoint().upgrade(connection); + _channel.recycle(); + _parser.reset(); + _generator.reset(); + if (_contentBufferReferences.get() == 0) + releaseRequestBuffer(); + else + { + LOG.warn("{} lingering content references?!?!", this); + _requestBuffer = null; // Not returned to pool! + _contentBufferReferences.set(0); + } } return; } } boolean complete = _input.consumeAll(); - Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); - if (LOG.isDebugEnabled()) - LOG.debug("cancelled {}", this, cancelled); + if (isFillInterested()) + abort(new IllegalStateException()); // Finish consuming the request // If we are still expecting diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index 3938c0f272a6..71cd03c274db 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -27,7 +27,6 @@ import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; import java.nio.charset.StandardCharsets; -import java.util.function.Supplier; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.Connection; @@ -806,12 +805,6 @@ public void fillInterested(Callback callback) throws ReadPendingException _endp.fillInterested(callback); } - @Override - public Throwable cancelFillInterest(Supplier cancellation) - { - return _endp.cancelFillInterest(cancellation); - } - @Override public boolean flush(ByteBuffer... buffer) throws IOException { diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java index 8a541ea10aee..b57c0f9bff9e 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; -import java.util.function.Supplier; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; @@ -117,12 +116,6 @@ public void fillInterested(Callback callback) throws ReadPendingException throw new UnsupportedOperationException(NOT_SUPPORTED); } - @Override - public Throwable cancelFillInterest(Supplier cancellation) - { - throw new UnsupportedOperationException(NOT_SUPPORTED); - } - @Override public boolean tryFillInterested(Callback callback) { diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java new file mode 100644 index 000000000000..4e4ac600bee7 --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java @@ -0,0 +1,144 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockedIOTest extends AbstractTest +{ + @Override + public void init(Transport transport) throws IOException + { + setScenario(new TransportScenario(transport)); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testBlockingReadThenNormalComplete(Transport transport) throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AtomicReference rereadException = new AtomicReference<>(); + + init(transport); + scenario.start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable ex1) + { + readException.set(ex1); + try + { + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + catch (Throwable ex2) + { + rereadException.set(ex2); + } + finally + { + stopped.countDown(); + } + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch ok = new CountDownLatch(2); + scenario.client.POST(scenario.newURI()) + .content(contentProvider) + .onResponseContent((response, content) -> + { + assertThat(BufferUtil.toString(content), containsString("OK")); + ok.countDown(); + }) + .onResponseSuccess(response -> + { + try + { + assertThat(response.getStatus(), is(200)); + stopped.await(10, TimeUnit.SECONDS); + ok.countDown(); + } + catch (Throwable t) + { + t.printStackTrace(); + } + }) + .send(null); + contentProvider.offer(BufferUtil.toBuffer("1")); + + assertTrue(ok.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + assertThat(rereadException.get(), instanceOf(IOException.class)); + } +} From 7235e492c8e370cbdfeee539f4d22f95b9f75505 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 4 Feb 2021 09:58:24 +0100 Subject: [PATCH 10/17] Fix #5605 Unblock non container Threads Simplification. Always abort on any pending read or write in completion. --- .../eclipse/jetty/server/HttpConnection.java | 39 ++++++++++--------- .../org/eclipse/jetty/server/HttpOutput.java | 15 +++---- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 47caad8e43c0..be8b2c0992b9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -376,39 +376,40 @@ private boolean parseRequestBuffer() @Override public void onCompleted() { + // If we are fill interested, then a read is pending and we must abort + if (isFillInterested()) + { + LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint()); + abort(new IllegalStateException()); + } + // Handle connection upgrades if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) { Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE); if (connection != null) { - if (isFillInterested()) - abort(new IllegalStateException()); + if (LOG.isDebugEnabled()) + LOG.debug("Upgrade from {} to {}", this, connection); + _channel.getState().upgrade(); + getEndPoint().upgrade(connection); + _channel.recycle(); + _parser.reset(); + _generator.reset(); + if (_contentBufferReferences.get() == 0) + releaseRequestBuffer(); else { - if (LOG.isDebugEnabled()) - LOG.debug("Upgrade from {} to {}", this, connection); - _channel.getState().upgrade(); - getEndPoint().upgrade(connection); - _channel.recycle(); - _parser.reset(); - _generator.reset(); - if (_contentBufferReferences.get() == 0) - releaseRequestBuffer(); - else - { - LOG.warn("{} lingering content references?!?!", this); - _requestBuffer = null; // Not returned to pool! - _contentBufferReferences.set(0); - } + LOG.warn("{} lingering content references?!?!", this); + _requestBuffer = null; // Not returned to pool! + _contentBufferReferences.set(0); } return; } } + // Drive to EOF, EarlyEOF or Error boolean complete = _input.consumeAll(); - if (isFillInterested()) - abort(new IllegalStateException()); // Finish consuming the request // If we are still expecting diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 248b85caccd0..2afa950d18c0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -450,20 +450,15 @@ public void complete(Callback callback) break; case BLOCKED: - // An operation is in progress, so we soft close now - _softClose = true; - // then cancel the operation - CancellationException cancelled = new CancellationException(); - if (_writeBlocker.fail(cancelled)) - _channel.abort(cancelled); - // then trigger a close from onWriteComplete - _state = State.CLOSE; - break; - case UNREADY: case PENDING: + LOG.warn("Pending write onComplated {} {}", this, _channel); // An operation is in progress, so we soft close now _softClose = true; + // then cancel the operation and abort the channel + CancellationException cancelled = new CancellationException(); + _writeBlocker.fail(cancelled); + _channel.abort(cancelled); // then trigger a close from onWriteComplete _state = State.CLOSE; break; From 39f6f87ca7c5b01dc438a9a3c8425650c199b9dc Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 4 Feb 2021 10:26:11 +0100 Subject: [PATCH 11/17] Fix #5605 Unblock non container Threads Simplification. Always abort on any pending read or write in completion. --- .../java/org/eclipse/jetty/server/HttpOutput.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 2afa950d18c0..2b424ed40fb2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -455,12 +455,19 @@ public void complete(Callback callback) LOG.warn("Pending write onComplated {} {}", this, _channel); // An operation is in progress, so we soft close now _softClose = true; - // then cancel the operation and abort the channel - CancellationException cancelled = new CancellationException(); - _writeBlocker.fail(cancelled); - _channel.abort(cancelled); // then trigger a close from onWriteComplete _state = State.CLOSE; + + // But if we are blocked or there is more content to come, we must abort + // Note that this allows a pending async write to complete only if it is the last write + if (_apiState == ApiState.BLOCKED || !_channel.getResponse().isContentComplete(_written)) + { + CancellationException cancelled = new CancellationException(); + _writeBlocker.fail(cancelled); + _channel.abort(cancelled); + _state = State.CLOSED; + } + break; } break; From ed534b84efa4f04dce0b047965a339ff797ffa20 Mon Sep 17 00:00:00 2001 From: gregw Date: Fri, 5 Feb 2021 15:36:34 +0100 Subject: [PATCH 12/17] Fix #5937 updates from review. --- .../main/java/org/eclipse/jetty/server/HttpInput.java | 9 --------- .../main/java/org/eclipse/jetty/server/HttpOutput.java | 2 +- .../src/main/java/org/eclipse/jetty/server/Request.java | 2 +- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 724132c3e303..91efb63e07b0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -745,15 +745,6 @@ public boolean isError() } } - public Throwable getError() - { - synchronized (_inputQ) - { - Throwable error = _state instanceof ErrorState ? ((ErrorState)_state)._error : null; - return error == null ? new IOException() : error; - } - } - public boolean isAsync() { synchronized (_inputQ) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 2b424ed40fb2..9340ba11c117 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -452,7 +452,7 @@ public void complete(Callback callback) case BLOCKED: case UNREADY: case PENDING: - LOG.warn("Pending write onComplated {} {}", this, _channel); + LOG.warn("Pending write in complete {} {}", this, _channel); // An operation is in progress, so we soft close now _softClose = true; // then trigger a close from onWriteComplete diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 712745847f8e..634b37339991 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1884,7 +1884,7 @@ protected void recycle() getHttpChannelState().recycle(); _requestAttributeListeners.clear(); - // Defer _input.recycle() until setMetaData on next request, so that late readers will fail + // Defer _input.recycle() until setMetaData on next request, TODO replace with recycle and reopen in 10 _metaData = null; _originalURI = null; _contextPath = null; From 9f2a4f5ad53c6bb3fc74cba45acf8411ceef4512 Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 10 Feb 2021 15:35:48 +0100 Subject: [PATCH 13/17] Fix #5605 write side refactored the complete method to consider unrecoverable API states no matter what the httpout state actually is. This avoid duplication of OPEN, CLOSING, CLOSED etc. handling. --- .../org/eclipse/jetty/server/HttpOutput.java | 120 +++++++++++------- .../jetty/http/client/BlockedIOTest.java | 3 +- 2 files changed, 73 insertions(+), 50 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 9340ba11c117..970eaf9a5b40 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -412,65 +412,87 @@ public void complete(Callback callback) ByteBuffer content = null; synchronized (_channelState) { - switch (_state) + // First check the API state for any unrecoverable situations + switch (_apiState) { - case CLOSED: - succeeded = true; + case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time + error = new CancellationException("Completed whilst write unready"); break; - case CLOSE: - case CLOSING: - _closedCallback = Callback.combine(_closedCallback, callback); + case PENDING: // an async write is pending and may complete at any time + // If this is not the last write, then we must abort + if (!_channel.getResponse().isContentComplete(_written)) + error = new CancellationException("Completed whilst write pending"); break; - case OPEN: - if (_onError != null) - { - error = _onError; - break; - } - - _closedCallback = Callback.combine(_closedCallback, callback); - - switch (_apiState) - { - case BLOCKING: - // Output is idle blocking state, but we still do an async close - _apiState = ApiState.BLOCKED; - _state = State.CLOSING; - content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - break; + case BLOCKED: // another thread is blocked in a write or a close + error = new CancellationException("Completed whilst write blocked"); + break; - case ASYNC: - case READY: - // Output is idle in async state, so we can do an async close - _apiState = ApiState.PENDING; - _state = State.CLOSING; - content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - break; + default: + break; + } - case BLOCKED: - case UNREADY: - case PENDING: - LOG.warn("Pending write in complete {} {}", this, _channel); - // An operation is in progress, so we soft close now - _softClose = true; - // then trigger a close from onWriteComplete - _state = State.CLOSE; + // If we can't complete due to the API state, then abort + if (error != null) + { + _writeBlocker.fail(error); + _channel.abort(error); + _state = State.CLOSED; + } + else + { + // Otherwise check the output state to determine how to complete + switch (_state) + { + case CLOSED: + succeeded = true; + break; - // But if we are blocked or there is more content to come, we must abort - // Note that this allows a pending async write to complete only if it is the last write - if (_apiState == ApiState.BLOCKED || !_channel.getResponse().isContentComplete(_written)) - { - CancellationException cancelled = new CancellationException(); - _writeBlocker.fail(cancelled); - _channel.abort(cancelled); - _state = State.CLOSED; - } + case CLOSE: + case CLOSING: + _closedCallback = Callback.combine(_closedCallback, callback); + break; + case OPEN: + if (_onError != null) + { + error = _onError; break; - } - break; + } + + _closedCallback = Callback.combine(_closedCallback, callback); + + switch (_apiState) + { + case BLOCKING: + // Output is idle blocking state, but we still do an async close + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case ASYNC: + case READY: + // Output is idle in async state, so we can do an async close + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case UNREADY: + case PENDING: + // An operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; + + default: + throw new IllegalStateException(); + } + break; + } } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java index 4e4ac600bee7..ca14214e9b4d 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java @@ -114,7 +114,8 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques DeferredContentProvider contentProvider = new DeferredContentProvider(); CountDownLatch ok = new CountDownLatch(2); - scenario.client.POST(scenario.newURI()) + scenario.client.newRequest(scenario.newURI()) + .method("POST") .content(contentProvider) .onResponseContent((response, content) -> { From 769687f773808615dced710965e5ba5b245bf50a Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 10 Feb 2021 16:38:12 +0100 Subject: [PATCH 14/17] update from the feedback on the feedback of the feedback from the review. fix javadoc --- .../src/main/java/org/eclipse/jetty/server/HttpOutput.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 970eaf9a5b40..ee6cc0e908df 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -70,7 +70,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable enum State { OPEN, // Open - CLOSE, // Close needed from onWriteCompletion + CLOSE, // Close needed from onWriteComplete CLOSING, // Close in progress after close API called CLOSED // Closed } @@ -309,7 +309,7 @@ else if (_state == State.CLOSE) { // Somebody called close or complete while we were writing. // We can now send a (probably empty) last buffer and then when it completes - // onWriteCompletion will be called again to actually execute the _completeCallback + // onWriteComplete will be called again to actually execute the _completeCallback _state = State.CLOSING; closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; } From e2c710e0868746673c21f6aace99637feb3605b1 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 11 Feb 2021 15:01:00 +0100 Subject: [PATCH 15/17] updates from review --- .../src/main/java/org/eclipse/jetty/server/HttpConnection.java | 2 +- .../src/main/java/org/eclipse/jetty/server/HttpOutput.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index be8b2c0992b9..0e31a13ea8dc 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -384,7 +384,7 @@ public void onCompleted() } // Handle connection upgrades - if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) + else if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) { Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE); if (connection != null) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index ee6cc0e908df..675d1d1c622f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -436,8 +436,8 @@ public void complete(Callback callback) // If we can't complete due to the API state, then abort if (error != null) { - _writeBlocker.fail(error); _channel.abort(error); + _writeBlocker.fail(error); _state = State.CLOSED; } else From 6b1a8c376fd8a461207c4481037107dac08dd78d Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 16 Feb 2021 17:48:49 +0100 Subject: [PATCH 16/17] updates from review --- .../org/eclipse/jetty/server/HttpConnection.java | 2 +- .../org/eclipse/jetty/server/BlockingTest.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 0e31a13ea8dc..bc4a50a0e727 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -380,7 +380,7 @@ public void onCompleted() if (isFillInterested()) { LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint()); - abort(new IllegalStateException()); + abort(new IllegalStateException("Pending read in onCompleted")); } // Handle connection upgrades diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java index 9a2f20b38287..2ce8800efef8 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -56,11 +56,11 @@ public class BlockingTest { private Server server; - ServerConnector connector; + private ServerConnector connector; private ContextHandler context; @BeforeEach - void setUp() + public void setUp() { server = new Server(); connector = new ServerConnector(server); @@ -75,7 +75,7 @@ void setUp() } @AfterEach - void tearDown() throws Exception + public void tearDown() throws Exception { server.stop(); } @@ -142,7 +142,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques int port = connector.getLocalPort(); try (Socket socket = new Socket("localhost", port)) { - socket.setSoTimeout(1000000); + socket.setSoTimeout(10000); OutputStream out = socket.getOutputStream(); out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); @@ -222,7 +222,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques int port = connector.getLocalPort(); try (Socket socket = new Socket("localhost", port)) { - socket.setSoTimeout(1000000); + socket.setSoTimeout(10000); OutputStream out = socket.getOutputStream(); out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); @@ -307,7 +307,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques int port = connector.getLocalPort(); try (Socket socket = new Socket("localhost", port)) { - socket.setSoTimeout(1000000); + socket.setSoTimeout(10000); OutputStream out = socket.getOutputStream(); out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); @@ -388,7 +388,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques int port = connector.getLocalPort(); try (Socket socket = new Socket("localhost", port)) { - socket.setSoTimeout(1000000); + socket.setSoTimeout(10000); OutputStream out = socket.getOutputStream(); out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); @@ -460,7 +460,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques int port = connector.getLocalPort(); try (Socket socket = new Socket("localhost", port)) { - socket.setSoTimeout(1000000); + socket.setSoTimeout(10000); OutputStream out = socket.getOutputStream(); out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); From 071584668cfdae1222deb53eefc2520ce4b00412 Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 16 Feb 2021 18:42:28 +0100 Subject: [PATCH 17/17] updates from review --- .../src/main/java/org/eclipse/jetty/server/HttpConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index bc4a50a0e727..6849de64e864 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -380,7 +380,7 @@ public void onCompleted() if (isFillInterested()) { LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint()); - abort(new IllegalStateException("Pending read in onCompleted")); + _channel.abort(new IOException("Pending read in onCompleted")); } // Handle connection upgrades