From 346e5ade64b8083e53a603f37817bdb7ba3ae389 Mon Sep 17 00:00:00 2001 From: Leonid Startsev Date: Wed, 22 Sep 2021 15:15:38 +0300 Subject: [PATCH] Prototype deocdeToSequence to read multiple objects from stream lazily Hide JsonIterator and provide LazyStreamingMode --- .../json/api/kotlinx-serialization-json.api | 10 ++ .../json/internal/lexer/AbstractJsonLexer.kt | 6 +- .../kotlinx/serialization/json/JvmStreams.kt | 92 ++++++++++++++- .../json/internal/JsonIterator.kt | 100 ++++++++++++++++ .../features/JsonStreamFlowTest.kt | 110 ++++++++++++++++-- 5 files changed, 308 insertions(+), 10 deletions(-) create mode 100644 formats/json/jvmMain/src/kotlinx/serialization/json/internal/JsonIterator.kt diff --git a/formats/json/api/kotlinx-serialization-json.api b/formats/json/api/kotlinx-serialization-json.api index 6d9aa1be8..0aac85916 100644 --- a/formats/json/api/kotlinx-serialization-json.api +++ b/formats/json/api/kotlinx-serialization-json.api @@ -342,6 +342,16 @@ public abstract class kotlinx/serialization/json/JsonTransformingSerializer : ko public final class kotlinx/serialization/json/JvmStreamsKt { public static final fun decodeFromStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;)Ljava/lang/Object; + public static final fun decodeToSequence (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;)Lkotlin/sequences/Sequence; + public static synthetic fun decodeToSequence$default (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/LazyStreamingFormat;ILjava/lang/Object;)Lkotlin/sequences/Sequence; public static final fun encodeToStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Ljava/io/OutputStream;)V } +public final class kotlinx/serialization/json/LazyStreamingFormat : java/lang/Enum { + public static final field ARRAY_WRAPPED Lkotlinx/serialization/json/LazyStreamingFormat; + public static final field AUTO_DETECT Lkotlinx/serialization/json/LazyStreamingFormat; + public static final field WHITESPACE_SEPARATED Lkotlinx/serialization/json/LazyStreamingFormat; + public static fun valueOf (Ljava/lang/String;)Lkotlinx/serialization/json/LazyStreamingFormat; + public static fun values ()[Lkotlinx/serialization/json/LazyStreamingFormat; +} + diff --git a/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt b/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt index bc72ec3cc..e5ef6331a 100644 --- a/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt +++ b/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt @@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer { open fun ensureHaveChars() {} + fun isNotEof(): Boolean = definitelyNotEof(currentPosition) != -1 + // Used as bound check in loops abstract fun definitelyNotEof(position: Int): Int @@ -158,7 +160,7 @@ internal abstract class AbstractJsonLexer { fun expectEof() { val nextToken = consumeNextToken() if (nextToken != TC_EOF) - fail("Expected EOF after parsing an object, but had ${source[currentPosition - 1]} instead") + fail("Expected EOF after parsing, but had ${source[currentPosition - 1]} instead") } /* @@ -202,7 +204,7 @@ internal abstract class AbstractJsonLexer { fail(charToTokenClass(expected)) } - protected fun fail(expectedToken: Byte) { + internal fun fail(expectedToken: Byte): Nothing { // We know that the token was consumed prior to this call // Slow path, never called in normal code, can avoid optimizing it val expected = when (expectedToken) { diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt index d25f95735..3b7f625b5 100644 --- a/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt +++ b/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt @@ -68,7 +68,7 @@ public fun Json.decodeFromStream( } /** - * Deserializes the contents of given [stream] to to the value of type [T] using UTF-8 encoding and + * Deserializes the contents of given [stream] to the value of type [T] using UTF-8 encoding and * deserializer retrieved from the reified type parameter. * * Note that this functions expects that exactly one object would be present in the stream @@ -80,3 +80,93 @@ public fun Json.decodeFromStream( @ExperimentalSerializationApi public inline fun Json.decodeFromStream(stream: InputStream): T = decodeFromStream(serializersModule.serializer(), stream) + +/** + * Description of [decodeToSequence]'s JSON input shape. + * + * Sequence represents a stream of objects parsed one-by-one; + * [LazyStreamingFormat] defines a separator between these objects. + * Normally, these objects are not separated by meaningful characters ([WHITESPACE_SEPARATED]) + * or the whole stream is a large array and therefore objects are separated with commas ([ARRAY_WRAPPED]). + */ +@ExperimentalSerializationApi +public enum class LazyStreamingFormat { + /** + * Declares that objects in the input stream are not separated by meaningful characters. + * + * Stream must start with a first object and there are some (maybe none) whitespace chars between objects. + * Whitespace character is either ' ', '\n', '\r' or '\t'. + */ + WHITESPACE_SEPARATED, + + /** + * Declares that objects in the input stream are wrapped in the array. Elements of the array are still parsed lazily, + * so there shouldn't be problems if array total size is larger than application memory. + * + * Stream must start with json array start character and objects in it must be separated with commas and optional whitespaces. + * Stream must end with array end character, otherwise, [JsonDecodingException] would be thrown. + * Dangling chars after array end are not permitted. + */ + ARRAY_WRAPPED, + + /** + * Declares that parser itself should select between [WHITESPACE_SEPARATED] and [ARRAY_WRAPPED] modes. + * Selection is performed by looking on the first meaningful character of the stream. + * + * In most of the cases, auto detection is sufficient to correctly parse input. + * However, in the input is _whitespace-separated stream of the arrays_, parser would select incorrect mode. + * For such cases, [LazyStreamingFormat] must be specified explicitly. + * + * Example of exceptional case: + * `[1, 2, 3] [4, 5, 6]\n[7, 8, 9]` + */ + AUTO_DETECT; +} + +/** + * Deserializes the contents of given [stream] using UTF-8 encoding and [deserializer]. + * Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares. + * + * Elements must all be of type [T]. + * Elements are parsed lazily when resulting [Sequence] is evaluated. + * Resulting sequence is tied to the stream and constrained to be evaluated only once. + * + * **Resource caution:** this method does not close [stream] when the parsing is finished neither provides method to close it manually. + * It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily, + * closing it before returned sequence is evaluated fully would result in [IOException] from decoder. + * + * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T]. + * @throws [IOException] If an I/O error occurs and stream can't be read from. + */ +@ExperimentalSerializationApi +public fun Json.decodeToSequence( + stream: InputStream, + deserializer: DeserializationStrategy, + format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT +): Sequence { + val lexer = ReaderJsonLexer(stream) + val iter = JsonIterator(format, this, lexer, deserializer) + return Sequence { iter }.constrainOnce() +} + +/** + * Deserializes the contents of given [stream] using UTF-8 encoding and deserializer retrieved from the reified type parameter. + * Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares. + * + * Elements must all be of type [T]. + * Elements are parsed lazily when resulting [Sequence] is evaluated. + * Resulting sequence is tied to the stream and constrained to be evaluated only once. + * + * **Resource caution:** this method does not close [stream] when the parsing is finished neither provides method to close it manually. + * It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily, + * closing it before returned sequence is evaluated fully would result in [IOException] from decoder. + * + * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T]. + * @throws [IOException] If an I/O error occurs and stream can't be read from. + */ +@ExperimentalSerializationApi +public inline fun Json.decodeToSequence( + stream: InputStream, + format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT +): Sequence = decodeToSequence(stream, serializersModule.serializer(), format) + diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JsonIterator.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JsonIterator.kt new file mode 100644 index 000000000..57d23a843 --- /dev/null +++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JsonIterator.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("FunctionName") +@file:OptIn(ExperimentalSerializationApi::class) + +package kotlinx.serialization.json.internal + +import kotlinx.serialization.DeserializationStrategy +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.json.* + +internal fun JsonIterator( + mode: LazyStreamingFormat, + json: Json, + lexer: ReaderJsonLexer, + deserializer: DeserializationStrategy +): Iterator = when (lexer.determineFormat(mode)) { + LazyStreamingFormat.WHITESPACE_SEPARATED -> JsonIteratorWsSeparated( + json, + lexer, + deserializer + ) // Can be many WS-separated independent arrays + LazyStreamingFormat.ARRAY_WRAPPED -> JsonIteratorArrayWrapped( + json, + lexer, + deserializer + ) + LazyStreamingFormat.AUTO_DETECT -> error("AbstractJsonLexer.determineFormat must be called beforehand.") +} + + +private fun AbstractJsonLexer.determineFormat(suggested: LazyStreamingFormat): LazyStreamingFormat = when (suggested) { + LazyStreamingFormat.WHITESPACE_SEPARATED -> + LazyStreamingFormat.WHITESPACE_SEPARATED // do not call consumeStartArray here so we don't confuse parser with stream of lists + LazyStreamingFormat.ARRAY_WRAPPED -> + if (tryConsumeStartArray()) LazyStreamingFormat.ARRAY_WRAPPED + else fail(TC_BEGIN_LIST) + LazyStreamingFormat.AUTO_DETECT -> + if (tryConsumeStartArray()) LazyStreamingFormat.ARRAY_WRAPPED + else LazyStreamingFormat.WHITESPACE_SEPARATED +} + +private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean { + if (peekNextToken() == TC_BEGIN_LIST) { + consumeNextToken(TC_BEGIN_LIST) + return true + } + return false +} + +private class JsonIteratorWsSeparated( + private val json: Json, + private val lexer: ReaderJsonLexer, + private val deserializer: DeserializationStrategy +) : Iterator { + override fun next(): T = + StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor) + .decodeSerializableValue(deserializer) + + override fun hasNext(): Boolean = lexer.isNotEof() +} + +private class JsonIteratorArrayWrapped( + private val json: Json, + private val lexer: ReaderJsonLexer, + private val deserializer: DeserializationStrategy +) : Iterator { + private var first = true + + override fun next(): T { + if (first) { + first = false + } else { + lexer.consumeNextToken(COMMA) + } + val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor) + return input.decodeSerializableValue(deserializer) + } + + /** + * Note: if array separator (comma) is missing, hasNext() returns true, but next() throws an exception. + */ + override fun hasNext(): Boolean { + if (lexer.peekNextToken() == TC_END_LIST) { + lexer.consumeNextToken(TC_END_LIST) + if (lexer.isNotEof()) { + if (lexer.peekNextToken() == TC_BEGIN_LIST) lexer.fail("There is a start of the new array after the one parsed to sequence. " + + "${LazyStreamingFormat.ARRAY_WRAPPED.name} mode doesn't merge consecutive arrays.\n" + + "If you need to parse a stream of arrays, please use ${LazyStreamingFormat.WHITESPACE_SEPARATED.name} mode instead.") + lexer.expectEof() + } + return false + } + // We may allow unclosed bracket on .isLenient, but it's usually not what this flag do + if (!lexer.isNotEof()) lexer.fail(TC_END_LIST) + return true + } +} diff --git a/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt b/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt index a8ce9b79c..fd94aec31 100644 --- a/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt +++ b/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt @@ -4,16 +4,17 @@ package kotlinx.serialization.features +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.* import kotlinx.coroutines.runBlocking -import kotlinx.serialization.SerializationException -import kotlinx.serialization.StringData +import kotlinx.serialization.* +import kotlinx.serialization.builtins.serializer import kotlinx.serialization.json.* +import kotlinx.serialization.json.internal.JsonDecodingException import kotlinx.serialization.test.assertFailsWithMessage -import org.junit.Ignore import org.junit.Test import java.io.* -import kotlin.test.assertEquals +import kotlin.test.* class JsonStreamFlowTest { val json = Json {} @@ -24,7 +25,16 @@ class JsonStreamFlowTest { } } - val inputString = """{"data":"a"}{"data":"b"}{"data":"c"}""" + private suspend inline fun Json.readFromStream(iss: InputStream): Flow = flow { + val serial = serializer() + val iter = iterateOverStream(iss, serial) + while (iter.hasNext()) { + emit(iter.next()) + } + }.flowOn(Dispatchers.IO) + + val inputStringWsSeparated = """{"data":"a"}{"data":"b"}{"data":"c"}""" + val inputStringWrapped = """[{"data":"a"},{"data":"b"},{"data":"c"}]""" val inputList = listOf(StringData("a"), StringData("b"), StringData("c")) @Test @@ -36,16 +46,102 @@ class JsonStreamFlowTest { f.writeToStream(os) } - assertEquals(inputString, os.toString(Charsets.UTF_8.name())) + assertEquals(inputStringWsSeparated, os.toString(Charsets.UTF_8.name())) } @Test fun testDecodeSeveralItems() { - val ins = ByteArrayInputStream(inputString.encodeToByteArray()) + val ins = ByteArrayInputStream(inputStringWsSeparated.encodeToByteArray()) assertFailsWithMessage("EOF") { json.decodeFromStream(ins) } } + private inline fun Iterator.assertNext(expected: T) { + assertTrue(hasNext()) + assertEquals(expected, next()) + } + + private fun Json.iterateOverStream(stream: InputStream, deserializer: DeserializationStrategy): Iterator = + decodeToSequence(stream, deserializer).iterator() + + private fun withInputs(vararg inputs: String = arrayOf(inputStringWsSeparated, inputStringWrapped), block: (InputStream) -> Unit) { + for (input in inputs) { + val res = runCatching { block(input.asInputStream()) } + if (res.isFailure) throw AssertionError("Failed test with input $input", res.exceptionOrNull()) + } + } + + private fun String.asInputStream() = ByteArrayInputStream(this.encodeToByteArray()) + + @Test + fun testIterateSeveralItems() = withInputs { ins -> + val iter = json.iterateOverStream(ins, StringData.serializer()) + iter.assertNext(StringData("a")) + iter.assertNext(StringData("b")) + iter.assertNext(StringData("c")) + assertFalse(iter.hasNext()) + assertFailsWithMessage("EOF") { + iter.next() + } + } + + @Test + fun testDecodeToSequence() = withInputs { ins -> + val sequence = json.decodeToSequence(ins, StringData.serializer()) + assertEquals(inputList, sequence.toList(), "For input $inputStringWsSeparated") + assertFailsWith { sequence.toList() } // assert constrained once + } + + @Test + fun testDecodeAsFlow() = withInputs { ins -> + val list = runBlocking { + buildList { json.readFromStream(ins).toCollection(this) } + } + assertEquals(inputList, list) + } + + @Test + fun testItemsSeparatedByWs() { + val input = "{\"data\":\"a\"} {\"data\":\"b\"}\n\t{\"data\":\"c\"}" + val ins = ByteArrayInputStream(input.encodeToByteArray()) + assertEquals(inputList, json.decodeToSequence(ins, StringData.serializer()).toList()) + } + + @Test + fun testMalformedArray() { + val input1 = """[1, 2, 3""" + val input2 = """[1, 2, 3]qwert""" + val input3 = """[1,2 3]""" + withInputs(input1, input2, input3) { + assertFailsWith { + json.decodeToSequence(it, Int.serializer()).toList() + } + } + } + + @Test + fun testMultilineArrays() { + val input = "[1,2,3]\n[4,5,6]\n[7,8,9]" + assertFailsWith { + json.decodeToSequence>(input.asInputStream(), LazyStreamingFormat.AUTO_DETECT).toList() + } + assertFailsWith { + json.decodeToSequence(input.asInputStream(), LazyStreamingFormat.AUTO_DETECT).toList() + } + assertFailsWith { // we do not merge lists + json.decodeToSequence(input.asInputStream(), LazyStreamingFormat.ARRAY_WRAPPED).toList() + } + val parsed = json.decodeToSequence>(input.asInputStream(), LazyStreamingFormat.WHITESPACE_SEPARATED).toList() + val expected = listOf(listOf(1,2,3), listOf(4,5,6), listOf(7,8,9)) + assertEquals(expected, parsed) + } + + @Test + fun testStrictArrayCheck() { + assertFailsWith { + json.decodeToSequence(inputStringWsSeparated.asInputStream(), LazyStreamingFormat.ARRAY_WRAPPED) + } + } }