
Task.ToObservable will always continue on TaskScheduler.Current instead of the current synchronization context. #2062

Closed
paul-scharnofske-AP opened this issue Dec 8, 2023 · 2 comments


@paul-scharnofske-AP

Bug

If you use Task.ToObservable() without specifying a scheduler, then the following code will run:

public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
{
    if (task == null)
    {
        throw new ArgumentNullException(nameof(task));
    }

    return ToObservableImpl(task, scheduler: null, ignoreExceptionsAfterUnsubscribe: false);
}

private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
{
    if (task.IsCompleted)
    {
        // Already completed: deliver the result (or error) directly, on the ImmediateScheduler by default.
        scheduler ??= ImmediateScheduler.Instance;

        return task.Status switch
        {
            TaskStatus.Faulted => new Throw<TResult>(task.GetSingleException(), scheduler),
            TaskStatus.Canceled => new Throw<TResult>(new TaskCanceledException(task), scheduler),
            _ => new Return<TResult>(task.Result, scheduler)
        };
    }

    // Still running: deliver the result later, from a continuation.
    return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe);
}

// SlowTaskObservable<TResult>.Subscribe:
public IDisposable Subscribe(IObserver<TResult> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    var cts = new CancellationDisposable();
    var options = GetTaskContinuationOptions(_scheduler);

    if (_scheduler == null)
    {
        // No scheduler supplied: the continuation runs on TaskScheduler.Current.
        _task.ContinueWith(
            static (t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject!),
            observer,
            cts.Token,
            options,
            TaskScheduler.Current);
    }
    else
    {
        // Scheduler supplied: the continuation hands the result to that scheduler.
        _task.ContinueWithState(
            static (task, tuple) => tuple.scheduler.ScheduleAction(
                (task, tuple.observer),
                static tuple2 => tuple2.task.EmitTaskResult(tuple2.observer)),
            (scheduler: _scheduler, observer),
            options,
            cts.Token);
    }

    if (_ignoreExceptionsAfterUnsubscribe)
    {
        // Read the exception so a fault after unsubscription counts as observed.
        _task.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
    }

    return cts;
}

The following snippet concerns me:

if (_scheduler == null)
{
    _task.ContinueWith(
        static (t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject!),
        observer,
        cts.Token,
        options,
        TaskScheduler.Current); // This should be 'TaskScheduler.FromCurrentSynchronizationContext'
}

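This means that after the task completes, the notification is delivered from a continuation scheduled on TaskScheduler.Current (normally the thread-pool scheduler) instead of on the synchronization context you subscribed from.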
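To see why the continuation ends up on a thread-pool thread, it helps to check what TaskScheduler.Current returns when code is not running inside a task. The following minimal console sketch uses plain .NET without Rx; the bare SynchronizationContext it installs is an assumption standing in for a UI context such as the dispatcher's. It shows that installing a context does not change TaskScheduler.Current, whereas TaskScheduler.FromCurrentSynchronizationContext wraps it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

internal static class Program
{
    private static void Main()
    {
        // Install a SynchronizationContext on this thread (stand-in for a UI context).
        SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

        // Not running inside a Task, so TaskScheduler.Current is the thread-pool
        // scheduler, even though a SynchronizationContext is installed.
        Console.WriteLine(TaskScheduler.Current == TaskScheduler.Default); // True

        // FromCurrentSynchronizationContext, by contrast, wraps the installed context.
        TaskScheduler fromContext = TaskScheduler.FromCurrentSynchronizationContext();
        Console.WriteLine(fromContext == TaskScheduler.Default); // False

        // Inside a task started on a given scheduler, TaskScheduler.Current is that scheduler.
        Task.Factory.StartNew(
            () => Console.WriteLine(TaskScheduler.Current == fromContext), // True
            CancellationToken.None,
            TaskCreationOptions.None,
            fromContext).Wait();
    }
}
```

So whether a TaskScheduler.Current continuation lands on the original thread depends on whether the subscription happened inside a task running on a context-bound scheduler, not on the thread's SynchronizationContext.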

I find this behavior very unexpected, especially since this is used internally in StartAsync which is used by FromAsync:

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Windows.Threading;

internal class Program
{
    public static async Task Main()
    {
        // Run dispatcher in this thread and run 'MainOnDispatcher' on it.
        Dispatcher.CurrentDispatcher.BeginInvoke(_MainOnDispatcher);
        Dispatcher.CurrentDispatcher.UnhandledException += (sender, eventArgs) =>
        {
            Console.WriteLine($"Unhandled exception in dispatcher: {eventArgs.Exception.Message}\n{eventArgs.Exception.StackTrace}");
        };
        Dispatcher.Run();
    }

    private static void _MainOnDispatcher()
    {
        Observable
            .Return(1)
            .ObserveOnDispatcher()
            .Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) before 'FromAsync'"))
            .Select(_ => Example().ToObservable())
            .Concat()
            .Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after 'Concat'"))
            .Subscribe();
    }
    
    static async Task Example()
    {
        Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) before delay");
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after delay");
    }
}

Actual Output

(thread: 1) before 'FromAsync'
(thread: 1) before delay
(thread: 1) after delay
(thread: 4) after 'Concat'

Expected Output

(thread: 1) before 'FromAsync'
(thread: 1) before delay
(thread: 1) after delay
(thread: 1) after 'Concat'

What makes this worse is that ToObservableImpl falls back to ImmediateScheduler.Instance (scheduler ??= ImmediateScheduler.Instance) for tasks that have already completed. That creates a race condition: if the underlying task has already finished, the thread doesn't switch, but otherwise it does:

// If the task has been completed before the 'ToObservable' call occurs, then it won't switch the thread.
static Task Example()
{
    return Task.CompletedTask;
}

Output

(thread: 1) before 'FromAsync'
(thread: 1) after 'Concat'
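The completed-versus-pending difference can be reproduced without a dispatcher. This sketch assumes a reference to System.Reactive; DeliveredDuringSubscribe is a hypothetical helper that reports whether OnNext fired before Subscribe returned, which is what happens when ToObservable takes the ImmediateScheduler path for an already-completed task.

```csharp
using System;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

internal static class Program
{
    // Hypothetical helper: true if OnNext ran synchronously inside the Subscribe call.
    public static bool DeliveredDuringSubscribe(Task<int> task)
    {
        bool delivered = false;
        using (task.ToObservable().Subscribe(_ => delivered = true))
        {
            return delivered;
        }
    }

    private static void Main()
    {
        // Completed task: ToObservable returns a Return on the ImmediateScheduler,
        // so OnNext fires before Subscribe returns, on the subscribing thread.
        Console.WriteLine(DeliveredDuringSubscribe(Task.FromResult(1))); // True

        // Pending task: Subscribe returns first; OnNext would only arrive later,
        // from a continuation on TaskScheduler.Current (usually the thread pool).
        var pending = new TaskCompletionSource<int>();
        Console.WriteLine(DeliveredDuringSubscribe(pending.Task)); // False
    }
}
```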

Details

Which library version?

5.0.0 or 6.0.0

What are the platform(s), environment(s) and related component version(s)?

Target Framework: net6.0-windows10.0.19041 (also with plain net6.0, but ObserveOnDispatcher is not available there)

@idg10
Collaborator

idg10 commented May 2, 2024

Rx makes very few guarantees about the context in which notifications will be delivered. So in most cases, you should make no assumptions about the context in which your subscriber callbacks (including callbacks passed to Do) will run.

There are exceptions of course—ObserveOnDispatcher would be pretty pointless if you couldn't rely on it delivering notifications to its direct subscribers via the dispatcher.

However, it only makes that guarantee for its direct subscribers, mainly because it can't do anything more than that. All it sees is an IObserver<T>. It has no control over what the observer does next. So if that observer chooses to, say, start a new thread, and then deliver notifications to a downstream subscriber on that thread, the ObserveOnDispatcher step is powerless to do anything about that, because this is something that happened downstream.

With that in mind, I'm going to add some annotations to your code to clarify what you can or can't rely on at each stage:

Observable
    .Return(1)

    // If we subscribe here, we've got no control over what thread notifications happen on.

    .ObserveOnDispatcher()

    // If we subscribe here, we're guaranteed that notifications occur on the dispatcher thread
    // The `Do` callback is effectively a subscription, so this callback runs on the dispatcher.
    .Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) before 'FromAsync'"))

    // If we subscribe here, then I don't think the documentation guarantees anything at all,
    // but as it happens, the `Do` operator always uses whatever thread the incoming notification
    // arrived on, so in fact, subscribers will receive notifications on the dispatcher thread here too.
    // Even though that's not a documented promise, I can't see us ever changing it, because too
    // many apps likely rely on it.

    .Select(_ =>
        // Select invokes its projection callback on the same thread as it received the
        // notification, so this next bit of code will run on the dispatcher thread (and to be
        // more precise, within the dispatcher's SynchronizationContext).
        // However it's basically irrelevant which thread we call the `ToObservable` method
        // on. In general in Rx, the context from which you subscribe to an observable source
        // has no bearing on the context in which that source will deliver events.
        // (That's precisely why we have both `SubscribeOn` and `ObserveOn`.)
        // So you should be aware that there's absolutely no reason to think the `IObservable<T>`
        // returned by `ToObservable` here is going to remember anything about the context
        // from which it was called. If you want an IObservable that's going to deliver events
        // on a particular context, you'll need to use one of the ObserveOnXxx methods.
        Example().ToObservable())

    // If we subscribe here, we're *still* going to get notifications on the dispatcher thread
    // because `Select` just uses whatever thread it received a notification on to deliver the projected
    // value downstream. (Again, I don't think that's documented, but I'm confident it will
    // never change.)

    .Concat()

    // So this is where it gets interesting. I believe the specific overload of Concat here will be
    // the one that takes an input of type IObservable<IObservable<TSource>>, and produces
    // a flattened IObservable<TSource> as a result. It happens to wait for each input source
    // to complete before it moves onto the next, but that's irrelevant here because your source
    // only ever produces a single observable, which itself only ever produces a single value.
    // I'd call Concat a somewhat idiosyncratic way to achieve this, because it suggests you're
    // stitching together items in order from a bunch of streams, when in fact you're just collecting
    // a single item.
    // It would be more idiomatic to replace the Select with a SelectMany and drop the Concat.
    // (The 'Many' is still a bit misleading, but SelectMany is the slightly more idiomatically common
    // way to flatten nested observables.)
    //
    // The reason this is interesting from a scheduler perspective is that this operator is actually
    // the end of the line for the original notification that started from the Return operator.
    // This Concat operator isn't going to forward the notification it received from its upstream.
    // Instead, it *subscribes* to the observable source it receives from its upstream, and then
    // returns.
    // Only when one of those newly set up subscriptions receives a notification will Concat have
    // anything to forward. (And in this specific instance, there will be exactly one such subscription,
    // and it will only ever produce one item.)
    // So at that point, the context in which Concat runs is *not* the one established by your
    // earlier call to ObserveOnDispatcher. It's now the one established by the Select callback:
    //    Example().ToObservable()
    // And since you didn't tell Rx you wanted this observable to deliver its notifications through
    // any particular scheduler, you can't safely make any assumptions about the context in
    // which notifications will be delivered. If you wanted the observable produced by your
    // Select callback to deliver through the dispatcher you should have said so.
    // That's why this does not run on the dispatcher thread:
    .Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after 'Concat'"))
    .Subscribe();

The key observation here is that Select breaks the chain. More generally, anything that delivers events not directly from its upstream, but by subscribing to other observables, will break the causality chain.

If you changed that Select callback to use this:

Example().ToObservable().ObserveOnDispatcher()

you would then be explicitly telling Rx that you do want it to capture the dispatcher you're on at the point where that Select callback runs, and to use it to deliver notifications to any subscribers to that stream.
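This fix, combined with the SelectMany suggestion from the annotations in place of Select plus Concat, can be exercised without WPF. In this sketch an EventLoopScheduler stands in for the dispatcher (an assumption: like the dispatcher, it runs all scheduled work on one dedicated thread), ObserveOn(loop) plays the role of ObserveOnDispatcher(), and DeliversOnLoopThreadAsync is a hypothetical helper.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

internal static class Program
{
    // Stand-in for the issue's Example(): completes after a short delay.
    private static async Task Example() => await Task.Delay(TimeSpan.FromMilliseconds(100));

    public static async Task<bool> DeliversOnLoopThreadAsync()
    {
        // Assumption: EventLoopScheduler substitutes for the WPF dispatcher.
        using var loop = new EventLoopScheduler();
        int loopThread = await Observable.Start(() => Environment.CurrentManagedThreadId, loop);

        int threadAfterInner = await Observable
            .Return(1)
            // Equivalent of ObserveOnDispatcher() for the outer notification.
            .ObserveOn(loop)
            // SelectMany replaces Select + Concat; ObserveOn(loop) on the new inner
            // source moves its notification back onto the loop thread.
            .SelectMany(_ => Example().ToObservable().ObserveOn(loop))
            // Runs on whichever thread the inner notification arrived on.
            .Select(_ => Environment.CurrentManagedThreadId);

        return threadAfterInner == loopThread;
    }

    private static async Task Main() =>
        Console.WriteLine(await DeliversOnLoopThreadAsync()); // True
}
```

In the original WPF program, the corresponding change would presumably be .SelectMany(_ => Example().ToObservable().ObserveOnDispatcher()) inside _MainOnDispatcher, dropping the Concat.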

Basically, any time you introduce completely new observable sources (which is what ToObservable is doing here), Rx won't capture any sort of context, because in general we don't. You have to tell it you want that.

And the reason we don't capture context automatically in all cases is that a) it's fairly expensive, and b) in cases where you really didn't need work to be run on the UI thread, it can ruin the responsiveness of the UI if that work does in fact happen on the UI thread.

So observable sources that generate new events out of thin air either use the most efficient, naturally available context (so the current task scheduler in this case), or they will accept a scheduler, and will pick a default scheduler if you don't supply one (e.g. Return works that way, as do the various timer operators).

So we don't consider this behaviour to be a bug. If you want to capture the current dispatcher and have all subscribers receive notifications through it, use ObserveOnDispatcher. If for some reason you need to be more general and want to capture the current synchronization context, use the SynchronizationContextScheduler in conjunction with ObserveOn.
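A sketch of that more general variant captures whatever SynchronizationContext is current at the call site. ToObservableOnCurrentContext is a hypothetical helper, and CountingContext is a hypothetical SynchronizationContext that forwards work to the thread pool (the base class behaviour) while counting Post calls, so the effect is visible in a console app.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a UI context: counts Post calls, then defers to the base class.
internal sealed class CountingContext : SynchronizationContext
{
    private int _posts;
    public int Posts => Volatile.Read(ref _posts);

    public override void Post(SendOrPostCallback d, object? state)
    {
        Interlocked.Increment(ref _posts);
        base.Post(d, state);
    }
}

internal static class Program
{
    // Hypothetical helper: deliver the task's notifications through the
    // SynchronizationContext that is current when this method is called.
    public static IObservable<Unit> ToObservableOnCurrentContext(Task task)
    {
        SynchronizationContext context = SynchronizationContext.Current
            ?? throw new InvalidOperationException("No SynchronizationContext on this thread.");
        return task.ToObservable().ObserveOn(new SynchronizationContextScheduler(context));
    }

    private static void Main()
    {
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);

        // Plain ToObservable: the continuation runs on TaskScheduler.Current, so nothing is posted.
        Task.Delay(100).ToObservable().Wait();
        Console.WriteLine(context.Posts); // 0

        // With the captured context, notifications are posted through it.
        ToObservableOnCurrentContext(Task.Delay(100)).Wait();
        Console.WriteLine(context.Posts > 0); // True
    }
}
```

Rx also has an ObserveOn overload that accepts a SynchronizationContext directly, which should serve the same purpose here.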

@idg10 idg10 closed this as completed May 2, 2024
@paul-scharnofske-AP
Author

Thanks for the elaborate response.
