Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Coroutine Cancellation Exception for concurrent close and offer #93

Merged
merged 4 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -6,6 +6,9 @@
@file:JvmName("CommonKt")
package dev.gitlive.firebase

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName

Expand Down Expand Up @@ -59,4 +62,12 @@ expect class FirebaseNetworkException : FirebaseException

expect open class FirebaseTooManyRequestsException : FirebaseException

expect open class FirebaseApiNotAvailableException : FirebaseException
expect open class FirebaseApiNotAvailableException : FirebaseException

inline fun <E> SendChannel<E>.offerOrNull(element: E): Boolean? = try {
offer(element)
} catch (e : ClosedSendChannelException) {
null
} catch (e : CancellationException) {
null
}
Expand Up @@ -11,6 +11,7 @@ import com.google.firebase.auth.ActionCodeResult.*
import com.google.firebase.auth.FirebaseAuth.AuthStateListener
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.offerOrNull
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
Expand All @@ -26,15 +27,15 @@ actual class FirebaseAuth internal constructor(val android: com.google.firebase.
actual val currentUser: FirebaseUser?
get() = android.currentUser?.let { FirebaseUser(it) }

actual val authStateChanged get() = callbackFlow {
val listener = AuthStateListener { auth -> offer(auth.currentUser?.let { FirebaseUser(it) }) }
actual val authStateChanged get() = callbackFlow<FirebaseUser?> {
val listener = AuthStateListener { auth -> offerOrNull(auth.currentUser?.let { FirebaseUser(it) }) }
android.addAuthStateListener(listener)
awaitClose { android.removeAuthStateListener(listener) }
}

actual val idTokenChanged: Flow<FirebaseUser?>
get() = callbackFlow {
val listener = com.google.firebase.auth.FirebaseAuth.IdTokenListener { auth -> offer(auth.currentUser?.let { FirebaseUser(it) })}
val listener = com.google.firebase.auth.FirebaseAuth.IdTokenListener { auth -> offerOrNull(auth.currentUser?.let { FirebaseUser(it) }) }
android.addIdTokenListener(listener)
awaitClose { android.removeIdTokenListener(listener) }
}
Expand Down
Expand Up @@ -8,6 +8,7 @@ import cocoapods.FirebaseAuth.*
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.FirebaseException
import dev.gitlive.firebase.offerOrNull
import dev.gitlive.firebase.auth.ActionCodeResult.*
import kotlinx.cinterop.*
import kotlinx.coroutines.CompletableDeferred
Expand All @@ -27,13 +28,13 @@ actual class FirebaseAuth internal constructor(val ios: FIRAuth) {
actual val currentUser: FirebaseUser?
get() = ios.currentUser?.let { FirebaseUser(it) }

actual val authStateChanged get() = callbackFlow {
val handle = ios.addAuthStateDidChangeListener { _, user -> offer(user?.let { FirebaseUser(it) }) }
actual val authStateChanged get() = callbackFlow<FirebaseUser?> {
val handle = ios.addAuthStateDidChangeListener { _, user -> offerOrNull(user?.let { FirebaseUser(it) }) }
awaitClose { ios.removeAuthStateDidChangeListener(handle) }
}

actual val idTokenChanged get() = callbackFlow {
val handle = ios.addIDTokenDidChangeListener { _, user -> offer(user?.let { FirebaseUser(it) }) }
actual val idTokenChanged get() = callbackFlow<FirebaseUser?> {
val handle = ios.addIDTokenDidChangeListener { _, user -> offerOrNull(user?.let { FirebaseUser(it) }) }
awaitClose { ios.removeIDTokenDidChangeListener(handle) }
}

Expand Down
Expand Up @@ -21,16 +21,16 @@ actual class FirebaseAuth internal constructor(val js: firebase.auth.Auth) {
actual val currentUser: FirebaseUser?
get() = rethrow { js.currentUser?.let { FirebaseUser(it) } }

actual val authStateChanged get() = callbackFlow {
actual val authStateChanged get() = callbackFlow<FirebaseUser?> {
val unsubscribe = js.onAuthStateChanged {
offer(it?.let { FirebaseUser(it) })
offerOrNull(it?.let { FirebaseUser(it) })
}
awaitClose { unsubscribe() }
}

actual val idTokenChanged get() = callbackFlow {
actual val idTokenChanged get() = callbackFlow<FirebaseUser?> {
val unsubscribe = js.onIdTokenChanged {
offer(it?.let { FirebaseUser(it) })
offerOrNull(it?.let { FirebaseUser(it) })
}
awaitClose { unsubscribe() }
}
Expand Down
Expand Up @@ -13,6 +13,7 @@ import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.database.ChildEvent.Type
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.offerOrNull
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
Expand Down Expand Up @@ -91,7 +92,7 @@ actual open class Query internal constructor(
get() = callbackFlow {
val listener = object : ValueEventListener {
override fun onDataChange(snapshot: com.google.firebase.database.DataSnapshot) {
offer(DataSnapshot(snapshot))
offerOrNull(DataSnapshot(snapshot))
}

override fun onCancelled(error: com.google.firebase.database.DatabaseError) {
Expand All @@ -107,22 +108,22 @@ actual open class Query internal constructor(

val moved by lazy { types.contains(Type.MOVED) }
override fun onChildMoved(snapshot: com.google.firebase.database.DataSnapshot, previousChildName: String?) {
if(moved) offer(ChildEvent(DataSnapshot(snapshot), Type.MOVED, previousChildName))
if(moved) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.MOVED, previousChildName))
}

val changed by lazy { types.contains(Type.CHANGED) }
override fun onChildChanged(snapshot: com.google.firebase.database.DataSnapshot, previousChildName: String?) {
if(changed) offer(ChildEvent(DataSnapshot(snapshot), Type.CHANGED, previousChildName))
if(changed) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.CHANGED, previousChildName))
}

val added by lazy { types.contains(Type.ADDED) }
override fun onChildAdded(snapshot: com.google.firebase.database.DataSnapshot, previousChildName: String?) {
if(added) offer(ChildEvent(DataSnapshot(snapshot), Type.ADDED, previousChildName))
if(added) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.ADDED, previousChildName))
}

val removed by lazy { types.contains(Type.REMOVED) }
override fun onChildRemoved(snapshot: com.google.firebase.database.DataSnapshot) {
if(removed) offer(ChildEvent(DataSnapshot(snapshot), Type.REMOVED, null))
if(removed) offerOrNull(ChildEvent(DataSnapshot(snapshot), Type.REMOVED, null))
}

override fun onCancelled(error: com.google.firebase.database.DatabaseError) {
Expand Down
Expand Up @@ -11,7 +11,9 @@ import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.database.ChildEvent.Type
import dev.gitlive.firebase.database.ChildEvent.Type.*
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.offerOrNull
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.callbackFlow
Expand Down Expand Up @@ -75,19 +77,23 @@ actual open class Query internal constructor(

actual fun startAt(value: Boolean, key: String?) = Query(ios.queryStartingAtValue(value, key), persistenceEnabled)

actual val valueEvents get() = callbackFlow {
actual val valueEvents get() = callbackFlow<DataSnapshot> {
val handle = ios.observeEventType(
FIRDataEventTypeValue,
withBlock = { offer(DataSnapshot(it!!)) }
withBlock = { snapShot ->
offerOrNull(DataSnapshot(snapShot!!))
}
) { close(DatabaseException(it.toString())) }
awaitClose { ios.removeObserverWithHandle(handle) }
}

actual fun childEvents(vararg types: Type) = callbackFlow {
actual fun childEvents(vararg types: Type) = callbackFlow<ChildEvent> {
val handles = types.map { type ->
ios.observeEventType(
type.toEventType(),
andPreviousSiblingKeyWithBlock = { it, key -> offer(ChildEvent(DataSnapshot(it!!), type, key)) }
andPreviousSiblingKeyWithBlock = { snapShot, key ->
offerOrNull(ChildEvent(DataSnapshot(snapShot!!), type, key))
}
) { close(DatabaseException(it.toString())) }
}
awaitClose {
Expand Down
Expand Up @@ -40,29 +40,30 @@ actual open class Query internal constructor(open val js: firebase.database.Quer
actual fun orderByKey() = Query(js.orderByKey())
actual fun orderByChild(path: String) = Query(js.orderByChild(path))

actual val valueEvents get() = callbackFlow {
actual val valueEvents get() = callbackFlow<DataSnapshot> {
val listener = rethrow {
js.on(
"value",
{ it, _ -> offer(DataSnapshot(it)) },
{ it, _ -> offerOrNull(DataSnapshot(it)) },
{ close(DatabaseException(it)).run { Unit } }
)
}
awaitClose { rethrow { js.off("value", listener) } }
}

actual fun childEvents(vararg types: ChildEvent.Type) = callbackFlow {
actual fun childEvents(vararg types: ChildEvent.Type) = callbackFlow<ChildEvent> {
val listeners = rethrow {
types.map { type ->
"child_${type.name.toLowerCase()}".let { eventType ->
eventType to js.on(
eventType,
{ snapshot, previousChildName ->
offer(
ChildEvent(
DataSnapshot(snapshot),
type,
previousChildName
offerOrNull(
ChildEvent(
DataSnapshot(snapshot),
type,
previousChildName
)
)
)
},
Expand Down
Expand Up @@ -6,10 +6,7 @@
package dev.gitlive.firebase.firestore

import com.google.firebase.firestore.SetOptions
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.encode
import dev.gitlive.firebase.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -262,9 +259,9 @@ actual class DocumentReference(val android: com.google.firebase.firestore.Docume
actual suspend fun get() =
DocumentSnapshot(android.get().await())

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<DocumentSnapshot> {
val listener = android.addSnapshotListener { snapshot, exception ->
snapshot?.let { offer(DocumentSnapshot(snapshot)) }
snapshot?.let { offerOrNull(DocumentSnapshot(snapshot)) }
exception?.let { close(exception) }
}
awaitClose { listener.remove() }
Expand All @@ -277,9 +274,9 @@ actual open class Query(open val android: com.google.firebase.firestore.Query) {

actual fun limit(limit: Number) = Query(android.limit(limit.toLong()))

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<QuerySnapshot> {
val listener = android.addSnapshotListener { snapshot, exception ->
snapshot?.let { offer(QuerySnapshot(snapshot)) }
snapshot?.let { offerOrNull(QuerySnapshot(snapshot)) }
exception?.let { close(exception) }
}
awaitClose { listener.remove() }
Expand Down
Expand Up @@ -5,6 +5,7 @@
package dev.gitlive.firebase.firestore

import cocoapods.FirebaseFirestore.*
import dev.gitlive.firebase.*
import kotlinx.cinterop.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.awaitClose
Expand All @@ -13,11 +14,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.SerializationStrategy
import platform.Foundation.NSError
import dev.gitlive.firebase.Firebase
import dev.gitlive.firebase.FirebaseApp
import dev.gitlive.firebase.decode
import dev.gitlive.firebase.encode
import dev.gitlive.firebase.FirebaseException

actual val Firebase.firestore get() =
FirebaseFirestore(FIRFirestore.firestore())
Expand Down Expand Up @@ -173,9 +169,9 @@ actual class DocumentReference(val ios: FIRDocumentReference) {
actual suspend fun get() =
DocumentSnapshot(awaitResult { ios.getDocumentWithCompletion(it) })

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<DocumentSnapshot> {
val listener = ios.addSnapshotListener { snapshot, error ->
snapshot?.let { offer(DocumentSnapshot(snapshot)) }
snapshot?.let { offerOrNull(DocumentSnapshot(snapshot)) }
error?.let { close(error.toException()) }
}
awaitClose { listener.remove() }
Expand All @@ -188,9 +184,9 @@ actual open class Query(open val ios: FIRQuery) {

actual fun limit(limit: Number) = Query(ios.queryLimitedTo(limit.toLong()))

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<QuerySnapshot> {
val listener = ios.addSnapshotListener { snapshot, error ->
snapshot?.let { offer(QuerySnapshot(snapshot)) }
snapshot?.let { offerOrNull(QuerySnapshot(snapshot)) }
error?.let { close(error.toException()) }
}
awaitClose { listener.remove() }
Expand Down
Expand Up @@ -236,9 +236,9 @@ actual class DocumentReference(val js: firebase.firestore.DocumentReference) {

actual suspend fun get() = rethrow { DocumentSnapshot(js.get().await()) }

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<DocumentSnapshot> {
val unsubscribe = js.onSnapshot(
{ offer(DocumentSnapshot(it)) },
{ offerOrNull(DocumentSnapshot(it)) },
{ close(errorToException(it)) }
)
awaitClose { unsubscribe() }
Expand Down Expand Up @@ -286,10 +286,10 @@ actual open class Query(open val js: firebase.firestore.Query) {
}
)

actual val snapshots get() = callbackFlow {
actual val snapshots get() = callbackFlow<QuerySnapshot> {
val unsubscribe = rethrow {
js.onSnapshot(
{ offer(QuerySnapshot(it)) },
{ offerOrNull(QuerySnapshot(it)) },
{ close(errorToException(it)) }
)
}
Expand Down