Skip to content

Commit

Permalink
Implement ObservableValue<T>.asFlow()
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Feb 3, 2020
1 parent 6810745 commit 3f795b1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
@@ -1,3 +1,7 @@
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
}

public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Expand Down
49 changes: 49 additions & 0 deletions ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -0,0 +1,49 @@
package kotlinx.coroutines.javafx

import kotlinx.coroutines.ExperimentalCoroutinesApi
import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import java.util.concurrent.atomic.AtomicInteger

/**
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces
* its values as they change.
*
* The resulting flow is conflated, meaning that if several values arrive in a quick succession, only
* the last one will be produced.
*
* It produces at least one value.
*
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
*/
@ExperimentalCoroutinesApi
fun <T: Any> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
// It is unknown which thread will produce the initial value
val UNKNOWN_SOURCE = 0
// 1 -- some thread succeeded in CAS, so it will offer the first value
val DETERMINED_SOURCE = 1
// 2 -- the first value has been offered, so everyone else may proceed
val INITIAL_OFFER_PASSED = 2
val initialOfferState = AtomicInteger(UNKNOWN_SOURCE)
val listener = ChangeListener<T> { observable, oldValue, newValue ->
while (initialOfferState.get() != INITIAL_OFFER_PASSED) {
if (initialOfferState.compareAndSet(UNKNOWN_SOURCE, DETERMINED_SOURCE)) {
offer(newValue)

This comment has been minimized.

Copy link
@LouisCAD

LouisCAD Feb 3, 2020

Contributor

This will crash the program when the flow is closed as the listener has not been unregistered yet as awaitClose didn't execute its lambda yet and a new value is arriving in the callback. See #974

This comment has been minimized.

Copy link
@dkhalanskyjb

dkhalanskyjb Feb 4, 2020

Author Collaborator

Thank you for your analysis! No need to worry yet though: this change is still WIP, hasn't been reviewed, even tests do not exist for this yet. So, this code is bound to change significantly.

This comment has been minimized.

Copy link
@LouisCAD

LouisCAD Feb 4, 2020

Contributor

Testing the issue with offer is very inconvenient though, so it happens mostly in production… hence my warning until #974 is resolved.

This comment has been minimized.

Copy link
@dkhalanskyjb

dkhalanskyjb Feb 4, 2020

Author Collaborator

Yes, this is a good catch.

initialOfferState.set(INITIAL_OFFER_PASSED)
return@ChangeListener
}
}
offer(newValue)
}
addListener(listener)
if (initialOfferState.compareAndSet(UNKNOWN_SOURCE, DETERMINED_SOURCE)) {
send(value)
initialOfferState.set(INITIAL_OFFER_PASSED)
}
awaitClose { removeListener(listener) }
}.conflate()
Expand Up @@ -8,7 +8,7 @@ import javafx.application.*
import kotlinx.coroutines.*
import org.junit.*

class JavaFxTest : TestBase() {
class JavaFxDispatcherTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Expand Down

0 comments on commit 3f795b1

Please sign in to comment.