-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
SynchronizationContext.java
250 lines (228 loc) · 9.13 KB
/
SynchronizationContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
/**
* A synchronization context is a queue of tasks that run in sequence. It offers following
* guarantees:
*
* <ul>
* <li>Ordering. Tasks are run in the same order as they are submitted via {@link #execute}
* and {@link #executeLater}.</li>
* <li>Serialization. Tasks are run in sequence and establish a happens-before relationship
* between them. </li>
* <li>Non-reentrancy. If a task running in a synchronization context executes or schedules
* another task in the same synchronization context, the latter task will never run
* inline. It will instead be queued and run only after the current task has returned.</li>
* </ul>
*
* <p>It doesn't own any thread. Tasks are run from caller's or caller-provided threads.
*
* <p>Conceptually, it is fairly accurate to think of {@code SynchronizationContext} like a cheaper
* {@code Executors.newSingleThreadExecutor()} when used for synchronization (not long-running
* tasks). Both use a queue for tasks that are run in order and neither guarantee that tasks have
* completed before returning from {@code execute()}. However, the behavior does diverge if locks
* are held when calling the context. So it is encouraged to avoid mixing locks and synchronization
* context except via {@link #executeLater}.
*
* <p>This class is thread-safe.
*
* @since 1.17.0
*/
@ThreadSafe
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4984")
public final class SynchronizationContext implements Executor {
private final UncaughtExceptionHandler uncaughtExceptionHandler;
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final AtomicReference<Thread> drainingThread = new AtomicReference<>();
/**
* Creates a SynchronizationContext.
*
* @param uncaughtExceptionHandler handles exceptions thrown out of the tasks. Different from
* what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is
* not terminated when the handler is called.
*/
public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) {
this.uncaughtExceptionHandler =
checkNotNull(uncaughtExceptionHandler, "uncaughtExceptionHandler");
}
/**
* Run all tasks in the queue in the current thread, if no other thread is running this method.
* Otherwise do nothing.
*
* <p>Upon returning, it guarantees that all tasks submitted by {@code #executeLater} before it
* have been or will eventually be run, while not requiring any more calls to {@code drain()}.
*/
public final void drain() {
do {
if (!drainingThread.compareAndSet(null, Thread.currentThread())) {
return;
}
try {
Runnable runnable;
while ((runnable = queue.poll()) != null) {
try {
runnable.run();
} catch (Throwable t) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t);
}
}
} finally {
drainingThread.set(null);
}
// must check queue again here to catch any added prior to clearing drainingThread
} while (!queue.isEmpty());
}
/**
* Adds a task that will be run when {@link #drain} is called.
*
* <p>This is useful for cases where you want to enqueue a task while under a lock of your own,
* but don't want the tasks to be run under your lock (for fear of deadlock). You can call {@link
* #executeLater} in the lock, and call {@link #drain} outside the lock.
*/
public final void executeLater(Runnable runnable) {
queue.add(checkNotNull(runnable, "runnable is null"));
}
/**
* Adds a task and run it in this synchronization context as soon as possible. The task may run
* inline. If there are tasks that are previously queued by {@link #executeLater} but have not
* been run, this method will trigger them to be run before the given task. This is equivalent to
* calling {@link #executeLater} immediately followed by {@link #drain}.
*/
@Override
public final void execute(Runnable task) {
executeLater(task);
drain();
}
/**
* Throw {@link IllegalStateException} if this method is not called from this synchronization
* context.
*/
public void throwIfNotInThisSynchronizationContext() {
checkState(Thread.currentThread() == drainingThread.get(),
"Not called from the SynchronizationContext");
}
/**
* Schedules a task to be added and run via {@link #execute} after a delay.
*
* @param task the task being scheduled
* @param delay the delay
* @param unit the time unit for the delay
* @param timerService the {@code ScheduledExecutorService} that provides delayed execution
*
* @return an object for checking the status and/or cancel the scheduled task
*/
public final ScheduledHandle schedule(
final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService) {
final ManagedRunnable runnable = new ManagedRunnable(task);
ScheduledFuture<?> future = timerService.schedule(new Runnable() {
@Override
public void run() {
execute(runnable);
}
@Override
public String toString() {
return task.toString() + "(scheduled in SynchronizationContext)";
}
}, delay, unit);
return new ScheduledHandle(runnable, future);
}
/**
* Schedules a task to be added and run via {@link #execute} after an inital delay and then
* repeated after the delay until cancelled.
*
* @param task the task being scheduled
* @param initialDelay the delay before the first run
* @param delay the delay after the first run.
* @param unit the time unit for the delay
* @param timerService the {@code ScheduledExecutorService} that provides delayed execution
*
* @return an object for checking the status and/or cancel the scheduled task
*/
public final ScheduledHandle scheduleWithFixedDelay(
final Runnable task, long initialDelay, long delay, TimeUnit unit,
ScheduledExecutorService timerService) {
final ManagedRunnable runnable = new ManagedRunnable(task);
ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
execute(runnable);
}
@Override
public String toString() {
return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay
+ ")";
}
}, initialDelay, delay, unit);
return new ScheduledHandle(runnable, future);
}
private static class ManagedRunnable implements Runnable {
final Runnable task;
boolean isCancelled;
boolean hasStarted;
ManagedRunnable(Runnable task) {
this.task = checkNotNull(task, "task");
}
@Override
public void run() {
// The task may have been cancelled after timerService calls SynchronizationContext.execute()
// but before the runnable is actually run. We must guarantee that the task will not be run
// in this case.
if (!isCancelled) {
hasStarted = true;
task.run();
}
}
}
/**
* Allows the user to check the status and/or cancel a task scheduled by {@link #schedule}.
*
* <p>This class is NOT thread-safe. All methods must be run from the same {@link
* SynchronizationContext} as which the task was scheduled in.
*/
public static final class ScheduledHandle {
private final ManagedRunnable runnable;
private final ScheduledFuture<?> future;
private ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future) {
this.runnable = checkNotNull(runnable, "runnable");
this.future = checkNotNull(future, "future");
}
/**
* Cancel the task if it's still {@link #isPending pending}.
*/
public void cancel() {
runnable.isCancelled = true;
future.cancel(false);
}
/**
* Returns {@code true} if the task will eventually run, meaning that it has neither started
* running nor been cancelled.
*/
public boolean isPending() {
return !(runnable.hasStarted || runnable.isCancelled);
}
}
}