/
ThreadPoolTaskExecutor.java
399 lines (355 loc) · 13.7 KB
/
ThreadPoolTaskExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.concurrent;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/**
* JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor}
* in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
* properties) and exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
* This class is also well suited for management and monitoring (e.g. through JMX),
* providing several useful attributes: "corePoolSize", "maxPoolSize", "keepAliveSeconds"
* (all supporting updates at runtime); "poolSize", "activeCount" (for introspection only).
*
* <p>The default configuration is a core pool size of 1, with unlimited max pool size
* and unlimited queue capacity. This is roughly equivalent to
* {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single
* thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics
* {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling
* of threads in the pool to a potentially very high number. Consider also setting a
* {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher
* {@link #setCorePoolSize "corePoolSize"} (see also the
* {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling).
*
* <p><b>NOTE:</b> This class implements Spring's
* {@link org.springframework.core.task.TaskExecutor} interface as well as the
* {@link java.util.concurrent.Executor} interface, with the former being the primary
* interface, the other just serving as secondary convenience. For this reason, the
* exception handling follows the TaskExecutor contract rather than the Executor contract,
* in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
*
* <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using
* constructor injection, or use a factory method definition that points to the
* {@link java.util.concurrent.Executors} class. To expose such a raw Executor as a
* Spring {@link org.springframework.core.task.TaskExecutor}, simply wrap it with a
* {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter.
*
* @author Juergen Hoeller
* @since 2.0
* @see org.springframework.core.task.TaskExecutor
* @see java.util.concurrent.ThreadPoolExecutor
* @see ThreadPoolExecutorFactoryBean
* @see ConcurrentTaskExecutor
*/
@SuppressWarnings("serial")
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;
private boolean allowCoreThreadTimeOut = false;
@Nullable
private TaskDecorator taskDecorator;
@Nullable
private ThreadPoolExecutor threadPoolExecutor;
// Runnable decorator to user-level FutureTask, if different
private final Map<Runnable, Object> decoratedTaskMap =
new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
/**
* Set the ThreadPoolExecutor's core pool size.
* Default is 1.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setCorePoolSize(int corePoolSize) {
synchronized (this.poolSizeMonitor) {
this.corePoolSize = corePoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setCorePoolSize(corePoolSize);
}
}
}
/**
* Return the ThreadPoolExecutor's core pool size.
*/
public int getCorePoolSize() {
synchronized (this.poolSizeMonitor) {
return this.corePoolSize;
}
}
/**
* Set the ThreadPoolExecutor's maximum pool size.
* Default is {@code Integer.MAX_VALUE}.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setMaxPoolSize(int maxPoolSize) {
synchronized (this.poolSizeMonitor) {
this.maxPoolSize = maxPoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
}
}
}
/**
* Return the ThreadPoolExecutor's maximum pool size.
*/
public int getMaxPoolSize() {
synchronized (this.poolSizeMonitor) {
return this.maxPoolSize;
}
}
/**
* Set the ThreadPoolExecutor's keep-alive seconds.
* Default is 60.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setKeepAliveSeconds(int keepAliveSeconds) {
synchronized (this.poolSizeMonitor) {
this.keepAliveSeconds = keepAliveSeconds;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
}
}
}
/**
* Return the ThreadPoolExecutor's keep-alive seconds.
*/
public int getKeepAliveSeconds() {
synchronized (this.poolSizeMonitor) {
return this.keepAliveSeconds;
}
}
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
* Default is {@code Integer.MAX_VALUE}.
* <p>Any positive value will lead to a LinkedBlockingQueue instance;
* any other value will lead to a SynchronousQueue instance.
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
/**
* Specify whether to allow core threads to time out. This enables dynamic
* growing and shrinking even in combination with a non-zero queue (since
* the max pool size will only grow once the queue is full).
* <p>Default is "false".
* @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
*/
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
* <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
* is limited to plain {@code Runnable} execution via {@code execute} calls.
* In case of {@code #submit} calls, the exposed {@code Runnable} will be a
* {@code FutureTask} which does not propagate any exceptions; you might
* have to cast it and call {@code Future#get} to evaluate exceptions.
* See the {@code ThreadPoolExecutor#afterExecute} javadoc for an example
* of how to access exceptions in such a {@code Future} case.
* @since 4.3
*/
public void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
/**
* Note: This method exposes an {@link ExecutorService} to its base class
* but stores the actual {@link ThreadPoolExecutor} handle internally.
* Do not override this method for replacing the executor, rather just for
* decorating its {@code ExecutorService} handle or storing custom state.
*/
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
/**
* Create the BlockingQueue to use for the ThreadPoolExecutor.
* <p>A LinkedBlockingQueue instance will be created for a positive
* capacity value; a SynchronousQueue else.
* @param queueCapacity the specified queue capacity
* @return the BlockingQueue instance
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<>(queueCapacity);
}
else {
return new SynchronousQueue<>();
}
}
/**
* Return the underlying ThreadPoolExecutor for native access.
* @return the underlying ThreadPoolExecutor (never {@code null})
* @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
*/
public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
return this.threadPoolExecutor;
}
/**
* Return the current pool size.
* @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
*/
public int getPoolSize() {
if (this.threadPoolExecutor == null) {
// Not initialized yet: assume core pool size.
return this.corePoolSize;
}
return this.threadPoolExecutor.getPoolSize();
}
/**
* Return the number of currently active threads.
* @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
*/
public int getActiveCount() {
if (this.threadPoolExecutor == null) {
// Not initialized yet: assume no active threads.
return 0;
}
return this.threadPoolExecutor.getActiveCount();
}
@Override
public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
executor.execute(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
public void execute(Runnable task, long startTimeout) {
execute(task);
}
@Override
public Future<?> submit(Runnable task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
public <T> Future<T> submit(Callable<T> task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ExecutorService executor = getThreadPoolExecutor();
try {
ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
executor.execute(future);
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ExecutorService executor = getThreadPoolExecutor();
try {
ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
executor.execute(future);
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
protected void cancelRemainingTask(Runnable task) {
super.cancelRemainingTask(task);
// Cancel associated user-level Future handle as well
Object original = this.decoratedTaskMap.get(task);
if (original instanceof Future) {
((Future<?>) original).cancel(true);
}
}
/**
* This task executor prefers short-lived work units.
*/
@Override
public boolean prefersShortLivedTasks() {
return true;
}
}