Skip to content

Commit

Permalink
[WIP] Improve performance of comine(Iterable<Flow>) by reusing lambda…
Browse files Browse the repository at this point in the history
…s for onReceive

Addresses #2296
  • Loading branch information
qwwdfsad committed Oct 12, 2020
1 parent 20341f2 commit 67df6e6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 8 deletions.
36 changes: 36 additions & 0 deletions benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt
@@ -0,0 +1,36 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class CombineBenchmark {

@Benchmark
fun measure10() = measure(10)

@Benchmark
fun measure100() = measure(100)

@Benchmark
fun measure1000() = measure(1000)

fun measure(size: Int) = runBlocking {
val flowList = (1..size).map { flowOf(it) }
val listFlow = combine(flowList) { it.toList() }

listFlow.collect {
}
}
}
29 changes: 21 additions & 8 deletions kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
Expand Up @@ -56,14 +56,18 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
val isClosed = Array(size) { false }
var nonClosed = size
var remainingNulls = size
// See flow.combine(other) for explanation.
while (nonClosed != 0) {
select<Unit> {
for (i in 0 until size) {
onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value ->
if (latestValues[i] == null) --remainingNulls
latestValues[i] = value
if (remainingNulls != 0) return@onReceive
// See flow.combine(other) for explanation of the logic
// Reuse receive blocks to avoid allocations on each iteration
val onReceiveBlocks = Array<suspend (Any?) -> Unit>(size) { i ->
{ value ->
if (value === null) {
isClosed[i] = true;
--nonClosed
}
else {
if (latestValues[i] == null) --remainingNulls
latestValues[i] = value
if (remainingNulls == 0) {
val arguments = arrayFactory()
for (index in 0 until size) {
arguments[index] = NULL.unbox(latestValues[index])
Expand All @@ -73,6 +77,15 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
}
}
}

while (nonClosed != 0) {
select<Unit> {
for (i in 0 until size) {
if (isClosed[i]) continue
channels[i].onReceiveOrNull(onReceiveBlocks[i])
}
}
}
}

private inline fun SelectBuilder<Unit>.onReceive(
Expand Down

0 comments on commit 67df6e6

Please sign in to comment.