Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype decodeToSequence to read multiple objects from stream lazily #1691

Merged
merged 9 commits into from Nov 11, 2021
10 changes: 10 additions & 0 deletions formats/json/api/kotlinx-serialization-json.api
Expand Up @@ -342,6 +342,16 @@ public abstract class kotlinx/serialization/json/JsonTransformingSerializer : ko

public final class kotlinx/serialization/json/JvmStreamsKt {
public static final fun decodeFromStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;)Ljava/lang/Object;
public static final fun decodeToSequence (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;)Lkotlin/sequences/Sequence;
public static synthetic fun decodeToSequence$default (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
public static final fun encodeToStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Ljava/io/OutputStream;)V
}

public final class kotlinx/serialization/json/LazyStreamingFormat : java/lang/Enum {
public static final field ARRAY_WRAPPED Lkotlinx/serialization/json/LazyStreamingFormat;
public static final field AUTO_DETECT Lkotlinx/serialization/json/LazyStreamingFormat;
public static final field WHITESPACE_SEPARATED Lkotlinx/serialization/json/LazyStreamingFormat;
public static fun valueOf (Ljava/lang/String;)Lkotlinx/serialization/json/LazyStreamingFormat;
public static fun values ()[Lkotlinx/serialization/json/LazyStreamingFormat;
}

Expand Up @@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer {

// No-op by default; declared `open` so that a lexer subclass can override it.
// NOTE(review): the name suggests overrides make sure input is buffered before it is read — confirm in subclasses.
open fun ensureHaveChars() {}

/**
 * Reports whether there is still input at [currentPosition].
 *
 * Delegates to [definitelyNotEof] and treats its `-1` result as "end of input".
 * NOTE(review): no character is inspected here, so trailing whitespace presumably
 * still counts as remaining input — confirm against [definitelyNotEof] implementations.
 */
fun isNotEof(): Boolean {
    val probe = definitelyNotEof(currentPosition)
    return probe != -1
}

// Used as bound check in loops.
// A result of -1 is what isNotEof() interprets as end of input.
// NOTE(review): any other result is presumably a position that is safe to read from — confirm in implementations.
abstract fun definitelyNotEof(position: Int): Int

Expand All @@ -158,7 +160,7 @@ internal abstract class AbstractJsonLexer {
fun expectEof() {
val nextToken = consumeNextToken()
if (nextToken != TC_EOF)
fail("Expected EOF after parsing an object, but had ${source[currentPosition - 1]} instead")
fail("Expected EOF after parsing, but had ${source[currentPosition - 1]} instead")
}

/*
Expand Down Expand Up @@ -202,7 +204,7 @@ internal abstract class AbstractJsonLexer {
fail(charToTokenClass(expected))
}

protected fun fail(expectedToken: Byte) {
internal fun fail(expectedToken: Byte): Nothing {
// We know that the token was consumed prior to this call
// Slow path, never called in normal code, can avoid optimizing it
val expected = when (expectedToken) {
Expand Down
Expand Up @@ -68,7 +68,7 @@ public fun <T> Json.decodeFromStream(
}

/**
* Deserializes the contents of given [stream] to to the value of type [T] using UTF-8 encoding and
* Deserializes the contents of given [stream] to the value of type [T] using UTF-8 encoding and
* deserializer retrieved from the reified type parameter.
*
* Note that this function expects that exactly one object would be present in the stream
Expand All @@ -80,3 +80,93 @@ public fun <T> Json.decodeFromStream(
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeFromStream(stream: InputStream): T {
    // Look up the serializer for T in this Json's module, then defer to the explicit-deserializer overload.
    val deserializer = serializersModule.serializer<T>()
    return decodeFromStream(deserializer, stream)
}

/**
 * Description of [decodeToSequence]'s JSON input shape.
 *
 * The sequence represents a stream of objects parsed one-by-one;
 * [LazyStreamingFormat] defines the separator between these objects.
 * Normally, these objects are either not separated by meaningful characters ([WHITESPACE_SEPARATED]),
 * or the whole stream is one large array and therefore the objects are separated with commas ([ARRAY_WRAPPED]).
 */
@ExperimentalSerializationApi
public enum class LazyStreamingFormat {
sandwwraith marked this conversation as resolved.
Show resolved Hide resolved
    /**
     * Declares that objects in the input stream are not separated by meaningful characters.
     *
     * The stream must start with the first object, and there may be some (possibly no) whitespace characters
     * between objects.
     * A whitespace character is either ' ', '\n', '\r' or '\t'.
     */
    WHITESPACE_SEPARATED,

    /**
     * Declares that objects in the input stream are wrapped in an array. Elements of the array are still
     * parsed lazily, so there shouldn't be problems if the total size of the array is larger than application memory.
     *
     * The stream must start with the JSON array start character, and the objects in it must be separated
     * with commas and optional whitespace.
     * The stream must end with the array end character; otherwise, [JsonDecodingException] is thrown.
     * Dangling characters after the array end are not permitted.
     */
    ARRAY_WRAPPED,

    /**
     * Declares that the parser itself should select between [WHITESPACE_SEPARATED] and [ARRAY_WRAPPED] modes.
     * The selection is performed by looking at the first meaningful character of the stream.
     *
     * In most cases, auto-detection is sufficient to parse the input correctly.
     * However, if the input is a _whitespace-separated stream of arrays_, the parser would select the wrong mode.
     * For such cases, [LazyStreamingFormat] must be specified explicitly.
     *
     * Example of such an exceptional case:
     * `[1, 2, 3] [4, 5, 6]\n[7, 8, 9]`
     */
    AUTO_DETECT;
}

/**
 * Lazily deserializes the contents of the given [stream] with [deserializer], reading it as UTF-8.
 * Unlike [decodeFromStream], the [stream] may contain more than one element; [format] declares how
 * the elements are separated.
 *
 * All elements must be of type [T].
 * Elements are parsed one at a time as the resulting [Sequence] is evaluated.
 * The sequence is tied to the stream and can be iterated only once.
 *
 * The input format is resolved when this function is called, not when the sequence is evaluated:
 * for [LazyStreamingFormat.ARRAY_WRAPPED] and [LazyStreamingFormat.AUTO_DETECT] the first token of the
 * stream is examined (and a missing array start is reported) before the sequence is returned.
 *
 * **Resource caution:** this method neither closes the [stream] when parsing is finished nor provides a way
 * to close it manually. It is the caller's responsibility to keep a reference to the stream and close it.
 * Because the stream is parsed lazily, closing it before the returned sequence has been fully evaluated
 * results in an [IOException] from the decoder.
 *
 * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
 * @throws [IOException] If an I/O error occurs and stream can't be read from.
 */
@ExperimentalSerializationApi
public fun <T> Json.decodeToSequence(
    stream: InputStream,
    deserializer: DeserializationStrategy<T>,
    format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> {
    val elements = JsonIterator(format, this, ReaderJsonLexer(stream), deserializer)
    // Iterator.asSequence() wraps the iterator into a sequence that may be iterated only once.
    return elements.asSequence()
}

/**
 * Lazily deserializes the contents of the given [stream], reading it as UTF-8 and using the deserializer
 * retrieved from the reified type parameter.
 * Unlike [decodeFromStream], the [stream] may contain more than one element; [format] declares how
 * the elements are separated.
 *
 * All elements must be of type [T].
 * Elements are parsed one at a time as the resulting [Sequence] is evaluated.
 * The sequence is tied to the stream and can be iterated only once.
 *
 * **Resource caution:** this method neither closes the [stream] when parsing is finished nor provides a way
 * to close it manually. It is the caller's responsibility to keep a reference to the stream and close it.
 * Because the stream is parsed lazily, closing it before the returned sequence has been fully evaluated
 * results in an [IOException] from the decoder.
 *
 * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
 * @throws [IOException] If an I/O error occurs and stream can't be read from.
 */
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeToSequence(
    stream: InputStream,
    format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> {
    // Resolve the serializer for T from this Json's module and defer to the explicit-deserializer overload.
    val deserializer = serializersModule.serializer<T>()
    return decodeToSequence(stream, deserializer, format)
}

@@ -0,0 +1,100 @@
/*
* Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("FunctionName")
@file:OptIn(ExperimentalSerializationApi::class)

package kotlinx.serialization.json.internal

import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.*

/**
 * Factory for the iterator that backs a lazily decoded sequence.
 *
 * Resolves [mode] against the beginning of the input via `determineFormat` (which may consume the
 * array start token) and returns the iterator implementation for the resolved format.
 */
internal fun <T> JsonIterator(
    mode: LazyStreamingFormat,
    json: Json,
    lexer: ReaderJsonLexer,
    deserializer: DeserializationStrategy<T>
): Iterator<T> {
    val resolvedFormat = lexer.determineFormat(mode)
    return when (resolvedFormat) {
        LazyStreamingFormat.ARRAY_WRAPPED ->
            JsonIteratorArrayWrapped(json, lexer, deserializer)
        // Can be many WS-separated independent arrays
        LazyStreamingFormat.WHITESPACE_SEPARATED ->
            JsonIteratorWsSeparated(json, lexer, deserializer)
        // determineFormat never returns AUTO_DETECT; reaching this branch is a programming error.
        LazyStreamingFormat.AUTO_DETECT ->
            error("AbstractJsonLexer.determineFormat must be called beforehand.")
    }
}


/**
 * Turns the [suggested] format into a concrete one, never returning [LazyStreamingFormat.AUTO_DETECT].
 *
 * - `WHITESPACE_SEPARATED` is returned as is, without touching the input.
 * - `ARRAY_WRAPPED` requires the array start token, which is consumed; otherwise the lexer fails.
 * - `AUTO_DETECT` becomes `ARRAY_WRAPPED` if the array start token is present (and consumed),
 *   and `WHITESPACE_SEPARATED` otherwise.
 */
private fun AbstractJsonLexer.determineFormat(suggested: LazyStreamingFormat): LazyStreamingFormat {
    // do not call consumeStartArray here so we don't confuse parser with stream of lists
    if (suggested == LazyStreamingFormat.WHITESPACE_SEPARATED) return LazyStreamingFormat.WHITESPACE_SEPARATED

    // Both remaining modes start by probing for (and consuming) '['.
    if (tryConsumeStartArray()) return LazyStreamingFormat.ARRAY_WRAPPED

    // Only the explicit array mode treats a missing '[' as an error.
    if (suggested == LazyStreamingFormat.ARRAY_WRAPPED) fail(TC_BEGIN_LIST)
    return LazyStreamingFormat.WHITESPACE_SEPARATED
}

/**
 * Consumes the array start token if it is the next token.
 *
 * @return `true` if the token was present and has been consumed, `false` if the next token is something else.
 */
private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean {
    val startsWithArray = peekNextToken() == TC_BEGIN_LIST
    if (startsWithArray) consumeNextToken(TC_BEGIN_LIST)
    return startsWithArray
}

/**
 * Iterator over top-level JSON values that simply follow each other in the input,
 * optionally separated by whitespace.
 *
 * Each [next] call decodes exactly one value with [deserializer] from the shared [lexer].
 */
private class JsonIteratorWsSeparated<T>(
    private val json: Json,
    private val lexer: ReaderJsonLexer,
    private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
    override fun next(): T =
        StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
            .decodeSerializableValue(deserializer)

    /**
     * Returns `true` only if another token follows.
     *
     * Checking `lexer.isNotEof()` is not enough here: it only reports that some characters are left,
     * so an input ending with whitespace (e.g. a final newline) made this method return `true`
     * and the following [next] fail. Peeking the next token avoids that.
     * NOTE(review): relies on `peekNextToken()` skipping insignificant whitespace and returning
     * `TC_EOF` at the end of input — confirm in AbstractJsonLexer.
     */
    override fun hasNext(): Boolean = lexer.peekNextToken() != TC_EOF
}

/**
 * Iterator over the elements of a single top-level JSON array whose start token has already been consumed.
 *
 * [next] consumes the separating comma (for every element but the first) and decodes one element;
 * [hasNext] detects the array end and validates that nothing but EOF follows it.
 */
private class JsonIteratorArrayWrapped<T>(
    private val json: Json,
    private val lexer: ReaderJsonLexer,
    private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
    private var first = true

    // Set once the closing bracket has been consumed, so that repeated hasNext() calls stay side-effect free.
    private var finished = false

    override fun next(): T {
        if (first) {
            first = false
        } else {
            lexer.consumeNextToken(COMMA)
        }
        val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
        return input.decodeSerializableValue(deserializer)
    }

    /**
     * Note: if array separator (comma) is missing, hasNext() returns true, but next() throws an exception.
     *
     * Once the array end has been reached, every subsequent call returns `false`. Without the [finished]
     * flag, a second call after the end would find EOF instead of the (already consumed) closing bracket
     * and fail as if the array had never been closed.
     */
    override fun hasNext(): Boolean {
        if (finished) return false
        if (lexer.peekNextToken() == TC_END_LIST) {
            lexer.consumeNextToken(TC_END_LIST)
            finished = true
            if (lexer.isNotEof()) {
                if (lexer.peekNextToken() == TC_BEGIN_LIST) lexer.fail("There is a start of the new array after the one parsed to sequence. " +
                        "${LazyStreamingFormat.ARRAY_WRAPPED.name} mode doesn't merge consecutive arrays.\n" +
                        "If you need to parse a stream of arrays, please use ${LazyStreamingFormat.WHITESPACE_SEPARATED.name} mode instead.")
                lexer.expectEof()
            }
            return false
        }
        // We may allow unclosed bracket on .isLenient, but it's usually not what this flag does
        if (!lexer.isNotEof()) lexer.fail(TC_END_LIST)
        return true
    }
}
Expand Up @@ -4,16 +4,17 @@

package kotlinx.serialization.features

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.SerializationException
import kotlinx.serialization.StringData
import kotlinx.serialization.*
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.json.*
import kotlinx.serialization.json.internal.JsonDecodingException
import kotlinx.serialization.test.assertFailsWithMessage
import org.junit.Ignore
import org.junit.Test
import java.io.*
import kotlin.test.assertEquals
import kotlin.test.*

class JsonStreamFlowTest {
val json = Json {}
sandwwraith marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -24,7 +25,16 @@ class JsonStreamFlowTest {
}
}

val inputString = """{"data":"a"}{"data":"b"}{"data":"c"}"""
// Exposes the elements decoded from [iss] as a Flow whose upstream runs on Dispatchers.IO.
// Deliberately not `suspend`: the function only builds the flow; the body of the `flow { }` builder
// (and therefore any reading from the stream) runs when the flow is collected.
private inline fun <reified T> Json.readFromStream(iss: InputStream): Flow<T> = flow {
    val serial = serializer<T>()
    val iter = iterateOverStream(iss, serial)
    while (iter.hasNext()) {
        emit(iter.next())
    }
}.flowOn(Dispatchers.IO)

val inputStringWsSeparated = """{"data":"a"}{"data":"b"}{"data":"c"}"""
val inputStringWrapped = """[{"data":"a"},{"data":"b"},{"data":"c"}]"""
sandwwraith marked this conversation as resolved.
Show resolved Hide resolved
val inputList = listOf(StringData("a"), StringData("b"), StringData("c"))

@Test
Expand All @@ -36,16 +46,102 @@ class JsonStreamFlowTest {
f.writeToStream(os)
}

assertEquals(inputString, os.toString(Charsets.UTF_8.name()))
assertEquals(inputStringWsSeparated, os.toString(Charsets.UTF_8.name()))
}

@Test
fun testDecodeSeveralItems() {
val ins = ByteArrayInputStream(inputString.encodeToByteArray())
val ins = ByteArrayInputStream(inputStringWsSeparated.encodeToByteArray())
assertFailsWithMessage<SerializationException>("EOF") {
json.decodeFromStream<StringData>(ins)
}
}

// Asserts that the iterator has one more element and that this element equals [expected].
// Neither `inline` nor `reified` is needed: T is only used as an ordinary type argument.
private fun <T> Iterator<T>.assertNext(expected: T) {
    assertTrue(hasNext())
    assertEquals(expected, next())
}

// Test shortcut: decodes [stream] with the default format and hands out the iterator of the resulting sequence.
private fun <T> Json.iterateOverStream(stream: InputStream, deserializer: DeserializationStrategy<T>): Iterator<T> {
    val decoded = decodeToSequence(stream, deserializer)
    return decoded.iterator()
}

// Runs [block] once per input, each time on a fresh stream over that input.
// Any failure is rethrown as an AssertionError that names the offending input and keeps the original cause.
private fun withInputs(vararg inputs: String = arrayOf(inputStringWsSeparated, inputStringWrapped), block: (InputStream) -> Unit) {
    inputs.forEach { input ->
        try {
            block(input.asInputStream())
        } catch (cause: Throwable) {
            throw AssertionError("Failed test with input $input", cause)
        }
    }
}

// Wraps the UTF-8 bytes of this string into an in-memory input stream.
private fun String.asInputStream(): ByteArrayInputStream {
    val utf8Bytes = encodeToByteArray()
    return ByteArrayInputStream(utf8Bytes)
}

// Walks the iterator manually: every expected item must be reported and returned in order,
// after which hasNext() is false and a further next() fails with an EOF-related message.
@Test
fun testIterateSeveralItems() = withInputs { ins ->
    val iter = json.iterateOverStream(ins, StringData.serializer())
    for (expected in inputList) {
        iter.assertNext(expected)
    }
    assertFalse(iter.hasNext())
    assertFailsWithMessage<SerializationException>("EOF") {
        iter.next()
    }
}

// Decodes each input fully and checks that the resulting sequence cannot be evaluated a second time.
@Test
fun testDecodeToSequence() = withInputs { ins ->
    val sequence = json.decodeToSequence(ins, StringData.serializer())
    // No custom message here: the block runs for several inputs, and withInputs already reports
    // which input failed. The old message always named the whitespace-separated input.
    assertEquals(inputList, sequence.toList())
    assertFailsWith<IllegalStateException> { sequence.toList() } // assert constrained once
}

// Collects the flow produced by readFromStream and compares the collected items with the expected list.
@Test
fun testDecodeAsFlow() = withInputs { ins ->
    val list = runBlocking { json.readFromStream<StringData>(ins).toList() }
    assertEquals(inputList, list)
}

// Objects separated by a space, a newline and a tab must decode exactly like unseparated ones.
@Test
fun testItemsSeparatedByWs() {
    val input = "{\"data\":\"a\"} {\"data\":\"b\"}\n\t{\"data\":\"c\"}"
    val decoded = json.decodeToSequence(input.asInputStream(), StringData.serializer()).toList()
    assertEquals(inputList, decoded)
}

// Every malformed array must be rejected with JsonDecodingException once the sequence is evaluated.
@Test
fun testMalformedArray() {
    val malformed = arrayOf(
        """[1, 2, 3""", // array is never closed
        """[1, 2, 3]qwert""", // dangling characters after the array end
        """[1,2 3]""" // missing comma between elements
    )
    withInputs(*malformed) { ins ->
        assertFailsWith<JsonDecodingException> {
            json.decodeToSequence(ins, Int.serializer()).toList()
        }
    }
}

// A stream of several arrays is readable only in WHITESPACE_SEPARATED mode:
// the array-based modes reject it, both as a stream of lists and as a stream of ints.
@Test
fun testMultilineArrays() {
    val input = "[1,2,3]\n[4,5,6]\n[7,8,9]"

    assertFailsWith<JsonDecodingException> {
        json.decodeToSequence<List<Int>>(input.asInputStream(), LazyStreamingFormat.AUTO_DETECT).toList()
    }
    // ARRAY_WRAPPED: we do not merge lists
    for (format in listOf(LazyStreamingFormat.AUTO_DETECT, LazyStreamingFormat.ARRAY_WRAPPED)) {
        assertFailsWith<JsonDecodingException> {
            json.decodeToSequence<Int>(input.asInputStream(), format).toList()
        }
    }

    val expected = listOf(listOf(1, 2, 3), listOf(4, 5, 6), listOf(7, 8, 9))
    val parsed = json
        .decodeToSequence<List<Int>>(input.asInputStream(), LazyStreamingFormat.WHITESPACE_SEPARATED)
        .toList()
    assertEquals(expected, parsed)
}

// ARRAY_WRAPPED must reject input that does not start with an array.
// No terminal operation is applied to the sequence: the failure is expected from the decodeToSequence call itself.
@Test
fun testStrictArrayCheck() {
    val notAnArray = inputStringWsSeparated.asInputStream()
    assertFailsWith<JsonDecodingException> {
        json.decodeToSequence<StringData>(notAnArray, LazyStreamingFormat.ARRAY_WRAPPED)
    }
}

}