Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype decodeToSequence to read multiple objects from stream lazily #1691

Merged
merged 9 commits into from Nov 11, 2021
10 changes: 10 additions & 0 deletions formats/json/api/kotlinx-serialization-json.api
Expand Up @@ -342,6 +342,16 @@ public abstract class kotlinx/serialization/json/JsonTransformingSerializer : ko

public final class kotlinx/serialization/json/JvmStreamsKt {
public static final fun decodeFromStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;)Ljava/lang/Object;
public static final fun decodeToSequence (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;)Lkotlin/sequences/Sequence;
public static synthetic fun decodeToSequence$default (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
public static final fun encodeToStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Ljava/io/OutputStream;)V
}

public final class kotlinx/serialization/json/LazyStreamingFormat : java/lang/Enum {
public static final field ARRAY_WRAPPED Lkotlinx/serialization/json/LazyStreamingFormat;
public static final field AUTO_DETECT Lkotlinx/serialization/json/LazyStreamingFormat;
public static final field WHITESPACE_SEPARATED Lkotlinx/serialization/json/LazyStreamingFormat;
public static fun valueOf (Ljava/lang/String;)Lkotlinx/serialization/json/LazyStreamingFormat;
public static fun values ()[Lkotlinx/serialization/json/LazyStreamingFormat;
}

Expand Up @@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer {

open fun ensureHaveChars() {}

fun isNotEof(): Boolean = peekNextToken() != TC_EOF

// Used as bound check in loops
abstract fun definitelyNotEof(position: Int): Int

Expand All @@ -158,7 +160,7 @@ internal abstract class AbstractJsonLexer {
fun expectEof() {
val nextToken = consumeNextToken()
if (nextToken != TC_EOF)
fail("Expected EOF after parsing an object, but had ${source[currentPosition - 1]} instead")
fail("Expected EOF after parsing, but had ${source[currentPosition - 1]} instead")
}

/*
Expand Down Expand Up @@ -202,7 +204,7 @@ internal abstract class AbstractJsonLexer {
fail(charToTokenClass(expected))
}

protected fun fail(expectedToken: Byte) {
internal fun fail(expectedToken: Byte): Nothing {
// We know that the token was consumed prior to this call
// Slow path, never called in normal code, can avoid optimizing it
val expected = when (expectedToken) {
Expand Down
104 changes: 103 additions & 1 deletion formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt
Expand Up @@ -68,7 +68,7 @@ public fun <T> Json.decodeFromStream(
}

/**
* Deserializes the contents of given [stream] to to the value of type [T] using UTF-8 encoding and
* Deserializes the contents of given [stream] to the value of type [T] using UTF-8 encoding and
* deserializer retrieved from the reified type parameter.
*
* Note that this functions expects that exactly one object would be present in the stream
Expand All @@ -80,3 +80,105 @@ public fun <T> Json.decodeFromStream(
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeFromStream(stream: InputStream): T =
decodeFromStream(serializersModule.serializer(), stream)

/**
* Description of [decodeToSequence]'s JSON input shape.
*
* The sequence represents a stream of objects parsed one by one;
* [LazyStreamingFormat] defines a separator between these objects.
* Typically, these objects are not separated by meaningful characters ([WHITESPACE_SEPARATED]),
* or the whole stream is a large array of objects separated with commas ([ARRAY_WRAPPED]).
*/
@ExperimentalSerializationApi
public enum class LazyStreamingFormat {
sandwwraith marked this conversation as resolved.
Show resolved Hide resolved
/**
* Declares that objects in the input stream are separated by whitespace characters.
*
* The stream is read as multiple JSON objects separated by any number of whitespace characters between objects.
sandwwraith marked this conversation as resolved.
Show resolved Hide resolved
* Each individual object is parsed lazily, when it is requested from the resulting sequence.
*
* Whitespace character is either ' ', '\n', '\r' or '\t'.
*
* Example of `WHITESPACE_SEPARATED` stream content:
* ```
* """{"key": "value"}{"key": "value2"} {"key2": "value2"}"""
* ```
*/
WHITESPACE_SEPARATED,

/**
* Declares that objects in the input stream are wrapped in the JSON array.
* Each individual object in the array is parsed lazily when it is requested from the resulting sequence.
*
* The stream is read as multiple JSON objects wrapped into a JSON array.
* The stream must start with an array start character `[` and end with an array end character `]`,
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* otherwise, [JsonDecodingException] is thrown.
*
* Example of `ARRAY_WRAPPED` stream content:
* ```
* """[{"key": "value"}, {"key": "value2"},{"key2": "value2"}]"""
* ```
*/
ARRAY_WRAPPED,

/**
* Declares that parser itself should select between [WHITESPACE_SEPARATED] and [ARRAY_WRAPPED] modes.
* The selection is performed by looking on the first meaningful character of the stream.
*
* In most cases, auto-detection is sufficient to correctly parse an input.
* If the input is _whitespace-separated stream of the arrays_, parser could select an incorrect mode,
* for that [LazyStreamingFormat] must be specified explicitly.
*
* Example of an exceptional case:
* `[1, 2, 3] [4, 5, 6]\n[7, 8, 9]`
*/
AUTO_DETECT;
}

/**
* Transforms the given [stream] into lazily deserialized sequence of elements of type [T] using UTF-8 encoding and [deserializer].
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
*
* Elements must all be of type [T].
* Elements are parsed lazily when resulting [Sequence] is evaluated.
* Resulting sequence is tied to the stream and can be evaluated only once.
*
* **Resource caution:** this method neither closes the [stream] when the parsing is finished nor provides a method to close it manually.
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
* closing it before returned sequence is evaluated completely will result in [IOException] from decoder.
*
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
* @throws [IOException] If an I/O error occurs and stream can't be read from.
*/
@ExperimentalSerializationApi
public fun <T> Json.decodeToSequence(
stream: InputStream,
deserializer: DeserializationStrategy<T>,
format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> {
val lexer = ReaderJsonLexer(stream)
val iter = JsonIterator(format, this, lexer, deserializer)
return Sequence { iter }.constrainOnce()
}

/**
* Transforms the given [stream] into lazily deserialized sequence of elements of type [T] using UTF-8 encoding and deserializer retrieved from the reified type parameter.
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
*
* Elements must all be of type [T].
* Elements are parsed lazily when resulting [Sequence] is evaluated.
* Resulting sequence is tied to the stream and constrained to be evaluated only once.
*
* **Resource caution:** this method does not close [stream] when the parsing is finished neither provides method to close it manually.
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
* closing it before returned sequence is evaluated fully would result in [IOException] from decoder.
*
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
* @throws [IOException] If an I/O error occurs and stream can't be read from.
*/
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeToSequence(
stream: InputStream,
format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> = decodeToSequence(stream, serializersModule.serializer(), format)

@@ -0,0 +1,99 @@
/*
* Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("FunctionName")
@file:OptIn(ExperimentalSerializationApi::class)

package kotlinx.serialization.json.internal

import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.*

internal fun <T> JsonIterator(
mode: LazyStreamingFormat,
json: Json,
lexer: ReaderJsonLexer,
deserializer: DeserializationStrategy<T>
): Iterator<T> = when (lexer.determineFormat(mode)) {
LazyStreamingFormat.WHITESPACE_SEPARATED -> JsonIteratorWsSeparated(
json,
lexer,
deserializer
) // Can be many WS-separated independent arrays
LazyStreamingFormat.ARRAY_WRAPPED -> JsonIteratorArrayWrapped(
json,
lexer,
deserializer
)
LazyStreamingFormat.AUTO_DETECT -> error("AbstractJsonLexer.determineFormat must be called beforehand.")
}


private fun AbstractJsonLexer.determineFormat(suggested: LazyStreamingFormat): LazyStreamingFormat = when (suggested) {
LazyStreamingFormat.WHITESPACE_SEPARATED ->
LazyStreamingFormat.WHITESPACE_SEPARATED // do not call consumeStartArray here so we don't confuse parser with stream of lists
LazyStreamingFormat.ARRAY_WRAPPED ->
if (tryConsumeStartArray()) LazyStreamingFormat.ARRAY_WRAPPED
else fail(TC_BEGIN_LIST)
LazyStreamingFormat.AUTO_DETECT ->
if (tryConsumeStartArray()) LazyStreamingFormat.ARRAY_WRAPPED
else LazyStreamingFormat.WHITESPACE_SEPARATED
}

private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean {
if (peekNextToken() == TC_BEGIN_LIST) {
consumeNextToken(TC_BEGIN_LIST)
return true
}
return false
}

private class JsonIteratorWsSeparated<T>(
private val json: Json,
private val lexer: ReaderJsonLexer,
private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
override fun next(): T =
StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
.decodeSerializableValue(deserializer)

override fun hasNext(): Boolean = lexer.isNotEof()
sandwwraith marked this conversation as resolved.
Show resolved Hide resolved
}

private class JsonIteratorArrayWrapped<T>(
private val json: Json,
private val lexer: ReaderJsonLexer,
private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
private var first = true

override fun next(): T {
if (first) {
first = false
} else {
lexer.consumeNextToken(COMMA)
}
val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
return input.decodeSerializableValue(deserializer)
}

/**
* Note: if array separator (comma) is missing, hasNext() returns true, but next() throws an exception.
*/
override fun hasNext(): Boolean {
if (lexer.peekNextToken() == TC_END_LIST) {
lexer.consumeNextToken(TC_END_LIST)
if (lexer.isNotEof()) {
if (lexer.peekNextToken() == TC_BEGIN_LIST) lexer.fail("There is a start of the new array after the one parsed to sequence. " +
"${LazyStreamingFormat.ARRAY_WRAPPED.name} mode doesn't merge consecutive arrays.\n" +
"If you need to parse a stream of arrays, please use ${LazyStreamingFormat.WHITESPACE_SEPARATED.name} mode instead.")
lexer.expectEof()
}
return false
}
if (!lexer.isNotEof()) lexer.fail(TC_END_LIST)
return true
}
}