Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disposeGracefully method to Scheduler #3089

Merged
merged 35 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
44f5f2e
Add Disposable.Graceful interface
chemicL Jun 23, 2022
6fbd57c
Improvements WIP
chemicL Jun 27, 2022
a4a5b0e
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Jul 5, 2022
13c020f
WIP
chemicL Jul 7, 2022
92f18b8
WIP
chemicL Jul 11, 2022
9026186
WIP
chemicL Jul 11, 2022
0084749
Removed tests that race with task execution
chemicL Jul 13, 2022
0f2684c
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Jul 13, 2022
3ed8ad3
BoundedElasticScheduler disposeGracefully rework
chemicL Jul 13, 2022
d1af720
Optimizing BoundedElasticScheduler disposal logic
chemicL Jul 14, 2022
0e801f0
WIP more concurrency tests, fixes
chemicL Jul 18, 2022
d10d98a
Migrated BoundedElasticScheduler JCStress tests to RaceTestUtils
chemicL Jul 22, 2022
52f1cf0
plain volatile access
chemicL Jul 22, 2022
b0fc331
Using for loops instead of getAndUpdate
chemicL Jul 22, 2022
5703eb8
lazy set
chemicL Jul 22, 2022
8b4bc63
Tests improvements
chemicL Jul 26, 2022
7a0452f
Generic SchedulerState
chemicL Jul 28, 2022
3dc8dca
Schedulers imports - avoid wildcards
chemicL Jul 28, 2022
8399b5b
Simplify start flow
chemicL Jul 28, 2022
9f5a7eb
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Jul 28, 2022
8dd4afb
Javadoc
chemicL Jul 28, 2022
2d2aa4e
Avoid looping in BoundedElasticScheduler start and disposal
chemicL Aug 2, 2022
bd03603
Simplify state encapsulation in BoundedElasticScheduler
chemicL Aug 2, 2022
298c051
Simplified, preventing leaks
chemicL Aug 2, 2022
5e72d2e
Adjusting remaining schedulers
chemicL Aug 2, 2022
255878f
Removed dependency on BoundedState counter for state management
chemicL Aug 2, 2022
cbe6663
Ensured atomicity in BoundedServices.dispose() and properly draining …
chemicL Aug 3, 2022
6690a91
Moved Disposable.Graceful to Scheduler and removed gracePeriod param
chemicL Aug 10, 2022
fcc5079
Removed exceptions from start()
chemicL Aug 10, 2022
162cf2a
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Aug 10, 2022
709f385
JCStress tests rework to avoid time validation, just state consistenc…
chemicL Aug 10, 2022
cb8373c
Add exclusion for japicmp
chemicL Aug 10, 2022
055f72c
Moved more thorough dispose validation to unit tests
chemicL Aug 11, 2022
c0c1eab
unused imports and copyright
chemicL Aug 11, 2022
c4a5a99
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.scheduler;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.ZZZ_Result;
import org.openjdk.jcstress.infra.results.Z_Result;

public abstract class SingleSchedulerStressTest {

@JCStressTest
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
@State
public static class StartDisposeStressTest {

final SingleScheduler scheduler = new SingleScheduler(Thread::new);
{
scheduler.start();
}

@Actor
public void restart1() {
scheduler.disposeGracefully(Duration.ofMillis(100)).block(Duration.ofMillis(100));
scheduler.start();
}

@Actor
public void restart2() {
scheduler.disposeGracefully(Duration.ofMillis(100)).block(Duration.ofMillis(100));
scheduler.start();
}

@Arbiter
public void arbiter(Z_Result r) {
// At this stage, at least one actor called scheduler.start(),
// so we should be able to execute a task.
final CountDownLatch latch = new CountDownLatch(1);
if (scheduler.isDisposed()) {
return;
}
scheduler.schedule(() -> {
r.r1 = true;
latch.countDown();
});
try {
r.r1 = latch.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
scheduler.dispose();
}
}

@JCStressTest
@Outcome(id = {"true, true, true"}, expect = Expect.ACCEPTABLE,
desc = "Both time out, task gets rejected, scheduler disposed eventually")
@State
public static class DisposeGracefullyStressTest {
final SingleScheduler scheduler = new SingleScheduler(Thread::new);
final CountDownLatch arbiterLatch = new CountDownLatch(1);
{
scheduler.start();
// Schedule a task that disallows graceful closure until the arbiter kicks in
// to make sure that actors fail while waiting.
scheduler.schedule(() -> {
while (true) {
try {
if (arbiterLatch.await(20, TimeUnit.MILLISECONDS)) {
return;
}
}
catch (InterruptedException ignored) {
}
}
});
}

@Actor
public void disposeGracefully1(ZZZ_Result r) {
long start = System.nanoTime();
try {
scheduler.disposeGracefully(Duration.ofMillis(20)).block();
} catch (Exception e) {
long duration = System.nanoTime() - start;
// Validate that the wait took non-zero time.
r.r1 = (e.getCause() instanceof TimeoutException) && Duration.ofNanos(duration).toMillis() > 15;
}
}

@Actor
public void disposeGracefully2(ZZZ_Result r) {
long start = System.nanoTime();
try {
scheduler.disposeGracefully(Duration.ofMillis(20)).block();
} catch (Exception e) {
long duration = System.nanoTime() - start;
// Validate that the wait took non-zero time.
r.r2 = (e.getCause() instanceof TimeoutException) && Duration.ofNanos(duration).toMillis() > 15;
}
}

@Arbiter
public void arbiter(ZZZ_Result r) {
// Release the task blocking graceful closure.
arbiterLatch.countDown();
try {
scheduler.schedule(() -> {});
} catch (RejectedExecutionException e) {
scheduler.disposeGracefully(Duration.ofMillis(20)).block();
r.r3 = scheduler.isDisposed() && scheduler.state.executor.isTerminated();
}
}
}
}
11 changes: 10 additions & 1 deletion reactor-core/src/main/java/reactor/core/Disposable.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
chemicL marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,9 +16,11 @@

package reactor.core;

import java.time.Duration;
chemicL marked this conversation as resolved.
Show resolved Hide resolved
import java.util.Collection;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;
chemicL marked this conversation as resolved.
Show resolved Hide resolved
import reactor.util.annotation.Nullable;

/**
Expand Down Expand Up @@ -165,4 +167,11 @@ default boolean addAll(Collection<? extends Disposable> ds) {
*/
int size();
}

