
census: Fix retry stats data race #8422

Closed
27 changes: 23 additions & 4 deletions census/src/main/java/io/grpc/census/CensusStatsModule.java
@@ -356,12 +356,12 @@ public void streamClosed(Status status) {
if (module.recordFinishedRpcs) {
// Stream is closed early. So no need to record metrics for any inbound events after this
// point.
recordFinishedRpc();
recordFinishedAttempt();
}
} // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
}

void recordFinishedRpc() {
void recordFinishedAttempt() {
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// TODO(songya): remove the deprecated measure constants once they are completely removed.
.put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
@@ -439,7 +439,10 @@ static final class CallAttemptsTracerFactory extends
private final AtomicLong retryDelayNanos = new AtomicLong();
private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
private final AtomicInteger activeStreams = new AtomicInteger();
private final AtomicInteger activeStreams2 = new AtomicInteger();
Member

I don't understand. Why do we need another atomic variable?

Member Author

Because the first atomic variable

if (activeStreams.decrementAndGet() == 0) {
    // Race condition between two extremely close events does not matter because the difference
    // in the result would be very small.
    long lastInactiveTimeStamp =
        this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
}

is doing something after the atomic check, and that thing cannot be done atomically with the decrement. If another thread is using the same atomic variable, it will be broken.

Member

is doing something after the atomic check, and that thing cannot be done atomically with the decrement

Why does it need to be? There's no critical section, so it seems "any time after decrementAndGet() == 0 must be safe."

If another thread is using the same atomic variable, it will be broken.

What atomic variable? How would another thread be using it?

Member Author

It's not safe when callEnded() reports RETRY_DELAY_PER_CALL. To exaggerate the timing, add a long sleep as follows:

// attemptEnded()
if (activeStreams.decrementAndGet() == 0) {
    Thread.sleep(10_000_000_000L);   // long sleep
    long lastInactiveTimeStamp =
        this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    retryDelayNanos.addAndGet(-lastInactiveTimeStamp);   // write operation
    if (callEnded == 1) {
      recordFinishedCall();
    }
}

...

// callEnded()
if (callEndedUpdater != null) {
  if (callEndedUpdater.getAndSet(this, 1) != 0) {
    return;
  }
} else {
  if (callEnded != 0) {
    return;
  }
  callEnded = 1;
}
if (activeStreams.get() == 0) {  // another thread using the same atomic variable
  recordFinishedCall();   // read operation not safe
}

Member

A race on retryDelayNanos wouldn't have been detected by a race detector. The race detector noticed a race on stopwatch. But why would there be a race on stopwatch, since stopwatch.elapsed() is read-only? (After all, we could have just saved a System.nanoTime() ourselves and that wouldn't change over time.)

It looks like the stopwatch.stop() in callEnded() shouldn't be there, as it adds no value and introduces a write that could interfere with attemptEnded(). I agree that you've found an additional race though.

Concerning updating retryDelayNanos in attemptEnded(), I think we should just "not do that." Instead of taking the full RPC duration and subtracting out the time in each attempt, can we instead just add the individual time periods when there has been no attempt? That is, we update the atomic when a delay period ends instead of when it starts? It looks like that'd be as simple as:

// CallAttemptsTracerFactory.<init>
retryDelayStopwatch = module.stopwatchSupplier.get(); // initially stopped

// newClientStreamTracer
if (activeStreams.incrementAndGet() == 1) {
  retryDelayStopwatch.stop(); // does nothing for the first attempt
  retryDelayNanos = stopwatch.elapsed(); // just a plain volatile
}

// attemptEnded
if (activeStreams.decrementAndGet() == 0) {
  retryDelayStopwatch.start();
}

That has the advantage that the stopwatch won't be racy and will be exactly 0 if there were no retries.

Member Author

A race on retryDelayNanos wouldn't have been detected by a race detector. The race detector noticed a race on stopwatch. But why would there be a race on stopwatch, since stopwatch.elapsed() is read-only? (After all, we could have just saved a System.nanoTime() ourselves and that wouldn't change over time.)

The read/write operations inside the if-block

if (activeStreams.decrementAndGet() == 0) { ...}

can happen concurrently with the write/read operations in callEnded(). So the race detector noticed stopwatch.elapsed() first.

It looks like the stopwatch.stop() in callEnded() shouldn't be there, as it adds no value.

I agree.

can we instead just add the individual time periods when there has been no attempt? That is, we update the atomic when a delay period ends instead of when it starts? It looks like that'd be as simple as:

// CallAttemptsTracerFactory.<init>
retryDelayStopwatch = module.stopwatchSupplier.get(); // initially stopped

// newClientStreamTracer
if (activeStreams.incrementAndGet() == 1) {
  retryDelayStopwatch.stop(); // does nothing for the first attempt
  retryDelayNanos = stopwatch.elapsed(); // just a plain volatile
}

// attemptEnded
if (activeStreams.decrementAndGet() == 0) {
  retryDelayStopwatch.start();
}

It works for retry but not for hedging.
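
For illustration, here is a toy sketch of the hedging case (hypothetical code, not grpc-java's; the class name HedgingStopwatchRace, the helper sleepMillis, the thread labels, and the sleeps are all made up): an attempt ending and a hedged attempt starting can pass the 1 -> 0 and 0 -> 1 counter transitions concurrently, so the unsynchronized Stopwatch calls race with each other.

import com.google.common.base.Stopwatch;
import java.util.concurrent.atomic.AtomicInteger;

// Toy demonstration of the hedging case; hypothetical code, not grpc-java's.
public final class HedgingStopwatchRace {
  public static void main(String[] args) throws Exception {
    AtomicInteger activeStreams = new AtomicInteger(1); // hedge #1 is running
    Stopwatch retryDelayStopwatch = Stopwatch.createUnstarted();

    Thread ending = new Thread(() -> {
      // attemptEnded() for hedge #1
      if (activeStreams.decrementAndGet() == 0) {
        sleepMillis(10); // the counter update and the stopwatch call are not atomic
        retryDelayStopwatch.start();
      }
    });
    Thread starting = new Thread(() -> {
      // newClientStreamTracer() for hedge #2, racing with the end of hedge #1
      if (activeStreams.incrementAndGet() == 1) {
        // May run before the start() above: Guava's stop() throws
        // IllegalStateException on a stopwatch that is not running, and the
        // two threads race on the stopwatch's unsynchronized state either way.
        retryDelayStopwatch.stop();
      }
    });
    ending.start();
    sleepMillis(5);
    starting.start();
    ending.join();
    starting.join();
  }

  private static void sleepMillis(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

Under sequential retries the two transitions do not overlap, which is presumably why the scheme is fine for retry but not for hedging.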

Member

The read/write operations inside the if-block

Yeah, I was basically getting to the point that "there has to be a bug." And that bug was that we were doing a write with stopwatch.stop().

It works for retry but not for hedging.

I just came here to mention that, as it just dawned on me. There are ways to fix that, but they wouldn't be trivial. I think the answer is "use a lock."

// newClientStreamTracer
synchronized (this) {
  if (activeStreams++ == 0) { // not volatile
    retryDelayStopwatch.stop(); // does nothing for the first attempt
    retryDelayNanos = stopwatch.elapsed();
  }
}

// attemptEnded
synchronized (this) {
  if (--activeStreams == 0) {
    retryDelayStopwatch.start();
  }
}

//------------ or -----------

// attemptEnded
synchronized (this) {
  if (activeStreams.decrementAndGet() == 0) {
    long lastInactiveTimeStamp =
            this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
  }
}

I'd be willing to go further and move "anything that is used for control flow" under the lock. So attemptsPerCall and callEnded.
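
A rough sketch of that fully locked direction, assuming a simplified standalone class (the name RetryDelayTracker, the method attemptStarted(), and the empty recordFinishedCall() stub are illustrative, not the actual CensusStatsModule code):

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified sketch of "everything used for control flow under one lock".
final class RetryDelayTracker {
  private final Stopwatch stopwatch = Stopwatch.createStarted();
  private final Object lock = new Object();

  // All guarded by lock; plain fields, no atomics needed.
  private int activeStreams;
  private int attemptsPerCall;
  private boolean callEnded;
  private boolean finishedCallRecorded;
  private long retryDelayNanos;
  private long lastInactiveTimeStampNanos;

  void attemptStarted() {
    synchronized (lock) {
      attemptsPerCall++;
      if (activeStreams++ == 0 && attemptsPerCall > 1) {
        // A delay period just ended: add the time spent with no active attempt.
        retryDelayNanos += stopwatch.elapsed(TimeUnit.NANOSECONDS) - lastInactiveTimeStampNanos;
      }
    }
  }

  void attemptEnded() {
    boolean recordCall = false;
    synchronized (lock) {
      if (--activeStreams == 0) {
        // Remember when the call went inactive; used if another attempt starts later.
        lastInactiveTimeStampNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
        if (callEnded && !finishedCallRecorded) {
          finishedCallRecorded = true;
          recordCall = true;
        }
      }
    }
    if (recordCall) {
      recordFinishedCall(); // record outside the lock
    }
  }

  void callEnded() {
    boolean recordCall = false;
    synchronized (lock) {
      if (callEnded) {
        return;
      }
      callEnded = true;
      if (activeStreams == 0 && !finishedCallRecorded) {
        finishedCallRecorded = true;
        recordCall = true;
      }
    }
    if (recordCall) {
      recordFinishedCall();
    }
  }

  private void recordFinishedCall() {
    // In the real module this is where RETRY_DELAY_PER_CALL etc. would be recorded.
  }
}

Keeping the stats recording itself outside the lock keeps the critical sections short, which matters if the module really is performance sensitive.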

Member Author

I think the answer is "use a lock."

That's fair. The original atomic counter activeStreams was already too complex. I wasn't using a lock in the original implementation just because I had a feeling that CensusStatsModule is performance sensitive (I saw it's using AtomicReferenceFieldUpdaters). Even if it's viable to fix this without a lock, I'd prefer using a lock.
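
For context, the field-updater pattern mentioned here avoids allocating a separate Atomic* object per call at the cost of some boilerplate; a minimal illustration, with a hypothetical class CallState and method markCallEnded() rather than the module's actual fields:

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Illustrative only: shows the updater pattern, not CensusStatsModule's code.
final class CallState {
  private static final AtomicIntegerFieldUpdater<CallState> CALL_ENDED_UPDATER =
      AtomicIntegerFieldUpdater.newUpdater(CallState.class, "callEnded");

  private volatile int callEnded; // must be a volatile int for the updater

  boolean markCallEnded() {
    // Returns true only for the first caller; later callers observe 1 already set.
    return CALL_ENDED_UPDATER.getAndSet(this, 1) == 0;
  }
}

One shared static updater replaces one AtomicInteger allocation per call.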

Member Author

I'd be willing to go further and move "anything that is used for control flow" under the lock.

I was thinking about that too, for fixing the isReady()-halfClose() race. Using a lock seems like the only possible solution in that case.

Member

I was thinking about that too, for fixing the isReady()-halfClose() race. Using a lock seems like the only possible solution in that case.

The simplest possible solution is to make MessageFramer.closed volatile. Better though, I think, is to just stop using framer().isClosed() in the isReady() check. AbstractStream (or the subclasses) can just keep their own volatile variable that is set when halfClose()/close() is called. In fact, AbstractClientStream already has such a variable: AbstractClientStream.TransportState.outboundClosed.
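
A minimal sketch of that suggestion, assuming a simplified stream class (the names SketchStream and transportIsReady() are hypothetical, not the real AbstractClientStream API): isReady() reads a volatile flag set by halfClose() instead of the framer's unsynchronized state.

// Hypothetical simplification, not the actual AbstractClientStream code.
abstract class SketchStream {
  private volatile boolean outboundClosed;

  final void halfClose() {
    outboundClosed = true;
    // ... then flush/close the framer as before.
  }

  final boolean isReady() {
    // No read of framer().isClosed() here, so no race with the framer's
    // non-volatile internal state.
    return !outboundClosed && transportIsReady();
  }

  abstract boolean transportIsReady();
}

The volatile flag gives isReady() a well-defined view of the outbound half-close without taking a lock.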

private final AtomicBoolean activated = new AtomicBoolean();
private AtomicBoolean finishedRpcRecorded = new AtomicBoolean();
private Status status;

CallAttemptsTracerFactory(
CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
@@ -462,6 +465,7 @@ static final class CallAttemptsTracerFactory extends
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info);
activeStreams2.incrementAndGet();
if (activeStreams.incrementAndGet() == 1) {
if (!activated.compareAndSet(false, true)) {
retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
@@ -489,9 +493,15 @@ void attemptEnded() {
this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
}
if (activeStreams2.decrementAndGet() == 0) {
if (callEnded == 1) {
recordFinishedCall();
}
}
}

void callEnded(Status status) {
this.status = status;
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
@@ -502,6 +512,15 @@ void callEnded(Status status) {
}
callEnded = 1;
}
if (activeStreams2.get() == 0) {
recordFinishedCall();
}
}

void recordFinishedCall() {
if (!finishedRpcRecorded.compareAndSet(false, true)) {
return;
}
if (!module.recordFinishedRpcs) {
return;
}
@@ -510,9 +529,9 @@ void callEnded(Status status) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null);
tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
tracer.statusCode = status.getCode();
tracer.recordFinishedRpc();
tracer.recordFinishedAttempt();
} else if (inboundMetricTracer != null) {
inboundMetricTracer.recordFinishedRpc();
inboundMetricTracer.recordFinishedAttempt();
}

long retriesPerCall = 0;
@@ -356,6 +356,70 @@ public void statsRecorded() throws Exception {
assertRetryStatsRecorded(1, 0, 10_000);
}

@Test
public void statsRecorded_callCancelledBeforeCommit() throws Exception {
startNewServer();
retryPolicy = ImmutableMap.<String, Object>builder()
.put("maxAttempts", 4D)
.put("initialBackoff", "10s")
.put("maxBackoff", "10s")
.put("backoffMultiplier", 1D)
.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
.build();
createNewChannel();

// We will have streamClosed return at a particular moment that we want.
final CountDownLatch streamClosedLatch = new CountDownLatch(1);
ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new ClientStreamTracer() {
@Override
public void streamClosed(Status status) {
if (status.getCode().equals(Code.CANCELLED)) {
try {
streamClosedLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("streamClosedLatch interrupted", e);
}
}
}
};
}
};
ClientCall<String, Integer> call = channel.newCall(
clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory));
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded();
fakeClock.forwardTime(5, SECONDS);
String message = "String of length 20.";
call.sendMessage(message);
assertOutboundMessageRecorded();
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
// trigger retry
serverCall.close(
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
elapseBackoff(10, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
fakeClock.forwardTime(7, SECONDS);
call.cancel("Cancelled before commit", null); // A noop substream will commit.
// The call listener is closed, but the netty substream listener is not yet closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
// Let the netty substream listener be closed.
streamClosedLatch.countDown();
assertRetryStatsRecorded(1, 0, 10_000);
assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1);
}

@Test
public void serverCancelledAndClientDeadlineExceeded() throws Exception {
startNewServer();