-
Notifications
You must be signed in to change notification settings - Fork 612
/
JsonStreamFlowTest.kt
147 lines (126 loc) · 5.23 KB
/
JsonStreamFlowTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/*
* Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.serialization.features
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.*
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.json.*
import kotlinx.serialization.json.internal.JsonDecodingException
import kotlinx.serialization.test.assertFailsWithMessage
import org.junit.Test
import java.io.*
import kotlin.test.*
/**
 * Tests for stream-based JSON encoding/decoding of several values in a row:
 * writing the items of a [Flow] to an [OutputStream], and lazily reading several values
 * from an [InputStream] via `decodeToSequence`, either directly or re-exposed as a [Flow].
 */
class JsonStreamFlowTest {
    val json = Json {}

    /** Collects this flow and encodes every emitted item to [os] with [json], one after another. */
    suspend inline fun <reified T> Flow<T>.writeToStream(os: OutputStream) {
        collect {
            json.encodeToStream(it, os)
        }
    }

    /**
     * Exposes the values lazily decoded from [iss] as a cold [Flow] whose upstream
     * (and therefore the reads from [iss]) runs on [Dispatchers.IO].
     *
     * Deliberately not `suspend`: constructing the flow performs no work, decoding
     * only starts once the flow is collected.
     */
    private inline fun <reified T> Json.readFromStream(iss: InputStream): Flow<T> = flow {
        val serial = serializer<T>()
        val iter = iterateOverStream(iss, serial)
        while (iter.hasNext()) {
            emit(iter.next())
        }
    }.flowOn(Dispatchers.IO)

    // Three objects written back to back; this is exactly what testEncodeSeveralItems expects writeToStream to produce.
    val inputStringWsSeparated = """{"data":"a"}{"data":"b"}{"data":"c"}"""

    // The same three objects wrapped into a single JSON array.
    val inputStringWrapped = """[{"data":"a"},{"data":"b"},{"data":"c"}]"""

    // Decoded form shared by both textual inputs above.
    val inputList = listOf(StringData("a"), StringData("b"), StringData("c"))

    /** Encoding a flow of items must yield the items concatenated in emission order. */
    @Test
    fun testEncodeSeveralItems() {
        val os = ByteArrayOutputStream()
        runBlocking {
            inputList.asFlow().writeToStream(os)
        }
        assertEquals(inputStringWsSeparated, os.toString(Charsets.UTF_8.name()))
    }

    /**
     * `decodeFromStream` on a stream holding several concatenated objects is expected to fail
     * with a [SerializationException] whose message is checked against "EOF".
     */
    @Test
    fun testDecodeSeveralItems() {
        val ins = inputStringWsSeparated.asInputStream()
        assertFailsWithMessage<SerializationException>("EOF") {
            json.decodeFromStream<StringData>(ins)
        }
    }

    /** Asserts that the iterator has another element and that it equals [expected]. */
    private inline fun <reified T> Iterator<T>.assertNext(expected: T) {
        assertTrue(hasNext())
        assertEquals(expected, next())
    }

    /** Shortcut for `decodeToSequence(stream, deserializer).iterator()`. */
    private fun <T> Json.iterateOverStream(stream: InputStream, deserializer: DeserializationStrategy<T>): Iterator<T> =
        decodeToSequence(stream, deserializer).iterator()

    /**
     * Runs [block] once per input, each time on a fresh stream over that input.
     * Defaults to both the concatenated and the array-wrapped representation of [inputList].
     *
     * Any failure is rethrown as an [AssertionError] that names the offending input
     * and keeps the original throwable as its cause.
     */
    private fun withInputs(
        vararg inputs: String = arrayOf(inputStringWsSeparated, inputStringWrapped),
        block: (InputStream) -> Unit
    ) {
        for (input in inputs) {
            try {
                block(input.asInputStream())
            } catch (e: Throwable) {
                // Throwable (not Exception) so that assertion failures inside block are wrapped as well.
                throw AssertionError("Failed test with input $input", e)
            }
        }
    }

