From c5d074e5f2fe5771c433aa4b0113481e3ce23add Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Mon, 20 Jan 2020 22:36:23 +1100 Subject: [PATCH 1/5] Issue #4383 - atomic state to MultiPart for multi-thread synchronization Signed-off-by: Lachlan Roberts --- .../server/MultiPartFormInputStream.java | 157 ++++++++++++------ .../server/MultiPartFormInputStreamTest.java | 74 +++++++++ 2 files changed, 177 insertions(+), 54 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java index 12e40428398d..8a53207e2f24 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java @@ -31,17 +31,16 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import javax.servlet.MultipartConfigElement; import javax.servlet.ServletInputStream; import javax.servlet.http.Part; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.ByteArrayOutputStream2; -import org.eclipse.jetty.util.LazyList; import org.eclipse.jetty.util.MultiException; import org.eclipse.jetty.util.MultiMap; import org.eclipse.jetty.util.QuotedStringTokenizer; @@ -58,19 +57,27 @@ */ public class MultiPartFormInputStream { + private enum State + { + UNPARSED, + PARSING, + PARSED, + CLOSING, + CLOSED + } + private static final Logger LOG = Log.getLogger(MultiPartFormInputStream.class); - private static final MultiMap EMPTY_MAP = new MultiMap<>(Collections.emptyMap()); - private final MultiMap _parts; - private InputStream _in; - private MultipartConfigElement _config; - private String _contentType; - private Throwable _err; - private File _tmpDir; - private File _contextTmpDir; - private boolean _deleteOnExit; - private boolean _writeFilesWithFilenames; - private boolean _parsed; - private int _bufferSize = 16 * 1024; + private final MultiMap _parts = new MultiMap<>(); + private final InputStream _in; + private final MultipartConfigElement _config; + private final File _contextTmpDir; + private final String _contentType; + private volatile Throwable _err; + private volatile File _tmpDir; + private volatile boolean _deleteOnExit; + private volatile boolean _writeFilesWithFilenames; + private volatile int _bufferSize = 16 * 1024; + private volatile State state = State.UNPARSED; public class MultiPart implements Part { @@ -333,39 +340,33 @@ public String getContentDispositionFilename() public MultiPartFormInputStream(InputStream in, String contentType, MultipartConfigElement config, File contextTmpDir) { _contentType = contentType; - _config = config; - _contextTmpDir = contextTmpDir; - if (_contextTmpDir == null) - _contextTmpDir = new File(System.getProperty("java.io.tmpdir")); - - if (_config == null) - _config = new MultipartConfigElement(_contextTmpDir.getAbsolutePath()); - - MultiMap parts = new MultiMap(); + _contextTmpDir = (contextTmpDir != null) ? contextTmpDir : new File(System.getProperty("java.io.tmpdir")); + _config = (config != null) ? config : new MultipartConfigElement(_contextTmpDir.getAbsolutePath()); if (in instanceof ServletInputStream) { if (((ServletInputStream)in).isFinished()) { - parts = EMPTY_MAP; - _parsed = true; + _in = null; + state = State.PARSED; + return; } } - if (!_parsed) - _in = new BufferedInputStream(in); - _parts = parts; + + _in = new BufferedInputStream(in); } /** * @return whether the list of parsed parts is empty + * @deprecated use getParts().isEmpty() */ + @Deprecated public boolean isEmpty() { - if (_parts == null) + if (_parts.isEmpty()) return true; - Collection> values = _parts.values(); - for (List partList : values) + for (List partList : _parts.values()) { if (!partList.isEmpty()) return false; @@ -379,6 +380,26 @@ public boolean isEmpty() */ public void deleteParts() { + synchronized (this) + { + switch (state) + { + case CLOSED: + case UNPARSED: + state = State.CLOSED; + return; + + case PARSING: + state = State.CLOSING; + return; + + case PARSED: + case CLOSING: + state = State.CLOSED; + break; + } + } + MultiException err = null; for (List parts : _parts.values()) { @@ -410,21 +431,9 @@ public void deleteParts() */ public Collection getParts() throws IOException { - if (!_parsed) - parse(); + parse(); throwIfError(); - - if (_parts.isEmpty()) - return Collections.emptyList(); - - Collection> values = _parts.values(); - List parts = new ArrayList<>(); - for (List o : values) - { - List asList = LazyList.getList(o, false); - parts.addAll(asList); - } - return parts; + return _parts.values().stream().flatMap(List::stream).collect(Collectors.toList()); } /** @@ -436,8 +445,7 @@ public Collection getParts() throws IOException */ public Part getPart(String name) throws IOException { - if (!_parsed) - parse(); + parse(); throwIfError(); return _parts.getValue(name, 0); } @@ -468,13 +476,24 @@ protected void throwIfError() throws IOException */ protected void parse() { - // have we already parsed the input? - if (_parsed) - return; - _parsed = true; + synchronized (this) + { + switch (state) + { + case UNPARSED: + state = State.PARSING; + break; + + case PARSED: + return; + + default: + _err = new IllegalStateException(state.name()); + return; + } + } MultiPartParser parser = null; - Handler handler = new Handler(); try { // if its not a multipart request, don't parse it @@ -507,16 +526,23 @@ else if ("".equals(_config.getLocation())) contentTypeBoundary = QuotedStringTokenizer.unquote(value(_contentType.substring(bstart, bend)).trim()); } - parser = new MultiPartParser(handler, contentTypeBoundary); + parser = new MultiPartParser(new Handler(), contentTypeBoundary); byte[] data = new byte[_bufferSize]; int len; long total = 0; while (true) { + synchronized (this) + { + if (state != State.PARSING) + { + _err = new IllegalStateException(state.name()); + return; + } + } len = _in.read(data); - if (len > 0) { // keep running total of size of bytes read from input and throw an exception if exceeds MultipartConfigElement._maxRequestSize @@ -570,6 +596,29 @@ else if (len == -1) if (parser != null) parser.parse(BufferUtil.EMPTY_BUFFER, true); } + finally + { + boolean cleanup = false; + synchronized (this) + { + switch (state) + { + case PARSING: + state = State.PARSED; + break; + + case CLOSING: + cleanup = true; + break; + + default: + _err = new IllegalStateException(state.name()); + } + } + + if (cleanup) + deleteParts(); + } } class Handler implements MultiPartParser.Handler diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java index b80787e13158..4862e32b62e4 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java @@ -25,6 +25,8 @@ import java.io.InputStream; import java.util.Base64; import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.servlet.MultipartConfigElement; import javax.servlet.ReadListener; @@ -518,6 +520,78 @@ public void testDeleteNPE() mpis.deleteParts(); // this should not be an NPE } + + @Test + public void testAsyncCleanUp() throws Exception + { + final CountDownLatch reading = new CountDownLatch(1); + final InputStream wrappedStream = new ByteArrayInputStream(createMultipartRequestString("myFile").getBytes()); + + // This stream won't allow the parser to exit because it will never return anything less than 0. + InputStream slowStream = new InputStream() + { + @Override + public int read(byte[] b, int off, int len) throws IOException + { + return Math.max(0, super.read(b, off, len)); + } + + @Override + public int read() throws IOException + { + reading.countDown(); + return wrappedStream.read(); + } + }; + + MultipartConfigElement config = new MultipartConfigElement(_dirname, 1024, 1024, 50); + MultiPartFormInputStream mpis = new MultiPartFormInputStream(slowStream, _contentType, config, _tmpDir); + + // In another thread delete the parts when we detect that we have started parsing. + CompletableFuture cleanupError = new CompletableFuture<>(); + new Thread(() -> + { + try + { + assertTrue(reading.await(5, TimeUnit.SECONDS)); + mpis.deleteParts(); + cleanupError.complete(null); + } + catch (Throwable t) + { + cleanupError.complete(t); + } + }).start(); + + // The call to getParts should throw an error. + Throwable error = assertThrows(IllegalStateException.class, mpis::getParts); + assertThat(error.getMessage(), is("CLOSING")); + + // There was no error with the cleanup. + assertNull(cleanupError.get()); + + // No tmp files are remaining. + String[] fileList = _tmpDir.list(); + assertNotNull(fileList); + assertThat(fileList.length, is(0)); + } + + @Test + public void testParseAfterCleanUp() throws Exception + { + final InputStream input = new ByteArrayInputStream(createMultipartRequestString("myFile").getBytes()); + MultipartConfigElement config = new MultipartConfigElement(_dirname, 1024, 1024, 50); + MultiPartFormInputStream mpis = new MultiPartFormInputStream(input, _contentType, config, _tmpDir); + + mpis.deleteParts(); + + // The call to getParts should throw because we have already cleaned up the parts. + Throwable error = assertThrows(IllegalStateException.class, mpis::getParts); + assertThat(error.getMessage(), is("CLOSED")); + + // Even though we called getParts() we never even created the tmp directory as we had already called deleteParts(). + assertFalse(_tmpDir.exists()); + } @Test public void testLFOnlyRequest() From b217a3541563f2faa631e0705a7b4aa2bfe79080 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 21 Jan 2020 16:14:01 +1100 Subject: [PATCH 2/5] Issue #4383 - add State diagram and other changes for review Signed-off-by: Lachlan Roberts --- .../server/MultiPartFormInputStream.java | 51 ++++++++++++++----- .../server/MultiPartFormInputStreamTest.java | 8 +-- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java index 8a53207e2f24..d04c7b62a316 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java @@ -52,7 +52,24 @@ * MultiPartInputStream *

