Skip to content

Commit

Permalink
Use Jackson SequenceWriter for streaming
Browse files Browse the repository at this point in the history
Before this commit, the AbstractJackson2Encoder instantiated a
ObjectWriter per value. This is not an issue for single values or
non-streaming scenarios (which effectively are the same, because in the
latter values are collected into a list until offered to Jackson).
However, this does create a problem for SMILE, because it allows for
shared references that do not match up when writing each value with a
new ObjectWriter, resulting in errors parsing the result.

This commit uses Jackson's SequenceWriter for streaming scenarios,
allowing Jackson to reuse the same context for writing multiple values,
fixing the issue described above.

Closes gh-24198
  • Loading branch information
poutsma committed Jan 21, 2020
1 parent 5e9d29d commit 54669c5
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 100 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package org.springframework.http.codec.json;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand All @@ -29,9 +28,11 @@
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.util.ByteArrayBuilder;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand All @@ -44,7 +45,6 @@
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageEncoder;
Expand Down Expand Up @@ -115,72 +115,81 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");

JsonEncoding encoding = getJsonEncoding(mimeType);

if (inputStream instanceof Mono) {
return Mono.from(inputStream).map(value ->
encodeValue(value, bufferFactory, elementType, mimeType, hints, encoding)).flux();
return Mono.from(inputStream)
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
.flux();
}
else {
return this.streamingMediaTypes.stream()
.filter(mediaType -> mediaType.isCompatibleWith(mimeType))
.findFirst()
.map(mediaType -> {
byte[] separator = STREAM_SEPARATORS.getOrDefault(mediaType, NEWLINE_SEPARATOR);
return Flux.from(inputStream).map(value -> {
DataBuffer buffer = encodeValue(
value, bufferFactory, elementType, mimeType, hints, encoding);
if (separator != null) {
buffer.write(separator);
}
return buffer;
});
})
.orElseGet(() -> {
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
return Flux.from(inputStream).collectList().map(list ->
encodeValue(list, bufferFactory, listType, mimeType, hints, encoding)).flux();
});
byte[] separator = streamSeparator(mimeType);
if (separator != null) { // streaming
try {
ObjectWriter writer = createObjectWriter(elementType, mimeType, hints);
ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler());
JsonEncoding encoding = getJsonEncoding(mimeType);
JsonGenerator generator = getObjectMapper().getFactory().createGenerator(byteBuilder, encoding);
SequenceWriter sequenceWriter = writer.writeValues(generator);

return Flux.from(inputStream)
.map(value -> encodeStreamingValue(value, bufferFactory, hints, sequenceWriter, byteBuilder,
separator));
}
catch (IOException ex) {
return Flux.error(ex);
}
}
else { // non-streaming
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
return Flux.from(inputStream)
.collectList()
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints))
.flux();
}

}
}

@Override
public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return encodeValue(value, bufferFactory, valueType, mimeType, hints, getJsonEncoding(mimeType));
}
ObjectWriter writer = createObjectWriter(valueType, mimeType, hints);
ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler());
JsonEncoding encoding = getJsonEncoding(mimeType);

private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, ResolvableType valueType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints, JsonEncoding encoding) {
logValue(hints, value);

if (!Hints.isLoggingSuppressed(hints)) {
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(value, !traceOn);
return Hints.getLogPrefix(hints) + "Encoding [" + formatted + "]";
});
try {
JsonGenerator generator = getObjectMapper().getFactory().createGenerator(byteBuilder, encoding);
writer.writeValue(generator, value);
generator.flush();
}
catch (InvalidDefinitionException ex) {
throw new CodecException("Type definition error: " + ex.getType(), ex);
}
catch (JsonProcessingException ex) {
throw new EncodingException("JSON encoding error: " + ex.getOriginalMessage(), ex);
}
catch (IOException ex) {
throw new IllegalStateException("Unexpected I/O error while writing to byte array builder",
ex);
}

JavaType javaType = getJavaType(valueType.getType(), null);
Class<?> jsonView = (hints != null ? (Class<?>) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null);
ObjectWriter writer = (jsonView != null ?
getObjectMapper().writerWithView(jsonView) : getObjectMapper().writer());
byte[] bytes = byteBuilder.toByteArray();
DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);

if (javaType.isContainerType()) {
writer = writer.forType(javaType);
}
return buffer;
}

