From 8023457f77e1807219b1f713066f5870a9bd5649 Mon Sep 17 00:00:00 2001
From: ZHANG Dapeng
Date: Thu, 2 Sep 2021 10:24:22 -0700
Subject: [PATCH] census: Fix retry stats data race (#8459)

There is a data race in `CensusStatsModule.CallAttemptsTracerFactory`: if the
client call is cancelled while an active stream on the transport is not yet
committed, a [noop substream](https://github.com/grpc/grpc-java/blob/v1.40.0/core/src/main/java/io/grpc/internal/RetriableStream.java#L486)
is committed and the active stream is cancelled. Because cancelling the active
stream triggers the stream listener's closed() on the _transport_ thread,
closed() can be invoked concurrently with the call listener's onClose().
Therefore `CallAttemptsTracerFactory.attemptEnded()` can run concurrently with
`CallAttemptsTracerFactory.callEnded()`, and there is a data race on
RETRY_DELAY_PER_CALL.

See also the regression test added.

The same data race can happen in the hedging case: when one of the hedges is
committed and completes the call, the other uncommitted hedges cancel
themselves and trigger their stream listeners' closed() concurrently on the
transport thread.

Fix the race by recording RETRY_DELAY_PER_CALL only once both of the following
conditions are met:

- callEnded is true, and
- the number of active streams is 0.
---
 .../io/grpc/census/CensusStatsModule.java   | 111 ++++++++++--------
 .../grpc/testing/integration/RetryTest.java |  64 ++++++++++
 2 files changed, 123 insertions(+), 52 deletions(-)

diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java
index 6faeb575ccc..6f8acdb71e9 100644
--- a/census/src/main/java/io/grpc/census/CensusStatsModule.java
+++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java
@@ -55,13 +55,13 @@ import io.opencensus.tags.unsafe.ContextUtils;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * Provides factories for {@link StreamTracer} that records stats to Census.
@@ -356,12 +356,12 @@ public void streamClosed(Status status) {
         if (module.recordFinishedRpcs) {
           // Stream is closed early. So no need to record metrics for any inbound events after this
           // point.
-          recordFinishedRpc();
+          recordFinishedAttempt();
         }
       } // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
     }
 
-    void recordFinishedRpc() {
+    void recordFinishedAttempt() {
       MeasureMap measureMap = module.statsRecorder.newMeasureMap()
           // TODO(songya): remove the deprecated measure constants once they are completed removed.
           .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
@@ -405,30 +405,11 @@ static final class CallAttemptsTracerFactory extends
         Measure.MeasureDouble.create(
             "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
 
-    @Nullable
-    private static final AtomicIntegerFieldUpdater callEndedUpdater;
-
-    /**
-     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
-     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
-     * (potentially racy) direct updates of the volatile variables.
-     */
-    static {
-      AtomicIntegerFieldUpdater tmpCallEndedUpdater;
-      try {
-        tmpCallEndedUpdater =
-            AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
-      } catch (Throwable t) {
-        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
-        tmpCallEndedUpdater = null;
-      }
-      callEndedUpdater = tmpCallEndedUpdater;
-    }
-
     ClientTracer inboundMetricTracer;
     private final CensusStatsModule module;
     private final Stopwatch stopwatch;
-    private volatile int callEnded;
+    @GuardedBy("lock")
+    private boolean callEnded;
     private final TagContext parentCtx;
     private final TagContext startCtx;
     private final String fullMethodName;
@@ -436,17 +417,22 @@ static final class CallAttemptsTracerFactory extends
     // TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater.
     private final AtomicLong attemptsPerCall = new AtomicLong();
     private final AtomicLong transparentRetriesPerCall = new AtomicLong();
-    private final AtomicLong retryDelayNanos = new AtomicLong();
-    private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
-    private final AtomicInteger activeStreams = new AtomicInteger();
-    private final AtomicBoolean activated = new AtomicBoolean();
+    // write happens before read
+    private Status status;
+    private final Object lock = new Object();
+    // write @GuardedBy("lock") and happens before read
+    private long retryDelayNanos;
+    @GuardedBy("lock")
+    private int activeStreams;
+    @GuardedBy("lock")
+    private boolean finishedCallToBeRecorded;
 
     CallAttemptsTracerFactory(
         CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
       this.module = checkNotNull(module, "module");
       this.parentCtx = checkNotNull(parentCtx, "parentCtx");
       this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
-      this.stopwatch = module.stopwatchSupplier.get().start();
+      this.stopwatch = module.stopwatchSupplier.get();
       TagValue methodTag = TagValue.create(fullMethodName);
       startCtx = module.tagger.toBuilder(parentCtx)
           .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
@@ -461,10 +447,14 @@ static final class CallAttemptsTracerFactory extends
     @Override
     public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
-      ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info);
-      if (activeStreams.incrementAndGet() == 1) {
-        if (!activated.compareAndSet(false, true)) {
-          retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
+      synchronized (lock) {
+        if (finishedCallToBeRecorded) {
+          // This can be the case when the call is cancelled but a retry attempt is created.
+          return new ClientStreamTracer() {};
+        }
+        if (++activeStreams == 1 && stopwatch.isRunning()) {
+          stopwatch.stop();
+          retryDelayNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
         }
       }
       if (module.recordStartedRpcs && attemptsPerCall.get() > 0) {
@@ -477,42 +467,59 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
       } else {
         attemptsPerCall.incrementAndGet();
       }
-      return tracer;
+      return new ClientTracer(this, module, parentCtx, startCtx, info);
     }
 
     // Called whenever each attempt is ended.
     void attemptEnded() {
-      if (activeStreams.decrementAndGet() == 0) {
-        // Race condition between two extremely close events does not matter because the difference
-        // in the result would be very small.
-        long lastInactiveTimeStamp =
-            this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
-        retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
+      if (!module.recordFinishedRpcs) {
+        return;
+      }
+      boolean shouldRecordFinishedCall = false;
+      synchronized (lock) {
+        if (--activeStreams == 0) {
+          stopwatch.start();
+          if (callEnded && !finishedCallToBeRecorded) {
+            shouldRecordFinishedCall = true;
+            finishedCallToBeRecorded = true;
+          }
+        }
+      }
+      if (shouldRecordFinishedCall) {
+        recordFinishedCall();
       }
     }
 
     void callEnded(Status status) {
-      if (callEndedUpdater != null) {
-        if (callEndedUpdater.getAndSet(this, 1) != 0) {
+      if (!module.recordFinishedRpcs) {
+        return;
+      }
+      this.status = status;
+      boolean shouldRecordFinishedCall = false;
+      synchronized (lock) {
+        if (callEnded) {
+          // FIXME(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
           return;
         }
-      } else {
-        if (callEnded != 0) {
-          return;
+        callEnded = true;
+        if (activeStreams == 0 && !finishedCallToBeRecorded) {
+          shouldRecordFinishedCall = true;
+          finishedCallToBeRecorded = true;
        }
-        callEnded = 1;
       }
-      if (!module.recordFinishedRpcs) {
-        return;
+      if (shouldRecordFinishedCall) {
+        recordFinishedCall();
       }
-      stopwatch.stop();
+    }
+
+    void recordFinishedCall() {
       if (attemptsPerCall.get() == 0) {
         ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null);
         tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
         tracer.statusCode = status.getCode();
-        tracer.recordFinishedRpc();
+        tracer.recordFinishedAttempt();
       } else if (inboundMetricTracer != null) {
-        inboundMetricTracer.recordFinishedRpc();
+        inboundMetricTracer.recordFinishedAttempt();
       }
 
       long retriesPerCall = 0;
@@ -523,7 +530,7 @@ void callEnded(Status status) {
       MeasureMap measureMap = module.statsRecorder.newMeasureMap()
           .put(RETRIES_PER_CALL, retriesPerCall)
          .put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get())
-          .put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI);
+          .put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI);
       TagValue methodTag = TagValue.create(fullMethodName);
       TagValue statusTag = TagValue.create(status.getCode().toString());
       measureMap.record(
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java
index bdf39e8546a..eb815501d5c 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java
@@ -356,6 +356,70 @@ public void statsRecorded() throws Exception {
     assertRetryStatsRecorded(1, 0, 10_000);
   }
 
+  @Test
+  public void statsRecorded_callCancelledBeforeCommit() throws Exception {
+    startNewServer();
+    retryPolicy = ImmutableMap.builder()
+        .put("maxAttempts", 4D)
+        .put("initialBackoff", "10s")
+        .put("maxBackoff", "10s")
+        .put("backoffMultiplier", 1D)
+        .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE"))
+        .build();
+    createNewChannel();
+
+    // We will have streamClosed return at a particular moment that we want.
+    final CountDownLatch streamClosedLatch = new CountDownLatch(1);
+    ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() {
+      @Override
+      public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
+        return new ClientStreamTracer() {
+          @Override
+          public void streamClosed(Status status) {
+            if (status.getCode().equals(Code.CANCELLED)) {
+              try {
+                streamClosedLatch.await();
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new AssertionError("streamClosedLatch interrupted", e);
+              }
+            }
+          }
+        };
+      }
+    };
+    ClientCall call = channel.newCall(
+        clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory));
+    call.start(mockCallListener, new Metadata());
+    assertRpcStartedRecorded();
+    fakeClock.forwardTime(5, SECONDS);
+    String message = "String of length 20.";
+    call.sendMessage(message);
+    assertOutboundMessageRecorded();
+    ServerCall serverCall = serverCalls.poll(5, SECONDS);
+    serverCall.request(2);
+    assertOutboundWireSizeRecorded(message.length());
+    // trigger retry
+    serverCall.close(
+        Status.UNAVAILABLE.withDescription("original attempt failed"),
+        new Metadata());
+    assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
+    elapseBackoff(10, SECONDS);
+    assertRpcStartedRecorded();
+    assertOutboundMessageRecorded();
+    serverCall = serverCalls.poll(5, SECONDS);
+    serverCall.request(2);
+    assertOutboundWireSizeRecorded(message.length());
+    fakeClock.forwardTime(7, SECONDS);
+    call.cancel("Cancelled before commit", null); // A noop substream will commit.
+    // The call listener is closed, but the netty substream listener is not yet closed.
+    verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
+    // Let the netty substream listener be closed.
+    streamClosedLatch.countDown();
+    assertRetryStatsRecorded(1, 0, 10_000);
+    assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1);
+  }
+
   @Test
   public void serverCancelledAndClientDeadlineExceeded() throws Exception {
     startNewServer();
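For reference, the coordination the patch introduces can be boiled down to the following self-contained sketch. This is not grpc-java code and the names are invented; it only illustrates the pattern: both attemptEnded() and callEnded() update their state under a single lock, and whichever of them observes that the other condition already holds performs the one-time recording.

```java
// Illustrative sketch only -- not the actual CensusStatsModule implementation.
public final class PerCallStatsRecorder {

  private final Object lock = new Object();
  // All three fields are guarded by `lock`.
  private int activeStreams;
  private boolean callHasEnded;
  private boolean finishedCallRecorded;

  /** Called when a new attempt (stream) is created. */
  public void attemptStarted() {
    synchronized (lock) {
      if (finishedCallRecorded) {
        return; // The call was already recorded, e.g. it was cancelled while a retry attempt raced in.
      }
      activeStreams++;
    }
  }

  /** May run on a transport thread, concurrently with {@link #callEnded()}. */
  public void attemptEnded() {
    boolean record = false;
    synchronized (lock) {
      if (--activeStreams == 0 && callHasEnded && !finishedCallRecorded) {
        finishedCallRecorded = true;
        record = true;
      }
    }
    if (record) {
      recordFinishedCall(); // Runs outside the lock; no concurrent writers remain.
    }
  }

  /** May run on the call executor, concurrently with {@link #attemptEnded()}. */
  public void callEnded() {
    boolean record = false;
    synchronized (lock) {
      callHasEnded = true;
      if (activeStreams == 0 && !finishedCallRecorded) {
        finishedCallRecorded = true;
        record = true;
      }
    }
    if (record) {
      recordFinishedCall();
    }
  }

  private void recordFinishedCall() {
    // The real module records RETRY_DELAY_PER_CALL and the other per-call measures here,
    // exactly once per call.
    System.out.println("per-call stats recorded");
  }
}
```

The recording itself runs outside the lock, and the finishedCallRecorded flag is what lets a late attempt (a retry created after the call was already cancelled and recorded) be handed a no-op tracer, mirroring the finishedCallToBeRecorded check in newClientStreamTracer() in the diff above.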