refactor: centralize retry state for PubSub Pull operations
This is the first part of addressing googleapis#11793.

The next step will be to allow a custom retry predicate to be provided, which can examine the previous exceptions and other state, with a default which will fail after a certain time or after multiple internal failures, for example. (The list of exceptions here could easily turn into a map from status code to number of exceptions.)

We may also want to implement something similar for Publisher operations, but that can be done separately.
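For context, a minimal sketch of what such a custom retry predicate could look like, assuming it is fed the state that the new RetryState class already tracks. Every name and threshold below is hypothetical and not part of this commit:

using System;
using System.Collections.Generic;
using Grpc.Core;

// Hypothetical sketch only: illustrates the "custom retry predicate" direction described
// above. The delegate shape and the limits are placeholders, not values from this commit.
public delegate bool PullRetryPredicate(
    DateTime? firstExceptionTimestampUtc,    // mirrors RetryState._firstExceptionTimestamp
    IReadOnlyList<RpcException> exceptions,  // mirrors RetryState._exceptions
    DateTime utcNow);

public static class IllustrativePullRetryDefaults
{
    // Give up after 10 minutes of continuous failures or after 50 recorded exceptions;
    // both thresholds are arbitrary placeholders, not values chosen by the commit.
    public static readonly PullRetryPredicate Default = (firstFailureUtc, exceptions, utcNow) =>
        (firstFailureUtc == null || utcNow - firstFailureUtc.Value < TimeSpan.FromMinutes(10))
        && exceptions.Count < 50;
}

The commit message's note about turning the exception list into a map from status code to count would slot into the same predicate signature.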
jskeet committed Apr 25, 2024
1 parent 679648b commit 7bbdcd3
Showing 2 changed files with 105 additions and 31 deletions.
@@ -176,13 +176,14 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null)
private long _extendThrottleLow = 0; // Incremented after _extendQueueThrottleInterval, checked when throttling.
private bool _exactlyOnceDeliveryEnabled = false; // True if subscription is exactly once, else false.
private bool _messageOrderingEnabled = false; // True if subscription has ordering enabled, else false.
private TimeSpan? _pullBackoff = null;
private readonly RetryState _retryState;
private readonly ILogger _logger;

internal SingleChannel(SubscriberClientImpl subscriber,
SubscriberServiceApiClient client, SubscriptionHandler handler,
Flow flow, bool useLegacyFlowControl,
Action<Task> registerTaskFn)
Action<Task> registerTaskFn,
IClock clock)
{
_registerTaskFn = registerTaskFn;
_taskHelper = subscriber._taskHelper;
@@ -206,6 +207,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null)
_eventReceiptModAckForExactlyOnceDelivery = new AsyncAutoResetEvent(subscriber._taskHelper);
_continuationQueue = new AsyncSingleRecvQueue<TaskNextAction>(subscriber._taskHelper);
_logger = subscriber.Logger;
_retryState = new RetryState(clock, s_pullBackoff, TimeSpan.FromSeconds(0.5));
}

internal async Task StartAsync()
@@ -298,7 +300,7 @@ private void StopStreamingPull()
// If backoff is non-zero delay before opening streaming-pull.
private void StartStreamingPull()
{
if (_pullBackoff is TimeSpan backoff)
if (_retryState.Backoff is TimeSpan backoff)
{
// Delay, then start the streaming-pull.
_logger?.LogDebug("Delaying for {seconds}s before streaming pull call.", (int) backoff.TotalSeconds);
@@ -328,37 +330,43 @@ private void HandleStartStreamingPullWithoutBackoff()
Add(initTask, Next(true, () => HandlePullMoveNext(initTask)));
}

private bool HandleRpcFailure(Exception e)
/// <summary>
/// Restarts the pull operation (with a backoff) if the given exception is retriable, otherwise throws it.
/// </summary>
private void RestartPullOrThrow(Exception e)
{
if (e != null)
if (_retryState.ShouldRetry(e))
{
if (e.As<RpcException>()?.IsRecoverable() ?? false)
{
_logger?.LogWarning(e, "Recoverable error in streaming pull; will retry.");
// Recoverable RPC error, stop and restart pull.
StopStreamingPull();
// Increase backoff interval and start stream again.
// If stream-pull fails repeatedly, increase the delay, up to a maximum of 30 seconds.
_pullBackoff = s_pullBackoff.NextBackoff(_pullBackoff ?? TimeSpan.Zero);
StartStreamingPull();
return true;
}
else
{
_logger?.LogError(e, "Unrecoverable error in streaming pull; aborting subscriber.");
// Unrecoverable error; throw it.
throw e.FlattenIfPossible();
}
RestartPullAfterRetriableFailure(e);
}
else
{
_logger?.LogError(e, "Unrecoverable error in streaming pull; aborting subscriber.");
// Unrecoverable error; throw it.
throw e.FlattenIfPossible();
}
return false;
}

/// <summary>
/// Restarts the streaming pull after an exception which is expected to already be retriable.
/// (This isn't rechecked.)
/// </summary>
private void RestartPullAfterRetriableFailure(Exception e)
{
_logger?.LogWarning(e, "Recoverable error in streaming pull; will retry.");
// Update the retry state, increasing the backoff etc.
_retryState.OnFailure(e);
StopStreamingPull();
StartStreamingPull();
}

// Pull-stream is ready; call MoveNext to wait for messages.
private void HandlePullMoveNext(Task initTask)
{
// Check if the init write failed.
if (initTask != null && HandleRpcFailure(initTask.Exception))
if (initTask?.Exception is Exception ex)
{
RestartPullOrThrow(ex);
return;
}
// Check if pulls need throttling due to push queues being too full, or too slow to push.
@@ -389,14 +397,15 @@ private void HandlePullMoveNext(Task initTask)
// Message-stream has messages (or not, depending on moveNextResult)
private void HandlePullMessageData(Task<bool> moveNextTask)
{
if (HandleRpcFailure(moveNextTask.Exception))
if (moveNextTask.Exception is Exception ex)
{
RestartPullOrThrow(ex);
return;
}
if (moveNextTask.Result)
{
// Successful receive. Reset pull backoff to zero.
_pullBackoff = null;
// Successful receive. Reset retry state.
_retryState.OnSuccess();
// Copy msgs to list, and clear original proto repeatedfield; to remove refs to large messages as soon as possible.
// It is not possible to set RepeatedField elements to null, so messages need transferring to a List.
StreamingPullResponse current;
@@ -406,9 +415,9 @@ private void HandlePullMessageData(Task<bool> moveNextTask)
_exactlyOnceDeliveryEnabled = current.SubscriptionProperties?.ExactlyOnceDeliveryEnabled ?? false;
_messageOrderingEnabled = current.SubscriptionProperties?.MessageOrderingEnabled ?? false;
}
catch (Exception e) when (e.As<RpcException>()?.IsRecoverable() ?? false)
catch (Exception e) when (_retryState.ShouldRetry(e))
{
HandleRpcFailure(e);
RestartPullAfterRetriableFailure(e);
return;
}
var receivedMessages = current.ReceivedMessages;
@@ -448,7 +457,7 @@ private void HandlePullMessageData(Task<bool> moveNextTask)
{
StopStreamingPull();
// Always a short pause on server disconnect.
_pullBackoff = TimeSpan.FromSeconds(0.5);
_retryState.OnServerDisconnect();
StartStreamingPull();
}

@@ -1089,5 +1098,70 @@ private void UpdateReceiptModAckStatus(IEnumerable<string> ids, bool? status)
_receiptModAckStatusLookup[id] = status;
}
}

/// <summary>
/// Maintains the state of retry operations.
/// Currently this is only used for pull operations, but it could be used for publishing later.
/// Currently this only uses the existing "IsRecoverable" exception check; in the future we're
/// likely to expose the rest of this state for more nuanced decisions.
/// </summary>
private class RetryState
{
private readonly IClock _clock;
// Only used for timing.
private readonly RetrySettings _retrySettings;
private readonly TimeSpan _disconnectBackoff;

private readonly List<RpcException> _exceptions;
private DateTime? _firstExceptionTimestamp;
public TimeSpan? Backoff { get; private set; }

public RetryState(IClock clock, RetrySettings retrySettings, TimeSpan disconnectBackoff)
{
_clock = clock;
_exceptions = new List<RpcException>();
_firstExceptionTimestamp = null;
_retrySettings = retrySettings;
_disconnectBackoff = disconnectBackoff;
Backoff = null;
}

/// <summary>
/// Checks whether the given exception should be retried, without updating any state.
/// </summary>
public bool ShouldRetry(Exception exception) =>
exception?.As<RpcException>()?.IsRecoverable() ?? false;

/// <summary>
/// Records the given exception in the retry state, updating the backoff accordingly.
/// </summary>
public void OnFailure(Exception exception)
{
_firstExceptionTimestamp ??= _clock.GetCurrentDateTimeUtc();
if (exception?.As<RpcException>() is RpcException rpcException && _exceptions.Count < 100)
{
_exceptions.Add(rpcException);
}
Backoff = _retrySettings.NextBackoff(Backoff ?? TimeSpan.Zero);
}

/// <summary>
/// Records that an operation succeeded.
/// </summary>
public void OnSuccess()
{
_firstExceptionTimestamp = null;
_exceptions.Clear();
Backoff = null;
}

public void OnServerDisconnect()
{
// Ignore previous exceptions.
OnSuccess();
// Backoff briefly.
Backoff = _disconnectBackoff;
}
}
}
}
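Before the second file's diff, a condensed, illustrative restatement of how SingleChannel drives the new RetryState. The helper method below is hypothetical; only the ShouldRetry/OnFailure/OnSuccess/OnServerDisconnect calls and the Backoff check mirror what the diff above actually does:

// Illustrative only: a hypothetical helper condensing the RetryState calls made in the diff above.
private void HandlePullOutcome(Exception failure, bool serverDisconnected)
{
    if (serverDisconnected)
    {
        StopStreamingPull();
        _retryState.OnServerDisconnect();   // reset state, then apply the short fixed backoff
        StartStreamingPull();
    }
    else if (failure == null)
    {
        _retryState.OnSuccess();            // successful receive: clear exceptions, timestamp and backoff
    }
    else if (_retryState.ShouldRetry(failure))
    {
        _retryState.OnFailure(failure);     // record the exception and grow Backoff
        StopStreamingPull();
        StartStreamingPull();               // StartStreamingPull delays by _retryState.Backoff first
    }
    else
    {
        _logger?.LogError(failure, "Unrecoverable error in streaming pull; aborting subscriber.");
        throw failure.FlattenIfPossible();  // unrecoverable: abort the subscriber
    }
}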
@@ -135,7 +135,7 @@ public override Task StartAsync(SubscriptionHandler handler)
// Start all subscribers
var subscriberTasks = _clients.Select(client =>
{
var singleChannel = new SingleChannel(this, client, handler, flow, _useLegacyFlowControl, registerTask);
var singleChannel = new SingleChannel(this, client, handler, flow, _useLegacyFlowControl, registerTask, _clock);
return _taskHelper.Run(() => singleChannel.StartAsync());
}).ToArray();
// Set up finish task; code that executes when this subscriber is being shutdown (for whatever reason).