* Handle a MultiPart Mime input stream, breaking it up on the boundary into files and strings. - * + *

+ *

+ * Deleting the parts can be done from a different thread if the parts are parsed asynchronously. + * Because of this we use the state to fail the parsing and coordinate which thread will delete any remaining parts. + * The deletion of parts is done by the cleanup thread in all cases except the transition from ERROR->DELETED which + * is done by the parsing thread. + *

+ *
+ *                              deleteParts()
+ *     +--------------------------------------------------------------+
+ *     |                                                              |
+ *     |                                          deleteParts()       v
+ *  UNPARSED -------> PARSING --------> PARSED  ------------------>DELETED
+ *                      |                                             ^
+ *                      |                                             |
+ *                      +----------------> ERROR ---------------------+
+ *                        deleteParts()             parsing thread
+ * 
* @see https://tools.ietf.org/html/rfc7578 */ public class MultiPartFormInputStream @@ -62,8 +79,8 @@ private enum State UNPARSED, PARSING, PARSED, - CLOSING, - CLOSED + DELETED, + ERROR } private static final Logger LOG = Log.getLogger(MultiPartFormInputStream.class); @@ -384,22 +401,29 @@ public void deleteParts() { switch (state) { - case CLOSED: - case UNPARSED: - state = State.CLOSED; + case DELETED: + case ERROR: return; case PARSING: - state = State.CLOSING; + state = State.ERROR; + return; + + case UNPARSED: + state = State.DELETED; return; case PARSED: - case CLOSING: - state = State.CLOSED; + state = State.DELETED; break; } } + uncheckedDeleteParts(); + } + + private void uncheckedDeleteParts() + { MultiException err = null; for (List parts : _parts.values()) { @@ -488,7 +512,7 @@ protected void parse() return; default: - _err = new IllegalStateException(state.name()); + _err = new IOException(state.name()); return; } } @@ -537,7 +561,7 @@ else if ("".equals(_config.getLocation())) { if (state != State.PARSING) { - _err = new IllegalStateException(state.name()); + _err = new IOException(state.name()); return; } } @@ -607,7 +631,8 @@ else if (len == -1) state = State.PARSED; break; - case CLOSING: + case ERROR: + state = State.DELETED; cleanup = true; break; @@ -617,7 +642,7 @@ else if (len == -1) } if (cleanup) - deleteParts(); + uncheckedDeleteParts(); } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java index 4862e32b62e4..4854445e6be0 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java @@ -564,8 +564,8 @@ public int read() throws IOException }).start(); // The call to getParts should throw an error. - Throwable error = assertThrows(IllegalStateException.class, mpis::getParts); - assertThat(error.getMessage(), is("CLOSING")); + Throwable error = assertThrows(IOException.class, mpis::getParts); + assertThat(error.getMessage(), is("ERROR")); // There was no error with the cleanup. assertNull(cleanupError.get()); @@ -586,8 +586,8 @@ public void testParseAfterCleanUp() throws Exception mpis.deleteParts(); // The call to getParts should throw because we have already cleaned up the parts. - Throwable error = assertThrows(IllegalStateException.class, mpis::getParts); - assertThat(error.getMessage(), is("CLOSED")); + Throwable error = assertThrows(IOException.class, mpis::getParts); + assertThat(error.getMessage(), is("DELETED")); // Even though we called getParts() we never even created the tmp directory as we had already called deleteParts(). assertFalse(_tmpDir.exists()); From 0ab7751ef2af8ea49e34c98461da1a8d081ba5f6 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 21 Jan 2020 18:22:22 +1100 Subject: [PATCH 3/5] Issue #4383 - fix javadoc build issue Signed-off-by: Lachlan Roberts --- .../org/eclipse/jetty/server/MultiPartFormInputStream.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java index d04c7b62a316..492c52a12394 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java @@ -56,10 +56,10 @@ *