// TODO(dj): add javadoc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the semantics of disposeGracefully may vary widely, so I would make the documented contract say that explicitly (eg. "each class implementing this trait should define how subsequent calls behave during the grace period and after it")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one limitation I'm thinking of with this API is that most of the time the underlying resource(s) being disposed gracefully will be atomically swapped out. Which means that even if one re-subscribes to the Mono, including with onErrorResume() or retry(), the underlying resources won't be reachable anymore.

thus, there will be no way of composing operators to fall back to a "hard" shutdown once the graceful shutdown is initiated.

I'm thinking this is fine if documented. The recommendation for implementors should probably be to trigger a hard dispose at the end of the gracePeriod THEN propagate a TimeoutException, noting that it only serves as a warning / logging but cannot be recovered.

Copy link
Member Author

@chemicL chemicL Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed the possible contracts here with @OlegDokuka. The approach with forceful shutdown before propagating the TimeoutException and non-recoverable errors might be confusing to users if they don't read the specific Scheduler documentation, but it has some advantages implementation wise.
Another approach could be to propagate a retry-able error and allow re-initiating the shutdown() + awaitTermination(...) procedure, while also allowing for an explicit final call to explicit shutdownNow() when desired. I'll go back to the original issue and ask for opinion from the user's perspective to guide the design.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal opinion is that once we fix specific behavior (e.g. call shutdownNow() if a timeout or InteruptedException) then we probably will end up with everyone doing scheduler.disposeGracefully(Duration.ofHours(9999999)).subscribe() or then complaining that they did not wont to have shutdownNow called but rather retry later

Another thought on TimoutException - any exception is useless if we can not do anything useful after that. I'm not sure that logging such an event makes any sense. This exception is just a fact that we forced shutdown process so a user just has to take it. Also, taking into account the impl details - all other active subscribers are going to get the same notification but the other late subscriber will not get it, then it is going to be too confusing so even having it documented will not resolve this confusion.

My personal recommendation is to prefer flexibility over fixed behavior. One can always write something like the following to mimic what we can hardcode

scheduler.disposeGracefully(Duration.ofMillis(100))
    .retryWhen(Retry.backoff(5, Duration.ofMillis(50)))
    .onErrorResume(e -> Mono.fromRunnable(scheduler::dispose));