    /** Wraps the encoded bytes of this string (via `encodeToByteArray()`) into an in-memory input stream. */
    private fun String.asInputStream() = ByteArrayInputStream(this.encodeToByteArray())

    /** Manual iteration yields all three items, then reports exhaustion; `next()` past the end must fail. */
    @Test
    fun testIterateSeveralItems() = withInputs { ins ->
        val iter = json.iterateOverStream(ins, StringData.serializer())
        iter.assertNext(StringData("a"))
        iter.assertNext(StringData("b"))
        iter.assertNext(StringData("c"))
        assertFalse(iter.hasNext())
        assertFailsWithMessage<SerializationException>("EOF") {
            iter.next()
        }
    }

    /** The returned sequence yields all items and can be consumed only once. */
    @Test
    fun testDecodeToSequence() = withInputs { ins ->
        val sequence = json.decodeToSequence(ins, StringData.serializer())
        // No custom message here: withInputs already reports the input that actually failed.
        assertEquals(inputList, sequence.toList())
        assertFailsWith<IllegalStateException> { sequence.toList() } // assert constrained once
    }

    /** Reading through the Flow wrapper yields the same items as the plain sequence. */
    @Test
    fun testDecodeAsFlow() = withInputs { ins ->
        val list = runBlocking { json.readFromStream<StringData>(ins).toList() }
        assertEquals(inputList, list)
    }

    /** Spaces, newlines and tabs between top-level objects are accepted as separators. */
    @Test
    fun testItemsSeparatedByWs() {
        val input = "{\"data\":\"a\"} {\"data\":\"b\"}\n\t{\"data\":\"c\"}"
        assertEquals(inputList, json.decodeToSequence(input.asInputStream(), StringData.serializer()).toList())
    }

    /** Unterminated array, trailing garbage after the array, and a missing comma must all be rejected. */
    @Test
    fun testMalformedArray() {
        val input1 = """[1, 2, 3"""
        val input2 = """[1, 2, 3]qwert"""
        val input3 = """[1,2 3]"""
        withInputs(input1, input2, input3) {
            assertFailsWith<JsonDecodingException> {
                json.decodeToSequence(it, Int.serializer()).toList()
            }
        }
    }

    /**
     * Several arrays separated by newlines are only decodable as a whitespace-separated
     * stream of lists; every other format/element-type combination checked here must fail.
     */
    @Test
    fun testMultilineArrays() {
        val input = "[1,2,3]\n[4,5,6]\n[7,8,9]"
        assertFailsWith<JsonDecodingException> {
            json.decodeToSequence<List<Int>>(input.asInputStream(), LazyStreamingFormat.AUTO_DETECT).toList()
        }
        assertFailsWith<JsonDecodingException> {
            json.decodeToSequence<Int>(input.asInputStream(), LazyStreamingFormat.AUTO_DETECT).toList()
        }
        assertFailsWith<JsonDecodingException> { // we do not merge lists
            json.decodeToSequence<Int>(input.asInputStream(), LazyStreamingFormat.ARRAY_WRAPPED).toList()
        }
        val parsed = json.decodeToSequence<List<Int>>(input.asInputStream(), LazyStreamingFormat.WHITESPACE_SEPARATED).toList()
        val expected = listOf(listOf(1, 2, 3), listOf(4, 5, 6), listOf(7, 8, 9))
        assertEquals(expected, parsed)
    }

    /**
     * Requesting ARRAY_WRAPPED for input that is not an array must fail.
     * Note that the sequence is never iterated here, so the failure is expected
     * from the `decodeToSequence` call itself.
     */
    @Test
    fun testStrictArrayCheck() {
        assertFailsWith<JsonDecodingException> {
            json.decodeToSequence<StringData>(inputStringWsSeparated.asInputStream(), LazyStreamingFormat.ARRAY_WRAPPED)
        }
    }
}