writer = customizeWriter(writer, mimeType, valueType, hints);
private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints,
SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, byte[] separator) {

DataBuffer buffer = bufferFactory.allocateBuffer();
boolean release = true;
OutputStream outputStream = buffer.asOutputStream();
logValue(hints, value);

try {
JsonGenerator generator = getObjectMapper().getFactory().createGenerator(outputStream, encoding);
writer.writeValue(generator, value);
generator.flush();
release = false;
sequenceWriter.write(value);
sequenceWriter.flush();
}
catch (InvalidDefinitionException ex) {
throw new CodecException("Type definition error: " + ex.getType(), ex);
Expand All @@ -189,24 +198,70 @@ private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, Re
throw new EncodingException("JSON encoding error: " + ex.getOriginalMessage(), ex);
}
catch (IOException ex) {
throw new IllegalStateException("Unexpected I/O error while writing to data buffer",
throw new IllegalStateException("Unexpected I/O error while writing to byte array builder",
ex);
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}

byte[] bytes = byteArrayBuilder.toByteArray();
byteArrayBuilder.reset();

int offset;
int length;
if (bytes.length > 0 && bytes[0] == ' ') {
// SequenceWriter writes an unnecessary space in between values
offset = 1;
length = bytes.length - 1;
}
else {
offset = 0;
length = bytes.length;
}
DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length);
buffer.write(bytes, offset, length);
buffer.write(separator);

return buffer;
}

private void logValue(@Nullable Map<String, Object> hints, Object value) {
if (!Hints.isLoggingSuppressed(hints)) {
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(value, !traceOn);
return Hints.getLogPrefix(hints) + "Encoding [" + formatted + "]";
});
}
}

private ObjectWriter createObjectWriter(ResolvableType valueType, @Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {
JavaType javaType = getJavaType(valueType.getType(), null);
Class<?> jsonView = (hints != null ? (Class<?>) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null);
ObjectWriter writer = (jsonView != null ?
getObjectMapper().writerWithView(jsonView) : getObjectMapper().writer());

if (javaType.isContainerType()) {
writer = writer.forType(javaType);
}

return customizeWriter(writer, mimeType, valueType, hints);
}

protected ObjectWriter customizeWriter(ObjectWriter writer, @Nullable MimeType mimeType,
ResolvableType elementType, @Nullable Map<String, Object> hints) {

return writer;
}

@Nullable
private byte[] streamSeparator(@Nullable MimeType mimeType) {
for (MediaType streamingMediaType : this.streamingMediaTypes) {
if (streamingMediaType.isCompatibleWith(mimeType)) {
return STREAM_SEPARATORS.getOrDefault(streamingMediaType, NEWLINE_SEPARATOR);
}
}
return null;
}

/**
* Determine the JSON encoding to use for the given mime type.
* @param mimeType the mime type as requested by the caller
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,17 +20,18 @@
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.testfixture.codec.AbstractEncoderTests;
import org.springframework.core.testfixture.io.buffer.DataBufferTestUtils;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.util.MimeType;
Expand Down Expand Up @@ -59,21 +60,6 @@ public Jackson2SmileEncoderTests() {

}

public Consumer<DataBuffer> pojoConsumer(Pojo expected) {
return dataBuffer -> {
try {
Pojo actual = this.mapper.reader().forType(Pojo.class)
.readValue(DataBufferTestUtils.dumpBytes(dataBuffer));
assertThat(actual).isEqualTo(expected);
release(dataBuffer);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
};
}


@Override
@Test
public void canEncode() {
Expand Down Expand Up @@ -106,7 +92,19 @@ public void encode() {
Flux<Pojo> input = Flux.fromIterable(list);

testEncode(input, Pojo.class, step -> step
.consumeNextWith(expect(list, List.class)));
.consumeNextWith(dataBuffer -> {
try {
Object actual = this.mapper.reader().forType(List.class)
.readValue(dataBuffer.asInputStream());
assertThat(actual).isEqualTo(list);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
finally {
release(dataBuffer);
}
}));
}

@Test
Expand All @@ -127,32 +125,22 @@ public void encodeAsStream() throws Exception {
Flux<Pojo> input = Flux.just(pojo1, pojo2, pojo3);
ResolvableType type = ResolvableType.forClass(Pojo.class);

testEncodeAll(input, type, step -> step
.consumeNextWith(expect(pojo1, Pojo.class))
.consumeNextWith(expect(pojo2, Pojo.class))
.consumeNextWith(expect(pojo3, Pojo.class))
.verifyComplete(),
STREAM_SMILE_MIME_TYPE, null);
Flux<DataBuffer> result = this.encoder
.encode(input, bufferFactory, type, STREAM_SMILE_MIME_TYPE, null);

Mono<MappingIterator<Pojo>> joined = DataBufferUtils.join(result)
.map(buffer -> {
try {
return this.mapper.reader().forType(Pojo.class).readValues(buffer.asInputStream(true));
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});

StepVerifier.create(joined)
.assertNext(iter -> assertThat(iter).toIterable().contains(pojo1, pojo2, pojo3))
.verifyComplete();
}


private <T> Consumer<DataBuffer> expect(T expected, Class<T> expectedType) {
return dataBuffer -> {
try {
Object actual = this.mapper.reader().forType(expectedType)
.readValue(dataBuffer.asInputStream());
assertThat(actual).isEqualTo(expected);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
finally {
release(dataBuffer);
}
};

}



}

0 comments on commit 54669c5

Please sign in to comment.