
RejectedExecutionException is thrown when there is a burst of traffic #444

Open
AleksanderBrzozowski opened this issue Oct 11, 2023 · 1 comment

@AleksanderBrzozowski

Hi, we have a gRPC server built with the grpc-kotlin library and Kotlin coroutines. The service exposes a single method that, under the hood, calls a Redis database.

We decided to override the default executor for the gRPC server (a cached thread pool of unbounded size) with a fixed thread pool (16 threads + a queue with max capacity = 1000).
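
For context, the executor is installed roughly like this (a simplified sketch rather than our exact code; `MyService` stands in for the generated grpc-kotlin service and the port is arbitrary):

```kotlin
import io.grpc.ServerBuilder
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

fun main() {
    // 16 threads and a bounded queue of 1000 tasks; the default AbortPolicy
    // throws RejectedExecutionException once the queue is full.
    val executor = ThreadPoolExecutor(
        16, 16,
        0L, TimeUnit.MILLISECONDS,
        LinkedBlockingQueue(1000)
    )

    val server = ServerBuilder.forPort(50051)
        .executor(executor)          // replaces the default unbounded cached thread pool
        .addService(MyService())     // placeholder for the actual coroutine service
        .build()
        .start()

    server.awaitTermination()
}
```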

During a burst of traffic, and when the JVM instance is cold (newly started), we observe errors like these:

Exception processing message
java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@3b22e3e0 rejected from java.util.concurrent.ThreadPoolExecutor@76d37f1e[Running, pool size = 16, active threads = 2, queued tasks = 1000, completed tasks = 126014]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
	at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:841)
	at io.grpc.internal.AbstractStream$TransportState.messagesAvailable(AbstractStream.java:183)
	at io.grpc.internal.MessageDeframer.processBody(MessageDeframer.java:413)
	at io.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:276)
	at io.grpc.internal.MessageDeframer.request(MessageDeframer.java:162)
	at io.grpc.internal.AbstractStream$TransportState$1RequestRunnable.run(AbstractStream.java:233)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerStream$TransportState$1.run(NettyServerStream.java:188)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Exception in onHeadersRead()

```txt
java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@62255b0f rejected from java.util.concurrent.ThreadPoolExecutor@76d37f1e[Running, pool size = 16, active threads = 1, queued tasks = 1000, completed tasks = 126145]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
	at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
	at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:641)
	at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:467)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:487)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$1000(NettyServerHandler.java:111)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:856)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:409)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:337)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:56)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:476)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:484)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
```

As far as I can tell, the issue is mostly that there is not enough capacity to schedule execution of the requests. I wonder if it is possible to somehow "throttle" request processing? What do you suggest?

@jamesward
Collaborator

I think @lowasser will have some thoughts on this. It seems at least theoretically possible to have the request queue fill up with idle connections, each just consuming a file handle, while the request handler is waiting to get the resources to process the request. I think other servers do something similar, usually on top of Netty in JVM land.

QQ: Are you using the Netty gRPC server?

The one thing I do wonder about is how to manage situations that aren't just a burst you can quickly recover from. Assuming you have a load balancer that handles failures with a retry / fallback, you'd probably want your server to throw an error and close the connection when it is out of capacity.
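
For example (an untested sketch, not something grpc-java provides out of the box; `LoadSheddingInterceptor` and `maxInFlight` are made-up names), a server interceptor could fail fast with `RESOURCE_EXHAUSTED` once some in-flight limit is reached, instead of letting the executor queue overflow:

```kotlin
import io.grpc.ForwardingServerCallListener
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import java.util.concurrent.atomic.AtomicInteger

// Rejects new calls with RESOURCE_EXHAUSTED once maxInFlight calls are active,
// so clients / load balancers can retry or fall back instead of queueing up.
class LoadSheddingInterceptor(private val maxInFlight: Int) : ServerInterceptor {
    private val inFlight = AtomicInteger(0)

    override fun <ReqT, RespT> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        if (inFlight.incrementAndGet() > maxInFlight) {
            inFlight.decrementAndGet()
            call.close(Status.RESOURCE_EXHAUSTED.withDescription("server over capacity"), Metadata())
            return object : ServerCall.Listener<ReqT>() {}
        }
        val delegate = next.startCall(call, headers)
        return object : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
            override fun onComplete() {
                inFlight.decrementAndGet()
                super.onComplete()
            }

            override fun onCancel() {
                inFlight.decrementAndGet()
                super.onCancel()
            }
        }
    }
}
```

It would be registered with `ServerBuilder.intercept(LoadSheddingInterceptor(1000))`, and the right limit would depend on how long your Redis calls actually take.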
