Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disposeGracefully method to Scheduler #3089

Merged
merged 35 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
44f5f2e
Add Disposable.Graceful interface
chemicL Jun 23, 2022
6fbd57c
Improvements WIP
chemicL Jun 27, 2022
a4a5b0e
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Jul 5, 2022
13c020f
WIP
chemicL Jul 7, 2022
92f18b8
WIP
chemicL Jul 11, 2022
9026186
WIP
chemicL Jul 11, 2022
0084749
Removed tests that race with task execution
chemicL Jul 13, 2022
0f2684c
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Jul 13, 2022
3ed8ad3
BoundedElasticScheduler disposeGracefully rework
chemicL Jul 13, 2022
d1af720
Optimizing BoundedElasticScheduler disposal logic
chemicL Jul 14, 2022
0e801f0
WIP more concurrency tests, fixes
chemicL Jul 18, 2022
d10d98a
Migrated BoundedElasticScheduler JCStress tests to RaceTestUtils
chemicL Jul 22, 2022
52f1cf0
plain volatile access
chemicL Jul 22, 2022
b0fc331
Using for loops instead of getAndUpdate
chemicL Jul 22, 2022
5703eb8
lazy set
chemicL Jul 22, 2022
8b4bc63
Tests improvements
chemicL Jul 26, 2022
7a0452f
Generic SchedulerState
chemicL Jul 28, 2022
3dc8dca
Schedulers imports - avoid wildcards
chemicL Jul 28, 2022
8399b5b
Simplify start flow
chemicL Jul 28, 2022
9f5a7eb
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Jul 28, 2022
8dd4afb
Javadoc
chemicL Jul 28, 2022
2d2aa4e
Avoid looping in BoundedElasticScheduler start and disposal
chemicL Aug 2, 2022
bd03603
Simplify state encapsulation in BoundedElasticScheduler
chemicL Aug 2, 2022
298c051
Simplified, preventing leaks
chemicL Aug 2, 2022
5e72d2e
Adjusting remaining schedulers
chemicL Aug 2, 2022
255878f
Removed dependency on BoundedState counter for state management
chemicL Aug 2, 2022
cbe6663
Ensured atomicity in BoundedServices.dispose() and properly draining …
chemicL Aug 3, 2022
6690a91
Moved Disposable.Graceful to Scheduler and removed gracePeriod param
chemicL Aug 10, 2022
fcc5079
Removed exceptions from start()
chemicL Aug 10, 2022
162cf2a
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Aug 10, 2022
709f385
JCStress tests rework to avoid time validation, just state consistenc…
chemicL Aug 10, 2022
cb8373c
Add exclusion for japicmp
chemicL Aug 10, 2022
055f72c
Moved more thorough dispose validation to unit tests
chemicL Aug 11, 2022
c0c1eab
unused imports and copyright
chemicL Aug 11, 2022
c4a5a99
Merge branch '3.4.x' into 3068-schedulerGracefulClose
chemicL Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.scheduler;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

