From fe359ac1172fbd7f37db13adc4679a55febae9cc Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 13 Apr 2021 16:54:53 +0200 Subject: [PATCH] Fixes #6168 - Improve handling of unconsumed content Added or expanded the scope of catch blocks to properly handle exceptions thrown by `HttpInput.Interceptor`. Signed-off-by: Simone Bordet --- .../eclipse/jetty/server/HttpConnection.java | 17 +- .../org/eclipse/jetty/server/HttpInput.java | 170 ++++--- .../jetty/test/HttpInputInterceptorTest.java | 463 ++++++++++++++++++ 3 files changed, 570 insertions(+), 80 deletions(-) create mode 100644 tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputInterceptorTest.java diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 0bcd46e5b9c5..5e7f373739c7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -293,6 +293,14 @@ else if (filled < 0) } } } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} caught exception {}", this, _channel.getState(), x); + BufferUtil.clear(_requestBuffer); + releaseRequestBuffer(); + close(); + } finally { setCurrentConnection(last); @@ -322,10 +330,7 @@ protected boolean fillAndParseForContent() private int fillRequestBuffer() { if (_contentBufferReferences.get() > 0) - { - LOG.warn("{} fill with unconsumed content!", this); - return 0; - } + throw new IllegalStateException("fill with unconsumed content on " + this); if (BufferUtil.isEmpty(_requestBuffer)) { @@ -353,11 +358,13 @@ else if (filled < 0) } catch (IOException e) { - LOG.debug(e); + if (LOG.isDebugEnabled()) + LOG.debug(e); _parser.atEOF(); return -1; } } + return 0; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 91efb63e07b0..e324474bc842 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -235,7 +235,7 @@ public int available() { produceContent(); } - catch (IOException e) + catch (Throwable e) { woken = failed(e); } @@ -390,7 +390,7 @@ protected Content nextContent() throws IOException * * @return Content or null */ - protected Content nextNonSentinelContent() + protected Content nextNonSentinelContent() throws IOException { while (true) { @@ -416,7 +416,7 @@ protected Content nextNonSentinelContent() * @return the content or EOF or null if none available. * @throws IOException if retrieving the content fails */ - protected Content produceNextContext() throws IOException + protected Content produceNextContent() throws IOException { Content content = nextInterceptedContent(); if (content == null && !isFinished()) @@ -433,7 +433,7 @@ protected Content produceNextContext() throws IOException * * @return Content with remaining, a {@link SentinelContent}, or null */ - protected Content nextInterceptedContent() + protected Content nextInterceptedContent() throws IOException { // If we have a chunk produced by interception if (_intercepted != null) @@ -458,9 +458,10 @@ protected Content nextInterceptedContent() // Are we intercepting? if (_interceptor != null) { - // Intercept the current content (may be called several - // times for the same content - _intercepted = _interceptor.readFrom(_content); + // Intercept the current content. + // The interceptor may be called several + // times for the same content. + _intercepted = intercept(_content); // If interception produced new content if (_intercepted != null && _intercepted != _content) @@ -492,6 +493,24 @@ protected Content nextInterceptedContent() return null; } + private Content intercept(Content content) throws IOException + { + try + { + return _interceptor.readFrom(content); + } + catch (Throwable x) + { + IOException failure = new IOException("Bad content", x); + content.failed(failure); + HttpChannel channel = _channelState.getHttpChannel(); + Response response = channel.getResponse(); + if (response.isCommitted()) + channel.abort(failure); + throw failure; + } + } + private void consume(Content content) { if (!isError() && content instanceof EofContent) @@ -529,21 +548,6 @@ protected int get(Content content, byte[] buffer, int offset, int length) return l; } - /** - * Consumes the given content. Calls the content succeeded if all content consumed. - * - * @param content the content to consume - * @param length the number of bytes to consume - */ - protected void skip(Content content, int length) - { - int l = content.skip(length); - - _contentConsumed += l; - if (l > 0 && content.isEmpty()) - nextNonSentinelContent(); // hungry succeed - } - /** * Blocks until some content or some end-of-file event arrives. * @@ -620,10 +624,19 @@ public boolean addContent(Content content) if (LOG.isDebugEnabled()) LOG.debug("{} addContent {}", this, content); - if (nextInterceptedContent() != null) - return wakeup(); - else - return false; + try + { + if (nextInterceptedContent() != null) + return wakeup(); + else + return false; + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("", x); + return failed(x); + } } } } @@ -686,6 +699,7 @@ public boolean eof() * Consume all available content without blocking. * Raw content is counted in the {@link #getContentReceived()} statistics, but * is not intercepted nor counted in the {@link #getContentConsumed()} statistics + * * @return True if EOF was reached, false otherwise. */ public boolean consumeAll() @@ -765,9 +779,9 @@ public boolean isFinished() @Override public boolean isReady() { - try + synchronized (_inputQ) { - synchronized (_inputQ) + try { if (_listener == null) return true; @@ -775,17 +789,19 @@ public boolean isReady() return true; if (_waitingForContent) return false; - if (produceNextContext() != null) + if (produceNextContent() != null) return true; _channelState.onReadUnready(); _waitingForContent = true; + return false; + } + catch (Throwable e) + { + if (LOG.isDebugEnabled()) + LOG.debug("", e); + failed(e); + return true; } - return false; - } - catch (IOException e) - { - LOG.ignore(e); - return true; } } @@ -793,9 +809,9 @@ public boolean isReady() public void setReadListener(ReadListener readListener) { boolean woken = false; - try + synchronized (_inputQ) { - synchronized (_inputQ) + try { if (_listener != null) throw new IllegalStateException("ReadListener already set"); @@ -808,7 +824,7 @@ public void setReadListener(ReadListener readListener) } else { - Content content = produceNextContext(); + Content content = produceNextContent(); if (content != null) { _state = ASYNC; @@ -827,10 +843,13 @@ else if (_state == EOF) } } } - } - catch (IOException e) - { - throw new RuntimeIOException(e); + catch (Throwable e) + { + if (LOG.isDebugEnabled()) + LOG.debug("", e); + failed(e); + woken = _channelState.onReadReady(); + } } if (woken) @@ -895,49 +914,49 @@ private boolean wakeup() @Override public void run() { - final ReadListener listener; - Throwable error; + ReadListener listener = null; + Throwable error = null; boolean aeof = false; - synchronized (_inputQ) + try { - listener = _listener; - - if (_state == EOF) - return; - - if (_state == AEOF) + synchronized (_inputQ) { - _state = EOF; - aeof = true; - } + listener = _listener; - error = _state.getError(); - - if (!aeof && error == null) - { - Content content = nextInterceptedContent(); - if (content == null) + if (_state == EOF) return; - // Consume a directly received EOF without first calling onDataAvailable - // So -1 will never be read and only onAddDataRread or onError will be called - if (content instanceof EofContent) + if (_state == AEOF) { - consume(content); - if (_state == EARLY_EOF) - error = _state.getError(); - else if (_state == AEOF) + _state = EOF; + aeof = true; + } + + error = _state.getError(); + + if (!aeof && error == null) + { + Content content = nextInterceptedContent(); + if (content == null) + return; + + // Consume a directly received EOF without first calling onDataAvailable + // So -1 will never be read and only onAddDataRread or onError will be called + if (content instanceof EofContent) { - aeof = true; - _state = EOF; + consume(content); + if (_state == EARLY_EOF) + error = _state.getError(); + else if (_state == AEOF) + { + aeof = true; + _state = EOF; + } } } } - } - try - { if (error != null) { // TODO is this necessary to add here? @@ -958,7 +977,8 @@ else if (aeof) catch (Throwable e) { LOG.warn(e.toString()); - LOG.debug(e); + if (LOG.isDebugEnabled()) + LOG.debug("", e); try { if (aeof || error == null) @@ -1106,7 +1126,7 @@ protected static class EOFState extends State { } - protected class ErrorState extends EOFState + protected static class ErrorState extends EOFState { final Throwable _error; @@ -1155,7 +1175,7 @@ public String toString() protected static final State ASYNC = new State() { @Override - public int noContent() throws IOException + public int noContent() { return 0; } diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputInterceptorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputInterceptorTest.java new file mode 100644 index 000000000000..9b8766991f5d --- /dev/null +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputInterceptorTest.java @@ -0,0 +1,463 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.test; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import javax.servlet.AsyncContext; +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConnection; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.HttpInput; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.component.LifeCycle; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HttpInputInterceptorTest +{ + private Server server; + private HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(); + private ServerConnector connector; + private HttpClient client; + + private void start(Handler handler) throws Exception + { + server = new Server(); + connector = new ServerConnector(server, 1, 1, httpConnectionFactory); + server.addConnector(connector); + + server.setHandler(handler); + + client = new HttpClient(); + server.addBean(client); + + server.start(); + } + + @AfterEach + public void dispose() + { + LifeCycle.stop(server); + } + + @Test + public void testBlockingReadInterceptorThrows() throws Exception + { + CountDownLatch serverLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) + { + jettyRequest.setHandled(true); + + // Throw immediately from the interceptor. + jettyRequest.getHttpInput().addInterceptor(content -> + { + throw new RuntimeException(); + }); + + assertThrows(IOException.class, () -> IO.readBytes(request.getInputStream())); + serverLatch.countDown(); + response.setStatus(HttpStatus.NO_CONTENT_204); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .method(HttpMethod.POST) + .content(new BytesContentProvider(new byte[1])) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus()); + } + + @Test + public void testBlockingReadInterceptorConsumesHalfThenThrows() throws Exception + { + CountDownLatch serverLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) + { + jettyRequest.setHandled(true); + + // Consume some and then throw. + AtomicInteger readCount = new AtomicInteger(); + jettyRequest.getHttpInput().addInterceptor(content -> + { + int reads = readCount.incrementAndGet(); + if (reads == 1) + { + ByteBuffer buffer = content.getByteBuffer(); + int half = buffer.remaining() / 2; + int limit = buffer.limit(); + buffer.limit(buffer.position() + half); + ByteBuffer chunk = buffer.slice(); + buffer.position(buffer.limit()); + buffer.limit(limit); + return new HttpInput.Content(chunk); + } + throw new RuntimeException(); + }); + + assertThrows(IOException.class, () -> IO.readBytes(request.getInputStream())); + serverLatch.countDown(); + response.setStatus(HttpStatus.NO_CONTENT_204); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .method(HttpMethod.POST) + .content(new BytesContentProvider(new byte[1024])) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAsyncResponseWithoutReadingRequestContentWithInterceptorThatThrows(boolean commitResponse) throws Exception + { + AtomicLong onFillableCount = new AtomicLong(); + httpConnectionFactory = new HttpConnectionFactory() + { + @Override + public Connection newConnection(Connector connector, EndPoint endPoint) + { + HttpConnection connection = new HttpConnection(getHttpConfiguration(), connector, endPoint, getHttpCompliance(), isRecordHttpComplianceViolations()) + { + @Override + public void onFillable() + { + onFillableCount.incrementAndGet(); + super.onFillable(); + } + }; + return configure(connection, connector, endPoint); + } + }; + + long delay = 500; + CountDownLatch contentLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + jettyRequest.setHandled(true); + + AtomicInteger readCount = new AtomicInteger(); + jettyRequest.getHttpInput().addInterceptor(content -> + { + if (readCount.incrementAndGet() == 1) + { + // Tell the client to write more content. + contentLatch.countDown(); + // Wait to let the content arrive to the server. + sleep(delay); + } + throw new RuntimeException(); + }); + + AsyncContext asyncContext = request.startAsync(); + response.getOutputStream().setWriteListener(new WriteListener() + { + @Override + public void onWritePossible() throws IOException + { + if (commitResponse) + response.getOutputStream().close(); + asyncContext.complete(); + } + + @Override + public void onError(Throwable error) + { + error.printStackTrace(); + } + }); + } + }); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + // The request must have a content chunk so that it gets dispatched. + String request = "" + + "POST / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Transfer-Encoding: chunked\r\n" + + "\r\n" + + "1\r\n" + + "A\r\n"; + client.write(StandardCharsets.UTF_8.encode(request)); + + // Write the remaining content. + // This triggers to fill and parse again after consumeAll(), + // and we want to verify that the code does not spin. + assertTrue(contentLatch.await(5, TimeUnit.SECONDS)); + String content = "" + + "1\r\n" + + "X\r\n" + + "0\r\n" + + "\r\n"; + client.write(StandardCharsets.UTF_8.encode(content)); + + // Wait and verify that we did not spin. + sleep(4 * delay); + assertThat(onFillableCount.get(), Matchers.lessThan(10L)); + + // Connection must be closed by the server. + Socket socket = client.socket(); + socket.setSoTimeout(1000); + InputStream input = socket.getInputStream(); + + HttpTester.Response response = HttpTester.parseResponse(input); + assertNotNull(response); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + try + { + while (true) + { + if (input.read() < 0) + break; + } + } + catch (IOException ignored) + { + // Java 8 may throw IOException: Connection reset by peer + // but that's ok (the server closed the connection). + } + } + } + + @Test + public void testAvailableReadInterceptorThrows() throws Exception + { + CountDownLatch interceptorLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + jettyRequest.setHandled(true); + + // Throw immediately from the interceptor. + jettyRequest.getHttpInput().addInterceptor(content -> + { + interceptorLatch.countDown(); + throw new RuntimeException(); + }); + + int available = request.getInputStream().available(); + assertEquals(0, available); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .method(HttpMethod.POST) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.OK_200, response.getStatus()); + } + + @Test + public void testIsReadyReadInterceptorThrows() throws Exception + { + byte[] bytes = new byte[]{13}; + CountDownLatch interceptorLatch = new CountDownLatch(1); + CountDownLatch readFailureLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + jettyRequest.setHandled(true); + + AtomicBoolean onDataAvailable = new AtomicBoolean(); + jettyRequest.getHttpInput().addInterceptor(content -> + { + if (onDataAvailable.get()) + { + interceptorLatch.countDown(); + throw new RuntimeException(); + } + else + { + return content; + } + }); + + AsyncContext asyncContext = request.startAsync(); + ServletInputStream input = request.getInputStream(); + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() + { + onDataAvailable.set(true); + // Now the interceptor should throw, but isReady() should not. + if (input.isReady()) + { + assertThrows(IOException.class, () -> assertEquals(bytes[0], input.read())); + readFailureLatch.countDown(); + response.setStatus(HttpStatus.NO_CONTENT_204); + asyncContext.complete(); + } + } + + @Override + public void onAllDataRead() + { + } + + @Override + public void onError(Throwable error) + { + error.printStackTrace(); + } + }); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .method(HttpMethod.POST) + .content(new BytesContentProvider(bytes)) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS)); + assertTrue(readFailureLatch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus()); + } + + @Test + public void testSetReadListenerReadInterceptorThrows() throws Exception + { + RuntimeException failure = new RuntimeException(); + CountDownLatch interceptorLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + jettyRequest.setHandled(true); + + // Throw immediately from the interceptor. + jettyRequest.getHttpInput().addInterceptor(content -> + { + interceptorLatch.countDown(); + failure.addSuppressed(new Throwable()); + throw failure; + }); + + AsyncContext asyncContext = request.startAsync(); + ServletInputStream input = request.getInputStream(); + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() + { + } + + @Override + public void onAllDataRead() + { + } + + @Override + public void onError(Throwable error) + { + assertSame(failure, error.getCause()); + response.setStatus(HttpStatus.NO_CONTENT_204); + asyncContext.complete(); + } + }); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .method(HttpMethod.POST) + .content(new BytesContentProvider(new byte[1])) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus()); + } + + private static void sleep(long time) + { + try + { + Thread.sleep(time); + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } +}