Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mono.share() is canceled on subscription disposal #2680

Closed
yamass opened this issue Apr 14, 2021 · 3 comments · Fixed by #2756
Closed

Mono.share() is canceled on subscription disposal #2680

yamass opened this issue Apr 14, 2021 · 3 comments · Fixed by #2756
Assignees
Labels
status/need-investigation This needs more in-depth investigation type/documentation A documentation update
Milestone

Comments

@yamass
Copy link

yamass commented Apr 14, 2021

Expected Behavior

When doing var sharedMono = innerMono.share(), the innerMono should not be canceled if calling sharedMono.subscribe().dispose().

At least, that's what the JavaDoc for Mono.share() suggests: It's worth noting this is an un-cancellable Subscription.

Actual Behavior

The innerMono is canceled.

Steps to Reproduce

		Mono.create(monoSink -> {
			monoSink.onCancel(() -> {
				System.out.println("CANCELED");
			});
			monoSink.success("asdf");
		})
				.subscribeOn(Schedulers.boundedElastic())
				.share()
				.subscribe()
				.dispose();

		Thread.sleep(1000);

Prints CANCELED.

By the way, innerMono.cache().subscribe().dispose does NOT cancel innerMono, although the JavaDoc makes no statement about cancellation behavior... (just replace share() with cache() in the example.

Possible Solution

This might be a bug in the implementation or documentation, or simply me not understanding the documentation right.

Please also note that the marble diagram for share() needs a fix, too. (see its cancel() arrow)

Also, please consider documenting the cancellation behavior of cache().

Meanwhile, I will use cache() as a workaround. It might actually also make sense to document the difference between these two methods.

Thanks a lot!

Your Environment

  • Reactor version(s) used: 3.4.5
  • JVM version (java -version): 14.0.1
  • OS and version (eg uname -a): MacOS 11.2.3
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Apr 14, 2021
@simonbasle
Copy link
Member

we need to backtrack a bit, ensure which behavior is the "correct" one and better document the two operators behavior in face of cancellation, indeed.

@simonbasle simonbasle added status/need-investigation This needs more in-depth investigation type/documentation A documentation update and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Apr 19, 2021
@simonbasle simonbasle added this to the 3.4.x Backlog milestone Apr 19, 2021
@simonbasle simonbasle self-assigned this Aug 9, 2021
@simonbasle
Copy link
Member

if we look at Flux#share() (which is actually flux.publish().refCount()), it does cancel when going from 1 subscriber down to 0 subscriber. Flux#cache() is flux.replay().autoConnect(), which doesn't cancel on subscriber removal.

but Mono is in another place entirely. Underneath the cover, share() is implemented via NextProcessor, which usually doesn't react to cancellations of subscribers EXCEPT the subscribe() one (the dumbed down version without any handlers).

actually, there is a special case in place for mono.share().subscribe() that returns the NextProcessor as the Disposable. So calling dispose() here actually terminates the underlying NextProcessor and cancels its source subscription if it has one.

That means that calling .subscribe().dispose() on the same instance of mono.share() twice will actually ignore that second subscription. it can be better illustrated by doing:

Mono<Integer> mono = mono.just(1).share();
mono.subscribe().dispose();
mono.block();

The block() throws a java.util.concurrent.CancellationException: Disposed.

We have to re-evaluate all of these internals :(

@simonbasle
Copy link
Member

I'm not sure what to do here in terms of timeline / milestone for changing this.
Can we consider that Flux.share().subscribe() returning a Disposable that is the same for all subsequent subscriptions() and effectively terminates the underlying processor a bug?

Opened #2756 to explore a potential fix

simonbasle added a commit that referenced this issue Sep 9, 2021
This commit makes changes to `NextProcessor`, splitting it into two
classes: one deals with the (deprecated) pure processor aspect while the
other implements the sink aspect.

In order to align `Mono.share()` behavior with that of `Flux.share()`,
several changes are introduced:

 - Each call to `mono.share().subscribe()` returns a dedicated
 `Disposable`
 - `mono.share()` behaves like a reference counted multicast, ie the
 same as `Flux#share`: individual subscribers can be cancelled and once
 all are gone, upstream is cancelled and next incoming subscriber will
 trigger an upstream re-subscription

We avoid impacting `Sinks.one()` usages by introducing a dedicated
`SinkOneMulticast` implementation.

Accordingly, relevant tests in `NextProcessorTest` are copied and
adapted in `SinkOneMulticastTest`.

Fixes #2680.
chemicL added a commit that referenced this issue Mar 6, 2024
Similarly to Flux#share, Mono#share also cancels the source when all
Subscribers have cancelled. This change improves the documentation.

Following #2680 and #2756 there exists a misalignment in the javadoc for
Mono#share method. Since cancelling the source is a fact, the
javadoc is now improved instead of changing the behaviour.

Resolves #3740
chemicL added a commit that referenced this issue Mar 6, 2024
Similarly to Flux#share, Mono#share also cancels the source when all
Subscribers have cancelled. This change improves the documentation.

Following #2680 and #2756 there exists a misalignment in the javadoc for
Mono#share method. Since cancelling the source is a fact, the
javadoc is now improved instead of changing the behaviour.

Resolves #3740
chemicL added a commit that referenced this issue Mar 7, 2024
Similarly to `Flux#share`, `Mono#share` also cancels the source when all
`Subscriber`s have cancelled. This change improves the documentation.

Following #2680 and #2756 there exists a misalignment in the javadoc for
`Mono#share` method. Since cancelling the source is a fact, the javadoc
is now improved instead of changing the behaviour.

Resolves #3740
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status/need-investigation This needs more in-depth investigation type/documentation A documentation update
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants