Skip to content

Commit

Permalink
KTOR-5252 Mute EOF exception in line reading
Browse files Browse the repository at this point in the history
  • Loading branch information
e5l committed Dec 5, 2022
1 parent 588d711 commit 6e24d41
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 26 deletions.
55 changes: 29 additions & 26 deletions ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt
Expand Up @@ -1949,41 +1949,44 @@ internal open class ByteBufferChannel(

val output = CharArray(8 * 1024)
while (!isClosedForRead && !newLine && !caret && (limit == Int.MAX_VALUE || consumed <= limit)) {
read(required) {
val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed)
val decodeResult = it.decodeUTF8Line(output, 0, readLimit)
try {
read(required) {
val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed)
val decodeResult = it.decodeUTF8Line(output, 0, readLimit)

val decoded = (decodeResult shr 32).toInt()
val requiredBytes = (decodeResult and 0xffffffffL).toInt()
val decoded = (decodeResult shr 32).toInt()
val requiredBytes = (decodeResult and 0xffffffffL).toInt()

required = kotlin.math.max(1, requiredBytes)
required = kotlin.math.max(1, requiredBytes)

if (requiredBytes == -1) {
newLine = true
}
if (requiredBytes == -1) {
newLine = true
}

if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) {
it.position(it.position() + 1)
caret = true
}
if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) {
it.position(it.position() + 1)
caret = true
}

if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) {
it.position(it.position() + 1)
newLine = true
}
if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) {
it.position(it.position() + 1)
newLine = true
}

if (out is StringBuilder) {
out.append(output, 0, decoded)
} else {
val buffer = CharBuffer.wrap(output, 0, decoded)
out.append(buffer, 0, decoded)
}
if (out is StringBuilder) {
out.append(output, 0, decoded)
} else {
val buffer = CharBuffer.wrap(output, 0, decoded)
out.append(buffer, 0, decoded)
}

consumed += decoded
consumed += decoded

if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) {
throw TooLongLineException("Line is longer than limit")
if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) {
throw TooLongLineException("Line is longer than limit")
}
}
} catch (_: EOFException) {
}
}

Expand Down
23 changes: 23 additions & 0 deletions ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt
Expand Up @@ -61,6 +61,29 @@ class ByteBufferChannelTest {
Unit
}

@Test
fun testReadUtf8LineEOF() = testSuspend {
(1..20000).forEach { num ->
val channel = ByteChannel(true)
val writer = launch(Dispatchers.IO) {
channel.writeFully("1\n".toByteArray())
channel.close()
}

val reader = async(Dispatchers.IO) {
val lines = mutableListOf<String>()
while (true) {
val line = channel.readUTF8Line(5000) ?: break
lines.add(line)
}
lines
}

val readerResult = reader.await()
writer.join()
}
}

@Test
fun testWriteWriteAvailableRaceCondition() = runBlocking {
testWriteXRaceCondition { it.writeAvailable(1) { it.put(1) } }
Expand Down

0 comments on commit 6e24d41

Please sign in to comment.