Skip to content

Commit

Permalink
KTOR-5342 Fix freezing of readUTF8Line (#3322)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsinukov authored and marychatte committed Jan 10, 2023
1 parent d4be8ce commit 0825343
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 10 deletions.
54 changes: 45 additions & 9 deletions ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt
Expand Up @@ -15,6 +15,7 @@ import java.lang.Float.*
import java.nio.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.math.*

internal const val DEFAULT_CLOSE_MESSAGE: String = "Byte channel was closed"
private const val BYTE_BUFFER_CAPACITY: Int = 4088
Expand Down Expand Up @@ -1948,40 +1949,71 @@ internal open class ByteBufferChannel(
var newLine = false

val output = CharArray(8 * 1024)
var transferBuffer: ByteBuffer? = null
var transferredRemaining = 0
while (!isClosedForRead && !newLine && !caret && (limit == Int.MAX_VALUE || consumed <= limit)) {
try {
read(required) {
read(required) { buffer ->
val oldPosition = buffer.position()
val bufferToDecode = transferBuffer?.also {
val oldLimit = buffer.limit()
buffer.limit(min(buffer.limit(), buffer.position() + it.remaining()))
it.put(buffer)
it.flip()
buffer.limit(oldLimit)
} ?: buffer

val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed)
val decodeResult = it.decodeUTF8Line(output, 0, readLimit)
val decodeResult = bufferToDecode.decodeUTF8Line(output, 0, readLimit)

transferBuffer?.let {
buffer.position(oldPosition + it.position() - transferredRemaining)
BufferPool.recycle(it)
transferBuffer = null
transferredRemaining = 0
}

val decoded = (decodeResult shr 32).toInt()
val requiredBytes = (decodeResult and 0xffffffffL).toInt()

required = kotlin.math.max(1, requiredBytes)
required = max(1, requiredBytes)

if (requiredBytes == -1) {
newLine = true
}

if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) {
it.position(it.position() + 1)
if (requiredBytes != -1 &&
buffer.hasRemaining() &&
buffer[buffer.position()] == '\r'.code.toByte()
) {
buffer.position(buffer.position() + 1)
caret = true
}

if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) {
it.position(it.position() + 1)
if (requiredBytes != -1 &&
buffer.hasRemaining() &&
buffer[buffer.position()] == '\n'.code.toByte()
) {
buffer.position(buffer.position() + 1)
newLine = true
}

if (out is StringBuilder) {
out.append(output, 0, decoded)
} else {
val buffer = CharBuffer.wrap(output, 0, decoded)
out.append(buffer, 0, decoded)
val charBuffer = CharBuffer.wrap(output, 0, decoded)
out.append(charBuffer, 0, decoded)
}

consumed += decoded

if (decoded == 0 && buffer.remaining() < requiredBytes) {
transferBuffer = BufferPool.borrow().also {
transferredRemaining = buffer.remaining()
it.put(buffer)
}
}

if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) {
throw TooLongLineException("Line is longer than limit")
}
Expand All @@ -1991,6 +2023,10 @@ internal open class ByteBufferChannel(
}
}

if (transferBuffer != null) {
BufferPool.recycle(transferBuffer!!)
}

if (!isClosedForRead && caret && !newLine) {
try {
read(1) {
Expand Down
25 changes: 25 additions & 0 deletions ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt
Expand Up @@ -228,4 +228,29 @@ class ByteBufferChannelTest {
yield()
assertFalse(awaitingContent)
}

@Test
fun testReadLine(): Unit = runBlocking {
val channel = ByteChannel(autoFlush = true)

val writer = launch(Dispatchers.IO) {
repeat(4087) {
channel.writeByte('a'.code.toByte())
}

// U+2588
channel.writeByte(0xE2.toByte())
channel.writeByte(0x96.toByte())
channel.writeByte(0x88.toByte())

channel.writeByte('\n'.code.toByte())
}

val reader = async(Dispatchers.IO) {
channel.readUTF8Line(100_000)
}

reader.await()
writer.join()
}
}
Expand Up @@ -62,7 +62,7 @@ class CachingHeadersTest {
}
)

object Immutable: CacheControl(null) {
object Immutable : CacheControl(null) {
override fun toString(): String = "immutable"
}

Expand Down

0 comments on commit 0825343

Please sign in to comment.