From 704841bd16f20cb6bc2efe8fc946876064becabf Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Mon, 17 Jan 2022 22:03:07 +0300 Subject: [PATCH] Optimization: resizable workers array (#3137) Instead of allocating an array of maxPoolSize (~2M) elements for the worst-case supported scenario that may never be reached in practice and takes considerable memory, allocate just an array of corePoolSize elements and grow it dynamically if needed to accommodate more workers. The data structure to make it happen must support lock-free reads for performance reasons, but it is simple since the workers array is modified exclusively under synchronization. --- .../jvm/src/internal/ResizableAtomicArray.kt | 38 +++++++++++++++++++ .../jvm/src/scheduling/CoroutineScheduler.kt | 11 +++--- .../ResizableAtomicArrayLincheckTest.kt | 27 +++++++++++++ 3 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt create mode 100644 kotlinx-coroutines-core/jvm/test/lincheck/ResizableAtomicArrayLincheckTest.kt diff --git a/kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt b/kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt new file mode 100644 index 0000000000..f949d9f5ea --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import java.util.concurrent.atomic.* + +/** + * Atomic array with lock-free reads and synchronized modifications. It logically has an unbounded size, + * is implicitly filled with nulls, and is resized on updates as needed to grow. + */ +internal class ResizableAtomicArray(initialLength: Int) { + @Volatile + private var array = AtomicReferenceArray(initialLength) + + // for debug output + public fun currentLength(): Int = array.length() + + public operator fun get(index: Int): T? { + val array = this.array // volatile read + return if (index < array.length()) array[index] else null + } + + // Must not be called concurrently, e.g. always use synchronized(this) to call this function + fun setSynchronized(index: Int, value: T?) { + val curArray = this.array + val curLen = curArray.length() + if (index < curLen) { + curArray[index] = value + } else { + val newArray = AtomicReferenceArray((index + 1).coerceAtLeast(2 * curLen)) + for (i in 0 until curLen) newArray[i] = curArray[i] + newArray[index] = value + array = newArray // copy done + } + } +} diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 41f759ce8a..6b7fecf553 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* -import java.util.concurrent.atomic.* import java.util.concurrent.locks.* import kotlin.math.* import kotlin.random.* @@ -261,7 +260,7 @@ internal class CoroutineScheduler( * works properly */ @JvmField - val workers = AtomicReferenceArray(maxPoolSize + 1) + val workers = ResizableAtomicArray(corePoolSize + 1) /** * Long describing state of workers in this pool. @@ -480,7 +479,7 @@ internal class CoroutineScheduler( * 3) Only then start the worker, otherwise it may miss its own creation */ val worker = Worker(newIndex) - workers[newIndex] = worker + workers.setSynchronized(newIndex, worker) require(newIndex == incrementCreatedWorkers()) worker.start() return cpuWorkers + 1 @@ -525,7 +524,7 @@ internal class CoroutineScheduler( var dormant = 0 var terminated = 0 val queueSizes = arrayListOf() - for (index in 1 until workers.length()) { + for (index in 1 until workers.currentLength()) { val worker = workers[index] ?: continue val queueSize = worker.localQueue.size when (worker.state) { @@ -838,7 +837,7 @@ internal class CoroutineScheduler( val lastIndex = decrementCreatedWorkers() if (lastIndex != oldIndex) { val lastWorker = workers[lastIndex]!! - workers[oldIndex] = lastWorker + workers.setSynchronized(oldIndex, lastWorker) lastWorker.indexInArray = oldIndex /* * Now lastWorker is available at both indices in the array, but it can @@ -852,7 +851,7 @@ internal class CoroutineScheduler( /* * 5) It is safe to clear reference from workers array now. */ - workers[lastIndex] = null + workers.setSynchronized(lastIndex, null) } state = WorkerState.TERMINATED } diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ResizableAtomicArrayLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ResizableAtomicArrayLincheckTest.kt new file mode 100644 index 0000000000..1948a78ecc --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/lincheck/ResizableAtomicArrayLincheckTest.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.lincheck + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.* +import org.jetbrains.kotlinx.lincheck.annotations.* +import org.jetbrains.kotlinx.lincheck.paramgen.* + +@Param(name = "index", gen = IntGen::class, conf = "0:4") +@Param(name = "value", gen = IntGen::class, conf = "1:5") +@OpGroupConfig(name = "sync", nonParallel = true) +class ResizableAtomicArrayLincheckTest : AbstractLincheckTest() { + private val a = ResizableAtomicArray(2) + + @Operation + fun get(@Param(name = "index") index: Int): Int? = a[index] + + @Operation(group = "sync") + fun set(@Param(name = "index") index: Int, @Param(name = "value") value: Int) { + a.setSynchronized(index, value) + } + + override fun extractState() = (0..4).map { a[it] } +} \ No newline at end of file