Skip to content

Commit

Permalink
Provide decodeToSequence to read multiple objects from stream lazily (#…
Browse files Browse the repository at this point in the history
…1691)

Hide JsonIterator and provide DecodeSequenceMode

Fixes #1662

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
  • Loading branch information
sandwwraith and qwwdfsad committed Nov 11, 2021
1 parent 3999818 commit b211c29
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 13 deletions.
10 changes: 10 additions & 0 deletions formats/json/api/kotlinx-serialization-json.api
@@ -1,3 +1,11 @@
public final class kotlinx/serialization/json/DecodeSequenceMode : java/lang/Enum {
public static final field ARRAY_WRAPPED Lkotlinx/serialization/json/DecodeSequenceMode;
public static final field AUTO_DETECT Lkotlinx/serialization/json/DecodeSequenceMode;
public static final field WHITESPACE_SEPARATED Lkotlinx/serialization/json/DecodeSequenceMode;
public static fun valueOf (Ljava/lang/String;)Lkotlinx/serialization/json/DecodeSequenceMode;
public static fun values ()[Lkotlinx/serialization/json/DecodeSequenceMode;
}

public abstract class kotlinx/serialization/json/Json : kotlinx/serialization/StringFormat {
public static final field Default Lkotlinx/serialization/json/Json$Default;
public synthetic fun <init> (Lkotlinx/serialization/json/JsonConfiguration;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down Expand Up @@ -342,6 +350,8 @@ public abstract class kotlinx/serialization/json/JsonTransformingSerializer : ko

public final class kotlinx/serialization/json/JvmStreamsKt {
public static final fun decodeFromStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;)Ljava/lang/Object;
public static final fun decodeToSequence (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;)Lkotlin/sequences/Sequence;
public static synthetic fun decodeToSequence$default (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
public static final fun encodeToStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Ljava/io/OutputStream;)V
}

Expand Up @@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer {

open fun ensureHaveChars() {}

fun isNotEof(): Boolean = peekNextToken() != TC_EOF

// Used as bound check in loops
abstract fun prefetchOrEof(position: Int): Int

Expand All @@ -158,7 +160,7 @@ internal abstract class AbstractJsonLexer {
fun expectEof() {
val nextToken = consumeNextToken()
if (nextToken != TC_EOF)
fail("Expected EOF after parsing an object, but had ${source[currentPosition - 1]} instead")
fail("Expected EOF after parsing, but had ${source[currentPosition - 1]} instead")
}

/*
Expand Down Expand Up @@ -202,7 +204,7 @@ internal abstract class AbstractJsonLexer {
fail(charToTokenClass(expected))
}

protected fun fail(expectedToken: Byte) {
internal fun fail(expectedToken: Byte): Nothing {
// We know that the token was consumed prior to this call
// Slow path, never called in normal code, can avoid optimizing it
val expected = when (expectedToken) {
Expand Down
104 changes: 103 additions & 1 deletion formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt
Expand Up @@ -68,7 +68,7 @@ public fun <T> Json.decodeFromStream(
}

/**
* Deserializes the contents of given [stream] to to the value of type [T] using UTF-8 encoding and
* Deserializes the contents of given [stream] to the value of type [T] using UTF-8 encoding and
* deserializer retrieved from the reified type parameter.
*
* Note that this functions expects that exactly one object would be present in the stream
Expand All @@ -80,3 +80,105 @@ public fun <T> Json.decodeFromStream(
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeFromStream(stream: InputStream): T =
decodeFromStream(serializersModule.serializer(), stream)

/**
* Description of [decodeToSequence]'s JSON input shape.
*
* The sequence represents a stream of objects parsed one by one;
* [DecodeSequenceMode] defines a separator between these objects.
* Typically, these objects are not separated by meaningful characters ([WHITESPACE_SEPARATED]),
* or the whole stream is a large array of objects separated with commas ([ARRAY_WRAPPED]).
*/
@ExperimentalSerializationApi
public enum class DecodeSequenceMode {
/**
* Declares that objects in the input stream are separated by whitespace characters.
*
* The stream is read as multiple JSON objects separated by any number of whitespace characters between objects. Starting and trailing whitespace characters are also permitted.
* Each individual object is parsed lazily, when it is requested from the resulting sequence.
*
* Whitespace character is either ' ', '\n', '\r' or '\t'.
*
* Example of `WHITESPACE_SEPARATED` stream content:
* ```
* """{"key": "value"}{"key": "value2"} {"key2": "value2"}"""
* ```
*/
WHITESPACE_SEPARATED,

/**
* Declares that objects in the input stream are wrapped in the JSON array.
* Each individual object in the array is parsed lazily when it is requested from the resulting sequence.
*
* The stream is read as multiple JSON objects wrapped into a JSON array.
* The stream must start with an array start character `[` and end with an array end character `]`,
* otherwise, [JsonDecodingException] is thrown.
*
* Example of `ARRAY_WRAPPED` stream content:
* ```
* """[{"key": "value"}, {"key": "value2"},{"key2": "value2"}]"""
* ```
*/
ARRAY_WRAPPED,

/**
* Declares that parser itself should select between [WHITESPACE_SEPARATED] and [ARRAY_WRAPPED] modes.
* The selection is performed by looking on the first meaningful character of the stream.
*
* In most cases, auto-detection is sufficient to correctly parse an input.
* If the input is _whitespace-separated stream of the arrays_, parser could select an incorrect mode,
* for that [DecodeSequenceMode] must be specified explicitly.
*
* Example of an exceptional case:
* `[1, 2, 3] [4, 5, 6]\n[7, 8, 9]`
*/
AUTO_DETECT;
}

/**
* Transforms the given [stream] into lazily deserialized sequence of elements of type [T] using UTF-8 encoding and [deserializer].
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
*
* Elements must all be of type [T].
* Elements are parsed lazily when resulting [Sequence] is evaluated.
* Resulting sequence is tied to the stream and can be evaluated only once.
*
* **Resource caution:** this method neither closes the [stream] when the parsing is finished nor provides a method to close it manually.
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
* closing it before returned sequence is evaluated completely will result in [IOException] from decoder.
*
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
* @throws [IOException] If an I/O error occurs and stream can't be read from.
*/
@ExperimentalSerializationApi
public fun <T> Json.decodeToSequence(
stream: InputStream,
deserializer: DeserializationStrategy<T>,
format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
): Sequence<T> {
val lexer = ReaderJsonLexer(stream)
val iter = JsonIterator(format, this, lexer, deserializer)
return Sequence { iter }.constrainOnce()
}

/**
* Transforms the given [stream] into lazily deserialized sequence of elements of type [T] using UTF-8 encoding and deserializer retrieved from the reified type parameter.
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
*
* Elements must all be of type [T].
* Elements are parsed lazily when resulting [Sequence] is evaluated.
* Resulting sequence is tied to the stream and constrained to be evaluated only once.
*
* **Resource caution:** this method does not close [stream] when the parsing is finished neither provides method to close it manually.
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
* closing it before returned sequence is evaluated fully would result in [IOException] from decoder.
*
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
* @throws [IOException] If an I/O error occurs and stream can't be read from.
*/
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeToSequence(
stream: InputStream,
format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
): Sequence<T> = decodeToSequence(stream, serializersModule.serializer(), format)

@@ -0,0 +1,99 @@
/*
* Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("FunctionName")
@file:OptIn(ExperimentalSerializationApi::class)

package kotlinx.serialization.json.internal

import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.*

internal fun <T> JsonIterator(
mode: DecodeSequenceMode,
json: Json,
lexer: ReaderJsonLexer,
deserializer: DeserializationStrategy<T>
): Iterator<T> = when (lexer.determineFormat(mode)) {
DecodeSequenceMode.WHITESPACE_SEPARATED -> JsonIteratorWsSeparated(
json,
lexer,
deserializer
) // Can be many WS-separated independent arrays
DecodeSequenceMode.ARRAY_WRAPPED -> JsonIteratorArrayWrapped(
json,
lexer,
deserializer
)
DecodeSequenceMode.AUTO_DETECT -> error("AbstractJsonLexer.determineFormat must be called beforehand.")
}


private fun AbstractJsonLexer.determineFormat(suggested: DecodeSequenceMode): DecodeSequenceMode = when (suggested) {
DecodeSequenceMode.WHITESPACE_SEPARATED ->
DecodeSequenceMode.WHITESPACE_SEPARATED // do not call consumeStartArray here so we don't confuse parser with stream of lists
DecodeSequenceMode.ARRAY_WRAPPED ->
if (tryConsumeStartArray()) DecodeSequenceMode.ARRAY_WRAPPED
else fail(TC_BEGIN_LIST)
DecodeSequenceMode.AUTO_DETECT ->
if (tryConsumeStartArray()) DecodeSequenceMode.ARRAY_WRAPPED
else DecodeSequenceMode.WHITESPACE_SEPARATED
}

private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean {
if (peekNextToken() == TC_BEGIN_LIST) {
consumeNextToken(TC_BEGIN_LIST)
return true
}
return false
}

private class JsonIteratorWsSeparated<T>(
private val json: Json,
private val lexer: ReaderJsonLexer,
private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
override fun next(): T =
StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
.decodeSerializableValue(deserializer)

override fun hasNext(): Boolean = lexer.isNotEof()
}

private class JsonIteratorArrayWrapped<T>(
private val json: Json,
private val lexer: ReaderJsonLexer,
private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
private var first = true

override fun next(): T {
if (first) {
first = false
} else {
lexer.consumeNextToken(COMMA)
}
val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
return input.decodeSerializableValue(deserializer)
}

/**
* Note: if array separator (comma) is missing, hasNext() returns true, but next() throws an exception.
*/
override fun hasNext(): Boolean {
if (lexer.peekNextToken() == TC_END_LIST) {
lexer.consumeNextToken(TC_END_LIST)
if (lexer.isNotEof()) {
if (lexer.peekNextToken() == TC_BEGIN_LIST) lexer.fail("There is a start of the new array after the one parsed to sequence. " +
"${DecodeSequenceMode.ARRAY_WRAPPED.name} mode doesn't merge consecutive arrays.\n" +
"If you need to parse a stream of arrays, please use ${DecodeSequenceMode.WHITESPACE_SEPARATED.name} mode instead.")
lexer.expectEof()
}
return false
}
if (!lexer.isNotEof()) lexer.fail(TC_END_LIST)
return true
}
}

0 comments on commit b211c29

Please sign in to comment.