Skip to content

Commit

Permalink
Implement ObservableValue<T>.asFlow()
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Feb 7, 2020
1 parent 6810745 commit 81e829b
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 1 deletion.
@@ -1,3 +1,7 @@
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
}

public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Expand Down
43 changes: 43 additions & 0 deletions ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -0,0 +1,43 @@
package kotlinx.coroutines.javafx

import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate

/**
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces
* its values as they change.
*
* The resulting flow is conflated, meaning that if several values arrive in a quick succession, only
* the last one will be produced.
*
* It produces at least one value.
*
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
*/
@ExperimentalCoroutinesApi
fun <T: Any> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
val listener = ChangeListener<T> { observable, oldValue, newValue ->
try {
offer(newValue)
} catch (e: CancellationException) {
// In case the event fires after the channel is closed
}
}
withContext(Dispatchers.JavaFx) {
addListener(listener)
send(value)
}
awaitClose {
runBlocking {
withContext(Dispatchers.JavaFx) {
removeListener(listener)
}
}
}
}.conflate()
Expand Up @@ -8,7 +8,7 @@ import javafx.application.*
import kotlinx.coroutines.*
import org.junit.*

class JavaFxTest : TestBase() {
class JavaFxDispatcherTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Expand Down
99 changes: 99 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
@@ -0,0 +1,99 @@
package kotlinx.coroutines.javafx

import javafx.beans.property.SimpleIntegerProperty
import kotlinx.coroutines.TestBase
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Before
import org.junit.Test
import kotlin.test.assertTrue


class JavaFxObservableAsFlowTest : TestBase() {

@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
}

@Test
fun testFlowOrder() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val n = 10000 * stressTestMultiplier
val flow = integerProperty.asFlow().takeWhile { j -> j != n }
newSingleThreadContext("setter").use { pool ->
launch(pool) {
for (i in 1..n) {
launch(Dispatchers.JavaFx) {
integerProperty.set(i)
}
}
}
var i = -1
flow.collect { j ->
// elements are neither repeated nor shuffled
assertTrue(i < (j as Int))
i = j
}
// at least one element is present
assertTrue(i != -1)
}
}

@Test
fun testConflation() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val END_MARKER = -1
val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
launch(start = CoroutineStart.UNDISPATCHED) {
withContext(Dispatchers.JavaFx) {
integerProperty.set(1)
}
withContext(Dispatchers.JavaFx) {
integerProperty.set(-2) // should be skipped
integerProperty.set(2)
}
withContext(Dispatchers.JavaFx) {
integerProperty.set(END_MARKER)
}
}

flow.collect { i ->
assertTrue(i == 1 || i == 2)
}
}

@Test
fun cancellationRaceStressTest() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow()
var i = 1
val n = 1000 * stressTestMultiplier
newSingleThreadContext("collector").use { pool ->
repeat (n) {
launch(pool) {
flow.first()
}
withContext(Dispatchers.JavaFx) {
integerProperty.set(i)
}
i += 1
}
}
}
}

0 comments on commit 81e829b

Please sign in to comment.