Skip to content

Commit

Permalink
Avoid some closure allocations and enable delegate caching in the Con…
Browse files Browse the repository at this point in the history
…currency namespace.
  • Loading branch information
danielcweber committed May 29, 2018
1 parent 68acb4b commit 5c6e43e
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 129 deletions.
20 changes: 11 additions & 9 deletions Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs
Expand Up @@ -76,16 +76,18 @@ public CatchSchedulerLongRunning(ISchedulerLongRunning scheduler, Func<TExceptio

public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
{
return _scheduler.ScheduleLongRunning(state, (state_, cancel) =>
{
try
{
action(state_, cancel);
}
catch (TException exception) when (_handler(exception))
return _scheduler.ScheduleLongRunning(
(scheduler: this, action, state),
(tuple, cancel) =>
{
}
});
try
{
tuple.action(tuple.state, cancel);
}
catch (TException exception) when (tuple.scheduler._handler(exception))
{
}
});
}
}

Expand Down
Expand Up @@ -16,6 +16,18 @@ namespace System.Reactive.Concurrency
//
internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
{
private sealed class WorkItem
{
public WorkItem(Action<object> action, object state)
{
this.Action = action;
this.State = state;
}

public Action<object> Action { get; }
public object State { get; }
}

public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) => new Timer(action, state, Normalize(dueTime));

public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
Expand All @@ -39,7 +51,13 @@ public IDisposable StartPeriodicTimer(Action action, TimeSpan period)

public IDisposable QueueUserWorkItem(Action<object> action, object state)
{
System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state);
System.Threading.ThreadPool.QueueUserWorkItem(itemObject =>
{
var item = (WorkItem)itemObject;
item.Action(item.State);
}, new WorkItem(action, state));

return Disposable.Empty;
}

Expand All @@ -51,10 +69,12 @@ public IDisposable QueueUserWorkItem(Action<object> action, object state)

public void StartThread(Action<object> action, object state)
{
new Thread(() =>
new Thread(itemObject =>
{
action(state);
}) { IsBackground = true }.Start();
var item = (WorkItem)itemObject;
item.Action(item.State);
}) { IsBackground = true }.Start(new WorkItem(action, state));
}

private static TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime;
Expand Down Expand Up @@ -132,35 +152,39 @@ public void StartThread(Action<object> action, object state)

