Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent unnecessary duplicated lines in backtraces #2780

Merged
merged 10 commits into from
Oct 1, 2021
205 changes: 135 additions & 70 deletions docs/asciidoc/debugging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,28 @@ Consider the following stack trace:

.A typical Reactor stack trace
====
[source,java]
[source]
----
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)
----
====

Expand Down Expand Up @@ -76,7 +76,9 @@ pre-existing `Flux` is subscribed to, as follows:
====
[source,java]
----
toDebug.subscribe(System.out::println, Throwable::printStackTrace);
toDebug
.subscribeOn(Schedulers.immediate())
.subscribe(System.out::println, Throwable::printStackTrace);
----
====

Expand Down Expand Up @@ -114,7 +116,7 @@ bit of experience, we can see that it is not ideal by itself in more advanced ca

Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.

This is done by customizing the `Hooks.onOperator` hook at application start (or at
This is done by activating a global debug mode via the `Hooks.onOperatorDebug()` method at application start (or at
least before the incriminated `Flux` or `Mono` can be instantiated), as follows:

====
Expand All @@ -124,64 +126,76 @@ Hooks.onOperatorDebug();
----
====

This starts instrumenting the calls to the `Flux` (and `Mono`) operator methods (where
This starts instrumenting the calls to Reactor operator methods (where
they are assembled into the chain) by wrapping the construction of the operator and
capturing a stack trace there. Since this is done when the operator chain is declared, the
hook should be activated before that, so the safest way is to activate it right at the
start of your application.

Later on, if an exception occurs, the failing operator is able to refer to that capture
and append it to the stack trace. We call this captured assembly information a *traceback*.
and to rework the stack trace, appending additional information.

TIP: We call this captured assembly information a *traceback*.

In the next section, we see how the stack trace differs and how to interpret
that new information.

== Reading a Stack Trace in Debug Mode

When we reuse our initial example but activate the `operatorStacktrace` debug feature,
the stack trace is as follows:
several things happen:

1. The stack trace, which points to subscription site and is thus less interesting, is cut after the first frame and set aside.
2. A special suppressed exception is added to the original exception (or amended if already there).
3. A message is constructed for that special exception with several sections.
4. First section will trace back to the assembly site of the operator that fails.
5. Second section will attempt to display the chain(s) that are built from this operator and have seen the error propagate
6. Last section is the original stack trace


The full stack trace, once printed, is as follows:

====
[source,java]
[source]
----
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:375) <1>
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127) <1>
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: <2>
Assembly trace from producer [reactor.core.publisher.MonoSingle] : <3>
reactor.core.publisher.Flux.single(Flux.java:7915)
reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s): <4>
*_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017) <5>
|_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071) <6>
Stack trace: <7>
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
<2>
<8>
...
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1000)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: <3>
Assembly trace from producer [reactor.core.publisher.MonoSingle] : <4>
reactor.core.publisher.Flux.single(Flux.java:6676)
reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949)
reactor.guide.GuideTests.populateDebug(GuideTests.java:962)
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s): <5>
|_ Flux.single ⇢ reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949) <6>
----
<1> This is new: We see the wrapper operator that captures the stack.
<2> Apart from that, the first section of the stack trace is still the same for the most
part, showing a bit of the operator's internals (so we removed a bit of the snippet here).
<3> This is where the traceback starts to appear.
<4> First, we get some details about where the operator was assembled.
<5> We also get a traceback of the error as it propagated through the operator chain,
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)
----
<1> The original stack trace is truncated to a single frame.
<2> This is new: We see the wrapper operator that captures the stack.
This is where the traceback starts to appear.
<3> First, we get some details about where the operator was assembled.
<4> Second, we get a notion of operator chain(s) through which the error propagated,
from first to last (error site to subscribe site).
<6> Each operator that saw the error is mentioned along with the user class and line where it
was used.
<5> Each operator that saw the error is mentioned along with the user class and line where it
was used. Here we have a "root".
<6> Here we have a simple part of the chain.
<7> The rest of the stack trace is moved at the end...
<8> ...showing a bit of the operator's internals (so we removed a bit of the snippet here).
====

The captured stack trace is appended to the original error as a
suppressed `OnAssemblyException`. There are two parts to it, but the first section is the
suppressed `OnAssemblyException`. There are three parts to it, but the first section is the
most interesting. It shows the path of construction for the operator that triggered the
exception. Here, it shows that the `single` that caused our issue was created in the
`scatterAndGather` method, itself called from a `populateDebug` method that got executed
through JUnit.
exception. Here, it shows that the `single` that caused our issue was actually created in the
`scatterAndGather` method.

