diff --git a/kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt b/kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt new file mode 100644 index 0000000000..f949d9f5ea --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import java.util.concurrent.atomic.* + +/** + * Atomic array with lock-free reads and synchronized modifications. It logically has an unbounded size, + * is implicitly filled with nulls, and is resized on updates as needed to grow. + */ +internal class ResizableAtomicArray(initialLength: Int) { + @Volatile + private var array = AtomicReferenceArray(initialLength) + + // for debug output + public fun currentLength(): Int = array.length() + + public operator fun get(index: Int): T? { + val array = this.array // volatile read + return if (index < array.length()) array[index] else null + } + + // Must not be called concurrently, e.g. always use synchronized(this) to call this function + fun setSynchronized(index: Int, value: T?) { + val curArray = this.array + val curLen = curArray.length() + if (index < curLen) { + curArray[index] = value + } else { + val newArray = AtomicReferenceArray((index + 1).coerceAtLeast(2 * curLen)) + for (i in 0 until curLen) newArray[i] = curArray[i] + newArray[index] = value + array = newArray // copy done + } + } +} diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 41f759ce8a..6b7fecf553 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* -import java.util.concurrent.atomic.* import java.util.concurrent.locks.* import kotlin.math.* import kotlin.random.* @@ -261,7 +260,7 @@ internal class CoroutineScheduler( * works properly */ @JvmField - val workers = AtomicReferenceArray(maxPoolSize + 1) + val workers = ResizableAtomicArray(corePoolSize + 1) /** * Long describing state of workers in this pool. @@ -480,7 +479,7 @@ internal class CoroutineScheduler( * 3) Only then start the worker, otherwise it may miss its own creation */ val worker = Worker(newIndex) - workers[newIndex] = worker + workers.setSynchronized(newIndex, worker) require(newIndex == incrementCreatedWorkers()) worker.start() return cpuWorkers + 1 @@ -525,7 +524,7 @@ internal class CoroutineScheduler( var dormant = 0 var terminated = 0 val queueSizes = arrayListOf() - for (index in 1 until workers.length()) { + for (index in 1 until workers.currentLength()) { val worker = workers[index] ?: continue val queueSize = worker.localQueue.size when (worker.state) { @@ -838,7 +837,7 @@ internal class CoroutineScheduler( val lastIndex = decrementCreatedWorkers() if (lastIndex != oldIndex) { val lastWorker = workers[lastIndex]!! - workers[oldIndex] = lastWorker + workers.setSynchronized(oldIndex, lastWorker) lastWorker.indexInArray = oldIndex /* * Now lastWorker is available at both indices in the array, but it can @@ -852,7 +851,7 @@ internal class CoroutineScheduler( /* * 5) It is safe to clear reference from workers array now. */ - workers[lastIndex] = null + workers.setSynchronized(lastIndex, null) } state = WorkerState.TERMINATED } diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ResizableAtomicArrayLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ResizableAtomicArrayLincheckTest.kt new file mode 100644 index 0000000000..1948a78ecc --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/lincheck/ResizableAtomicArrayLincheckTest.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.lincheck + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.* +import org.jetbrains.kotlinx.lincheck.annotations.* +import org.jetbrains.kotlinx.lincheck.paramgen.* + +@Param(name = "index", gen = IntGen::class, conf = "0:4") +@Param(name = "value", gen = IntGen::class, conf = "1:5") +@OpGroupConfig(name = "sync", nonParallel = true) +class ResizableAtomicArrayLincheckTest : AbstractLincheckTest() { + private val a = ResizableAtomicArray(2) + + @Operation + fun get(@Param(name = "index") index: Int): Int? = a[index] + + @Operation(group = "sync") + fun set(@Param(name = "index") index: Int, @Param(name = "value") value: Int) { + a.setSynchronized(index, value) + } + + override fun extractState() = (0..4).map { a[it] } +} \ No newline at end of file