Skip to content

Commit

Permalink
Share netty event loops between transports (#46346)
Browse files Browse the repository at this point in the history
Currently Elasticsearch creates independent event loop groups for each
transport (http and internal) transport type. This is unnecessary and
can lead to contention when different threads access shared resources
(ex: allocators). This commit moves to a model where, by default, the
event loops are shared between the transports. The previous behavior can
be attained by specifically setting the http worker count.
  • Loading branch information
Tim-Brooks committed May 11, 2020
1 parent f61d3d8 commit 0bf4be7
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 75 deletions.
Expand Up @@ -29,7 +29,6 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
Expand Down Expand Up @@ -62,14 +61,14 @@
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
Expand Down Expand Up @@ -126,9 +125,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
// Netty's CompositeByteBuf implementation does not allow less than two components.
}, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope);

public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count",
(s) -> Integer.toString(EsExecutors.allocatedProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope);
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = Setting.intSetting("http.netty.worker_count", 0, Property.NodeScope);

public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =
Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope);
Expand All @@ -137,29 +134,30 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final ByteSizeValue maxHeaderSize;
private final ByteSizeValue maxChunkSize;

private final int workerCount;

private final int pipeliningMaxEvents;

private final SharedGroupFactory sharedGroupFactory;
private final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis;

private final int maxCompositeBufferComponents;

private volatile ServerBootstrap serverBootstrap;
private volatile SharedGroupFactory.SharedGroup sharedGroup;

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) {
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
this.sharedGroupFactory = sharedGroupFactory;

this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);

this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());

Expand All @@ -180,10 +178,10 @@ public Settings settings() {
protected void doStart() {
boolean success = false;
try {
sharedGroup = sharedGroupFactory.getHttpGroup();
serverBootstrap = new ServerBootstrap();

serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
serverBootstrap.group(sharedGroup.getLowLevelGroup());

// NettyAllocator will return the channel type designed to work with the configuredAllocator
serverBootstrap.channel(NettyAllocator.getServerChannelType());
Expand Down Expand Up @@ -260,9 +258,9 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti

@Override
protected void stopInternal() {
if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
serverBootstrap = null;
if (sharedGroup != null) {
sharedGroup.shutdown();
sharedGroup = null;
}
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
public static final String NETTY_TRANSPORT_NAME = "netty4";
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";

private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
Expand Down Expand Up @@ -77,7 +80,7 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
}

@Override
Expand All @@ -90,6 +93,17 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings set
ClusterSettings clusterSettings) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
clusterSettings));
clusterSettings, getSharedGroupFactory(settings)));
}

private SharedGroupFactory getSharedGroupFactory(Settings settings) {
SharedGroupFactory groupFactory = this.groupFactory.get();
if (groupFactory != null) {
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";
return groupFactory;
} else {
this.groupFactory.set(new SharedGroupFactory(settings));
return this.groupFactory.get();
}
}
}
@@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.transport.netty4.Netty4Transport;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

/**
* Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for
* both {@link #getHttpGroup()} and {@link #getTransportGroup()} if
* {@link org.elasticsearch.http.netty4.Netty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} is configured to be 0.
* If that setting is not 0, then it will return a different group in the {@link #getHttpGroup()} call.
*/
public final class SharedGroupFactory {

private static final Logger logger = LogManager.getLogger(SharedGroupFactory.class);

private final Settings settings;
private final int workerCount;
private final int httpWorkerCount;

private RefCountedGroup genericGroup;
private SharedGroup dedicatedHttpGroup;

public SharedGroupFactory(Settings settings) {
this.settings = settings;
this.workerCount = Netty4Transport.WORKER_COUNT.get(settings);
this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings);
}

public Settings getSettings() {
return settings;
}

public int getTransportWorkerCount() {
return workerCount;
}

public synchronized SharedGroup getTransportGroup() {
return getGenericGroup();
}

public synchronized SharedGroup getHttpGroup() {
if (httpWorkerCount == 0) {
return getGenericGroup();
} else {
if (dedicatedHttpGroup == null) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount,
daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX));
dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup));
}
return dedicatedHttpGroup;
}
}

private SharedGroup getGenericGroup() {
if (genericGroup == null) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(workerCount,
daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX));
this.genericGroup = new RefCountedGroup(eventLoopGroup);
} else {
genericGroup.incRef();
}
return new SharedGroup(genericGroup);
}

private static class RefCountedGroup extends AbstractRefCounted {

public static final String NAME = "ref-counted-event-loop-group";
private final EventLoopGroup eventLoopGroup;

private RefCountedGroup(EventLoopGroup eventLoopGroup) {
super(NAME);
this.eventLoopGroup = eventLoopGroup;
}

@Override
protected void closeInternal() {
Future<?> shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
shutdownFuture.awaitUninterruptibly();
if (shutdownFuture.isSuccess() == false) {
logger.warn("Error closing netty event loop group", shutdownFuture.cause());
}
}
}

/**
* Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close,
* this wrapped instance can no longer be used.
*/
public static class SharedGroup {

private final RefCountedGroup refCountedGroup;

private final AtomicBoolean isOpen = new AtomicBoolean(true);

private SharedGroup(RefCountedGroup refCountedGroup) {
this.refCountedGroup = refCountedGroup;
}

public EventLoopGroup getLowLevelGroup() {
return refCountedGroup.eventLoopGroup;
}

public void shutdown() {
if (isOpen.compareAndSet(true, false)) {
refCountedGroup.decRef();
}
}
}
}

0 comments on commit 0bf4be7

Please sign in to comment.