/
ConcurrentTaskExecutor.java
251 lines (213 loc) · 9.13 KB
/
ConcurrentTaskExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.concurrent;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.enterprise.concurrent.ManagedExecutors;
import javax.enterprise.concurrent.ManagedTask;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.ClassUtils;
import org.springframework.util.concurrent.ListenableFuture;
/**
 * Adapter that takes a {@code java.util.concurrent.Executor} and exposes
 * a Spring {@link org.springframework.core.task.TaskExecutor} for it.
 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting
 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
 *
 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it,
 * exposing a long-running hint based on {@link SchedulingAwareRunnable} and an identity
 * name based on the given Runnable/Callable's {@code toString()}. For JSR-236 style
 * lookup in a Java EE 7 environment, consider using {@link DefaultManagedTaskExecutor}.
 *
 * <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows
 * for defining a {@link java.util.concurrent.ThreadPoolExecutor} in bean style,
 * exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly.
 * This is a convenient alternative to a raw ThreadPoolExecutor definition with
 * a separate definition of the present adapter class.
 *
 * @author Juergen Hoeller
 * @since 2.0
 * @see java.util.concurrent.Executor
 * @see java.util.concurrent.ExecutorService
 * @see java.util.concurrent.ThreadPoolExecutor
 * @see java.util.concurrent.Executors
 * @see DefaultManagedTaskExecutor
 * @see ThreadPoolTaskExecutor
 */
