Skip to content

Commit

Permalink
Add a J2kt super-source implementation of DirectExecutorService that …
Browse files Browse the repository at this point in the history
…doesn't rely on other unsupported classes.

RELNOTES=n/a
PiperOrigin-RevId: 633880474
  • Loading branch information
stefanhaustein authored and Google Java Core Libraries committed May 15, 2024
1 parent cc28751 commit 7c934a2
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 214 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (C) 2024 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.common.util.concurrent;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.J2ktIncompatible;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** See newDirectExecutorService javadoc for behavioral notes. */
@J2ktIncompatible // Emulated
@GwtIncompatible
@ElementTypesAreNonnullByDefault
final class DirectExecutorService extends AbstractListeningExecutorService {

/** Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor */
private final Object lock = new Object();

/*
* Conceptually, these two variables describe the executor being in
* one of three states:
* - Active: shutdown == false
* - Shutdown: runningTasks > 0 and shutdown == true
* - Terminated: runningTasks == 0 and shutdown == true
*/
@GuardedBy("lock")
private int runningTasks = 0;

@GuardedBy("lock")
private boolean shutdown = false;

@Override
public void execute(Runnable command) {
startTask();
try {
command.run();
} finally {
endTask();
}
}

@Override
public boolean isShutdown() {
synchronized (lock) {
return shutdown;
}
}

@Override
public void shutdown() {
synchronized (lock) {
shutdown = true;
if (runningTasks == 0) {
lock.notifyAll();
}
}
}

// See newDirectExecutorService javadoc for unusual behavior of this method.
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}

@Override
public boolean isTerminated() {
synchronized (lock) {
return shutdown && runningTasks == 0;
}
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
synchronized (lock) {
while (true) {
if (shutdown && runningTasks == 0) {
return true;
} else if (nanos <= 0) {
return false;
} else {
long now = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(lock, nanos);
nanos -= System.nanoTime() - now; // subtract the actual time we waited
}
}
}
}

/**
* Checks if the executor has been shut down and increments the running task count.
*
* @throws RejectedExecutionException if the executor has been previously shutdown
*/
private void startTask() {
synchronized (lock) {
if (shutdown) {
throw new RejectedExecutionException("Executor already shutdown");
}
runningTasks++;
}
}

/** Decrements the running task count. */
private void endTask() {
synchronized (lock) {
int numRunning = --runningTasks;
if (numRunning == 0) {
lock.notifyAll();
}
}
}
}
107 changes: 0 additions & 107 deletions android/guava/src/com/google/common/util/concurrent/MoreExecutors.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -239,110 +237,6 @@ private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
.build());
}

// See newDirectExecutorService javadoc for behavioral notes.
@J2ktIncompatible
@GwtIncompatible // TODO
private static final class DirectExecutorService extends AbstractListeningExecutorService {
/** Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor */
private final Object lock = new Object();

/*
* Conceptually, these two variables describe the executor being in
* one of three states:
* - Active: shutdown == false
* - Shutdown: runningTasks > 0 and shutdown == true
* - Terminated: runningTasks == 0 and shutdown == true
*/
@GuardedBy("lock")
private int runningTasks = 0;

@GuardedBy("lock")
private boolean shutdown = false;

@Override
public void execute(Runnable command) {
startTask();
try {
command.run();
} finally {
endTask();
}
}

@Override
public boolean isShutdown() {
synchronized (lock) {
return shutdown;
}
}

@Override
public void shutdown() {
synchronized (lock) {
shutdown = true;
if (runningTasks == 0) {
lock.notifyAll();
}
}
}

// See newDirectExecutorService javadoc for unusual behavior of this method.
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}

@Override
public boolean isTerminated() {
synchronized (lock) {
return shutdown && runningTasks == 0;
}
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
synchronized (lock) {
while (true) {
if (shutdown && runningTasks == 0) {
return true;
} else if (nanos <= 0) {
return false;
} else {
long now = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(lock, nanos);
nanos -= System.nanoTime() - now; // subtract the actual time we waited
}
}
}
}

/**
* Checks if the executor has been shut down and increments the running task count.
*
* @throws RejectedExecutionException if the executor has been previously shutdown
*/
private void startTask() {
synchronized (lock) {
if (shutdown) {
throw new RejectedExecutionException("Executor already shutdown");
}
runningTasks++;
}
}

/** Decrements the running task count. */
private void endTask() {
synchronized (lock) {
int numRunning = --runningTasks;
if (numRunning == 0) {
lock.notifyAll();
}
}
}
}

/**
* Creates an executor service that runs each task in the thread that invokes {@code
* execute/submit}, as in {@code ThreadPoolExecutor.CallerRunsPolicy}. This applies both to
Expand All @@ -369,7 +263,6 @@ private void endTask() {
*
* @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0)
*/
@J2ktIncompatible
@GwtIncompatible // TODO
public static ListeningExecutorService newDirectExecutorService() {
return new DirectExecutorService();
Expand Down
129 changes: 129 additions & 0 deletions guava/src/com/google/common/util/concurrent/DirectExecutorService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (C) 2024 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.common.util.concurrent;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.J2ktIncompatible;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** See newDirectExecutorService javadoc for behavioral notes. */
@J2ktIncompatible // Emulated
@GwtIncompatible
@ElementTypesAreNonnullByDefault
final class DirectExecutorService extends AbstractListeningExecutorService {

/** Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor */
private final Object lock = new Object();

/*
* Conceptually, these two variables describe the executor being in
* one of three states:
* - Active: shutdown == false
* - Shutdown: runningTasks > 0 and shutdown == true
* - Terminated: runningTasks == 0 and shutdown == true
*/
@GuardedBy("lock")
private int runningTasks = 0;

@GuardedBy("lock")
private boolean shutdown = false;

@Override
public void execute(Runnable command) {
startTask();
try {
command.run();
} finally {
endTask();
}
}

@Override
public boolean isShutdown() {
synchronized (lock) {
return shutdown;
}
}

@Override
public void shutdown() {
synchronized (lock) {
shutdown = true;
if (runningTasks == 0) {
lock.notifyAll();
}
}
}

// See newDirectExecutorService javadoc for unusual behavior of this method.
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}

@Override
public boolean isTerminated() {
synchronized (lock) {
return shutdown && runningTasks == 0;
}
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
synchronized (lock) {
while (true) {
if (shutdown && runningTasks == 0) {
return true;
} else if (nanos <= 0) {
return false;
} else {
long now = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(lock, nanos);
nanos -= System.nanoTime() - now; // subtract the actual time we waited
}
}
}
}

/**
* Checks if the executor has been shut down and increments the running task count.
*
* @throws RejectedExecutionException if the executor has been previously shutdown
*/
private void startTask() {
synchronized (lock) {
if (shutdown) {
throw new RejectedExecutionException("Executor already shutdown");
}
runningTasks++;
}
}

/** Decrements the running task count. */
private void endTask() {
synchronized (lock) {
int numRunning = --runningTasks;
if (numRunning == 0) {
lock.notifyAll();
}
}
}
}

0 comments on commit 7c934a2

Please sign in to comment.