Skip to content

Commit

Permalink
Add update, updateAndGet, and getAndUpdate extension functions to Mut…
Browse files Browse the repository at this point in the history
…ableStateFlow (Kotlin#2720).
  • Loading branch information
lowasser committed May 20, 2021
1 parent 00e6cbf commit 92748be
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 5 deletions.
57 changes: 52 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Expand Up @@ -37,7 +37,7 @@ import kotlin.native.concurrent.*
* val counter = _counter.asStateFlow() // publicly exposed as read-only state flow
*
* fun inc() {
* _counter.value++
* _counter.update { count -> count + 1 }
* }
* }
* ```
Expand Down Expand Up @@ -186,6 +186,56 @@ public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
@Suppress("FunctionName")
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

// ------------------------------------ Update methods ------------------------------------

/**
* Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns the new
* value.
*
* [function] may be evaluated multiple times, if [value] is being concurrently updated.
*/
inline fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T {
while (true) {
val prevValue = value
val nextValue = function(prevValue)
if (compareAndSet(prevValue, nextValue)) {
return nextValue
}
}
}

/**
* Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns its
* prior value.
*
* [function] may be evaluated multiple times, if [value] is being concurrently updated.
*/
inline fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T {
while (true) {
val prevValue = value
val nextValue = function(prevValue)
if (compareAndSet(prevValue, nextValue)) {
return prevValue
}
}
}


/**
* Updates the [MutableStateFlow.value] atomically using the specified [function] of its value.
*
* [function] may be evaluated multiple times, if [value] is being concurrently updated.
*/
inline fun <T> MutableStateFlow<T>.update(function: (T) -> T): T {
while (true) {
val prevValue = value
val nextValue = function(prevValue)
if (compareAndSet(prevValue, nextValue)) {
return
}
}
}

// ------------------------------------ Implementation ------------------------------------

@SharedImmutable
Expand Down Expand Up @@ -366,10 +416,7 @@ private class StateFlowImpl<T>(
}

internal fun MutableStateFlow<Int>.increment(delta: Int) {
while (true) { // CAS loop
val current = value
if (compareAndSet(current, current + delta)) return
}
update { it + delta }
}

internal fun <T> StateFlow<T>.fuseStateFlow(
Expand Down
55 changes: 55 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt
Expand Up @@ -193,4 +193,59 @@ class StateFlowTest : TestBase() {
assertTrue(state.compareAndSet(d1_1, d0)) // updates, reference changes
assertSame(d0, state.value)
}

@Test
fun testGetAndUpdateContended() = runTest {
val state = MutableStateFlow(0)

// use a barrier to ensure j2 at least doesn't finish before j3 starts
val barrier = Job()
val j2 = async {
barrier.join()
state.getAndUpdate { it + 2 }
}
val j3 = async {
barrier.join()
state.getAndUpdate { it + 3 }
}
barrier.complete()
when (j2.await()) {
0 -> assertEquals(2, j3.await())
3 -> assertEquals(0, j3.await())
else -> fail()
}
assertEquals(5, state.value)
}

@Test
fun testUpdateAndGetContended() = runTest {
val state = MutableStateFlow(0)

// use a barrier to ensure j2 at least doesn't finish before j3 starts
val barrier = Job()
val j2 = async {
barrier.join()
state.updateAndGet { it + 2 }
}
val j3 = async {
barrier.join()
state.updateAndGet { it + 3 }
}
barrier.complete()
when (j2.await()) {
5 -> assertEquals(3, j3.await())
3 -> assertEquals(5, j3.await())
else -> fail()
}
assertEquals(5, state.value)
}

@Test
fun update() = runTest {
val state = MutableStateFlow(0)
state.update { it + 2 }
assertEquals(2, state.value)
state.update { it + 3 }
assertEquals(5, state.value)
}
}

0 comments on commit 92748be

Please sign in to comment.