public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

	/**
	 * The JSR-236 {@code ManagedExecutorService} class, or {@code null}
	 * if the JSR-236 API could not be loaded.
	 */
	@Nullable
	private static Class<?> managedExecutorServiceClass;

	static {
		try {
			// Resolve against this class's own ClassLoader rather than going
			// through an unrelated sibling class.
			managedExecutorServiceClass = ClassUtils.forName(
					"javax.enterprise.concurrent.ManagedExecutorService",
					ConcurrentTaskExecutor.class.getClassLoader());
		}
		catch (ClassNotFoundException ex) {
			// JSR-236 API not available...
			managedExecutorServiceClass = null;
		}
	}


	private Executor concurrentExecutor;

	private TaskExecutorAdapter adaptedExecutor;

	/**
	 * The most recently configured decorator, remembered so that it can be
	 * re-applied whenever the adapted executor gets rebuilt.
	 */
	@Nullable
	private TaskDecorator taskDecorator;


	/**
	 * Create a new ConcurrentTaskExecutor, using a single thread executor as default.
	 * @see java.util.concurrent.Executors#newSingleThreadExecutor()
	 */
	public ConcurrentTaskExecutor() {
		this.concurrentExecutor = Executors.newSingleThreadExecutor();
		this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor);
	}

	/**
	 * Create a new ConcurrentTaskExecutor, using the given {@link java.util.concurrent.Executor}.
	 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
	 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
	 * @param executor the {@link java.util.concurrent.Executor} to delegate to
	 * (if {@code null}, a single thread executor is created as a fallback)
	 */
	public ConcurrentTaskExecutor(@Nullable Executor executor) {
		this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor());
		this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor);
	}


	/**
	 * Specify the {@link java.util.concurrent.Executor} to delegate to.
	 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
	 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
	 * <p>A {@link TaskDecorator} previously specified through
	 * {@link #setTaskDecorator} is carried over to the newly adapted executor,
	 * so that the order of the two setter calls does not matter.
	 * @param executor the {@link java.util.concurrent.Executor} to delegate to
	 * (if {@code null}, a single thread executor is created as a fallback)
	 */
	public final void setConcurrentExecutor(@Nullable Executor executor) {
		this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor());
		this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor);
	}

	/**
	 * Return the {@link java.util.concurrent.Executor} that this adapter delegates to.
	 */
	public final Executor getConcurrentExecutor() {
		return this.concurrentExecutor;
	}

	/**
	 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
	 * about to be executed.
	 * <p>Note that such a decorator is not necessarily being applied to the
	 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
	 * execution callback (which may be a wrapper around the user-supplied task).
	 * <p>The primary use case is to set some execution context around the task's
	 * invocation, or to provide some monitoring/statistics for task execution.
	 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
	 * is limited to plain {@code Runnable} execution via {@code execute} calls.
	 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
	 * {@code FutureTask} which does not propagate any exceptions; you might
	 * have to cast it and call {@code Future#get} to evaluate exceptions.
	 * @param taskDecorator the decorator to apply
	 * @since 4.3
	 */
	public final void setTaskDecorator(TaskDecorator taskDecorator) {
		// Remember the decorator so that a later setConcurrentExecutor call,
		// which replaces the adapter, does not silently drop it.
		this.taskDecorator = taskDecorator;
		this.adaptedExecutor.setTaskDecorator(taskDecorator);
	}


	/**
	 * Execute the given task through the adapted executor.
	 */
	@Override
	public void execute(Runnable task) {
		this.adaptedExecutor.execute(task);
	}

	/**
	 * Execute the given task through the adapted executor,
	 * passing the given start timeout along.
	 */
	@Override
	public void execute(Runnable task, long startTimeout) {
		this.adaptedExecutor.execute(task, startTimeout);
	}

	/**
	 * Submit the given Runnable through the adapted executor.
	 */
	@Override
	public Future<?> submit(Runnable task) {
		return this.adaptedExecutor.submit(task);
	}

	/**
	 * Submit the given Callable through the adapted executor.
	 */
	@Override
	public <T> Future<T> submit(Callable<T> task) {
		return this.adaptedExecutor.submit(task);
	}

	/**
	 * Submit the given Runnable through the adapted executor,
	 * receiving a {@link ListenableFuture} for it.
	 */
	@Override
	public ListenableFuture<?> submitListenable(Runnable task) {
		return this.adaptedExecutor.submitListenable(task);
	}

	/**
	 * Submit the given Callable through the adapted executor,
	 * receiving a {@link ListenableFuture} for it.
	 */
	@Override
	public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
		return this.adaptedExecutor.submitListenable(task);
	}


	/**
	 * Build the {@link TaskExecutorAdapter} for the given executor: a
	 * {@link ManagedTaskExecutorAdapter} if the JSR-236 API is present and the
	 * executor is a {@code ManagedExecutorService}, a plain adapter otherwise.
	 * <p>The currently configured {@link TaskDecorator}, if any, is applied
	 * to the new adapter.
	 * @param concurrentExecutor the executor to adapt
	 * @return the adapter to delegate all task execution to
	 */
	private TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) {
		TaskExecutorAdapter adapter;
		if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) {
			adapter = new ManagedTaskExecutorAdapter(concurrentExecutor);
		}
		else {
			adapter = new TaskExecutorAdapter(concurrentExecutor);
		}
		if (this.taskDecorator != null) {
			adapter.setTaskDecorator(this.taskDecorator);
		}
		return adapter;
	}


	/**
	 * TaskExecutorAdapter subclass that wraps all provided Runnables and Callables
	 * with a JSR-236 ManagedTask, exposing a long-running hint based on
	 * {@link SchedulingAwareRunnable} and an identity name based on the task's
	 * {@code toString()} representation.
	 */
	private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter {

		/**
		 * Create a new adapter on top of the given executor.
		 * @param concurrentExecutor the executor to delegate to
		 */
		public ManagedTaskExecutorAdapter(Executor concurrentExecutor) {
			super(concurrentExecutor);
		}

		@Override
		public void execute(Runnable task) {
			super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
		}

		@Override
		public Future<?> submit(Runnable task) {
			return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
		}

		@Override
		public <T> Future<T> submit(Callable<T> task) {
			return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
		}

		@Override
		public ListenableFuture<?> submitListenable(Runnable task) {
			return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
		}

		@Override
		public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
			return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
		}
	}


	/**
	 * Delegate that wraps a given Runnable/Callable with a JSR-236 ManagedTask,
	 * exposing a long-running hint based on {@link SchedulingAwareRunnable}
	 * and a given identity name.
	 */
	protected static class ManagedTaskBuilder {

		/**
		 * Wrap the given Runnable as a JSR-236 managed task.
		 * <p>The {@link ManagedTask#IDENTITY_NAME} property is always set; the
		 * {@link ManagedTask#LONGRUNNING_HINT} property is only set when the task
		 * is a {@link SchedulingAwareRunnable}, taken from its {@code isLongLived()}.
		 * @param task the Runnable to wrap
		 * @param identityName the identity name to expose for the task
		 * @return the managed task, created without a task listener
		 */
		public static Runnable buildManagedTask(Runnable task, String identityName) {
			Map<String, String> properties;
			if (task instanceof SchedulingAwareRunnable) {
				properties = new HashMap<>(4);
				properties.put(ManagedTask.LONGRUNNING_HINT,
						Boolean.toString(((SchedulingAwareRunnable) task).isLongLived()));
			}
			else {
				properties = new HashMap<>(2);
			}
			properties.put(ManagedTask.IDENTITY_NAME, identityName);
			return ManagedExecutors.managedTask(task, properties, null);
		}

		/**
		 * Wrap the given Callable as a JSR-236 managed task, exposing the
		 * {@link ManagedTask#IDENTITY_NAME} property only.
		 * @param task the Callable to wrap
		 * @param identityName the identity name to expose for the task
		 * @return the managed task, created without a task listener
		 */
		public static <T> Callable<T> buildManagedTask(Callable<T> task, String identityName) {
			Map<String, String> properties = new HashMap<>(2);
			properties.put(ManagedTask.IDENTITY_NAME, identityName);
			return ManagedExecutors.managedTask(task, properties, null);
		}
	}

}