diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt new file mode 100644 index 0000000000..9a7224c89e --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class CombineBenchmark { + + @Benchmark + fun measure10() = measure(10) + + @Benchmark + fun measure100() = measure(100) + + @Benchmark + fun measure1000() = measure(1000) + + fun measure(size: Int) = runBlocking { + val flowList = (1..size).map { flowOf(it) } + val listFlow = combine(flowList) { it.toList() } + + listFlow.collect { + } + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index 67da32c9f9..6b031065dc 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -56,14 +56,18 @@ internal suspend fun FlowCollector.combineInternal( val isClosed = Array(size) { false } var nonClosed = size var remainingNulls = size - // See flow.combine(other) for explanation. - while (nonClosed != 0) { - select { - for (i in 0 until size) { - onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value -> - if (latestValues[i] == null) --remainingNulls - latestValues[i] = value - if (remainingNulls != 0) return@onReceive + // See flow.combine(other) for explanation of the logic + // Reuse receive blocks to avoid allocations on each iteration + val onReceiveBlocks = Array Unit>(size) { i -> + { value -> + if (value === null) { + isClosed[i] = true; + --nonClosed + } + else { + if (latestValues[i] == null) --remainingNulls + latestValues[i] = value + if (remainingNulls == 0) { val arguments = arrayFactory() for (index in 0 until size) { arguments[index] = NULL.unbox(latestValues[index]) @@ -73,6 +77,15 @@ internal suspend fun FlowCollector.combineInternal( } } } + + while (nonClosed != 0) { + select { + for (i in 0 until size) { + if (isClosed[i]) continue + channels[i].onReceiveOrNull(onReceiveBlocks[i]) + } + } + } } private inline fun SelectBuilder.onReceive(