How to use Flows in an imperative mode? #4113

Closed
mahozad opened this issue Apr 23, 2024 · 3 comments
@mahozad
Contributor

mahozad commented Apr 23, 2024

Use case
Suspending functions and the code in a coroutine can be used like normal imperative code.

But Flows can be used only in a reactive way (on X do Y).

I'm just curious, is there any way to use Flows like "regular" code?

Example of current declarative way:

myComponent
    .getSomeThings()
    .onStart { /* START */ }
    .onEmpty { /* EMPTY */ }
    .onEach { /* EACH */ }
    .onCompletion { /* COMPLETE */ }
    .catch { /* EXCEPTION */ }
    .launchIn(myCoroutineScope)

to be written like this:

myCoroutineScope.launch {
    val flow = myComponent.getSomeThings()
    /* START */
    if (flow.isEmpty()) /* EMPTY */
    while (flow.isNotComplete()) {
        val item = flow
            .runCatching { getNext() }
            .getOrElse { /* EXCEPTION */ }
        /* EACH */
    }
    /* COMPLETE */
}

The Shape of the API
Something like the above example. I'm not sure if all the functionalities of various Flow operators can be achieved in imperative code.

@roomscape

A flow is "cold", so on its own it can't be used in the way you describe. It's only active when the consumer is collecting it, and all of its state lives inside the collect function. Outside collect, the flow isn't running or producing values, and there's no way to make a stateful iterator.
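As a minimal sketch of what "cold" means here (the function name `coldFlowStartCounts` is made up for illustration), the flow body below does not run when the flow is created, and it runs again from the top for every collection:

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records how many times the flow body has started
// before any collection, after the first collection, and after the second.
fun coldFlowStartCounts(): List<Int> = runBlocking {
    var starts = 0
    val numbers = flow {
        starts++      // executed once per collection, not at construction
        emit(1)
        emit(2)
    }
    val counts = mutableListOf(starts) // nothing has been collected yet
    numbers.toList()                   // first collection runs the body
    counts += starts
    numbers.toList()                   // second collection runs it again
    counts += starts
    counts
}
```

Because each call to a terminal operator restarts the body, there is no position in the flow that a separate `getNext()` call could resume from.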

To turn it into a stateful "hot" stream of values, we can add an additional producer coroutine using produceIn. The coroutine will be responsible for maintaining the flow in an active state, and will expose each successive value via a Channel.

With produceIn, your example code would look like this:

myCoroutineScope.launch {
  val flow = myComponent.getSomeThings()
  val channel = flow.produceIn(this)
  for (item in channel) {
      …
  }
}

Since the producer coroutine is now responsible for the lifecycle of the flow, including any resources it might hold, it's important to use correct structured concurrency to ensure everything's cleaned up afterwards.
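A slightly fuller sketch of the same idea, covering the START, EMPTY, EACH and COMPLETE steps from the question. The helper `consumeImperatively` and its string log are assumptions made up for illustration, and error handling is left out on purpose. The `coroutineScope` block bounds the lifetime of the producer coroutine, and the channel is cancelled in `finally` so the flow is released even if the consumer stops early:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: consumes a flow step by step and records what happened.
suspend fun consumeImperatively(source: Flow<Int>): List<String> = coroutineScope {
    val log = mutableListOf<String>()
    log += "START"
    // produceIn starts a producer coroutine in this scope that collects the flow
    // and forwards each value to the returned channel.
    val channel = source.produceIn(this)
    try {
        val iterator = channel.iterator()
        // hasNext() suspends until a value arrives or the channel is closed.
        if (!iterator.hasNext()) {
            log += "EMPTY"
        } else {
            do {
                log += "EACH ${iterator.next()}"
            } while (iterator.hasNext())
        }
        log += "COMPLETE"
    } finally {
        // Stops the producer if the consumer leaves the loop early.
        channel.cancel()
    }
    log
}
```

Calling it from `runBlocking` with `flowOf(1, 2)` should record START, the two EACH entries, and COMPLETE; with `emptyFlow()` it should record START, EMPTY, COMPLETE.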

@dkhalanskyjb
Collaborator

Every flow operator is open-source and fairly short. You can replace any operator with its implementation, minus a few optimizations.

originalFlow
    .onStart { /* START */ }

becomes

flow {
  /* START */
  emitAll(originalFlow)
}

originalFlow
    .onStart { /* START */ }
    .onEmpty { /* EMPTY */ }

becomes

flow {
  var isEmpty = true
  flow {
    /* START */
    emitAll(originalFlow)
  }.collect {
    isEmpty = false
    emit(it)
  }
  if (isEmpty) {
    /* EMPTY */
  }
}

With onEach, this becomes

flow {
  flow {
    var isEmpty = true
    flow {
      /* START */
      emitAll(originalFlow)
    }.collect {
      isEmpty = false
      emit(it)
    }
    if (isEmpty) {
      /* EMPTY */
    }
  }.collect {
    /* EACH */
    emit(it)
  }
}

and so on.
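Once the operators are expanded this way, the nested builders can be collapsed by hand into a single `flow { }` block that reads top to bottom. The sketch below assumes the actions only have side effects (here, appending to a log) and do not emit values themselves. The names `withStartEmptyEach` and `withOperators` are made up for illustration; the second function is the operator chain it is meant to match:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical hand-flattened equivalent of onStart + onEmpty + onEach.
fun <T> withStartEmptyEach(source: Flow<T>, log: MutableList<String>): Flow<T> = flow {
    log += "START"
    var isEmpty = true
    source.collect { value ->
        isEmpty = false
        log += "EACH $value"
        emit(value) // pass the value on to the downstream collector
    }
    if (isEmpty) {
        log += "EMPTY"
    }
}

// The operator chain the function above is meant to mirror.
fun <T> withOperators(source: Flow<T>, log: MutableList<String>): Flow<T> =
    source
        .onStart { log += "START" }
        .onEmpty { log += "EMPTY" }
        .onEach { log += "EACH $it" }

fun main() = runBlocking {
    val manual = mutableListOf<String>()
    val chained = mutableListOf<String>()
    withStartEmptyEach(flowOf(1, 2), manual).collect()
    withOperators(flowOf(1, 2), chained).collect()
    println(manual == chained)
}
```

The result is still a flow that has to be collected, so this makes the body imperative but does not turn the flow into something that can be pulled from outside.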

@qwwdfsad
Member

qwwdfsad commented May 8, 2024

https://youtrack.jetbrains.com/issue/KT-33851/iterator-less-for-statement-operator-convention might also help with more imperative usages in the future.

Another option is to transform flow into the channel via produceIn or reuse the facility from #3274

But in general, our consensus is that we do not intend to provide one more way to consume the flow which is drastically different (semantics-wise and performance-wise) from the regular pull model.

@qwwdfsad closed this as not planned May 8, 2024