Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Rework retry memory leak fix in https://github.com/grpc/grpc-ja… #10209

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
109 changes: 75 additions & 34 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Compressor;
Expand All @@ -34,11 +35,7 @@
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -191,6 +188,7 @@ public void run() {
// For hedging only, not needed for normal retry
for (Substream substream : savedDrainedSubstreams) {
if (substream != winningSubstream) {
substream.stream.flush();
substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
}
}
Expand Down Expand Up @@ -275,6 +273,7 @@ private void drain(Substream substream) {
int chunk = 0x80;
List<BufferEntry> list = null;
boolean streamStarted = false;
boolean needsFlush = false;
Runnable onReadyRunnable = null;

while (true) {
Expand All @@ -291,6 +290,7 @@ private void drain(Substream substream) {
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
substream.stream.flush();
if (!isReady()) {
return;
}
Expand All @@ -306,6 +306,10 @@ public void run() {
}

if (substream.closed) {
substream.stream.flush();
// if (needsFlush) {
// substream.stream.flush();
// }
return;
}

Expand All @@ -324,6 +328,14 @@ public void run() {
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}

if (bufferEntry instanceof RetriableStream.SendMessageEntry) {
needsFlush = true;
}
if (bufferEntry instanceof RetriableStream.FlushEntry) {
needsFlush = false;
}

savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
Expand All @@ -335,6 +347,11 @@ public void run() {
}
}

substream.stream.flush();
// if (needsFlush) {
// substream.stream.flush();
// }

if (onReadyRunnable != null) {
listenerSerializeExecutor.execute(onReadyRunnable);
return;
Expand All @@ -344,6 +361,7 @@ public void run() {
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
substream.stream.start(new Sublistener(substream));
}

substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
Expand Down Expand Up @@ -504,17 +522,32 @@ public void run() {

@Override
public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
// Handle everything the same way, e.g. isHedging vs not.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This strategy plus attempting to brute-force flush in drain() (even non-conditionally as the code is now) is still apparently not enough to fix the leak and get rid of the explicit flush after writes. I think this is a reasonable attempt at what we discussed offline, am I missing anything obvious?

// If no stream or in flight not-drained stream, commit noop, safeCloseMasterListener, and return.
// If drained stream, commit it and fall through.
Substream drainedSubstreamToCommit;
synchronized (lock) {
drainedSubstreamToCommit = Iterables.getFirst(state.drainedSubstreams, null);
}
if (drainedSubstreamToCommit != null) {
Runnable runnable = commit(drainedSubstreamToCommit);
if (runnable != null) {
runnable.run();
} else {
}
} else { // No substream exists or no drained substreams exist; treat this case the same way.
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}

Substream winningSubstreamToCancel = null;
Expand All @@ -527,6 +560,7 @@ public final void cancel(final Status reason) {
state = state.cancelled();
}
if (winningSubstreamToCancel != null) {
winningSubstreamToCancel.stream.flush();
winningSubstreamToCancel.stream.cancel(reason);
}
}
Expand Down Expand Up @@ -554,25 +588,33 @@ public final void writeMessage(InputStream message) {
throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
}

final void sendMessage(final ReqT message) {
State savedState = state;
if (savedState.passThrough) {
savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
return;
class SendMessageEntry implements BufferEntry {
private final ReqT message;

SendMessageEntry(ReqT message) {
this.message = message;
}

class SendMessageEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.writeMessage(method.streamRequest(message));
@Override
public void runWith(Substream substream) {
substream.stream.writeMessage(method.streamRequest(message));
if (isHedging) {
// TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
// flushes (or half close), but retry appears to have a code path that the flushes may
// not happen. The code needs to be fixed and this removed. See #9340.
substream.stream.flush();
}
}
}

final void sendMessage(final ReqT message) {
State savedState = state;
if (savedState.passThrough) {
savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
return;
}

delayOrExecute(new SendMessageEntry());
delayOrExecute(new SendMessageEntry(message));
}

@Override
Expand All @@ -593,6 +635,13 @@ public void runWith(Substream substream) {
delayOrExecute(new RequestEntry());
}

static class FlushEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.flush();
}
}

@Override
public final void flush() {
State savedState = state;
Expand All @@ -601,13 +650,6 @@ public final void flush() {
return;
}

class FlushEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.flush();
}
}

delayOrExecute(new FlushEntry());
}

Expand Down Expand Up @@ -1129,7 +1171,7 @@ private static final class State {

/**
* Unmodifiable collection of all the open substreams that are drained. Singleton once
* passThrough; Empty if committed but not passTrough.
* passThrough; Empty if committed but not passThrough.
*/
final Collection<Substream> drainedSubstreams;

Expand Down Expand Up @@ -1180,7 +1222,6 @@ private static final class State {
|| (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
|| (drainedSubstreams.size() == 0 && winningSubstream.closed),
"passThrough should imply winningSubstream is drained");
checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
}

@CheckReturnValue
Expand Down
47 changes: 47 additions & 0 deletions core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -158,6 +160,18 @@ public void cancel(Status errorStatus) {
verify(mockListener).closed(any(Status.class), same(PROCESSED), any(Metadata.class));
}

@Test
public void cancel_closesFramerAndReleasesBuffers() {
TrackingWritableBufferAllocator trackingAllocator = new TrackingWritableBufferAllocator();
AbstractClientStream stream =
new BaseAbstractClientStream(trackingAllocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
stream.writeMessage(new ByteArrayInputStream(new byte[1]));
stream.cancel(Status.DEADLINE_EXCEEDED);
assertTrue(trackingAllocator.allocatedBuffersReleased());
assertTrue(stream.framer().isClosed());
}

@Test
public void startFailsOnNullListener() {
AbstractClientStream stream =
Expand Down Expand Up @@ -584,4 +598,37 @@ public void runOnTransportThread(Runnable r) {
r.run();
}
}

private static class TrackingWritableBufferAllocator implements WritableBufferAllocator {
List<ReleaseVerifyingBuffer> allocatedBuffers = new ArrayList<>();

@Override
public WritableBuffer allocate(int capacityHint) {
ReleaseVerifyingBuffer buf = new ReleaseVerifyingBuffer(capacityHint);
allocatedBuffers.add(buf);
return buf;
}

boolean allocatedBuffersReleased() {
return allocatedBuffers.stream().allMatch(ReleaseVerifyingBuffer::isReleased);
}
}

private static class ReleaseVerifyingBuffer extends ByteWritableBuffer {
boolean isReleased;

ReleaseVerifyingBuffer(int maxFrameSize) {
super(maxFrameSize);
}

@Override
public void release() {
super.release();
isReleased = true;
}

boolean isReleased() {
return isReleased;
}
}
}