Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share netty event loops between transports #46346

Merged
merged 10 commits into from May 11, 2020
Expand Up @@ -30,7 +30,6 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
Expand Down Expand Up @@ -64,13 +63,13 @@
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
Expand Down Expand Up @@ -127,9 +126,8 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
// Netty's CompositeByteBuf implementation does not allow less than two components.
}, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope);

public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope);
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", "0",
(s) -> Setting.parseInt(s, 0, "http.netty.worker_count"), Property.NodeScope);
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved

public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =
Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope);
Expand All @@ -138,29 +136,30 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final ByteSizeValue maxHeaderSize;
private final ByteSizeValue maxChunkSize;

private final int workerCount;

private final int pipeliningMaxEvents;

private final SharedGroupFactory sharedGroupFactory;
private final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis;

private final int maxCompositeBufferComponents;

private volatile ServerBootstrap serverBootstrap;
private volatile SharedGroupFactory.SharedGroup sharedGroup;

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher,
SharedGroupFactory sharedGroupFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
this.sharedGroupFactory = sharedGroupFactory;

this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);

this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());

Expand All @@ -181,10 +180,10 @@ public Settings settings() {
protected void doStart() {
boolean success = false;
try {
sharedGroup = sharedGroupFactory.getHttpGroup();
serverBootstrap = new ServerBootstrap();

serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
serverBootstrap.group(sharedGroup.getLowLevelGroup());

// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer
Expand Down Expand Up @@ -263,9 +262,9 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti

@Override
protected void stopInternal() {
if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
serverBootstrap = null;
if (sharedGroup != null) {
sharedGroup.shutdownGracefully();
sharedGroup = null;
}
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
public static final String NETTY_TRANSPORT_NAME = "netty4";
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";

private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
Expand Down Expand Up @@ -76,7 +79,7 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
}

@Override
Expand All @@ -86,7 +89,18 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings set
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher));
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME, () -> new Netty4HttpServerTransport(settings, networkService, bigArrays,
threadPool, xContentRegistry, dispatcher, getSharedGroupFactory(settings)));
}

private synchronized SharedGroupFactory getSharedGroupFactory(Settings settings) {
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
SharedGroupFactory groupFactory = this.groupFactory.get();
if (groupFactory != null) {
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";
return groupFactory;
} else {
this.groupFactory.set(new SharedGroupFactory(settings));
return this.groupFactory.get();
}
}
}
@@ -0,0 +1,155 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.transport.netty4.Netty4Transport;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

/**
* Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for
* both {@link #getHttpGroup()} and {@link #getTransportGroup()} if
* {@link org.elasticsearch.http.netty4.Netty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} is configured to be 0.
* If that setting is not 0, then it will return a different group in the {@link #getHttpGroup()} call.
*/
public final class SharedGroupFactory {

private final Settings settings;
private final int workerCount;
private final int httpWorkerCount;

private RefCountedGroup refCountedGroup;

public SharedGroupFactory(Settings settings) {
this.settings = settings;
this.workerCount = Netty4Transport.WORKER_COUNT.get(settings);
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings);
}

public Settings getSettings() {
return settings;
}

public int getHttpWorkerCount() {
if (httpWorkerCount == 0) {
return workerCount;
} else {
return httpWorkerCount;
}
}

public int getTransportWorkerCount() {
return workerCount;
}

public synchronized SharedGroup getTransportGroup() {
return getGenericGroup();
}

public synchronized SharedGroup getHttpGroup() {
if (httpWorkerCount == 0) {
return getGenericGroup();
} else {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount,
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX));

return new SharedGroup(new RefCountedGroup(eventLoopGroup));
}
}

private SharedGroup getGenericGroup() {
if (refCountedGroup == null) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(workerCount,
daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX));
this.refCountedGroup = new RefCountedGroup(eventLoopGroup);
return new SharedGroup(refCountedGroup);
} else {
refCountedGroup.incRef();
return new SharedGroup(refCountedGroup);
}
}

private static class RefCountedGroup extends AbstractRefCounted {

public static final String NAME = "ref-counted-event-loop-group";
private final EventLoopGroup eventLoopGroup;

private RefCountedGroup(EventLoopGroup eventLoopGroup) {
super(NAME);
this.eventLoopGroup = eventLoopGroup;
}

@Override
protected void closeInternal() {
eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
}
}

/**
* Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close,
* this wrapped instance can no longer be used.
*/
public static class SharedGroup {

private final RefCountedGroup refCountedGroup;

private final AtomicBoolean isOpen = new AtomicBoolean(true);

private SharedGroup(RefCountedGroup refCountedGroup) {
this.refCountedGroup = refCountedGroup;
}

public EventLoopGroup getLowLevelGroup() {
return refCountedGroup.eventLoopGroup;
}

public Future<?> shutdownGracefully() {
if (isOpen.compareAndSet(true, false)) {
refCountedGroup.decRef();
if (refCountedGroup.refCount() == 0) {
refCountedGroup.eventLoopGroup.terminationFuture();
return refCountedGroup.eventLoopGroup.terminationFuture();
} else {
return getSuccessPromise();
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
return getSuccessPromise();
}
}

private static Future<?> getSuccessPromise() {
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
DefaultPromise<?> promise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
promise.setSuccess(null);
return promise;
}
}
}