Skip to content

Commit

Permalink
Polish Mono cache operators documentation (#2790)
Browse files Browse the repository at this point in the history
This commit polishes the javadoc of most cache variants in Mono:
 - explicit the behavior in case of concurrent subscriptions for
 `cache()`, `cache(Duration)` and `cache(Function)`
 - polish (un)ordered lists and paragraph breaks in `cacheInvalidateIf`
 and `cacheInvalidateWhen` javadoc.

Fixes #2782.
  • Loading branch information
simonbasle committed Sep 24, 2021
1 parent bf3ea41 commit a050dcf
Showing 1 changed file with 97 additions and 58 deletions.
155 changes: 97 additions & 58 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,9 @@ public final <E> Mono<E> cast(Class<E> clazz) {
* <p>
* Once the first subscription is made to this {@link Mono}, the source is subscribed to and
* the signal will be cached, indefinitely. This process cannot be cancelled.
* <p>
* In the face of multiple concurrent subscriptions, this operator ensures that only one
* subscription is made to the source.
*
* @return a replaying {@link Mono}
*/
Expand All @@ -1816,8 +1819,10 @@ public final Mono<T> cache() {
* <p>
* <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) is triggered atomically by the first
* subscription to an uninitialized or expired cache, which guarantees that a single
* cache load happens at a time (and other subscriptions will get notified of the newly
* cached value when it arrives).
*
* @return a replaying {@link Mono}
*/
Expand All @@ -1834,8 +1839,10 @@ public final Mono<T> cache(Duration ttl) {
* <p>
* <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) is triggered atomically by the first
* subscription to an uninitialized or expired cache, which guarantees that a single
* cache load happens at a time (and other subscriptions will get notified of the newly
* cached value when it arrives).
*
* @param ttl Time-to-live for each cached item and post termination.
* @param timer the {@link Scheduler} on which to measure the duration.
Expand Down Expand Up @@ -1866,8 +1873,10 @@ public final Mono<T> cache(Duration ttl, Scheduler timer) {
* Note that subscribers that come in perfectly simultaneously could receive the same
* cached signal even if the TTL is set to zero.
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) is triggered atomically by the first
* subscription to an uninitialized or expired cache, which guarantees that a single
* cache load happens at a time (and other subscriptions will get notified of the newly
* cached value when it arrives).
*
* @param ttlForValue the TTL-generating {@link Function} invoked when source is valued
* @param ttlForError the TTL-generating {@link Function} invoked when source is erroring
Expand Down Expand Up @@ -1900,8 +1909,10 @@ public final Mono<T> cache(Function<? super T, Duration> ttlForValue,
* Note that subscribers that come in perfectly simultaneously could receive the same
* cached signal even if the TTL is set to zero.
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
* Cache loading (ie. subscription to the source) is triggered atomically by the first
* subscription to an uninitialized or expired cache, which guarantees that a single
* cache load happens at a time (and other subscriptions will get notified of the newly
* cached value when it arrives).
*
* @param ttlForValue the TTL-generating {@link Function} invoked when source is valued
* @param ttlForError the TTL-generating {@link Function} invoked when source is erroring
Expand Down Expand Up @@ -1940,39 +1951,59 @@ public final Mono<T> cache(Function<? super T, Duration> ttlForValue,
* As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT
* cached. It is always possible to use {@link #materialize()} to cache these (further using {@link #filter(Predicate)}
* if one wants to only consider empty sources or error sources).
*
* <p>
* Predicate is applied differently depending on whether the cache is populated or not:
* IF EMPTY
* - first incoming subscriber creates a new COORDINATOR and adds itself
*
* IF COORDINATOR
* - each incoming subscriber is added to the current "batch" (COORDINATOR)
* - once the value is received, the predicate is applied ONCE
* - mismatch: all the batch is terminated with an error
* - we're back to init state, next subscriber will trigger a new coordinator and a new subscription
* - ok: all the batch is completed with the value
* - cache is now POPULATED
*
* IF POPULATED
* - each incoming subscriber causes the predicate to apply
* - if ok: complete that subscriber with the value
* - if mismatch, swap the current POPULATED with a new COORDINATOR and add the subscriber to that coordinator
* - imagining a race between sub1 and sub2:
* - OK NOK will naturally lead to sub1 completing and sub2 being put on wait inside a new COORDINATOR
* - NOK NOK will race swap of POPULATED with COORDINATOR1 and COORDINATOR2 respectively
* - if sub1 swaps, sub2 will dismiss the COORDINATOR2 it failed to swap and loop back, see COORDINATOR1 and add itself
* - if sub2 swaps, the reverse happens
* - if value is populated in the time it takes for sub2 to loop back, sub2 sees a value and triggers the predicate again (hopefully passing)
*
* <ul>
* <li>IF EMPTY
* <ul><li>first incoming subscriber creates a new COORDINATOR and adds itself</li></ul>
* </li>
* <li>IF COORDINATOR
* <ol>
* <li>each incoming subscriber is added to the current "batch" (COORDINATOR)</li>
* <li>once the value is received, the predicate is applied ONCE
* <ol>
* <li>mismatch: all the batch is terminated with an error
* -> we're back to init state, next subscriber will trigger a new coordinator and a new subscription</li>
* <li>ok: all the batch is completed with the value -> cache is now POPULATED</li>
* </ol>
* </li>
* </ol>
* </li>
* <li>IF POPULATED
* <ol>
* <li>each incoming subscriber causes the predicate to apply</li>
* <li>if ok: complete that subscriber with the value</li>
* <li>if mismatch, swap the current POPULATED with a new COORDINATOR and add the subscriber to that coordinator</li>
* <li>imagining a race between sub1 and sub2:
* <ol>
* <li>OK NOK will naturally lead to sub1 completing and sub2 being put on wait inside a new COORDINATOR</li>
* <li>NOK NOK will race swap of POPULATED with COORDINATOR1 and COORDINATOR2 respectively
* <ol>
* <li>if sub1 swaps, sub2 will dismiss the COORDINATOR2 it failed to swap and loop back, see COORDINATOR1 and add itself</li>
* <li>if sub2 swaps, the reverse happens</li>
* <li>if value is populated in the time it takes for sub2 to loop back, sub2 sees a value and triggers the predicate again (hopefully passing)</li>
* </ol>
* </li>
* </ol>
* </li>
* </ol>
* </li>
* </ul>
* <p>
* Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR.
* Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
*
* when cancelling a COORDINATOR-issued subscription:
* - removes itself from batch
* - if 0 subscribers remaining
* - swap COORDINATOR with EMPTY
* - COORDINATOR cancels its source
*
* <p>
* When cancelling a COORDINATOR-issued subscription:
* <ol>
* <li>removes itself from batch</li>
* <li>if 0 subscribers remaining
* <ol>
* <li>swap COORDINATOR with EMPTY</li>
* <li>COORDINATOR cancels its source</li>
* </ol>
* </li>
* </ol>
* <p>
* The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source
* or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
*
Expand Down Expand Up @@ -2017,20 +2048,24 @@ public final Mono<T> cacheInvalidateIf(Predicate<? super T> invalidationPredicat
* If the cached value needs to be discarded in case of invalidation, use the {@link #cacheInvalidateWhen(Function, Consumer)} version.
* Note that some downstream subscribers might still be using or storing the value, for example if they
* haven't requested anything yet.
*
*
* <p>
* Trigger is generated only after a subscribers in the COORDINATOR have received the value, and only once.
* The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
*
* <p>
* Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR.
* Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
*
* when cancelling a COORDINATOR-issued subscription:
* - removes itself from batch
* - if 0 subscribers remaining
* - swap COORDINATOR with EMPTY
* - COORDINATOR cancels its source
*
* <p>
* When cancelling a COORDINATOR-issued subscription:
* <ol>
* <li>removes itself from batch</li>
* <li>if 0 subscribers remaining
* <ol>
* <li>swap COORDINATOR with EMPTY</li>
* <li>COORDINATOR cancels its source</li>
* </ol>
* </li>
* </ol>
* <p>
* The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source
* or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
*
Expand Down Expand Up @@ -2075,20 +2110,24 @@ public final Mono<T> cacheInvalidateWhen(Function<? super T, Mono<Void>> invalid
* Once a cached value is invalidated, it is passed to the provided {@link Consumer} (which MUST complete normally).
* Note that some downstream subscribers might still be using or storing the value, for example if they
* haven't requested anything yet.
*
*
* <p>
* Trigger is generated only after a subscribers in the COORDINATOR have received the value, and only once.
* The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
*
* <p>
* Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR.
* Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
*
* when cancelling a COORDINATOR-issued subscription:
* - removes itself from batch
* - if 0 subscribers remaining
* - swap COORDINATOR with EMPTY
* - COORDINATOR cancels its source
*
* <p>
* When cancelling a COORDINATOR-issued subscription:
* <ol>
* <li>removes itself from batch</li>
* <li>if 0 subscribers remaining
* <ol>
* <li>swap COORDINATOR with EMPTY</li>
* <li>COORDINATOR cancels its source</li>
* </ol>
* </li>
* </ol>
* <p>
* The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source
* or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
*
Expand Down

0 comments on commit a050dcf

Please sign in to comment.