Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support decoding empty DataBuffers for Decoders that support it #29903

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Expand Up @@ -53,6 +53,15 @@ public interface Decoder<T> {
*/
boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType);

/**
* Whether the decoder supports decoding messages from empty data buffers.
* @return {@code true} if supported, {@code false} otherwise
* @since 6.0.5
*/
default boolean canDecodeEmptyDataBuffer() {
return false;
}

/**
* Decode a {@link DataBuffer} input stream into a Flux of {@code T}.
* @param inputStream the {@code DataBuffer} input stream to decode
Expand Down
Expand Up @@ -231,7 +231,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
if (decoder.canDecode(elementType, mimeType)) {
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -245,7 +245,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
else {
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -263,7 +263,10 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
message, parameter, "Cannot decode to [" + targetType + "]" + message));
}

private boolean nonEmptyDataBuffer(DataBuffer buffer) {
private boolean nonEmptyDataBuffer(DataBuffer buffer, Decoder<?> decoder) {
if (decoder.canDecodeEmptyDataBuffer()) {
return true;
}
if (buffer.readableByteCount() > 0) {
return true;
}
Expand Down
Expand Up @@ -125,6 +125,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down