Skip to content

Commit

Permalink
Internal cleanups (#318)
Browse files Browse the repository at this point in the history
  • Loading branch information
JakeWharton committed Apr 12, 2024
1 parent 87dea67 commit 0547b86
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 31 deletions.
21 changes: 16 additions & 5 deletions src/commonMain/kotlin/app/cash/turbine/Turbine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ public operator fun <T> Turbine<T>.plusAssign(value: T) { add(value) }
public fun <T> Turbine(
timeout: Duration? = null,
name: String? = null,
): Turbine<T> = ChannelTurbine(timeout = timeout, name = name)
): Turbine<T> = ChannelTurbine(Channel(UNLIMITED), null, timeout, name)

internal class ChannelTurbine<T>(
channel: Channel<T> = Channel(UNLIMITED),
private val job: Job? = null,
channel: Channel<T>,
/** Non-null if [channel] is being populated by an external `Flow` collection. */
private val collectJob: Job?,
private val timeout: Duration?,
private val name: String?,
) : Turbine<T> {
Expand Down Expand Up @@ -136,6 +137,16 @@ internal class ChannelTurbine<T>(
}
}
}

override fun cancel(cause: CancellationException?) {
collectJob?.cancel()
channel.close(cause)
}

override fun close(cause: Throwable?): Boolean {
collectJob?.cancel()
return channel.close(cause)
}
}

override fun asChannel(): Channel<T> = channel
Expand All @@ -148,14 +159,14 @@ internal class ChannelTurbine<T>(
override suspend fun cancel() {
if (!channel.isClosedForSend) ignoreTerminalEvents = true
channel.cancel()
job?.cancelAndJoin()
collectJob?.cancelAndJoin()
}

@OptIn(DelicateCoroutinesApi::class)
override fun close(cause: Throwable?) {
if (!channel.isClosedForSend) ignoreTerminalEvents = true
channel.close(cause)
job?.cancel()
collectJob?.cancel()
}

override fun takeEvent(): Event<T> = channel.takeEvent(name = name)
Expand Down
32 changes: 6 additions & 26 deletions src/commonMain/kotlin/app/cash/turbine/flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface TurbineContext : CoroutineScope {
public interface TurbineTestContext<T> : TurbineContext, ReceiveTurbine<T>

internal class TurbineTestContextImpl<T>(
turbine: Turbine<T>,
turbine: ReceiveTurbine<T>,
turbineContext: CoroutineContext,
) : TurbineContext by TurbineContextImpl(turbineContext), ReceiveTurbine<T> by turbine, TurbineTestContext<T>

Expand Down Expand Up @@ -180,7 +180,7 @@ public fun <T> Flow<T>.testIn(
return testInInternal(this, timeout, scope, name)
}

private fun <T> testInInternal(flow: Flow<T>, timeout: Duration?, scope: CoroutineScope, name: String?): Turbine<T> {
private fun <T> testInInternal(flow: Flow<T>, timeout: Duration?, scope: CoroutineScope, name: String?): ReceiveTurbine<T> {
if (timeout != null) {
// Eager check to throw early rather than in a subsequent 'await' call.
checkTimeout(timeout)
Expand All @@ -204,27 +204,15 @@ private fun <T> testInInternal(flow: Flow<T>, timeout: Duration?, scope: Corouti
return turbine
}

private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope, timeout: Duration?, name: String?): Turbine<T> {
lateinit var channel: Channel<T>

private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope, timeout: Duration?, name: String?): ReceiveTurbine<T> {
// Use test-specific unconfined if test scheduler is in use to inherit its virtual time.
@OptIn(ExperimentalCoroutinesApi::class) // UnconfinedTestDispatcher is still experimental.
val unconfined = scope.coroutineContext[TestCoroutineScheduler]
?.let(::UnconfinedTestDispatcher)
?: Unconfined

val job = scope.launch(unconfined, start = UNDISPATCHED) {
channel = collectIntoChannel(this)
}

return ChannelTurbine(channel, job, timeout, name).also {
scope.reportTurbine(it)
}
}

private fun <T> Flow<T>.collectIntoChannel(scope: CoroutineScope): Channel<T> {
val output = Channel<T>(UNLIMITED)
val job = scope.launch(start = UNDISPATCHED) {
val job = scope.launch(unconfined, start = UNDISPATCHED) {
try {
collect { output.trySend(it) }
output.close()
Expand All @@ -233,15 +221,7 @@ private fun <T> Flow<T>.collectIntoChannel(scope: CoroutineScope): Channel<T> {
}
}

return object : Channel<T> by output {
override fun cancel(cause: CancellationException?) {
job.cancel()
output.close(cause)
}

override fun close(cause: Throwable?): Boolean {
job.cancel()
return output.close(cause)
}
return ChannelTurbine(output, job, timeout, name).also {
scope.reportTurbine(it)
}
}

0 comments on commit 0547b86

Please sign in to comment.