From a741ae422b75e330dac655718ad91e0067a2caeb Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 13 Jan 2020 14:55:29 +0000 Subject: [PATCH] Improve limit handling in StringDecoder The case of one data buffer containing multiple lines could cause a buffer leak due to a suspected issue in concatMapIterable. This commit adds workarounds for that until the underlying issue is addressed. Closes gh-24339 --- .../core/codec/StringDecoder.java | 132 ++++++++++++------ .../core/codec/StringDecoderTests.java | 31 ++-- 2 files changed, 109 insertions(+), 54 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index 2fa3cf0a4056..abbe66a3b3b0 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -94,20 +94,44 @@ public Flux decode(Publisher input, ResolvableType elementTy byte[][] delimiterBytes = getDelimiterBytes(mimeType); - // TODO: Drop Consumer and use bufferUntil with Supplier (reactor-core#1925) - // TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) 
(reactor-core#1924) - LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize()); - Flux inputFlux = Flux.defer(() -> { DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); - return Flux.from(input) - .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)) - .doOnNext(limiter) - .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) - .map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) - .doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + if (getMaxInMemorySize() != -1) { + + // Passing limiter into endFrameAfterDelimiter helps to ensure that in case of one DataBuffer + // containing multiple lines, the limit is checked and raised immediately without accumulating + // subsequent lines. This is necessary because concatMapIterable doesn't respect doOnDiscard. + // When reactor-core#1925 is resolved, we could replace bufferUntil with: + + // .windowUntil(buffer -> buffer instanceof EndFrameBuffer) + // .concatMap(fluxes -> fluxes.collect(() -> new LimitedDataBufferList(getMaxInMemorySize()), LimitedDataBufferList::add)) + + LimitedDataBufferList limiter = new LimitedDataBufferList(getMaxInMemorySize()); + + return Flux.from(input) + .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher, limiter)) + .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) + .map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + } + else { + // When the decoder is unlimited (-1), concatMapIterable will cache buffers that may not + // be released if cancel is signalled before they are turned into String lines + // (see test maxInMemoryLimitReleasesUnprocessedLinesWhenUnlimited). + // When reactor-core#1925 is resolved, the workaround can be removed and the entire + // else clause possibly dropped. 
+ + ConcatMapIterableDiscardWorkaroundCache cache = new ConcatMapIterableDiscardWorkaroundCache(); + + return Flux.from(input) + .concatMapIterable(buffer -> cache.addAll(endFrameAfterDelimiter(buffer, matcher, null))) + .doOnNext(cache) + .doOnCancel(cache) + .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) + .map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + } }); return super.decode(inputFlux, elementType, mimeType, hints); @@ -152,29 +176,49 @@ private static Charset getCharset(@Nullable MimeType mimeType) { * * @param dataBuffer the buffer to find delimiters in * @param matcher used to find the first delimiters + * @param limiter to enforce maxInMemorySize with * @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was * found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) * results in memory leaks due to pre-fetching. */ - private static List endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) { + private static List endFrameAfterDelimiter( + DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, @Nullable LimitedDataBufferList limiter) { + List result = new ArrayList<>(); - do { - int endIdx = matcher.match(dataBuffer); - if (endIdx != -1) { - int readPosition = dataBuffer.readPosition(); - int length = endIdx - readPosition + 1; - result.add(dataBuffer.retainedSlice(readPosition, length)); - result.add(new EndFrameBuffer(matcher.delimiter())); - dataBuffer.readPosition(endIdx + 1); + try { + do { + int endIdx = matcher.match(dataBuffer); + if (endIdx != -1) { + int readPosition = dataBuffer.readPosition(); + int length = (endIdx - readPosition + 1); + DataBuffer slice = dataBuffer.retainedSlice(readPosition, length); + result.add(slice); + result.add(new EndFrameBuffer(matcher.delimiter())); + dataBuffer.readPosition(endIdx + 1); + if (limiter != null) { + limiter.add(slice); // 
enforce the limit + limiter.clear(); + } + } + else { + result.add(DataBufferUtils.retain(dataBuffer)); + if (limiter != null) { + limiter.add(dataBuffer); + } + break; + } } - else { - result.add(DataBufferUtils.retain(dataBuffer)); - break; + while (dataBuffer.readableByteCount() > 0); + } + catch (DataBufferLimitException ex) { + if (limiter != null) { + limiter.releaseAndClear(); } + throw ex; + } + finally { + DataBufferUtils.release(dataBuffer); } - while (dataBuffer.readableByteCount() > 0); - - DataBufferUtils.release(dataBuffer); return result; } @@ -288,34 +332,32 @@ public byte[] delimiter() { } - /** - * Temporary measure for reactor-core#1925. - * Consumer that adds to a {@link LimitedDataBufferList} to enforce limits. - */ - private static class LimitedDataBufferConsumer implements Consumer { + private class ConcatMapIterableDiscardWorkaroundCache implements Consumer, Runnable { - private final LimitedDataBufferList bufferList; + private final List buffers = new ArrayList<>(); - public LimitedDataBufferConsumer(int maxInMemorySize) { - this.bufferList = new LimitedDataBufferList(maxInMemorySize); + public List addAll(List buffersToAdd) { + this.buffers.addAll(buffersToAdd); + return buffersToAdd; } + @Override + public void accept(DataBuffer dataBuffer) { + this.buffers.remove(dataBuffer); + } @Override - public void accept(DataBuffer buffer) { - if (buffer instanceof EndFrameBuffer) { - this.bufferList.clear(); - } - else { + public void run() { + this.buffers.forEach(buffer -> { try { - this.bufferList.add(buffer); - } - catch (DataBufferLimitException ex) { DataBufferUtils.release(buffer); - throw ex; } - } + catch (Throwable ex) { + // Keep going.. 
+ } + }); } } + } diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index 0d3d29ceef55..fad7a1ec102c 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -130,17 +130,30 @@ void decodeNewLine() { } @Test - void decodeNewLineWithLimit() { + void maxInMemoryLimit() { Flux input = Flux.just( - stringBuffer("abc\n"), - stringBuffer("defg\n"), - stringBuffer("hijkl\n") - ); - this.decoder.setMaxInMemorySize(5); + stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n")); + this.decoder.setMaxInMemorySize(5); testDecode(input, String.class, step -> - step.expectNext("abc", "defg") - .verifyError(DataBufferLimitException.class)); + step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class)); + } + + @Test // gh-24312 + void maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() { + Flux input = Flux.just( + stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n")); + + this.decoder.setMaxInMemorySize(5); + testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class)); + } + + @Test // gh-24339 + void maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited() { + Flux input = Flux.just(stringBuffer("Line 1\nLine 2\nLine 3\n")); + + this.decoder.setMaxInMemorySize(-1); + testDecodeCancel(input, ResolvableType.forClass(String.class), null, Collections.emptyMap()); } @Test