Skip to content

Commit

Permalink
Prototype JsonIterator to read multiple objects from stream lazily
Browse files Browse the repository at this point in the history
although it does not support top-level array wrapping

See #1662
  • Loading branch information
sandwwraith committed Sep 22, 2021
1 parent e1ff8f2 commit 8923be2
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer {

open fun ensureHaveChars() {}

fun isNotEof(): Boolean = definitelyNotEof(currentPosition) != -1

// Used as bound check in loops
abstract fun definitelyNotEof(position: Int): Int

Expand Down
36 changes: 36 additions & 0 deletions formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,39 @@ public fun <T> Json.decodeFromStream(
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeFromStream(stream: InputStream): T =
decodeFromStream(serializersModule.serializer(), stream)

public sealed interface JsonIterator {
public fun <T> next(deserializer: DeserializationStrategy<T>): T

public fun hasNext(): Boolean
}

public fun <T> JsonIterator.asIterator(deserializer: DeserializationStrategy<T>): Iterator<T> =
object : Iterator<T> {
override fun hasNext(): Boolean = this@asIterator.hasNext()

override fun next(): T = this@asIterator.next(deserializer)
}

public fun <T> JsonIterator.asSequence(deserializer: DeserializationStrategy<T>): Sequence<T> = asIterator(deserializer).asSequence()

public fun Json.iterateOverStream(stream: InputStream): JsonIterator {
val lexer = ReaderJsonLexer(stream)
return JsonIteratorImpl(this, lexer)
}

public fun <T> Json.decodeToSequence(stream: InputStream, deserializer: DeserializationStrategy<T>): Sequence<T> =
Sequence { iterateOverStream(stream).asIterator(deserializer) }.constrainOnce() // or just iterateOverStream().asSequence(deserializer)





internal class JsonIteratorImpl(private val json: Json, private val lexer: ReaderJsonLexer): JsonIterator {
override fun <T> next(deserializer: DeserializationStrategy<T>): T {
val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
return input.decodeSerializableValue(deserializer)
}

override fun hasNext(): Boolean = lexer.isNotEof()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@

package kotlinx.serialization.features

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.SerializationException
import kotlinx.serialization.StringData
import kotlinx.serialization.*
import kotlinx.serialization.json.*
import kotlinx.serialization.test.assertFailsWithMessage
import org.junit.Ignore
import org.junit.Test
import java.io.*
import kotlin.test.assertEquals
import kotlin.test.*

class JsonStreamFlowTest {
val json = Json {}
Expand All @@ -24,6 +23,14 @@ class JsonStreamFlowTest {
}
}

suspend inline fun <reified T> Json.readFromStream(iss: InputStream): Flow<T> = flow {
val iter = iterateOverStream(iss)
val serial = serializer<T>()
while (iter.hasNext()) {
emit(iter.next(serial))
}
}.flowOn(Dispatchers.IO)

val inputString = """{"data":"a"}{"data":"b"}{"data":"c"}"""
val inputList = listOf(StringData("a"), StringData("b"), StringData("c"))

Expand All @@ -47,5 +54,57 @@ class JsonStreamFlowTest {
}
}

inline fun <reified T> JsonIterator.assertNext(expected: T) {
assertTrue(hasNext())
assertEquals(expected, next(serializer()))
}

@Test
fun testIterateSeveralItems() {

val ins = ByteArrayInputStream(inputString.encodeToByteArray())
val iter = json.iterateOverStream(ins)
iter.assertNext(StringData("a"))
iter.assertNext(StringData("b"))
iter.assertNext(StringData("c"))
assertFalse(iter.hasNext())
assertFailsWithMessage<SerializationException>("EOF") {
iter.next(StringData.serializer())
}
}

@Test
fun testDecodeToSequence() {
val ins = ByteArrayInputStream(inputString.encodeToByteArray())
assertEquals(inputList, json.decodeToSequence(ins, StringData.serializer()).toList())
}

@Test
fun testDecodeAsFlow() {
val ins = ByteArrayInputStream(inputString.encodeToByteArray())
val list = runBlocking {
buildList { json.readFromStream<StringData>(ins).toCollection(this) }
}
assertEquals(inputList, list)
}

@Test
fun testDecodeDifferentItems() {
val input = """{"data":"a"}{"intV":10}null{"data":"b"}"""
val ins = ByteArrayInputStream(input.encodeToByteArray())
val iter = json.iterateOverStream(ins)
iter.assertNext(StringData("a"))
iter.assertNext(IntData(10))
iter.assertNext<String?>(null)
iter.assertNext(StringData("b"))
assertFalse(iter.hasNext())
}

@Test
fun testItemsSeparatedByWs() {
val input = "{\"data\":\"a\"} {\"data\":\"b\"}\n\t{\"data\":\"c\"}"
val ins = ByteArrayInputStream(input.encodeToByteArray())
assertEquals(inputList, json.decodeToSequence(ins, StringData.serializer()).toList())
}

}

0 comments on commit 8923be2

Please sign in to comment.