Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Io Scheduler, Scheduled worker release (v2.x) #7162

Merged
merged 1 commit into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public final class IoScheduler extends Scheduler {
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";

/** The name of the system property for setting the release behaviour for this Scheduler. */
private static final String KEY_SCHEDULED_RELEASE = "rx2.io-scheduled-release";
static boolean USE_SCHEDULED_RELEASE;

static final CachedWorkerPool NONE;

static {
Expand All @@ -63,6 +67,8 @@ public final class IoScheduler extends Scheduler {

EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE);

NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
Expand Down Expand Up @@ -200,7 +206,7 @@ public int size() {
return pool.get().allWorkers.size();
}

static final class EventLoopWorker extends Scheduler.Worker {
static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
Expand All @@ -217,12 +223,20 @@ static final class EventLoopWorker extends Scheduler.Worker {
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();

// releasing the pool should be the last action
pool.release(threadWorker);
if (USE_SCHEDULED_RELEASE) {
threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null);
} else {
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
}

@Override
public void run() {
pool.release(threadWorker);
}

@Override
public boolean isDisposed() {
return once.get();
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
* <ul>
* <li>{@code rx2.io-keep-alive-time} (long): sets the keep-alive time of the {@link #io()} Scheduler workers, default is {@link IoScheduler#KEEP_ALIVE_TIME_DEFAULT}</li>
* <li>{@code rx2.io-priority} (int): sets the thread priority of the {@link #io()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
* {@link #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
* <li>{@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
* <li>{@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.newthread-priority} (int): sets the thread priority of the {@link #newThread()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
Expand Down Expand Up @@ -113,6 +115,8 @@ private Schedulers() {
* <ul>
* <li>{@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
* <li>{@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.io-scheduled-release} (boolean): {@code true} sets the worker release mode of the
* {@code #io()} Scheduler to <em>scheduled</em>, default is {@code false} for <em>eager</em> mode.</li>
* </ul>
* <p>
* The default value of this scheduler can be overridden at initialization time via the
Expand All @@ -129,6 +133,21 @@ private Schedulers() {
* <p>Operators on the base reactive classes that use this scheduler are marked with the
* &#64;{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#COMPUTATION COMPUTATION})
* annotation.
* <p>
* When the {@link Scheduler.Worker} is disposed, the underlying worker can be released to the cached worker pool in two modes:
* <ul>
* <li>In <em>eager</em> mode (default), the underlying worker is returned immediately to the cached worker pool
* and can be reused much quicker by operators. The drawback is that if the currently running task doesn't
* respond to interruption in time or at all, this may lead to delays or deadlock with the reuse use of the
* underlying worker.
* </li>
* <li>In <em>scheduled</em> mode (enabled via the system parameter {@code rx2.io-scheduled-release}
* set to {@code true}), the underlying worker is returned to the cached worker pool only after the currently running task
* has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or
* deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying
* workers being created.
* </li>
* </ul>
* @return a {@link Scheduler} meant for computation-bound work
*/
@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.internal.schedulers;

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class IoScheduledReleaseTest {

/* This test will be stuck in a deadlock if IoScheduler.USE_SCHEDULED_RELEASE is not set */
@Test
public void scheduledRelease() {
boolean savedScheduledRelease = IoScheduler.USE_SCHEDULED_RELEASE;
IoScheduler.USE_SCHEDULED_RELEASE = true;
try {
Flowable.just("item")
.observeOn(Schedulers.io())
.firstOrError()
.map(new Function<String, String>() {
@Override
public String apply(@NonNull final String item) throws Exception {
for (int i = 0; i < 50; i++) {
Completable.complete()
.observeOn(Schedulers.io())
.blockingAwait();
}
return "Done";
}
})
.ignoreElement()
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertComplete();
} finally {
IoScheduler.USE_SCHEDULED_RELEASE = savedScheduledRelease;
}
}
}