From 3f795b10f263fa4428f01b52007f05ac6d2ef711 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 30 Jan 2020 18:33:19 +0300 Subject: [PATCH] Implement ObservableValue.asFlow() --- .../kotlinx-coroutines-javafx.txt | 4 ++ .../src/JavaFxConvert.kt | 49 +++++++++++++++++++ ...{JavaFxTest.kt => JavaFxDispatcherTest.kt} | 2 +- 3 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt rename ui/kotlinx-coroutines-javafx/test/{JavaFxTest.kt => JavaFxDispatcherTest.kt} (97%) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt index 24c5b70b9d..620e904612 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt @@ -1,3 +1,7 @@ +public final class kotlinx/coroutines/javafx/JavaFxConvertKt { + public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow; +} + public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt new file mode 100644 index 0000000000..fd0a86e614 --- /dev/null +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt @@ -0,0 +1,49 @@ +package kotlinx.coroutines.javafx + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import javafx.beans.value.ChangeListener +import javafx.beans.value.ObservableValue +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.conflate +import java.util.concurrent.atomic.AtomicInteger + +/** + * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces + * its values as they change. + * + * The resulting flow is conflated, meaning that if several values arrive in a quick succession, only + * the last one will be produced. + * + * It produces at least one value. + * + * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue] + * supports lazy evaluation, eager computation will be enforced while the flow is being collected. + */ +@ExperimentalCoroutinesApi +fun ObservableValue.asFlow(): Flow = callbackFlow { + // It is unknown which thread will produce the initial value + val UNKNOWN_SOURCE = 0 + // 1 -- some thread succeeded in CAS, so it will offer the first value + val DETERMINED_SOURCE = 1 + // 2 -- the first value has been offered, so everyone else may proceed + val INITIAL_OFFER_PASSED = 2 + val initialOfferState = AtomicInteger(UNKNOWN_SOURCE) + val listener = ChangeListener { observable, oldValue, newValue -> + while (initialOfferState.get() != INITIAL_OFFER_PASSED) { + if (initialOfferState.compareAndSet(UNKNOWN_SOURCE, DETERMINED_SOURCE)) { + offer(newValue) + initialOfferState.set(INITIAL_OFFER_PASSED) + return@ChangeListener + } + } + offer(newValue) + } + addListener(listener) + if (initialOfferState.compareAndSet(UNKNOWN_SOURCE, DETERMINED_SOURCE)) { + send(value) + initialOfferState.set(INITIAL_OFFER_PASSED) + } + awaitClose { removeListener(listener) } +}.conflate() \ No newline at end of file diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt similarity index 97% rename from ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt rename to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt index e6a1ddb414..724be6d77b 100644 --- a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt +++ b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt @@ -8,7 +8,7 @@ import javafx.application.* import kotlinx.coroutines.* import org.junit.* -class JavaFxTest : TestBase() { +class JavaFxDispatcherTest : TestBase() { @Before fun setup() { ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")