diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts index 5da40f261e..1a62b2d6d4 100644 --- a/benchmarks/build.gradle.kts +++ b/benchmarks/build.gradle.kts @@ -31,33 +31,7 @@ tasks.named("compileJmhKotlin") { } } -/* - * Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths, - * and it breaks JMH that tries to post-process these symbols and fails because they are renamed. - */ -val removeRedundantFiles by tasks.registering(Delete::class) { - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") - // Primes - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class") -} - -tasks.named("jmhRunBytecodeGenerator") { - dependsOn(removeRedundantFiles) -} // It is better to use the following to run benchmarks, otherwise you may get unexpected errors: // ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark" diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt similarity index 54% rename from benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt index 9a7224c89e..dba161ccfa 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt @@ -12,25 +12,23 @@ import java.util.concurrent.* @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Fork(value = 1) -@BenchmarkMode(Mode.AverageTime) +@BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) -open class CombineBenchmark { +open class CombineFlowsBenchmarVolatilek { - @Benchmark - fun measure10() = measure(10) + @Param("10", "100", "1000") + private var size = 10 @Benchmark - fun measure100() = measure(100) + fun combine() = runBlocking { + combine((1 until size).map { flowOf(it) }) { a -> a}.collect() + } @Benchmark - fun measure1000() = measure(1000) - - fun measure(size: Int) = runBlocking { - val flowList = (1..size).map { flowOf(it) } - val listFlow = combine(flowList) { it.toList() } - - listFlow.collect { - } + fun combineTransform() = runBlocking { + val list = (1 until size).map { flowOf(it) }.toList() + combineTransform((1 until size).map { flowOf(it) }) { emit(it) }.collect() } } + diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt new file mode 100644 index 0000000000..f7fbc6cf23 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.internal.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class CombineTwoFlowsBenchmark { + + @Param("100", "100000", "1000000") + private var size = 100000 + + @Benchmark + fun combinePlain() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + flow.combine(flow) { a, b -> a + b }.collect() + } + + @Benchmark + fun combineTransform() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + flow.combineTransform(flow) { a, b -> emit(a + b) }.collect() + } + + @Benchmark + fun combineVararg() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + combine(listOf(flow, flow)) { arr -> arr[0] + arr[1] }.collect() + } + + @Benchmark + fun combineTransformVararg() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + combineTransform(listOf(flow, flow)) { arr -> emit(arr[0] + arr[1]) }.collect() + } +} diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 53ecf06a2c..8edd2b310c 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -137,14 +137,6 @@ internal abstract class AbstractSendChannel( return sendSuspend(element) } - internal suspend fun sendFair(element: E) { - if (offerInternal(element) === OFFER_SUCCESS) { - yield() // Works only on fast path to properly work in sequential use-cases - return - } - return sendSuspend(element) - } - public final override fun offer(element: E): Boolean { val result = offerInternal(element) return when { diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index 3f53b48c53..a75d466199 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -34,9 +34,4 @@ internal open class ChannelCoroutine( _channel.cancel(exception) // cancel the channel cancelCoroutine(exception) // cancel the job } - - @Suppress("UNCHECKED_CAST") - suspend fun sendFair(element: E) { - (_channel as AbstractSendChannel).sendFair(element) - } } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index 3f9034d388..2e69e0cfd9 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -9,107 +9,51 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* -import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* -internal fun getNull(): Symbol = NULL // Workaround for JS BE bug - -internal suspend fun FlowCollector.combineTransformInternal( - first: Flow, second: Flow, - transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -) { - coroutineScope { - val firstChannel = asFairChannel(first) - val secondChannel = asFairChannel(second) - var firstValue: Any? = null - var secondValue: Any? = null - var firstIsClosed = false - var secondIsClosed = false - while (!firstIsClosed || !secondIsClosed) { - select { - onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value -> - firstValue = value - if (secondValue !== null) { - transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2) - } - } - - onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value -> - secondValue = value - if (firstValue !== null) { - transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2) - } - } - } - } - } -} - @PublishedApi internal suspend fun FlowCollector.combineInternal( flows: Array>, arrayFactory: () -> Array, transform: suspend FlowCollector.(Array) -> Unit -): Unit = coroutineScope { +): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope val size = flows.size - val channels = Array(size) { asFairChannel(flows[it]) } - val latestValues = arrayOfNulls(size) + val latestValues = Array(size) { NULL } val isClosed = Array(size) { false } - var nonClosed = size - var remainingNulls = size - // See flow.combine(other) for explanation of the logic - // Reuse receive blocks to avoid allocations on each iteration - val onReceiveBlocks = Array Unit>(size) { i -> - { value -> - if (value === null) { - isClosed[i] = true; - --nonClosed - } - else { - if (latestValues[i] == null) --remainingNulls - latestValues[i] = value - if (remainingNulls == 0) { - val arguments = arrayFactory() - for (index in 0 until size) { - arguments[index] = NULL.unbox(latestValues[index]) + val resultChannel = Channel>(Channel.CONFLATED) + val nonClosed = LocalAtomicInt(size) + val remainingAbsentValues = LocalAtomicInt(size) + for (i in 0 until size) { + // Coroutine per flow that keeps track of its value and sends result to downstream + launch { + try { + flows[i].collect { value -> + val previous = latestValues[i] + latestValues[i] = value + if (previous === NULL) remainingAbsentValues.decrementAndGet() + if (remainingAbsentValues.value == 0) { + val results = arrayFactory() + for (index in 0 until size) { + results[index] = NULL.unbox(latestValues[index]) + } + // NB: here actually "stale" array can overwrite a fresh one and break linearizability + resultChannel.send(results as Array) } - transform(arguments as Array) + yield() // Emulate fairness for backward compatibility + } + } finally { + isClosed[i] = true + // Close the channel when there is no more flows + if (nonClosed.decrementAndGet() == 0) { + resultChannel.close() } } } } - while (nonClosed != 0) { - select { - for (i in 0 until size) { - if (isClosed[i]) continue - channels[i].onReceiveOrNull(onReceiveBlocks[i]) - } - } - } -} - -private inline fun SelectBuilder.onReceive( - isClosed: Boolean, - channel: ReceiveChannel, - crossinline onClosed: () -> Unit, - noinline onReceive: suspend (value: Any) -> Unit -) { - if (isClosed) return - @Suppress("DEPRECATION") - channel.onReceiveOrNull { - // TODO onReceiveOrClosed when boxing issues are fixed - if (it === null) onClosed() - else onReceive(it) - } -} - -// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed -private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel = produce { - val channel = channel as ChannelCoroutine - flow.collect { value -> - return@collect channel.sendFair(value ?: NULL) + resultChannel.consumeEach { + transform(it) } } @@ -131,12 +75,25 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus val collectJob = Job() val scopeJob = currentCoroutineContext()[Job]!! (second as SendChannel<*>).invokeOnClose { + // Optimization to avoid AFE allocation when the other flow is done if (!collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow)) } val newContext = coroutineContext + scopeJob val cnt = threadContextElements(newContext) try { + /* + * Non-trivial undispatched (because we are in the right context and there is no structured concurrency) + * hierarchy: + * -Outer coroutineScope that owns the whole zip process + * - First flow is collected by the child of coroutineScope, collectJob. + * So it can be safely cancelled as soon as the second flow is done + * - **But** the downstream MUST NOT be cancelled when the second flow is done, + * so we emit to downstream from coroutineScope job. + * Typically, such hierarchy requires coroutine for collector that communicates + * with coroutines scope via a channel, but it's way too expensive, so + * we are using this trick instead. + */ withContextUndispatched( coroutineContext + collectJob) { flow.collect { value -> val otherValue = second.receiveOrNull() ?: return@collect diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index ec661819f2..67c4c0e9ac 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -31,9 +31,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow */ @JvmName("flowCombine") public fun Flow.combine(flow: Flow, transform: suspend (a: T1, b: T2) -> R): Flow = flow { - combineTransformInternal(this@combine, flow) { a, b -> - emit(transform(a, b)) - } + combineInternal(arrayOf(this@combine, flow), { arrayOfNulls(2) }, { emit(transform(it[0] as T1, it[1] as T2)) }) } /** @@ -75,10 +73,11 @@ public fun combine(flow: Flow, flow2: Flow, transform: suspe public fun Flow.combineTransform( flow: Flow, @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -): Flow = safeFlow { - combineTransformInternal(this@combineTransform, flow) { a, b -> - transform(a, b) - } +): Flow = combineTransform(this, flow) { args: Array<*> -> + transform( + args[0] as T1, + args[1] as T2 + ) } /** diff --git a/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt new file mode 100644 index 0000000000..a22ae750a4 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt @@ -0,0 +1,32 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +/* + * These are atomics that are used as local variables + * where atomicfu doesn't support its tranformations. + * + * Have `Local` prefix to avoid AFU clashes during star-imports + */ + +// In fact, used as @Volatile +internal expect class LocalAtomicRef(value: T) { + fun get(): T + fun set(value: T) +} + +internal inline var LocalAtomicRef.value + get() = get() + set(value) = set(value) + +internal expect class LocalAtomicInt(value: Int) { + fun get(): Int + fun set(value: Int) + fun decrementAndGet(): Int +} + +internal inline var LocalAtomicInt.value + get() = get() + set(value) = set(value) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt index 2893321998..db07b4f8be 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt @@ -211,16 +211,16 @@ abstract class CombineTestBase : TestBase() { hang { expect(3) } } - val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) } + val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expectUnreached() } assertFailsWith(flow) - finish(4) + finish(2) } @Test fun testCancellationExceptionDownstream() = runTest { val f1 = flow { emit(1) - expect(2) + expect(1) hang { expect(5) } } val f2 = flow { @@ -230,7 +230,7 @@ abstract class CombineTestBase : TestBase() { } val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { - expect(1) + expect(2) yield() expect(4) throw CancellationException("") diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt index 5262f3c159..4dfc5f5fa1 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt @@ -13,6 +13,10 @@ import kotlin.test.* */ class ZipTest : TestBase() { + internal fun Flow.zip(flow2: Flow, transform: suspend (T1, T2) -> R): Flow { + return zipImpl(this, flow2, transform) + } + @Test fun testZip() = runTest { val f1 = flowOf("a", "b", "c") @@ -222,4 +226,32 @@ class ZipTest : TestBase() { assertFailsWith(flow) finish(6) } + + private fun numbers(limit: Long = Long.MAX_VALUE) = flow { + for (i in 2L..limit) emit(i) + } + + @Test + fun zip() = runTest { + val numbers = numbers(1000) + val first = numbers + .filter { it % 2L != 0L } + .map { it * it } + val second = numbers + .filter { it % 2L == 0L } + .map { it * it } + first.zip(second) { v1, v2 -> v1 + v2 }.filter { it % 3 == 0L }.count() + } + + @Test + fun zip2() = runTest { + val numbers = numbers(10000) + val first = numbers + .filter { it % 2L != 0L } + .map { it * it } + val second = numbers + .filter { it % 2L == 0L } + .map { it * it } + first.zip(second) { v1, v2 -> v1 + v2 }.filter { it % 3 == 0L }.count() + } } diff --git a/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt new file mode 100644 index 0000000000..a4af0fd534 --- /dev/null +++ b/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt @@ -0,0 +1,23 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +internal actual class LocalAtomicRef actual constructor(private var value: T) { + actual fun set(value: T) { + this.value = value + } + + actual fun get(): T = value +} + +internal actual class LocalAtomicInt actual constructor(private var value: Int) { + actual fun set(value: Int) { + this.value = value + } + + actual fun get(): Int = value + + actual fun decrementAndGet(): Int = --value +} diff --git a/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt new file mode 100644 index 0000000000..67063192e5 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt @@ -0,0 +1,12 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + + +@Suppress("ACTUAL_WITHOUT_EXPECT") +internal actual typealias LocalAtomicRef = java.util.concurrent.atomic.AtomicReference + +@Suppress("ACTUAL_WITHOUT_EXPECT") +internal actual typealias LocalAtomicInt = java.util.concurrent.atomic.AtomicInteger diff --git a/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt new file mode 100644 index 0000000000..a4af0fd534 --- /dev/null +++ b/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt @@ -0,0 +1,23 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +internal actual class LocalAtomicRef actual constructor(private var value: T) { + actual fun set(value: T) { + this.value = value + } + + actual fun get(): T = value +} + +internal actual class LocalAtomicInt actual constructor(private var value: Int) { + actual fun set(value: Int) { + this.value = value + } + + actual fun get(): Int = value + + actual fun decrementAndGet(): Int = --value +}