diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java index 65f8222d1774..06e224a989af 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java @@ -44,6 +44,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType return (elementType.resolve() == byte[].class && super.canDecode(elementType, mimeType)); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public byte[] decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java index 6481ef86dbf1..143018ee9b03 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java @@ -47,6 +47,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType super.canDecode(elementType, mimeType)); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java index 788eafdb0e69..2fa7fddcfa06 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType super.canDecode(elementType, mimeType)); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public Flux decode(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java index cc5271d5c01f..52983ace05e1 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/Decoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/Decoder.java @@ -53,6 +53,17 @@ public interface Decoder { */ boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType); + /** + * Whether the decoder supports decoding an Object of its target type from an + * empty message. When it is true, the decoder will always return a non-null + * value from its {@code decode} method when an empty message is decoded. + * @return {@code true} if supported, {@code false} otherwise + * @since 6.0.5 + */ + default boolean canDecodeEmptyMessage() { + return false; + } + /** * Decode a {@link DataBuffer} input stream into a Flux of {@code T}. * @param inputStream the {@code DataBuffer} input stream to decode diff --git a/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java index 150db2b893e6..cb415941ef03 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType super.canDecode(elementType, mimeType)); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/main/java/org/springframework/core/codec/NettyByteBufDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/NettyByteBufDecoder.java index 569a5cc83246..df81878a4616 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/NettyByteBufDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/NettyByteBufDecoder.java @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType super.canDecode(elementType, mimeType)); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public ByteBuf decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java index 4e9552a650a0..7a63e6ea8c70 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType super.canDecode(elementType, mimeType)); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public Flux decode(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index 953ddbd51e42..19c8e7c3bbe4 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -105,6 +105,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType return (elementType.resolve() == String.class && super.canDecode(elementType, mimeType)); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public Flux decode(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java index 97454ed162d4..02830794913c 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/codec/AbstractDecoderTests.java @@ -36,6 +36,7 @@ import org.springframework.util.MimeType; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.springframework.core.io.buffer.DataBufferUtils.release; /** * Abstract base class for {@link Decoder} unit tests. Subclasses need to implement @@ -114,6 +115,7 @@ protected void testDecodeAll(Publisher input, Class *
  • {@link #testDecodeError(Publisher, ResolvableType, MimeType, Map)}
  • *
  • {@link #testDecodeCancel(Publisher, ResolvableType, MimeType, Map)}
  • *
  • {@link #testDecodeEmpty(ResolvableType, MimeType, Map)}
  • + *
  • {@link #testDecodeEmptyMessage(ResolvableType, MimeType, Map)}
  • * * * @param input the input to be provided to the decoder @@ -131,6 +133,7 @@ protected void testDecodeAll(Publisher input, ResolvableType out testDecodeError(input, outputType, mimeType, hints); testDecodeCancel(input, outputType, mimeType, hints); testDecodeEmpty(outputType, mimeType, hints); + testDecodeEmptyMessage(outputType, mimeType, hints); } /** @@ -258,6 +261,25 @@ protected void testDecodeEmpty(ResolvableType outputType, @Nullable MimeType mim StepVerifier.create(result).verifyComplete(); } + /** + * Test a {@link Decoder#decode decode} scenario where the input stream is an empty buffer. + * The output is expected to be filled when the decoder supports it. + * + * @param outputType the desired output type + * @param mimeType the mime type to use for decoding. May be {@code null}. + * @param hints the hints used for decoding. May be {@code null}. + */ + protected void testDecodeEmptyMessage(ResolvableType outputType, MimeType mimeType, Map hints) { + if (!this.decoder.canDecodeEmptyMessage()) { + return; + } + DataBuffer buffer = this.bufferFactory.allocateBuffer(0); + Object result = this.decoder.decode(buffer, outputType, mimeType, hints); + releaseDataBufferIfIdentical(buffer, result); + Assert.notNull(result, "result expected to be non null"); + Assert.isAssignable(outputType.toClass(), result.getClass(), "result not of specified type"); + } + // Mono /** @@ -289,6 +311,7 @@ protected void testDecodeToMonoAll(Publisher input, *
  • {@link #testDecodeToMonoError(Publisher, ResolvableType, MimeType, Map)}
  • *
  • {@link #testDecodeToMonoCancel(Publisher, ResolvableType, MimeType, Map)}
  • *
  • {@link #testDecodeToMonoEmpty(ResolvableType, MimeType, Map)}
  • + *
  • {@link #testDecodeToMonoEmptyMessage(ResolvableType, MimeType, Map)}
  • * * * @param input the input to be provided to the decoder @@ -306,6 +329,7 @@ protected void testDecodeToMonoAll(Publisher input, ResolvableTy testDecodeToMonoError(input, outputType, mimeType, hints); testDecodeToMonoCancel(input, outputType, mimeType, hints); testDecodeToMonoEmpty(outputType, mimeType, hints); + testDecodeToMonoEmptyMessage(outputType, mimeType, hints); } /** @@ -419,6 +443,32 @@ protected void testDecodeToMonoEmpty(ResolvableType outputType, @Nullable MimeTy StepVerifier.create(result).verifyComplete(); } + /** + * Test a {@link Decoder#decodeToMono decode} scenario where the input stream is an empty buffer. + * The output is expected to be filled when the decoder supports it. + * + * @param outputType the desired output type + * @param mimeType the mime type to use for decoding. May be {@code null}. + * @param hints the hints used for decoding. May be {@code null}. + */ + protected void testDecodeToMonoEmptyMessage(ResolvableType outputType, @Nullable MimeType mimeType, + @Nullable Map hints) { + + if (!this.decoder.canDecodeEmptyMessage()) { + return; + } + + Flux source = Flux.range(0, 2) + .map(i -> this.bufferFactory.allocateBuffer(0)); + + Mono result = this.decoder.decodeToMono(source, outputType, mimeType, hints) + .doOnNext(this::releaseIfDataBuffer); + + StepVerifier.create(result) + .expectNextMatches(next -> outputType.toClass().isInstance(next)) + .verifyComplete(); + } + /** * Creates a deferred {@link DataBuffer} containing the given bytes. * @param bytes the bytes that are to be stored in the buffer @@ -432,6 +482,27 @@ protected Mono dataBuffer(byte[] bytes) { }); } + /** + * If {@code value} is referentially identical to {@code buffer}, release it. + * @param buffer the {@link DataBuffer} that is compared + * @param value the {@link Object} that is compared + */ + private void releaseDataBufferIfIdentical(DataBuffer buffer, Object value) { + if (buffer == value) { + release(buffer); + } + } + + /** + * If {@code value} is a {@link DataBuffer}, release it. + * @param value the {@link Object} that is checked + */ + private void releaseIfDataBuffer(Object value) { + if (value instanceof DataBuffer) { + release((DataBuffer) value); + } + } + /** * Exception used in {@link #testDecodeError} and {@link #testDecodeToMonoError} */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java index adbeea73b0b8..1b1e33f5a8d7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java @@ -231,7 +231,7 @@ private Mono decodeContent(MethodParameter parameter, Message message if (decoder.canDecode(elementType, mimeType)) { if (adapter != null && adapter.isMultiValue()) { Flux flux = content - .filter(this::nonEmptyDataBuffer) + .filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder)) .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) .onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex))); if (isContentRequired) { @@ -245,7 +245,7 @@ private Mono decodeContent(MethodParameter parameter, Message message else { // Single-value (with or without reactive type wrapper) Mono mono = content.next() - .filter(this::nonEmptyDataBuffer) + .filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder)) .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) .onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex))); if (isContentRequired) { @@ -263,7 +263,10 @@ private Mono decodeContent(MethodParameter parameter, Message message message, parameter, "Cannot decode to [" + targetType + "]" + message)); } - private boolean nonEmptyDataBuffer(DataBuffer buffer) { + private boolean nonEmptyDataBuffer(DataBuffer buffer, Decoder decoder) { + if (decoder.canDecodeEmptyMessage()) { + return true; + } if (buffer.readableByteCount() > 0) { return true; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index 12e416645d37..e8c3cfea658a 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -167,7 +167,7 @@ public void echoChannel() { @Test // gh-26344 public void echoChannelWithEmptyInput() { Flux result = requester.route("echo-channel-empty").data(Flux.empty()).retrieveFlux(String.class); - StepVerifier.create(result).verifyComplete(); + StepVerifier.create(result).expectNext(" echoed").verifyComplete(); } @Test diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java index 776f7695642b..b8383a4a7984 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java @@ -125,6 +125,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType); } + @Override + public boolean canDecodeEmptyMessage() { + return true; + } + @Override public Flux decode(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java index 81b26e4f7df2..72711a141352 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java @@ -221,6 +221,11 @@ public void exceedMaxSize() { testDecode(input, Msg.class, step -> step.verifyError(DecodingException.class)); } + @Test + public void decodeEmpty() { + testDecodeEmptyMessage(ResolvableType.forClass(Msg.class), null, null); + } + private Mono dataBuffer(Msg msg) { return Mono.fromCallable(() -> { byte[] bytes = msg.toByteArray();