diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index ac6741d37fcab..e5c98b8cfb461 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -29,7 +29,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.HttpContentCompressor; @@ -62,6 +61,7 @@ import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -69,7 +69,6 @@ import java.net.SocketOption; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; @@ -126,9 +125,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { // Netty's CompositeByteBuf implementation does not allow less than two components. }, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope); - public static final Setting SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", - (s) -> Integer.toString(EsExecutors.allocatedProcessors(s) * 2), - (s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope); + public static final Setting SETTING_HTTP_WORKER_COUNT = Setting.intSetting("http.netty.worker_count", 0, Property.NodeScope); public static final Setting SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope); @@ -137,21 +134,23 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final ByteSizeValue maxHeaderSize; private final ByteSizeValue maxChunkSize; - private final int workerCount; - private final int pipeliningMaxEvents; + private final SharedGroupFactory sharedGroupFactory; private final RecvByteBufAllocator recvByteBufAllocator; private final int readTimeoutMillis; private final int maxCompositeBufferComponents; private volatile ServerBootstrap serverBootstrap; + private volatile SharedGroupFactory.SharedGroup sharedGroup; public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) { + NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings, + SharedGroupFactory sharedGroupFactory) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings); Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings)); + this.sharedGroupFactory = sharedGroupFactory; this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); @@ -159,7 +158,6 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); - this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings); this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); @@ -180,10 +178,10 @@ public Settings settings() { protected void doStart() { boolean success = false; try { + sharedGroup = sharedGroupFactory.getHttpGroup(); serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, - HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))); + serverBootstrap.group(sharedGroup.getLowLevelGroup()); // NettyAllocator will return the channel type designed to work with the configuredAllocator serverBootstrap.channel(NettyAllocator.getServerChannelType()); @@ -260,9 +258,9 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti @Override protected void stopInternal() { - if (serverBootstrap != null) { - serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); - serverBootstrap = null; + if (sharedGroup != null) { + sharedGroup.shutdown(); + sharedGroup = null; } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index 0cf9a990d7e14..1428eb7b17113 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; @@ -48,6 +49,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin { public static final String NETTY_TRANSPORT_NAME = "netty4"; public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4"; + private final SetOnce groupFactory = new SetOnce<>(); + @Override public List> getSettings() { return Arrays.asList( @@ -77,7 +80,7 @@ public Map> getTransports(Settings settings, ThreadP CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings))); } @Override @@ -90,6 +93,17 @@ public Map> getHttpTransports(Settings set ClusterSettings clusterSettings) { return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME, () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, - clusterSettings)); + clusterSettings, getSharedGroupFactory(settings))); + } + + private SharedGroupFactory getSharedGroupFactory(Settings settings) { + SharedGroupFactory groupFactory = this.groupFactory.get(); + if (groupFactory != null) { + assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided"; + return groupFactory; + } else { + this.groupFactory.set(new SharedGroupFactory(settings)); + return this.groupFactory.get(); + } } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java new file mode 100644 index 0000000000000..04ea83bb5cc90 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/SharedGroupFactory.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.http.netty4.Netty4HttpServerTransport; +import org.elasticsearch.transport.netty4.Netty4Transport; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; + +/** + * Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for + * both {@link #getHttpGroup()} and {@link #getTransportGroup()} if + * {@link org.elasticsearch.http.netty4.Netty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} is configured to be 0. + * If that setting is not 0, then it will return a different group in the {@link #getHttpGroup()} call. + */ +public final class SharedGroupFactory { + + private static final Logger logger = LogManager.getLogger(SharedGroupFactory.class); + + private final Settings settings; + private final int workerCount; + private final int httpWorkerCount; + + private RefCountedGroup genericGroup; + private SharedGroup dedicatedHttpGroup; + + public SharedGroupFactory(Settings settings) { + this.settings = settings; + this.workerCount = Netty4Transport.WORKER_COUNT.get(settings); + this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings); + } + + public Settings getSettings() { + return settings; + } + + public int getTransportWorkerCount() { + return workerCount; + } + + public synchronized SharedGroup getTransportGroup() { + return getGenericGroup(); + } + + public synchronized SharedGroup getHttpGroup() { + if (httpWorkerCount == 0) { + return getGenericGroup(); + } else { + if (dedicatedHttpGroup == null) { + NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount, + daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)); + dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup)); + } + return dedicatedHttpGroup; + } + } + + private SharedGroup getGenericGroup() { + if (genericGroup == null) { + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(workerCount, + daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX)); + this.genericGroup = new RefCountedGroup(eventLoopGroup); + } else { + genericGroup.incRef(); + } + return new SharedGroup(genericGroup); + } + + private static class RefCountedGroup extends AbstractRefCounted { + + public static final String NAME = "ref-counted-event-loop-group"; + private final EventLoopGroup eventLoopGroup; + + private RefCountedGroup(EventLoopGroup eventLoopGroup) { + super(NAME); + this.eventLoopGroup = eventLoopGroup; + } + + @Override + protected void closeInternal() { + Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + shutdownFuture.awaitUninterruptibly(); + if (shutdownFuture.isSuccess() == false) { + logger.warn("Error closing netty event loop group", shutdownFuture.cause()); + } + } + } + + /** + * Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close, + * this wrapped instance can no longer be used. + */ + public static class SharedGroup { + + private final RefCountedGroup refCountedGroup; + + private final AtomicBoolean isOpen = new AtomicBoolean(true); + + private SharedGroup(RefCountedGroup refCountedGroup) { + this.refCountedGroup = refCountedGroup; + } + + public EventLoopGroup getLowLevelGroup() { + return refCountedGroup.eventLoopGroup; + } + + public void shutdown() { + if (isOpen.compareAndSet(true, false)) { + refCountedGroup.decRef(); + } + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 3cdb5a92134b7..6e1b2a5eb9b8e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.elasticsearch.transport.netty4; import io.netty.bootstrap.Bootstrap; @@ -31,10 +30,8 @@ import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.util.AttributeKey; -import io.netty.util.concurrent.Future; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -56,6 +53,7 @@ import org.elasticsearch.core.internal.net.NetUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; @@ -64,13 +62,10 @@ import java.net.InetSocketAddress; import java.net.SocketOption; import java.util.Map; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.settings.Setting.byteSizeSetting; import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; /** * There are 4 types of connections per node, low/med/high/ping. Low if for batch oriented APIs (like recovery or @@ -96,20 +91,20 @@ public class Netty4Transport extends TcpTransport { intSetting("transport.netty.boss_count", 1, 1, Property.NodeScope); + private final SharedGroupFactory sharedGroupFactory; private final RecvByteBufAllocator recvByteBufAllocator; - private final int workerCount; private final ByteSizeValue receivePredictorMin; private final ByteSizeValue receivePredictorMax; private final Map serverBootstraps = newConcurrentMap(); private volatile Bootstrap clientBootstrap; - private volatile NioEventLoopGroup eventLoopGroup; + private volatile SharedGroupFactory.SharedGroup sharedGroup; public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, - CircuitBreakerService circuitBreakerService) { + CircuitBreakerService circuitBreakerService, SharedGroupFactory sharedGroupFactory) { super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings)); - this.workerCount = WORKER_COUNT.get(settings); + this.sharedGroupFactory = sharedGroupFactory; // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one this.receivePredictorMin = NETTY_RECEIVE_PREDICTOR_MIN.get(settings); @@ -126,12 +121,11 @@ public Netty4Transport(Settings settings, Version version, ThreadPool threadPool protected void doStart() { boolean success = false; try { - ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); - eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory); - clientBootstrap = createClientBootstrap(eventLoopGroup); + sharedGroup = sharedGroupFactory.getTransportGroup(); + clientBootstrap = createClientBootstrap(sharedGroup); if (NetworkService.NETWORK_SERVER.get(settings)) { for (ProfileSettings profileSettings : profileSettings) { - createServerBootstrap(profileSettings, eventLoopGroup); + createServerBootstrap(profileSettings, sharedGroup); bindServer(profileSettings); } } @@ -144,9 +138,9 @@ protected void doStart() { } } - private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { + private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGroup) { final Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup); + bootstrap.group(sharedGroup.getLowLevelGroup()); // NettyAllocator will return the channel type designed to work with the configured allocator bootstrap.channel(NettyAllocator.getChannelType()); @@ -196,17 +190,17 @@ private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { return bootstrap; } - private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) { + private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupFactory.SharedGroup sharedGroup) { String name = profileSettings.profileName; if (logger.isDebugEnabled()) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]", - name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, - receivePredictorMin, receivePredictorMax); + name, sharedGroupFactory.getTransportWorkerCount(), profileSettings.portOrRange, profileSettings.bindHosts, + profileSettings.publishHosts, receivePredictorMin, receivePredictorMax); } final ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(eventLoopGroup); + serverBootstrap.group(sharedGroup.getLowLevelGroup()); // NettyAllocator will return the channel type designed to work with the configuredAllocator serverBootstrap.channel(NettyAllocator.getServerChannelType()); @@ -307,12 +301,8 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { @SuppressForbidden(reason = "debug") protected void stopInternal() { Releasables.close(() -> { - if (eventLoopGroup != null) { - Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); - shutdownFuture.awaitUninterruptibly(); - if (shutdownFuture.isSuccess() == false) { - logger.warn("Error closing netty event loop group", shutdownFuture.cause()); - } + if (sharedGroup != null) { + sharedGroup.shutdown(); } }, serverBootstraps::clear, () -> clientBootstrap = null); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java index 34ea7fb8d438f..271afbed5d768 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4BadRequestTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.junit.After; import org.junit.Before; @@ -87,7 +88,8 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, }; try (HttpServerTransport httpServerTransport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { + xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(Settings.EMPTY))) { httpServerTransport.start(); final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index 017ce9798b7ef..a81415d7165ad 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.junit.After; import org.junit.Before; @@ -119,7 +120,8 @@ class CustomNettyHttpServerTransport extends Netty4HttpServerTransport { Netty4HttpServerPipeliningTests.this.networkService, Netty4HttpServerPipeliningTests.this.bigArrays, Netty4HttpServerPipeliningTests.this.threadPool, - xContentRegistry(), new NullDispatcher(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + xContentRegistry(), new NullDispatcher(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings)); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index c5ff905b14465..71ed191322b72 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -64,6 +64,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.NettyAllocator; import org.junit.After; import org.junit.Before; @@ -164,7 +165,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, } }; try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), dispatcher, clusterSettings)) { + xContentRegistry(), dispatcher, clusterSettings, new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); try (Netty4HttpClient client = new Netty4HttpClient()) { @@ -197,7 +198,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, public void testBindUnavailableAddress() { try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher(), clusterSettings)) { + xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(Settings.EMPTY))) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder() @@ -205,7 +206,7 @@ public void testBindUnavailableAddress() { .put("network.host", remoteAddress.getAddress()) .build(); try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher(), clusterSettings)) { + xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(settings))) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start); assertEquals( "Failed to bind to " + NetworkAddress.format(remoteAddress.address()), @@ -249,7 +250,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport( - settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings)) { + settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings, + new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -295,7 +297,8 @@ public void dispatchBadRequest(final RestChannel channel, .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "elastic.co").build(); try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { + xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -354,7 +357,8 @@ public void dispatchBadRequest(final RestChannel channel, NioEventLoopGroup group = new NioEventLoopGroup(); try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { + xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java new file mode 100644 index 0000000000000..2728f5db27cff --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/SharedGroupFactoryTests.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.http.netty4.Netty4HttpServerTransport; +import org.elasticsearch.test.ESTestCase; + +public final class SharedGroupFactoryTests extends ESTestCase { + + public void testSharedEventLoops() throws Exception { + SharedGroupFactory sharedGroupFactory = new SharedGroupFactory(Settings.EMPTY); + + SharedGroupFactory.SharedGroup httpGroup = sharedGroupFactory.getHttpGroup(); + SharedGroupFactory.SharedGroup transportGroup = sharedGroupFactory.getTransportGroup(); + + try { + assertSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup()); + } finally { + httpGroup.shutdown(); + assertFalse(httpGroup.getLowLevelGroup().isShuttingDown()); + assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); + assertFalse(transportGroup.getLowLevelGroup().isTerminated()); + assertFalse(transportGroup.getLowLevelGroup().terminationFuture().isDone()); + transportGroup.shutdown(); + assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(transportGroup.getLowLevelGroup().isTerminated()); + assertTrue(transportGroup.getLowLevelGroup().terminationFuture().isDone()); + } + } + + public void testNonSharedEventLoops() throws Exception { + Settings settings = Settings.builder() + .put(Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.getKey(), randomIntBetween(1, 10)) + .build(); + SharedGroupFactory sharedGroupFactory = new SharedGroupFactory(settings); + SharedGroupFactory.SharedGroup httpGroup = sharedGroupFactory.getHttpGroup(); + SharedGroupFactory.SharedGroup transportGroup = sharedGroupFactory.getTransportGroup(); + + try { + assertNotSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup()); + } finally { + httpGroup.shutdown(); + assertTrue(httpGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(httpGroup.getLowLevelGroup().isTerminated()); + assertTrue(httpGroup.getLowLevelGroup().terminationFuture().isDone()); + assertFalse(transportGroup.getLowLevelGroup().isShuttingDown()); + assertFalse(transportGroup.getLowLevelGroup().isTerminated()); + assertFalse(transportGroup.getLowLevelGroup().terminationFuture().isDone()); + transportGroup.shutdown(); + assertTrue(transportGroup.getLowLevelGroup().isShuttingDown()); + assertTrue(transportGroup.getLowLevelGroup().isTerminated()); + assertTrue(transportGroup.getLowLevelGroup().terminationFuture().isDone()); + } + } +} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 0e90559bd51b0..d30f383c467a2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.mocksocket.MockSocket; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TransportSettings; import org.junit.After; import org.junit.Before; @@ -66,7 +67,7 @@ public void startThreadPool() { NetworkService networkService = new NetworkService(Collections.emptyList()); PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler, - new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), new SharedGroupFactory(settings)); nettyTransport.start(); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index 5d3e897202cbc..c12efe7a4fefe 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; import org.junit.Before; @@ -120,7 +121,8 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()), - recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), + new SharedGroupFactory(settings)); transport.start(); assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 068018a456e8f..7fa237737d1d1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.Transport; @@ -49,7 +50,8 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase protected Transport build(Settings settings, final Version version, ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); return new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()), - PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { + PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(), + new SharedGroupFactory(settings)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 89879d746384c..c15e19e52001f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; @@ -35,6 +36,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackUsageAction; @@ -282,6 +284,7 @@ protected Optional getFeature() { } private final Settings settings; + private final SetOnce sharedGroupFactory = new SetOnce<>(); public XPackClientPlugin(final Settings settings) { this.settings = settings; @@ -692,7 +695,17 @@ public Map> getTransports( throw new RuntimeException(e); } return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4Transport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService)); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService, + getNettySharedGroupFactory(settings))); } + private synchronized SharedGroupFactory getNettySharedGroupFactory(Settings settings) { + if (sharedGroupFactory.get() != null) { + assert sharedGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; + return sharedGroupFactory.get(); + } else { + sharedGroupFactory.set(new SharedGroupFactory(settings)); + return sharedGroupFactory.get(); + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index 624b90125b0db..0c47a79a5b253 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -22,6 +22,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.netty4.Netty4Transport; import org.elasticsearch.xpack.core.XPackSettings; @@ -61,8 +62,10 @@ public SecurityNetty4Transport( final PageCacheRecycler pageCacheRecycler, final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, - final SSLService sslService) { - super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + final SSLService sslService, + final SharedGroupFactory sharedGroupFactory) { + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, + sharedGroupFactory); this.exceptionHandler = new SecurityTransportExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 47b852022c379..2d207630f2871 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -67,6 +67,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; @@ -300,7 +301,8 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin, private final SetOnce tokenService = new SetOnce<>(); private final SetOnce securityActionFilter = new SetOnce<>(); private final SetOnce securityIndex = new SetOnce<>(); - private final SetOnce groupFactory = new SetOnce<>(); + private final SetOnce sharedGroupFactory = new SetOnce<>(); + private final SetOnce nioGroupFactory = new SetOnce<>(); private final SetOnce dlsBitsetCache = new SetOnce<>(); private final SetOnce> bootstrapChecks = new SetOnce<>(); private final List securityExtensions = new ArrayList<>(); @@ -966,7 +968,8 @@ public Map> getTransports(Settings settings, ThreadP IPFilter ipFilter = this.ipFilter.get(); Map> transports = new HashMap<>(); transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService())); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(), + getNettySharedGroupFactory(settings))); transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(), getNioGroupFactory(settings))); @@ -987,7 +990,8 @@ public Map> getHttpTransports(Settings set Map> httpTransports = new HashMap<>(); httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays, - ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher, clusterSettings)); + ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher, clusterSettings, + getNettySharedGroupFactory(settings))); httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService(), getNioGroupFactory(settings), clusterSettings)); @@ -1113,14 +1117,23 @@ public void reloadSPI(ClassLoader loader) { } private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { - if (groupFactory.get() != null) { - assert groupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; - return groupFactory.get(); + if (nioGroupFactory.get() != null) { + assert nioGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; + return nioGroupFactory.get(); } else { - groupFactory.set(new NioGroupFactory(settings, logger)); - return groupFactory.get(); + nioGroupFactory.set(new NioGroupFactory(settings, logger)); + return nioGroupFactory.get(); } } + private synchronized SharedGroupFactory getNettySharedGroupFactory(Settings settings) { + if (sharedGroupFactory.get() != null) { + assert sharedGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; + return sharedGroupFactory.get(); + } else { + sharedGroupFactory.set(new SharedGroupFactory(settings)); + return sharedGroupFactory.get(); + } + } @Override public Collection getSystemIndexDescriptors(Settings settings) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java index 9dc7316aeeea2..e3207605658f0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java @@ -18,6 +18,7 @@ import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.netty4.Netty4HttpServerTransport; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.security.transport.SecurityHttpExceptionHandler; @@ -37,8 +38,9 @@ public class SecurityNetty4HttpServerTransport extends Netty4HttpServerTransport public SecurityNetty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, IPFilter ipFilter, SSLService sslService, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, - Dispatcher dispatcher, ClusterSettings clusterSettings) { - super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings); + Dispatcher dispatcher, ClusterSettings clusterSettings, + SharedGroupFactory sharedGroupFactory) { + super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, sharedGroupFactory); this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.ipFilter = ipFilter; final boolean ssl = HTTP_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java index 8cb1085d3aace..6088a03ef6db0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4ServerTransport.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -33,8 +34,10 @@ public SecurityNetty4ServerTransport( final NamedWriteableRegistry namedWriteableRegistry, final CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, - final SSLService sslService) { - super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService); + final SSLService sslService, + final SharedGroupFactory sharedGroupFactory) { + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService, + sharedGroupFactory); this.authenticator = authenticator; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java index ccbb2537f7d74..e772d56c7811f 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ssl.SSLClientAuth; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -71,7 +72,7 @@ public void testDefaultClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -88,7 +89,7 @@ public void testOptionalClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -105,7 +106,7 @@ public void testRequiredClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(true)); @@ -122,7 +123,7 @@ public void testNoClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -137,7 +138,7 @@ public void testCustomSSLConfiguration() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings)); ChannelHandler handler = transport.configureServerChannelHandler(); EmbeddedChannel ch = new EmbeddedChannel(handler); SSLEngine defaultEngine = ch.pipeline().get(SslHandler.class).engine(); @@ -150,7 +151,7 @@ public void testCustomSSLConfiguration() throws Exception { sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings)); handler = transport.configureServerChannelHandler(); ch = new EmbeddedChannel(handler); SSLEngine customEngine = ch.pipeline().get(SslHandler.class).engine(); @@ -177,7 +178,7 @@ public void testNoExceptionWhenConfiguredWithoutSslKeySSLDisabled() throws Excep SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings)); assertNotNull(transport.configureServerChannelHandler()); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index f2fea6f5c7eaf..4df0d39cb509d 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase; @@ -32,7 +33,7 @@ protected Transport build(Settings settings, final Version version, ClusterSetti .put("xpack.security.transport.ssl.enabled", true).build(); return new SecurityNetty4ServerTransport(settings1, version, threadPool, networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, - new NoneCircuitBreakerService(), null, createSSLService(settings1)) { + new NoneCircuitBreakerService(), null, createSSLService(settings1), new SharedGroupFactory(settings1)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,