diff --git a/formats/json/api/kotlinx-serialization-json.api b/formats/json/api/kotlinx-serialization-json.api index 6d9aa1be8..eb91626eb 100644 --- a/formats/json/api/kotlinx-serialization-json.api +++ b/formats/json/api/kotlinx-serialization-json.api @@ -1,3 +1,11 @@ +public final class kotlinx/serialization/json/DecodeSequenceMode : java/lang/Enum { + public static final field ARRAY_WRAPPED Lkotlinx/serialization/json/DecodeSequenceMode; + public static final field AUTO_DETECT Lkotlinx/serialization/json/DecodeSequenceMode; + public static final field WHITESPACE_SEPARATED Lkotlinx/serialization/json/DecodeSequenceMode; + public static fun valueOf (Ljava/lang/String;)Lkotlinx/serialization/json/DecodeSequenceMode; + public static fun values ()[Lkotlinx/serialization/json/DecodeSequenceMode; +} + public abstract class kotlinx/serialization/json/Json : kotlinx/serialization/StringFormat { public static final field Default Lkotlinx/serialization/json/Json$Default; public synthetic fun (Lkotlinx/serialization/json/JsonConfiguration;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/internal/DefaultConstructorMarker;)V @@ -342,6 +350,8 @@ public abstract class kotlinx/serialization/json/JsonTransformingSerializer : ko public final class kotlinx/serialization/json/JvmStreamsKt { public static final fun decodeFromStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;)Ljava/lang/Object; + public static final fun decodeToSequence (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;)Lkotlin/sequences/Sequence; + public static synthetic fun decodeToSequence$default (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;ILjava/lang/Object;)Lkotlin/sequences/Sequence; public static final fun encodeToStream 
(Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Ljava/io/OutputStream;)V } diff --git a/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt b/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt index 5ff983d01..82881ef71 100644 --- a/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt +++ b/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt @@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer { open fun ensureHaveChars() {} + fun isNotEof(): Boolean = peekNextToken() != TC_EOF + // Used as bound check in loops abstract fun prefetchOrEof(position: Int): Int @@ -158,7 +160,7 @@ internal abstract class AbstractJsonLexer { fun expectEof() { val nextToken = consumeNextToken() if (nextToken != TC_EOF) - fail("Expected EOF after parsing an object, but had ${source[currentPosition - 1]} instead") + fail("Expected EOF after parsing, but had ${source[currentPosition - 1]} instead") } /* @@ -202,7 +204,7 @@ internal abstract class AbstractJsonLexer { fail(charToTokenClass(expected)) } - protected fun fail(expectedToken: Byte) { + internal fun fail(expectedToken: Byte): Nothing { // We know that the token was consumed prior to this call // Slow path, never called in normal code, can avoid optimizing it val expected = when (expectedToken) { diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt index d25f95735..3b83299c1 100644 --- a/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt +++ b/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt @@ -68,7 +68,7 @@ public fun Json.decodeFromStream( } /** - * Deserializes the contents of given [stream] to to the value of type [T] using UTF-8 encoding and + * Deserializes the contents of given [stream] to 
the value of type [T] using UTF-8 encoding and * deserializer retrieved from the reified type parameter. * * Note that this functions expects that exactly one object would be present in the stream @@ -80,3 +80,105 @@ public fun Json.decodeFromStream( @ExperimentalSerializationApi public inline fun Json.decodeFromStream(stream: InputStream): T = decodeFromStream(serializersModule.serializer(), stream) + +/** + * Description of [decodeToSequence]'s JSON input shape. + * + * The sequence represents a stream of objects parsed one by one; + * [DecodeSequenceMode] defines a separator between these objects. + * Typically, these objects are not separated by meaningful characters ([WHITESPACE_SEPARATED]), + * or the whole stream is a large array of objects separated with commas ([ARRAY_WRAPPED]). + */ +@ExperimentalSerializationApi +public enum class DecodeSequenceMode { + /** + * Declares that objects in the input stream are separated by whitespace characters. + * + * The stream is read as multiple JSON objects separated by any number (including zero) of whitespace characters between objects. Leading and trailing whitespace characters are also permitted. + * Each individual object is parsed lazily, when it is requested from the resulting sequence. + * + * A whitespace character is one of ' ', '\n', '\r' or '\t'. + * + * Example of `WHITESPACE_SEPARATED` stream content: + * ``` + * """{"key": "value"}{"key": "value2"} {"key2": "value2"}""" + * ``` + */ + WHITESPACE_SEPARATED, + + /** + * Declares that objects in the input stream are wrapped in a JSON array. + * Each individual object in the array is parsed lazily when it is requested from the resulting sequence. + * + * The stream is read as multiple JSON objects wrapped into a JSON array. + * The stream must start with an array start character `[` and end with an array end character `]`, + * otherwise, [JsonDecodingException] is thrown. 
+ * + * Example of `ARRAY_WRAPPED` stream content: + * ``` + * """[{"key": "value"}, {"key": "value2"},{"key2": "value2"}]""" + * ``` + */ + ARRAY_WRAPPED, + + /** + * Declares that the parser itself should select between [WHITESPACE_SEPARATED] and [ARRAY_WRAPPED] modes. + * The selection is performed by looking at the first meaningful character of the stream. + * + * In most cases, auto-detection is sufficient to correctly parse an input. + * If the input is a _whitespace-separated stream of arrays_, the parser selects [ARRAY_WRAPPED] because the stream starts with `[`; + * for such input [WHITESPACE_SEPARATED] must be specified explicitly. + * + * Example of an exceptional case: + * `[1, 2, 3] [4, 5, 6]\n[7, 8, 9]` + */ + AUTO_DETECT; +} + +/** + * Transforms the given [stream] into a lazily deserialized sequence of elements of type [T] using UTF-8 encoding and [deserializer]. + * Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares. + * + * Elements must all be of type [T]. + * Elements are parsed lazily when the resulting [Sequence] is evaluated; only the [format] detection (which may consume the opening `[`) happens eagerly in this call. + * Resulting sequence is tied to the stream and can be evaluated only once. + * + * **Resource caution:** this method neither closes the [stream] when the parsing is finished nor provides a method to close it manually. + * It is the caller's responsibility to hold a reference to the stream and close it. Moreover, because stream is parsed lazily, + * closing it before returned sequence is evaluated completely will result in [IOException] from decoder. + * + * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T]. + * @throws [IOException] If an I/O error occurs and stream can't be read from. 
+ */ +@ExperimentalSerializationApi +public fun Json.decodeToSequence( + stream: InputStream, + deserializer: DeserializationStrategy, + format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT +): Sequence { + val lexer = ReaderJsonLexer(stream) + val iter = JsonIterator(format, this, lexer, deserializer) + return Sequence { iter }.constrainOnce() +} + +/** + * Transforms the given [stream] into a lazily deserialized sequence of elements of type [T] using UTF-8 encoding and deserializer retrieved from the reified type parameter. + * Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares. + * + * Elements must all be of type [T]. + * Elements are parsed lazily when the resulting [Sequence] is evaluated; only the [format] detection (which may consume the opening `[`) happens eagerly in this call. + * Resulting sequence is tied to the stream and constrained to be evaluated only once. + * + * **Resource caution:** this method neither closes the [stream] when the parsing is finished nor provides a method to close it manually. + * It is the caller's responsibility to hold a reference to the stream and close it. Moreover, because stream is parsed lazily, + * closing it before the returned sequence is evaluated completely will result in [IOException] from decoder. + * + * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T]. + * @throws [IOException] If an I/O error occurs and stream can't be read from. 
+ */ +@ExperimentalSerializationApi +public inline fun Json.decodeToSequence( + stream: InputStream, + format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT +): Sequence = decodeToSequence(stream, serializersModule.serializer(), format) + diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JsonIterator.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JsonIterator.kt new file mode 100644 index 000000000..790030825 --- /dev/null +++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JsonIterator.kt @@ -0,0 +1,103 @@ +/* + * Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("FunctionName") +@file:OptIn(ExperimentalSerializationApi::class) + +package kotlinx.serialization.json.internal + +import kotlinx.serialization.DeserializationStrategy +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.json.* + +internal fun JsonIterator( + mode: DecodeSequenceMode, + json: Json, + lexer: ReaderJsonLexer, + deserializer: DeserializationStrategy +): Iterator = when (lexer.determineFormat(mode)) { + DecodeSequenceMode.WHITESPACE_SEPARATED -> JsonIteratorWsSeparated( + json, + lexer, + deserializer + ) // Can be many WS-separated independent arrays + DecodeSequenceMode.ARRAY_WRAPPED -> JsonIteratorArrayWrapped( + json, + lexer, + deserializer + ) + DecodeSequenceMode.AUTO_DETECT -> error("AbstractJsonLexer.determineFormat must be called beforehand.") +} + + +private fun AbstractJsonLexer.determineFormat(suggested: DecodeSequenceMode): DecodeSequenceMode = when (suggested) { + DecodeSequenceMode.WHITESPACE_SEPARATED -> + DecodeSequenceMode.WHITESPACE_SEPARATED // do not call consumeStartArray here so we don't confuse parser with stream of lists + DecodeSequenceMode.ARRAY_WRAPPED -> + if (tryConsumeStartArray()) DecodeSequenceMode.ARRAY_WRAPPED + else fail(TC_BEGIN_LIST) + DecodeSequenceMode.AUTO_DETECT -> + if 
(tryConsumeStartArray()) DecodeSequenceMode.ARRAY_WRAPPED + else DecodeSequenceMode.WHITESPACE_SEPARATED +} + +private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean { + if (peekNextToken() == TC_BEGIN_LIST) { + consumeNextToken(TC_BEGIN_LIST) + return true + } + return false +} + +private class JsonIteratorWsSeparated( + private val json: Json, + private val lexer: ReaderJsonLexer, + private val deserializer: DeserializationStrategy +) : Iterator { + override fun next(): T = + StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor) + .decodeSerializableValue(deserializer) + + override fun hasNext(): Boolean = lexer.isNotEof() +} + +private class JsonIteratorArrayWrapped( + private val json: Json, + private val lexer: ReaderJsonLexer, + private val deserializer: DeserializationStrategy +) : Iterator { + private var first = true + private var finished = false // set once the closing ']' has been consumed + + override fun next(): T { + if (first) { + first = false + } else { + lexer.consumeNextToken(COMMA) + } + val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor) + return input.decodeSerializableValue(deserializer) + } + + /** + * Once the closing `]` has been consumed, repeated calls keep returning `false` instead of failing on EOF. + * Note: if array separator (comma) is missing, hasNext() returns true, but next() throws an exception. + */ + override fun hasNext(): Boolean { + if (finished) return false + if (lexer.peekNextToken() == TC_END_LIST) { + finished = true + lexer.consumeNextToken(TC_END_LIST) + if (lexer.isNotEof()) { + if (lexer.peekNextToken() == TC_BEGIN_LIST) lexer.fail("There is a start of the new array after the one parsed to sequence. 
" + + "${DecodeSequenceMode.ARRAY_WRAPPED.name} mode doesn't merge consecutive arrays.\n" + + "If you need to parse a stream of arrays, please use ${DecodeSequenceMode.WHITESPACE_SEPARATED.name} mode instead.") + lexer.expectEof() + } + return false + } + if (!lexer.isNotEof()) lexer.fail(TC_END_LIST) + return true + } +} diff --git a/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt b/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt index a8ce9b79c..455a00381 100644 --- a/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt +++ b/formats/json/jvmTest/src/kotlinx/serialization/features/JsonStreamFlowTest.kt @@ -4,28 +4,38 @@ package kotlinx.serialization.features +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.* import kotlinx.coroutines.runBlocking -import kotlinx.serialization.SerializationException -import kotlinx.serialization.StringData +import kotlinx.serialization.* +import kotlinx.serialization.builtins.serializer import kotlinx.serialization.json.* +import kotlinx.serialization.json.internal.JsonDecodingException import kotlinx.serialization.test.assertFailsWithMessage -import org.junit.Ignore import org.junit.Test import java.io.* -import kotlin.test.assertEquals +import kotlin.test.* class JsonStreamFlowTest { - val json = Json {} + val json = Json - suspend inline fun Flow.writeToStream(os: OutputStream) { + private suspend inline fun Flow.writeToStream(os: OutputStream) { collect { json.encodeToStream(it, os) } } - val inputString = """{"data":"a"}{"data":"b"}{"data":"c"}""" - val inputList = listOf(StringData("a"), StringData("b"), StringData("c")) + private suspend inline fun Json.readFromStream(iss: InputStream): Flow = flow { + val serial = serializer() + val iter = iterateOverStream(iss, serial) + while (iter.hasNext()) { + emit(iter.next()) + } + }.flowOn(Dispatchers.IO) + + private val inputStringWsSeparated = 
"""{"data":"a"}{"data":"b"}{"data":"c"}""" + private val inputStringWrapped = """[{"data":"a"},{"data":"b"},{"data":"c"}]""" + private val inputList = listOf(StringData("a"), StringData("b"), StringData("c")) @Test fun testEncodeSeveralItems() { @@ -36,16 +46,116 @@ class JsonStreamFlowTest { f.writeToStream(os) } - assertEquals(inputString, os.toString(Charsets.UTF_8.name())) + assertEquals(inputStringWsSeparated, os.toString(Charsets.UTF_8.name())) } @Test fun testDecodeSeveralItems() { - val ins = ByteArrayInputStream(inputString.encodeToByteArray()) + val ins = ByteArrayInputStream(inputStringWsSeparated.encodeToByteArray()) assertFailsWithMessage("EOF") { json.decodeFromStream(ins) } } + private inline fun Iterator.assertNext(expected: T) { + assertTrue(hasNext()) + assertEquals(expected, next()) + } + + private fun Json.iterateOverStream(stream: InputStream, deserializer: DeserializationStrategy): Iterator = + decodeToSequence(stream, deserializer).iterator() + + private fun withInputs(vararg inputs: String = arrayOf(inputStringWsSeparated, inputStringWrapped), block: (InputStream) -> Unit) { + for (input in inputs) { + val res = runCatching { block(input.asInputStream()) } + if (res.isFailure) throw AssertionError("Failed test with input $input", res.exceptionOrNull()) + } + } + + private fun String.asInputStream() = ByteArrayInputStream(this.encodeToByteArray()) + + @Test + fun testIterateSeveralItems() = withInputs { ins -> + val iter = json.iterateOverStream(ins, StringData.serializer()) + iter.assertNext(StringData("a")) + iter.assertNext(StringData("b")) + iter.assertNext(StringData("c")) + assertFalse(iter.hasNext()) + assertFailsWithMessage("EOF") { + iter.next() + } + } + + @Test + fun testDecodeToSequence() = withInputs { ins -> + val sequence = json.decodeToSequence(ins, StringData.serializer()) + assertEquals(inputList, sequence.toList(), "For input $inputStringWsSeparated") + assertFailsWith { sequence.toList() } // assert constrained once + } + 
+ @Test + fun testDecodeAsFlow() = withInputs { ins -> + val list = runBlocking { + buildList { json.readFromStream(ins).toCollection(this) } + } + assertEquals(inputList, list) + } + + @Test + fun testItemsSeparatedByWs() { + val input = "{\"data\":\"a\"} {\"data\":\"b\"}\n\t{\"data\":\"c\"}" + val ins = ByteArrayInputStream(input.encodeToByteArray()) + assertEquals(inputList, json.decodeToSequence(ins, StringData.serializer()).toList()) + } + + @Test + fun testMalformedArray() { + val input1 = """[1, 2, 3""" + val input2 = """[1, 2, 3]qwert""" + val input3 = """[1,2 3]""" + withInputs(input1, input2, input3) { + assertFailsWith { + json.decodeToSequence(it, Int.serializer()).toList() + } + } + } + + @Test + fun testMultilineArrays() { + val input = "[1,2,3]\n[4,5,6]\n[7,8,9]" + assertFailsWith { + json.decodeToSequence>(input.asInputStream(), DecodeSequenceMode.AUTO_DETECT).toList() + } + assertFailsWith { + json.decodeToSequence(input.asInputStream(), DecodeSequenceMode.AUTO_DETECT).toList() + } + assertFailsWith { // ARRAY_WRAPPED reads exactly one array; a following array is rejected rather than merged + json.decodeToSequence(input.asInputStream(), DecodeSequenceMode.ARRAY_WRAPPED).toList() + } + val parsed = json.decodeToSequence>(input.asInputStream(), DecodeSequenceMode.WHITESPACE_SEPARATED).toList() + val expected = listOf(listOf(1,2,3), listOf(4,5,6), listOf(7,8,9)) + assertEquals(expected, parsed) + } + + @Test + fun testStrictArrayCheck() { + assertFailsWith { + json.decodeToSequence(inputStringWsSeparated.asInputStream(), DecodeSequenceMode.ARRAY_WRAPPED) + } + } + + @Test + fun testPaddedWs() { + val paddedWs = " $inputStringWsSeparated " + assertEquals(inputList, json.decodeToSequence(paddedWs.asInputStream(), StringData.serializer()).toList()) + assertEquals(inputList, json.decodeToSequence(paddedWs.asInputStream(), StringData.serializer(), DecodeSequenceMode.WHITESPACE_SEPARATED).toList()) + } + + @Test + fun testPaddedArray() { + val paddedWs = " 
$inputStringWrapped " + assertEquals(inputList, 
json.decodeToSequence(paddedWs.asInputStream(), StringData.serializer()).toList()) + assertEquals(inputList, json.decodeToSequence(paddedWs.asInputStream(), StringData.serializer(), DecodeSequenceMode.ARRAY_WRAPPED).toList()) + } }