From 6e24d414a8553966a80e3d638c77f5f32bdb1f6b Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Mon, 5 Dec 2022 09:13:09 +0100 Subject: [PATCH] KTOR-5252 Mute EOF exception in line reading --- .../src/io/ktor/utils/io/ByteBufferChannel.kt | 55 ++++++++++--------- .../io/ktor/utils/io/ByteBufferChannelTest.kt | 23 ++++++++ 2 files changed, 52 insertions(+), 26 deletions(-) diff --git a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt index 9a1b91989f..b34e387f31 100644 --- a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt +++ b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt @@ -1949,41 +1949,44 @@ internal open class ByteBufferChannel( val output = CharArray(8 * 1024) while (!isClosedForRead && !newLine && !caret && (limit == Int.MAX_VALUE || consumed <= limit)) { - read(required) { - val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed) - val decodeResult = it.decodeUTF8Line(output, 0, readLimit) + try { + read(required) { + val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed) + val decodeResult = it.decodeUTF8Line(output, 0, readLimit) - val decoded = (decodeResult shr 32).toInt() - val requiredBytes = (decodeResult and 0xffffffffL).toInt() + val decoded = (decodeResult shr 32).toInt() + val requiredBytes = (decodeResult and 0xffffffffL).toInt() - required = kotlin.math.max(1, requiredBytes) + required = kotlin.math.max(1, requiredBytes) - if (requiredBytes == -1) { - newLine = true - } + if (requiredBytes == -1) { + newLine = true + } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) { - it.position(it.position() + 1) - caret = true - } + if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) { + it.position(it.position() + 1) + caret = true + } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) { - it.position(it.position() + 1) - newLine = true - } + if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) { + it.position(it.position() + 1) + newLine = true + } - if (out is StringBuilder) { - out.append(output, 0, decoded) - } else { - val buffer = CharBuffer.wrap(output, 0, decoded) - out.append(buffer, 0, decoded) - } + if (out is StringBuilder) { + out.append(output, 0, decoded) + } else { + val buffer = CharBuffer.wrap(output, 0, decoded) + out.append(buffer, 0, decoded) + } - consumed += decoded + consumed += decoded - if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) { - throw TooLongLineException("Line is longer than limit") + if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) { + throw TooLongLineException("Line is longer than limit") + } } + } catch (_: EOFException) { } } diff --git a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt index 668ca3562d..b463906336 100644 --- a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt +++ b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt @@ -61,6 +61,29 @@ class ByteBufferChannelTest { Unit } + @Test + fun testReadUtf8LineEOF() = testSuspend { + (1..20000).forEach { num -> + val channel = ByteChannel(true) + val writer = launch(Dispatchers.IO) { + channel.writeFully("1\n".toByteArray()) + channel.close() + } + + val reader = async(Dispatchers.IO) { + val lines = mutableListOf() + while (true) { + val line = channel.readUTF8Line(5000) ?: break + lines.add(line) + } + lines + } + + val readerResult = reader.await() + writer.join() + } + } + @Test fun testWriteWriteAvailableRaceCondition() = runBlocking { testWriteXRaceCondition { it.writeAvailable(1) { it.put(1) } }