What do we have now?
Let us say that we need a class holding an OAuth access token and a refresh token.
A valid access token has to be attached to every outgoing online request. Requests are executed concurrently.
The access token has a short lifetime. If it is not valid, we should make exactly one call with the refresh token to get a new access-refresh token pair, and then attach the new access token to online requests.
We need to avoid refreshing tokens more than once, even when multiple concurrent requests are going out. We must not interrupt the token refresh procedure mid-flight.
What do we have in coroutines lib to solve such a problem of mutable state shared between coroutines?
MutableStateFlow. It has an updateAndGet { } method, which is atomic (like AtomicInteger, etc.). However, the lambda passed to updateAndGet { ... } may be called multiple times to achieve an atomic update. This is fine when the state mutation has no side effects and is safe to repeat. But in our case we need to change the token state while making a non-idempotent online request. So this one is out.
Mutex. It allows only one coroutine at a time to enter a block protected by mutex.withLock { }. However, a mutex does not protect against cancellation: we can be cancelled inside the protected block. And we really should not cancel our (refresh token call + save newly obtained tokens) procedure.
Mutex in combination with withContext(NonCancellable) { }. Now we are safe against cancellation, but the request that first encounters an invalid access token and triggers the refresh becomes non-cancellable for the duration of the token refresh. This unnecessarily ties up resources that should have been freed for the GC to eat.
Actor coroutine, holding state and accepting messages, as described in the official guide Shared mutable state and concurrency. An actor has an edge over Mutex: it ignores cancellation of the coroutine that sent it a message, while it does not prevent prompt cancellation of that "foreign" coroutine.
Except that actor has been obsolete for quite a few years without a replacement. It is considered detrimental (actors should come as a system) and is a pain to write (messages with CompletableDeferreds, a big function with a when block, etc.).
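To make the trade-off of the Mutex plus NonCancellable option concrete, here is a minimal sketch. The names AccessToken, AuthTokens, MutexTokenHolder and the refreshTokens parameter are hypothetical and only stand in for the real token types and network call. It refreshes exactly once, but the caller that triggers the refresh cannot be cancelled until the refresh completes.

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical token types; validity is modelled as a plain flag for brevity.
data class AccessToken(val value: String, val isValid: Boolean)
data class AuthTokens(val accessToken: AccessToken, val refreshToken: String)

class MutexTokenHolder(
    initial: AuthTokens,
    // Hypothetical network call that exchanges a refresh token for a new pair.
    private val refreshTokens: suspend (refreshToken: String) -> AuthTokens,
) {
    private val mutex = Mutex()
    private var tokens = initial

    suspend fun validAccessToken(): AccessToken = mutex.withLock {
        if (!tokens.accessToken.isValid) {
            // The refresh call and the save run to completion even if the caller
            // is cancelled. The price: this caller stays alive until they finish.
            withContext(NonCancellable) {
                tokens = refreshTokens(tokens.refreshToken)
            }
        }
        tokens.accessToken
    }
}

fun main() = runBlocking<Unit> {
    var refreshCalls = 0
    val holder = MutexTokenHolder(AuthTokens(AccessToken("old", false), "r0")) {
        delay(50) // simulated network latency
        refreshCalls++
        AuthTokens(AccessToken("new", true), "r1")
    }
    // Ten concurrent "requests" ask for a valid token at the same time.
    val tokens = List(10) { async { holder.validAccessToken().value } }.awaitAll()
    println("refresh calls: $refreshCalls, tokens seen: ${tokens.toSet()}")
}
```

Waiting for the lock is still cancellable here; only the coroutine that actually performs the refresh is pinned.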
What should be instead?
I think coroutines could use an actor-like container for mutable state that is safe to share between concurrent coroutines, even when the mutation has side effects and is prone to cancellation problems.
However:
Actor is most likely not a good name. Actor raises high expectations when it comes to performance and design. And, I think, we could use just a simple state container, to be used in a generally non-actor environment.
We need it to be simpler and less tiresome to use than the obsolete actor { } coroutine builder.
TL;DR:
Let us have a function which just constructs (runs) a coroutine whose sole responsibility is to hold mutable state, say:
val protectedState = stateCoroutine(AuthTokens())
protectedState could be a SendChannel<suspend (T) -> T> or something more specific.
And a second function to make a safe state mutation, with side effects allowed and free of cancellation problems:
val accessToken = protectedState.update { (accessToken, refreshToken) ->
    if (accessToken.isValid) AuthTokens(accessToken, refreshToken)
    else refreshTokens(refreshToken) // side effect, which is not going to be cancelled or run multiple times
}.accessToken
This would be like Android DataStore's updateData { }, with basically the same guarantees. Underneath, it would use a SendChannel and a CompletableDeferred to communicate with stateCoroutine and get the new state back from it.
And most likely we would need a
val currentState = protectedState.get()
function to just read the current state without any modification.
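A minimal sketch of how this could look, assuming a Channel of requests that each carry a CompletableDeferred for the reply. Everything here is hypothetical and not an existing kotlinx.coroutines API: the ProtectedState and Request types, making stateCoroutine an extension on CoroutineScope, and the AccessToken / AuthTokens / refreshTokens stand-ins. Lifecycle handling (closing the channel, failing pending requests when the scope dies) is deliberately left out.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical container type; the proposal leaves the exact type open.
class ProtectedState<T>(scope: CoroutineScope, initial: T) {
    private class Request<T>(val transform: suspend (T) -> T, val reply: CompletableDeferred<T>)

    private val requests = Channel<Request<T>>(Channel.UNLIMITED)

    init {
        // The state lives only inside this coroutine. Transforms run one at a time,
        // exactly once, and in this coroutine's context, so cancelling the caller
        // does not interrupt them.
        scope.launch {
            var state = initial
            for (request in requests) {
                try {
                    state = request.transform(state)
                    request.reply.complete(state)
                } catch (e: Throwable) {
                    // A failed transform leaves the state unchanged and is reported to its caller.
                    request.reply.completeExceptionally(e)
                }
            }
        }
    }

    // Sends the transform to the state coroutine and waits for the resulting state.
    // Only the waiting is cancellable; the transform itself is not.
    suspend fun update(transform: suspend (T) -> T): T {
        val reply = CompletableDeferred<T>()
        requests.send(Request(transform, reply))
        return reply.await()
    }

    // Reads the current state by queueing an identity transform.
    suspend fun get(): T = update { it }
}

fun <T> CoroutineScope.stateCoroutine(initial: T): ProtectedState<T> = ProtectedState(this, initial)

// Hypothetical token types for the demo.
data class AccessToken(val value: String, val isValid: Boolean)
data class AuthTokens(val accessToken: AccessToken, val refreshToken: String)

fun main() = runBlocking<Unit> {
    var refreshCalls = 0
    suspend fun refreshTokens(refreshToken: String): AuthTokens {
        delay(50) // simulated network latency
        refreshCalls++
        return AuthTokens(AccessToken("new", true), "r1")
    }

    val protectedState = stateCoroutine(AuthTokens(AccessToken("old", false), "r0"))
    val tokens = List(10) {
        async {
            protectedState.update { (accessToken, refreshToken) ->
                if (accessToken.isValid) AuthTokens(accessToken, refreshToken)
                else refreshTokens(refreshToken)
            }.accessToken.value
        }
    }.awaitAll()
    println("refresh calls: $refreshCalls, tokens seen: ${tokens.toSet()}")
    coroutineContext.cancelChildren() // stop the state coroutine so runBlocking can return
}
```

Because get() goes through the same queue, it waits behind any update that is already in flight. Whether a read should instead return the last committed state immediately is an open design choice.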
Why?
The upsides of your proposal.
Adds a shared mutable state container that is truly free of concurrency problems, which coroutines seem to be missing.
Easy to use and hassle-free (almost like a MutableStateFlow).
Not an actor by name, so it will not be compared to Akka actors, etc.
Does not need any heavy abstractions or lots of code to implement.
Why not?
The downsides of your proposal that you already see.
Very similar to MutableStateFlow. It may cause confusion about which one to use for sharing mutable state. We would have to educate users on how differently update { } works on MutableStateFlow and on this stateCoroutine.
Not sure if stateCoroutine should have its own type or be exposed as a SendChannel<suspend (T) -> T>. Probably it should have its own type, as Channels are considered low-level and meant for advanced usage.
As an idea: we can provide an alternative implementation of StateFlow interface with a different name (like EncapsulatedStateFlow, ManagedStateFlow, etc) that will not provide any ability to directly set the current state of the flow and will not provide emit function like MutableStateFlow does, but will, instead, provide an update method that accepts a suspending lambda with the kind of behavior that you want to achieve (fun EncapsulatedStateFlow<T>.update(function: suspend (T) -> T)).
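One possible shape for this idea, as a rough sketch only. The class name comes from the comment above, but the rest is an assumption: instead of implementing the StateFlow interface (which the library documents as not stable for inheritance), the sketch exposes a read-only StateFlow property, and it runs each update in the container's own scope behind a Mutex so that cancelling the caller only cancels the wait.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical type. The scope should use a SupervisorJob so that one failing
// update does not cancel the whole container.
class EncapsulatedStateFlow<T>(private val scope: CoroutineScope, initial: T) {
    private val mutex = Mutex()
    private val mutableState = MutableStateFlow(initial)

    // Read-only view: no value setter and no emit for outside code.
    val state: StateFlow<T> = mutableState.asStateFlow()

    // The transform runs in the container's scope, one at a time and exactly once.
    // Cancelling the caller cancels only await(), not the transform.
    suspend fun update(function: suspend (T) -> T): T =
        scope.async {
            mutex.withLock {
                function(mutableState.value).also { mutableState.value = it }
            }
        }.await()
}

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val counter = EncapsulatedStateFlow(scope, 0)
    // 100 concurrent updates with a suspending body; none is lost or repeated.
    List(100) { async { counter.update { delay(1); it + 1 } } }.awaitAll()
    println("final value: ${counter.state.value}")
    scope.cancel()
}
```

Compared with a dedicated state coroutine, this keeps the familiar StateFlow read side (value, collect) at the cost of starting a new coroutine per update.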