Skip to content

Commit

Permalink
core: Attempt to rework retry memory leak fix in #9360 to send fewer …
Browse files Browse the repository at this point in the history
…FIN packets.
  • Loading branch information
jtk54 committed Jun 8, 2023
1 parent 478f30e commit 84cbeec
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 99 deletions.
109 changes: 75 additions & 34 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Compressor;
Expand All @@ -34,11 +35,7 @@
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -191,6 +188,7 @@ public void run() {
// For hedging only, not needed for normal retry
for (Substream substream : savedDrainedSubstreams) {
if (substream != winningSubstream) {
substream.stream.flush();
substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
}
}
Expand Down Expand Up @@ -275,6 +273,7 @@ private void drain(Substream substream) {
int chunk = 0x80;
List<BufferEntry> list = null;
boolean streamStarted = false;
boolean needsFlush = false;
Runnable onReadyRunnable = null;

while (true) {
Expand All @@ -291,6 +290,7 @@ private void drain(Substream substream) {
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
substream.stream.flush();
if (!isReady()) {
return;
}
Expand All @@ -306,6 +306,10 @@ public void run() {
}

if (substream.closed) {
substream.stream.flush();
// if (needsFlush) {
// substream.stream.flush();
// }
return;
}

Expand All @@ -324,6 +328,14 @@ public void run() {
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}

if (bufferEntry instanceof RetriableStream.SendMessageEntry) {
needsFlush = true;
}
if (bufferEntry instanceof RetriableStream.FlushEntry) {
needsFlush = false;
}

savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
Expand All @@ -335,6 +347,11 @@ public void run() {
}
}

substream.stream.flush();
// if (needsFlush) {
// substream.stream.flush();
// }

if (onReadyRunnable != null) {
listenerSerializeExecutor.execute(onReadyRunnable);
return;
Expand All @@ -344,6 +361,7 @@ public void run() {
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
substream.stream.start(new Sublistener(substream));
}

substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
Expand Down Expand Up @@ -504,17 +522,32 @@ public void run() {

@Override
public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
// Handle everything the same way, e.g. isHedging vs not.
// If no stream or in flight not-drained stream, commit noop, safeCloseMasterListener, and return.
// If drained stream, commit it and fall through.
Substream drainedSubstreamToCommit;
synchronized (lock) {
drainedSubstreamToCommit = Iterables.getFirst(state.drainedSubstreams, null);
}
if (drainedSubstreamToCommit != null) {
Runnable runnable = commit(drainedSubstreamToCommit);
if (runnable != null) {
runnable.run();
} else {
}
} else { // No substream exists or no drained substreams exist; treat this case the same way.
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}

Substream winningSubstreamToCancel = null;
Expand All @@ -527,6 +560,7 @@ public final void cancel(final Status reason) {
state = state.cancelled();
}
if (winningSubstreamToCancel != null) {
winningSubstreamToCancel.stream.flush();
winningSubstreamToCancel.stream.cancel(reason);
}
}
Expand Down Expand Up @@ -554,25 +588,33 @@ public final void writeMessage(InputStream message) {
throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
}

final void sendMessage(final ReqT message) {
State savedState = state;
if (savedState.passThrough) {
savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
return;
class SendMessageEntry implements BufferEntry {
private final ReqT message;

SendMessageEntry(ReqT message) {
this.message = message;
}

class SendMessageEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.writeMessage(method.streamRequest(message));
@Override
public void runWith(Substream substream) {
substream.stream.writeMessage(method.streamRequest(message));
if (isHedging) {
// TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
// flushes (or half close), but retry appears to have a code path that the flushes may
// not happen. The code needs to be fixed and this removed. See #9340.
substream.stream.flush();
}
}
}

final void sendMessage(final ReqT message) {
State savedState = state;
if (savedState.passThrough) {
savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
return;
}

delayOrExecute(new SendMessageEntry());
delayOrExecute(new SendMessageEntry(message));
}

@Override
Expand All @@ -593,6 +635,13 @@ public void runWith(Substream substream) {
delayOrExecute(new RequestEntry());
}

static class FlushEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.flush();
}
}

@Override
public final void flush() {
State savedState = state;
Expand All @@ -601,13 +650,6 @@ public final void flush() {
return;
}

class FlushEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.flush();
}
}

delayOrExecute(new FlushEntry());
}

Expand Down Expand Up @@ -1129,7 +1171,7 @@ private static final class State {

/**
* Unmodifiable collection of all the open substreams that are drained. Singleton once
* passThrough; Empty if committed but not passTrough.
* passThrough; Empty if committed but not passThrough.
*/
final Collection<Substream> drainedSubstreams;

Expand Down Expand Up @@ -1180,7 +1222,6 @@ private static final class State {
|| (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
|| (drainedSubstreams.size() == 0 && winningSubstream.closed),
"passThrough should imply winningSubstream is drained");
checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
}

@CheckReturnValue
Expand Down
47 changes: 47 additions & 0 deletions core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -158,6 +160,18 @@ public void cancel(Status errorStatus) {
verify(mockListener).closed(any(Status.class), same(PROCESSED), any(Metadata.class));
}

@Test
public void cancel_closesFramerAndReleasesBuffers() {
TrackingWritableBufferAllocator trackingAllocator = new TrackingWritableBufferAllocator();
AbstractClientStream stream =
new BaseAbstractClientStream(trackingAllocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
stream.writeMessage(new ByteArrayInputStream(new byte[1]));
stream.cancel(Status.DEADLINE_EXCEEDED);
assertTrue(trackingAllocator.allocatedBuffersReleased());
assertTrue(stream.framer().isClosed());
}

@Test
public void startFailsOnNullListener() {
AbstractClientStream stream =
Expand Down Expand Up @@ -584,4 +598,37 @@ public void runOnTransportThread(Runnable r) {
r.run();
}
}

private static class TrackingWritableBufferAllocator implements WritableBufferAllocator {
List<ReleaseVerifyingBuffer> allocatedBuffers = new ArrayList<>();

@Override
public WritableBuffer allocate(int capacityHint) {
ReleaseVerifyingBuffer buf = new ReleaseVerifyingBuffer(capacityHint);
allocatedBuffers.add(buf);
return buf;
}

boolean allocatedBuffersReleased() {
return allocatedBuffers.stream().allMatch(ReleaseVerifyingBuffer::isReleased);
}
}

private static class ReleaseVerifyingBuffer extends ByteWritableBuffer {
boolean isReleased;

ReleaseVerifyingBuffer(int maxFrameSize) {
super(maxFrameSize);
}

@Override
public void release() {
super.release();
isReleased = true;
}

boolean isReleased() {
return isReleased;
}
}
}

0 comments on commit 84cbeec

Please sign in to comment.