From 5f0164197b4b14a40aeb332b16183c72ce1c5063 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 19 Oct 2020 18:05:14 +0300 Subject: [PATCH] ~more small optimizations * Do not allocate array on each transform call in combine, do it only for combineTransform * Simplify zip operator even further * Get rid of crossinline in combine to fix weird Android crashes Fixes #1743 Fixes #1683 --- .../common/src/flow/Migration.kt | 14 ++-- .../common/src/flow/internal/Combine.kt | 43 ++++++------ .../common/src/flow/operators/Zip.kt | 65 +++++++++++++------ .../common/test/flow/operators/CombineTest.kt | 42 ++++++++---- 4 files changed, 106 insertions(+), 58 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 59873eba5f..490e88265c 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -367,10 +367,10 @@ public fun Flow.combineLatest(other: Flow, transform: suspen message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, transform)") ) -public inline fun Flow.combineLatest( +public fun Flow.combineLatest( other: Flow, other2: Flow, - crossinline transform: suspend (T1, T2, T3) -> R + transform: suspend (T1, T2, T3) -> R ) = combine(this, other, other2, transform) @Deprecated( @@ -378,11 +378,11 @@ public inline fun Flow.combineLatest( message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)") ) -public inline fun Flow.combineLatest( +public fun Flow.combineLatest( other: Flow, other2: Flow, other3: Flow, - crossinline transform: suspend (T1, T2, T3, T4) -> R + transform: suspend (T1, T2, T3, T4) -> R ) = combine(this, other, other2, other3, transform) @Deprecated( @@ -390,12 +390,12 @@ public inline fun Flow.combineLatest( message = "Flow analogue of 'combineLatest' is 'combine'", replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)") ) -public inline fun Flow.combineLatest( +public fun Flow.combineLatest( other: Flow, other2: Flow, other3: Flow, other4: Flow, - crossinline transform: suspend (T1, T2, T3, T4, T5) -> R + transform: suspend (T1, T2, T3, T4, T5) -> R ): Flow = combine(this, other, other2, other3, other4, transform) /** @@ -482,4 +482,4 @@ public fun Flow.replay(bufferSize: Int): Flow = noImpl() message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'", replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)") ) -public fun Flow.cache(): Flow = noImpl() \ No newline at end of file +public fun Flow.cache(): Flow = noImpl() diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index 7ab3ae9163..a7d1890334 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -17,14 +17,13 @@ private typealias Update = IndexedValue @PublishedApi internal suspend fun FlowCollector.combineInternal( flows: Array>, - arrayFactory: () -> Array, // Array factory is required to workaround array typing on JVM + arrayFactory: () -> Array?, // Array factory is required to workaround array typing on JVM transform: suspend FlowCollector.(Array) -> Unit ): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope val size = flows.size if (size == 0) return@flowScope // bail-out for empty input val latestValues = arrayOfNulls(size) latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED } - val isClosed = BooleanArray(size) val resultChannel = Channel(flows.size) val nonClosed = LocalAtomicInt(size) var remainingAbsentValues = size @@ -37,7 +36,6 @@ internal suspend fun FlowCollector.combineInternal( yield() // Emulate fairness, giving each flow chance to emit } } finally { - isClosed[i] = true // Close the channel when there is no more flows if (nonClosed.decrementAndGet() == 0) { resultChannel.close() @@ -72,9 +70,17 @@ internal suspend fun FlowCollector.combineInternal( // Process batch result if there is enough data if (remainingAbsentValues == 0) { + /* + * If arrayFactory returns null, then we can avoid array copy because + * it's our own safe transformer that immediately deconstructs the array + */ val results = arrayFactory() - (latestValues as Array).copyInto(results) - transform(results as Array) + if (results == null) { + transform(latestValues as Array) + } else { + (latestValues as Array).copyInto(results) + transform(results as Array) + } } } } @@ -82,7 +88,12 @@ internal suspend fun FlowCollector.combineInternal( internal fun zipImpl(flow: Flow, flow2: Flow, transform: suspend (T1, T2) -> R): Flow = unsafeFlow { coroutineScope { - val second = asChannel(flow2) + val second = produce { + flow2.collect { value -> + return@collect channel.send(value ?: NULL) + } + } + /* * This approach only works with rendezvous channel and is required to enforce correctness * in the following scenario: @@ -95,14 +106,11 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction). */ val collectJob = Job() - val scopeJob = currentCoroutineContext()[Job]!! // TODO replace with extension when #2245 is here (second as SendChannel<*>).invokeOnClose { // Optimization to avoid AFE allocation when the other flow is done if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow)) } - val newContext = coroutineContext + scopeJob - val cnt = threadContextElements(newContext) try { /* * Non-trivial undispatched (because we are in the right context and there is no structured concurrency) @@ -116,18 +124,20 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus * with coroutines scope via a channel, but it's way too expensive, so * we are using this trick instead. */ - withContextUndispatched( coroutineContext + collectJob) { + val scopeContext = coroutineContext + val cnt = threadContextElements(scopeContext) + withContextUndispatched(coroutineContext + collectJob) { flow.collect { value -> - withContextUndispatched(newContext, cnt) { + withContextUndispatched(scopeContext, cnt) { val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow) - emit(transform(NULL.unbox(value), NULL.unbox(otherValue))) + emit(transform(value, NULL.unbox(otherValue))) } } } } catch (e: AbortFlowException) { e.checkOwnership(owner = this@unsafeFlow) } finally { - if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow)) + if (!second.isClosedForReceive) second.cancel() } } } @@ -144,10 +154,3 @@ private suspend fun withContextUndispatched( }) } } - -// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed -private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel = produce { - flow.collect { value -> - return@collect channel.send(value ?: NULL) - } -} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index 346c4f65d5..790c0895e4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -30,7 +30,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow */ @JvmName("flowCombine") public fun Flow.combine(flow: Flow, transform: suspend (a: T1, b: T2) -> R): Flow = flow { - combineInternal(arrayOf(this@combine, flow), { arrayOfNulls(2) }, { emit(transform(it[0] as T1, it[1] as T2)) }) + combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) }) } /** @@ -72,7 +72,7 @@ public fun combine(flow: Flow, flow2: Flow, transform: suspe public fun Flow.combineTransform( flow: Flow, @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -): Flow = combineTransform(this, flow) { args: Array<*> -> +): Flow = combineTransformUnsafe(this, flow) { args: Array<*> -> transform( args[0] as T1, args[1] as T2 @@ -100,7 +100,7 @@ public fun combineTransform( flow: Flow, flow2: Flow, @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -): Flow = combineTransform(flow, flow2) { args: Array<*> -> +): Flow = combineTransformUnsafe(flow, flow2) { args: Array<*> -> transform( args[0] as T1, args[1] as T2 @@ -111,12 +111,12 @@ public fun combineTransform( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -public inline fun combine( +public fun combine( flow: Flow, flow2: Flow, flow3: Flow, - @BuilderInference crossinline transform: suspend (T1, T2, T3) -> R -): Flow = combine(flow, flow2, flow3) { args: Array<*> -> + @BuilderInference transform: suspend (T1, T2, T3) -> R +): Flow = combineUnsafe(flow, flow2, flow3) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -130,12 +130,12 @@ public inline fun combine( * The receiver of the [transform] is [FlowCollector] and thus `transform` is a * generic function that may transform emitted element, skip it or emit it multiple times. */ -public inline fun combineTransform( +public fun combineTransform( flow: Flow, flow2: Flow, flow3: Flow, - @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3) -> Unit -): Flow = combineTransform(flow, flow2, flow3) { args: Array<*> -> + @BuilderInference transform: suspend FlowCollector.(T1, T2, T3) -> Unit +): Flow = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -147,12 +147,12 @@ public inline fun combineTransform( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -public inline fun combine( +public fun combine( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, - crossinline transform: suspend (T1, T2, T3, T4) -> R + transform: suspend (T1, T2, T3, T4) -> R ): Flow = combine(flow, flow2, flow3, flow4) { args: Array<*> -> transform( args[0] as T1, @@ -168,13 +168,13 @@ public inline fun combine( * The receiver of the [transform] is [FlowCollector] and thus `transform` is a * generic function that may transform emitted element, skip it or emit it multiple times. */ -public inline fun combineTransform( +public fun combineTransform( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, - @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3, T4) -> Unit -): Flow = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> -> + @BuilderInference transform: suspend FlowCollector.(T1, T2, T3, T4) -> Unit +): Flow = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -187,14 +187,14 @@ public inline fun combineTransform( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -public inline fun combine( +public fun combine( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, flow5: Flow, - crossinline transform: suspend (T1, T2, T3, T4, T5) -> R -): Flow = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> + transform: suspend (T1, T2, T3, T4, T5) -> R +): Flow = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -210,14 +210,14 @@ public inline fun combine( * The receiver of the [transform] is [FlowCollector] and thus `transform` is a * generic function that may transform emitted element, skip it or emit it multiple times. */ -public inline fun combineTransform( +public fun combineTransform( flow: Flow, flow2: Flow, flow3: Flow, flow4: Flow, flow5: Flow, - @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3, T4, T5) -> Unit -): Flow = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> + @BuilderInference transform: suspend FlowCollector.(T1, T2, T3, T4, T5) -> Unit +): Flow = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> -> transform( args[0] as T1, args[1] as T2, @@ -251,6 +251,31 @@ public inline fun combineTransform( combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) }) } +/* + * Same as combine, but does not copy array each time, deconstructing existing + * array each time. Used in overloads that accept FunctionN instead of Function> + */ +private inline fun combineUnsafe( + vararg flows: Flow, + crossinline transform: suspend (Array) -> R +): Flow = flow { + combineInternal(flows, nullArrayFactory(), { emit(transform(it)) }) +} + +/* + * Same as combineTransform, but does not copy array each time, deconstructing existing + * array each time. Used in overloads that accept FunctionN instead of Function> + */ +private inline fun combineTransformUnsafe( + vararg flows: Flow, + @BuilderInference crossinline transform: suspend FlowCollector.(Array) -> Unit +): Flow = safeFlow { + combineInternal(flows, nullArrayFactory(), { transform(it) }) +} + +// Saves bunch of anonymous classes +private fun nullArrayFactory(): () -> Array? = { null } + /** * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt index 0677c2bf18..5e2926d082 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt @@ -1,7 +1,7 @@ /* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ - +@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.flow.operators import kotlinx.coroutines.* @@ -29,7 +29,7 @@ abstract class CombineTestBase : TestBase() { fun testNulls() = runTest { val flow = flowOf("a", null, null) val flow2 = flowOf(1, 2, 3) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2) { i, j -> i + j }.toList() assertEquals(listOf("a1", "null2", "null3"), list) } @@ -37,13 +37,13 @@ abstract class CombineTestBase : TestBase() { fun testNullsOther() = runTest { val flow = flowOf("a", "b", "c") val flow2 = flowOf(null, 2, null) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2) { i, j -> i + j }.toList() assertEquals(listOf("anull", "b2", "cnull"), list) } @Test fun testEmptyFlow() = runTest { - val flow = emptyFlow().combineLatest(emptyFlow(), { i, j -> i + j }) + val flow = emptyFlow().combineLatest(emptyFlow()) { i, j -> i + j } assertNull(flow.singleOrNull()) } @@ -212,7 +212,7 @@ abstract class CombineTestBase : TestBase() { expectUnreached() } - val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) } + val flow = f1.combineLatest(f2) { _, _ -> 1 }.onEach { expect(2) } assertFailsWith(flow) finish(3) } @@ -230,7 +230,7 @@ abstract class CombineTestBase : TestBase() { hang { expect(6) } } - val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { + val flow = f1.combineLatest(f2) { _, _ -> 1 }.onEach { expect(1) yield() expect(4) @@ -249,7 +249,7 @@ abstract class CombineTestBase : TestBase() { emit(Unit) // emit } cancel() // cancel the scope - flow.combineLatest(flow) { u, _ -> u }.collect { + flow.combineLatest(flow) { _, _ -> }.collect { // should not be reached, because cancelled before it runs expectUnreached() } @@ -266,15 +266,26 @@ class CombineTransformTest : CombineTestBase() { emit(transform(a, b)) } } +// Array null-out is an additional test for our array elimination optimization class CombineVarargAdapterTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineOriginal(this, other) { args: Array -> transform(args[0] as T1, args[1] as T2) } + combineOriginal(this, other) { args: Array -> + transform(args[0] as T1, args[1] as T2).also { + args[0] = null + args[1] = null + } + } } class CombineIterableTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineOriginal(listOf(this, other)) { args -> transform(args[0] as T1, args[1] as T2) } + combineOriginal(listOf(this, other)) { args -> + transform(args[0] as T1, args[1] as T2).also { + args[0] = null + args[1] = null + } + } } class CombineTransformAdapterTest : CombineTestBase() { @@ -284,11 +295,20 @@ class CombineTransformAdapterTest : CombineTestBase() { class CombineTransformVarargAdapterTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineTransformOriginal(this, other) { args: Array -> emit(transform(args[0] as T1, args[1] as T2)) } + combineTransformOriginal(this, other) { args: Array -> + emit(transform(args[0] as T1, args[1] as T2)) // Mess up with array + args[0] = null + args[1] = null + } } class CombineTransformIterableTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = - combineTransformOriginal(listOf(this, other)) { args -> emit(transform(args[0] as T1, args[1] as T2)) } + combineTransformOriginal(listOf(this, other)) { args -> + emit(transform(args[0] as T1, args[1] as T2)) + // Mess up with array + args[0] = null + args[1] = null + } }