Feature Idea: support IAsyncEnumerable for subscriptions #346

Open
rikbosch opened this issue Dec 13, 2019 · 3 comments

@rikbosch
Contributor

rikbosch commented Dec 13, 2019

With IAsyncEnumerable being added to C#, it would be nice to support it for subscriptions to the all stream or to individual streams, using System.Threading.Channels as the underlying implementation.

The proposed API could be:

public IAsyncEnumerable<StreamMessage> SubscribeToAllStream(long fromPosition, Action<SubscriptionOptions> configureSubscription = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)

where SubscriptionOptions holds the Channel<T> (bounded or unbounded) and other options such as fetch size, fetchJson, name, etc.

The implementation should provide a sensible default, and allow for easy reconfiguration.

public class SubscriptionOptions
{
    // The class is not generic, so the channel carries StreamMessage items.
    public Channel<StreamMessage> Channel { get; set; }
    public bool FetchJson { get; set; }
    public int FetchSize { get; set; }
}
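To make the idea concrete, here is a minimal sketch of how a channel could sit behind an IAsyncEnumerable. It is not the library's implementation: `Subscribe`, `lastPosition`, and the use of plain `long` positions in place of StreamMessage are all hypothetical, and the producer loop merely stands in for polling the store. It assumes C# 9 or later (top-level statements, attributes on local functions).

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Consume the hypothetical subscription with await foreach.
var seen = new List<long>();
await foreach (var position in Subscribe(fromPosition: 3, fetchSize: 2, lastPosition: 7))
{
    seen.Add(position);
}
Console.WriteLine(string.Join(",", seen)); // prints 3,4,5,6,7

// Hypothetical subscription: a producer task fills a bounded channel,
// and the async iterator drains it for the caller.
async IAsyncEnumerable<long> Subscribe(
    long fromPosition,
    int fetchSize,
    long lastPosition,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Bounded channel: the producer waits when the consumer falls behind.
    var channel = Channel.CreateBounded<long>(fetchSize);

    // Producer: stands in for reading pages of messages from the store.
    var producer = Task.Run(async () =>
    {
        try
        {
            for (var p = fromPosition; p <= lastPosition; p++)
            {
                await channel.Writer.WriteAsync(p, cancellationToken);
            }
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            // Completing with an exception surfaces it to the consumer.
            channel.Writer.Complete(ex);
        }
    });

    await foreach (var p in channel.Reader.ReadAllAsync(cancellationToken))
    {
        yield return p;
    }

    await producer;
}
```

Because the producer completes the channel with the exception it hit, a failure on the producer side would reach the consumer's `await foreach`, which is what the retry loop in the consumer example relies on.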

On the consumer side, we can then write:

    var cts = new CancellationTokenSource();
    // the lambda must be async because it uses await foreach
    var consumerTask = Task.Run(async () =>
    {
        // long, to match the fromPosition parameter
        long position = 1000;

        // outer loop (handles retries)
        while (!cts.IsCancellationRequested)
        {
            try
            {
                await foreach (var message in store.SubscribeToAllStream(
                    position,
                    options =>
                    {
                        // configure options here
                        options.FetchSize = 256;
                    },
                    cancellationToken: cts.Token))
                {
                    position = message.Position;
                    // handle the message
                    // error handling for the consumer has to be done here
                }
            }
            catch (OperationCanceledException) when (cts.IsCancellationRequested)
            {
                // cancellation was requested; the loop condition ends the task
            }
            catch (Exception ex) when (IsRetryableException(ex))
            {
                // this would be a producer exception
                // database not available, timeout etc
                // just restart the subscription from last known position
            }
        }
    });
    // wait for some event to stop the application
    await Application.ExitRequested;
    // cancel the subscription
    cts.Cancel();
    // wait until consumer has processed last message
    await consumerTask;

Using Channels as the backing implementation of IAsyncEnumerable lets the calling code specify how to handle a slow consumer (load shedding or waiting).
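The two slow-consumer strategies map onto `BoundedChannelFullMode` in System.Threading.Channels. The sketch below is only an illustration with `int` items and a capacity of 2 (both arbitrary choices, not part of the proposal); it assumes C# 9 or later for top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Load shedding: a full channel drops its oldest item to admit a new one.
var shedding = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.DropOldest
});
for (var i = 1; i <= 5; i++)
{
    shedding.Writer.TryWrite(i); // always succeeds in DropOldest mode
}
shedding.Writer.Complete();

var kept = new List<int>();
await foreach (var item in shedding.Reader.ReadAllAsync())
{
    kept.Add(item);
}
Console.WriteLine(string.Join(",", kept)); // prints 4,5

// Waiting (backpressure): a full channel refuses new items until one is read.
var waiting = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait // the default mode
});
var first = waiting.Writer.TryWrite(1);
var second = waiting.Writer.TryWrite(2);
var third = waiting.Writer.TryWrite(3); // false: WriteAsync would wait here instead
Console.WriteLine($"{first} {second} {third}"); // prints True True False
```

With `Wait`, a producer that uses `WriteAsync` simply pauses until the consumer catches up, whereas the drop modes keep the producer running at the cost of losing messages, which may or may not be acceptable for a given subscription.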

An in-depth introduction to System.Threading.Channels can be found here: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

@thefringeninja
Member

In fact, that work is already underway in the v2 branch. However I was not yet aware of System.Threading.Channels. I will take a look at this in the next couple of weeks. Thanks!

@damianh
Member

damianh commented Dec 21, 2019

Just to say, this will necessitate moving to netcoreapp3.1 and dropping support for netstandard2.0 (which I am OK with).

@mcintyre321

It's a shame IQbservable never took off.
