Skip to content

Commit

Permalink
Authority isn't the only thing called before start
Browse files Browse the repository at this point in the history
  • Loading branch information
ejona86 committed Dec 17, 2020
1 parent 25bf8cb commit f26a501
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
34 changes: 20 additions & 14 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Expand Up @@ -60,12 +60,8 @@ class DelayedStream implements ClientStream {
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;

// start()-related storage. All references should be cleared after start() is called on realStream
// to avoid retaining the objects for the life of the call
/** non-null when setAuthority needs to be called on realStream when it is set. */
// No need to synchronize; start() synchronization provides a happens-before
private String startAuthority;
private List<Runnable> preStartPendingCalls = new ArrayList<>();

@Override
public void setMaxInboundMessageSize(final int maxSize) {
Expand Down Expand Up @@ -97,7 +93,8 @@ public void run() {

@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
Expand Down Expand Up @@ -211,7 +208,12 @@ private void delayOrExecute(Runnable runnable) {
public void setAuthority(final String authority) {
checkState(listener == null, "May only be called before start");
checkNotNull(authority, "authority");
this.startAuthority = authority;
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setAuthority(authority);
}
});
}

@Override
Expand Down Expand Up @@ -246,10 +248,10 @@ public void start(ClientStreamListener listener) {
* #listener}.
*/
private void internalStart(ClientStreamListener listener) {
if (startAuthority != null) {
realStream.setAuthority(startAuthority);
startAuthority = null;
for (Runnable runnable : preStartPendingCalls) {
runnable.run();
}
preStartPendingCalls = null;
realStream.start(listener);
}

Expand Down Expand Up @@ -360,7 +362,8 @@ public void run() {

@Override
public void optimizeForDirectExecutor() {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.optimizeForDirectExecutor();
Expand All @@ -370,8 +373,9 @@ public void run() {

@Override
public void setCompressor(final Compressor compressor) {
checkState(listener == null, "May only be called before start");
checkNotNull(compressor, "compressor");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setCompressor(compressor);
Expand All @@ -381,7 +385,8 @@ public void run() {

@Override
public void setFullStreamDecompression(final boolean fullStreamDecompression) {
delayOrExecute(
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(
new Runnable() {
@Override
public void run() {
Expand All @@ -392,8 +397,9 @@ public void run() {

@Override
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
checkState(listener == null, "May only be called before start");
checkNotNull(decompressorRegistry, "decompressorRegistry");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDecompressorRegistry(decompressorRegistry);
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/io/grpc/internal/DelayedStreamTest.java
Expand Up @@ -92,9 +92,9 @@ public void start_afterStart() {

@Test
public void setStream_sendsAllMessages() {
stream.start(listener);
stream.setCompressor(Codec.Identity.NONE);
stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
stream.start(listener);

stream.setMessageCompression(true);
InputStream message = new ByteArrayInputStream(new byte[]{'a'});
Expand Down

0 comments on commit f26a501

Please sign in to comment.