abstract class RacingDisposeGracefullyStressTest<T extends Scheduler> {
chemicL marked this conversation as resolved.
Show resolved Hide resolved

final T scheduler;
final CountDownLatch arbiterLatch = new CountDownLatch(1);

{
scheduler = createScheduler();
}

abstract T createScheduler();
abstract boolean isTerminated();

void awaitArbiter() {
while (true) {
try {
if (arbiterLatch.await(40, TimeUnit.MILLISECONDS)) {
return;
}
}
catch (InterruptedException ignored) {
}
}
}

void arbiterStarted() {
arbiterLatch.countDown();
}

boolean checkDisposeGracefullyTimesOut() {
long start = System.nanoTime();
try {
scheduler.disposeGracefully(Duration.ofMillis(40)).block();
} catch (Exception e) {
long duration = System.nanoTime() - start;
// Validate that the wait took non-zero time.
return (e.getCause() instanceof TimeoutException) && Duration.ofNanos(duration).toMillis() > 30;
}
return false;
}

boolean validateSchedulerDisposed() {
try {
scheduler.schedule(() -> {});
} catch (RejectedExecutionException e) {
scheduler.disposeGracefully(Duration.ofMillis(50)).block();
return scheduler.isDisposed() && isTerminated();
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.scheduler;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.ZZZ_Result;
import org.openjdk.jcstress.infra.results.Z_Result;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public abstract class SchedulersStressTest {

private static void restart(Scheduler scheduler) {
scheduler.disposeGracefully(Duration.ofMillis(100)).block(Duration.ofMillis(100));
try {
scheduler.start();
} catch (Exception ignored) {
}
}

private static boolean canScheduleTask(Scheduler scheduler) {
final CountDownLatch latch = new CountDownLatch(1);
if (scheduler.isDisposed()) {
return false;
}
scheduler.schedule(latch::countDown);
boolean taskDone = false;
try {
taskDone = latch.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
return taskDone;
}

@JCStressTest
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
@State
public static class SingleSchedulerStartDisposeStressTest {

final SingleScheduler scheduler = new SingleScheduler(Thread::new);
{
scheduler.start();
}

@Actor
public void restart1() {
restart(scheduler);
}

@Actor
public void restart2() {
restart(scheduler);
}

@Arbiter
public void arbiter(Z_Result r) {
// At this stage, at least one actor called scheduler.start(),
// so we should be able to execute a task.
r.r1 = canScheduleTask(scheduler);
scheduler.dispose();
}
}

@JCStressTest
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
@State
public static class ParallelSchedulerStartDisposeStressTest {

final ParallelScheduler scheduler = new ParallelScheduler(4, Thread::new);
{
scheduler.start();
}

@Actor
public void restart1() {
restart(scheduler);
}

@Actor
public void restart2() {
restart(scheduler);
}

@Arbiter
public void arbiter(Z_Result r) {
// At this stage, at least one actor called scheduler.start(),
// so we should be able to execute a task.
r.r1 = canScheduleTask(scheduler);
scheduler.dispose();
}
}

@JCStressTest
@Outcome(id = {"true, true, true"}, expect = Expect.ACCEPTABLE,
desc = "Both time out, task gets rejected, scheduler disposed eventually")
@State
public static class SingleSchedulerDisposeGracefullyStressTest extends RacingDisposeGracefullyStressTest<SingleScheduler> {

@Override
SingleScheduler createScheduler() {
return new SingleScheduler(Thread::new);
}

@Override
boolean isTerminated() {
return scheduler.state.currentResource.isTerminated();
}

{
scheduler.start();
// Schedule a task that disallows graceful closure until the arbiter kicks in
// to make sure that actors fail while waiting.
scheduler.schedule(this::awaitArbiter);
}

@Actor
public void disposeGracefully1(ZZZ_Result r) {
r.r1 = checkDisposeGracefullyTimesOut();
chemicL marked this conversation as resolved.
Show resolved Hide resolved
}

@Actor
public void disposeGracefully2(ZZZ_Result r) {
r.r2 = checkDisposeGracefullyTimesOut();
}

@Arbiter
public void arbiter(ZZZ_Result r) {
// Release the task blocking graceful closure.
arbiterStarted();
r.r3 = validateSchedulerDisposed();
}
}

@JCStressTest
@Outcome(id = {"true, true, true"}, expect = Expect.ACCEPTABLE,
desc = "Both time out, task gets rejected, scheduler disposed eventually")
@State
public static class ParallelSchedulerDisposeGracefullyStressTest extends RacingDisposeGracefullyStressTest<ParallelScheduler> {

@Override
ParallelScheduler createScheduler() {
return new ParallelScheduler(10, Thread::new);
}

@Override
boolean isTerminated() {
assert scheduler.state.initialResource != null;
for (ScheduledExecutorService executor : scheduler.state.initialResource) {
if (!executor.isTerminated()) {
return false;
}
}
return true;
}

{
scheduler.start();
// Schedule a task that disallows graceful closure until the arbiter kicks in
// to make sure that actors fail while waiting.
scheduler.schedule(this::awaitArbiter);
}

@Actor
public void disposeGracefully1(ZZZ_Result r) {
r.r1 = checkDisposeGracefullyTimesOut();
}

@Actor
public void disposeGracefully2(ZZZ_Result r) {
r.r2 = checkDisposeGracefullyTimesOut();
}

@Arbiter
public void arbiter(ZZZ_Result r) {
// Release the task blocking graceful closure.
arbiterStarted();
r.r3 = validateSchedulerDisposed();
}
}

@JCStressTest
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
@State
public static class BoundedElasticSchedulerStartDisposeStressTest {

final BoundedElasticScheduler scheduler = new BoundedElasticScheduler(1, 1, Thread::new, 5);
{
scheduler.start();
}

@Actor
public void restart1() {
restart(scheduler);
}

@Actor
public void restart2() {
restart(scheduler);
}

@Arbiter
public void arbiter(Z_Result r) {
// At this stage, at least one actor called scheduler.start(),
// so we should be able to execute a task.
r.r1 = canScheduleTask(scheduler);
scheduler.dispose();
}
}

@JCStressTest
@Outcome(id = {"true, true, true"}, expect = Expect.ACCEPTABLE,
desc = "Both time out, task gets rejected, scheduler disposed eventually")
@State
public static class BoundedElasticSchedulerDisposeGracefullyStressTest
extends RacingDisposeGracefullyStressTest<BoundedElasticScheduler> {

@Override
BoundedElasticScheduler createScheduler() {
return new BoundedElasticScheduler(4, 4, Thread::new, 5);
}

@Override
boolean isTerminated() {
for (BoundedElasticScheduler.BoundedState bs :
scheduler.state.currentResource.busyStates.array) {
if (!bs.executor.isTerminated()) {
return false;
}
}
if (!scheduler.state.initialResource.idleQueue.isEmpty()) {
return false;
}
for (BoundedElasticScheduler.BoundedState bs :
scheduler.state.initialResource.busyStates.array) {
if (!bs.executor.isTerminated()) {
return false;
}
}
return scheduler.state.currentResource.idleQueue.isEmpty();
}

{
scheduler.start();
// Schedule a task that disallows graceful closure until the arbiter kicks in
// to make sure that actors fail while waiting.
scheduler.schedule(this::awaitArbiter);
}

@Actor
public void disposeGracefully1(ZZZ_Result r) {
r.r1 = checkDisposeGracefullyTimesOut();
}

@Actor
public void disposeGracefully2(ZZZ_Result r) {
r.r2 = checkDisposeGracefullyTimesOut();
}

@Arbiter
public void arbiter(ZZZ_Result r) {
// Release the task blocking graceful closure.
arbiterStarted();
r.r3 = validateSchedulerDisposed();
}
}
}