Skip to content

Commit

Permalink
~more small optimizations
Browse files Browse the repository at this point in the history
    * Do not allocate array on each transform call in combine, do it only for combineTransform
    * Simplify zip operator even further
    * Get rid of crossinline in combine to fix weird Android crashes

Fixes #1743
Fixes #1683
  • Loading branch information
qwwdfsad committed Oct 19, 2020
1 parent c2d0707 commit 5f01641
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 58 deletions.
14 changes: 7 additions & 7 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Expand Up @@ -367,35 +367,35 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, transform)")
)
public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
crossinline transform: suspend (T1, T2, T3) -> R
transform: suspend (T1, T2, T3) -> R
) = combine(this, other, other2, transform)

@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
crossinline transform: suspend (T1, T2, T3, T4) -> R
transform: suspend (T1, T2, T3, T4) -> R
) = combine(this, other, other2, other3, transform)

@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
other4: Flow<T5>,
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combine(this, other, other2, other3, other4, transform)

/**
Expand Down Expand Up @@ -482,4 +482,4 @@ public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
)
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
43 changes: 23 additions & 20 deletions kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
Expand Up @@ -17,14 +17,13 @@ private typealias Update = IndexedValue<Any?>
@PublishedApi
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
flows: Array<out Flow<T>>,
arrayFactory: () -> Array<T?>, // Array factory is required to workaround array typing on JVM
arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
val size = flows.size
if (size == 0) return@flowScope // bail-out for empty input
val latestValues = arrayOfNulls<Any?>(size)
latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED }
val isClosed = BooleanArray(size)
val resultChannel = Channel<Update>(flows.size)
val nonClosed = LocalAtomicInt(size)
var remainingAbsentValues = size
Expand All @@ -37,7 +36,6 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
yield() // Emulate fairness, giving each flow chance to emit
}
} finally {
isClosed[i] = true
// Close the channel when there is no more flows
if (nonClosed.decrementAndGet() == 0) {
resultChannel.close()
Expand Down Expand Up @@ -72,17 +70,30 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(

// Process batch result if there is enough data
if (remainingAbsentValues == 0) {
/*
* If arrayFactory returns null, then we can avoid array copy because
* it's our own safe transformer that immediately deconstructs the array
*/
val results = arrayFactory()
(latestValues as Array<T?>).copyInto(results)
transform(results as Array<T>)
if (results == null) {
transform(latestValues as Array<T>)
} else {
(latestValues as Array<T?>).copyInto(results)
transform(results as Array<T>)
}
}
}
}

internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
unsafeFlow {
coroutineScope {
val second = asChannel(flow2)
val second = produce<Any> {
flow2.collect { value ->
return@collect channel.send(value ?: NULL)
}
}

/*
* This approach only works with rendezvous channel and is required to enforce correctness
* in the following scenario:
Expand All @@ -95,14 +106,11 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
*/
val collectJob = Job()
val scopeJob = currentCoroutineContext()[Job]!! // TODO replace with extension when #2245 is here
(second as SendChannel<*>).invokeOnClose {
// Optimization to avoid AFE allocation when the other flow is done
if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
}

val newContext = coroutineContext + scopeJob
val cnt = threadContextElements(newContext)
try {
/*
* Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
Expand All @@ -116,18 +124,20 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
* with coroutines scope via a channel, but it's way too expensive, so
* we are using this trick instead.
*/
withContextUndispatched( coroutineContext + collectJob) {
val scopeContext = coroutineContext
val cnt = threadContextElements(scopeContext)
withContextUndispatched(coroutineContext + collectJob) {
flow.collect { value ->
withContextUndispatched(newContext, cnt) {
withContextUndispatched(scopeContext, cnt) {
val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
emit(transform(NULL.unbox(value), NULL.unbox(otherValue)))
emit(transform(value, NULL.unbox(otherValue)))
}
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this@unsafeFlow)
} finally {
if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
if (!second.isClosedForReceive) second.cancel()
}
}
}
Expand All @@ -144,10 +154,3 @@ private suspend fun withContextUndispatched(
})
}
}

// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
flow.collect { value ->
return@collect channel.send(value ?: NULL)
}
}
65 changes: 45 additions & 20 deletions kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
Expand Up @@ -30,7 +30,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
*/
@JvmName("flowCombine")
public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
combineInternal(arrayOf(this@combine, flow), { arrayOfNulls(2) }, { emit(transform(it[0] as T1, it[1] as T2)) })
combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
}

