From ed105b4779f9d25067163e59297dd3707c18e78f Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 15 Jul 2022 15:25:27 -0700 Subject: [PATCH] core: Workaround retry causing memory leak Data is getting orphaned sitting in MessageFramer. This hack thus always flushes data out of the framer so no data can remain sitting there. See #9340 --- .../io/grpc/internal/RetriableStream.java | 4 + .../io/grpc/internal/RetriableStreamTest.java | 77 ++++++++++++++++--- 2 files changed, 69 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 55c17c50ba2..cb94195cce1 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -550,6 +550,10 @@ class SendMessageEntry implements BufferEntry { @Override public void runWith(Substream substream) { substream.stream.writeMessage(method.streamRequest(message)); + // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by + // flushes (or half close), but retry appears to have a code path that the flushes may + // not happen. The code needs to be fixed and this removed. See #9340. + substream.stream.flush(); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 3f8b25ce1c4..f20e772e92b 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -268,10 +268,14 @@ public Void answer(InvocationOnMock in) { retriableStream.sendMessage("msg3"); retriableStream.request(456); - inOrder.verify(mockStream1, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround + inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream1).request(345); inOrder.verify(mockStream1, times(2)).flush(); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream1).request(456); inOrder.verifyNoMoreInteractions(); @@ -304,12 +308,19 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); - inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream2).request(345); inOrder.verify(mockStream2, times(2)).flush(); inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream2).request(456); - inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); @@ -319,7 +330,10 @@ public Void answer(InvocationOnMock in) { // mockStream1 is closed so it is not in the drainedSubstreams verifyNoMoreInteractions(mockStream1); - inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround // retry2 doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); @@ -353,12 +367,19 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); - inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround + inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround inOrder.verify(mockStream3).request(345); inOrder.verify(mockStream3, times(2)).flush(); inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround inOrder.verify(mockStream3).request(456); - inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); + for (int i = 0; i < 7; i++) { + inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround + } inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); @@ -1958,10 +1979,14 @@ public Void answer(InvocationOnMock in) { hedgingStream.sendMessage("msg3"); hedgingStream.request(456); - inOrder.verify(mockStream1, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround + inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream1).request(345); inOrder.verify(mockStream1, times(2)).flush(); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream1).request(456); inOrder.verifyNoMoreInteractions(); @@ -1984,10 +2009,14 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); - inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround + inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream2).request(345); inOrder.verify(mockStream2, times(2)).flush(); inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream2).request(456); inOrder.verify(mockStream1).isReady(); inOrder.verify(mockStream2).isReady(); @@ -1998,9 +2027,13 @@ public Void answer(InvocationOnMock in) { hedgingStream.sendMessage("msg2 after hedge2 starts"); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verifyNoMoreInteractions(); @@ -2022,12 +2055,19 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); - inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround + inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround inOrder.verify(mockStream3).request(345); inOrder.verify(mockStream3, times(2)).flush(); inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround inOrder.verify(mockStream3).request(456); - inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround + inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround inOrder.verify(mockStream1).isReady(); inOrder.verify(mockStream2).isReady(); inOrder.verify(mockStream3).isReady(); @@ -2036,8 +2076,11 @@ public Void answer(InvocationOnMock in) { // send one more message hedgingStream.sendMessage("msg1 after hedge3 starts"); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround // hedge3 receives nonFatalStatus sublistenerCaptor3.getValue().closed( @@ -2047,7 +2090,9 @@ public Void answer(InvocationOnMock in) { // send one more message hedgingStream.sendMessage("msg1 after hedge3 fails"); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).flush(); // Memory leak workaround inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).flush(); // Memory leak workaround // the hedge mockStream4 starts fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2067,12 +2112,19 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); - inOrder.verify(mockStream4, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream4).flush(); // Memory leak workaround + inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream4).flush(); // Memory leak workaround inOrder.verify(mockStream4).request(345); inOrder.verify(mockStream4, times(2)).flush(); inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream4).flush(); // Memory leak workaround inOrder.verify(mockStream4).request(456); - inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class)); + for (int i = 0; i < 4; i++) { + inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream4).flush(); // Memory leak workaround + } inOrder.verify(mockStream1).isReady(); inOrder.verify(mockStream2).isReady(); inOrder.verify(mockStream4).isReady(); @@ -2190,6 +2242,7 @@ public void hedging_maxAttempts() { hedgingStream.sendMessage("msg1 after commit"); inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).flush(); // Memory leak workaround inOrder.verifyNoMoreInteractions(); Metadata heders = new Metadata();