private sealed class Timer : IDisposable
{
private object _state;
private Action<object> _action;
private volatile System.Threading.Timer _timer;

public Timer(Action<object> action, object state, TimeSpan dueTime)
{
_state = state;
_action = action;

// Don't want the spin wait in Tick to get stuck if this thread gets aborted.
try { }
finally
{
//
// Rooting of the timer happens through the this.Tick delegate's target object,
// Rooting of the timer happens through the Timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
_timer = new System.Threading.Timer(_ => Tick(_), this, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
}
}

private void Tick(object state)
private static void Tick(object state)
{
var timer = (Timer) state;

try
{
_action(state);
timer._action(timer._state);
}
finally
{
SpinWait.SpinUntil(IsTimerAssigned);
Dispose();
SpinWait.SpinUntil(timer.IsTimerAssigned);
timer.Dispose();
}
}

Expand All @@ -173,6 +197,7 @@ public void Dispose()
{
_action = Stubs<object>.Ignore;
_timer = TimerStubs.Never;
_state = null;

timer.Dispose();
}
Expand All @@ -189,13 +214,18 @@ public PeriodicTimer(Action action, TimeSpan period)
_action = action;

//
// Rooting of the timer happens through the this.Tick delegate's target object,
// Rooting of the timer happens through the timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(Tick, null, period, period);
_timer = new System.Threading.Timer(_ => Tick(_), this, period, period);
}

private void Tick(object state) => _action();
private static void Tick(object state)
{
var timer = (PeriodicTimer)state;

timer._action();
}

public void Dispose()
{
Expand All @@ -219,19 +249,21 @@ public FastPeriodicTimer(Action action)
{
_action = action;

new System.Threading.Thread(Loop)
new System.Threading.Thread(_ => Loop(_))
{
Name = "Rx-FastPeriodicTimer",
IsBackground = true
}
.Start();
.Start(this);
}

private void Loop()
private static void Loop(object threadParam)
{
while (!disposed)
var timer = (FastPeriodicTimer)threadParam;

while (!timer.disposed)
{
_action();
timer._action();
}
}

Expand Down
72 changes: 48 additions & 24 deletions Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs
Expand Up @@ -12,6 +12,43 @@ namespace System.Reactive.Concurrency
/// <seealso cref="Scheduler.Default">Singleton instance of this type exposed through this static property.</seealso>
public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic
{
private sealed class UserWorkItem<TState> : IDisposable
{
private IDisposable _cancelRunDisposable;
private IDisposable _cancelQueueDisposable;

private readonly TState _state;
private readonly IScheduler _scheduler;
private readonly Func<IScheduler, TState, IDisposable> _action;

public UserWorkItem(IScheduler scheduler, TState state, Func<IScheduler, TState, IDisposable> action)
{
_state = state;
_action = action;
_scheduler = scheduler;
}

public void Run()
{
if (!Disposable.GetIsDisposed(ref _cancelRunDisposable))
{
Disposable.TrySetSingle(ref _cancelRunDisposable, _action(_scheduler, _state));
}
}

public IDisposable CancelQueueDisposable
{
get => Disposable.GetValue(ref _cancelQueueDisposable);
set => Disposable.TrySetSingle(ref _cancelQueueDisposable, value);
}

public void Dispose()
{
Disposable.TryDispose(ref _cancelQueueDisposable);
Disposable.TryDispose(ref _cancelRunDisposable);
}
}

private static readonly Lazy<DefaultScheduler> s_instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
private static IConcurrencyAbstractionLayer s_cal = ConcurrencyAbstractionLayer.Current;

Expand All @@ -37,20 +74,13 @@ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TSta
if (action == null)
throw new ArgumentNullException(nameof(action));

var d = new SingleAssignmentDisposable();
var workItem = new UserWorkItem<TState>(this, state, action);

var cancel = s_cal.QueueUserWorkItem(_ =>
{
if (!d.IsDisposed)
{
d.Disposable = action(this, state);
}
}, null);
workItem.CancelQueueDisposable = s_cal.QueueUserWorkItem(
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
workItem);

return StableCompositeDisposable.Create(
d,
cancel
);
return workItem;
}

/// <summary>
Expand All @@ -71,20 +101,14 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
if (dt.Ticks == 0)
return Schedule(state, action);

var d = new SingleAssignmentDisposable();
var workItem = new UserWorkItem<TState>(this, state, action);

var cancel = s_cal.StartTimer(_ =>
{
if (!d.IsDisposed)
{
d.Disposable = action(this, state);
}
}, null, dt);
workItem.CancelQueueDisposable = s_cal.StartTimer(
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
workItem,
dt);

return StableCompositeDisposable.Create(
d,
cancel
);
return workItem;
}

/// <summary>
Expand Down
26 changes: 13 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs
Expand Up @@ -159,7 +159,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<ISchedul
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand All @@ -176,7 +176,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<ISchedul
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand Down Expand Up @@ -234,7 +234,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueT
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand All @@ -252,7 +252,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueT
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand Down Expand Up @@ -310,7 +310,7 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffse
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

/// <summary>
Expand All @@ -328,37 +328,37 @@ public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffse
if (action == null)
throw new ArgumentNullException(nameof(action));

return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct));
return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action));
return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
}

private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
Expand All @@ -384,7 +384,7 @@ private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<I

private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
{
return InvokeAsync(self, s, (self_, state, ct) => action(self_, state, ct).ContinueWith(_ => Disposable.Empty));
return InvokeAsync(self, (action, state: s), (self_, t, ct) => t.action(self_, t.state, ct).ContinueWith(_ => Disposable.Empty));
}

private static CancellationToken GetCancellationToken(this IScheduler scheduler)
Expand Down

0 comments on commit 5c6e43e

Please sign in to comment.