forked from Kotlin/kotlinx.coroutines
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Collect.kt
110 lines (102 loc) · 3.73 KB
/
Collect.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
/**
 * Terminal flow operator that runs the collection of this flow while discarding every emitted value.
 * An exception raised by the flow, or while it is being collected, is rethrown to the caller.
 *
 * Equivalent to `collect {}`.
 *
 * It normally terminates a chain built with [onEach], [onCompletion] and [catch], which do the actual
 * processing of the values and the handling of upstream or processing failures, for example:
 *
 * ```
 * flow
 *     .onEach { value -> process(value) }
 *     .catch { e -> handleException(e) }
 *     .collect() // trigger collection of the flow
 * ```
 */
public suspend fun Flow<*>.collect() {
    // Delegate to the collector-taking `collect`, handing it the shared NopCollector.
    collect(NopCollector)
}
/**
 * Terminal flow operator that [launches][launch] the [collection][collect] of this flow in the given [scope]
 * and returns the [Job] produced by that launch.
 * It is a shorthand for `scope.launch { flow.collect() }`.
 *
 * This operator is usually combined with [onEach], [onCompletion] and [catch] to process all emitted values and
 * handle an exception that might occur in the upstream flow or during processing, for example:
 *
 * ```
 * flow
 *     .onEach { value -> updateUi(value) }
 *     .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
 *     .catch { cause -> LOG.error("Exception: $cause") }
 *     .launchIn(uiScope)
 * ```
 *
 * Note that in the example above the result of [launchIn] is not used: the provided scope takes care of
 * cancellation.
 *
 * @param scope the coroutine scope in which the collecting coroutine is launched.
 * @return the [Job] returned by `scope.launch`.
 */
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job {
    // Name the receiver so the call inside the launched lambda is unambiguous.
    val upstream = this
    return scope.launch {
        upstream.collect() // tail-call
    }
}
/**
 * Terminal flow operator that collects this flow, invoking [action] with the zero-based position of each
 * element together with the element itself.
 * An exception raised by the flow, or while it is being collected, is rethrown to the caller.
 *
 * See also [collect] and [withIndex].
 *
 * @param action callback receiving `(index, value)` for every collected element; indices start at 0 and grow
 * by one per element.
 */
public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit) {
    collect(object : FlowCollector<T> {
        // Index to assign to the next collected element.
        private var nextIndex = 0

        override suspend fun emit(value: T) {
            // Take the current index (then advance it) and pass it through checkIndexOverflow before use.
            val position = checkIndexOverflow(nextIndex++)
            action(position, value)
        }
    })
}
/**
 * Terminal flow operator that collects this flow with the provided [action].
 * Unlike [collect], whenever the original flow emits a new value, the [action] block that is still running
 * for the previous value is cancelled.
 *
 * The following example demonstrates it:
 *
 * ```
 * flow {
 *     emit(1)
 *     delay(50)
 *     emit(2)
 * }.collectLatest { value ->
 *     println("Collecting $value")
 *     delay(100) // Emulate work
 *     println("$value collected")
 * }
 * ```
 *
 * prints "Collecting 1, Collecting 2, 2 collected"
 *
 * @param action suspending block invoked for each value; it is cancelled when a newer value arrives.
 */
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
    /*
     * Implementation note:
     * the buffer(0) step exists to meet user expectations in sequential usages, e.g.:
     * ```
     * flowOf(1, 2, 3).collectLatest {
     *     delay(1)
     *     println(it) // Expect only 3 to be printed
     * }
     * ```
     *
     * Intermediate operators do not do this: they are mostly used for interactive UI,
     * where the performance of dispatch matters more.
     */
    val latestOnly = mapLatest(action)
    val sequenced = latestOnly.buffer(0)
    sequenced.collect()
}
/**
 * Collects every value of the given [flow] and emits it to this collector.
 * It is a shorthand for `flow.collect { value -> emit(value) }`.
 *
 * @param flow the flow whose values are forwarded to the receiver collector.
 */
public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
    val downstream = this
    // ensureActive() is invoked on the collector first, before any value of [flow] is collected.
    downstream.ensureActive()
    flow.collect(downstream)
}