From eca8edcea57014541f09847e282837c439590afe Mon Sep 17 00:00:00 2001 From: Lachlan Date: Mon, 19 Apr 2021 11:02:44 +1000 Subject: [PATCH] Create FileBufferedResponseHandler to buffer responses into a file. (#6010) FileBufferedResponseHandler adds an HttpOutput.Interceptor to buffer all responses into a file until the output is closed. This allows the commit to be delayed until the response is complete and thus headers and response status can be changed while writing the body. Signed-off-by: Lachlan Roberts --- .../org/eclipse/jetty/http/HttpStatus.java | 3 +- .../eclipse/jetty/io/ByteArrayEndPoint.java | 20 +- .../org/eclipse/jetty/server/HttpOutput.java | 1 + .../handler/BufferedResponseHandler.java | 193 +++--- .../handler/FileBufferedResponseHandler.java | 243 +++++++ .../FileBufferedResponseHandlerTest.java | 620 ++++++++++++++++++ .../org/eclipse/jetty/util/BufferUtil.java | 10 +- 7 files changed, 995 insertions(+), 95 deletions(-) create mode 100644 jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java index b937ba58269e..fe0242df216f 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java @@ -325,8 +325,9 @@ public static boolean hasNoBody(int status) switch (status) { case NO_CONTENT_204: - case NOT_MODIFIED_304: + case RESET_CONTENT_205: case PARTIAL_CONTENT_206: + case NOT_MODIFIED_304: return true; default: diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index 768d8bc60c34..0e7fb3cf7107 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -47,6 +47,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class); static final InetAddress NOIP; static final InetSocketAddress NOIPPORT; + private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 1024; static { @@ -80,6 +81,7 @@ public void run() private final Locker _locker = new Locker(); private final Condition _hasOutput = _locker.newCondition(); private final Queue _inQ = new ArrayDeque<>(); + private final int _outputSize; private ByteBuffer _out; private boolean _growOutput; @@ -129,7 +131,8 @@ public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, super(timer); if (BufferUtil.hasContent(input)) addInput(input); - _out = output == null ? BufferUtil.allocate(1024) : output; + _outputSize = (output == null) ? 1024 : output.capacity(); + _out = output == null ? BufferUtil.allocate(_outputSize) : output; setIdleTimeout(idleTimeoutMs); onOpen(); } @@ -296,7 +299,7 @@ public ByteBuffer takeOutput() try (Locker.Lock lock = _locker.lock()) { b = _out; - _out = BufferUtil.allocate(b.capacity()); + _out = BufferUtil.allocate(_outputSize); } getWriteFlusher().completeWrite(); return b; @@ -322,7 +325,7 @@ public ByteBuffer waitForOutput(long time, TimeUnit unit) throws InterruptedExce return null; } b = _out; - _out = BufferUtil.allocate(b.capacity()); + _out = BufferUtil.allocate(_outputSize); } getWriteFlusher().completeWrite(); return b; @@ -436,9 +439,14 @@ public boolean flush(ByteBuffer... buffers) throws IOException BufferUtil.compact(_out); if (b.remaining() > BufferUtil.space(_out)) { - ByteBuffer n = BufferUtil.allocate(_out.capacity() + b.remaining() * 2); - BufferUtil.append(n, _out); - _out = n; + // Don't grow larger than MAX_BUFFER_SIZE to avoid memory issues. + if (_out.capacity() < MAX_BUFFER_SIZE) + { + long newBufferCapacity = Math.min((long)(_out.capacity() + b.remaining() * 1.5), MAX_BUFFER_SIZE); + ByteBuffer n = BufferUtil.allocate(Math.toIntExact(newBufferCapacity)); + BufferUtil.append(n, _out); + _out = n; + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 675d1d1c622f..2a9135f2974c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -642,6 +642,7 @@ public void close() throws IOException catch (Throwable t) { onWriteComplete(true, t); + throw t; } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java index e06589e991c0..85f3e0ed1a00 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java @@ -20,14 +20,15 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.pathmap.PathSpecSet; import org.eclipse.jetty.server.HttpChannel; @@ -45,16 +46,17 @@ import org.eclipse.jetty.util.log.Logger; /** - * Buffered Response Handler *

* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor} * mechanism to buffer the entire response content until the output is closed. * This allows the commit to be delayed until the response is complete and thus * headers and response status can be changed while writing the body. + *

