forked from Kotlin/kotlinx.coroutines
/
Transform.kt
129 lines (115 loc) · 4.61 KB
/
Transform.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
import kotlinx.coroutines.flow.unsafeTransform as transform
/**
* Returns a flow containing only values of the original flow that match the given [predicate].
*/
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}
/**
* Returns a flow containing only values of the original flow that do not match the given [predicate].
*/
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (!predicate(value)) return@transform emit(value)
}
/**
* Returns a flow containing only values that are instances of specified type [R].
*/
@Suppress("UNCHECKED_CAST")
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
/**
* Returns a flow containing only values of the original flow that are not null.
*/
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value ->
if (value != null) return@transform emit(value)
}
/**
* Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
*/
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
/**
* Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow.
*/
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> = transform { value ->
val transformed = transform(value) ?: return@transform
return@transform emit(transformed)
}
/**
* Returns a flow that wraps each element into [IndexedValue], containing value and its index (starting from zero).
*/
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
var index = 0
collect { value ->
emit(IndexedValue(checkIndexOverflow(index++), value))
}
}
/**
* Returns a flow that invokes the given [action] **before** each value of the upstream flow is emitted downstream.
*/
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
}
/**
* Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
* Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
* For example:
* ```
* flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
* ```
* will produce `[], [1], [1, 2], [1, 2, 3]]`.
*
* This function is an alias to [runningFold] operator.
*/
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = runningFold(initial, operation)
/**
* Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
* Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
* For example:
* ```
* flowOf(1, 2, 3).runningFold(emptyList<Int>()) { acc, value -> acc + value }.toList()
* ```
* will produce `[], [1], [1, 2], [1, 2, 3]]`.
*/
public fun <T, R> Flow<T>.runningFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
var accumulator: R = initial
emit(accumulator)
collect { value ->
accumulator = operation(accumulator, value)
emit(accumulator)
}
}
/**
* Reduces the given flow with [operation], emitting every intermediate result, including initial value.
* The first element is taken as initial value for operation accumulator.
* This operator has a sibling with initial value -- [scan].
*
* For example:
* ```
* flowOf(1, 2, 3, 4).runningReduce { acc, value -> acc + value }.toList()
* ```
* will produce `[1, 3, 6, 10]`
*/
public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
var accumulator: Any? = NULL
collect { value ->
accumulator = if (accumulator === NULL) {
value
} else {
operation(accumulator as T, value)
}
emit(accumulator as T)
}
}