Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue with CancellationToken lifetime in Observable.Using (async version) #2089

Open
DenomikoN opened this issue Mar 6, 2024 · 0 comments
Open

Comments

@DenomikoN
Copy link

DenomikoN commented Mar 6, 2024

Bug

In the async version of Observable.Using, in observable factory delegate cancellationToken gets canceled not at the moment of destroying subscription. The synchronous version looks ok as there are no cancellation tokens.

Reproduced in System.Reactive.Linq 6.0.0, x64, Win11, dotnet version 8.0.100

using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using static System.Reactive.Linq.Observable;
					
public class Program
{
    public static async Task Main()
    {
        var enumeration = Observable.Using<int, IDisposable>(
                async (cancellation) => {
                    cancellation.Register(() => Console.WriteLine("Resource factory cancellation called"));
                    await Task.Yield();
                    Console.WriteLine("Created resource");
                    return Disposable.Create(() => Console.WriteLine("Disposed resource"));
                },

                async (resource, cancellation) => {
                    cancellation.Register(() => Console.WriteLine("Resource observable cancellation called")); // why is this called not when subscription disposing?

                    Console.WriteLine("Init resource observable");
                    await Task.Yield();
                    // return -1 immediately and then tick with 1 second interval
                    var underlyingObservable = Observable.Return(-1)
                                                        .Concat(
                                                            Observable
                                                            .Interval(TimeSpan.FromSeconds(1))
                                                            .Select((t,i) => i).Take(10)
                                                        );
                    return new AnonymousObservable<int>((observer) => {
                        Console.WriteLine("Subscribed");
                        return new CompositeDisposable(
                            underlyingObservable.Subscribe(observer),
                            Disposable.Create(() => Console.WriteLine("Unsubscribed"))
                        );
                    });
                }
            );


        enumeration = enumeration.Publish().RefCount();

        var subscription = enumeration.Subscribe(
            num => Console.WriteLine("Next:{0}", num),
            ex => Console.WriteLine("Error:{0}", ex),
            () => Console.WriteLine("Completed")
        );

        await Task.Delay(5000);

        subscription.Dispose();
    }
}

Actual output:

Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Resource observable cancellation called  << here is the problem
Next:0
Next:1
Next:2
Next:3
Unsubscribed
Disposed resource

Expected output:

Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Next:0
Next:1
Next:2
Next:3
Resource observable cancellation called
Unsubscribed
Disposed resource
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant