
[Optimization]: During reroute async fetch data in GatewayAllocator, send request in generic threadpool instead of masterService#updateTask #57498

Closed
shwetathareja opened this issue Jun 2, 2020 · 6 comments
Labels
:Distributed/Network Http and internode communication implementations >enhancement Team:Distributed Meta label for distributed team

@shwetathareja

Elasticsearch version (bin/elasticsearch --version): 7.1

Description of the problem including expected versus actual behavior:

This cluster had roughly ~50K shards, and multiple nodes were disconnected from the master. A hot_threads dump showed the masterService#updateTask thread spending a lot of time sending these TransportNodesListGatewayStartedShards requests. These are async calls and the response handling is done on a separate threadpool, but the transport sendRequest still seems to execute on masterService#updateTask. The sending logic could also be moved to a separate threadpool.

Though with #42855 this won't happen in the context of the JoinTaskExecutor, it will still happen when a reroute task is scheduled.
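For illustration only, a minimal sketch of the offloading idea the title proposes, using plain java.util.concurrent executors as stand-ins for masterService#updateTask and the generic threadpool. The class and the sendListShardsRequest helper below are hypothetical, not the actual Elasticsearch code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadSendSketch {
    // Stand-in for the single cluster-state update thread (masterService#updateTask).
    private static final ExecutorService masterUpdateThread = Executors.newSingleThreadExecutor();
    // Stand-in for the generic threadpool that could absorb the request-sending work.
    private static final ExecutorService genericPool = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        masterUpdateThread.execute(() -> {
            for (int shard = 0; shard < 5; shard++) {
                final int shardId = shard;
                // Instead of issuing the (potentially syscall-heavy) send on the
                // master update thread, hand it off and continue with the reroute.
                genericPool.execute(() -> sendListShardsRequest(shardId));
            }
        });
        masterUpdateThread.shutdown();
        masterUpdateThread.awaitTermination(5, TimeUnit.SECONDS);
        genericPool.shutdown();
    }

    // Hypothetical placeholder for the async TransportNodesListGatewayStartedShards fetch.
    private static void sendListShardsRequest(int shardId) {
        System.out.println("async fetch for shard " + shardId
            + " sent from " + Thread.currentThread().getName());
    }
}
```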

Provide logs (if relevant):

80.1% (8s out of 10s) cpu usage by thread 'elasticsearch[7b5db5][masterService#updateTask][T#1]'
     7/10 snapshots sharing following 54 elements
       sun.nio.ch.EPollArrayWrapper.interrupt(Native Method)
       sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:317)
       sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:207)
       io.netty.channel.nio.NioEventLoop.wakeup(NioEventLoop.java:719)
       io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:799)
       io.netty.channel.AbstractChannelHandlerContext.safeExecute(AbstractChannelHandlerContext.java:1013)
       io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:825)
       io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
       io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1066)
       io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:309)
       org.elasticsearch.transport.netty4.Netty4TcpChannel.sendMessage(Netty4TcpChannel.java:139)
       org.elasticsearch.transport.OutboundHandler.internalSendMessage(OutboundHandler.java:80)
       org.elasticsearch.transport.OutboundHandler.sendMessage(OutboundHandler.java:70)
       org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:680)
       org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:669)
       org.elasticsearch.transport.TcpTransport.access$300(TcpTransport.java:100)
       org.elasticsearch.transport.TcpTransport$NodeChannels.sendRequest(TcpTransport.java:272)
       org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:633)
       org.elasticsearch.transport.TransportService$$Lambda$1519/1694636980.sendRequest(Unknown Source)
       org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:543)
       org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:531)
       org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.start(TransportNodesAction.java:182)
       org.elasticsearch.action.support.nodes.TransportNodesAction.doExecute(TransportNodesAction.java:82)
       org.elasticsearch.action.support.nodes.TransportNodesAction.doExecute(TransportNodesAction.java:51)
       org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:145)
       com.amazon.opendistro.elasticsearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter.apply(PerformanceAnalyzerActionFilter.java:77)
       org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:143)
       org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:121)
       org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:64)
       org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.list(TransportNodesListGatewayStartedShards.java:91)
       org.elasticsearch.gateway.AsyncShardFetch.asyncFetch(AsyncShardFetch.java:283)
       org.elasticsearch.gateway.AsyncShardFetch.fetchData(AsyncShardFetch.java:126)
       org.elasticsearch.gateway.GatewayAllocator$InternalPrimaryShardAllocator.fetchData(GatewayAllocator.java:159)
       org.elasticsearch.gateway.PrimaryShardAllocator.makeAllocationDecision(PrimaryShardAllocator.java:86)
       org.elasticsearch.gateway.BaseGatewayShardAllocator.allocateUnassigned(BaseGatewayShardAllocator.java:59)
       org.elasticsearch.gateway.GatewayAllocator.innerAllocatedUnassigned(GatewayAllocator.java:114)
       org.elasticsearch.gateway.GatewayAllocator.allocateUnassigned(GatewayAllocator.java:104)
       org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:410)
       org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:378)
       org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:361)
       org.elasticsearch.cluster.coordination.JoinTaskExecutor.execute(JoinTaskExecutor.java:155)
       org.elasticsearch.cluster.coordination.JoinHelper$1.execute(JoinHelper.java:118)
       org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:687)
       org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:310)
       org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:210)
       org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142)
       org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150)
       org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188)
       org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:690)
       org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252)
       org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215)
       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       java.lang.Thread.run(Thread.java:748)
@shwetathareja shwetathareja added >bug needs:triage Requires assignment of a team area label labels Jun 2, 2020
@original-brownbear original-brownbear self-assigned this Jun 2, 2020
@original-brownbear original-brownbear added :Distributed/Network Http and internode communication implementations and removed needs:triage Requires assignment of a team area label labels Jun 2, 2020
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (:Distributed/Network)

@elasticmachine elasticmachine added the Team:Distributed Meta label for distributed team label Jun 2, 2020
@original-brownbear
Member

Talked about this with David, and I think we can improve the way we use the networking layer instead of working around this issue by forking off to a different thread pool that would then still hit the flush API (that's where the wake-ups are coming from) in a loop.

I'll look into creating a PR that optimises this and other spots where we might be sending a large number of requests to channels on the same selector in a loop.

@shwetathareja
Author

Thanks @original-brownbear for looking into it. Can you elaborate a bit on how you are planning to optimize the use of the networking layer?

Also, I was wondering about these async fetch calls being made per unassigned shard per node. Could they be batched per node as well, to reduce the number of calls over the transport layer from the master to the data nodes?
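A minimal sketch of the batching idea being suggested here, grouping pending shard fetches by target node so one request per node could carry all of them. The data and names are illustrative, not the actual Elasticsearch protocol:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchedFetchSketch {
    public static void main(String[] args) {
        // Hypothetical (shardId, nodeId) pairs that each need a started-shards fetch today.
        String[][] pendingFetches = {
            { "shard-0", "node-A" },
            { "shard-1", "node-A" },
            { "shard-2", "node-B" },
        };

        // Group the shards by target node so that a single transport request per node
        // could carry all of them, instead of one request per shard per node.
        Map<String, List<String>> shardsByNode = new HashMap<>();
        for (String[] fetch : pendingFetches) {
            shardsByNode.computeIfAbsent(fetch[1], node -> new ArrayList<>()).add(fetch[0]);
        }

        shardsByNode.forEach((node, shards) ->
            System.out.println("one batched request to " + node + " for shards " + shards));
    }
}
```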

@original-brownbear
Member

Can you elaborate a bit on how are you planning to optimize the use of networking layer?

Sure, it's fairly straightforward. Currently, whenever we write a message to the transport layer we use channel.writeAndFlush. If this is called from outside the Netty event loop it will often lead to a syscall to wake up the selector (that's what you observed as being so costly). If we were to just use the simple channel.write method and flush later (once we know we have written a batch of messages), we wouldn't be triggering these wake-ups.
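A minimal sketch of the two call patterns described above, using Netty's EmbeddedChannel purely to illustrate the API (EmbeddedChannel runs on the calling thread, so it doesn't actually demonstrate the selector wake-ups):

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;

public class BatchedWriteSketch {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();

        // Current pattern: every writeAndFlush issued from outside the event loop
        // may force a selector wake-up (a syscall) per message.
        for (int i = 0; i < 3; i++) {
            channel.writeAndFlush(Unpooled.copiedBuffer(new byte[] { (byte) i }));
        }

        // Proposed pattern: queue the writes, then flush once for the whole batch,
        // so at most one wake-up is needed.
        for (int i = 0; i < 3; i++) {
            channel.write(Unpooled.copiedBuffer(new byte[] { (byte) i }));
        }
        channel.flush();

        channel.finishAndReleaseAll();
    }
}
```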

These could be batched per node as well to reduce the no. of calls over the transport layer from master to data nodes?

True, I think we could make protocol improvements/changes there. But since we identified a number of spots where improving the behaviour on the network layer would be beneficial, and it's somewhat involved to introduce batching into these messages at the protocol level, I think the network layer improvements are what I would focus on for now.

@original-brownbear
Member

I looked into the details of what a fix for this would look like in ES and found that Netty recently made some improvements to the way it avoids unnecessary wake-ups in netty/netty#9799 (fixed in a Netty version that isn't used in 7.1).
Also, changes #39286 and #57084 should cause the event loop to do a little more work when sending messages and leave it stuck in .select less often.
Furthermore, #56488 and #46346 reduced the number of IO threads we use by a factor of 4, significantly reducing the likelihood of running into a thread that is stuck on .select and hence must be woken up.
(This may be very relevant in practice.)

=> It's hard to tell from theory alone how much the situation has already improved in practice, but I would expect to see a non-trivial improvement relative to 7.1 with high likelihood.

I still think this is worth improving on our end if we can, but there is a good chance you'll have a better experience with more recent (and upcoming) ES versions.

@original-brownbear
Member

I profiled this exact spot a lot lately when investigating slowness on the IO thread, and I don't think this is much of an issue anymore with the changes mentioned above. I don't think there's reasonable cause to improve batching of transport message sends in the near future => closing this; it should behave much better in newer ES versions.
