Skip to content

Commit

Permalink
Avoid use of global AtomicLong for ScheduledFutureTask ids (#9599)
Browse files Browse the repository at this point in the history
Motivation

Currently a static AtomicLong is used to allocate a unique id whenever a
task is scheduled to any event loop. This could be a source of
contention if delayed tasks are scheduled at a high frequency and can be
easily avoided by having a non-volatile id counter per queue.

Modifications

- Replace static AtomicLong ScheduledFutureTask#nextTaskId with a long
field in AbstractScheduledExecutorService
- Set ScheduledFutureTask#id based on this when adding the task to the
queue (in event loop) instead of at construction time
- Add simple benchmark

Result

Less contention / cache-miss possibility when scheduling future tasks

Before:

Benchmark      (num)   Mode  Cnt    Score    Error  Units
scheduleLots  100000  thrpt   20  346.008 ± 21.931  ops/s

Benchmark      (num)   Mode  Cnt    Score    Error  Units
scheduleLots  100000  thrpt   20  654.824 ± 22.064  ops/s
  • Loading branch information
njhill authored and normanmaurer committed Sep 25, 2019
1 parent 86ff76a commit 2791f0f
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 8 deletions.
Expand Up @@ -39,6 +39,8 @@ public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

long nextTaskId;

protected AbstractScheduledEventExecutor() {
}

Expand Down Expand Up @@ -241,12 +243,12 @@ protected void validateScheduled(long amount, TimeUnit unit) {

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
scheduledTaskQueue().add(task.setId(nextTaskId++));
} else {
executeScheduledRunnable(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
scheduledTaskQueue().add(task.setId(nextTaskId++));
}
}, true, task.deadlineNanos());
}
Expand Down
Expand Up @@ -23,11 +23,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
private static final AtomicLong nextTaskId = new AtomicLong();
private static final long START_TIME = System.nanoTime();

static long nanoTime() {
Expand All @@ -44,7 +42,9 @@ static long initialNanoTime() {
return START_TIME;
}

private final long id = nextTaskId.getAndIncrement();
// set once when added to priority queue
private long id;

private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
Expand Down Expand Up @@ -79,6 +79,11 @@ static long initialNanoTime() {
periodNanos = 0;
}

ScheduledFutureTask<V> setId(long id) {
this.id = id;
return this;
}

@Override
protected EventExecutor executor() {
return super.executor();
Expand Down Expand Up @@ -182,9 +187,7 @@ protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder();
buf.setCharAt(buf.length() - 1, ',');

return buf.append(" id: ")
.append(id)
.append(", deadline: ")
return buf.append(" deadline: ")
.append(deadlineNanos)
.append(", period: ")
.append(periodNanos)
Expand Down
@@ -0,0 +1,88 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import io.netty.channel.DefaultEventLoop;
import io.netty.microbench.util.AbstractMicrobenchmark;

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark {

static final Callable<Void> NO_OP = new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
};

@State(Scope.Thread)
public static class ThreadState {

@Param({ "100000" })
int num;

AbstractScheduledEventExecutor eventLoop;

@Setup(Level.Trial)
public void reset() {
eventLoop = new DefaultEventLoop();
}

@Setup(Level.Invocation)
public void clear() {
eventLoop.submit(new Runnable() {
@Override
public void run() {
eventLoop.cancelScheduledTasks();
}
}).awaitUninterruptibly();
}

@TearDown(Level.Trial)
public void shutdown() {
eventLoop.shutdownGracefully().awaitUninterruptibly();
}
}

@Benchmark
@Threads(3)
public Future<?> scheduleLots(final ThreadState threadState) {
return threadState.eventLoop.submit(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= threadState.num; i++) {
threadState.eventLoop.schedule(NO_OP, i, TimeUnit.HOURS);
}
}
}).syncUninterruptibly();
}
}
@@ -0,0 +1,19 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Benchmarks for {@link io.netty.util.concurrent}.
*/
package io.netty.util.concurrent;

0 comments on commit 2791f0f

Please sign in to comment.