Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async version of Observable.Create does not signal cancellation token when unsubscribed if subscribeAsync implementation has thread blocking operations #2052

Open
ArXen42 opened this issue Nov 26, 2023 · 0 comments

Comments

@ArXen42
Copy link

ArXen42 commented Nov 26, 2023

Bug

System.Reactive 6.0.0 (latest stable, works in preview 6.0.1 as well)
Tested on Windows (will be able to test on Linux in a few days, but the issue is probably not platform-specific).

When using Observabe.Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync), the CancellationToken passed to subscribeAsync action should normally be signaled when the resulting sequence is unsubscribed from.
However, I've encountered a situation when this behavior is not working - it seems to break if subscribeAsync contains thread blocking operations, such as blocking device reads or a simple Thread.Sleep.

Minimal example:

using System.Reactive.Concurrency;
using System.Reactive.Linq;

var o = Observable.Create<int>((observer, ct) =>
{
    Int32 i = 0;
    while (!ct.IsCancellationRequested)
    {
        Console.WriteLine("Still going");
        observer.OnNext(i++);
        Thread.Sleep(250); // This was SerialPort.ReadByte in real code
        // Will work with await Task.Delay(250);
    }

    Console.WriteLine($"Observable.Create loop exited (IsCancellationRequested = {ct.IsCancellationRequested})");
    return Task.CompletedTask;
});

Console.WriteLine("Subscribing");
var subscription = o
    .SubscribeOn(TaskPoolScheduler.Default) // Doesn't really matter
    .Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("Complete"));

await Task.Delay(1000);
Console.WriteLine("Disposing subscription");
subscription.Dispose();
Console.WriteLine("Disposed");
await Task.Delay(1500);

This will produce the following output:

Subscribing
Still going
0          
Still going
1
Still going
2
Still going
3
Disposing subscription
Disposed
Still going  <---- this shouldn't happen
Still going
Still going
Still going
Still going
Still going

Note that the use of TaskPoolScheduler here is just for conciseness. The issue will persist without any schedulers - for example if exception is thrown in some downstream subscriber (which is how I encountered this issue in the first place).

I understand that having blocking calls in Observable.Create is non-ideal, but also don't see any reason for the cancellation token behavior to depend on this implementation detail.

Another remark - adding a single Task.Yield at the beginning of subscribeAsync solves the problem. Unfortunately I don't understand the intricate details of how Sinks and other internal stuff works yet, so not sure why this is happening.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant