Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable.Start discards the IDisposable an scheduler provides to it and replaces with an empty one. #2033

Open
fahadash opened this issue Nov 7, 2023 · 0 comments

Comments

@fahadash
Copy link

fahadash commented Nov 7, 2023

Observable.Start has an overload that takes an scheduler. I am writing my own Limited Concurrency scheduler that returns a Disposable which when disposed releases my semaphores so my scheduler know that it can kick off other tasks that are waiting. However, I noticed that the IDisposable returned by Schedule method is never disposed. Upon looking further I found that the Observable.Start uses the following ScheduleAction method of System.Reactive.Concurrency.Scheduler.

        internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, Action<TState> action)
        {
            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return scheduler.Schedule(
                (action, state),
                (_, tuple) =>
                {
                    tuple.action(tuple.state);
                    return Disposable.Empty;
                });
        }

As seen in the code above, after the action is called, it never calls Dispose or even returns the same disposable along that was passed to it. This is really limiting us in our scheduler. Consider the following scheduler.

public class ThreeAtATimeScheduler : IScheduler
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(3);

public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
    var scheduler = ThreadPoolScheduler.Instance;
    return new CompositeDisposable(
        Disposable.Create(() => _semaphore.Release()),
        _semaphore.WaitAsync().
        ContinueWith(t => scheduler.Schedule(state, action))
    );
 }

...
}

now consider the following use

var scheduler = new ThreeAtATimeScheduler();
var subscription = 
Observable.Range(0, 100)
    .Buffer(TimeSpan.FromSeconds(1), 10)
    .SelectMany(x => x)
    .Select(x => Observable.Start(() =>
    {
        Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
    }, scheduler))
    .Concat()
    .Wait();

The above code writes only 3 lines to the console, the semaphores are allocated and never released.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant