/
KQueueEventLoop.java
398 lines (355 loc) · 16.3 KB
/
KQueueEventLoop.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray;
import io.netty.util.IntSupplier;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static java.lang.Math.min;
/**
* {@link EventLoop} which uses kqueue under the covers. Only works on BSD!
*/
final class KQueueEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
private static final int KQUEUE_WAKE_UP_IDENT = 0;
static {
// Ensure JNI is initialized by the time this class is loaded by this time!
// We use unix-common methods in this class which are backed by JNI methods.
KQueue.ensureAvailability();
}
private final boolean allowGrowing;
private final FileDescriptor kqueueFd;
private final KQueueEventArray changeList;
private final KQueueEventArray eventList;
private final SelectStrategy selectStrategy;
private final IovArray iovArray = new IovArray();
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return kqueueWaitNow();
}
};
private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
private volatile int wakenUp;
private volatile int ioRatio = 50;
KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
this.kqueueFd = Native.newKQueue();
if (maxEvents == 0) {
allowGrowing = true;
maxEvents = 4096;
} else {
allowGrowing = false;
}
this.changeList = new KQueueEventArray(maxEvents);
this.eventList = new KQueueEventArray(maxEvents);
int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
if (result < 0) {
cleanup();
throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
}
}
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
void add(AbstractKQueueChannel ch) {
assert inEventLoop();
AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
// We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
// closed.
assert old == null || !old.isOpen();
}
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
assert inEventLoop();
changeList.evSet(ch, filter, flags, fflags);
}
void remove(AbstractKQueueChannel ch) throws Exception {
assert inEventLoop();
int fd = ch.fd().intValue();
AbstractKQueueChannel old = channels.remove(fd);
if (old != null && old != ch) {
// The Channel mapping was already replaced due FD reuse, put back the stored Channel.
channels.put(fd, old);
// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
assert !ch.isOpen();
} else if (ch.isOpen()) {
// Remove the filters. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
//
// See also https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
ch.unregisterFilters();
}
}
/**
* Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
*/
IovArray cleanArray() {
iovArray.clear();
return iovArray;
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
wakeup();
}
}
private void wakeup() {
Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
// Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
// So it is not very practical to assert the return value is always >= 0.
}
private int kqueueWait(boolean oldWakeup) throws IOException {
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
// until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
if (oldWakeup && hasTasks()) {
return kqueueWaitNow();
}
long totalDelay = delayNanos(System.nanoTime());
int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
return kqueueWait(delaySeconds, (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
}
private int kqueueWaitNow() throws IOException {
return kqueueWait(0, 0);
}
private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
changeList.clear();
return numEvents;
}
private void processReady(int ready) {
for (int i = 0; i < ready; ++i) {
final short filter = eventList.filter(i);
final short flags = eventList.flags(i);
final int fd = eventList.fd(i);
if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
// EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
// we later attempt to delete the filters from kqueue.
assert filter != Native.EVFILT_USER ||
(filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
continue;
}
AbstractKQueueChannel channel = channels.get(fd);
if (channel == null) {
// This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
// We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
// thus removed from kqueue FD.
logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter);
continue;
}
AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
// First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
// to read from the file descriptor.
if (filter == Native.EVFILT_WRITE) {
unsafe.writeReady();
} else if (filter == Native.EVFILT_READ) {
// Check READ before EOF to ensure all data is read before shutting down the input.
unsafe.readReady(eventList.data(i));
} else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
unsafe.readEOF();
}
// Check if EV_EOF was set, this will notify us for connection-reset in which case
// we may close the channel directly or try to read more data depending on the state of the
// Channel and also depending on the AbstractKQueueChannel subtype.
if ((flags & Native.EV_EOF) != 0) {
unsafe.readEOF();
}
}
}
@Override
protected void run() {
for (;;) {
try {
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with kqueue
case SelectStrategy.SELECT:
strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp == 1) {
wakeup();
}
// fallthrough
default:
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processReady(strategy);
}
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
if (strategy > 0) {
processReady(strategy);
}
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
if (allowGrowing && strategy == eventList.capacity()) {
//increase the size of the array as we needed the whole space for the events
eventList.realloc(false);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return newTaskQueue0(maxPendingTasks);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
/**
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
*/
public int getIoRatio() {
return ioRatio;
}
/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}
@Override
public int registeredChannels() {
return channels.size();
}
@Override
protected void cleanup() {
try {
try {
kqueueFd.close();
} catch (IOException e) {
logger.warn("Failed to close the kqueue fd.", e);
}
} finally {
// Cleanup all native memory!
changeList.free();
eventList.free();
}
}
private void closeAll() {
try {
kqueueWaitNow();
} catch (IOException e) {
// ignore on close
}
// Using the intermediate collection to prevent ConcurrentModificationException.
// In the `close()` method, the channel is deleted from `channels` map.
AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
for (AbstractKQueueChannel ch: localChannels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
private static void handleLoopException(Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}