Skip to content

Commit

Permalink
Implement ObservableValue<T>.asFlow()
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Feb 13, 2020
1 parent 6810745 commit 897fefa
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 1 deletion.
@@ -1,3 +1,7 @@
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
}

public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Expand Down
38 changes: 38 additions & 0 deletions ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -0,0 +1,38 @@
package kotlinx.coroutines.javafx

import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flowOn

/**
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces
* its values as they change.
*
* The resulting flow is conflated, meaning that if several values arrive in a quick succession, only
* the last one will be produced.
*
* It produces at least one value.
*
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
val listener = ChangeListener<T> { _, _, newValue ->
try {
offer(newValue)
} catch (e: CancellationException) {
// In case the event fires after the channel is closed
}
}
addListener(listener)
send(value)
awaitClose {
removeListener(listener)
}
}.flowOn(Dispatchers.JavaFx).conflate()
Expand Up @@ -8,7 +8,7 @@ import javafx.application.*
import kotlinx.coroutines.*
import org.junit.*

class JavaFxTest : TestBase() {
class JavaFxDispatcherTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Expand Down
86 changes: 86 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
@@ -0,0 +1,86 @@
package kotlinx.coroutines.javafx

import javafx.beans.property.SimpleIntegerProperty
import kotlinx.coroutines.TestBase
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Before
import org.junit.Test
import kotlin.test.*


class JavaFxObservableAsFlowTest : TestBase() {

@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
}

@Test
fun testFlowOrder() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val n = 10000 * stressTestMultiplier
val flow = integerProperty.asFlow().takeWhile { j -> j != n }
newSingleThreadContext("setter").use { pool ->
launch(pool) {
for (i in 1..n) {
launch(Dispatchers.JavaFx) {
integerProperty.set(i)
}
}
}
var i = -1
flow.collect { j ->
assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled")
i = j
}
}
}

@Test
fun testConflation() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

withContext(Dispatchers.JavaFx) {
val END_MARKER = -1
val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
launch {
yield() // to subscribe to [integerProperty]
yield() // send 0
integerProperty.set(1)
expect(3)
yield() // send 1
expect(5)
integerProperty.set(2)
for (i in (-100..-2)) {
integerProperty.set(i) // should be skipped due to conflation
}
integerProperty.set(3)
expect(6)
yield() // send 2 and 3
integerProperty.set(-1)
}
expect(1)
flow.collect { i ->
when (i) {
0 -> expect(2)
1 -> expect(4)
2 -> expect(7)
3 -> expect(8)
else -> fail("i is $i")
}
}
finish(9)
}
}

}
39 changes: 39 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt
@@ -0,0 +1,39 @@
package kotlinx.coroutines.javafx

import javafx.beans.property.SimpleIntegerProperty
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first
import org.junit.Before
import org.junit.Test

class JavaFxStressTest : TestBase() {

@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
}

@Test
fun cancellationRaceStressTest() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow()
var i = 1
val n = 1000 * stressTestMultiplier
newSingleThreadContext("collector").use { pool ->
repeat (n) {
launch(pool) {
flow.first()
}
withContext(Dispatchers.JavaFx) {
integerProperty.set(i)
}
i += 1
}
}
}
}

0 comments on commit 897fefa

Please sign in to comment.