
Confusing behavior when using retry() and wrapping an exception thrown by emit() #2860

Closed
fluidsonic opened this issue Aug 4, 2021 · 6 comments


@fluidsonic

fluidsonic commented Aug 4, 2021

The following code is supposed to retry upstream if it throws an exception.

If the exception comes from downstream through the upstream's emit(), then it shouldn't retry, because the downstream has already failed. This works well if I rethrow the exception raised by emit() unchanged.

However, if I add some context to that exception by wrapping it in another exception, retry treats it as an exception that comes from upstream rather than downstream and restarts the upstream. That leads to a transparency violation.

This is unintuitive because wrapping exceptions to add context is fairly common.
Regardless of whether it's legitimate to do so, the behavior is quite confusing. I expect both Flows to stop on a downstream exception.
Also, wrapping exceptions works just fine if no retry operator is involved.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*


suspend fun main() {
    flow {
        flow {
            val context = "some context"

            emit(1)
            delay(500)

            emit(2)
            delay(500)

            try {
                emit(3)
            } catch (e: CancellationException) {
                throw e
            } catch (e: Throwable) {
                // throw e // This works.
                throw RuntimeException("<new exception to add $context>", e) // This fails.
            }
        }
            .retry { delay(500); true }
            .collect { n ->
                println("inner: $n")
                emit(n)
            }
    }
        .catch { e ->
            println("outer: $e")
        }
        .collect { n ->
            println("outer: $n")
            if (n == 3) error("Fail")
        }
}

Output:

inner: 1
outer: 1
inner: 2
outer: 2
inner: 3
outer: 3
inner: 1
outer: java.lang.IllegalStateException: Flow exception transparency is violated:
    Previous 'emit' call has thrown exception java.lang.IllegalStateException: Fail, but then emission attempt of value '1' has been detected.
    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
    For a more detailed explanation, please refer to Flow documentation.

It gets even more confusing when I extend the try block to include the first emit().
The downstream keeps collecting but doesn't receive any more elements. The upstream repeatedly emits the first element indefinitely.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() {
    flow {
        flow {
            val context = "some context"

            try {
                emit(1)
                delay(500)

                emit(2)
                delay(500)

                emit(3)
            } catch (e: CancellationException) {
                throw e
            } catch (e: Throwable) {
                // throw e // This works.
                throw RuntimeException("<new exception to add $context>", e) // This fails.
            }
        }
            .retry { delay(500); true }
            .collect { n ->
                println("inner: $n")
                emit(n)
            }
    }
        .catch { e ->
            println("outer: $e")
        }
        .collect { n ->
            println("outer: $n")
            if (n == 3) error("Fail")
        }
}

Output:

inner: 1
outer: 1
inner: 2
outer: 2
inner: 3
outer: 3
inner: 1
inner: 1
inner: 1
inner: 1
inner: 1
inner: 1
inner: 1

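The decision is made by this check in kotlinx.coroutines' internal flow error handling, which retry and catch share:

    if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
        throw e // Rethrow exceptions from downstream and cancellation causes
    } else {
        return e // not from downstream
    }

Because the check compares against the exact exception object thrown by emit(), a new RuntimeException wrapping it is no longer recognized as coming from downstream, so retry treats it as an upstream failure.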
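To see why wrapping defeats this check, here is a simplified, stdlib-only Kotlin model (not kotlinx.coroutines' source). `Emitter` stands in for `FlowCollector`, and `classify` is a hypothetical helper: it records the exact exception object a failing `emit()` throws and later compares identities with `===`, roughly mirroring the `isSameExceptionAs(fromDownstream)` check.

```kotlin
// Stands in for FlowCollector in this stdlib-only model (hypothetical).
fun interface Emitter {
    fun emit(value: Int)
}

// Hypothetical model of the downstream check: forward values to `downstream`,
// remember the exact exception object a failing emit() threw, and compare
// identities when something escapes the upstream.
fun classify(upstream: (Emitter) -> Unit, downstream: (Int) -> Unit): String {
    var fromDownstream: Throwable? = null
    val emitter = Emitter { value ->
        try {
            downstream(value)
        } catch (e: Throwable) {
            fromDownstream = e // record the downstream failure
            throw e
        }
    }
    return try {
        upstream(emitter)
        "completed"
    } catch (e: Throwable) {
        // Identity comparison: a wrapper around the recorded exception is a different object.
        if (e === fromDownstream) "downstream" else "upstream"
    }
}
```

With a downstream that throws on 3, an upstream that rethrows the exception from `emit()` unchanged is classified as `"downstream"`, while one that throws `RuntimeException("context", e)` is classified as `"upstream"`, which is the case where `retry` restarts the upstream.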

@qwwdfsad qwwdfsad added the flow label Aug 10, 2021
@qwwdfsad
Member

qwwdfsad commented Aug 10, 2021

Thanks for the detailed write-up and self-contained snippets, it significantly simplifies the reasoning and investigation!

> However, if I add some context to that exception by wrapping it in another exception, retry treats it as an exception that comes from upstream rather than downstream and restarts the upstream. That leads to a transparency violation.

The very first observation is that this is exactly an exception transparency violation. Transparency is important for the library: the downstream should not have to worry about how the upstream flow is implemented. If the downstream throws an exception X, then X is what gets rethrown, and if collect is invoked in the context of Dispatchers.Main, nothing in the upstream can change that.

In your example, you are basically trying to hijack the downstream exception and replace it with your own. That violates exception transparency and drives the retry operator mad; at this point, all bets are off and we cannot provide any correctness guarantees.

Even though your proposal is quite simple, implementing it would make other things much worse: the upstream could hijack downstream exceptions, and retry and catch could no longer differentiate between "the upstream threw an exception in its finally block while cleaning up resources after the downstream cancellation" and "the upstream wants to add some context that shouldn't be handled by the 'catch' operator because semantically it is a downstream exception, not an upstream one", and so on.

So before jumping to potential solutions, it would be nice to know what exactly you are trying to achieve.
For example, the Flow documentation explicitly recommends the catch operator over try-catch for the sake of exception transparency,
and you can rewrite your code in a more declarative manner:

flow {
    flow {
        emit(1)
        delay(500)
        emit(2)
        delay(500)
        emit(3)
    }
        .retry { delay(500); true }
        .onEach { n ->
            println("inner: $n")
            emit(n)
        }
        .catch { e ->
            if (e !is CancellationException) throw RuntimeException("<new exception to add>", e)
        }
        .collect()
}.catch { e ->
    println("outer: $e")
}.collect { n ->
    println("outer: $n")
    if (n == 3) error("Fail")
}

Does it solve your issue?

@fluidsonic
Author

fluidsonic commented Aug 10, 2021

Thank you for the detailed explanation @qwwdfsad.
I agree that I shouldn't mess with downstream exceptions and that context can be added in other ways.

However, my example oversimplifies the problem.

In my actual code I have a much longer Flow builder.
I basically wrap much of the implementation in a big try…catch to add some context when some of the logic fails.
However, somewhere in that try block there's also an emit(), and the fact that downstream exceptions get caught as well easily goes unnoticed. Most exceptions are handled properly, but in the rare case that the downstream throws an exception, I run into this problem.

What I'd basically have to do is "catch everything except what is thrown by emit()".
That's actually very hard to do with try…catch. For example, I can't surround only part of the websocket block with try…catch if I also want to catch exceptions from the websocket function (Ktor) itself.
The catch() operator, on the other hand, basically covers everything in the Flow builder at once, and it doesn't have access to any of the Flow builder's current state.

All of that can be worked around somehow. It's just not too simple if the code is more involved. And it's quite easy to miss those downstream/upstream exception details if you're used to using try…catch blocks.

flow {
   var someLoadedData: Any? = null // declared outside try so the catch block can reference it
   try {
      // various logic
      // database-dependent code
      // maybe some networking
      // load & parse data
      someLoadedData = …
      // even a websocket connection
      clientWebsocket {
         // do stuff
      }
   } catch (e: CancellationException) {
      throw e // never wrap cancellation
   } catch (e: Throwable) {
      throw SomeOtherException("operation for $someLoadedData failed", e)
   }
}
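One way to get "catch everything except what emit() threw" inside a single try…catch is to route every emission through a helper that records the exception the downstream threw, and then rethrow that exact object unchanged. The sketch below is a hypothetical, stdlib-only model: `Sink` stands in for `FlowCollector`, `OperationFailed` for `SomeOtherException`, and `safeEmit` is an invented helper; in a real flow builder it would presumably be a local `suspend fun` calling `emit`.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical exception type used to add context (stands in for SomeOtherException).
class OperationFailed(message: String, cause: Throwable) : Exception(message, cause)

// Stands in for FlowCollector<String> in this stdlib-only model.
fun interface Sink {
    fun emit(value: String)
}

// Sketch of "catch everything except what emit() threw": every emission goes through
// safeEmit, which records the exact exception object the downstream threw.
fun produce(out: Sink, failUpstream: Boolean) {
    var fromEmit: Throwable? = null
    fun safeEmit(value: String) {
        try {
            out.emit(value)
        } catch (e: Throwable) {
            fromEmit = e // remember the downstream failure by identity
            throw e
        }
    }

    var someLoadedData: String? = null
    try {
        someLoadedData = "loaded-data"
        safeEmit("first")
        if (failUpstream) throw IllegalStateException("parse error") // simulated upstream bug
        safeEmit("second")
    } catch (e: Throwable) {
        // Cancellation and downstream failures pass through untouched.
        if (e is CancellationException || e === fromEmit) throw e
        throw OperationFailed("operation for $someLoadedData failed", e)
    }
}
```

The design relies on identity (`===`), like the library's own downstream check, so the downstream exception reaches `retry`/`catch` unchanged while upstream failures still get context. It only works if every emission goes through `safeEmit`; a stray direct `emit()` call would slip through unnoticed.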

@qwwdfsad
Member

Thanks for the explanation!

> I basically wrap much of the implementation in a big try…catch to add some context when some of the logic fails.

Just to clarify, you want to enhance only upstream exceptions, right?
So an API like catch (e: Throwable) { if (!e.isFromDownstream()) throw ... } would have helped, wouldn't it?

> The catch() operator, on the other hand, basically covers everything in the Flow builder at once

I understand the problem with the context, but could you please elaborate on what's wrong with the "everything" part? If I understand your use case correctly, the try-catch block does exactly the same thing: it tries to handle any exception from the whole body of the flow.

@fluidsonic
Author

> Just to clarify, you want to enhance only upstream exceptions, right?
> So an API like catch (e: Throwable) { if (!e.isFromDownstream()) throw ... } would have helped, wouldn't it?

Exactly. That would help.
Then I'd probably write if (e.isFromDownstream()) throw e.
That in turn I'd probably put in an extension function like e.throwIfFromDownstream() or even e.throwIfCancellationOrFromDownstream() (but with a nicer name).

However, it doesn't solve the other problem: it's too easy to miss that exceptions from emit() are being caught accidentally.

Would it also be a solution to wrap downstream exceptions in a special CancellationException and unwrap them somewhere else? We've already learned to rethrow CancellationException, as it's part of normal coroutine execution.
(However, I'm not sure rethrowing it is even necessary if there's no code after the catch block.)

OTOH someone might expect such an exception to be an actual cancellation rather than a downstream failure.
Just putting out some thoughts here. May not make sense.

> I understand the problem with the context, but could you please elaborate on what's wrong with the "everything" part? If I understand your use case correctly, the try-catch block does exactly the same thing: it tries to handle any exception from the whole body of the flow.

That's totally fine if I want to handle exceptions for all code in the Flow builder the same way.
But if I want a replacement for multiple try…catch blocks in the Flow builder (for example to add different context or throw different exceptions depending on the code I wrap) then I'm out of luck with an all-encompassing catch() operator.

qwwdfsad added a commit that referenced this issue Aug 12, 2021
…tream (JVM-only yet) with explicit guarantees on context/exception transparency enforcement

Pre-design for #2860
@qwwdfsad
Member

Thanks!

We'll discuss your use case and potential solutions, and the final resolution should land here around Kotlin 1.6.0.

@qwwdfsad
Member

qwwdfsad commented Oct 14, 2021

Small update: we'll try to prototype a slightly different solution.

The problem with isFromDownstream is straightforward: we have to educate people about isFromDownstream, and it's really hard to get right on the first try. Code without it will work just fine until the very first unexpected exception (and our experience shows that people rarely test that). Only then will users start looking for a solution.
Also, the side effects of misuse are really wild: the catch operator starts catching, retry starts retrying and emitting, an invariant gets violated in a different place in the code, and so on.

An alternative approach is to change how exception transparency works -- when upstream fails, do not overwrite an exception from the downstream, but suppress it and add it to the original exception

qwwdfsad added a commit that referenced this issue Nov 11, 2021
…from the upstream when the downstream has been failed, but also suppress such exceptions by the downstream one

It solves the problem of graceful shutdown: when the upstream fails unwillingly (e.g. file.close() has thrown in a 'finally' block), we cannot treat it as an exception transparency violation (hint: 'finally' is a shortcut for 'catch'+body), but we also cannot leave things as is, otherwise it leads to unforeseen consequences such as successful 'retry' and 'catch' operators that may, or may not, then fail with an exception on an attempt to emit.

Downstream exception supersedes the upstream exception only if it is not an instance of CancellationException, semantically emulating cancellation-friendly 'use' block.

Fixes #2860
yorickhenning pushed a commit to yorickhenning/kotlinx.coroutines that referenced this issue Jan 28, 2022
Kotlin#3017)

* Improve exception transparency: explicitly allow throwing exceptions from the upstream when the downstream has been failed, suppress the downstream exception by the new one, but still ignore it in the exception handling operators, that still consider the flow failed.

It solves the problem of graceful shutdown: when the upstream fails unwillingly (e.g. `file.close()` has thrown in `finally` block), we cannot treat it as an exception transparency violation (hint: `finally` is a shortcut for `catch` + body that rethrows in the end), but we also cannot leave things as is, otherwise, it leads to unforeseen consequences such as successful `retry` and `catch` operators that may, or may not, then fail with an exception on an attempt to emit.

Upstream exception supersedes the downstream exception only if it is not an instance of `CancellationException`, semantically emulating cancellation-friendly 'use' block.

Fixes Kotlin#2860
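The failure-resolution rule in this commit message can be sketched as a small stdlib-only model (not the library's code; `resolveFailure` and `simulate` are hypothetical names). It follows the wording of this later message: the upstream exception supersedes the downstream one unless it is a `CancellationException`, and the downstream exception is attached to it as suppressed. The earlier commit message words the precedence differently. `simulate` plays an upstream whose `emit()` rethrows the downstream failure and whose `finally` cleanup (think `file.close()`) may throw.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Stdlib-only model of the rule described in the commit message (not the library's code):
// once the downstream has failed, a different exception escaping the upstream supersedes
// it unless it is a CancellationException; the downstream failure is kept as suppressed.
fun resolveFailure(downstream: Throwable, upstream: Throwable): Throwable {
    if (upstream === downstream) return downstream // emit() failure rethrown unchanged
    if (upstream is CancellationException) return downstream // cancellation does not supersede
    upstream.addSuppressed(downstream)
    return upstream
}

// Simulates an upstream whose emit() rethrows the downstream failure and whose
// cleanup in `finally` may throw `cleanupFailure`.
fun simulate(downstream: Throwable, cleanupFailure: Throwable?): Throwable {
    val escaped = try {
        try {
            throw downstream // what emit() does when the collector has thrown
        } finally {
            if (cleanupFailure != null) throw cleanupFailure // replaces the in-flight exception
        }
    } catch (e: Throwable) {
        e
    }
    return resolveFailure(downstream, escaped)
}
```

As we read it, the other half of the change is that `retry` and `catch` still treat such a flow as failed downstream and do not handle the new exception; this model does not cover that part.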
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022