* Deleting the parts can be done from a different thread if the parts are parsed asynchronously. * Because of this we use the state to fail the parsing and coordinate which thread will delete any remaining parts. - * The deletion of parts is done by the cleanup thread in all cases except the transition from ERROR->DELETED which + * The deletion of parts is done by the cleanup thread in all cases except the transition from ERROR->DELETED which * is done by the parsing thread. *

- *
+ * 
{@code
  *                              deleteParts()
  *     +--------------------------------------------------------------+
  *     |                                                              |
@@ -69,7 +69,7 @@
  *                      |                                             |
  *                      +----------------> ERROR ---------------------+
  *                        deleteParts()             parsing thread
- * 
+ * }
* @see https://tools.ietf.org/html/rfc7578 */ public class MultiPartFormInputStream From f9f2ccaefac721dfe50b6bfbde83baea78b872c9 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 22 Jan 2020 14:26:36 +1100 Subject: [PATCH 4/5] Issue #4383 - check for non multipart content type in constructor Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/server/MultiPartFormInputStream.java | 10 +++++----- .../jetty/server/MultiPartFormInputStreamTest.java | 10 ++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java index 492c52a12394..5d8eaffaa4ac 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java @@ -94,7 +94,7 @@ private enum State private volatile boolean _deleteOnExit; private volatile boolean _writeFilesWithFilenames; private volatile int _bufferSize = 16 * 1024; - private volatile State state = State.UNPARSED; + private State state = State.UNPARSED; public class MultiPart implements Part { @@ -356,7 +356,11 @@ public String getContentDispositionFilename() */ public MultiPartFormInputStream(InputStream in, String contentType, MultipartConfigElement config, File contextTmpDir) { + // Must be a multipart request. _contentType = contentType; + if (_contentType == null || !_contentType.startsWith("multipart/form-data")) + throw new IllegalArgumentException("content type is not multipart/form-data"); + _contextTmpDir = (contextTmpDir != null) ? contextTmpDir : new File(System.getProperty("java.io.tmpdir")); _config = (config != null) ? config : new MultipartConfigElement(_contextTmpDir.getAbsolutePath()); @@ -520,10 +524,6 @@ protected void parse() MultiPartParser parser = null; try { - // if its not a multipart request, don't parse it - if (_contentType == null || !_contentType.startsWith("multipart/form-data")) - return; - // sort out the location to which to write the files if (_config.getLocation() == null) _tmpDir = _contextTmpDir; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java index 4854445e6be0..733bb7421516 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java @@ -206,12 +206,10 @@ public void testNonMultiPartRequest() throws Exception { MultipartConfigElement config = new MultipartConfigElement(_dirname, 1024, 3072, 50); - MultiPartFormInputStream mpis = new MultiPartFormInputStream(new ByteArrayInputStream(_multi.getBytes()), - "Content-type: text/plain", - config, - _tmpDir); - mpis.setDeleteOnExit(true); - assertTrue(mpis.getParts().isEmpty()); + Throwable t = assertThrows(IllegalArgumentException.class, () -> + new MultiPartFormInputStream(new ByteArrayInputStream(_multi.getBytes()), + "Content-type: text/plain", config, _tmpDir)); + assertThat(t.getMessage(), is("content type is not multipart/form-data")); } @Test From f76687ad68fd0b467728dea60ef60b41c2f1418a Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 24 Jan 2020 09:49:00 +1100 Subject: [PATCH 5/5] Issue 4383 - changes from review Signed-off-by: Lachlan Roberts --- .../server/MultiPartFormInputStream.java | 28 +++++++++++-------- .../server/MultiPartFormInputStreamTest.java | 2 +- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java index 5d8eaffaa4ac..bc7fe0445b45 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/MultiPartFormInputStream.java @@ -56,10 +56,16 @@ *

