Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce allocations in schedulers #500

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 11 additions & 9 deletions Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs
Expand Up @@ -76,16 +76,18 @@ public CatchSchedulerLongRunning(ISchedulerLongRunning scheduler, Func<TExceptio

public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
{
return _scheduler.ScheduleLongRunning(state, (state_, cancel) =>
{
try
{
action(state_, cancel);
}
catch (TException exception) when (_handler(exception))
return _scheduler.ScheduleLongRunning(
(scheduler: this, action, state),
(tuple, cancel) =>
{
}
});
try
{
tuple.action(tuple.state, cancel);
}
catch (TException exception) when (tuple.scheduler._handler(exception))
{
}
});
}
}

Expand Down
Expand Up @@ -16,6 +16,18 @@ namespace System.Reactive.Concurrency
//
internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
{
private sealed class WorkItem
{
public WorkItem(Action<object> action, object state)
{
this.Action = action;
this.State = state;
}

public Action<object> Action { get; }
public object State { get; }
}

public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) => new Timer(action, state, Normalize(dueTime));

public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
Expand All @@ -39,7 +51,13 @@ public IDisposable StartPeriodicTimer(Action action, TimeSpan period)

public IDisposable QueueUserWorkItem(Action<object> action, object state)
{
System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state);
System.Threading.ThreadPool.QueueUserWorkItem(itemObject =>
{
var item = (WorkItem)itemObject;

item.Action(item.State);
}, new WorkItem(action, state));

return Disposable.Empty;
}

Expand All @@ -51,10 +69,12 @@ public IDisposable QueueUserWorkItem(Action<object> action, object state)

public void StartThread(Action<object> action, object state)
{
new Thread(() =>
new Thread(itemObject =>
{
action(state);
}) { IsBackground = true }.Start();
var item = (WorkItem)itemObject;

item.Action(item.State);
}) { IsBackground = true }.Start(new WorkItem(action, state));
}

private static TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime;
Expand Down Expand Up @@ -132,35 +152,39 @@ public void StartThread(Action<object> action, object state)

