From 12e96cdcc628edc2bf22755dafb658e96d1acb35 Mon Sep 17 00:00:00 2001 From: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 4 Mar 2020 16:58:18 +0300 Subject: [PATCH] Implement ObservableValue.asFlow() (#1789) * Implement ObservableValue.asFlow() Fixes #1695 --- .../api/kotlinx-coroutines-javafx.api | 4 + .../src/JavaFxConvert.kt | 41 +++++++ .../src/JavaFxDispatcher.kt | 2 +- ...{JavaFxTest.kt => JavaFxDispatcherTest.kt} | 2 +- .../test/JavaFxObservableAsFlowTest.kt | 86 +++++++++++++++ .../test/JavaFxStressTest.kt | 39 +++++++ .../test/examples/FxAsFlow.kt | 101 ++++++++++++++++++ 7 files changed, 273 insertions(+), 2 deletions(-) create mode 100644 ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt rename ui/kotlinx-coroutines-javafx/test/{JavaFxTest.kt => JavaFxDispatcherTest.kt} (97%) create mode 100644 ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt create mode 100644 ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt create mode 100644 ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api index 24c5b70b9d..620e904612 100644 --- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api +++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api @@ -1,3 +1,7 @@ +public final class kotlinx/coroutines/javafx/JavaFxConvertKt { + public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow; +} + public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt new file mode 100644 index 0000000000..903b60a2cf --- /dev/null +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.javafx + +import javafx.beans.value.ChangeListener +import javafx.beans.value.ObservableValue +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.* + +/** + * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits + * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick + * succession, only the last one will be emitted. + * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue] + * supports lazy evaluation, eager computation will be enforced while the flow is being collected. + * All the calls to JavaFX API are performed in [Dispatchers.JavaFx]. + * This flow emits at least the initial value. + * + * ### Operator fusion + * + * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused. + * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead. + */ +@ExperimentalCoroutinesApi +public fun ObservableValue.asFlow(): Flow = callbackFlow { + val listener = ChangeListener { _, _, newValue -> + try { + offer(newValue) + } catch (e: CancellationException) { + // In case the event fires after the channel is closed + } + } + addListener(listener) + send(value) + awaitClose { + removeListener(listener) + } +}.flowOn(Dispatchers.JavaFx).conflate() diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt index 8f5be31dd4..ed74ad6a56 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt @@ -116,7 +116,7 @@ private class PulseTimer : AnimationTimer() { } } -/** @return [true] if initialized successfully, and [false] if no display is detected */ +/** @return true if initialized successfully, and false if no display is detected */ internal fun initPlatform(): Boolean = PlatformInitializer.success // Lazily try to initialize JavaFx platform just once diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt similarity index 97% rename from ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt rename to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt index e6a1ddb414..724be6d77b 100644 --- a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt +++ b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt @@ -8,7 +8,7 @@ import javafx.application.* import kotlinx.coroutines.* import org.junit.* -class JavaFxTest : TestBase() { +class JavaFxDispatcherTest : TestBase() { @Before fun setup() { ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher") diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt new file mode 100644 index 0000000000..6964050102 --- /dev/null +++ b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt @@ -0,0 +1,86 @@ +package kotlinx.coroutines.javafx + +import javafx.beans.property.SimpleIntegerProperty +import kotlinx.coroutines.TestBase +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.Before +import org.junit.Test +import kotlin.test.* + + +class JavaFxObservableAsFlowTest : TestBase() { + + @Before + fun setup() { + ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher") + } + + @Test + fun testFlowOrder() = runTest { + if (!initPlatform()) { + println("Skipping JavaFxTest in headless environment") + return@runTest // ignore test in headless environments + } + + val integerProperty = SimpleIntegerProperty(0) + val n = 1000 + val flow = integerProperty.asFlow().takeWhile { j -> j != n } + newSingleThreadContext("setter").use { pool -> + launch(pool) { + for (i in 1..n) { + launch(Dispatchers.JavaFx) { + integerProperty.set(i) + } + } + } + var i = -1 + flow.collect { j -> + assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled") + i = j + } + } + } + + @Test + fun testConflation() = runTest { + if (!initPlatform()) { + println("Skipping JavaFxTest in headless environment") + return@runTest // ignore test in headless environments + } + + withContext(Dispatchers.JavaFx) { + val END_MARKER = -1 + val integerProperty = SimpleIntegerProperty(0) + val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER } + launch { + yield() // to subscribe to [integerProperty] + yield() // send 0 + integerProperty.set(1) + expect(3) + yield() // send 1 + expect(5) + integerProperty.set(2) + for (i in (-100..-2)) { + integerProperty.set(i) // should be skipped due to conflation + } + integerProperty.set(3) + expect(6) + yield() // send 2 and 3 + integerProperty.set(-1) + } + expect(1) + flow.collect { i -> + when (i) { + 0 -> expect(2) + 1 -> expect(4) + 2 -> expect(7) + 3 -> expect(8) + else -> fail("i is $i") + } + } + finish(9) + } + } + +} diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt new file mode 100644 index 0000000000..5338835d84 --- /dev/null +++ b/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt @@ -0,0 +1,39 @@ +package kotlinx.coroutines.javafx + +import javafx.beans.property.SimpleIntegerProperty +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.first +import org.junit.* + +class JavaFxStressTest : TestBase() { + + @Before + fun setup() { + ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher") + } + + @get:Rule + val pool = ExecutorRule(1) + + @Test + fun testCancellationRace() = runTest { + if (!initPlatform()) { + println("Skipping JavaFxTest in headless environment") + return@runTest // ignore test in headless environments + } + + val integerProperty = SimpleIntegerProperty(0) + val flow = integerProperty.asFlow() + var i = 1 + val n = 1000 * stressTestMultiplier + repeat (n) { + launch(pool) { + flow.first() + } + withContext(Dispatchers.JavaFx) { + integerProperty.set(i) + } + i += 1 + } + } +} \ No newline at end of file diff --git a/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt b/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt new file mode 100644 index 0000000000..00003f7860 --- /dev/null +++ b/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt @@ -0,0 +1,101 @@ +package examples + +import javafx.application.Application +import javafx.scene.Scene +import javafx.scene.control.* +import javafx.scene.layout.GridPane +import javafx.stage.Stage +import javafx.beans.property.SimpleStringProperty +import javafx.event.EventHandler +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.javafx.* +import kotlin.coroutines.CoroutineContext + +fun main(args: Array) { + Application.launch(FxAsFlowApp::class.java, *args) +} + +/** + * Adapted from + * https://github.com/ReactiveX/RxJavaFX/blob/a78ca7d15f7d82d201df8fafb6eba732ec17e327/src/test/java/io/reactivex/rxjavafx/RxJavaFXTest.java + */ +class FxAsFlowApp: Application(), CoroutineScope { + + private var job = Job() + override val coroutineContext: CoroutineContext + get() = JavaFx + job + + private val incrementButton = Button("Increment") + private val incrementLabel = Label("") + private val textInput = TextField() + private val flippedTextLabel = Label() + private val spinner = Spinner() + private val spinnerChangesLabel = Label() + + public override fun start( primaryStage: Stage) { + val gridPane = GridPane() + gridPane.apply { + hgap = 10.0 + vgap = 10.0 + add(incrementButton, 0, 0) + add(incrementLabel, 1, 0) + add(textInput, 0, 1) + add(flippedTextLabel, 1, 1) + add(spinner, 0, 2) + add(spinnerChangesLabel, 1, 2) + } + val scene = Scene(gridPane) + primaryStage.apply { + width = 275.0 + setScene(scene) + show() + } + } + + public override fun stop() { + super.stop() + job.cancel() + job = Job() + } + + init { + // Initializing the "Increment" button + val stringProperty = SimpleStringProperty() + var i = 0 + incrementButton.onAction = EventHandler { + i += 1 + stringProperty.set(i.toString()) + } + launch { + stringProperty.asFlow().collect { + if (it != null) { + stringProperty.set(it) + } + } + } + incrementLabel.textProperty().bind(stringProperty) + // Initializing the reversed text field + val stringProperty2 = SimpleStringProperty() + launch { + textInput.textProperty().asFlow().collect { + if (it != null) { + stringProperty2.set(it.reversed()) + } + } + } + flippedTextLabel.textProperty().bind(stringProperty2) + // Initializing the spinner + spinner.valueFactory = SpinnerValueFactory.IntegerSpinnerValueFactory(0, 100) + spinner.isEditable = true + val stringProperty3 = SimpleStringProperty() + launch { + spinner.valueProperty().asFlow().collect { + if (it != null) { + stringProperty3.set("NEW: $it") + } + } + } + spinnerChangesLabel.textProperty().bind(stringProperty3) + } +}