* Deleting the parts can be done from a different thread if the parts are parsed asynchronously. * Because of this we use the state to fail the parsing and coordinate which thread will delete any remaining parts. - * The deletion of parts is done by the cleanup thread in all cases except the transition from ERROR->DELETED which + * The deletion of parts is done by the cleanup thread in all cases except the transition from DELETING->DELETED which * is done by the parsing thread. *

*
{@code
+ * UNPARSED - Parsing has not started, there are no parts which need to be cleaned up.
+ * PARSING  - The parsing thread is reading from the InputStream and generating parts.
+ * PARSED   - Parsing has complete and no more parts will be generated.
+ * DELETING - deleteParts() has been called while we were in PARSING state, parsing thread will do the delete.
+ * DELETED  - The parts have been deleted, this is the terminal state.
+ *
  *                              deleteParts()
  *     +--------------------------------------------------------------+
  *     |                                                              |
@@ -67,8 +73,8 @@
  *  UNPARSED -------> PARSING --------> PARSED  ------------------>DELETED
  *                      |                                             ^
  *                      |                                             |
- *                      +----------------> ERROR ---------------------+
- *                        deleteParts()             parsing thread
+ *                      +---------------> DELETING -------------------+
+ *                        deleteParts()               parsing thread
  * }
* @see https://tools.ietf.org/html/rfc7578 */ @@ -79,8 +85,8 @@ private enum State UNPARSED, PARSING, PARSED, - DELETED, - ERROR + DELETING, + DELETED } private static final Logger LOG = Log.getLogger(MultiPartFormInputStream.class); @@ -406,11 +412,11 @@ public void deleteParts() switch (state) { case DELETED: - case ERROR: + case DELETING: return; case PARSING: - state = State.ERROR; + state = State.DELETING; return; case UNPARSED: @@ -423,10 +429,10 @@ public void deleteParts() } } - uncheckedDeleteParts(); + delete(); } - private void uncheckedDeleteParts() + private void delete() { MultiException err = null; for (List parts : _parts.values()) @@ -631,7 +637,7 @@ else if (len == -1) state = State.PARSED; break; - case ERROR: + case DELETING: state = State.DELETED; cleanup = true; break; @@ -642,7 +648,7 @@ else if (len == -1) } if (cleanup) - uncheckedDeleteParts(); + delete(); } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java index 733bb7421516..119c0d774703 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartFormInputStreamTest.java @@ -563,7 +563,7 @@ public int read() throws IOException // The call to getParts should throw an error. Throwable error = assertThrows(IOException.class, mpis::getParts); - assertThat(error.getMessage(), is("ERROR")); + assertThat(error.getMessage(), is("DELETING")); // There was no error with the cleanup. assertNull(cleanupError.get());