From d112e514cc7b59efcd90ddd2acae44d745e63756 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 4 Sep 2019 14:01:32 -0600 Subject: [PATCH 1/7] Share netty event loops between transports Currently Elasticsearch creates independent event loop groups for each transport (http and internal) transport type. This is unnecessary and can lead to contention when different threads access shared resources (ex: allocators). This commit moves to a model where, by default, the event loops are shared between the transports. The previous behavior can be attained by specifically setting the http worker count. --- .../netty4/Netty4HttpServerTransport.java | 22 +-- .../elasticsearch/transport/Netty4Plugin.java | 20 ++- .../transport/SharedGroupFactory.java | 155 ++++++++++++++++++ .../transport/netty4/Netty4Transport.java | 49 +++--- .../http/netty4/Netty4BadRequestTests.java | 5 +- .../Netty4HttpServerPipeliningTests.java | 3 +- .../Netty4HttpServerTransportTests.java | 19 ++- .../Netty4SizeHeaderFrameDecoderTests.java | 3 +- .../netty4/NettyTransportMultiPortTests.java | 4 +- .../netty4/SimpleNetty4TransportTests.java | 4 +- .../netty4/SecurityNetty4Transport.java | 2 +- .../SecurityNetty4HttpServerTransport.java | 2 +- 12 files changed, 229 insertions(+), 59 deletions(-) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 6c1579bc28362..1c72fa0f18b96 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -30,7 +30,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; @@ -64,13 +63,13 @@ import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.CopyBytesServerSocketChannel; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.netty4.Netty4Utils; import java.net.InetSocketAddress; import java.net.SocketOption; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; @@ -138,21 +137,23 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final ByteSizeValue maxHeaderSize; private final ByteSizeValue maxChunkSize; - private final int workerCount; - private final int pipeliningMaxEvents; + private final SharedGroupFactory sharedGroupFactory; private final RecvByteBufAllocator recvByteBufAllocator; private final int readTimeoutMillis; private final int maxCompositeBufferComponents; private volatile ServerBootstrap serverBootstrap; + private volatile SharedGroupFactory.SharedGroup sharedGroup; public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) { + NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, + SharedGroupFactory sharedGroupFactory) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher); Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings)); + this.sharedGroupFactory = sharedGroupFactory; this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); @@ -160,7 +161,6 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); - this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings); this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); @@ -181,10 +181,10 @@ public Settings settings() { protected void doStart() { boolean success = false; try { + sharedGroup = sharedGroupFactory.getHttpGroup(); serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, - HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))); + serverBootstrap.group(sharedGroup.getLowLevelGroup()); // If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child // channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer @@ -263,9 +263,9 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti @Override protected void stopInternal() { - if (serverBootstrap != null) { - serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); - serverBootstrap = null; + if (sharedGroup != null) { + sharedGroup.shutdownGracefully(); + sharedGroup = null; } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index bcfd5e0b326d3..2ad82bf1965f0 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; @@ -47,6 +48,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin { public static final String NETTY_TRANSPORT_NAME = "netty4"; public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4"; + private final SetOnce groupFactory = new SetOnce<>(); + @Override public List> getSettings() { return Arrays.asList( @@ -76,7 +79,7 @@ public Map> getTransports(Settings settings, ThreadP CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings))); } @Override @@ -86,7 +89,18 @@ public Map> getHttpTransports(Settings set NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { - return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME, - () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher)); + return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME, () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, + threadPool, xContentRegistry, dispatcher, getSharedGroupFactory(settings))); + } + + private synchronized SharedGroupFactory getSharedGroupFactory(Settings settings) { + SharedGroupFactory groupFactory = this.groupFactory.get(); + if (groupFactory != null) { + assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided"; + return groupFactory; + } else { + this.groupFactory.set(new SharedGroupFactory(settings)); + return this.groupFactory.get(); + } } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java new file mode 100644 index 0000000000000..7ad34a2574f35 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.http.netty4.Netty4HttpServerTransport; +import org.elasticsearch.transport.netty4.Netty4Transport; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; + +/** + * Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for + * both {@link #getHttpGroup()} and {@link #getTransportGroup()} if + * {@link org.elasticsearch.http.netty4.Netty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} is configured to be 0. + * If that setting is not 0, then it will return a different group in the {@link #getHttpGroup()} call. + */ +public final class SharedGroupFactory { + + private final Settings settings; + private final int workerCount; + private final int httpWorkerCount; + + private RefCountedGroup refCountedGroup; + + public SharedGroupFactory(Settings settings) { + this.settings = settings; + this.workerCount = Netty4Transport.WORKER_COUNT.get(settings); + this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings); + } + + public Settings getSettings() { + return settings; + } + + public int getHttpWorkerCount() { + if (httpWorkerCount == 0) { + return workerCount; + } else { + return httpWorkerCount; + } + } + + public int getTransportWorkerCount() { + return workerCount; + } + + public synchronized SharedGroup getTransportGroup() { + return getGenericGroup(); + } + + public synchronized SharedGroup getHttpGroup() { + if (httpWorkerCount == 0) { + return getGenericGroup(); + } else { + NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount, + daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)); + + return new SharedGroup(new RefCountedGroup(eventLoopGroup)); + } + } + + private SharedGroup getGenericGroup() { + if (refCountedGroup == null) { + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(workerCount, + daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX)); + this.refCountedGroup = new RefCountedGroup(eventLoopGroup); + return new SharedGroup(refCountedGroup); + } else { + refCountedGroup.incRef(); + return new SharedGroup(refCountedGroup); + } + } + + private static class RefCountedGroup extends AbstractRefCounted { + + public static final String NAME = "ref-counted-event-loop-group"; + private final EventLoopGroup eventLoopGroup; + + private RefCountedGroup(EventLoopGroup eventLoopGroup) { + super(NAME); + this.eventLoopGroup = eventLoopGroup; + } + + @Override + protected void closeInternal() { + eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + } + } + + /** + * Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close, + * this wrapped instance can no longer be used. + */ + public static class SharedGroup { + + private final RefCountedGroup refCountedGroup; + + private final AtomicBoolean isOpen = new AtomicBoolean(true); + + private SharedGroup(RefCountedGroup refCountedGroup) { + this.refCountedGroup = refCountedGroup; + } + + public EventLoopGroup getLowLevelGroup() { + return refCountedGroup.eventLoopGroup; + } + + public Future shutdownGracefully() { + if (isOpen.compareAndSet(true, false)) { + refCountedGroup.decRef(); + if (refCountedGroup.refCount() == 0) { + refCountedGroup.eventLoopGroup.terminationFuture(); + return refCountedGroup.eventLoopGroup.terminationFuture(); + } else { + return getSuccessPromise(); + } + } else { + return getSuccessPromise(); + } + } + + private static Future getSuccessPromise() { + DefaultPromise promise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); + promise.setSuccess(null); + return promise; + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d3e43e16dd5f4..8670b88721f4e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -32,7 +32,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -46,7 +45,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -61,6 +59,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.CopyBytesServerSocketChannel; import org.elasticsearch.transport.CopyBytesSocketChannel; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; @@ -68,13 +67,10 @@ import java.net.InetSocketAddress; import java.net.SocketOption; import java.util.Map; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.settings.Setting.byteSizeSetting; import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; /** * There are 4 types of connections per node, low/med/high/ping. Low if for batch oriented APIs (like recovery or @@ -100,20 +96,20 @@ public class Netty4Transport extends TcpTransport { intSetting("transport.netty.boss_count", 1, 1, Property.NodeScope); + private final SharedGroupFactory sharedGroupFactory; private final RecvByteBufAllocator recvByteBufAllocator; - private final int workerCount; private final ByteSizeValue receivePredictorMin; private final ByteSizeValue receivePredictorMax; private final Map serverBootstraps = newConcurrentMap(); private volatile Bootstrap clientBootstrap; - private volatile NioEventLoopGroup eventLoopGroup; + private volatile SharedGroupFactory.SharedGroup sharedGroup; public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, - CircuitBreakerService circuitBreakerService) { + CircuitBreakerService circuitBreakerService, SharedGroupFactory sharedGroupFactory) { super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings)); - this.workerCount = WORKER_COUNT.get(settings); + this.sharedGroupFactory = sharedGroupFactory; // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one this.receivePredictorMin = NETTY_RECEIVE_PREDICTOR_MIN.get(settings); @@ -130,12 +126,11 @@ public Netty4Transport(Settings settings, Version version, ThreadPool threadPool protected void doStart() { boolean success = false; try { - ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); - eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory); - clientBootstrap = createClientBootstrap(eventLoopGroup); + sharedGroup = sharedGroupFactory.getTransportGroup(); + clientBootstrap = createClientBootstrap(sharedGroup); if (NetworkService.NETWORK_SERVER.get(settings)) { for (ProfileSettings profileSettings : profileSettings) { - createServerBootstrap(profileSettings, eventLoopGroup); + createServerBootstrap(profileSettings, sharedGroup); bindServer(profileSettings); } } @@ -148,9 +143,9 @@ protected void doStart() { } } - private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { + private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGroup) { final Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup); + bootstrap.group(sharedGroup.getLowLevelGroup()); // If direct buffer pooling is disabled, use the CopyBytesSocketChannel which will pool a single // direct buffer per-event-loop thread which will be used for IO operations. @@ -204,17 +199,17 @@ private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { return bootstrap; } - private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) { + private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupFactory.SharedGroup sharedGroup) { String name = profileSettings.profileName; if (logger.isDebugEnabled()) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]", - name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, - receivePredictorMin, receivePredictorMax); + name, sharedGroupFactory.getTransportWorkerCount(), profileSettings.portOrRange, profileSettings.bindHosts, + profileSettings.publishHosts, receivePredictorMin, receivePredictorMax); } final ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(eventLoopGroup); + serverBootstrap.group(sharedGroup.getLowLevelGroup()); // If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child // channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer @@ -316,16 +311,14 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { @Override @SuppressForbidden(reason = "debug") protected void stopInternal() { - Releasables.close(() -> { - Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); - shutdownFuture.awaitUninterruptibly(); - if (shutdownFuture.isSuccess() == false) { - logger.warn("Error closing netty event loop group", shutdownFuture.cause()); - } + Future shutdownFuture = sharedGroup.shutdownGracefully(); + shutdownFuture.awaitUninterruptibly(); + if (shutdownFuture.isSuccess() == false) { + logger.warn("Error closing netty event loop group", shutdownFuture.cause()); + } - serverBootstraps.clear(); - clientBootstrap = null; - }); + serverBootstraps.clear(); + clientBootstrap = null; } protected class ClientChannelInitializer extends ChannelInitializer { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java index 344f3bf1265e8..99659f8209b15 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.junit.After; import org.junit.Before; @@ -85,8 +86,8 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, } }; - try (HttpServerTransport httpServerTransport = - new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (HttpServerTransport httpServerTransport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, + xContentRegistry(), dispatcher, new SharedGroupFactory(Settings.EMPTY))) { httpServerTransport.start(); final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index cb4d14038d24a..0bd253647cff6 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.junit.After; import org.junit.Before; @@ -118,7 +119,7 @@ class CustomNettyHttpServerTransport extends Netty4HttpServerTransport { Netty4HttpServerPipeliningTests.this.networkService, Netty4HttpServerPipeliningTests.this.bigArrays, Netty4HttpServerPipeliningTests.this.threadPool, - xContentRegistry(), new NullDispatcher()); + xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings)); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 717e14ebc89c3..761bbc5c0030d 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.junit.After; import org.junit.Before; @@ -157,7 +158,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, } }; try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), dispatcher)) { + xContentRegistry(), dispatcher, new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); try (Netty4HttpClient client = new Netty4HttpClient()) { @@ -190,12 +191,12 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, public void testBindUnavailableAddress() { try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(Settings.EMPTY))) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build(); try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings))) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start); assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage()); } @@ -235,8 +236,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th settings = Settings.builder().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); } - try (Netty4HttpServerTransport transport = - new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, + xContentRegistry(), dispatcher, new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -281,8 +282,8 @@ public void dispatchBadRequest(final RestChannel channel, .put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "elastic.co").build(); - try (Netty4HttpServerTransport transport = - new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, + xContentRegistry(), dispatcher, new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -341,8 +342,8 @@ public void dispatchBadRequest(final RestChannel channel, NioEventLoopGroup group = new NioEventLoopGroup(); - try (Netty4HttpServerTransport transport = - new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, + xContentRegistry(), dispatcher, new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 0e90559bd51b0..d30f383c467a2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.mocksocket.MockSocket; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TransportSettings; import org.junit.After; import org.junit.Before; @@ -66,7 +67,7 @@ public void startThreadPool() { NetworkService networkService = new NetworkService(Collections.emptyList()); PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler, - new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), new SharedGroupFactory(settings)); nettyTransport.start(); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index 5d3e897202cbc..c12efe7a4fefe 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; import org.junit.Before; @@ -120,7 +121,8 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()), - recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), + new SharedGroupFactory(settings)); transport.start(); assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 0b210995795bf..9ab4ba4564db3 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.Transport; @@ -49,7 +50,8 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase protected Transport build(Settings settings, final Version version, ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); return new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()), - PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { + PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(), + new SharedGroupFactory(settings)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index 6e2b9c1a7efdd..14fd9cfe8504f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -62,7 +62,7 @@ public SecurityNetty4Transport( final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, final SSLService sslService) { - super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, null); this.exceptionHandler = new SecurityTransportExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java index 043af216b8f35..50e6a42fd8dd7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java @@ -37,7 +37,7 @@ public class SecurityNetty4HttpServerTransport extends Netty4HttpServerTransport public SecurityNetty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, IPFilter ipFilter, SSLService sslService, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) { - super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher); + super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, null); this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.ipFilter = ipFilter; final boolean ssl = HTTP_SSL_ENABLED.get(settings); From b37467a62b05c26ac5f17312d40fc022e9e66331 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 4 Sep 2019 14:19:47 -0600 Subject: [PATCH 2/7] tests --- .../netty4/Netty4HttpServerTransport.java | 5 +- .../transport/SharedGroupFactoryTests.java | 64 +++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 1c72fa0f18b96..0b09dd51f8755 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -126,9 +126,8 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { // Netty's CompositeByteBuf implementation does not allow less than two components. }, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope); - public static final Setting SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", - (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), - (s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope); + public static final Setting SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", "0", + (s) -> Setting.parseInt(s, 0, "http.netty.worker_count"), Property.NodeScope); public static final Setting SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java new file mode 100644 index 0000000000000..ca11991119e53 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.http.netty4.Netty4HttpServerTransport; +import org.elasticsearch.test.ESTestCase; + +public final class SharedGroupFactoryTests extends ESTestCase { + + public void testSharedEventLoops() throws Exception { + SharedGroupFactory sharedGroupFactory = new SharedGroupFactory(Settings.EMPTY); + + SharedGroupFactory.SharedGroup httpGroup = sharedGroupFactory.getHttpGroup(); + SharedGroupFactory.SharedGroup transportGroup = sharedGroupFactory.getTransportGroup(); + + try { + assertSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup()); + } finally { + httpGroup.shutdownGracefully().sync(); + assertFalse(httpGroup.getLowLevelGroup().isShuttingDown()); + assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); + transportGroup.shutdownGracefully().sync(); + assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); + } + } + + public void testNonSharedEventLoops() throws Exception { + Settings settings = Settings.builder() + .put(Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.getKey(), randomIntBetween(1, 10)) + .build(); + SharedGroupFactory sharedGroupFactory = new SharedGroupFactory(settings); + SharedGroupFactory.SharedGroup httpGroup = sharedGroupFactory.getHttpGroup(); + SharedGroupFactory.SharedGroup transportGroup = sharedGroupFactory.getTransportGroup(); + + try { + assertNotSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup()); + } finally { + httpGroup.shutdownGracefully().sync(); + assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); + assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); + transportGroup.shutdownGracefully().sync(); + assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); + } + } +} From 71ff73b57f70001a5d295ac9b2b6964908523909 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 4 Sep 2019 14:36:34 -0600 Subject: [PATCH 3/7] Add security --- .../netty4/SecurityNetty4Transport.java | 7 +++-- .../xpack/security/Security.java | 29 ++++++++++++++----- .../SecurityNetty4HttpServerTransport.java | 5 ++-- .../netty4/SecurityNetty4ServerTransport.java | 7 +++-- ...ecurityNetty4HttpServerTransportTests.java | 19 +++++++----- ...pleSecurityNetty4ServerTransportTests.java | 3 +- 6 files changed, 47 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index 14fd9cfe8504f..fa326a6c48ce7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -22,6 +22,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.netty4.Netty4Transport; import org.elasticsearch.xpack.core.XPackSettings; @@ -61,8 +62,10 @@ public SecurityNetty4Transport( final PageCacheRecycler pageCacheRecycler, final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, - final SSLService sslService) { - super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, null); + final SSLService sslService, + final SharedGroupFactory sharedGroupFactory) { + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, + sharedGroupFactory); this.exceptionHandler = new SecurityTransportExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 533897bac4466..040008383cc53 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -61,6 +61,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; @@ -289,7 +290,8 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw private final SetOnce tokenService = new SetOnce<>(); private final SetOnce securityActionFilter = new SetOnce<>(); private final SetOnce securityIndex = new SetOnce<>(); - private final SetOnce groupFactory = new SetOnce<>(); + private final SetOnce sharedGroupFactory = new SetOnce<>(); + private final SetOnce nioGroupFactory = new SetOnce<>(); private final SetOnce dlsBitsetCache = new SetOnce<>(); private final List bootstrapChecks; private final List securityExtensions = new ArrayList<>(); @@ -871,7 +873,8 @@ public Map> getTransports(Settings settings, ThreadP namedWriteableRegistry, circuitBreakerService, ipFilter, - getSslService()), + getSslService(), + getNettySharedGroupFactory(settings)), // security based on NIO SecurityField.NIO, () -> new SecurityNioTransport(settings, @@ -899,7 +902,7 @@ public Map> getHttpTransports(Settings set Map> httpTransports = new HashMap<>(); httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays, - ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher)); + ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher, getNettySharedGroupFactory(settings))); httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService(), getNioGroupFactory(settings))); @@ -1011,12 +1014,22 @@ public void reloadSPI(ClassLoader loader) { } private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { - if (groupFactory.get() != null) { - assert groupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; - return groupFactory.get(); + if (nioGroupFactory.get() != null) { + assert nioGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; + return nioGroupFactory.get(); } else { - groupFactory.set(new NioGroupFactory(settings, logger)); - return groupFactory.get(); + nioGroupFactory.set(new NioGroupFactory(settings, logger)); + return nioGroupFactory.get(); } } + + private synchronized SharedGroupFactory getNettySharedGroupFactory(Settings settings) { + if (sharedGroupFactory.get() != null) { + assert sharedGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; + return sharedGroupFactory.get(); + } else { + sharedGroupFactory.set(new SharedGroupFactory(settings)); + return sharedGroupFactory.get(); + } + } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java index 50e6a42fd8dd7..ac2f0fa436efe 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java @@ -17,6 +17,7 @@ import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.netty4.Netty4HttpServerTransport; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.security.transport.SecurityHttpExceptionHandler; @@ -36,8 +37,8 @@ public class SecurityNetty4HttpServerTransport extends Netty4HttpServerTransport public SecurityNetty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, IPFilter ipFilter, SSLService sslService, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, - Dispatcher dispatcher) { - super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, null); + Dispatcher dispatcher, SharedGroupFactory sharedGroupFactory) { + super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, sharedGroupFactory); this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.ipFilter = ipFilter; final boolean ssl = HTTP_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java index 8cb1085d3aace..6088a03ef6db0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -33,8 +34,10 @@ public SecurityNetty4ServerTransport( final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, - final SSLService sslService) { - super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService); + final SSLService sslService, + final SharedGroupFactory sharedGroupFactory) { + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService, + sharedGroupFactory); this.authenticator = authenticator; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java index 20ceee5d52e92..3f1f72038182a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ssl.SSLClientAuth; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -65,7 +66,7 @@ public void testDefaultClientAuth() throws Exception { sslService = new SSLService(settings, env); SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, - mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); + mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -81,7 +82,7 @@ public void testOptionalClientAuth() throws Exception { sslService = new SSLService(settings, env); SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, - mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); + mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -97,7 +98,7 @@ public void testRequiredClientAuth() throws Exception { sslService = new SSLService(settings, env); SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, - mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); + mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(true)); @@ -113,7 +114,7 @@ public void testNoClientAuth() throws Exception { sslService = new SSLService(settings, env); SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, - mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); + mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -127,7 +128,7 @@ public void testCustomSSLConfiguration() throws Exception { sslService = new SSLService(settings, env); SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, - mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); + mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); EmbeddedChannel ch = new EmbeddedChannel(handler); SSLEngine defaultEngine = ch.pipeline().get(SslHandler.class).engine(); @@ -139,7 +140,8 @@ public void testCustomSSLConfiguration() throws Exception { .build(); sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), - mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); + mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), + new SharedGroupFactory(settings)); handler = transport.configureServerChannelHandler(); ch = new EmbeddedChannel(handler); SSLEngine customEngine = ch.pipeline().get(SslHandler.class).engine(); @@ -157,7 +159,8 @@ public void testThatExceptionIsThrownWhenConfiguredWithoutSslKey() throws Except sslService = new SSLService(settings, env); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), - mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher())); + mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), + new SharedGroupFactory(settings))); assertThat(e.getMessage(), containsString("key must be provided")); } @@ -174,7 +177,7 @@ public void testNoExceptionWhenConfiguredWithoutSslKeySSLDisabled() throws Excep sslService = new SSLService(settings, env); SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, - mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); + mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), new SharedGroupFactory(settings)); assertNotNull(transport.configureServerChannelHandler()); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index f2fea6f5c7eaf..4df0d39cb509d 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase; @@ -32,7 +33,7 @@ protected Transport build(Settings settings, final Version version, ClusterSetti .put("xpack.security.transport.ssl.enabled", true).build(); return new SecurityNetty4ServerTransport(settings1, version, threadPool, networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, - new NoneCircuitBreakerService(), null, createSSLService(settings1)) { + new NoneCircuitBreakerService(), null, createSSLService(settings1), new SharedGroupFactory(settings1)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, From 96f1dc5ed0fdee893e5cb00a3be436a0e9e8539d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 27 Sep 2019 16:18:07 -0600 Subject: [PATCH 4/7] Review changes --- .../netty4/Netty4HttpServerTransport.java | 3 +-- .../elasticsearch/transport/Netty4Plugin.java | 2 +- .../transport/SharedGroupFactory.java | 19 +++++++------------ 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 0b09dd51f8755..2d1816eb889c3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -126,8 +126,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { // Netty's CompositeByteBuf implementation does not allow less than two components. }, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope); - public static final Setting SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", "0", - (s) -> Setting.parseInt(s, 0, "http.netty.worker_count"), Property.NodeScope); + public static final Setting SETTING_HTTP_WORKER_COUNT = Setting.intSetting("http.netty.worker_count", 0, Property.NodeScope); public static final Setting SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index 2ad82bf1965f0..6fb0631f1bdc6 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -93,7 +93,7 @@ public Map> getHttpTransports(Settings set threadPool, xContentRegistry, dispatcher, getSharedGroupFactory(settings))); } - private synchronized SharedGroupFactory getSharedGroupFactory(Settings settings) { + private SharedGroupFactory getSharedGroupFactory(Settings settings) { SharedGroupFactory groupFactory = this.groupFactory.get(); if (groupFactory != null) { assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided"; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java index 7ad34a2574f35..2efd73bcb6577 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java @@ -48,6 +48,7 @@ public final class SharedGroupFactory { private final int httpWorkerCount; private RefCountedGroup refCountedGroup; + private SharedGroup dedicatedHttpGroup; public SharedGroupFactory(Settings settings) { this.settings = settings; @@ -59,14 +60,6 @@ public Settings getSettings() { return settings; } - public int getHttpWorkerCount() { - if (httpWorkerCount == 0) { - return workerCount; - } else { - return httpWorkerCount; - } - } - public int getTransportWorkerCount() { return workerCount; } @@ -79,10 +72,12 @@ public synchronized SharedGroup getHttpGroup() { if (httpWorkerCount == 0) { return getGenericGroup(); } else { - NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount, - daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)); - - return new SharedGroup(new RefCountedGroup(eventLoopGroup)); + if (dedicatedHttpGroup == null) { + NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount, + daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)); + dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup)); + } + return dedicatedHttpGroup; } } From 9a06615e399ef874d6e37082dea892404f97b578 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 17 Oct 2019 16:44:05 -0600 Subject: [PATCH 5/7] Review changes --- .../netty4/Netty4HttpServerTransport.java | 2 +- .../transport/SharedGroupFactory.java | 20 +++++++++---------- .../transport/netty4/Netty4Transport.java | 8 +------- .../transport/SharedGroupFactoryTests.java | 8 ++++---- 4 files changed, 16 insertions(+), 22 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 2d1816eb889c3..8fd8620fdd468 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -262,7 +262,7 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti @Override protected void stopInternal() { if (sharedGroup != null) { - sharedGroup.shutdownGracefully(); + sharedGroup.shutdown(); sharedGroup = null; } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java index 2efd73bcb6577..92afc4c4e721b 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java @@ -24,6 +24,8 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.http.HttpServerTransport; @@ -43,6 +45,8 @@ */ public final class SharedGroupFactory { + private static final Logger logger = LogManager.getLogger(SharedGroupFactory.class); + private final Settings settings; private final int workerCount; private final int httpWorkerCount; @@ -105,7 +109,11 @@ private RefCountedGroup(EventLoopGroup eventLoopGroup) { @Override protected void closeInternal() { - eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + shutdownFuture.awaitUninterruptibly(); + if (shutdownFuture.isSuccess() == false) { + logger.warn("Error closing netty event loop group", shutdownFuture.cause()); + } } } @@ -127,17 +135,9 @@ public EventLoopGroup getLowLevelGroup() { return refCountedGroup.eventLoopGroup; } - public Future shutdownGracefully() { + public void shutdown() { if (isOpen.compareAndSet(true, false)) { refCountedGroup.decRef(); - if (refCountedGroup.refCount() == 0) { - refCountedGroup.eventLoopGroup.terminationFuture(); - return refCountedGroup.eventLoopGroup.terminationFuture(); - } else { - return getSuccessPromise(); - } - } else { - return getSuccessPromise(); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 8670b88721f4e..2ac3dc8701b0a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -36,7 +36,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; -import io.netty.util.concurrent.Future; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -311,12 +310,7 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { @Override @SuppressForbidden(reason = "debug") protected void stopInternal() { - Future shutdownFuture = sharedGroup.shutdownGracefully(); - shutdownFuture.awaitUninterruptibly(); - if (shutdownFuture.isSuccess() == false) { - logger.warn("Error closing netty event loop group", shutdownFuture.cause()); - } - + sharedGroup.shutdown(); serverBootstraps.clear(); clientBootstrap = null; } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java index ca11991119e53..66ce065d1ae39 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java @@ -34,10 +34,10 @@ public void testSharedEventLoops() throws Exception { try { assertSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup()); } finally { - httpGroup.shutdownGracefully().sync(); + httpGroup.shutdown(); assertFalse(httpGroup.getLowLevelGroup().isShuttingDown()); assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); - transportGroup.shutdownGracefully().sync(); + transportGroup.shutdown(); assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); } @@ -54,10 +54,10 @@ public void testNonSharedEventLoops() throws Exception { try { assertNotSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup()); } finally { - httpGroup.shutdownGracefully().sync(); + httpGroup.shutdown(); assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); - transportGroup.shutdownGracefully().sync(); + transportGroup.shutdown(); assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); } } From 04fe9a6c114a808d4c55003f6bfcab23347d5965 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 11 May 2020 09:43:04 -0600 Subject: [PATCH 6/7] Changes --- .../org/elasticsearch/transport/SharedGroupFactory.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java index 92afc4c4e721b..49cb5ddc77c95 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java @@ -21,9 +21,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; @@ -140,11 +138,5 @@ public void shutdown() { refCountedGroup.decRef(); } } - - private static Future getSuccessPromise() { - DefaultPromise promise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); - promise.setSuccess(null); - return promise; - } } } From 4484cac6159b102e379f22edb0ffa47a9c3bbb27 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 11 May 2020 12:29:59 -0600 Subject: [PATCH 7/7] Changes --- .../elasticsearch/transport/SharedGroupFactory.java | 11 +++++------ .../transport/SharedGroupFactoryTests.java | 10 ++++++++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java index 49cb5ddc77c95..04ea83bb5cc90 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java @@ -49,7 +49,7 @@ public final class SharedGroupFactory { private final int workerCount; private final int httpWorkerCount; - private RefCountedGroup refCountedGroup; + private RefCountedGroup genericGroup; private SharedGroup dedicatedHttpGroup; public SharedGroupFactory(Settings settings) { @@ -84,15 +84,14 @@ public synchronized SharedGroup getHttpGroup() { } private SharedGroup getGenericGroup() { - if (refCountedGroup == null) { + if (genericGroup == null) { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX)); - this.refCountedGroup = new RefCountedGroup(eventLoopGroup); - return new SharedGroup(refCountedGroup); + this.genericGroup = new RefCountedGroup(eventLoopGroup); } else { - refCountedGroup.incRef(); - return new SharedGroup(refCountedGroup); + genericGroup.incRef(); } + return new SharedGroup(genericGroup); } private static class RefCountedGroup extends AbstractRefCounted { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java index 66ce065d1ae39..2728f5db27cff 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java @@ -37,9 +37,13 @@ public void testSharedEventLoops() throws Exception { httpGroup.shutdown(); assertFalse(httpGroup.getLowLevelGroup().isShuttingDown()); assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); + assertFalse(transportGroup.getLowLevelGroup().isTerminated()); + assertFalse(transportGroup.getLowLevelGroup().terminationFuture().isDone()); transportGroup.shutdown(); assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(transportGroup.getLowLevelGroup().isTerminated()); + assertTrue(transportGroup.getLowLevelGroup().terminationFuture().isDone()); } } @@ -56,9 +60,15 @@ public void testNonSharedEventLoops() throws Exception { } finally { httpGroup.shutdown(); assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(httpGroup.getLowLevelGroup().isTerminated()); + assertTrue(httpGroup.getLowLevelGroup().terminationFuture().isDone()); assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); + assertFalse(transportGroup.getLowLevelGroup().isTerminated()); + assertFalse(transportGroup.getLowLevelGroup().terminationFuture().isDone()); transportGroup.shutdown(); assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(transportGroup.getLowLevelGroup().isTerminated()); + assertTrue(transportGroup.getLowLevelGroup().terminationFuture().isDone()); } } }