Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polish Mono cache operators documentation #2790

Merged
merged 2 commits into from
Sep 24, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
163 changes: 105 additions & 58 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,9 @@ public final <E> Mono<E> cast(Class<E> clazz) {
* <p>
* Once the first subscription is made to this {@link Mono}, the source is subscribed to and
* the signal will be cached, indefinitely. This process cannot be cancelled.
* <p>
* In the face of multiple concurrent subscriptions, this operator ensures that only one
* subscription is made to the source.
*
* @return a replaying {@link Mono}
*/
Expand All @@ -1816,8 +1819,12 @@ public final Mono<T> cache() {
* <p>
* <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) can happen either at the very
* first subscription or after the cache has been cleared due to expiry.
* Once that upstream subscription is started, it cannot be cancelled.
* The operator will however prevent multiple concurrent subscriptions from triggering
* duplicated loading (only one load-triggering subscription can win at a time,
* and the others will get notified of the newly cached value when it arrives).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the following:

Suggested change
* Cache loading (ie. subscription to the source) can happen either at the very
* first subscription or after the cache has been cleared due to expiry.
* Once that upstream subscription is started, it cannot be cancelled.
* The operator will however prevent multiple concurrent subscriptions from triggering
* duplicated loading (only one load-triggering subscription can win at a time,
* and the others will get notified of the newly cached value when it arrives).
* Cache loading (ie. subscription to the source) is triggered by the very
* first subscription to empty or expired cache.
* Once the cache loading is started, it cannot be cancelled.
* The operator guarantees that only a single cash loading may happen at a time
* (only one load-triggering subscription can win at a time,
* and the others will get notified of the newly cached value when it arrives).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same should be applied to the following similar changes if makes sence

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion. I actually took inspiration and reworded and simplified these sections even further in the last commit, let me know what you think

*
* @return a replaying {@link Mono}
*/
Expand All @@ -1834,8 +1841,12 @@ public final Mono<T> cache(Duration ttl) {
* <p>
* <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) can happen either at the very
* first subscription or after the cache has been cleared due to expiry.
* Once that upstream subscription is started, it cannot be cancelled.
* The operator will however prevent multiple concurrent subscriptions from triggering
* duplicated loading (only one load-triggering subscription can win at a time,
* and the others will get notified of the newly cached value when it arrives).
*
* @param ttl Time-to-live for each cached item and post termination.
* @param timer the {@link Scheduler} on which to measure the duration.
Expand Down Expand Up @@ -1866,8 +1877,12 @@ public final Mono<T> cache(Duration ttl, Scheduler timer) {
* Note that subscribers that come in perfectly simultaneously could receive the same
* cached signal even if the TTL is set to zero.
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) can happen either at the very
* first subscription or after the cache has been cleared due to expiry.
* Once that upstream subscription is started, it cannot be cancelled.
* The operator will however prevent multiple concurrent subscriptions from triggering
* duplicated loading (only one load-triggering subscription can win at a time,
* and the others will get notified of the newly cached value when it arrives).
*
* @param ttlForValue the TTL-generating {@link Function} invoked when source is valued
* @param ttlForError the TTL-generating {@link Function} invoked when source is erroring
Expand Down Expand Up @@ -1900,8 +1915,12 @@ public final Mono<T> cache(Function<? super T, Duration> ttlForValue,
* Note that subscribers that come in perfectly simultaneously could receive the same
* cached signal even if the TTL is set to zero.
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) can happen either at the very
* first subscription or after the cache has been cleared due to expiry.
* Once that upstream subscription is started, it cannot be cancelled.
* The operator will however prevent multiple concurrent subscriptions from triggering
* duplicated loading (only one load-triggering subscription can win at a time,
* and the others will get notified of the newly cached value when it arrives).
*
* @param ttlForValue the TTL-generating {@link Function} invoked when source is valued
* @param ttlForError the TTL-generating {@link Function} invoked when source is erroring
Expand Down Expand Up @@ -1940,39 +1959,59 @@ public final Mono<T> cache(Function<? super T, Duration> ttlForValue,
* As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT
* cached. It is always possible to use {@link #materialize()} to cache these (further using {@link #filter(Predicate)}
* if one wants to only consider empty sources or error sources).
*
* <p>
* Predicate is applied differently depending on whether the cache is populated or not:
* IF EMPTY
* - first incoming subscriber creates a new COORDINATOR and adds itself
*
* IF COORDINATOR
* - each incoming subscriber is added to the current "batch" (COORDINATOR)
* - once the value is received, the predicate is applied ONCE
* - mismatch: all the batch is terminated with an error
* - we're back to init state, next subscriber will trigger a new coordinator and a new subscription
* - ok: all the batch is completed with the value
* - cache is now POPULATED
*
* IF POPULATED
* - each incoming subscriber causes the predicate to apply
* - if ok: complete that subscriber with the value
* - if mismatch, swap the current POPULATED with a new COORDINATOR and add the subscriber to that coordinator
* - imagining a race between sub1 and sub2:
* - OK NOK will naturally lead to sub1 completing and sub2 being put on wait inside a new COORDINATOR
* - NOK NOK will race swap of POPULATED with COORDINATOR1 and COORDINATOR2 respectively
* - if sub1 swaps, sub2 will dismiss the COORDINATOR2 it failed to swap and loop back, see COORDINATOR1 and add itself
* - if sub2 swaps, the reverse happens
* - if value is populated in the time it takes for sub2 to loop back, sub2 sees a value and triggers the predicate again (hopefully passing)
*
* <ul>
* <li>IF EMPTY
* <ul><li>first incoming subscriber creates a new COORDINATOR and adds itself</li></ul>
* </li>
* <li>IF COORDINATOR
* <ol>
* <li>each incoming subscriber is added to the current "batch" (COORDINATOR)</li>
* <li>once the value is received, the predicate is applied ONCE
* <ol>
* <li>mismatch: all the batch is terminated with an error
* -> we're back to init state, next subscriber will trigger a new coordinator and a new subscription</li>
* <li>ok: all the batch is completed with the value -> cache is now POPULATED</li>
* </ol>
* </li>
* </ol>
* </li>
* <li>IF POPULATED
* <ol>
* <li>each incoming subscriber causes the predicate to apply</li>
* <li>if ok: complete that subscriber with the value</li>
* <li>if mismatch, swap the current POPULATED with a new COORDINATOR and add the subscriber to that coordinator</li>
* <li>imagining a race between sub1 and sub2:
* <ol>
* <li>OK NOK will naturally lead to sub1 completing and sub2 being put on wait inside a new COORDINATOR</li>
* <li>NOK NOK will race swap of POPULATED with COORDINATOR1 and COORDINATOR2 respectively
* <ol>
* <li>if sub1 swaps, sub2 will dismiss the COORDINATOR2 it failed to swap and loop back, see COORDINATOR1 and add itself</li>
* <li>if sub2 swaps, the reverse happens</li>
* <li>if value is populated in the time it takes for sub2 to loop back, sub2 sees a value and triggers the predicate again (hopefully passing)</li>
* </ol>
* </li>
* </ol>
* </li>
* </ol>
* </li>
* </ul>
* <p>
* Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR.
* Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
*
* when cancelling a COORDINATOR-issued subscription:
* - removes itself from batch
* - if 0 subscribers remaining
* - swap COORDINATOR with EMPTY
* - COORDINATOR cancels its source
*
* <p>
* When cancelling a COORDINATOR-issued subscription:
* <ol>
* <li>removes itself from batch</li>
* <li>if 0 subscribers remaining
* <ol>
* <li>swap COORDINATOR with EMPTY</li>
* <li>COORDINATOR cancels its source</li>
* </ol>
* </li>
* </ol>
* <p>
* The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source
* or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
*
Expand Down Expand Up @@ -2017,20 +2056,24 @@ public final Mono<T> cacheInvalidateIf(Predicate<? super T> invalidationPredicat
* If the cached value needs to be discarded in case of invalidation, use the {@link #cacheInvalidateWhen(Function, Consumer)} version.
* Note that some downstream subscribers might still be using or storing the value, for example if they
* haven't requested anything yet.
*
*
* <p>
* Trigger is generated only after a subscribers in the COORDINATOR have received the value, and only once.
* The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
*
* <p>
* Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR.
* Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
*
* when cancelling a COORDINATOR-issued subscription:
* - removes itself from batch
* - if 0 subscribers remaining
* - swap COORDINATOR with EMPTY
* - COORDINATOR cancels its source
*
* <p>
* When cancelling a COORDINATOR-issued subscription:
* <ol>
* <li>removes itself from batch</li>
* <li>if 0 subscribers remaining
* <ol>
* <li>swap COORDINATOR with EMPTY</li>
* <li>COORDINATOR cancels its source</li>
* </ol>
* </li>
* </ol>
* <p>
* The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source
* or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
*
Expand Down Expand Up @@ -2075,20 +2118,24 @@ public final Mono<T> cacheInvalidateWhen(Function<? super T, Mono<Void>> invalid
* Once a cached value is invalidated, it is passed to the provided {@link Consumer} (which MUST complete normally).
* Note that some downstream subscribers might still be using or storing the value, for example if they
* haven't requested anything yet.
*
*
* <p>
* Trigger is generated only after a subscribers in the COORDINATOR have received the value, and only once.
* The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
*
* <p>
* Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR.
* Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
*
* when cancelling a COORDINATOR-issued subscription:
* - removes itself from batch
* - if 0 subscribers remaining
* - swap COORDINATOR with EMPTY
* - COORDINATOR cancels its source
*
* <p>
* When cancelling a COORDINATOR-issued subscription:
* <ol>
* <li>removes itself from batch</li>
* <li>if 0 subscribers remaining
* <ol>
* <li>swap COORDINATOR with EMPTY</li>
* <li>COORDINATOR cancels its source</li>
* </ol>
* </li>
* </ol>
* <p>
* The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source
* or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
*
Expand Down