DelayedClientTransport can trigger RejectedExecutionException #6283

Closed
ejona86 opened this issue Oct 16, 2019 · 2 comments · Fixed by #7720 or #7750

Comments

@ejona86
Member

ejona86 commented Oct 16, 2019

We've seen the following exception (b/142475326):

java.util.concurrent.RejectedExecutionException
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2086)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:848)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1394)
        at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
        at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:13)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:696)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:646)
        at io.grpc.internal.DelayedStream$DelayedStreamListener$5.run(DelayedStream.java:475)
        at io.grpc.internal.DelayedStream$DelayedStreamListener.drainPendingCallbacks(DelayedStream.java:500)
        at io.grpc.internal.DelayedStream.drainPendingCalls(DelayedStream.java:168)
        at io.grpc.internal.DelayedStream.setStream(DelayedStream.java:132)
        at io.grpc.internal.DelayedClientTransport$PendingStream.createRealStream(DelayedClientTransport.java:358)
        at io.grpc.internal.DelayedClientTransport$5.run(DelayedClientTransport.java:4)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:919)

The application was using our default executor, which means either we scheduled work after the channel terminated or the reference counting for that shared executor is broken.

When the channel terminates, it guarantees that it is done scheduling work on the executor, but not that all of the scheduled work has completed. The problem here appears to be that some of that work schedules more work on the same executor.
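A minimal, self-contained sketch of that failure mode, assuming a plain `ThreadPoolExecutor` (from `Executors.newFixedThreadPool`) as a stand-in for gRPC's shared default executor. The owner calls `shutdown()` once it is done scheduling work, but a task that was already running then schedules a follow-up task on the same executor, and the default `AbortPolicy` rejects it. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class RescheduleAfterShutdown {
  public static void main(String[] args) throws Exception {
    System.out.println("follow-up task rejected: " + run());
  }

  /** Returns true if the follow-up task was rejected by the shut-down executor. */
  static boolean run() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch ownerDone = new CountDownLatch(1);
    AtomicBoolean rejected = new AtomicBoolean(false);

    // Work scheduled while the owner (think: the channel) was still alive.
    executor.execute(() -> {
      started.countDown();
      try {
        ownerDone.await(); // wait until the owner has called shutdown()
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      try {
        // The still-running task schedules more work on the same executor.
        executor.execute(() -> { });
      } catch (RejectedExecutionException e) {
        rejected.set(true); // AbortPolicy: executor is already shut down
      }
    });

    started.await();
    executor.shutdown();   // owner is done *scheduling* work...
    ownerDone.countDown(); // ...but the scheduled work is not done yet
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return rejected.get();
  }
}
```

`shutdown()` does not interrupt the running task, so the task finishes normally; only its attempt to enqueue new work fails.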

Specifically, this runnable can produce more runnables:

executor.execute(new Runnable() {
  @Override
  public void run() {
    stream.createRealStream(transport);
  }
});

But while that runnable is running, 'stream' has likely already been removed from 'pendingStreams'. If the channel is shut down at that point, the transport can terminate right away because it no longer tracks that stream, and no other transport "owns" the call yet, so the entire channel can terminate before the runnable completes.

We could either split the start() draining into a separate method in DelayedStream, so that newStream()+start() can be called directly within reprocess(), or keep a counter of the streams still draining and delay transport termination until it reaches zero.
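The second option (a counter of draining streams) might look roughly like the following. This is a simplified, hypothetical model, not grpc-java's DelayedClientTransport: `DrainCountingTransport`, `addPending`, `reprocess`, `finishDrain` and `maybeTerminate` are illustrative names, streams are plain `Object`s, and `createRealStream` is just a `Runnable`. The point is that a stream removed from `pendingStreams` still blocks termination until its drain finishes.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;

/** Hypothetical model of "delay termination until no stream is still draining". */
final class DrainCountingTransport {
  private final Object lock = new Object();
  private final Set<Object> pendingStreams = new LinkedHashSet<>(); // guarded by lock
  private int drainingStreams; // guarded by lock
  private boolean shutdown;    // guarded by lock
  private boolean terminated;  // guarded by lock
  private final Runnable onTerminated;

  DrainCountingTransport(Runnable onTerminated) {
    this.onTerminated = onTerminated;
  }

  void addPending(Object stream) {
    synchronized (lock) {
      pendingStreams.add(stream);
    }
  }

  /** Moves a stream from "pending" to "draining" and drains it on the executor. */
  void reprocess(Object stream, Executor executor, Runnable createRealStream) {
    synchronized (lock) {
      if (!pendingStreams.remove(stream)) {
        return; // already handled or cancelled
      }
      drainingStreams++; // no longer pending, but still owned by this transport
    }
    Runnable drain = () -> {
      try {
        createRealStream.run();
      } finally {
        finishDrain();
      }
    };
    try {
      executor.execute(drain);
    } catch (RuntimeException e) {
      finishDrain(); // the drain never ran; release its count
      throw e;
    }
  }

  void shutdown() {
    synchronized (lock) {
      shutdown = true;
    }
    maybeTerminate();
  }

  boolean isTerminated() {
    synchronized (lock) {
      return terminated;
    }
  }

  private void finishDrain() {
    synchronized (lock) {
      drainingStreams--;
    }
    maybeTerminate();
  }

  private void maybeTerminate() {
    synchronized (lock) {
      if (terminated || !shutdown || !pendingStreams.isEmpty() || drainingStreams > 0) {
        return;
      }
      terminated = true;
    }
    onTerminated.run(); // runs at most once, outside the lock
  }
}
```

Termination is checked both in shutdown() and when each drain finishes, so whichever happens last triggers it. The cost of this design is that every drain path, including failures, must decrement the counter.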

@YifeiZhuang
Contributor

YifeiZhuang commented Dec 17, 2020

RejectedExecutionException can also be triggered via MetadataApplier, e.g.:

Exception in thread "grpc-default-executor-1" java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)
	at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)
	at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818)
	at io.grpc.netty.WriteQueue.scheduleFlush(WriteQueue.java:67)
	at io.grpc.netty.WriteQueue.enqueue(WriteQueue.java:87)
	at io.grpc.netty.NettyClientStream$Sink.writeHeadersInternal(NettyClientStream.java:165)
	at io.grpc.netty.NettyClientStream$Sink.writeHeaders(NettyClientStream.java:121)
	at io.grpc.internal.AbstractClientStream.start(AbstractClientStream.java:159)
	at io.grpc.internal.DelayedStream$5.run(DelayedStream.java:245)
	at io.grpc.internal.DelayedStream.drainPendingCalls(DelayedStream.java:174)
	at io.grpc.internal.DelayedStream.setStream(DelayedStream.java:137)
	at io.grpc.internal.MetadataApplierImpl.finalizeWith(MetadataApplierImpl.java:105)
	at io.grpc.internal.MetadataApplierImpl.apply(MetadataApplierImpl.java:79)
	at io.grpc.CallCredentials2$1.apply(CallCredentials2.java:61)
	at io.grpc.auth.GoogleAuthLibraryCallCredentials$1.onSuccess(GoogleAuthLibraryCallCredentials.java:134)
	at com.google.auth.Credentials.blockingGetToCallback(Credentials.java:127)
	at com.google.auth.Credentials$1.run(Credentials.java:105)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

ejona86 added a commit to ejona86/grpc-java that referenced this issue Dec 17, 2020
ejona86 added a commit to ejona86/grpc-java that referenced this issue Dec 21, 2020
ejona86 reopened this Dec 21, 2020
@ejona86
Member Author

ejona86 commented Dec 21, 2020

#7720 is being reverted in #7744 in favor of #7743

ejona86 added a commit to ejona86/grpc-java that referenced this issue Dec 22, 2020
DelayedClientTransport needs to avoid becoming terminated while it owns
RPCs. Previously DelayedClientTransport could terminate when some of its
RPCs had their realStream but realStream.start() hadn't yet been called.

To avoid that, we now make sure to call realStream.start()
synchronously with setting realStream. Since start() and the method
calls before start execute quickly, we can run it in-line. But it does
mean we now need to split the Stream methods into "before start" and
"after start" categories for queuing.

Fixes grpc#6283
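A rough, single-threaded sketch of the approach in this commit message, under stated assumptions: `RealStream` and `SimpleDelayedStream` are hypothetical stand-ins (not grpc-java's ClientStream/DelayedStream), `setAuthority` represents the "before start" methods and `writeMessage` the "after start" ones. `setStream` replays the before-start calls and `start()` in-line, so the real stream is started before the caller (the transport) lets go of it; only the after-start calls are handed back as a `Runnable` for the caller to schedule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the real, transport-owned stream. */
interface RealStream {
  void setAuthority(String authority); // a "before start" method
  void start();
  void writeMessage(String message);   // an "after start" method
}

/** Hypothetical, single-threaded model; not grpc-java's DelayedStream. */
final class SimpleDelayedStream {
  private final List<Consumer<RealStream>> beforeStart = new ArrayList<>();
  private final List<Consumer<RealStream>> afterStart = new ArrayList<>();
  private boolean startCalled;
  private RealStream realStream;

  void setAuthority(String authority) {
    if (startCalled) {
      throw new IllegalStateException("setAuthority() must be called before start()");
    }
    if (realStream != null) {
      realStream.setAuthority(authority);
    } else {
      beforeStart.add(s -> s.setAuthority(authority));
    }
  }

  void start() {
    startCalled = true;
    if (realStream != null) {
      realStream.start();
    }
  }

  void writeMessage(String message) {
    if (!startCalled) {
      throw new IllegalStateException("writeMessage() must be called after start()");
    }
    if (realStream != null && afterStart.isEmpty()) {
      realStream.writeMessage(message); // nothing queued, so ordering is preserved
    } else {
      afterStart.add(s -> s.writeMessage(message));
    }
  }

  /** Runs "before start" calls and start() in-line; returns the "after start" drain. */
  Runnable setStream(RealStream stream) {
    realStream = stream;
    for (Consumer<RealStream> call : beforeStart) {
      call.accept(stream);
    }
    beforeStart.clear();
    if (startCalled) {
      stream.start(); // the real transport owns the RPC from here on
    }
    return () -> {
      for (Consumer<RealStream> call : afterStart) {
        call.accept(stream);
      }
      afterStart.clear();
    };
  }
}
```

Because start() has already run on the real stream by the time setStream returns, the returned Runnable can be scheduled later without leaving a window where no transport owns the call. A real implementation would also need locking, which this sketch omits.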
ejona86 added a commit that referenced this issue Jan 20, 2021
ejona86 modified the milestones: Next, 1.36 Feb 11, 2021
github-actions bot locked as resolved and limited conversation to collaborators Jun 3, 2021