diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts index 5da40f261e..b60dcbc8f4 100644 --- a/benchmarks/build.gradle.kts +++ b/benchmarks/build.gradle.kts @@ -31,38 +31,12 @@ tasks.named("compileJmhKotlin") { } } -/* - * Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths, - * and it breaks JMH that tries to post-process these symbols and fails because they are renamed. - */ -val removeRedundantFiles by tasks.registering(Delete::class) { - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") - // Primes - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class") -} - -tasks.named("jmhRunBytecodeGenerator") { - dependsOn(removeRedundantFiles) -} // It is better to use the following to run benchmarks, otherwise you may get unexpected errors: // ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark" extensions.configure("jmh") { - jmhVersion = "1.21" + jmhVersion = "1.26" duplicateClassesStrategy = DuplicatesStrategy.INCLUDE failOnError = true resultFormat = "CSV" @@ -80,7 +54,7 @@ tasks.named("jmhJar") { } dependencies { - compile("org.openjdk.jmh:jmh-core:1.21") + compile("org.openjdk.jmh:jmh-core:1.26") compile("io.projectreactor:reactor-core:${version("reactor")}") compile("io.reactivex.rxjava2:rxjava:2.1.9") compile("com.github.akarnokd:rxjava2-extensions:0.20.8") diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt new file mode 100644 index 0000000000..4725ceda91 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class CombineFlowsBenchmark { + + @Param("10", "100", "1000") + private var size = 10 + + @Benchmark + fun combine() = runBlocking { + combine((1 until size).map { flowOf(it) }) { a -> a}.collect() + } + + @Benchmark + fun combineTransform() = runBlocking { + val list = (1 until size).map { flowOf(it) }.toList() + combineTransform((1 until size).map { flowOf(it) }) { emit(it) }.collect() + } +} + diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt new file mode 100644 index 0000000000..f7fbc6cf23 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.internal.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class CombineTwoFlowsBenchmark { + + @Param("100", "100000", "1000000") + private var size = 100000 + + @Benchmark + fun combinePlain() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + flow.combine(flow) { a, b -> a + b }.collect() + } + + @Benchmark + fun combineTransform() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + flow.combineTransform(flow) { a, b -> emit(a + b) }.collect() + } + + @Benchmark + fun combineVararg() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + combine(listOf(flow, flow)) { arr -> arr[0] + arr[1] }.collect() + } + + @Benchmark + fun combineTransformVararg() = runBlocking { + val flow = (1 until size.toLong()).asFlow() + combineTransform(listOf(flow, flow)) { arr -> emit(arr[0] + arr[1]) }.collect() + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt index 4ebb3d07ff..8453f5c7f9 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt @@ -77,14 +77,14 @@ open class NumbersBenchmark { @Benchmark fun zipRx() { - val numbers = rxNumbers().take(natural.toLong()) + val numbers = rxNumbers().take(natural) val first = numbers .filter { it % 2L != 0L } .map { it * it } val second = numbers .filter { it % 2L == 0L } .map { it * it } - first.zipWith(second, BiFunction { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count() + first.zipWith(second, { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count() .blockingGet() } @@ -98,7 +98,7 @@ open class NumbersBenchmark { @Benchmark fun transformationsRx(): Long { - return rxNumbers().take(natural.toLong()) + return rxNumbers().take(natural) .filter { it % 2L != 0L } .map { it * it } .filter { (it + 1) % 3 == 0L }.count() diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 53ecf06a2c..8edd2b310c 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -137,14 +137,6 @@ internal abstract class AbstractSendChannel( return sendSuspend(element) } - internal suspend fun sendFair(element: E) { - if (offerInternal(element) === OFFER_SUCCESS) { - yield() // Works only on fast path to properly work in sequential use-cases - return - } - return sendSuspend(element) - } - public final override fun offer(element: E): Boolean { val result = offerInternal(element) return when { diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index 3f53b48c53..a75d466199 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -34,9 +34,4 @@ internal open class ChannelCoroutine( _channel.cancel(exception) // cancel the channel cancelCoroutine(exception) // cancel the job } - - @Suppress("UNCHECKED_CAST") - suspend fun sendFair(element: E) { - (_channel as AbstractSendChannel).sendFair(element) - } } diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 59873eba5f..490e88265c 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -367,10 +367,10 @@ public fun Flow.combineLatest(other: Flow, transform: suspen message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, transform)") ) -public inline fun Flow.combineLatest( +public fun Flow.combineLatest( other: Flow, other2: Flow, - crossinline transform: suspend (T1, T2, T3) -> R + transform: suspend (T1, T2, T3) -> R ) = combine(this, other, other2, transform) @Deprecated( @@ -378,11 +378,11 @@ public inline fun Flow.combineLatest( message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)") ) -public inline fun Flow.combineLatest( +public fun Flow.combineLatest( other: Flow, other2: Flow, other3: Flow, - crossinline transform: suspend (T1, T2, T3, T4) -> R + transform: suspend (T1, T2, T3, T4) -> R ) = combine(this, other, other2, other3, transform) @Deprecated( @@ -390,12 +390,12 @@ public inline fun Flow.combineLatest( message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)") ) -public inline fun Flow.combineLatest( +public fun Flow.combineLatest( other: Flow, other2: Flow, other3: Flow, other4: Flow, - crossinline transform: suspend (T1, T2, T3, T4, T5) -> R + transform: suspend (T1, T2, T3, T4, T5) -> R ): Flow = combine(this, other, other2, other3, other4, transform) /** @@ -482,4 +482,4 @@ public fun Flow.replay(bufferSize: Int): Flow = noImpl() message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'", replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)") ) -public fun Flow.cache(): Flow = noImpl() \ No newline at end of file +public fun Flow.cache(): Flow = noImpl() diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index e53ef35c45..f3730cc7fa 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -224,19 +224,33 @@ private class UndispatchedContextCollector( private val emitRef: suspend (T) -> Unit = { downstream.emit(it) } // allocate suspend function ref once on creation override suspend fun emit(value: T): Unit = - withContextUndispatched(emitContext, countOrElement, emitRef, value) + withContextUndispatched(emitContext, value, countOrElement, emitRef) } // Efficiently computes block(value) in the newContext -private suspend fun withContextUndispatched( +internal suspend fun withContextUndispatched( newContext: CoroutineContext, + value: V, countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed - block: suspend (V) -> T, value: V + block: suspend (V) -> T ): T = suspendCoroutineUninterceptedOrReturn { uCont -> withCoroutineContext(newContext, countOrElement) { - block.startCoroutineUninterceptedOrReturn(value, Continuation(newContext) { - uCont.resumeWith(it) - }) + block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext)) } } + +// Continuation that links the caller with uCont with walkable CoroutineStackFrame +private class StackFrameContinuation( + private val uCont: Continuation, override val context: CoroutineContext +) : Continuation, CoroutineStackFrame { + + override val callerFrame: CoroutineStackFrame? + get() = uCont as? CoroutineStackFrame + + override fun resumeWith(result: Result) { + uCont.resumeWith(result) + } + + override fun getStackTraceElement(): StackTraceElement? = null +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index 67da32c9f9..bbdebd08b9 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -9,133 +9,135 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* -import kotlinx.coroutines.selects.* +import kotlin.coroutines.* +import kotlin.coroutines.intrinsics.* -internal fun getNull(): Symbol = NULL // Workaround for JS BE bug - -internal suspend fun FlowCollector.combineTransformInternal( - first: Flow, second: Flow, - transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -) { - coroutineScope { - val firstChannel = asFairChannel(first) - val secondChannel = asFairChannel(second) - var firstValue: Any? = null - var secondValue: Any? = null - var firstIsClosed = false - var secondIsClosed = false - while (!firstIsClosed || !secondIsClosed) { - select { - onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value -> - firstValue = value - if (secondValue !== null) { - transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2) - } - } - - onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value -> - secondValue = value - if (firstValue !== null) { - transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2) - } - } - } - } - } -} +private typealias Update = IndexedValue @PublishedApi internal suspend fun FlowCollector.combineInternal( flows: Array>, - arrayFactory: () -> Array, + arrayFactory: () -> Array?, // Array factory is required to workaround array typing on JVM transform: suspend FlowCollector.(Array) -> Unit -): Unit = coroutineScope { +): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope val size = flows.size - val channels = Array(size) { asFairChannel(flows[it]) } + if (size == 0) return@flowScope // bail-out for empty input val latestValues = arrayOfNulls(size) - val isClosed = Array(size) { false } - var nonClosed = size - var remainingNulls = size - // See flow.combine(other) for explanation. - while (nonClosed != 0) { - select { - for (i in 0 until size) { - onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value -> - if (latestValues[i] == null) --remainingNulls - latestValues[i] = value - if (remainingNulls != 0) return@onReceive - val arguments = arrayFactory() - for (index in 0 until size) { - arguments[index] = NULL.unbox(latestValues[index]) - } - transform(arguments as Array) + latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED } + val resultChannel = Channel(size) + val nonClosed = LocalAtomicInt(size) + var remainingAbsentValues = size + for (i in 0 until size) { + // Coroutine per flow that keeps track of its value and sends result to downstream + launch { + try { + flows[i].collect { value -> + resultChannel.send(Update(i, value)) + yield() // Emulate fairness, giving each flow chance to emit + } + } finally { + // Close the channel when there is no more flows + if (nonClosed.decrementAndGet() == 0) { + resultChannel.close() } } } } -} -private inline fun SelectBuilder.onReceive( - isClosed: Boolean, - channel: ReceiveChannel, - crossinline onClosed: () -> Unit, - noinline onReceive: suspend (value: Any) -> Unit -) { - if (isClosed) return - @Suppress("DEPRECATION") - channel.onReceiveOrNull { - // TODO onReceiveOrClosed when boxing issues are fixed - if (it === null) onClosed() - else onReceive(it) - } -} + /* + * Batch-receive optimization: read updates in batches, but bail-out + * as soon as we encountered two values from the same source + */ + val lastReceivedEpoch = ByteArray(size) + var currentEpoch: Byte = 0 + while (true) { + ++currentEpoch + // Start batch + // The very first receive in epoch should be suspending + var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here + while (true) { + val index = element.index + // Update values + val previous = latestValues[index] + latestValues[index] = element.value + if (previous === UNINITIALIZED) --remainingAbsentValues + // Check epoch + // Received the second value from the same flow in the same epoch -- bail out + if (lastReceivedEpoch[index] == currentEpoch) break + lastReceivedEpoch[index] = currentEpoch + element = resultChannel.poll() ?: break + } -// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed -private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel = produce { - val channel = channel as ChannelCoroutine - flow.collect { value -> - return@collect channel.sendFair(value ?: NULL) + // Process batch result if there is enough data + if (remainingAbsentValues == 0) { + /* + * If arrayFactory returns null, then we can avoid array copy because + * it's our own safe transformer that immediately deconstructs the array + */ + val results = arrayFactory() + if (results == null) { + transform(latestValues as Array) + } else { + (latestValues as Array).copyInto(results) + transform(results as Array) + } + } } } -internal fun zipImpl(flow: Flow, flow2: Flow, transform: suspend (T1, T2) -> R): Flow = unsafeFlow { - coroutineScope { - val first = asChannel(flow) - val second = asChannel(flow2) - /* - * This approach only works with rendezvous channel and is required to enforce correctness - * in the following scenario: - * ``` - * val f1 = flow { emit(1); delay(Long.MAX_VALUE) } - * val f2 = flowOf(1) - * f1.zip(f2) { ... } - * ``` - * - * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction). - */ - (second as SendChannel<*>).invokeOnClose { - if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow)) - } +internal fun zipImpl(flow: Flow, flow2: Flow, transform: suspend (T1, T2) -> R): Flow = + unsafeFlow { + coroutineScope { + val second = produce { + flow2.collect { value -> + return@collect channel.send(value ?: NULL) + } + } - val otherIterator = second.iterator() - try { - first.consumeEach { value -> - if (!otherIterator.hasNext()) { - return@consumeEach + /* + * This approach only works with rendezvous channel and is required to enforce correctness + * in the following scenario: + * ``` + * val f1 = flow { emit(1); delay(Long.MAX_VALUE) } + * val f2 = flowOf(1) + * f1.zip(f2) { ... } + * ``` + * + * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction). + */ + val collectJob = Job() + (second as SendChannel<*>).invokeOnClose { + // Optimization to avoid AFE allocation when the other flow is done + if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow)) + } + + try { + /* + * Non-trivial undispatched (because we are in the right context and there is no structured concurrency) + * hierarchy: + * -Outer coroutineScope that owns the whole zip process + * - First flow is collected by the child of coroutineScope, collectJob. + * So it can be safely cancelled as soon as the second flow is done + * - **But** the downstream MUST NOT be cancelled when the second flow is done, + * so we emit to downstream from coroutineScope job. + * Typically, such hierarchy requires coroutine for collector that communicates + * with coroutines scope via a channel, but it's way too expensive, so + * we are using this trick instead. + */ + val scopeContext = coroutineContext + val cnt = threadContextElements(scopeContext) + withContextUndispatched(coroutineContext + collectJob, Unit) { + flow.collect { value -> + withContextUndispatched(scopeContext, Unit, cnt) { + val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow) + emit(transform(value, NULL.unbox(otherValue))) + } + } } - emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next()))) + } catch (e: AbortFlowException) { + e.checkOwnership(owner = this@unsafeFlow) + } finally { + if (!second.isClosedForReceive) second.cancel() } - } catch (e: AbortFlowException) { - e.checkOwnership(owner = this@unsafeFlow) - } finally { - if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow)) } } -} - -// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed -private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel = produce { - flow.collect { value -> - return@collect channel.send(value ?: NULL) - } -} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt index 22e1957419..f20deb2d38 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt @@ -11,11 +11,20 @@ import kotlin.native.concurrent.* /** * This value is used a a surrogate `null` value when needed. * It should never leak to the outside world. + * Its usage typically are paired with [Symbol.unbox] usages. */ @JvmField @SharedImmutable internal val NULL = Symbol("NULL") +/** + * Symbol to indicate that the value is not yet initialized. + * It should never leak to the outside world. + */ +@JvmField +@SharedImmutable +internal val UNINITIALIZED = Symbol("UNINITIALIZED") + /* * Symbol used to indicate that the flow is complete. * It should never leak to the outside world. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index ec661819f2..790c0895e4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -8,7 +8,6 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* import kotlinx.coroutines.flow.flow as safeFlow @@ -31,9 +30,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow */ @JvmName("flowCombine") public fun Flow.combine(flow: Flow, transform: suspend (a: T1, b: T2) -> R): Flow = flow { - combineTransformInternal(this@combine, flow) { a, b -> - emit(transform(a, b)) - } + combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) }) } /** @@ -75,10 +72,11 @@ public fun combine(flow: Flow, flow2: Flow, transform: suspe public fun Flow.combineTransform( flow: Flow, @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -): Flow = safeFlow { - combineTransformInternal(this@combineTransform, flow) { a, b -> - transform(a, b) - } +): Flow = combineTransformUnsafe(this, flow) { args: Array<*> -> + transform( + args[0] as T1, + args[1] as T2 + ) } /** @@ -102,7 +100,7 @@ public fun combineTransform( flow: Flow, flow2: Flow, @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -): Flow = combineTransform(flow, flow2) { args: Array<*> -> +): Flow = combineTransformUnsafe(flow, flow2) { args: Array<*> -> transform( args[0] as T1, args[1] as T2 @@ -113,12 +111,12 @@ public fun combineTransform( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -public inline fun combine( +public fun combine( flow: Flow, flow2: Flow, flow3: Flow, - @BuilderInference crossinline transform: suspend (T1, T2, T3) -> R -): Flow = combine(flow, flow2, flow3) { args: Array<*> -> + @BuilderInference transform: suspend (T1, T2, T3) -> R +): Flow = combineUnsafe(flow, flow2, flow3) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -132,12 +130,12 @@ public inline fun combine( * The receiver of the [transform] is [FlowCollector] and thus `transform` is a * generic function that may transform emitted element, skip it or emit it multiple times. */ -public inline fun combineTransform( +public fun combineTransform( flow: Flow, flow2: Flow, flow3: Flow, - @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3) -> Unit -): Flow = combineTransform(flow, flow2, flow3) { args: Array<*> -> + @BuilderInference transform: suspend FlowCollector.(T1, T2, T3) -> Unit +): Flow = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -149,12 +147,12 @@ public inline fun combineTransform( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -public inline fun combine( +public fun combine( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, - crossinline transform: suspend (T1, T2, T3, T4) -> R + transform: suspend (T1, T2, T3, T4) -> R ): Flow = combine(flow, flow2, flow3, flow4) { args: Array<*> -> transform( args[0] as T1, @@ -170,13 +168,13 @@ public inline fun combine( * The receiver of the [transform] is [FlowCollector] and thus `transform` is a * generic function that may transform emitted element, skip it or emit it multiple times. */ -public inline fun combineTransform( +public fun combineTransform( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, - @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3, T4) -> Unit -): Flow = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> -> + @BuilderInference transform: suspend FlowCollector.(T1, T2, T3, T4) -> Unit +): Flow = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -189,14 +187,14 @@ public inline fun combineTransform( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -public inline fun combine( +public fun combine( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, flow5: Flow, - crossinline transform: suspend (T1, T2, T3, T4, T5) -> R -): Flow = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> + transform: suspend (T1, T2, T3, T4, T5) -> R +): Flow = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -212,14 +210,14 @@ public inline fun combine( * The receiver of the [transform] is [FlowCollector] and thus `transform` is a * generic function that may transform emitted element, skip it or emit it multiple times. */ -public inline fun combineTransform( +public fun combineTransform( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, flow5: Flow, - @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3, T4, T5) -> Unit -): Flow = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> + @BuilderInference transform: suspend FlowCollector.(T1, T2, T3, T4, T5) -> Unit +): Flow = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -253,6 +251,31 @@ public inline fun combineTransform( combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) }) } +/* + * Same as combine, but does not copy array each time, deconstructing existing + * array each time. Used in overloads that accept FunctionN instead of Function> + */ +private inline fun combineUnsafe( + vararg flows: Flow, + crossinline transform: suspend (Array) -> R +): Flow = flow { + combineInternal(flows, nullArrayFactory(), { emit(transform(it)) }) +} + +/* + * Same as combineTransform, but does not copy array each time, deconstructing existing + * array each time. Used in overloads that accept FunctionN instead of Function> + */ +private inline fun combineTransformUnsafe( + vararg flows: Flow, + @BuilderInference crossinline transform: suspend FlowCollector.(Array) -> Unit +): Flow = safeFlow { + combineInternal(flows, nullArrayFactory(), { transform(it) }) +} + +// Saves bunch of anonymous classes +private fun nullArrayFactory(): () -> Array? = { null } + /** * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. @@ -298,5 +321,11 @@ public inline fun combineTransform( * println(it) // Will print "1a 2b 3c" * } * ``` + * + * ### Buffering + * + * The upstream flow is collected sequentially in the same coroutine without any buffering, while the + * [other] flow is collected concurrently as if `buffer(0)` is used. See documentation in the [buffer] operator + * for explanation. You can use additional calls to the [buffer] operator as needed for more concurrency. */ public fun Flow.zip(other: Flow, transform: suspend (T1, T2) -> R): Flow = zipImpl(this, other, transform) diff --git a/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt new file mode 100644 index 0000000000..bcfb932de3 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt @@ -0,0 +1,21 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +/* + * These are atomics that are used as local variables + * where atomicfu doesn't support its tranformations. + * + * Have `Local` prefix to avoid AFU clashes during star-imports + */ +internal expect class LocalAtomicInt(value: Int) { + fun get(): Int + fun set(value: Int) + fun decrementAndGet(): Int +} + +internal inline var LocalAtomicInt.value + get() = get() + set(value) = set(value) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt index b51197e395..8c65ea4fe7 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt @@ -161,4 +161,33 @@ class CombineParametersTest : TestBase() { }.singleOrNull() assertNull(value) } + + @Test + fun testFairnessInVariousConfigurations() = runTest { + // Test various configurations + for (flowsCount in 2..5) { + for (flowSize in 1..5) { + val flows = List(flowsCount) { (1..flowSize).asFlow() } + val combined = combine(flows) { it.joinToString(separator = "") }.toList() + val expected = List(flowSize) { (it + 1).toString().repeat(flowsCount) } + assertEquals(expected, combined, "Count: $flowsCount, size: $flowSize") + } + } + } + + @Test + fun testEpochOverflow() = runTest { + val flow = (0..1023).asFlow() + val result = flow.combine(flow) { a, b -> a + b }.toList() + assertEquals(List(1024) { it * 2 } , result) + } + + @Test + fun testArrayType() = runTest { + val arr = flowOf(1) + combine(listOf(arr, arr)) { + println(it[0]) + it[0] + }.toList().also { println(it) } + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt index 2893321998..5e2926d082 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt @@ -1,10 +1,11 @@ /* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ - -package kotlinx.coroutines.flow +@file:Suppress("UNCHECKED_CAST") +package kotlinx.coroutines.flow.operators import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import kotlin.test.* import kotlinx.coroutines.flow.combine as combineOriginal import kotlinx.coroutines.flow.combineTransform as combineTransformOriginal @@ -21,28 +22,28 @@ abstract class CombineTestBase : TestBase() { val flow = flowOf("a", "b", "c") val flow2 = flowOf(1, 2, 3) val list = flow.combineLatest(flow2) { i, j -> i + j }.toList() - assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list) + assertEquals(listOf("a1", "b2", "c3"), list) } @Test fun testNulls() = runTest { val flow = flowOf("a", null, null) val flow2 = flowOf(1, 2, 3) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() - assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list) + val list = flow.combineLatest(flow2) { i, j -> i + j }.toList() + assertEquals(listOf("a1", "null2", "null3"), list) } @Test fun testNullsOther() = runTest { val flow = flowOf("a", "b", "c") val flow2 = flowOf(null, 2, null) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() - assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list) + val list = flow.combineLatest(flow2) { i, j -> i + j }.toList() + assertEquals(listOf("anull", "b2", "cnull"), list) } @Test fun testEmptyFlow() = runTest { - val flow = emptyFlow().combineLatest(emptyFlow(), { i, j -> i + j }) + val flow = emptyFlow().combineLatest(emptyFlow()) { i, j -> i + j } assertNull(flow.singleOrNull()) } @@ -208,12 +209,12 @@ abstract class CombineTestBase : TestBase() { } val f2 = flow { emit(1) - hang { expect(3) } + expectUnreached() } - val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) } + val flow = f1.combineLatest(f2) { _, _ -> 1 }.onEach { expect(2) } assertFailsWith(flow) - finish(4) + finish(3) } @Test @@ -229,7 +230,7 @@ abstract class CombineTestBase : TestBase() { hang { expect(6) } } - val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { + val flow = f1.combineLatest(f2) { _, _ -> 1 }.onEach { expect(1) yield() expect(4) @@ -248,7 +249,7 @@ abstract class CombineTestBase : TestBase() { emit(Unit) // emit } cancel() // cancel the scope - flow.combineLatest(flow) { u, _ -> u }.collect { + flow.combineLatest(flow) { _, _ -> }.collect { // should not be reached, because cancelled before it runs expectUnreached() } @@ -265,15 +266,26 @@ class CombineTransformTest : CombineTestBase() { emit(transform(a, b)) } } +// Array null-out is an additional test for our array elimination optimization class CombineVarargAdapterTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineOriginal(this, other) { args: Array -> transform(args[0] as T1, args[1] as T2) } + combineOriginal(this, other) { args: Array -> + transform(args[0] as T1, args[1] as T2).also { + args[0] = null + args[1] = null + } + } } class CombineIterableTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineOriginal(listOf(this, other)) { args -> transform(args[0] as T1, args[1] as T2) } + combineOriginal(listOf(this, other)) { args -> + transform(args[0] as T1, args[1] as T2).also { + args[0] = null + args[1] = null + } + } } class CombineTransformAdapterTest : CombineTestBase() { @@ -283,11 +295,20 @@ class CombineTransformAdapterTest : CombineTestBase() { class CombineTransformVarargAdapterTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineTransformOriginal(this, other) { args: Array -> emit(transform(args[0] as T1, args[1] as T2)) } + combineTransformOriginal(this, other) { args: Array -> + emit(transform(args[0] as T1, args[1] as T2)) // Mess up with array + args[0] = null + args[1] = null + } } class CombineTransformIterableTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineTransformOriginal(listOf(this, other)) { args -> emit(transform(args[0] as T1, args[1] as T2)) } + combineTransformOriginal(listOf(this, other)) { args -> + emit(transform(args[0] as T1, args[1] as T2)) + // Mess up with array + args[0] = null + args[1] = null + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt index 5f2b5a74cd..02dbfc40d9 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt @@ -23,7 +23,7 @@ class ZipTest : TestBase() { fun testUnevenZip() = runTest { val f1 = flowOf("a", "b", "c", "d", "e") val f2 = flowOf(1, 2, 3) - assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2, { i, j -> i + j }).toList()) + assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2) { i, j -> i + j }.toList()) assertEquals(listOf("a1", "b2", "c3"), f2.zip(f1) { i, j -> j + i }.toList()) } @@ -67,14 +67,35 @@ class ZipTest : TestBase() { val f1 = flow { emit("1") emit("2") - expectUnreached() // the above emit will get cancelled because f2 ends } - val f2 = flowOf("a", "b") + val f2 = flow { + emit("a") + emit("b") + expectUnreached() + } assertEquals(listOf("1a", "2b"), f1.zip(f2) { s1, s2 -> s1 + s2 }.toList()) finish(1) } + @Test + fun testCancelWhenFlowIsDone2() = runTest { + val f1 = flow { + emit("1") + emit("2") + try { + emit("3") + expectUnreached() + } finally { + expect(1) + } + } + + val f2 = flowOf("a", "b") + assertEquals(listOf("1a", "2b"), f1.zip(f2) { s1, s2 -> s1 + s2 }.toList()) + finish(2) + } + @Test fun testCancelWhenFlowIsDoneReversed() = runTest { val f1 = flow { @@ -85,7 +106,12 @@ class ZipTest : TestBase() { } } - val f2 = flowOf("a", "b") + val f2 = flow { + emit("a") + emit("b") + yield() + } + assertEquals(listOf("a1", "b2"), f2.zip(f1) { s1, s2 -> s1 + s2 }.toList()) finish(2) } @@ -95,19 +121,19 @@ class ZipTest : TestBase() { val f1 = flow { emit("a") assertEquals("first", NamedDispatchers.name()) - expect(1) + expect(3) }.flowOn(NamedDispatchers("first")).onEach { assertEquals("with", NamedDispatchers.name()) - expect(2) + expect(4) }.flowOn(NamedDispatchers("with")) val f2 = flow { emit(1) assertEquals("second", NamedDispatchers.name()) - expect(3) + expect(1) }.flowOn(NamedDispatchers("second")).onEach { assertEquals("nested", NamedDispatchers.name()) - expect(4) + expect(2) }.flowOn(NamedDispatchers("nested")) val value = withContext(NamedDispatchers("main")) { @@ -127,14 +153,14 @@ class ZipTest : TestBase() { val f1 = flow { emit("a") hang { - expect(2) + expect(3) } }.flowOn(NamedDispatchers("first")) val f2 = flow { emit(1) hang { - expect(3) + expect(2) } }.flowOn(NamedDispatchers("second")) @@ -174,19 +200,18 @@ class ZipTest : TestBase() { val f1 = flow { expect(1) emit(1) - yield() - expect(4) + expect(5) throw CancellationException("") } val f2 = flow { expect(2) emit(1) - expect(5) + expect(3) hang { expect(6) } } - val flow = f1.zip(f2, { _, _ -> 1 }).onEach { expect(3) } + val flow = f1.zip(f2) { _, _ -> 1 }.onEach { expect(4) } assertFailsWith(flow) finish(7) } @@ -196,24 +221,37 @@ class ZipTest : TestBase() { val f1 = flow { expect(1) emit(1) - yield() - expect(4) - hang { expect(6) } + expectUnreached() // Will throw CE } val f2 = flow { expect(2) emit(1) - expect(5) - hang { expect(7) } + expect(3) + hang { expect(5) } } val flow = f1.zip(f2, { _, _ -> 1 }).onEach { - expect(3) + expect(4) yield() throw CancellationException("") } assertFailsWith(flow) - finish(8) + finish(6) + } + + @Test + fun testCancellationOfCollector() = runTest { + val f1 = flow { + emit("1") + awaitCancellation() + } + + val f2 = flow { + emit("2") + yield() + } + + f1.zip(f2) { a, b -> a + b }.collect { } } } diff --git a/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt new file mode 100644 index 0000000000..fffd76c452 --- /dev/null +++ b/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt @@ -0,0 +1,15 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +internal actual class LocalAtomicInt actual constructor(private var value: Int) { + actual fun set(value: Int) { + this.value = value + } + + actual fun get(): Int = value + + actual fun decrementAndGet(): Int = --value +} diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt index a8e04f0f16..b275a481cc 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt @@ -21,7 +21,11 @@ private val emitFun = internal actual class SafeCollector actual constructor( @JvmField internal actual val collector: FlowCollector, @JvmField internal actual val collectContext: CoroutineContext -) : FlowCollector, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) { +) : FlowCollector, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame { + + override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame + + override fun getStackTraceElement(): StackTraceElement? = null @JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } @@ -51,6 +55,7 @@ internal actual class SafeCollector actual constructor( */ override suspend fun emit(value: T) { return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> + // Update information about caller for stackwalking try { emit(uCont, value) } catch (e: Throwable) { diff --git a/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt new file mode 100644 index 0000000000..f508749ed0 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt @@ -0,0 +1,8 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +@Suppress("ACTUAL_WITHOUT_EXPECT") +internal actual typealias LocalAtomicInt = java.util.concurrent.atomic.AtomicInteger diff --git a/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt new file mode 100644 index 0000000000..398cb63bc2 --- /dev/null +++ b/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt @@ -0,0 +1,20 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlinx.atomicfu.* + +internal actual class LocalAtomicInt actual constructor(value: Int) { + + private val iRef = atomic(value) + + actual fun set(value: Int) { + iRef.value = value + } + + actual fun get(): Int = iRef.value + + actual fun decrementAndGet(): Int = iRef.decrementAndGet() +} diff --git a/kotlinx-coroutines-debug/test/WithContextUndispatchedTest.kt b/kotlinx-coroutines-debug/test/WithContextUndispatchedTest.kt new file mode 100644 index 0000000000..e803c980cf --- /dev/null +++ b/kotlinx-coroutines-debug/test/WithContextUndispatchedTest.kt @@ -0,0 +1,67 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +package kotlinx.coroutines.debug + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* + +// Test four our internal optimization "withContextUndispatched" +class WithContextUndispatchedTest : DebugTestBase() { + + @Test + fun testZip() = runTest { + val f1 = flowOf("a") + val f2 = flow { + nestedEmit() + yield() + } + f1.zip(f2) { i, j -> i + j }.collect { + bar(false) + } + } + + private suspend fun FlowCollector.nestedEmit() { + emit(1) + emit(2) + } + + @Test + fun testUndispatchedFlowOn() = runTest { + val flow = flowOf(1, 2, 3).flowOn(CoroutineName("...")) + flow.collect { + bar(true) + } + } + + @Test + fun testUndispatchedFlowOnWithNestedCaller() = runTest { + val flow = flow { + nestedEmit() + }.flowOn(CoroutineName("...")) + flow.collect { + bar(true) + } + } + + private suspend fun bar(forFlowOn: Boolean) { + yield() + if (forFlowOn) { + verifyFlowOn() + } else { + verifyZip() + } + yield() + } + + private suspend fun verifyFlowOn() { + yield() // suspend + verifyPartialDump(1, "verifyFlowOn", "bar") + } + + private suspend fun verifyZip() { + yield() // suspend + verifyPartialDump(2, "verifyZip", "bar", "nestedEmit") + } +} diff --git a/settings.gradle b/settings.gradle index d22d65fd25..3a07891799 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,7 +8,7 @@ pluginManagement { // JMH id "net.ltgt.apt" version "0.21" - id "me.champeau.gradle.jmh" version "0.5.0" + id "me.champeau.gradle.jmh" version "0.5.2" } }