*

* Note that the decision to buffer is influenced by the headers and status at the * first write, and thus subsequent changes to those headers will not influence the * decision to buffer or not. + *

*

* Note also that there are no memory limits to the size of the buffer, thus * this handler can represent an unbounded memory commitment if the content @@ -63,7 +65,7 @@ */ public class BufferedResponseHandler extends HandlerWrapper { - static final Logger LOG = Log.getLogger(BufferedResponseHandler.class); + private static final Logger LOG = Log.getLogger(BufferedResponseHandler.class); private final IncludeExclude _methods = new IncludeExclude<>(); private final IncludeExclude _paths = new IncludeExclude<>(PathSpecSet.class); @@ -71,10 +73,7 @@ public class BufferedResponseHandler extends HandlerWrapper public BufferedResponseHandler() { - // include only GET requests - _methods.include(HttpMethod.GET.asString()); - // Exclude images, aduio and video from buffering for (String type : MimeTypes.getKnownMimeTypes()) { if (type.startsWith("image/") || @@ -82,7 +81,9 @@ public BufferedResponseHandler() type.startsWith("video/")) _mimeTypes.exclude(type); } - LOG.debug("{} mime types {}", this, _mimeTypes); + + if (LOG.isDebugEnabled()) + LOG.debug("{} mime types {}", this, _mimeTypes); } public IncludeExclude getMethodIncludeExclude() @@ -100,102 +101,147 @@ public IncludeExclude getMimeIncludeExclude() return _mimeTypes; } - /** - * @see org.eclipse.jetty.server.handler.HandlerWrapper#handle(java.lang.String, org.eclipse.jetty.server.Request, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse) - */ + protected boolean isMimeTypeBufferable(String mimetype) + { + return _mimeTypes.test(mimetype); + } + + protected boolean isPathBufferable(String requestURI) + { + if (requestURI == null) + return true; + + return _paths.test(requestURI); + } + + protected boolean shouldBuffer(HttpChannel channel, boolean last) + { + if (last) + return false; + + Response response = channel.getResponse(); + int status = response.getStatus(); + if (HttpStatus.hasNoBody(status) || HttpStatus.isRedirection(status)) + return false; + + String ct = response.getContentType(); + if (ct == null) + return true; + + ct = MimeTypes.getContentTypeWithoutCharset(ct); + return isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct)); + } + @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { ServletContext context = baseRequest.getServletContext(); String path = context == null ? baseRequest.getRequestURI() : URIUtil.addPaths(baseRequest.getServletPath(), baseRequest.getPathInfo()); - LOG.debug("{} handle {} in {}", this, baseRequest, context); - HttpOutput out = baseRequest.getResponse().getHttpOutput(); + if (LOG.isDebugEnabled()) + LOG.debug("{} handle {} in {}", this, baseRequest, context); - // Are we already being gzipped? + // Are we already buffering? + HttpOutput out = baseRequest.getResponse().getHttpOutput(); HttpOutput.Interceptor interceptor = out.getInterceptor(); while (interceptor != null) { if (interceptor instanceof BufferedInterceptor) { - LOG.debug("{} already intercepting {}", this, request); + if (LOG.isDebugEnabled()) + LOG.debug("{} already intercepting {}", this, request); _handler.handle(target, baseRequest, request, response); return; } interceptor = interceptor.getNextInterceptor(); } - // If not a supported method - no Vary because no matter what client, this URI is always excluded + // If not a supported method this URI is always excluded. if (!_methods.test(baseRequest.getMethod())) { - LOG.debug("{} excluded by method {}", this, request); + if (LOG.isDebugEnabled()) + LOG.debug("{} excluded by method {}", this, request); _handler.handle(target, baseRequest, request, response); return; } - // If not a supported URI- no Vary because no matter what client, this URI is always excluded - // Use pathInfo because this is be + // If not a supported path this URI is always excluded. if (!isPathBufferable(path)) { - LOG.debug("{} excluded by path {}", this, request); + if (LOG.isDebugEnabled()) + LOG.debug("{} excluded by path {}", this, request); _handler.handle(target, baseRequest, request, response); return; } - // If the mime type is known from the path, then apply mime type filtering + // If the mime type is known from the path then apply mime type filtering. String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path); if (mimeType != null) { mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType); if (!isMimeTypeBufferable(mimeType)) { - LOG.debug("{} excluded by path suffix mime type {}", this, request); + if (LOG.isDebugEnabled()) + LOG.debug("{} excluded by path suffix mime type {}", this, request); + // handle normally without setting vary header _handler.handle(target, baseRequest, request, response); return; } } - // install interceptor and handle - out.setInterceptor(new BufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor())); - + // Install buffered interceptor and handle. + out.setInterceptor(newBufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor())); if (_handler != null) _handler.handle(target, baseRequest, request, response); } - protected boolean isMimeTypeBufferable(String mimetype) + protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) { - return _mimeTypes.test(mimetype); + return new ArrayBufferedInterceptor(httpChannel, interceptor); } - protected boolean isPathBufferable(String requestURI) + /** + * An {@link HttpOutput.Interceptor} which is created by {@link #newBufferedInterceptor(HttpChannel, Interceptor)} + * and is used by the implementation to buffer outgoing content. + */ + protected interface BufferedInterceptor extends HttpOutput.Interceptor { - if (requestURI == null) - return true; - - return _paths.test(requestURI); } - private class BufferedInterceptor implements HttpOutput.Interceptor + private class ArrayBufferedInterceptor implements BufferedInterceptor { - final Interceptor _next; - final HttpChannel _channel; - final Queue _buffers = new ConcurrentLinkedQueue<>(); - Boolean _aggregating; - ByteBuffer _aggregate; + private final Interceptor _next; + private final HttpChannel _channel; + private final Queue _buffers = new ArrayDeque<>(); + private Boolean _aggregating; + private ByteBuffer _aggregate; - public BufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) + public ArrayBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) { _next = interceptor; _channel = httpChannel; } + @Override + public Interceptor getNextInterceptor() + { + return _next; + } + + @Override + public boolean isOptimizedForDirectBuffers() + { + return false; + } + @Override public void resetBuffer() { _buffers.clear(); _aggregating = null; _aggregate = null; + BufferedInterceptor.super.resetBuffer(); } @Override @@ -203,58 +249,43 @@ public void write(ByteBuffer content, boolean last, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content)); - // if we are not committed, have to decide if we should aggregate or not + + // If we are not committed, have to decide if we should aggregate or not. if (_aggregating == null) - { - Response response = _channel.getResponse(); - int sc = response.getStatus(); - if (sc > 0 && (sc < 200 || sc == 204 || sc == 205 || sc >= 300)) - _aggregating = Boolean.FALSE; // No body - else - { - String ct = response.getContentType(); - if (ct == null) - _aggregating = Boolean.TRUE; - else - { - ct = MimeTypes.getContentTypeWithoutCharset(ct); - _aggregating = isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct)); - } - } - } + _aggregating = shouldBuffer(_channel, last); - // If we are not aggregating, then handle normally - if (!_aggregating.booleanValue()) + // If we are not aggregating, then handle normally. + if (!_aggregating) { getNextInterceptor().write(content, last, callback); return; } - // If last if (last) { - // Add the current content to the buffer list without a copy + // Add the current content to the buffer list without a copy. if (BufferUtil.length(content) > 0) - _buffers.add(content); + _buffers.offer(content); if (LOG.isDebugEnabled()) LOG.debug("{} committing {}", this, _buffers.size()); - commit(_buffers, callback); + commit(callback); } else { if (LOG.isDebugEnabled()) LOG.debug("{} aggregating", this); - // Aggregate the content into buffer chain + // Aggregate the content into buffer chain. while (BufferUtil.hasContent(content)) { - // Do we need a new aggregate buffer + // Do we need a new aggregate buffer. if (BufferUtil.space(_aggregate) == 0) { + // TODO: use a buffer pool always allocating with outputBufferSize to avoid polluting the ByteBufferPool. int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content)); - _aggregate = BufferUtil.allocate(size); // TODO use a buffer pool - _buffers.add(_aggregate); + _aggregate = BufferUtil.allocate(size); + _buffers.offer(_aggregate); } BufferUtil.append(_aggregate, content); @@ -263,33 +294,23 @@ public void write(ByteBuffer content, boolean last, Callback callback) } } - @Override - public Interceptor getNextInterceptor() - { - return _next; - } - - @Override - public boolean isOptimizedForDirectBuffers() - { - return false; - } - - protected void commit(Queue buffers, Callback callback) + private void commit(Callback callback) { - // If only 1 buffer if (_buffers.size() == 0) + { getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback); + } else if (_buffers.size() == 1) - // just flush it with the last callback - getNextInterceptor().write(_buffers.remove(), true, callback); + { + getNextInterceptor().write(_buffers.poll(), true, callback); + } else { - // Create an iterating callback to do the writing + // Create an iterating callback to do the writing. IteratingCallback icb = new IteratingCallback() { @Override - protected Action process() throws Exception + protected Action process() { ByteBuffer buffer = _buffers.poll(); if (buffer == null) @@ -302,14 +323,14 @@ protected Action process() throws Exception @Override protected void onCompleteSuccess() { - // Signal last callback + // Signal last callback. callback.succeeded(); } @Override protected void onCompleteFailure(Throwable cause) { - // Signal last callback + // Signal last callback. callback.failed(cause); } }; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java new file mode 100644 index 000000000000..85b32492abc4 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java @@ -0,0 +1,243 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server.handler; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Objects; + +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpOutput.Interceptor; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + *

+ * A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor} + * mechanism to buffer the entire response content until the output is closed. + * This allows the commit to be delayed until the response is complete and thus + * headers and response status can be changed while writing the body. + *

+ *

+ * Note that the decision to buffer is influenced by the headers and status at the + * first write, and thus subsequent changes to those headers will not influence the + * decision to buffer or not. + *

+ *

+ * Note also that there are no memory limits to the size of the buffer, thus + * this handler can represent an unbounded memory commitment if the content + * generated can also be unbounded. + *

+ */ +public class FileBufferedResponseHandler extends BufferedResponseHandler +{ + private static final Logger LOG = Log.getLogger(FileBufferedResponseHandler.class); + + private Path _tempDir = new File(System.getProperty("java.io.tmpdir")).toPath(); + + public Path getTempDir() + { + return _tempDir; + } + + public void setTempDir(Path tempDir) + { + _tempDir = Objects.requireNonNull(tempDir); + } + + @Override + protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) + { + return new FileBufferedInterceptor(httpChannel, interceptor); + } + + private class FileBufferedInterceptor implements BufferedResponseHandler.BufferedInterceptor + { + private static final int MAX_MAPPED_BUFFER_SIZE = Integer.MAX_VALUE / 2; + + private final Interceptor _next; + private final HttpChannel _channel; + private Boolean _aggregating; + private Path _filePath; + private OutputStream _fileOutputStream; + + public FileBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) + { + _next = interceptor; + _channel = httpChannel; + } + + @Override + public Interceptor getNextInterceptor() + { + return _next; + } + + @Override + public boolean isOptimizedForDirectBuffers() + { + return false; + } + + @Override + public void resetBuffer() + { + dispose(); + BufferedInterceptor.super.resetBuffer(); + } + + private void dispose() + { + IO.close(_fileOutputStream); + _fileOutputStream = null; + _aggregating = null; + + if (_filePath != null) + { + try + { + Files.delete(_filePath); + } + catch (Throwable t) + { + LOG.warn("Could not delete file {}", _filePath, t); + } + _filePath = null; + } + } + + @Override + public void write(ByteBuffer content, boolean last, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content)); + + // If we are not committed, must decide if we should aggregate or not. + if (_aggregating == null) + _aggregating = shouldBuffer(_channel, last); + + // If we are not aggregating, then handle normally. + if (!_aggregating) + { + getNextInterceptor().write(content, last, callback); + return; + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} aggregating", this); + + try + { + if (BufferUtil.hasContent(content)) + aggregate(content); + } + catch (Throwable t) + { + dispose(); + callback.failed(t); + return; + } + + if (last) + commit(callback); + else + callback.succeeded(); + } + + private void aggregate(ByteBuffer content) throws IOException + { + if (_fileOutputStream == null) + { + // Create a new OutputStream to a file. + _filePath = Files.createTempFile(_tempDir, "BufferedResponse", ""); + _fileOutputStream = Files.newOutputStream(_filePath, StandardOpenOption.WRITE); + } + + BufferUtil.writeTo(content, _fileOutputStream); + } + + private void commit(Callback callback) + { + if (_fileOutputStream == null) + { + // We have no content to write, signal next interceptor that we are finished. + getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback); + return; + } + + try + { + _fileOutputStream.close(); + _fileOutputStream = null; + } + catch (Throwable t) + { + dispose(); + callback.failed(t); + return; + } + + // Create an iterating callback to do the writing + IteratingCallback icb = new IteratingCallback() + { + private final long fileLength = _filePath.toFile().length(); + private long _pos = 0; + private boolean _last = false; + + @Override + protected Action process() throws Exception + { + if (_last) + return Action.SUCCEEDED; + + long len = Math.min(MAX_MAPPED_BUFFER_SIZE, fileLength - _pos); + _last = (_pos + len == fileLength); + ByteBuffer buffer = BufferUtil.toMappedBuffer(_filePath, _pos, len); + getNextInterceptor().write(buffer, _last, this); + _pos += len; + return Action.SCHEDULED; + } + + @Override + protected void onCompleteSuccess() + { + dispose(); + callback.succeeded(); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + dispose(); + callback.failed(cause); + } + }; + icb.iterate(); + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java new file mode 100644 index 000000000000..3f6fb80306c7 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java @@ -0,0 +1,620 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.FileBufferedResponseHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.toolchain.test.FS; +import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FileBufferedResponseHandlerTest +{ + private static final Logger LOG = Log.getLogger(FileBufferedResponseHandlerTest.class); + + private Server _server; + private LocalConnector _localConnector; + private ServerConnector _serverConnector; + private Path _testDir; + private FileBufferedResponseHandler _bufferedHandler; + + @BeforeEach + public void before() throws Exception + { + _testDir = MavenTestingUtils.getTargetTestingPath(FileBufferedResponseHandlerTest.class.getName()); + FS.ensureDirExists(_testDir); + + _server = new Server(); + HttpConfiguration config = new HttpConfiguration(); + config.setOutputBufferSize(1024); + config.setOutputAggregationSize(256); + + _localConnector = new LocalConnector(_server, new HttpConnectionFactory(config)); + _localConnector.setIdleTimeout(Duration.ofMinutes(1).toMillis()); + _server.addConnector(_localConnector); + _serverConnector = new ServerConnector(_server, new HttpConnectionFactory(config)); + _server.addConnector(_serverConnector); + + _bufferedHandler = new FileBufferedResponseHandler(); + _bufferedHandler.setTempDir(_testDir); + _bufferedHandler.getPathIncludeExclude().include("/include/*"); + _bufferedHandler.getPathIncludeExclude().exclude("*.exclude"); + _bufferedHandler.getMimeIncludeExclude().exclude("text/excluded"); + _server.setHandler(_bufferedHandler); + + FS.ensureEmpty(_testDir); + } + + @AfterEach + public void after() throws Exception + { + _server.stop(); + } + + @Test + public void testPathNotIncluded() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was committed after the first write and we never created a file to buffer the response into. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: true")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testIncludedByPath() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was not committed after the first write and a file was created to buffer the response. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: false")); + assertThat(responseContent, containsString("NumFiles: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testExcludedByPath() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path.exclude HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was committed after the first write and we never created a file to buffer the response into. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: true")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testExcludedByMime() throws Exception + { + String excludedMimeType = "text/excluded"; + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setContentType(excludedMimeType); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was committed after the first write and we never created a file to buffer the response into. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: true")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFlushed() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(1024); + PrintWriter writer = response.getWriter(); + writer.println("a string smaller than the buffer size"); + writer.println("NumFilesBeforeFlush: " + getNumFiles()); + writer.flush(); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was not committed after the buffer was flushed and a file was created to buffer the response. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("NumFilesBeforeFlush: 0")); + assertThat(responseContent, containsString("Committed: false")); + assertThat(responseContent, containsString("NumFiles: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testClosed() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("NumFiles: " + getNumFiles()); + writer.close(); + writer.println("writtenAfterClose"); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The content written after close was not sent. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, not(containsString("writtenAfterClose"))); + assertThat(responseContent, containsString("NumFiles: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testBufferSizeBig() throws Exception + { + int bufferSize = 4096; + String largeContent = generateContent(bufferSize - 64); + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(bufferSize); + PrintWriter writer = response.getWriter(); + writer.println(largeContent); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The content written was not buffered as a file as it was less than the buffer size. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, not(containsString("writtenAfterClose"))); + assertThat(responseContent, containsString("Committed: false")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFlushEmpty() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(1024); + PrintWriter writer = response.getWriter(); + writer.flush(); + int numFiles = getNumFiles(); + writer.println("NumFiles: " + numFiles); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The flush should not create the file unless there is content to write. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testReset() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(8); + PrintWriter writer = response.getWriter(); + writer.println("THIS WILL BE RESET"); + writer.flush(); + writer.println("THIS WILL BE RESET"); + int numFilesBeforeReset = getNumFiles(); + response.resetBuffer(); + int numFilesAfterReset = getNumFiles(); + + writer.println("NumFilesBeforeReset: " + numFilesBeforeReset); + writer.println("NumFilesAfterReset: " + numFilesAfterReset); + writer.println("a string larger than the buffer size"); + writer.println("NumFilesAfterWrite: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // Resetting the response buffer will delete the file. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, not(containsString("THIS WILL BE RESET"))); + assertThat(responseContent, containsString("NumFilesBeforeReset: 1")); + assertThat(responseContent, containsString("NumFilesAfterReset: 0")); + assertThat(responseContent, containsString("NumFilesAfterWrite: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFileLargerThanMaxInteger() throws Exception + { + long fileSize = Integer.MAX_VALUE + 1234L; + byte[] bytes = randomBytes(1024 * 1024); + + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + ServletOutputStream outputStream = response.getOutputStream(); + + long written = 0; + while (written < fileSize) + { + int length = Math.toIntExact(Math.min(bytes.length, fileSize - written)); + outputStream.write(bytes, 0, length); + written += length; + } + outputStream.flush(); + + response.setHeader("NumFiles", Integer.toString(getNumFiles())); + response.setHeader("FileSize", Long.toString(getFileSize())); + } + }); + + _server.start(); + + AtomicLong received = new AtomicLong(); + HttpTester.Response response = new HttpTester.Response() + { + @Override + public boolean content(ByteBuffer ref) + { + // Verify the content is what was sent. + while (ref.hasRemaining()) + { + byte byteFromBuffer = ref.get(); + long totalReceived = received.getAndIncrement(); + int bytesIndex = (int)(totalReceived % bytes.length); + byte byteFromArray = bytes[bytesIndex]; + + if (byteFromBuffer != byteFromArray) + { + LOG.warn("Mismatch at index {} received bytes {}, {}!={}", bytesIndex, totalReceived, byteFromBuffer, byteFromArray, new IllegalStateException()); + return true; + } + } + + return false; + } + }; + + try (Socket socket = new Socket("localhost", _serverConnector.getLocalPort())) + { + OutputStream output = socket.getOutputStream(); + String request = "GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"; + output.write(request.getBytes(StandardCharsets.UTF_8)); + output.flush(); + + HttpTester.Input input = HttpTester.from(socket.getInputStream()); + HttpTester.parseResponse(input, response); + } + + assertTrue(response.isComplete()); + assertThat(response.get("NumFiles"), is("1")); + assertThat(response.get("FileSize"), is(Long.toString(fileSize))); + assertThat(received.get(), is(fileSize)); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testNextInterceptorFailed() throws Exception + { + AbstractHandler failingInterceptorHandler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + HttpOutput httpOutput = baseRequest.getResponse().getHttpOutput(); + HttpOutput.Interceptor nextInterceptor = httpOutput.getInterceptor(); + httpOutput.setInterceptor(new HttpOutput.Interceptor() + { + @Override + public void write(ByteBuffer content, boolean last, Callback callback) + { + callback.failed(new Throwable("intentionally throwing from interceptor")); + } + + @Override + public HttpOutput.Interceptor getNextInterceptor() + { + return nextInterceptor; + } + + @Override + public boolean isOptimizedForDirectBuffers() + { + return false; + } + }); + } + }; + + _server.setHandler(new HandlerCollection(failingInterceptorHandler, _server.getHandler())); + CompletableFuture errorFuture = new CompletableFuture<>(); + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + byte[] chunk1 = "this content will ".getBytes(); + byte[] chunk2 = "be buffered in a file".getBytes(); + response.setContentLength(chunk1.length + chunk2.length); + ServletOutputStream outputStream = response.getOutputStream(); + + // Write chunk1 and then flush so it is written to the file. + outputStream.write(chunk1); + outputStream.flush(); + assertThat(getNumFiles(), is(1)); + + try + { + // ContentLength is set so it knows this is the last write. + // This will cause the file to be written to the next interceptor which will fail. + outputStream.write(chunk2); + } + catch (Throwable t) + { + errorFuture.complete(t); + throw t; + } + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + + // Response was aborted. + assertThat(response.getStatus(), is(0)); + + // We failed because of the next interceptor. + Throwable error = errorFuture.get(5, TimeUnit.SECONDS); + assertThat(error.getMessage(), containsString("intentionally throwing from interceptor")); + + // All files were deleted. + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFileWriteFailed() throws Exception + { + // Set the temp directory to an empty directory so that the file cannot be created. + File tempDir = MavenTestingUtils.getTargetTestingDir(getClass().getSimpleName()); + FS.ensureDeleted(tempDir); + _bufferedHandler.setTempDir(tempDir.toPath()); + + CompletableFuture errorFuture = new CompletableFuture<>(); + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + ServletOutputStream outputStream = response.getOutputStream(); + byte[] content = "this content will be buffered in a file".getBytes(); + + try + { + // Write the content and flush it to the file. + // This should throw as it cannot create the file to aggregate into. + outputStream.write(content); + outputStream.flush(); + } + catch (Throwable t) + { + errorFuture.complete(t); + throw t; + } + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + + // Response was aborted. + assertThat(response.getStatus(), is(0)); + + // We failed because cannot create the file. + Throwable error = errorFuture.get(5, TimeUnit.SECONDS); + assertThat(error, instanceOf(NoSuchFileException.class)); + + // No files were created. + assertThat(getNumFiles(), is(0)); + } + + private int getNumFiles() + { + File[] files = _testDir.toFile().listFiles(); + if (files == null) + return 0; + + return files.length; + } + + private long getFileSize() + { + File[] files = _testDir.toFile().listFiles(); + assertNotNull(files); + assertThat(files.length, is(1)); + return files[0].length(); + } + + private static String generateContent(int size) + { + Random random = new Random(); + StringBuilder stringBuilder = new StringBuilder(size); + for (int i = 0; i < size; i++) + { + stringBuilder.append((char)Math.abs(random.nextInt(0x7F))); + } + return stringBuilder.toString(); + } + + @SuppressWarnings("SameParameterValue") + private byte[] randomBytes(int size) + { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return data; + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index b150f86ec0d4..88e141a7d531 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -30,6 +30,7 @@ import java.nio.channels.FileChannel.MapMode; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Arrays; @@ -985,9 +986,14 @@ public static ByteBuffer toDirectBuffer(String s, Charset charset) public static ByteBuffer toMappedBuffer(File file) throws IOException { - try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) + return toMappedBuffer(file.toPath(), 0, file.length()); + } + + public static ByteBuffer toMappedBuffer(Path filePath, long pos, long len) throws IOException + { + try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ)) { - return channel.map(MapMode.READ_ONLY, 0, file.length()); + return channel.map(MapMode.READ_ONLY, pos, len); } }