From f26a501251906bec3ffe5100f40813a271f22f8e Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 17 Dec 2020 15:48:34 -0800 Subject: [PATCH] Authority isn't the only thing called before start --- .../java/io/grpc/internal/DelayedStream.java | 34 +++++++++++-------- .../io/grpc/internal/DelayedStreamTest.java | 2 +- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 0eb6b80c3666..e5926d0cb50b 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -60,12 +60,8 @@ class DelayedStream implements ClientStream { private long startTimeNanos; @GuardedBy("this") private long streamSetTimeNanos; - - // start()-related storage. All references should be cleared after start() is called on realStream - // to avoid retaining the objects for the life of the call - /** non-null when setAuthority needs to be called on realStream when it is set. */ // No need to synchronize; start() synchronization provides a happens-before - private String startAuthority; + private List preStartPendingCalls = new ArrayList<>(); @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -97,7 +93,8 @@ public void run() { @Override public void setDeadline(final Deadline deadline) { - delayOrExecute(new Runnable() { + checkState(listener == null, "May only be called before start"); + preStartPendingCalls.add(new Runnable() { @Override public void run() { realStream.setDeadline(deadline); @@ -211,7 +208,12 @@ private void delayOrExecute(Runnable runnable) { public void setAuthority(final String authority) { checkState(listener == null, "May only be called before start"); checkNotNull(authority, "authority"); - this.startAuthority = authority; + preStartPendingCalls.add(new Runnable() { + @Override + public void run() { + realStream.setAuthority(authority); + } + }); } @Override @@ -246,10 +248,10 @@ public void start(ClientStreamListener listener) { * #listener}. */ private void internalStart(ClientStreamListener listener) { - if (startAuthority != null) { - realStream.setAuthority(startAuthority); - startAuthority = null; + for (Runnable runnable : preStartPendingCalls) { + runnable.run(); } + preStartPendingCalls = null; realStream.start(listener); } @@ -360,7 +362,8 @@ public void run() { @Override public void optimizeForDirectExecutor() { - delayOrExecute(new Runnable() { + checkState(listener == null, "May only be called before start"); + preStartPendingCalls.add(new Runnable() { @Override public void run() { realStream.optimizeForDirectExecutor(); @@ -370,8 +373,9 @@ public void run() { @Override public void setCompressor(final Compressor compressor) { + checkState(listener == null, "May only be called before start"); checkNotNull(compressor, "compressor"); - delayOrExecute(new Runnable() { + preStartPendingCalls.add(new Runnable() { @Override public void run() { realStream.setCompressor(compressor); @@ -381,7 +385,8 @@ public void run() { @Override public void setFullStreamDecompression(final boolean fullStreamDecompression) { - delayOrExecute( + checkState(listener == null, "May only be called before start"); + preStartPendingCalls.add( new Runnable() { @Override public void run() { @@ -392,8 +397,9 @@ public void run() { @Override public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) { + checkState(listener == null, "May only be called before start"); checkNotNull(decompressorRegistry, "decompressorRegistry"); - delayOrExecute(new Runnable() { + preStartPendingCalls.add(new Runnable() { @Override public void run() { realStream.setDecompressorRegistry(decompressorRegistry); diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 8fb42c48fb52..fc0daed28133 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -92,9 +92,9 @@ public void start_afterStart() { @Test public void setStream_sendsAllMessages() { - stream.start(listener); stream.setCompressor(Codec.Identity.NONE); stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance()); + stream.start(listener); stream.setMessageCompression(true); InputStream message = new ByteArrayInputStream(new byte[]{'a'});