/
ConcurrencyAbstractionLayerImpl.cs
257 lines (225 loc) · 9.04 KB
/
ConcurrencyAbstractionLayerImpl.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
#if !NO_THREAD
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
//
// WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms.
// Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator
// behavior of Rx for PLIB when used on a more capable platform.
//
internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
{
public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) => new Timer(action, state, Normalize(dueTime));
public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
{
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(period));
//
// The contract for periodic scheduling in Rx is that specifying TimeSpan.Zero as the period causes the scheduler to
// call back periodically as fast as possible, sequentially.
//
if (period == TimeSpan.Zero)
{
return new FastPeriodicTimer(action);
}
else
{
return new PeriodicTimer(action, period);
}
}
public IDisposable QueueUserWorkItem(Action<object> action, object state)
{
System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state);
return Disposable.Empty;
}
public void Sleep(TimeSpan timeout) => System.Threading.Thread.Sleep(Normalize(timeout));
public IStopwatch StartStopwatch() => new StopwatchImpl();
public bool SupportsLongRunning => true;
public void StartThread(Action<object> action, object state)
{
new Thread(() =>
{
action(state);
}) { IsBackground = true }.Start();
}
private static TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime;
//
// Some historical context. In the early days of Rx, we discovered an issue with
// the rooting of timers, causing them to get GC'ed even when the IDisposable of
// a scheduled activity was kept alive. The original code simply created a timer
// as follows:
//
// var t = default(Timer);
// t = new Timer(_ =>
// {
// t = null;
// Debug.WriteLine("Hello!");
// }, null, 5000, Timeout.Infinite);
//
// IIRC the reference to "t" captured by the closure wasn't sufficient on .NET CF
// to keep the timer rooted, causing problems on Windows Phone 7. As a result, we
// added rooting code using a dictionary (SD 7280), which we carried forward all
// the way to Rx v2.0 RTM.
//
// However, the desktop CLR's implementation of System.Threading.Timer exhibits
// other characteristics where a timer can root itself when the timer is still
// reachable through the state or callback parameters. To illustrate this, run
// the following piece of code:
//
// static void Main()
// {
// Bar();
//
// while (true)
// {
// GC.Collect();
// GC.WaitForPendingFinalizers();
// Thread.Sleep(100);
// }
// }
//
// static void Bar()
// {
// var t = default(Timer);
// t = new Timer(_ =>
// {
// t = null; // Comment out this line to see the timer stop
// Console.WriteLine("Hello!");
// }, null, 5000, Timeout.Infinite);
// }
//
// When the closure over "t" is removed, the timer will stop automatically upon
// garbage collection. However, when retaining the reference, this problem does
// not exist. The code below exploits this behavior, avoiding unnecessary costs
// to root timers in a thread-safe manner.
//
// Below is a fragment of SOS output, proving the proper rooting:
//
// !gcroot 02492440
// HandleTable:
// 005a13fc (pinned handle)
// -> 03491010 System.Object[]
// -> 024924dc System.Threading.TimerQueue
// -> 02492450 System.Threading.TimerQueueTimer
// -> 02492420 System.Threading.TimerCallback
// -> 02492414 TimerRootingExperiment.Program+<>c__DisplayClass1
// -> 02492440 System.Threading.Timer
//
// With the USE_TIMER_SELF_ROOT symbol, we shake off this additional rooting code
// for newer platforms where this no longer needed. We checked this on .NET Core
// as well as .NET 4.0, and only #define this symbol for those platforms.
//
// NB: 4/13/2017 - All target platforms for the 4.x release have the self-rooting
// behavior described here, so we removed the USE_TIMER_SELF_ROOT
// symbol.
//
private sealed class Timer : IDisposable
{
private object _state;
private Action<object> _action;
private volatile System.Threading.Timer _timer;
public Timer(Action<object> action, object state, TimeSpan dueTime)
{
_state = state;
_action = action;
// Don't want the spin wait in Tick to get stuck if this thread gets aborted.
try { }
finally
{
//
// Rooting of the timer happens through the Timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(_ => Tick(_), this, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
}
}
private static void Tick(object state)
{
var timer = (Timer) state;
try
{
timer._action(timer._state);
}
finally
{
SpinWait.SpinUntil(timer.IsTimerAssigned);
timer.Dispose();
}
}
private bool IsTimerAssigned() => _timer != null;
public void Dispose()
{
var timer = _timer;
if (timer != TimerStubs.Never)
{
_action = Stubs<object>.Ignore;
_timer = TimerStubs.Never;
_state = null;
timer.Dispose();
}
}
}
private sealed class PeriodicTimer : IDisposable
{
private Action _action;
private volatile System.Threading.Timer _timer;
public PeriodicTimer(Action action, TimeSpan period)
{
_action = action;
//
// Rooting of the timer happens through the timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(_ => Tick(_), this, period, period);
}
private static void Tick(object state)
{
var timer = (PeriodicTimer)state;
timer._action();
}
public void Dispose()
{
var timer = _timer;
if (timer != null)
{
_action = Stubs.Nop;
_timer = null;
timer.Dispose();
}
}
}
private sealed class FastPeriodicTimer : IDisposable
{
private readonly Action _action;
private volatile bool disposed;
public FastPeriodicTimer(Action action)
{
_action = action;
new System.Threading.Thread(_ => Loop(_))
{
Name = "Rx-FastPeriodicTimer",
IsBackground = true
}
.Start(this);
}
private static void Loop(object threadParam)
{
var timer = (FastPeriodicTimer)threadParam;
while (!timer.disposed)
{
timer._action();
}
}
public void Dispose()
{
disposed = true;
}
}
}
}
#endif