Skip to content

Commit

Permalink
Merge pull request #93 from splendo/coroutine_cancellation_fix
Browse files Browse the repository at this point in the history
Fixed Coroutine Cancellation Exception for concurrent close and offer
  • Loading branch information
nbransby committed Oct 21, 2020
2 parents e9d38a5 + beeab75 commit c4b45b0
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 50 deletions.
Expand Up @@ -6,6 +6,9 @@
@file:JvmName("CommonKt")
package dev.gitlive.firebase

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName

Expand Down Expand Up @@ -59,4 +62,12 @@ expect class FirebaseNetworkException : FirebaseException

expect open class FirebaseTooManyRequestsException : FirebaseException

expect open class FirebaseApiNotAvailableException : FirebaseException
expect open class FirebaseApiNotAvailableException : FirebaseException

inline fun <E> SendChannel<E>.offerOrNull(element: E): Boolean? = try {
offer(element)
} catch (e : ClosedSendChannelException) {
null
} catch (e : CancellationException) {
null
}
Expand Up @@ -11,6 +11,7 @@ import com.google.firebase.auth.ActionCodeResult.*
import com.google.firebase.auth.FirebaseAuth.AuthStateListener
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.offerOrNull
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
Expand All @@ -26,15 +27,15 @@ actual class FirebaseAuth internal constructor(val android: com.google.firebase.
actual val currentUser: FirebaseUser?
get() = android.currentUser?.let { FirebaseUser(it) }

actual val authStateChanged get() = callbackFlow {
val listener = AuthStateListener { auth -> offer(auth.currentUser?.let { FirebaseUser(it) }) }
actual val authStateChanged get() = callbackFlow<FirebaseUser?> {
val listener = AuthStateListener { auth -> offerOrNull(auth.currentUser?.let { FirebaseUser(it) }) }
android.addAuthStateListener(listener)
awaitClose { android.removeAuthStateListener(listener) }
}

actual val idTokenChanged: Flow<FirebaseUser?>
get() = callbackFlow {
val listener = com.google.firebase.auth.FirebaseAuth.IdTokenListener { auth -> offer(auth.currentUser?.let { FirebaseUser(it) })}
val listener = com.google.firebase.auth.FirebaseAuth.IdTokenListener { auth -> offerOrNull(auth.currentUser?.let { FirebaseUser(it) }) }
android.addIdTokenListener(listener)
awaitClose { android.removeIdTokenListener(listener) }
}
Expand Down
Expand Up @@ -8,6 +8,7 @@ import cocoapods.FirebaseAuth.*
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.FirebaseException
import dev.gitlive.firebase.offerOrNull
import dev.gitlive.firebase.auth.ActionCodeResult.*
import kotlinx.cinterop.*
import kotlinx.coroutines.CompletableDeferred
Expand All @@ -27,13 +28,13 @@ actual class FirebaseAuth internal constructor(val ios: FIRAuth) {
actual val currentUser: FirebaseUser?
get() = ios.currentUser?.let { FirebaseUser(it) }

actual val authStateChanged get() = callbackFlow {
val handle = ios.addAuthStateDidChangeListener { _, user -> offer(user?.let { FirebaseUser(it) }) }
actual val authStateChanged get() = callbackFlow<FirebaseUser?> {
val handle = ios.addAuthStateDidChangeListener { _, user -> offerOrNull(user?.let { FirebaseUser(it) }) }
awaitClose { ios.removeAuthStateDidChangeListener(handle) }
}

actual val idTokenChanged get() = callbackFlow {
val handle = ios.addIDTokenDidChangeListener { _, user -> offer(user?.let { FirebaseUser(it) }) }
actual val idTokenChanged get() = callbackFlow<FirebaseUser?> {
val handle = ios.addIDTokenDidChangeListener { _, user -> offerOrNull(user?.let { FirebaseUser(it) }) }
awaitClose { ios.removeIDTokenDidChangeListener(handle) }
}

Expand Down
Expand Up @@ -21,16 +21,16 @@ actual class FirebaseAuth internal constructor(val js: firebase.auth.Auth) {
actual val currentUser: FirebaseUser?
get() = rethrow { js.currentUser?.let { FirebaseUser(it) } }

actual val authStateChanged get() = callbackFlow {
actual val authStateChanged get() = callbackFlow<FirebaseUser?> {
val unsubscribe = js.onAuthStateChanged {
offer(it?.let { FirebaseUser(it) })
offerOrNull(it?.let { FirebaseUser(it) })
}
awaitClose { unsubscribe() }
}

actual val idTokenChanged get() = callbackFlow {
actual val idTokenChanged get() = callbackFlow<FirebaseUser?> {
val unsubscribe = js.onIdTokenChanged {
offer(it?.let { FirebaseUser(it) })
offerOrNull(it?.let { FirebaseUser(it) })
}
awaitClose { unsubscribe() }
}
Expand Down
Expand Up @@ -13,6 +13,7 @@ import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.database.ChildEvent.Type
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.offerOrNull
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
Expand Down Expand Up @@ -91,7 +92,7 @@ actual open class Query internal constructor(
get() = callbackFlow {
val listener = object : ValueEventListener {
override fun onDataChange(snapshot: com.google.firebase.database.DataSnapshot) {
offer(DataSnapshot(snapshot))
offerOrNull(DataSnapshot(snapshot))
}

override fun onCancelled(error: com.google.firebase.database.DatabaseError) {
Expand All @@ -107,22 +108,22 @@ actual open class Query internal constructor(

val moved by lazy { types.contains(Type.MOVED) }
override fun onChildMoved(snapshot: com.google.firebase.database.DataSnapshot, previousChildName: String?) {
if(moved) offer(ChildEvent(DataSnapshot(snapshot), Type.MOVED, previousChildName))
if(moved) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.MOVED, previousChildName))
}

val changed by lazy { types.contains(Type.CHANGED) }
override fun onChildChanged(snapshot: com.google.firebase.database.DataSnapshot, previousChildName: String?) {
if(changed) offer(ChildEvent(DataSnapshot(snapshot), Type.CHANGED, previousChildName))
if(changed) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.CHANGED, previousChildName))
}

val added by lazy { types.contains(Type.ADDED) }
override fun onChildAdded(snapshot: com.google.firebase.database.DataSnapshot, previousChildName: String?) {
if(added) offer(ChildEvent(DataSnapshot(snapshot), Type.ADDED, previousChildName))
if(added) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.ADDED, previousChildName))
}

val removed by lazy { types.contains(Type.REMOVED) }
override fun onChildRemoved(snapshot: com.google.firebase.database.DataSnapshot) {
if(removed) offer(ChildEvent(DataSnapshot(snapshot), Type.REMOVED, null))
if(removed) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.REMOVED, null))
}

override fun onCancelled(error: com.google.firebase.database.DatabaseError) {
Expand Down
Expand Up @@ -11,7 +11,9 @@ import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.database.ChildEvent.Type
import dev.gitlive.firebase.database.ChildEvent.Type.*
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.offerOrNull
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.callbackFlow
Expand Down Expand Up @@ -75,19 +77,23 @@ actual open class Query internal constructor(

actual fun startAt(value: Boolean, key: String?) = Query(ios.queryStartingAtValue(value, key), persistenceEnabled)

actual val valueEvents get() = callbackFlow {
actual val valueEvents get() = callbackFlow<DataSnapshot> {
val handle = ios.observeEventType(
FIRDataEventTypeValue,
withBlock = { offer(DataSnapshot(it!!)) }
withBlock = { snapShot ->
offerOrNull(DataSnapshot(snapShot!!))
}
) { close(DatabaseException(it.toString())) }
awaitClose { ios.removeObserverWithHandle(handle) }
}

actual fun childEvents(vararg types: Type) = callbackFlow {
actual fun childEvents(vararg types: Type) = callbackFlow<ChildEvent> {
val handles = types.map { type ->
ios.observeEventType(
type.toEventType(),
andPreviousSiblingKeyWithBlock = { it, key -> offer(ChildEvent(DataSnapshot(it!!), type, key)) }
andPreviousSiblingKeyWithBlock = { snapShot, key ->
offerOrNull(ChildEvent(DataSnapshot(snapShot!!), type, key))
}
) { close(DatabaseException(it.toString())) }
}
awaitClose {
Expand Down
Expand Up @@ -40,29 +40,30 @@ actual open class Query internal constructor(open val js: firebase.database.Quer
actual fun orderByKey() = Query(js.orderByKey())
actual fun orderByChild(path: String) = Query(js.orderByChild(path))

actual val valueEvents get() = callbackFlow {
actual val valueEvents get() = callbackFlow<DataSnapshot> {
val listener = rethrow {
js.on(
"value",
{ it, _ -> offer(DataSnapshot(it)) },
{ it, _ -> offerOrNull(DataSnapshot(it)) },
{ close(DatabaseException(it)).run { Unit } }
)
}
awaitClose { rethrow { js.off("value", listener) } }
}

actual fun childEvents(vararg types: ChildEvent.Type) = callbackFlow {
actual fun childEvents(vararg types: ChildEvent.Type) = callbackFlow<ChildEvent> {
val listeners = rethrow {
types.map { type ->
"child_${type.name.toLowerCase()}".let { eventType ->
eventType to js.on(
eventType,
{ snapshot, previousChildName ->
offer(
ChildEvent(
DataSnapshot(snapshot),
type,
previousChildName
offerOrNull(
ChildEvent(
DataSnapshot(snapshot),
type,
previousChildName
)
)
)
},
Expand Down
Expand Up @@ -6,10 +6,7 @@
package dev.gitlive.firebase.firestore

import com.google.firebase.firestore.SetOptions
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.encode
import dev.gitlive.firebase.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -262,9 +259,9 @@ actual class DocumentReference(val android: com.google.firebase.firestore.Docume
actual suspend fun get() =
DocumentSnapshot(android.get().await())

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<DocumentSnapshot> {
val listener = android.addSnapshotListener { snapshot, exception ->
snapshot?.let { offer(DocumentSnapshot(snapshot)) }
snapshot?.let { offerOrNull(DocumentSnapshot(snapshot)) }
exception?.let { close(exception) }
}
awaitClose { listener.remove() }
Expand All @@ -277,9 +274,9 @@ actual open class Query(open val android: com.google.firebase.firestore.Query) {

actual fun limit(limit: Number) = Query(android.limit(limit.toLong()))

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<QuerySnapshot> {
val listener = android.addSnapshotListener { snapshot, exception ->
snapshot?.let { offer(QuerySnapshot(snapshot)) }
snapshot?.let { offerOrNull(QuerySnapshot(snapshot)) }
exception?.let { close(exception) }
}
awaitClose { listener.remove() }
Expand Down
Expand Up @@ -5,6 +5,7 @@
package dev.gitlive.firebase.firestore

import cocoapods.FirebaseFirestore.*
import dev.gitlive.firebase.*
import kotlinx.cinterop.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.awaitClose
Expand All @@ -13,11 +14,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.SerializationStrategy
import platform.Foundation.NSError
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.encode
import dev.gitlive.firebase.FirebaseException

actual val Firebase.firestore get() =
FirebaseFirestore(FIRFirestore.firestore())
Expand Down Expand Up @@ -173,9 +169,9 @@ actual class DocumentReference(val ios: FIRDocumentReference) {
actual suspend fun get() =
DocumentSnapshot(awaitResult { ios.getDocumentWithCompletion(it) })

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<DocumentSnapshot> {
val listener = ios.addSnapshotListener { snapshot, error ->
snapshot?.let { offer(DocumentSnapshot(snapshot)) }
snapshot?.let { offerOrNull(DocumentSnapshot(snapshot)) }
error?.let { close(error.toException()) }
}
awaitClose { listener.remove() }
Expand All @@ -188,9 +184,9 @@ actual open class Query(open val ios: FIRQuery) {

actual fun limit(limit: Number) = Query(ios.queryLimitedTo(limit.toLong()))

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<QuerySnapshot> {
val listener = ios.addSnapshotListener { snapshot, error ->
snapshot?.let { offer(QuerySnapshot(snapshot)) }
snapshot?.let { offerOrNull(QuerySnapshot(snapshot)) }
error?.let { close(error.toException()) }
}
awaitClose { listener.remove() }
Expand Down
Expand Up @@ -236,9 +236,9 @@ actual class DocumentReference(val js: firebase.firestore.DocumentReference) {

actual suspend fun get() = rethrow { DocumentSnapshot(js.get().await()) }

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<DocumentSnapshot> {
val unsubscribe = js.onSnapshot(
{ offer(DocumentSnapshot(it)) },
{ offerOrNull(DocumentSnapshot(it)) },
{ close(errorToException(it)) }
)
awaitClose { unsubscribe() }
Expand Down Expand Up @@ -286,10 +286,10 @@ actual open class Query(open val js: firebase.firestore.Query) {
}
)

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<QuerySnapshot> {
val unsubscribe = rethrow {
js.onSnapshot(
{ offer(QuerySnapshot(it)) },
{ offerOrNull(QuerySnapshot(it)) },
{ close(errorToException(it)) }
)
}
Expand Down

0 comments on commit c4b45b0

Please sign in to comment.