Skip to content

Commit

Permalink
Optimization: resizable workers array (#3137)
Browse files Browse the repository at this point in the history
Instead of allocating an array of maxPoolSize (~2M) elements for the worst-case supported scenario that may never be reached in practice and takes considerable memory, allocate just an array of corePoolSize elements and grow it dynamically if needed to accommodate more workers.

The data structure to make it happen must support lock-free reads for performance reasons, but it is simple since the workers array is modified exclusively under synchronization.
  • Loading branch information
elizarov committed Jan 17, 2022
1 parent 9169d09 commit a1f5ab8
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 6 deletions.
38 changes: 38 additions & 0 deletions kotlinx-coroutines-core/jvm/src/internal/ResizableAtomicArray.kt
@@ -0,0 +1,38 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import java.util.concurrent.atomic.*

/**
* Atomic array with lock-free reads and synchronized modifications. It logically has an unbounded size,
* is implicitly filled with nulls, and is resized on updates as needed to grow.
*/
internal class ResizableAtomicArray<T>(initialLength: Int) {
@Volatile
private var array = AtomicReferenceArray<T>(initialLength)

// for debug output
public fun currentLength(): Int = array.length()

public operator fun get(index: Int): T? {
val array = this.array // volatile read
return if (index < array.length()) array[index] else null
}

// Must not be called concurrently, e.g. always use synchronized(this) to call this function
fun setSynchronized(index: Int, value: T?) {
val curArray = this.array
val curLen = curArray.length()
if (index < curLen) {
curArray[index] = value
} else {
val newArray = AtomicReferenceArray<T>((index + 1).coerceAtLeast(2 * curLen))
for (i in 0 until curLen) newArray[i] = curArray[i]
newArray[index] = value
array = newArray // copy done
}
}
}
11 changes: 5 additions & 6 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Expand Up @@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import java.util.concurrent.locks.*
import kotlin.math.*
import kotlin.random.*
Expand Down Expand Up @@ -261,7 +260,7 @@ internal class CoroutineScheduler(
* works properly
*/
@JvmField
val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)
val workers = ResizableAtomicArray<Worker>(corePoolSize + 1)

/**
* Long describing state of workers in this pool.
Expand Down Expand Up @@ -480,7 +479,7 @@ internal class CoroutineScheduler(
* 3) Only then start the worker, otherwise it may miss its own creation
*/
val worker = Worker(newIndex)
workers[newIndex] = worker
workers.setSynchronized(newIndex, worker)
require(newIndex == incrementCreatedWorkers())
worker.start()
return cpuWorkers + 1
Expand Down Expand Up @@ -525,7 +524,7 @@ internal class CoroutineScheduler(
var dormant = 0
var terminated = 0
val queueSizes = arrayListOf<String>()
for (index in 1 until workers.length()) {
for (index in 1 until workers.currentLength()) {
val worker = workers[index] ?: continue
val queueSize = worker.localQueue.size
when (worker.state) {
Expand Down Expand Up @@ -838,7 +837,7 @@ internal class CoroutineScheduler(
val lastIndex = decrementCreatedWorkers()
if (lastIndex != oldIndex) {
val lastWorker = workers[lastIndex]!!
workers[oldIndex] = lastWorker
workers.setSynchronized(oldIndex, lastWorker)
lastWorker.indexInArray = oldIndex
/*
* Now lastWorker is available at both indices in the array, but it can
Expand All @@ -852,7 +851,7 @@ internal class CoroutineScheduler(
/*
* 5) It is safe to clear reference from workers array now.
*/
workers[lastIndex] = null
workers.setSynchronized(lastIndex, null)
}
state = WorkerState.TERMINATED
}
Expand Down
@@ -0,0 +1,27 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.lincheck

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import org.jetbrains.kotlinx.lincheck.annotations.*
import org.jetbrains.kotlinx.lincheck.paramgen.*

@Param(name = "index", gen = IntGen::class, conf = "0:4")
@Param(name = "value", gen = IntGen::class, conf = "1:5")
@OpGroupConfig(name = "sync", nonParallel = true)
class ResizableAtomicArrayLincheckTest : AbstractLincheckTest() {
private val a = ResizableAtomicArray<Int>(2)

@Operation
fun get(@Param(name = "index") index: Int): Int? = a[index]

@Operation(group = "sync")
fun set(@Param(name = "index") index: Int, @Param(name = "value") value: Int) {
a.setSynchronized(index, value)
}

override fun extractState() = (0..4).map { a[it] }
}

0 comments on commit a1f5ab8

Please sign in to comment.