/**
Expand Down Expand Up @@ -72,7 +72,7 @@ public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspe
public fun <T1, T2, R> Flow<T1>.combineTransform(
flow: Flow<T2>,
@BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R> = combineTransform(this, flow) { args: Array<*> ->
): Flow<R> = combineTransformUnsafe(this, flow) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2
Expand Down Expand Up @@ -100,7 +100,7 @@ public fun <T1, T2, R> combineTransform(
flow: Flow<T1>,
flow2: Flow<T2>,
@BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R> = combineTransform(flow, flow2) { args: Array<*> ->
): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2
Expand All @@ -111,12 +111,12 @@ public fun <T1, T2, R> combineTransform(
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
public inline fun <T1, T2, T3, R> combine(
public fun <T1, T2, T3, R> combine(
flow: Flow<T1>,
flow2: Flow<T2>,
flow3: Flow<T3>,
@BuilderInference crossinline transform: suspend (T1, T2, T3) -> R
): Flow<R> = combine(flow, flow2, flow3) { args: Array<*> ->
@BuilderInference transform: suspend (T1, T2, T3) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
Expand All @@ -130,12 +130,12 @@ public inline fun <T1, T2, T3, R> combine(
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
* generic function that may transform emitted element, skip it or emit it multiple times.
*/
public inline fun <T1, T2, T3, R> combineTransform(
public fun <T1, T2, T3, R> combineTransform(
flow: Flow<T1>,
flow2: Flow<T2>,
flow3: Flow<T3>,
@BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
): Flow<R> = combineTransform(flow, flow2, flow3) { args: Array<*> ->
@BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
Expand All @@ -147,12 +147,12 @@ public inline fun <T1, T2, T3, R> combineTransform(
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
public inline fun <T1, T2, T3, T4, R> combine(
public fun <T1, T2, T3, T4, R> combine(
flow: Flow<T1>,
flow2: Flow<T2>,
flow3: Flow<T3>,
flow4: Flow<T4>,
crossinline transform: suspend (T1, T2, T3, T4) -> R
transform: suspend (T1, T2, T3, T4) -> R
): Flow<R> = combine(flow, flow2, flow3, flow4) { args: Array<*> ->
transform(
args[0] as T1,
Expand All @@ -168,13 +168,13 @@ public inline fun <T1, T2, T3, T4, R> combine(
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
* generic function that may transform emitted element, skip it or emit it multiple times.
*/
public inline fun <T1, T2, T3, T4, R> combineTransform(
public fun <T1, T2, T3, T4, R> combineTransform(
flow: Flow<T1>,
flow2: Flow<T2>,
flow3: Flow<T3>,
flow4: Flow<T4>,
@BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
): Flow<R> = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> ->
@BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
Expand All @@ -187,14 +187,14 @@ public inline fun <T1, T2, T3, T4, R> combineTransform(
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
public inline fun <T1, T2, T3, T4, T5, R> combine(
public fun <T1, T2, T3, T4, T5, R> combine(
flow: Flow<T1>,
flow2: Flow<T2>,
flow3: Flow<T3>,
flow4: Flow<T4>,
flow5: Flow<T5>,
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
Expand All @@ -210,14 +210,14 @@ public inline fun <T1, T2, T3, T4, T5, R> combine(
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
* generic function that may transform emitted element, skip it or emit it multiple times.
*/
public inline fun <T1, T2, T3, T4, T5, R> combineTransform(
public fun <T1, T2, T3, T4, T5, R> combineTransform(
flow: Flow<T1>,
flow2: Flow<T2>,
flow3: Flow<T3>,
flow4: Flow<T4>,
flow5: Flow<T5>,
@BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
): Flow<R> = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
@BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
Expand Down Expand Up @@ -251,6 +251,31 @@ public inline fun <reified T, R> combineTransform(
combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
}

/*
* Same as combine, but does not copy array each time, deconstructing existing
* array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
*/
private inline fun <reified T, R> combineUnsafe(
vararg flows: Flow<T>,
crossinline transform: suspend (Array<T>) -> R
): Flow<R> = flow {
combineInternal(flows, nullArrayFactory(), { emit(transform(it)) })
}

/*
* Same as combineTransform, but does not copy array each time, deconstructing existing
* array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
*/
private inline fun <reified T, R> combineTransformUnsafe(
vararg flows: Flow<T>,
@BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> = safeFlow {
combineInternal(flows, nullArrayFactory(), { transform(it) })
}

// Saves bunch of anonymous classes
private fun <T> nullArrayFactory(): () -> Array<T>? = { null }

/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
Expand Down

0 comments on commit 5f01641

Please sign in to comment.