
PubSub Subscriber with StreamingPull Suddenly Stops Regularly #11793

Open · philvmx1 opened this issue Feb 23, 2024 · 72 comments
Labels: api: pubsub (Issues related to the Pub/Sub API.), type: question (Request for information or clarification. Not an issue.)

@philvmx1

This is a consistent problem for us and for many others. My theory is that the server side hangs up in the middle of a keepalive, the resulting RpcException satisfies IsRpcCancellation at the line linked below, and the subscription is shut down.

Since we expect StartNew to keep running and RPC connection issues to be resolved transparently, this causes our workers to stop silently, leaving us dead in the water.

if (next.IsPull && (task.IsCanceled || (task.IsFaulted && (task.Exception.IsCancellation() || task.Exception.IsRpcCancellation()))))

@amanda-tarafa amanda-tarafa added type: question Request for information or clarification. Not an issue. api: pubsub Issues related to the Pub/Sub API. labels Feb 23, 2024
@amanda-tarafa amanda-tarafa self-assigned this Feb 23, 2024
@amanda-tarafa
Contributor

Can you please share a minimal but complete project that reproduces the problem? A console application should be fine. I understand that your theory requires something specific to happen to trigger the issue, so this repro project won't be able to reproduce the problem consistently; that's fine, I'd still expect it to reproduce the issue if we run it for long enough. We need to see exactly what your code looks like; for instance, you mention StartNew, but we don't have a StartNew method or type in Google.Cloud.PubSub.V1.

@amanda-tarafa amanda-tarafa added the needs more info This issue needs more information from the customer to proceed. label Feb 23, 2024
@philvmx1
Author

My apologies, I meant StartAsync. I will follow up with a sample console app.

@philvmx1
Author

We also did some metrics analysis, and this seems to happen while there is a sharp spike in ModifyAckDeadline requests across all of our services. The specific service shown below is dropping its number of open streaming pull requests.

This specific topic uses message ordering. At this time, the message ordering key is unique per message (the filename, since we're using Object Create triggers in GCS). It is also the only subscription where we see this issue. We have a change coming soon to make the ordering key the file prefix instead.
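For reference, the derivation we have in mind for the new key is roughly the following (an illustrative sketch only; the helper name is made up):

// Illustrative only: derive the ordering key from the file-type prefix rather than the
// full object name, e.g. "FileTypeA_20240228.txt" -> "FileTypeA".
static string OrderingKeyForObjectName(string objectName) =>
    objectName.Split('_')[0];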

Sample Code:

using Google.Api.Gax;
using Google.Cloud.PubSub.V1;

const string PROJECT_NAME = "myproject";
const string SUBSCRIPTION_NAME = "mysubscription";
var subscriberClientBuilder = new SubscriberClientBuilder
{
    Settings = new ()
    {
        FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 10, maxOutstandingByteCount: null),
    },
    SubscriptionName = new (PROJECT_NAME, SUBSCRIPTION_NAME)
};

var cts = new CancellationTokenSource();
var subscriber = await subscriberClientBuilder.BuildAsync(cts.Token);

var task = subscriber.StartAsync(async (message, ct) => 
{
    Console.WriteLine($"Message Received {message.MessageId}");
    if (ct.IsCancellationRequested) Console.WriteLine("Cancellation Requested");
    return await Task.FromResult(SubscriberClient.Reply.Ack);
});

Console.WriteLine("Waiting for Messages, Press any key to cancel and exit.");
Console.ReadKey();
cts.Cancel();

// Oddly, without StopAsync, StartAsync doesn't stop even though we called cancel on the ambient CTS.
await subscriber.StopAsync(cts.Token);
await task;

if (cts.Token.IsCancellationRequested) Console.WriteLine("StartAsync done because Cancelled!");
Console.WriteLine("DONE!");

relevant terraform:

resource "google_pubsub_subscription" "thesubscription" {
  name = "thetopicname"
  topic = "thetopic"

  ack_deadline_seconds = 600
  enable_message_ordering = true
  enable_exactly_once_delivery = true

  retry_policy {
    minimum_backoff = "60s"
  }
  dead_letter_policy {
    dead_letter_topic = google_pubsub_topic.thetopic-dlq.id
    max_delivery_attempts = 15
  }
}

Metrics showing dropoff of Streaming Pull connections over time until it finally dies all the way and we have to restart the service. The service runs as a GKE Deployment with 2 Replicas.

Streaming Pull connections dropping off over time; ModifyAckDeadline across all subscriptions:


Filtered modify ack deadline to the one subscription_id:


Dropoff and Ack Message count:

Given that last chart, I wonder if it's related to the unique OrderingKey per message more than anything. I saw that the code has a queue for ordered messages, but it doesn't make sense that it would behave this way. When one message is pulled with "File123" and ACK'd, and then another arrives with "File234", the first one has already been ACK'd, so it should no longer have any impact on ordering.

FWIW: we process 9 distinct files each day. Each has a distinct name from day to day. Each one may take 10-20 minutes to complete, hence the deadline extension.

@amanda-tarafa
Contributor

Thanks for the code and the detailed explanation. One remaining question: which Google.Cloud.PubSub.V1 version are you using, exactly?

I'll try to get to this later today or tomorrow.

@philvmx1
Author

3.7.0

Thanks!

We deployed the change to the OrderingKey yesterday; it usually takes a day to see the StreamingPull drop-off. We will be checking for the remainder of the week to see if it follows the same pattern as before.

@philvmx1
Author

It seems we are still in the same situation even after changing the OrderingKey to file prefix "FileTypeA", "FileTypeB", ... instead of "FileTypeA_20240228" "FileTypeA_20240229", "FileTypeB_20240228", "FileTypeB_20240229", ...

Every time the daily batch messages are processed (approx 9 individual files per day with 5 different file prefixes, originating from a batch of files uploaded to bucket), there are fewer open streaming pulls until eventually they stop altogether.

If it helps, some of the message processing requires a ModifyAckDeadline due to taking longer than 10 mins. Perhaps this is part of the problem, perhaps not.

@amanda-tarafa amanda-tarafa removed the needs more info This issue needs more information from the customer to proceed. label Feb 29, 2024
@amanda-tarafa
Contributor

Thanks for the extra info. I'll take a look as soon as I can, but to set expectations, it will probably be early next week.

@amanda-tarafa
Contributor

Assigning to @jskeet as he's more familiar with this library than I am.

@jskeet
Collaborator

jskeet commented Mar 12, 2024

Okay, I'm looking into this now - I'm afraid my experience with diagnosing Pub/Sub issues is that there'll be a lot of back and forth with various questions and tests before we get to the root problem. Thanks so much for all the info you've already provided.

I can explain one thing off the bat: the reason that cts.Cancel() isn't doing anything is that the cancellation token passed into BuildAsync is only used for the build task itself. Building a client involves obtaining a credential, which can involve network requests to a metadata server - the cancellation token allows you to cancel that. After the client has been created, that cancellation token is irrelevant.
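To put that in code (a sketch only; the project and subscription names are placeholders, and I'm assuming a console app with implicit usings):

using Google.Cloud.PubSub.V1;

// The token passed to BuildAsync only guards client construction (e.g. obtaining credentials);
// the built SubscriberClient never observes it.
var buildCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var subscriber = await new SubscriberClientBuilder
{
    SubscriptionName = new SubscriptionName("my-project", "my-subscription")
}.BuildAsync(buildCts.Token);

// Cancelling buildCts from here on has no effect on the running subscriber...
var subscriberTask = subscriber.StartAsync((msg, ct) => Task.FromResult(SubscriberClient.Reply.Ack));

// ... stopping is requested via StopAsync instead (the token passed there requests a "hard stop").
await subscriber.StopAsync(CancellationToken.None);
await subscriberTask;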

Now, to understand your situation a bit better:

  • Am I right in saying you've just got one Pub/Sub message per file, so we're looking at just ~9 messages per day? Or are there multiple messages per file?
  • You're running on GKE: I assume that means Linux in this case? If you could let us know the distribution/version of Linux and the version of .NET you're using that would be great.
  • In terms of reproducing the problem:
    • Do you have a test cluster you're able to try this in? In particular, it would be really interesting to know if you see the same behavior on a shorter timescale, e.g. pushing 9 messages per hour instead of per day (but ideally still taking more than 10 minutes to process - using Task.Delay to simulate that, if necessary)
    • Are you able to run a test to try this from a local developer machine instead of on GKE? (There can be subtle differences between environments in terms of long-running network connections; if we can reproduce this outside GKE it would be really helpful.)
    • If you're able to run various tests, could you run one where ModifyAckDeadline isn't needed (e.g. just post ~9 nearly-empty files which can be processed and ACKed quickly)?

All of this is just trying to play divide-and-conquer at the moment (and giving me enough information to reproduce the issue myself, which will massively reduce the back-and-forth). I don't have any concrete ideas about what's happening at the moment - but if I can reproduce the issue myself, I can easily add a load of logging etc.

I'm going to try a simple repro to start with of just starting a single client on my Windows box, using the same topic configuration that you've got, and occasionally adding messages to it, with a mixture of "needs modify ack deadline" and "ack quickly".

@jskeet
Collaborator

jskeet commented Mar 12, 2024

Okay, my simplest tests didn't show anything - which is pretty much what I expected. I'll try leaving the code running for multiple hours (if I can remember not to shut my laptop down!) before moving onto the next scenarios...

@jskeet
Collaborator

jskeet commented Mar 13, 2024

Nearly 24 hours later, the subscriber is still running fine, and the number of open streaming pull requests is stable (goes up and down a bit, as expected, but returns to the right level). I've sent large and small batches of messages, with a mixture of different simulated processing times. No joy reproducing the problem yet :(

(I'll stop this test now, as it's not getting anywhere.)

@Mihier-Roy

Mihier-Roy commented Mar 13, 2024

Dropping in to add a very similar case we're encountering. We've got a few .NET apps on GKE that use the Pub/Sub queues. They're fairly high-volume queues - processing ~110k messages a day, peaking at around 3-4k messages an hour. It works fine on weekdays, but on weekends the number of messages published drops down a lot and we might get 1-2 an hour.

We recently updated the PubSub NuGet package from 2.9.x to 3.7.x. Since updating, we've noticed that the application seems to drop the connection to the Pub/Sub queue if there are no messages published in the last ~15 minutes or so. It seems fairly similar to this issue on the NodeJS lib - googleapis/nodejs-pubsub#1135.

So far our workaround is to just restart it, but we are looking at something more automated if we can't land on a fix in the upstream library. I hope this additional context might help with determining the cause of the issue.

@jskeet
Collaborator

jskeet commented Mar 13, 2024

@Mihier-Roy: Are you able to reproduce the issue outside GKE? Given the mention of Docker boundaries in the linked NodeJS issue, I'm wondering whether I might be able to reproduce this in a "simple" Docker container. (Rather than having to deploy to GKE.) Will give it a try. Any more information you can provide would be really useful - see my earlier comments for the sort of thing I'm interested in. (If you could let us know your topic configuration too, that'd be great. Things like message ordering and exactly-once delivery have some interesting effects in terms of how the clients behave.)

@Mihier-Roy

@jskeet: I haven't tried to reproduce outside of GKE just yet, but I'll take a stab at it today/tomorrow! I hope it's something we can reproduce in a Docker container.

Some additional info:

  • We use the standard Debian-based .NET base image (.NET 7 at the moment) for deploying our apps on GKE
  • We do have a test cluster - I'll try reproducing the issue there, since that queue has much lower volume

As for topic configuration:

  • We don't have message ordering
  • We don't have exactly-once delivery

@philvmx1
Author

Getting a break at last from the day-to-day to respond @jskeet - your name is familiar, I think I've read your writing or watched some videos in the past. Anyhow, here's some info you requested and some findings of our own:

We're definitely running in Linux containers with the .NET 6 runtime; mcr.microsoft.com/dotnet/aspnet:6.0, in fact.

Thanks for clarifying the Build vs. Run context for the cancellation token; it seems relevant to our workaround. As we pored over the code, that token was quite difficult to track down. There are many cancellation tokens in play, and tokens created from others, IIRC (it was many lines of code ago).

We found a workaround that seems stable. In a shared library, we have a method that sets up the subscriber:

IMessageBusSubscriber<TEventType>
Task ProcessMessages<TEventHandler>(CancellationToken cancellationToken);

Previously:

        var subscriber = await subscriberClientBuilder.BuildAsync(cancellationToken);

        await subscriber.StartAsync(
            async (message, cancelToken) =>
            {
                try
                {
                    using var scope = _serviceScopeFactory.CreateScope();
                    var processor = scope.ServiceProvider.GetService<IEventPullProcessor<TEventType>>();
                    if (scope.ServiceProvider.GetService<TEventHandler>() is not IEventHandler<TEventType> handler)
                        throw new NotImplementedException();
                    
                    return await processor.ProcessMessage(message, handler.Handle, cancelToken);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, e.Message);
                    throw;
                }
            });

We passed that token because, if the process was being shut down gracefully (during deployment, for example), we'd want the subscriber to shut down gracefully as well.

We now have the following implementation in which we no longer pass the ambient cancellationToken to BuildAsync.

        // DO NOT USE THE CANCELLATION TOKEN THAT IS PASSED INTO THIS METHOD.
        // We need a new cancellation token because the SubscriberClient can cancel the token itself. If the
        // SubscriberClient cancels, we want to restart the subscription. The caller of ProcessMessages should re-invoke this
        // method when it completes.
        var subscriberCancellationToken = new CancellationToken();
        var subscriber = await subscriberClientBuilder.BuildAsync(subscriberCancellationToken);
        await subscriber.StartAsync(
            async (message, cancelToken) =>
            {
                try
                {
                    using var scope = _serviceScopeFactory.CreateScope();
                    var processor = scope.ServiceProvider.GetService<IEventPullProcessor<TEventType>>();
                    if (scope.ServiceProvider.GetService<TEventHandler>() is not IEventHandler<TEventType> handler)
                        throw new NotImplementedException();

                    return await processor.ProcessMessage(message, handler.Handle, cancelToken);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, e.Message);
                    throw;
                }
            });

However, since StartAsync doesn't take a cancellation token and we're awaiting, there doesn't seem to be a way to tell the subscriber client to stop.

Our consumer is a BackgroundService

public class EventWorker<TEventHandler, TEventType> : BackgroundService
{
    private readonly ILogger<EventWorker<TEventHandler, TEventType>> _logger;
    private readonly IMessageBusSubscriber<TEventType> _messageBusSubscriber;

    public EventWorker(
         ILogger<EventWorker<TEventHandler, TEventType>> logger,
         IMessageBusSubscriber<TEventType> messageBusSubscriber)
    {
        _logger = logger;
        _messageBusSubscriber = messageBusSubscriber;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await _messageBusSubscriber.ProcessMessages<TEventHandler>(stoppingToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, e.Message);
            }

            await Task.Delay(1000, stoppingToken);
        }
    }
}

Background service is registered as a Singleton service

services.AddTransient<TEventHandler>();
services.AddSingleton<IHostedService>(x =>
    ActivatorUtilities.CreateInstance<EventWorker<TEventHandler, TEventType>>(x,
        ActivatorUtilities.CreateInstance<GcpPubSubMessageBusSubscriber<TEventType>>(x,
            Options.Create(gcpPubSubSubscriberSettings)))
);

As I said, this is in a library that we use for several services to wire up subscribers so it uses generics. A more concrete implementation might be easier to debug.

Our fix works, so I think what was happening is this:

Since we were passing the stoppingToken to BuildAsync, I'm thinking something somewhere inside was setting that stoppingToken.IsCancellationRequested == true, our re-subscribe loop exited, and everything stopped altogether.

This is speculation on my part, but it led to a solution that has been working for some time now.

However, something still isn't quite right with this scenario. What's causing it to stop? This single change, to NOT pass that token into BuildAsync, fixed the problem.

One error was logged recently which may (or may not) be related; this was after our fix was in place. We don't retain logs long enough to have any from before our fix.

System.IO.IOException: The request was aborted.
 ---> System.IO.IOException: Unable to read data from the transport connection: Connection reset by peer.
 ---> System.Net.Sockets.SocketException (104): Connection reset by peer
   --- End of inner exception stack trace ---
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource<System.Int32>.GetResult(Int16 token)
   at System.Net.Security.SslStream.EnsureFullTlsFrameAsync[TIOAdapter](TIOAdapter adapter)
   at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
   at System.Net.Http.Http2Connection.ProcessIncomingFramesAsync()
   --- End of inner exception stack trace ---
   at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandleRpcFailure(Exception e)
   at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandlePullMoveNext(Task initTask)
   at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.StartAsync()
   at Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d.MoveNext()

@jskeet
Collaborator

jskeet commented Mar 14, 2024

@philvmx1: Your new code is still somewhat confusing.

Firstly, the cancellation token here is pointless:

var subscriberCancellationToken = new CancellationToken();
var subscriber = await subscriberClientBuilder.BuildAsync(subscriberCancellationToken);

That's just equivalent to not passing in a cancellation token:

var subscriber = await subscriberClientBuilder.BuildAsync();

Secondly, this comment is pretty misleading in its current location:

// DO NOT USE THE CANCELLATION TOKEN THAT IS PASSED INTO THIS METHOD.
// We need a new cancellation token because the SubscriberClient can cancel the token itself. If the
// SubscriberClient cancels, we want to restart the subscription. The caller of ProcessMessages should re-invoke this
// method when it completes.

It looks like you're talking about the cancellation token passed into BuildAsync, but I'm guessing you actually mean the cancellation token passed to the delegate that's passed to subscriber.StartAsync... that's the cancellation token which SubscriberClient can (and should!) cancel, if the subscriber is being shut down. If you really did mean the cancellation token passed into BuildAsync, then the comment is just incorrect. That cancellation token is only used within BuildAsync, and couldn't be cancelled by SubscriberClient (as noted at the bottom of this GitHub comment).

However, since StartAsync doesn't take a cancellation token and we're awaiting, there doesn't seem to be a way to tell the subscriber client to stop.

The task returned by StartAsync will only complete when the subscriber has been asked to stop. It's a bit like WebApplication.RunAsync. It's not clear to me where you're calling StartAsync, but you quite possibly want to just keep hold of the task and await it on application shutdown. You could potentially do it in some kind of loop so that if the task completes in a faulted state due to some error, you restart the subscription - but I'd expect that in most cases such a fault would be permanently fatal and your application should fail at that point anyway.

If you want a kind of StartAsync which observes a cancellation token and stops the subscriber client when it's cancelled, I believe at the moment you'd need to write that yourself. It would be doable, and probably not too hard, but you'd need to bear in mind that stopping the subscriber client involves a separate cancellation token for a "hard stop", in case the "soft stop" request doesn't complete as quickly as you want.

Note that in your original code, when you were passing cts.Token into BuildAsync, nothing in the Pub/Sub code could cancel that token. It's a fundamental principle of .NET cancellation that a CancellationToken can only be used to observe a cancellation request - if you want to request cancellation, you need a CancellationTokenSource.
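As a very rough sketch of what such a wrapper might look like (this isn't part of the library; the method name, parameters and escalation policy are all just illustrative):

using Google.Cloud.PubSub.V1;

// Sketch: run the subscriber until softStopToken is cancelled, then request a soft stop
// via StopAsync, escalating to a hard stop if shutdown takes longer than hardStopTimeout.
static async Task RunUntilCancelledAsync(
    SubscriberClient subscriber,
    Func<PubsubMessage, CancellationToken, Task<SubscriberClient.Reply>> handler,
    CancellationToken softStopToken,
    TimeSpan hardStopTimeout)
{
    Task subscriberTask = subscriber.StartAsync(handler);

    using CancellationTokenRegistration registration = softStopToken.Register(() =>
    {
        // The token passed to StopAsync requests a hard stop when it is cancelled.
        var hardStopCts = new CancellationTokenSource(hardStopTimeout);
        _ = subscriber.StopAsync(hardStopCts.Token);
    });

    // Completes when the subscriber has fully stopped, or faults on a fatal error.
    await subscriberTask;
}

If the subscriber faults, the final await rethrows, so a caller could loop around this and rebuild the client if it wants to retry.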

@jskeet
Collaborator

jskeet commented Mar 14, 2024

@philvmx1: Separately, regarding this:

One error was logged recently which may (or may not) be related; this was after our fix was in place. We don't retain logs long enough to have any from before our fix.

That looks like it could very well be related. That certainly looks like it's a single channel failing - which would then (if I'm reading the code correctly) stop the whole SubscriberClient. So if your fix effectively changed how you were restarting SubscriberClient in the face of failure, then that could explain the previous behavior. (If you weren't restarting appropriately and now are.)

Of course, that just means that the next question is why you're seeing that exception. That's where it may be GKE-specific. I've tried a long-running task in plain Docker (not running on GCP) and it's working fine for me. I'd still be really interested in knowing whether you can reproduce the error at all when not running in GKE. Additionally, if you're collecting metrics about when you're restarting the SubscriberClient, and can compare that with the problems you were having before, that would be good to know about.

@philvmx1
Author

@jskeet I realize now that the comment is not correct, since you've cleared up that the cancellation token passed to BuildAsync is not passed to the built client. It doesn't quite explain why it seems to have solved our problem, though. There must be something else. I'll look closely at whether anything else changed.

What I can't quite understand is how the client doesn't reconnect on its own after the other side hangs up. Or, if it can't, why it wouldn't throw, in which case we would catch it, log an error (which we never saw in the logs, or this would have been much clearer to us) and call StartAsync again, even in the old code.

@jskeet
Collaborator

jskeet commented Mar 14, 2024

Or if it can't, why it wouldn't throw

I believe that's what it is doing, effectively, by making the task returned by StartAsync complete in the faulted state. I would expect that's how you got the log entry you've got. (What other behavior would you expect from an async method?)

I would expect errors like this to usually indicate a fatal problem - something we wouldn't want to just retry, or at least not for long. (It's possible that there's already retry going on, although we normally don't retry for streaming calls.) If the application wants to retry the whole subscription, that's up to the app (and it sounds like you're doing that).

It's hard to say anything more until we can reproduce the failure though.

@philvmx1
Author

If StartAsync completed in any state, we would expect to see the Warning logged at the end of the following (our log level is >= Warning).

public async Task ProcessMessages<TEventHandler>(CancellationToken cancellationToken)
{
... snipped setup and BuildAsync code for brevity ....
        await subscriber.StartAsync(
            async (message, cancelToken) =>
            {
                try
                {
                  ... snipped for brevity ...
                    return await processor.ProcessMessage(message, handler.Handle, cancelToken);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, e.Message);
                    throw;
                }
            });
        
        _logger.LogWarning($"PubSub listener for subscription {_config.SubscriptionName} has disconnected");
}

@jskeet
Collaborator

jskeet commented Mar 14, 2024

So how did you capture the log you included in #11793 (comment)? I'd assumed that was already from awaiting StartAsync.

Note that the code you've shown above wouldn't log a warning if StartAsync returned a task that was then faulted, because when you await the task that await expression would throw. (So you wouldn't get to the _logger.LogWarning statement. You might have something logging ProcessMessages failing, but you haven't shown that.)
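As an illustration (reusing the subscriber, handler and _logger names from your snippets), the fault only becomes visible if the logging wraps the await itself:

try
{
    await subscriber.StartAsync(handler);
    _logger.LogWarning("PubSub listener has stopped");
}
catch (Exception e)
{
    // A faulted StartAsync task surfaces here, at the await, rather than letting
    // execution continue to any statement after it.
    _logger.LogError(e, "PubSub listener terminated with an error");
    throw;
}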

Rather than getting snippets of code in various places, I'd really like to go back to a minimal but complete example which actually demonstrates the problem. It's not clear to me what you observed with the code in #11793 (comment). Did you actually run that code and see the problem, or was it just meant to be sample code to show how you set things up? (It's really important that we end up with shared code which does definitely demonstrate the problem.)

@philvmx1
Author

The error should have been logged in the BackgroundService in #11793 (comment) via _logger.LogError(e, e.Message). Then it should loop around after the delay, build a new Pub/Sub client, and call StartAsync again. The referenced log message is more recent, from after removing the cancellationToken from BuildAsync.

Understood about the working sample, I am working on creating something that models the problem with much less abstraction. I do have test environments to run them in. What I don't have is a lot of dedicated time for this since I'm running a project with a fixed deadline. So I have to squeeze in time to try things, build isolated test code, and dig for additional information.

Digging into metrics in Datadog, we found that each gRPC POST lasts 15 minutes until an error. Interestingly, the number of errors tapers off over time, as does the number of requests.

Errors:

Requests

The minimal problem-model code I have so far:

using Google.Api.Gax;
using Google.Cloud.PubSub.V1;

Host.CreateDefaultBuilder(args)
    .ConfigureServices((b, s) => {
        s.AddLogging(
            x => x.AddSimpleConsole()
        )
        .AddHostedService<MessageHandler>();
    })
    .Build()
    .Run();

public class MessageHandler : BackgroundService
{
    const string PROJECT_NAME = "<your-project-id>";
    const string SUBSCRIPTION_NAME = "<your-sub-name>";
    private readonly ILogger _logger;
    private readonly SubscriberClientBuilder _builder;

    public MessageHandler(IServiceProvider sp)
    {
        _logger = sp.GetRequiredService<ILogger<MessageHandler>>();
        _builder = MakeBuilder();
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ExecuteAsync");
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var subscriber = await _builder.BuildAsync(stoppingToken);
                _logger.LogInformation("subscriber.StartAsync");

                await subscriber.StartAsync(
                    async (message, cancelToken) =>
                    {
                        try
                        {
                            _logger.LogInformation("Message {}", message.MessageId);
                            if (stoppingToken.IsCancellationRequested) _logger.LogWarning("Cancelling stoppingToken");
                            if (cancelToken.IsCancellationRequested) _logger.LogWarning("Cancelling cancelToken");

                            await HandleMessage(message, cancelToken);

                            if (stoppingToken.IsCancellationRequested) _logger.LogWarning("Cancelling stoppingToken");
                            if (cancelToken.IsCancellationRequested) _logger.LogWarning("Cancelling cancelToken");

                            _logger.LogInformation("Acking {}", message.MessageId);
                            return SubscriberClient.Reply.Ack;
                        }
                        catch (Exception e)
                        {
                            _logger.LogError(e, e.Message);
                            if (stoppingToken.IsCancellationRequested) _logger.LogWarning("Cancelling stoppingToken");
                            throw;
                        }
                    });
            }
            catch (Exception e)
            {
                _logger.LogError(e, e.Message);
            }

            await Task.Delay(1000, stoppingToken);
        }
    }

    private async Task HandleMessage(PubsubMessage message, CancellationToken cancelToken)
    {
        int delayMilliseconds = GetDelayFromMessage(message);
        _logger.LogInformation("Delay {} ms", delayMilliseconds);
        await Task.Delay(delayMilliseconds, cancelToken);
    }

    private static int GetDelayFromMessage(PubsubMessage message)
    {
        // Default to a 10-minute delay. Note: use TotalMilliseconds here; the Milliseconds
        // property is only the millisecond component and would always be 0 for whole minutes.
        int delayMilliseconds = (int)TimeSpan.FromMinutes(10).TotalMilliseconds;
        if (message.Attributes.TryGetValue("delay", out string delayValue)
            && int.TryParse(delayValue, out int delayMinutes)
        ) delayMilliseconds = (int)TimeSpan.FromMinutes(delayMinutes).TotalMilliseconds;
        return delayMilliseconds;
    }

    private static SubscriberClientBuilder MakeBuilder()
    {
        var subscriptionName = new SubscriptionName(PROJECT_NAME, SUBSCRIPTION_NAME);
        var settings = new SubscriberClient.Settings
        {
            FlowControlSettings = new FlowControlSettings(
                maxOutstandingElementCount: 10,
                maxOutstandingByteCount: null)
        };

        var subscriberClientBuilder = new SubscriberClientBuilder
        {
            Settings = settings,
            SubscriptionName = subscriptionName
        };
        return subscriberClientBuilder;
    }
}

csproj

<Project Sdk="Microsoft.NET.Sdk.Worker">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net6.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Google.Cloud.PubSub.V1" Version="3.9.1" />
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
  </ItemGroup>

</Project>

@philvmx1
Author

Correction: our change did not solve the issue for us. Given enough time, we are still seeing the downward stair-step in the number of open streaming pulls.


@jskeet jskeet added the needs more info This issue needs more information from the customer to proceed. label Mar 27, 2024
@jskeet
Collaborator

jskeet commented Apr 1, 2024

@philvmx1: Any updates with more logging?

@philvmx1
Author

philvmx1 commented Apr 1, 2024

@jskeet I had it running in a dev env until Friday when trunk deployed over it. Logging in the dev env showed a lot of detail but no issues. Even then, it didn't model prod very well since we weren't doing any daily file drops there.

We haven't seen the issue in production since Feb 21, which has been good for us but not for understanding the issue at hand. Not understanding what caused the instability makes it difficult to accept that it's stable going forward. At the moment, we don't have any other information.

The only notable change re: Pub/Sub is going from OrderingKey = FileName (FileType__.txt) to OrderingKey = FilePrefix (FileType). That made its way into prod on Feb 29. Is there something in the OrderingKey algorithm that could cause streaming pulls to cease pulling as the number of OrderingKeys increases? I ran some preliminary tests on that theory in dev, but I used random ordering keys rather than TestFile_20240101_1010.txt, TestFile_20240102_1001.txt, ...

@jskeet
Collaborator

jskeet commented Apr 1, 2024

Is there something in the OrderingKey algo that could cause streaming pulls to cease pulling as the number of OrderingKeys increases?

No, I wouldn't expect so - although I can't say I'm an expert on the precise details of all the options in Pub/Sub. I'm still hopeful that this was really due to the perpetual retry loop we can currently get into if auth fails with an error of "internal" (which we're still discussing internally, in terms of the best fix).

@jskeet
Collaborator

jskeet commented Apr 19, 2024

Any more information on this? I'm still hopeful that this is due to the perpetual retry on certain types of auth failure - which we're tracking internally.

jskeet added commits to jskeet/google-cloud-dotnet (and then to this repository) that referenced this issue Apr 25, 2024:

This is the first part of addressing #11793.

The next step will be to allow a custom retry predicate to be provided, which can examine the previous exceptions etc., with a default which will fail after a certain time, or after multiple internal failures for example. (The list of exceptions here could easily turn into a map from "status code" to "number of exceptions".)

We may also want to implement something similar for Publisher operations, but that can be done separately.
@philvmx1
Author

It's been a while, but I have new information regarding this issue. We put an OrderingKey and OnlyOnce delivery on an existing pull subscription, and this is now causing the subscription to stop processing. As this one sees a much higher volume of messages, it happens much more quickly. We're thinking it's related to exceptions being bubbled up to StartAsync.

Documentation for StartAsync says exceptions are handled as if the reply was a NACK. We're going to try returning NACK explicitly instead of bubbling the exception to see if that resolves the issue for us.
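Roughly, the change we're planning is to catch inside the handler and reply NACK ourselves instead of rethrowing (a sketch only; HandleMessage and _logger are the names from our earlier snippets):

await subscriber.StartAsync(async (message, cancelToken) =>
{
    try
    {
        await HandleMessage(message, cancelToken);
        return SubscriberClient.Reply.Ack;
    }
    catch (Exception e)
    {
        // Log and NACK explicitly rather than letting the exception bubble up
        // (which the client would otherwise treat as a NACK anyway).
        _logger.LogError(e, e.Message);
        return SubscriberClient.Reply.Nack;
    }
});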

@jskeet
Collaborator

jskeet commented May 14, 2024

Can you clarify exactly what you mean by "exceptions being bubbled up to StartAsync"? Do you mean that the subscriber task (returned by StartAsync) ends up being faulted? (Very similarly, when you say the subscription "stops processing" do you mean it just hangs like before, or that the task returned by StartAsync fails?)

@philvmx1
Author

Clarifications:

  1. Yes, task is faulted.
  2. It hangs like before.

A stripped-down version of the code that describes the scenario:

// in our BackgroundService...

while(!stoppingToken.IsCancellationRequested)
{
        var subscriber = await GetSubscriber();
        await subscriber.StartAsync(async (msg, ct) => throw new Exception());
        
        await Task.Delay(1000, stoppingToken);
}

@philvmx1
Author

Sorry, I have a correction to make to the above comment - turns out we ARE returning NACK from our handler rather than rethrowing.

// in our BackgroundService...

while(!stoppingToken.IsCancellationRequested)
{
        var subscriber = await GetSubscriber();
        await subscriber.StartAsync(async (msg, ct) => SubscriberClient.Reply.Nack);
        
        await Task.Delay(1000, stoppingToken);
}

@jskeet
Collaborator

jskeet commented May 14, 2024

I'm confused - if the task is faulted, then it's not hanging like before - because before, the task didn't end up faulted, it just retried forever.
Are you saying that in some situation it's now faulting and in some other cases it's just retrying forever?

Note that if the task is faulted, your while loop would end up throwing.

@philvmx1
Author

Sorry, it's not faulting like I thought it was. Digging deeper into our own abstraction, we're catching, logging, and returning NACK in the Func passed to StartAsync.

@jskeet
Collaborator

jskeet commented May 14, 2024

Okay... rather than me trying to assemble a complete picture from the multiple comments, could you add a new comment with a reasonably full description of what you're observing? I think that'll be a better way forward. (Note that this will probably be my last comment for the day - I stopped actual work a couple of hours ago.)

@philvmx1
Author

To clarify/correct myself even further:

Our subscriber hangs when we reply with NACK frequently while using message ordering and only-once delivery. After 15 attempts, each NACK'd, these messages go to the DLQ. There is a strong correlation between messages going to the DLQ and pulls halting.

@philvmx1
Author

philvmx1 commented May 14, 2024

Able to repro!

Set up the program so it NACKs all messages.

Create a topic.
Create a subscription with message ordering, max tries 5, and a DLQ.
Publish a message with ordering key a1. It gets pulled, NACK'd 5 times, and goes to the DLQ.
Publish a message with ordering key a2. It gets pulled, NACK'd 5 times, and goes to the DLQ.
Publish another message with ordering key a1... not pulled!
Publish a message with ordering key a3. It gets pulled, NACK'd 5 times, and goes to the DLQ.

Publish a message with any ordering key used previously... not pulled!

Did additional testing with a simple project that can repro the issue. The heart of the logic is this:

await subscriber.StartAsync(
    async (message, cancelToken) =>
    {
        if (message.OrderingKey.StartsWith("a"))
        {
            LogMessageAction("ACK", message);
            return await Task.FromResult(SubscriberClient.Reply.Ack);
        }
        else
        {
            LogMessageAction("NACK", message);
            return await Task.FromResult(SubscriberClient.Reply.Nack);
        }
    });

Then, in the web console, I publish messages with OrderingKey either "a1" or "b1".

Once a "b1" is NACK'd all 5 tries, it goes to DLQ but no other "b1" can be pulled by neither the locally running app NOR the web console. When I stop the locally running app, the web console almost immediately pulls the "b1".

It seems like the SubscriberClient is not "releasing" the OrderingKey when the message has been NACK'd the maximum number of times and gets sent to the DLQ. How would it? Does the client have a way to know that the message will go to the DLQ this time?

I tried throwing an exception instead of returning NACK, but the client just swallows that and treats it as a NACK which gets us into the same place again anyway.

Additionally, if I publish messages with multiple OrderingKeys that will be NACK'd, such as b1, b2, b3, b4, b5, b6, ..., b10, it eventually just stops receiving new messages. This happens sometime after b5 or b6. When I restart, it starts getting the new messages.

UPDATE: I removed Only Once Delivery and it no longer stalled. So NACK + OrderingKey + Only Once is the combo that causes it to stall.

@jskeet
Collaborator

jskeet commented May 15, 2024

Right, I've managed to reproduce that. I'm going to try without the dead-lettering now, just to see if that's relevant. Once I've got a really minimal example, I'll create a PR with code that anyone can run.

@jskeet
Collaborator

jskeet commented May 15, 2024

#12961 is the repro.

Sample output:

2024-05-15T08:34:18.829Z info: Handler[0] Starting subscriber
2024-05-15T08:34:24.114Z info: Publish[0] Starting publishing.
2024-05-15T08:34:24.116Z info: Publish[0] Publishing message with text 'Normal 1' and ordering key 'normal'
2024-05-15T08:34:26.912Z info: Publish[0] Publishing message with text 'Normal 2' and ordering key 'normal'
2024-05-15T08:34:27.345Z info: Handler[0] Received message with ordering key 'normal' and text 'Normal 1'. Returning 'Ack'. MessageID='11153053032920147'
2024-05-15T08:34:29.013Z info: Publish[0] Publishing message with text 'Nack 1' and ordering key 'nack'
2024-05-15T08:34:29.461Z info: Handler[0] Received message with ordering key 'normal' and text 'Normal 2'. Returning 'Ack'. MessageID='11152815505479254'
2024-05-15T08:34:30.734Z info: Handler[0] Received message with ordering key 'nack' and text 'Nack 1'. Returning 'Nack'. MessageID='11152605002562264'
2024-05-15T08:34:33.236Z info: Handler[0] Received message with ordering key 'nack' and text 'Nack 1'. Returning 'Nack'. MessageID='11152605002562264'
2024-05-15T08:34:35.715Z info: Handler[0] Received message with ordering key 'nack' and text 'Nack 1'. Returning 'Nack'. MessageID='11152605002562264'
2024-05-15T08:34:38.254Z info: Handler[0] Received message with ordering key 'nack' and text 'Nack 1'. Returning 'Nack'. MessageID='11152605002562264'
2024-05-15T08:34:40.770Z info: Handler[0] Received message with ordering key 'nack' and text 'Nack 1'. Returning 'Nack'. MessageID='11152605002562264'
2024-05-15T08:34:44.260Z info: Publish[0] Publishing message with text 'Normal 3' and ordering key 'normal'
2024-05-15T08:34:44.360Z info: Publish[0] Publishing message with text 'Nack 2' and ordering key 'nack'
2024-05-15T08:34:44.446Z info: Publish[0] Publishing complete. Waiting for 2 minutes
2024-05-15T08:34:45.413Z info: Handler[0] Received message with ordering key 'normal' and text 'Normal 3'. Returning 'Ack'. MessageID='11152512446328396'
2024-05-15T08:35:25.458Z warn: Subscriber[0] Recoverable error in streaming pull; will retry. [...]

The last "recoverable error" part happens multiple times, but I don't think that's related to the issue. (I'll track that internally with a separate issue.)

The important part is that we don't get the "Nack 2" message.

@jskeet jskeet removed the needs more info This issue needs more information from the customer to proceed. label May 15, 2024
@jskeet
Collaborator

jskeet commented May 15, 2024

(I won't be able to spend any more time on this today, but it's good that we've managed to reproduce it.)

@jskeet
Collaborator

jskeet commented May 17, 2024

Okay, having played with this a bit more and given the subscriber a bit longer, I see the following pattern:

  • Publish "nack 1" message
  • Message is received and nacked 5 times, ends up in DLQ
  • Wait for a bit (to make sure all the above has happened) - it gets confusing if the second message is sent while the first is still in the process of being nacked
  • Publish "nack 2" message (same ordering key)
  • The "nack 2" message is not received for some time, but it is received eventually
    • At which point it's nacked 5 times, ends up in DLQ

I usually see that the specific ordering key is "blocked" for about 2 minutes after the final nack - but I've seen longer and shorter times too. (And sometimes the gap between delivery attempts is on the order of a second - sometimes it's longer.)

While I haven't seen any documentation for this (and I'll ask the Pub/Sub team to confirm that this is the intended behavior and document it), I don't think the client is doing anything wrong here.

Now, this issue has gone through quite a few iterations - but does this description explain everything you're seeing?

@philvmx1
Author

We weren't seeing a recovery like you're seeing, even after days. But then again, we have several to hundreds of OrderingKeys rather than just one.

What happens if you use more than one OrderingKey, such as "nack1", "nack2", "nack3", ... up to "nack6"? It seems to have made a difference when there are more than 4 or 5.

@jskeet
Collaborator

jskeet commented May 17, 2024

Thanks for the suggestion - I'll give that a go (although it might be on Monday).

@philvmx1
Author

NP.

IIRC, it stopped getting "ack" too after that.

@jskeet
Collaborator

jskeet commented May 17, 2024

IIRC, it stopped getting "ack" too after that.

Could you clarify precisely what you mean by that?

@philvmx1
Author

I mean that it stopped receiving messages entirely after messages with several OrderingKeys went to the DLQ within a short window (on the order of minutes, perhaps). I didn't dig into this much, since I have some deadlines coming up very soon.

However, there were some very strange behaviors when sending a combination of "ack" and "nack" messages with different OrderingKeys, and some mixed in without an OrderingKey. I was basically doing exploratory testing at the time. We saw things like the same message being handled again even though it had already been ACK'd.

I would suggest sending OrderingKeys randomly chosen from a set of "ack1" - "ack6", "nack1" - "nack6", and "" (no ordering key).

@jskeet
Collaborator

jskeet commented May 17, 2024

Just tried with 10 ordering keys, and I got the same behavior as before:

  • Send 10 messages, each with a different ordering key
  • Nack them all (5x each)
  • Wait a bit
  • Send 10 messages, with the same set of 10 ordering keys as before
  • They all get received (after a while)

Will try with a mixture of acks and nacks (and no ordering key)... I'd rather not get into randomness though.
