Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ObservableValue<T>.asFlow() #1789

Merged
merged 6 commits into from Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,3 +1,7 @@
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
}

public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Expand Down
46 changes: 46 additions & 0 deletions ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -0,0 +1,46 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.javafx
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved

import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

/**
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* its values as they change.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* The resulting flow is conflated, meaning that if several values arrive in a quick succession, only
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* the last one will be produced.
*
* It produces at least one value.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
*
* All the calls to JavaFX API are performed in the JavaFX application thread.
*
* ### Operator fusion
*
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
* [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
*/
@ExperimentalCoroutinesApi
public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
val listener = ChangeListener<T> { _, _, newValue ->
try {
offer(newValue)
} catch (e: CancellationException) {
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
// In case the event fires after the channel is closed
}
}
addListener(listener)
send(value)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
awaitClose {
removeListener(listener)
}
}.flowOn(Dispatchers.JavaFx).conflate()
2 changes: 1 addition & 1 deletion ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
Expand Up @@ -116,7 +116,7 @@ private class PulseTimer : AnimationTimer() {
}
}

/** @return [true] if initialized successfully, and [false] if no display is detected */
/** @return true if initialized successfully, and false if no display is detected */
internal fun initPlatform(): Boolean = PlatformInitializer.success

// Lazily try to initialize JavaFx platform just once
Expand Down
Expand Up @@ -8,7 +8,7 @@ import javafx.application.*
import kotlinx.coroutines.*
import org.junit.*

class JavaFxTest : TestBase() {
class JavaFxDispatcherTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Expand Down
86 changes: 86 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
@@ -0,0 +1,86 @@
package kotlinx.coroutines.javafx

import javafx.beans.property.SimpleIntegerProperty
import kotlinx.coroutines.TestBase
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Before
import org.junit.Test
import kotlin.test.*


class JavaFxObservableAsFlowTest : TestBase() {

@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
}

@Test
fun testFlowOrder() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val n = 1000
val flow = integerProperty.asFlow().takeWhile { j -> j != n }
newSingleThreadContext("setter").use { pool ->
launch(pool) {
for (i in 1..n) {
launch(Dispatchers.JavaFx) {
integerProperty.set(i)
}
}
}
var i = -1
flow.collect { j ->
assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled")
i = j
}
}
}

@Test
fun testConflation() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

withContext(Dispatchers.JavaFx) {
val END_MARKER = -1
val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
launch {
yield() // to subscribe to [integerProperty]
yield() // send 0
integerProperty.set(1)
expect(3)
yield() // send 1
expect(5)
integerProperty.set(2)
for (i in (-100..-2)) {
integerProperty.set(i) // should be skipped due to conflation
}
integerProperty.set(3)
expect(6)
yield() // send 2 and 3
integerProperty.set(-1)
}
expect(1)
flow.collect { i ->
when (i) {
0 -> expect(2)
1 -> expect(4)
2 -> expect(7)
3 -> expect(8)
else -> fail("i is $i")
}
}
finish(9)
}
}

}
39 changes: 39 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt
@@ -0,0 +1,39 @@
package kotlinx.coroutines.javafx

import javafx.beans.property.SimpleIntegerProperty
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first
import org.junit.Before
import org.junit.Test

class JavaFxStressTest : TestBase() {

@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
}

@Test
fun testCancellationRace() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow()
var i = 1
val n = 1000 * stressTestMultiplier
newSingleThreadContext("collector").use { pool ->
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
repeat (n) {
launch(pool) {
flow.first()
}
withContext(Dispatchers.JavaFx) {
integerProperty.set(i)
}
i += 1
}
}
}
}
101 changes: 101 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt
@@ -0,0 +1,101 @@
package examples

import javafx.application.Application
import javafx.scene.Scene
import javafx.scene.control.*
import javafx.scene.layout.GridPane
import javafx.stage.Stage
import javafx.beans.property.SimpleStringProperty
import javafx.event.EventHandler
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.javafx.*
import kotlin.coroutines.CoroutineContext

fun main(args: Array<String>) {
Application.launch(FxAsFlowApp::class.java, *args)
}

/**
* Adapted from
* https://github.com/ReactiveX/RxJavaFX/blob/a78ca7d15f7d82d201df8fafb6eba732ec17e327/src/test/java/io/reactivex/rxjavafx/RxJavaFXTest.java
*/
class FxAsFlowApp: Application(), CoroutineScope {

private var job = Job()
override val coroutineContext: CoroutineContext
get() = JavaFx + job

private val incrementButton = Button("Increment")
private val incrementLabel = Label("")
private val textInput = TextField()
private val flippedTextLabel = Label()
private val spinner = Spinner<Int>()
private val spinnerChangesLabel = Label()

public override fun start( primaryStage: Stage) {
val gridPane = GridPane()
gridPane.apply {
hgap = 10.0
vgap = 10.0
add(incrementButton, 0, 0)
add(incrementLabel, 1, 0)
add(textInput, 0, 1)
add(flippedTextLabel, 1, 1)
add(spinner, 0, 2)
add(spinnerChangesLabel, 1, 2)
}
val scene = Scene(gridPane)
primaryStage.apply {
width = 275.0
setScene(scene)
show()
}
}

public override fun stop() {
super.stop()
job.cancel()
job = Job()
}

init {
// Initializing the "Increment" button
val stringProperty = SimpleStringProperty()
var i = 0
incrementButton.onAction = EventHandler {
i += 1
stringProperty.set(i.toString())
}
launch {
stringProperty.asFlow().collect {
if (it != null) {
stringProperty.set(it)
}
}
}
incrementLabel.textProperty().bind(stringProperty)
// Initializing the reversed text field
val stringProperty2 = SimpleStringProperty()
launch {
textInput.textProperty().asFlow().collect {
if (it != null) {
stringProperty2.set(it.reversed())
}
}
}
flippedTextLabel.textProperty().bind(stringProperty2)
// Initializing the spinner
spinner.valueFactory = SpinnerValueFactory.IntegerSpinnerValueFactory(0, 100)
spinner.isEditable = true
val stringProperty3 = SimpleStringProperty()
launch {
spinner.valueProperty().asFlow().collect {
if (it != null) {
stringProperty3.set("NEW: $it")
}
}
}
spinnerChangesLabel.textProperty().bind(stringProperty3)
}
}