interface Graceful extends Disposable {
default Mono<Void> disposeGracefully(Duration gracePeriod) {
return Mono.fromRunnable(this::dispose);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.core.scheduler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
Expand Down Expand Up @@ -49,6 +50,9 @@
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -102,6 +106,7 @@ public String toString() {
final Clock clock;
final ThreadFactory factory;
final long ttlMillis;
final Sinks.Empty<Void> disposedNotifier;

volatile BoundedServices boundedServices;
static final AtomicReferenceFieldUpdater<BoundedElasticScheduler, BoundedServices> BOUNDED_SERVICES =
Expand Down Expand Up @@ -131,6 +136,7 @@ public String toString() {
this.factory = threadFactory;
this.clock = Objects.requireNonNull(clock, "A Clock must be provided");
this.ttlMillis = ttlMillis;
this.disposedNotifier = Sinks.empty();

this.boundedServices = SHUTDOWN; //initially disposed, EVICTOR is also null
}
Expand Down Expand Up @@ -204,6 +210,25 @@ public void dispose() {
}
}

@Override
public Mono<Void> disposeGracefully(Duration gracePeriod) {
BoundedServices services = BOUNDED_SERVICES.get(this);
if (services != SHUTDOWN && BOUNDED_SERVICES.compareAndSet(this, services, SHUTDOWN)) {
ScheduledExecutorService e = EVICTOR.getAndSet(this, null);
Mono<Void> evictorDone;
if (e != null) {
Sinks.Empty<Void> done = Sinks.empty();
Schedulers.shutdownAndAwait(e, gracePeriod, done);
simonbasle marked this conversation as resolved.
Show resolved Hide resolved
evictorDone = done.asMono();
} else {
evictorDone = Mono.empty();
}
Mono.whenDelayError(services.disposeGracefully(gracePeriod), evictorDone)
.subscribe(v -> {}, disposedNotifier::tryEmitError, disposedNotifier::tryEmitEmpty);
}
return disposedNotifier.asMono();
}

@Override
public Disposable schedule(Runnable task) {
//tasks running once will call dispose on the BoundedState, decreasing its usage by one
Expand Down Expand Up @@ -321,7 +346,7 @@ public Worker createWorker() {
}


static final class BoundedServices extends AtomicInteger implements Disposable {
static final class BoundedServices extends AtomicInteger implements Disposable.Graceful {

/**
* Constant for this counter of live executors to reflect the whole pool has been
Expand All @@ -342,6 +367,7 @@ static final class BoundedServices extends AtomicInteger implements Disposable {
//duplicated Clock field from parent so that SHUTDOWN can be instantiated and partially used
final Clock clock;
final Deque<BoundedState> idleQueue;
final Sinks.Empty<Void> disposedNotifier = Sinks.empty();

volatile BoundedState[] busyArray;
static final AtomicReferenceFieldUpdater<BoundedServices, BoundedState[]> BUSY_ARRAY =
Expand Down Expand Up @@ -526,6 +552,20 @@ public void dispose() {
for (int i = 0; i < arr.length; i++) {
arr[i].shutdown();
}
disposedNotifier.tryEmitEmpty();
}

@Override
public Mono<Void> disposeGracefully(Duration gracePeriod) {
int disposed = getAndSet(DISPOSED);
if (disposed >= 0) {
BoundedState[] arr = BUSY_ARRAY.getAndSet(this, ALL_SHUTDOWN);
Mono.whenDelayError(
Flux.fromIterable(idleQueue).flatMap(bs -> bs.shutdownGracefully(gracePeriod)),
Flux.fromArray(arr).flatMap(bs -> bs.shutdownGracefully(gracePeriod))
).subscribe(v -> {}, disposedNotifier::tryEmitError, disposedNotifier::tryEmitEmpty);
}
return disposedNotifier.asMono();
}
}

Expand All @@ -543,6 +583,7 @@ static class BoundedState implements Disposable, Scannable {

final BoundedServices parent;
final ScheduledExecutorService executor;
final Sinks.Empty<Void> disposedNotifier;

long idleSinceTimestamp = -1L;

Expand All @@ -552,6 +593,7 @@ static class BoundedState implements Disposable, Scannable {
BoundedState(BoundedServices parent, ScheduledExecutorService executor) {
this.parent = parent;
this.executor = executor;
this.disposedNotifier = Sinks.empty();
}

/**
Expand Down Expand Up @@ -599,6 +641,7 @@ boolean tryEvict(long evictionTimestamp, long ttlMillis) {
if (elapsed >= ttlMillis) {
if (MARK_COUNT.compareAndSet(this, 0, EVICTED)) {
executor.shutdownNow();
disposedNotifier.tryEmitEmpty();
return true;
}
}
Expand All @@ -612,6 +655,7 @@ boolean tryEvict(long evictionTimestamp, long ttlMillis) {
* to this method (for APIs that take a {@link Disposable}).
*
* @see #shutdown()
* @see #shutdownGracefully(Duration)
* @see #dispose()
*/
void release() {
Expand Down Expand Up @@ -642,6 +686,19 @@ void shutdown() {
this.idleSinceTimestamp = -1L;
MARK_COUNT.set(this, EVICTED);
this.executor.shutdownNow();
this.disposedNotifier.tryEmitEmpty();
}

Mono<Void> shutdownGracefully(Duration gracePeriod) {
this.idleSinceTimestamp = -1L;
int inUse = MARK_COUNT.getAndSet(this, EVICTED);
if (inUse >= 0) {
// TODO(dj): consecutive calls to dispose could simply get an empty mono
// but they'd lose the ability to get notified once the shutdown is done
// so we need an field for the disposed signal
Schedulers.shutdownAndAwait(this.executor, gracePeriod, this.disposedNotifier);
}
return this.disposedNotifier.asMono();
}

/**
Expand Down