From 588d711ffe759961719948fab43e1f0816087736 Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Mon, 5 Dec 2022 09:12:55 +0100 Subject: [PATCH 1/5] KTOR-5252 Fix Missing EOF exception --- .../src/io/ktor/utils/io/ByteBufferChannel.kt | 2 +- .../io/ktor/utils/io/ByteBufferChannelTest.kt | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt index f0eb60c8f2..9a1b91989f 100644 --- a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt +++ b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt @@ -1655,7 +1655,7 @@ internal open class ByteBufferChannel( if (!read) { if (isClosedForRead) { - return + throw EOFException("Got EOF but at least $min bytes were expected") } readBlockSuspend(min, consumer) diff --git a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt index cba1283972..668ca3562d 100644 --- a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt +++ b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt @@ -4,6 +4,8 @@ package io.ktor.utils.io +import io.ktor.test.dispatcher.* +import io.ktor.utils.io.core.EOFException import kotlinx.coroutines.* import kotlinx.coroutines.debug.junit4.* import org.junit.* @@ -24,6 +26,29 @@ class ByteBufferChannelTest { assertFailsWith { runBlocking { channel.readByte() } } } + @Test + fun testEarlyEOF() = testSuspend { + repeat(20000) { + val channel = ByteChannel(true) + launch(Dispatchers.IO) { + channel.writeFully("1\n".toByteArray()) + channel.close() + } + + launch(Dispatchers.IO) { + channel.read(1) { + it.get(ByteArray(it.remaining())) + } + + assertFailsWith { + channel.read(1) { + it.get(ByteArray(it.remaining())) + } + } + }.join() + } + } + @Test fun readRemainingThrowsOnClosed() = runBlocking { val channel = ByteBufferChannel(false) From 6e24d414a8553966a80e3d638c77f5f32bdb1f6b Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Mon, 5 Dec 2022 09:13:09 +0100 Subject: [PATCH 2/5] KTOR-5252 Mute EOF exception in line reading --- .../src/io/ktor/utils/io/ByteBufferChannel.kt | 55 ++++++++++--------- .../io/ktor/utils/io/ByteBufferChannelTest.kt | 23 ++++++++ 2 files changed, 52 insertions(+), 26 deletions(-) diff --git a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt index 9a1b91989f..b34e387f31 100644 --- a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt +++ b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt @@ -1949,41 +1949,44 @@ internal open class ByteBufferChannel( val output = CharArray(8 * 1024) while (!isClosedForRead && !newLine && !caret && (limit == Int.MAX_VALUE || consumed <= limit)) { - read(required) { - val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed) - val decodeResult = it.decodeUTF8Line(output, 0, readLimit) + try { + read(required) { + val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed) + val decodeResult = it.decodeUTF8Line(output, 0, readLimit) - val decoded = (decodeResult shr 32).toInt() - val requiredBytes = (decodeResult and 0xffffffffL).toInt() + val decoded = (decodeResult shr 32).toInt() + val requiredBytes = (decodeResult and 0xffffffffL).toInt() - required = kotlin.math.max(1, requiredBytes) + required = kotlin.math.max(1, requiredBytes) - if (requiredBytes == -1) { - newLine = true - } + if (requiredBytes == -1) { + newLine = true + } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) { - it.position(it.position() + 1) - caret = true - } + if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) { + it.position(it.position() + 1) + caret = true + } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) { - it.position(it.position() + 1) - newLine = true - } + if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) { + it.position(it.position() + 1) + newLine = true + } - if (out is StringBuilder) { - out.append(output, 0, decoded) - } else { - val buffer = CharBuffer.wrap(output, 0, decoded) - out.append(buffer, 0, decoded) - } + if (out is StringBuilder) { + out.append(output, 0, decoded) + } else { + val buffer = CharBuffer.wrap(output, 0, decoded) + out.append(buffer, 0, decoded) + } - consumed += decoded + consumed += decoded - if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) { - throw TooLongLineException("Line is longer than limit") + if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) { + throw TooLongLineException("Line is longer than limit") + } } + } catch (_: EOFException) { } } diff --git a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt index 668ca3562d..b463906336 100644 --- a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt +++ b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt @@ -61,6 +61,29 @@ class ByteBufferChannelTest { Unit } + @Test + fun testReadUtf8LineEOF() = testSuspend { + (1..20000).forEach { num -> + val channel = ByteChannel(true) + val writer = launch(Dispatchers.IO) { + channel.writeFully("1\n".toByteArray()) + channel.close() + } + + val reader = async(Dispatchers.IO) { + val lines = mutableListOf() + while (true) { + val line = channel.readUTF8Line(5000) ?: break + lines.add(line) + } + lines + } + + val readerResult = reader.await() + writer.join() + } + } + @Test fun testWriteWriteAvailableRaceCondition() = runBlocking { testWriteXRaceCondition { it.writeAvailable(1) { it.put(1) } } From ee367bcdea2f995d5e01c0cdd7c31f85471a309e Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Mon, 5 Dec 2022 09:39:17 +0100 Subject: [PATCH 3/5] Fix style --- ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt index b463906336..54884433cc 100644 --- a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt +++ b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt @@ -63,7 +63,7 @@ class ByteBufferChannelTest { @Test fun testReadUtf8LineEOF() = testSuspend { - (1..20000).forEach { num -> + repeat(20000) { val channel = ByteChannel(true) val writer = launch(Dispatchers.IO) { channel.writeFully("1\n".toByteArray()) @@ -79,7 +79,7 @@ class ByteBufferChannelTest { lines } - val readerResult = reader.await() + reader.await() writer.join() } } From 269af75f0fad1141acd05ef21511d83804448e4f Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Tue, 6 Dec 2022 09:15:51 +0100 Subject: [PATCH 4/5] KTOR-5252 Fix peekTo EOFException --- .../src/io/ktor/utils/io/ByteBufferChannel.kt | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt index b34e387f31..b53f81c678 100644 --- a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt +++ b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt @@ -2328,19 +2328,23 @@ internal open class ByteBufferChannel( var bytesCopied = 0 val desiredSize = (min + offset).coerceAtMost(4088L).toInt() - read(desiredSize) { nioBuffer -> - if (nioBuffer.remaining() > offset) { - val view = nioBuffer.duplicate()!! - view.position(view.position() + offset.toInt()) - - val oldLimit = view.limit() - val canCopyToDestination = minOf(max, destination.size - destinationOffset) - val newLimit = minOf(view.limit().toLong(), canCopyToDestination + offset) - view.limit(newLimit.toInt()) - bytesCopied = view.remaining() - view.copyTo(destination, destinationOffset.toInt()) - view.limit(oldLimit) + try { + read(desiredSize) { nioBuffer -> + if (nioBuffer.remaining() > offset) { + val view = nioBuffer.duplicate()!! + view.position(view.position() + offset.toInt()) + + val oldLimit = view.limit() + val canCopyToDestination = minOf(max, destination.size - destinationOffset) + val newLimit = minOf(view.limit().toLong(), canCopyToDestination + offset) + view.limit(newLimit.toInt()) + bytesCopied = view.remaining() + view.copyTo(destination, destinationOffset.toInt()) + view.limit(oldLimit) + } } + } catch (_: EOFException) { + // ignore } return bytesCopied.toLong() From c75d2a530f0cca1eba4657a4640a107604c742c0 Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Tue, 6 Dec 2022 11:28:35 +0100 Subject: [PATCH 5/5] fixup! Fix style --- ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt index b53f81c678..f2a9e2752a 100644 --- a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt +++ b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt @@ -1987,6 +1987,7 @@ internal open class ByteBufferChannel( } } } catch (_: EOFException) { + // Ignored by the contract of [ByteReadChannel.readUTF8LineTo] method } } @@ -1999,6 +2000,7 @@ internal open class ByteBufferChannel( } } } catch (_: EOFException) { + // Ignored by the contract of [ByteReadChannel.readUTF8LineTo] method } } @@ -2344,7 +2346,7 @@ internal open class ByteBufferChannel( } } } catch (_: EOFException) { - // ignore + // ignored by the contract of peekTo method } return bytesCopied.toLong()