Skip to content

Commit

Permalink
Prototype deocdeToSequence to read multiple objects from stream lazily
Browse files Browse the repository at this point in the history
Hide JsonIterator and provide LazyStreamingMode
  • Loading branch information
sandwwraith committed Oct 13, 2021
1 parent bb0d274 commit 346e5ad
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 10 deletions.
10 changes: 10 additions & 0 deletions formats/json/api/kotlinx-serialization-json.api
Expand Up @@ -342,6 +342,16 @@ public abstract class kotlinx/serialization/json/JsonTransformingSerializer : ko

public final class kotlinx/serialization/json/JvmStreamsKt {
public static final fun decodeFromStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;)Ljava/lang/Object;
public static final fun decodeToSequence (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;)Lkotlin/sequences/Sequence;
public static synthetic fun decodeToSequence$default (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
public static final fun encodeToStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Ljava/io/OutputStream;)V
}

public final class kotlinx/serialization/json/LazyStreamingFormat : java/lang/Enum {
public static final field ARRAY_WRAPPED Lkotlinx/serialization/json/LazyStreamingFormat;
public static final field AUTO_DETECT Lkotlinx/serialization/json/LazyStreamingFormat;
public static final field WHITESPACE_SEPARATED Lkotlinx/serialization/json/LazyStreamingFormat;
public static fun valueOf (Ljava/lang/String;)Lkotlinx/serialization/json/LazyStreamingFormat;
public static fun values ()[Lkotlinx/serialization/json/LazyStreamingFormat;
}

Expand Up @@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer {

open fun ensureHaveChars() {}

fun isNotEof(): Boolean = definitelyNotEof(currentPosition) != -1

// Used as bound check in loops
abstract fun definitelyNotEof(position: Int): Int

Expand All @@ -158,7 +160,7 @@ internal abstract class AbstractJsonLexer {
fun expectEof() {
val nextToken = consumeNextToken()
if (nextToken != TC_EOF)
fail("Expected EOF after parsing an object, but had ${source[currentPosition - 1]} instead")
fail("Expected EOF after parsing, but had ${source[currentPosition - 1]} instead")
}

/*
Expand Down Expand Up @@ -202,7 +204,7 @@ internal abstract class AbstractJsonLexer {
fail(charToTokenClass(expected))
}

protected fun fail(expectedToken: Byte) {
internal fun fail(expectedToken: Byte): Nothing {
// We know that the token was consumed prior to this call
// Slow path, never called in normal code, can avoid optimizing it
val expected = when (expectedToken) {
Expand Down
Expand Up @@ -68,7 +68,7 @@ public fun <T> Json.decodeFromStream(
}

/**
* Deserializes the contents of given [stream] to to the value of type [T] using UTF-8 encoding and
* Deserializes the contents of given [stream] to the value of type [T] using UTF-8 encoding and
* deserializer retrieved from the reified type parameter.
*
* Note that this functions expects that exactly one object would be present in the stream
Expand All @@ -80,3 +80,93 @@ public fun <T> Json.decodeFromStream(
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeFromStream(stream: InputStream): T =
decodeFromStream(serializersModule.serializer(), stream)

/**
* Description of [decodeToSequence]'s JSON input shape.
*
* Sequence represents a stream of objects parsed one-by-one;
* [LazyStreamingFormat] defines a separator between these objects.
* Normally, these objects are not separated by meaningful characters ([WHITESPACE_SEPARATED])
* or the whole stream is a large array and therefore objects are separated with commas ([ARRAY_WRAPPED]).
*/
@ExperimentalSerializationApi
public enum class LazyStreamingFormat {
/**
* Declares that objects in the input stream are not separated by meaningful characters.
*
* Stream must start with a first object and there are some (maybe none) whitespace chars between objects.
* Whitespace character is either ' ', '\n', '\r' or '\t'.
*/
WHITESPACE_SEPARATED,

/**
* Declares that objects in the input stream are wrapped in the array. Elements of the array are still parsed lazily,
* so there shouldn't be problems if array total size is larger than application memory.
*
* Stream must start with json array start character and objects in it must be separated with commas and optional whitespaces.
* Stream must end with array end character, otherwise, [JsonDecodingException] would be thrown.
* Dangling chars after array end are not permitted.
*/
ARRAY_WRAPPED,

/**
* Declares that parser itself should select between [WHITESPACE_SEPARATED] and [ARRAY_WRAPPED] modes.
* Selection is performed by looking on the first meaningful character of the stream.
*
* In most of the cases, auto detection is sufficient to correctly parse input.
* However, in the input is _whitespace-separated stream of the arrays_, parser would select incorrect mode.
* For such cases, [LazyStreamingFormat] must be specified explicitly.
*
* Example of exceptional case:
* `[1, 2, 3] [4, 5, 6]\n[7, 8, 9]`
*/
AUTO_DETECT;
}

/**
* Deserializes the contents of given [stream] using UTF-8 encoding and [deserializer].
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
*
* Elements must all be of type [T].
* Elements are parsed lazily when resulting [Sequence] is evaluated.
* Resulting sequence is tied to the stream and constrained to be evaluated only once.
*
* **Resource caution:** this method does not close [stream] when the parsing is finished neither provides method to close it manually.
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
* closing it before returned sequence is evaluated fully would result in [IOException] from decoder.
*
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
* @throws [IOException] If an I/O error occurs and stream can't be read from.
*/
@ExperimentalSerializationApi
public fun <T> Json.decodeToSequence(
stream: InputStream,
deserializer: DeserializationStrategy<T>,
format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> {
val lexer = ReaderJsonLexer(stream)
val iter = JsonIterator(format, this, lexer, deserializer)
return Sequence { iter }.constrainOnce()
}

/**
* Deserializes the contents of given [stream] using UTF-8 encoding and deserializer retrieved from the reified type parameter.
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
*
* Elements must all be of type [T].
* Elements are parsed lazily when resulting [Sequence] is evaluated.
* Resulting sequence is tied to the stream and constrained to be evaluated only once.
*
* **Resource caution:** this method does not close [stream] when the parsing is finished neither provides method to close it manually.
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
* closing it before returned sequence is evaluated fully would result in [IOException] from decoder.
*
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
* @throws [IOException] If an I/O error occurs and stream can't be read from.
*/
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeToSequence(
stream: InputStream,
format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> = decodeToSequence(stream, serializersModule.serializer(), format)

@@ -0,0 +1,100 @@
/*
* Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("FunctionName")
@file:OptIn(ExperimentalSerializationApi::class)

package kotlinx.serialization.json.internal

import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.*

internal fun <T> JsonIterator(
mode: LazyStreamingFormat,
json: Json,
lexer: ReaderJsonLexer,
deserializer: DeserializationStrategy<T>
): Iterator<T> = when (lexer.determineFormat(mode)) {
LazyStreamingFormat.WHITESPACE_SEPARATED -> JsonIteratorWsSeparated(
json,
lexer,
deserializer
) // Can be many WS-separated independent arrays
LazyStreamingFormat.ARRAY_WRAPPED -> JsonIteratorArrayWrapped(
json,
lexer,
deserializer
)
LazyStreamingFormat.AUTO_DETECT -> error("AbstractJsonLexer.determineFormat must be called beforehand.")
}


private fun AbstractJsonLexer.determineFormat(suggested: LazyStreamingFormat): LazyStreamingFormat = when (suggested) {
LazyStreamingFormat.WHITESPACE_SEPARATED ->
LazyStreamingFormat.WHITESPACE_SEPARATED // do not call consumeStartArray here so we don't confuse parser with stream of lists
LazyStreamingFormat.ARRAY_WRAPPED ->
if (tryConsumeStartArray()) LazyStreamingFormat.ARRAY_WRAPPED
else fail(TC_BEGIN_LIST)
LazyStreamingFormat.AUTO_DETECT ->
if (tryConsumeStartArray()) LazyStreamingFormat.ARRAY_WRAPPED
else LazyStreamingFormat.WHITESPACE_SEPARATED
}

private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean {
if (peekNextToken() == TC_BEGIN_LIST) {
consumeNextToken(TC_BEGIN_LIST)
return true
}
return false
}

private class JsonIteratorWsSeparated<T>(
private val json: Json,
private val lexer: ReaderJsonLexer,
private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
override fun next(): T =
StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
.decodeSerializableValue(deserializer)

override fun hasNext(): Boolean = lexer.isNotEof()
}

private class JsonIteratorArrayWrapped<T>(
private val json: Json,
private val lexer: ReaderJsonLexer,
private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
private var first = true

override fun next(): T {
if (first) {
first = false
} else {
lexer.consumeNextToken(COMMA)
}
val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
return input.decodeSerializableValue(deserializer)
}

/**
* Note: if array separator (comma) is missing, hasNext() returns true, but next() throws an exception.
*/
override fun hasNext(): Boolean {
if (lexer.peekNextToken() == TC_END_LIST) {
lexer.consumeNextToken(TC_END_LIST)
if (lexer.isNotEof()) {
if (lexer.peekNextToken() == TC_BEGIN_LIST) lexer.fail("There is a start of the new array after the one parsed to sequence. " +
"${LazyStreamingFormat.ARRAY_WRAPPED.name} mode doesn't merge consecutive arrays.\n" +
"If you need to parse a stream of arrays, please use ${LazyStreamingFormat.WHITESPACE_SEPARATED.name} mode instead.")
lexer.expectEof()
}
return false
}
// We may allow unclosed bracket on .isLenient, but it's usually not what this flag do
if (!lexer.isNotEof()) lexer.fail(TC_END_LIST)
return true
}
}
Expand Up @@ -4,16 +4,17 @@

package kotlinx.serialization.features

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.SerializationException
import kotlinx.serialization.StringData
import kotlinx.serialization.*
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.json.*
import kotlinx.serialization.json.internal.JsonDecodingException
import kotlinx.serialization.test.assertFailsWithMessage
import org.junit.Ignore
import org.junit.Test
import java.io.*
import kotlin.test.assertEquals
import kotlin.test.*

class JsonStreamFlowTest {
val json = Json {}
Expand All @@ -24,7 +25,16 @@ class JsonStreamFlowTest {
}
}

val inputString = """{"data":"a"}{"data":"b"}{"data":"c"}"""
private suspend inline fun <reified T> Json.readFromStream(iss: InputStream): Flow<T> = flow {
val serial = serializer<T>()
val iter = iterateOverStream(iss, serial)
while (iter.hasNext()) {
emit(iter.next())
}
}.flowOn(Dispatchers.IO)

val inputStringWsSeparated = """{"data":"a"}{"data":"b"}{"data":"c"}"""
val inputStringWrapped = """[{"data":"a"},{"data":"b"},{"data":"c"}]"""
val inputList = listOf(StringData("a"), StringData("b"), StringData("c"))

@Test
Expand All @@ -36,16 +46,102 @@ class JsonStreamFlowTest {
f.writeToStream(os)
}

assertEquals(inputString, os.toString(Charsets.UTF_8.name()))
assertEquals(inputStringWsSeparated, os.toString(Charsets.UTF_8.name()))
}

@Test
fun testDecodeSeveralItems() {
val ins = ByteArrayInputStream(inputString.encodeToByteArray())
val ins = ByteArrayInputStream(inputStringWsSeparated.encodeToByteArray())
assertFailsWithMessage<SerializationException>("EOF") {
json.decodeFromStream<StringData>(ins)
}
}

private inline fun <reified T> Iterator<T>.assertNext(expected: T) {
assertTrue(hasNext())
assertEquals(expected, next())
}

private fun <T> Json.iterateOverStream(stream: InputStream, deserializer: DeserializationStrategy<T>): Iterator<T> =
decodeToSequence(stream, deserializer).iterator()

private fun withInputs(vararg inputs: String = arrayOf(inputStringWsSeparated, inputStringWrapped), block: (InputStream) -> Unit) {
for (input in inputs) {
val res = runCatching { block(input.asInputStream()) }
if (res.isFailure) throw AssertionError("Failed test with input $input", res.exceptionOrNull())
}
}

private fun String.asInputStream() = ByteArrayInputStream(this.encodeToByteArray())

@Test
fun testIterateSeveralItems() = withInputs { ins ->
val iter = json.iterateOverStream(ins, StringData.serializer())
iter.assertNext(StringData("a"))
iter.assertNext(StringData("b"))
iter.assertNext(StringData("c"))
assertFalse(iter.hasNext())
assertFailsWithMessage<SerializationException>("EOF") {
iter.next()
}
}

@Test
fun testDecodeToSequence() = withInputs { ins ->
val sequence = json.decodeToSequence(ins, StringData.serializer())
assertEquals(inputList, sequence.toList(), "For input $inputStringWsSeparated")
assertFailsWith<IllegalStateException> { sequence.toList() } // assert constrained once
}

@Test
fun testDecodeAsFlow() = withInputs { ins ->
val list = runBlocking {
buildList { json.readFromStream<StringData>(ins).toCollection(this) }
}
assertEquals(inputList, list)
}

@Test
fun testItemsSeparatedByWs() {
val input = "{\"data\":\"a\"} {\"data\":\"b\"}\n\t{\"data\":\"c\"}"
val ins = ByteArrayInputStream(input.encodeToByteArray())
assertEquals(inputList, json.decodeToSequence(ins, StringData.serializer()).toList())
}

@Test
fun testMalformedArray() {
val input1 = """[1, 2, 3"""
val input2 = """[1, 2, 3]qwert"""
val input3 = """[1,2 3]"""
withInputs(input1, input2, input3) {
assertFailsWith<JsonDecodingException> {
json.decodeToSequence(it, Int.serializer()).toList()
}
}
}

@Test
fun testMultilineArrays() {
val input = "[1,2,3]\n[4,5,6]\n[7,8,9]"
assertFailsWith<JsonDecodingException> {
json.decodeToSequence<List<Int>>(input.asInputStream(), LazyStreamingFormat.AUTO_DETECT).toList()
}
assertFailsWith<JsonDecodingException> {
json.decodeToSequence<Int>(input.asInputStream(), LazyStreamingFormat.AUTO_DETECT).toList()
}
assertFailsWith<JsonDecodingException> { // we do not merge lists
json.decodeToSequence<Int>(input.asInputStream(), LazyStreamingFormat.ARRAY_WRAPPED).toList()
}
val parsed = json.decodeToSequence<List<Int>>(input.asInputStream(), LazyStreamingFormat.WHITESPACE_SEPARATED).toList()
val expected = listOf(listOf(1,2,3), listOf(4,5,6), listOf(7,8,9))
assertEquals(expected, parsed)
}

@Test
fun testStrictArrayCheck() {
assertFailsWith<JsonDecodingException> {
json.decodeToSequence<StringData>(inputStringWsSeparated.asInputStream(), LazyStreamingFormat.ARRAY_WRAPPED)
}
}

}

0 comments on commit 346e5ad

Please sign in to comment.