Now that we are armed with enough information to find the culprit, we can have
a meaningful look at that `scatterAndGather` method:
Expand Down Expand Up @@ -209,7 +223,7 @@ Now consider the following line in the stack trace:
====
[source]
----
Error has been observed by the following operator(s):
Error has been observed at the following site(s):
----
====

Expand All @@ -229,33 +243,82 @@ FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
====

Now imagine that, inside `findAllUserByName`, there is a `map` that fails. Here,
we would see the following final traceback:
we would see the following in the second part of the traceback:

====
[source,java]
[source]
----
Error has been observed by the following operator(s):
|_ Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
|_ Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
|_ Flux.filter ⇢ reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
|_ Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)
|_ Flux.elapsed ⇢ reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
|_ Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:41)
Error has been observed at the following site(s):
*________Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
|_ Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
|_ Flux.filter ⇢ at reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
|_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:39)
|_ Flux.elapsed ⇢ at reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
|_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)
----
====

This corresponds to the section of the chain of operators that gets notified of the error:
This corresponds to the section of the chain(s) of operators that gets notified of the error:

. The exception originates in the first `map`.
. It is seen by a second `map` (both in fact correspond to the `findAllUserByName`
. The exception originates in the first `map`. This one is identified as a root by the `*` connector and the fact `_`
are used for indentation.
. The exception is seen by a second `map` (both in fact correspond to the `findAllUserByName`
method).
. It is then seen by a `filter` and a `transform`, which indicate that part of the chain
is constructed by a reusable transformation function (here, the `applyFilters` utility
method).
. Finally, it is seen by an `elapsed` and a `transform`. Once again, `elapsed` is applied
by the transformation function of that second transform.

TIP: As tracebacks are appended to original errors as suppressed exceptions, this can somewhat
In some cases where the same exception is propagated through multiple chains, the "root" marker `*_`
allows us to better separate such chains.
If a site is seen several time, there will be an `(observed x times)` after the call site information.

For instance, let us consider the following snippet:

====
[source,java]
----
public class MyClass {
public void myMethod() {
Flux<String> source = Flux.error(sharedError);
Flux<String> chain1 = source.map(String::toLowerCase).filter(s -> s.length() < 4);
Flux<String> chain2 = source.filter(s -> s.length() > 5).distinct();

Mono<Void> when = Mono.when(chain1, chain2);
}
}
----
====

In the code above, error propagates to the `when`, going through two separate chains `chain1` and `chain2`.
It would lead to a traceback containing the following:

====
[source]
----
Error has been observed at the following site(s):
*_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
|_ Flux.map ⇢ at myClass.myMethod(MyClass.java:4)
|_ Flux.filter ⇢ at myClass.myMethod(MyClass.java:4)
*_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
|_ Flux.filter ⇢ at myClass.myMethod(MyClass.java:5)
|_ Flux.distinct ⇢ at myClass.myMethod(MyClass.java:5)
*______Mono.when ⇢ at myClass.myMethod(MyClass.java:7)
----
====

We see that:

. there are 3 "root" elements (the `when` is the true root).
. two chains starting from `Flux.error` are visible.
. both chains seem to be based on the same `Flux.error` source (`observed 2 times`).
. first chain is `Flux.error().map().filter`
. second chain is `Flux.error().filter().distinct()


TIP: *A note on tracebacks and suppressed exceptions*:
As tracebacks are appended to original errors as suppressed exceptions, this can somewhat
interfere with another type of exception that uses this mechanism: composite exceptions.
Such exceptions can be created directly via `Exceptions.multiple(Throwable...)`, or by some
operators that might join multiple erroring sources (like `Flux#flatMapDelayError`). They
Expand Down Expand Up @@ -304,6 +367,7 @@ cost than a regular `checkpoint`.
searching), as shown in the following example:

====
[source]
----
...
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Expand All @@ -318,6 +382,7 @@ initial message for the traceback, augmented with a `description`, as shown in t
following example:

====
[source]
----
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : <1>
reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ final class ConnectableFluxOnAssembly<T> extends InternalConnectableFluxOperator

@Override
public final CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return FluxOnAssembly.wrapSubscriber(actual, source, stacktrace);
return FluxOnAssembly.wrapSubscriber(actual, source, this, stacktrace);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ final class FluxCallableOnAssembly<T> extends InternalFluxOperator<T, T>

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return FluxOnAssembly.wrapSubscriber(actual, source, stacktrace);
return FluxOnAssembly.wrapSubscriber(actual, source, this, stacktrace);
}

@SuppressWarnings("unchecked")
Expand Down