Feature Request: add a hot flow abstraction that can be complete()d or error()ed from the emitter side #4008

Open
climategadgets opened this issue Jan 6, 2024 · 7 comments

@climategadgets
climategadgets commented Jan 6, 2024

Use case

Any hot flow of limited size, or unlimited flow that can complete abnormally. Examples:

  • A pipeline driven by an external source (say, a streaming HTTP request parser producing a number of elements)
  • An I/O driven pipeline ("read from here forever and put into this flow... oops, network error")

The Shape of the API

Current Behavior

  • There is a MutableSharedFlow, but the only operation that communicates data to the consumer is emit() (per documentation, "SharedFlow never ends"). It is not possible to communicate either completion or an error.
  • This behavior is already supported by channels, but they are a different abstraction that can't be reused here without adapters.
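The asymmetry described in this list can be shown in a few lines. This is only a minimal sketch, assuming kotlinx.coroutines is on the classpath; the function names consumeChannel and consumeSharedFlow are made up for illustration and are not part of any library.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Channel: the sending side can signal completion, so the consumer's loop ends.
// (Hypothetical helper name, for illustration only.)
fun consumeChannel(): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.send(1)
    channel.send(2)
    channel.close() // close(cause) would make the consumer fail with `cause` instead
    val received = mutableListOf<Int>()
    for (x in channel) received += x // drains buffered items, then the loop ends
    received
}

// MutableSharedFlow: emit() is the only signal, so collect() keeps waiting
// until the collector itself is cancelled.
// (Hypothetical helper name, for illustration only.)
fun consumeSharedFlow(): Pair<List<Int>, Boolean> = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 2)
    shared.emit(1) // no subscribers yet; values are kept in the replay cache
    shared.emit(2)
    val received = mutableListOf<Int>()
    val job = launch { shared.collect { received += it } }
    delay(200) // give the collector time to drain the replay cache
    val stillCollecting = job.isActive // the emitter has no way to end the collection
    job.cancel()
    received to stillCollecting
}
```

In this sketch the channel consumer returns on its own after close(), while the shared flow collector has to be cancelled from the consumer side.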

Desired Behavior

  • Emitter side: add complete() and error(cause) operations.
  • Consumer side: no changes, existing abstractions already support this.

Prior Art

Project Reactor Sinks.Many tryEmitComplete(), tryEmitError(error)

Note about "non-example" remark from the issue template

This feature is requested not because it is present in Reactor, but because it is a logical extension of flow behavior. This behavior is already implicitly supported by cold flows (complete() happens when the flow's source runs out of elements, and error() happens when an exception is thrown from within the flow source) and by channels. Having this feature present in the flow will make the behavior uniform across different flow kinds (cold and hot) and make channel adapters behave in a more predictable way.

@dkhalanskyjb
Collaborator

Could you provide some code examples of how you'd use this feature? It's unclear why the things you list as the use cases are not better expressed via callbackFlow or flow { }.shareIn.

@dovchinnikov
Contributor

// available to send from multiple coroutines
val events = Channel<Event>() 

// no broadcast channel anymore, let's use flow
val broadcaster = flow {
  for (e in events) {
    emit(e)
  }
}.shareIn(...)

// a subscription
launch {
  broadcaster.collect { e ->
    handle(e)
  }
}

From shareIn docs:

Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers

which means it's not possible to let a subscriber know that there will be no more items. Calling events.close() does not make collect return, so the collector coroutine cannot complete.

Sure, it's possible to define a sealed event hierarchy like:

// available to send from multiple coroutines
val events = Channel<Event>()

// wrapper that carries an item, a failure, or the completion signal
sealed class EventOrCompletion {
  class ActualEvent(val evt: Event) : EventOrCompletion()
  class FailedOrCancelled(val t: Throwable) : EventOrCompletion()
  data object Completed : EventOrCompletion()
}

val broadcaster = flow<EventOrCompletion> {
  try {
    for (e in events) {
      emit(ActualEvent(e))
    }
    emit(Completed)
  } catch (t: Throwable) {
    emit(FailedOrCancelled(t))
    throw t
  }
}.shareIn(...)

// a subscription
launch {
  broadcaster
    .takeWhile {
      it != Completed
    }
    .map {
      when (it) {
        is ActualEvent -> it.evt
        is FailedOrCancelled -> throw it.t
        else -> error("unreachable")
      }
    }
    .collect { e: Event ->
      handle(e)
    }
}

But it requires duplicate work on each subscriber side, and it still raises a question: why does Channel have this built in, and why is MutableSharedFlow not consistent with Channel?
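The per-subscriber unwrapping above could be written once as a shared operator. What follows is only a sketch under stated assumptions: the Signal type and the dematerialize name are hypothetical (the name is borrowed from Rx terminology), and nothing like it exists in kotlinx.coroutines. It relies on transformWhile, which stops collecting the upstream once the lambda returns false.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper type: an item, a failure, or the completion signal.
sealed interface Signal<out T> {
    data class Item<T>(val value: T) : Signal<T>
    data class Failed(val cause: Throwable) : Signal<Nothing>
    data object Completed : Signal<Nothing>
}

// Hypothetical operator: unwraps items, rethrows failures, ends on Completed.
fun <T> Flow<Signal<T>>.dematerialize(): Flow<T> = transformWhile { signal ->
    when (signal) {
        is Signal.Item -> { emit(signal.value); true }
        is Signal.Failed -> throw signal.cause
        Signal.Completed -> false // stop collecting the never-ending upstream
    }
}

// Normal completion: the collector returns after the Completed signal.
fun demoCompletion(): List<String> = runBlocking {
    val shared = MutableSharedFlow<Signal<String>>(replay = 3)
    shared.emit(Signal.Item("a"))
    shared.emit(Signal.Item("b"))
    shared.emit(Signal.Completed)
    shared.dematerialize().toList()
}

// Abnormal completion: the collector fails with the emitter's exception.
fun demoFailure(): String? = runBlocking {
    val shared = MutableSharedFlow<Signal<String>>(replay = 1)
    shared.emit(Signal.Failed(IllegalStateException("network error")))
    try {
        shared.dematerialize().toList()
        null
    } catch (e: IllegalStateException) {
        e.message
    }
}
```

This only moves the boilerplate into one place. Every team still has to agree on its own wrapper type, which is the inconsistency the question above is about.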

@climategadgets
Author

climategadgets commented Jan 16, 2024

Could you provide some code examples of how you'd use this feature? It's unclear why the things you list as the use cases are not better expressed via callbackFlow or flow { }.shareIn.

Code Examples

One of the recent examples I worked with would be here. This project contains lots of other examples; search for .complete() or sink.error() (case insensitive).

callbackFlow

Per documentation, it is a cold flow. The model can probably be coerced to behave something like a completable and errorable hot flow, but it'll take a stretch, and the question stays - why is the stretch necessary if the concept of a reactive stream (which it very much looks like Flow tries to implement) makes no difference between a cold and a hot flow?

.shareIn

Per documentation, .shareIn produces a SharedFlow which, as the original request states, explicitly supports neither completion nor erroring out.

P.S.: to the point @dovchinnikov makes - he actually presented a solution identical to the one presented by my colleague when I raised this question at work. Yes, it is pretty trivial to implement, but why do hot and cold flows behave in different ways? Why do flows and channels behave in different ways? The flow-to-channel and channel-to-flow transformations present in Kotlin itself are nice, but the concept is broken as long as this inconsistency stays. Think of how many development teams are forced to bridge this gap in subtly different ways (creating subtly different bugs), and how much faster all of us would get to the solution if a uniform solution were available.

P.P.S.: A cursory search reveals another almost identical solution: https://stackoverflow.com/questions/75856447/kotlin-sharedflow-and-catching-exceptions "Almost identical" is a bane. I have seen cases where people, even within the same big team, use boilerplate code that turns out to be almost the same but with different quirks. Later, developers make errors because they rely on assumptions drawn from occurrences they have seen before, which no longer hold for other occurrences of the same pattern.

@climategadgets
Author

One more thing - here, cold streams (.fromIterable()) are routinely used in place of hot streams for testing. The need to have .complete() and .error() becomes more obvious if one tries to do the same with cold and hot flows.

@dkhalanskyjb
Collaborator

Also: #4019. Rephrasing that issue: there's no way to make flattenMerge work correctly for Flow<StateFlow<_>>: after concurrency StateFlow instances have finished their work, nothing else will be emitted.

@yoxjames

yoxjames commented Mar 5, 2024

The replicate enhancement I logged somewhat touches on this topic. Replicated flows would be cold but able to be shared with multiple consumers. Ending them is possible via throwing or the replicated flow simply terminating normally. This accomplishes something like sharing HTTP or disk IO without creating a never-ending SharedFlow.

I am not sure if this design covers all the use cases you have in mind (it does not include an emitter-side close() proposal), but it seemed somewhat related. Also, this design was for situations where the total number of collectors is known rather than unknown. Not sure if that would perfectly capture the use cases you had in mind.

#3833

@elizarov
Contributor

elizarov commented Mar 6, 2024

The original design issue for the shared flows goes to great lengths in explaining the rationale for this specific design and contrasts it with the design in reactive libraries like Rx and Reactor. Please, see here: #2034

The following operators (not implemented yet) are supposed to solve the cases where you need to have a shared flow with support for errors: #2092