private sealed class Timer : IDisposable
{
private object _state;
private Action<object> _action;
private volatile System.Threading.Timer _timer;

public Timer(Action<object> action, object state, TimeSpan dueTime)
{
_state = state;
_action = action;

// Don't want the spin wait in Tick to get stuck if this thread gets aborted.
try { }
finally
{
//
// Rooting of the timer happens through the this.Tick delegate's target object,
// Rooting of the timer happens through the Timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
_timer = new System.Threading.Timer(_ => Tick(_), this, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
}
}

private void Tick(object state)
private static void Tick(object state)
{
var timer = (Timer) state;

try
{
_action(state);
timer._action(timer._state);
}
finally
{
SpinWait.SpinUntil(IsTimerAssigned);
Dispose();
SpinWait.SpinUntil(timer.IsTimerAssigned);
timer.Dispose();
}
}

Expand All @@ -173,6 +197,7 @@ public void Dispose()
{
_action = Stubs<object>.Ignore;
_timer = TimerStubs.Never;
_state = null;

timer.Dispose();
}
Expand All @@ -189,13 +214,18 @@ public PeriodicTimer(Action action, TimeSpan period)
_action = action;

//
// Rooting of the timer happens through the this.Tick delegate's target object,
// Rooting of the timer happens through the timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(Tick, null, period, period);
_timer = new System.Threading.Timer(_ => Tick(_), this, period, period);
}

private void Tick(object state) => _action();
private static void Tick(object state)
{
var timer = (PeriodicTimer)state;

timer._action();
}

public void Dispose()
{
Expand All @@ -219,19 +249,21 @@ public FastPeriodicTimer(Action action)
{
_action = action;

new System.Threading.Thread(Loop)
new System.Threading.Thread(_ => Loop(_))
{
Name = "Rx-FastPeriodicTimer",
IsBackground = true
}
.Start();
.Start(this);
}

private void Loop()
private static void Loop(object threadParam)
{
while (!disposed)
var timer = (FastPeriodicTimer)threadParam;

while (!timer.disposed)
{
_action();
timer._action();
}
}

Expand Down
72 changes: 48 additions & 24 deletions Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs
Expand Up @@ -12,6 +12,43 @@ namespace System.Reactive.Concurrency
/// <seealso cref="Scheduler.Default">Singleton instance of this type exposed through this static property.</seealso>
public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic
{
private sealed class UserWorkItem<TState> : IDisposable
{
private IDisposable _cancelRunDisposable;
private IDisposable _cancelQueueDisposable;

private readonly TState _state;
private readonly IScheduler _scheduler;
private readonly Func<IScheduler, TState, IDisposable> _action;

public UserWorkItem(IScheduler scheduler, TState state, Func<IScheduler, TState, IDisposable> action)
{
_state = state;
_action = action;
_scheduler = scheduler;
}

public void Run()
{
if (!Disposable.GetIsDisposed(ref _cancelRunDisposable))
{
Disposable.TrySetSingle(ref _cancelRunDisposable, _action(_scheduler, _state));
}
}

public IDisposable CancelQueueDisposable
{
get => Disposable.GetValue(ref _cancelQueueDisposable);
set => Disposable.TrySetSingle(ref _cancelQueueDisposable, value);
}

public void Dispose()
{
Disposable.TryDispose(ref _cancelQueueDisposable);
Disposable.TryDispose(ref _cancelRunDisposable);
}
}

private static readonly Lazy<DefaultScheduler> s_instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
private static IConcurrencyAbstractionLayer s_cal = ConcurrencyAbstractionLayer.Current;

Expand All @@ -37,20 +74,13 @@ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TSta
if (action == null)
throw new ArgumentNullException(nameof(action));

var d = new SingleAssignmentDisposable();
var workItem = new UserWorkItem<TState>(this, state, action);

var cancel = s_cal.QueueUserWorkItem(_ =>
{
if (!d.IsDisposed)
{
d.Disposable = action(this, state);
}
}, null);
workItem.CancelQueueDisposable = s_cal.QueueUserWorkItem(
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
workItem);

return StableCompositeDisposable.Create(
d,
cancel
);
return workItem;
}

/// <summary>
Expand All @@ -71,20 +101,14 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
if (dt.Ticks == 0)
return Schedule(state, action);

var d = new SingleAssignmentDisposable();
var workItem = new UserWorkItem<TState>(this, state, action);

var cancel = s_cal.StartTimer(_ =>
{
if (!d.IsDisposed)
{
d.Disposable = action(this, state);
}
}, null, dt);
workItem.CancelQueueDisposable = s_cal.StartTimer(
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
workItem,
dt);

return StableCompositeDisposable.Create(
d,
cancel
);
return workItem;
}

/// <summary>
Expand Down
26 changes: 13 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs
Expand Up @@ -159,7 +159,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<ISchedul
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand All @@ -176,7 +176,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<ISchedul
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand Down Expand Up @@ -234,7 +234,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueT
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand All @@ -252,7 +252,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueT
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand Down Expand Up @@ -310,7 +310,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffse
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand All @@ -328,37 +328,37 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffse
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it have to be tuples? Can't you just define a new struct to hold both parameters and avoid the hassle with changing the language version and depending on yet another external library?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C# 7.1 aims to avoid exactly that, having to create structs everytime you need such tuples. Besides, the language support for tuples is really nice, the syntax is very low-noise. What exactly do you mean by 'hassle with changing the language version' ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't certain C# language versions require minimum .NET Framework, .NET Standard or .NET Core versions to run? https://en.wikipedia.org/wiki/C_Sharp_(programming_language)#Versions

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if so, where's the break from C# 7.0 to 7.1? I didn't even pull it up to 7.3 right away, which would probably be fine as well.

}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
Expand All @@ -384,7 +384,7 @@ private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<I

private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
{
return InvokeAsync(self, s, (self_, state, ct) => action(self_, state, ct).ContinueWith(_ => Disposable.Empty));
return InvokeAsync(self, (action, state: s), (self_, t, ct) => t.action(self_, t.state, ct).ContinueWith(_ => Disposable.Empty));
}

private static CancellationToken GetCancellationToken(this IScheduler scheduler)
Expand Down