forked from microsoft/vs-threading
/
JoinableTaskFactory.cs
1285 lines (1155 loc) · 62.9 KB
/
JoinableTaskFactory.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.VisualStudio.Threading
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JoinableTaskSynchronizationContext = Microsoft.VisualStudio.Threading.JoinableTask.JoinableTaskSynchronizationContext;
/// <summary>
/// A factory for starting asynchronous tasks that can mitigate deadlocks
/// when the tasks require the Main thread of an application and the Main
/// thread may itself be blocking on the completion of a task.
/// </summary>
/// <remarks>
/// For more complete comments please see the <see cref="JoinableTaskContext"/>.
/// </remarks>
public partial class JoinableTaskFactory
{
/// <summary>
/// The <see cref="JoinableTaskContext"/> that owns this instance.
/// </summary>
private readonly JoinableTaskContext owner;
/// <summary>
/// The <see cref="SynchronizationContext"/> created for this factory by its constructor.
/// It is the value <see cref="ApplicableJobSyncContext"/> returns when the caller is on the main thread.
/// </summary>
private readonly SynchronizationContext mainThreadJobSyncContext;
/// <summary>
/// The collection to add all created tasks to. May be <c>null</c>.
/// </summary>
private readonly JoinableTaskCollection? jobCollection;
/// <summary>
/// Backing field for the <see cref="HangDetectionTimeout"/> property.
/// Starts at six seconds until a derived type assigns a different value.
/// </summary>
private TimeSpan hangDetectionTimeout = TimeSpan.FromSeconds(6);
/// <summary>
/// Initializes a new instance of the <see cref="JoinableTaskFactory" /> class.
/// </summary>
/// <param name="owner">The context for the tasks created by this factory.</param>
public JoinableTaskFactory(JoinableTaskContext owner)
    : this(owner, collection: null)
{
    // Everything is delegated to the (owner, collection) constructor;
    // a factory built this way has no associated task collection.
}
/// <summary>
/// Initializes a new instance of the <see cref="JoinableTaskFactory" /> class
/// that adds all generated jobs to the specified collection.
/// </summary>
/// <param name="collection">The collection that all tasks created by this factory will belong to till they complete.</param>
public JoinableTaskFactory(JoinableTaskCollection collection)
    : this(Requires.NotNull(collection, nameof(collection)).Context, collection)
{
    // The owning context is read from the collection itself, so the null check has to
    // happen inside the constructor initializer, before ".Context" is dereferenced.
    // nameof keeps the reported parameter name in sync with the signature, matching
    // the other argument checks in this class.
}
/// <summary>
/// Initializes a new instance of the <see cref="JoinableTaskFactory"/> class.
/// </summary>
/// <param name="owner">The context for the tasks created by this factory.</param>
/// <param name="collection">The collection that all tasks created by this factory will belong to till they complete. May be null.</param>
internal JoinableTaskFactory(JoinableTaskContext owner, JoinableTaskCollection? collection)
{
    // Validate first: the owner is mandatory, and a supplied collection must belong to that same owner.
    Requires.NotNull(owner, nameof(owner));
    Assumes.True(collection is null || collection.Context == owner);

    this.jobCollection = collection;
    this.owner = owner;

    // Created last so that the sync context is handed a factory whose other fields are already set.
    this.mainThreadJobSyncContext = new JoinableTaskSynchronizationContext(this);
}
/// <summary>
/// Gets the joinable task context to which this factory belongs.
/// </summary>
public JoinableTaskContext Context => this.owner;
/// <summary>
/// Gets the synchronization context to apply before executing work associated with this factory.
/// </summary>
internal SynchronizationContext? ApplicableJobSyncContext
{
    get
    {
        // The factory's own sync context only applies on the main thread;
        // callers on any other thread get no context at all.
        if (this.Context.IsOnMainThread)
        {
            return this.mainThreadJobSyncContext;
        }

        return null;
    }
}
/// <summary>
/// Gets the collection to which created tasks belong until they complete. May be null.
/// </summary>
internal JoinableTaskCollection? Collection => this.jobCollection;
/// <summary>
/// Gets or sets the timeout after which no activity while synchronously blocking
/// suggests a hang has occurred.
/// </summary>
protected TimeSpan HangDetectionTimeout
{
    get => this.hangDetectionTimeout;

    set
    {
        // Only strictly positive timeouts are accepted. nameof keeps the reported
        // argument name refactor-safe and consistent with the other checks in this class.
        Requires.Range(value > TimeSpan.Zero, nameof(value));
        this.hangDetectionTimeout = value;
    }
}
/// <summary>
/// Gets the underlying <see cref="SynchronizationContext"/> that controls the main thread in the host.
/// </summary>
protected SynchronizationContext? UnderlyingSynchronizationContext => this.Context.UnderlyingSynchronizationContext;
/// <summary>
/// Gets an awaitable whose continuations execute on the synchronization context that this instance was initialized with,
/// in such a way as to mitigate both deadlocks and reentrancy.
/// </summary>
/// <param name="cancellationToken">
/// A token whose cancellation will immediately schedule the continuation
/// on a threadpool thread and will cause the continuation to throw <see cref="OperationCanceledException"/>,
/// even if the caller is already on the main thread.
/// </param>
/// <returns>An awaitable.</returns>
/// <exception cref="OperationCanceledException">
/// Thrown back at the awaiting caller if <paramref name="cancellationToken"/> is canceled,
/// even if the caller is already on the main thread.
/// </exception>
/// <remarks>
/// <example>
/// <code>
/// private async Task SomeOperationAsync() {
/// // on the caller's thread.
/// await DoAsync();
///
/// // Now switch to a threadpool thread explicitly.
/// await TaskScheduler.Default;
///
/// // Now switch to the Main thread to talk to some STA object.
/// await this.JoinableTaskFactory.SwitchToMainThreadAsync();
/// STAService.DoSomething();
/// }
/// </code>
/// </example>
/// </remarks>
public MainThreadAwaitable SwitchToMainThreadAsync(CancellationToken cancellationToken = default(CancellationToken))
{
    // Capture the ambient job at the time of the call; the awaitable carries it along with this factory.
    JoinableTask? ambientJob = this.Context.AmbientTask;
    return new MainThreadAwaitable(this, ambientJob, cancellationToken);
}
/// <summary>
/// Gets an awaitable whose continuations execute on the synchronization context that this instance was initialized with,
/// in such a way as to mitigate both deadlocks and reentrancy.
/// </summary>
/// <param name="alwaysYield">A value indicating whether the caller should yield even if
/// already executing on the main thread.</param>
/// <param name="cancellationToken">
/// A token whose cancellation will immediately schedule the continuation
/// on a threadpool thread and will cause the continuation to throw <see cref="OperationCanceledException"/>,
/// even if the caller is already on the main thread.
/// </param>
/// <returns>An awaitable.</returns>
/// <exception cref="OperationCanceledException">
/// Thrown back at the awaiting caller if <paramref name="cancellationToken"/> is canceled,
/// even if the caller is already on the main thread.
/// </exception>
/// <remarks>
/// <example>
/// <code>
/// private async Task SomeOperationAsync()
/// {
/// // This first part can be on the caller's thread, whatever that is.
/// DoSomething();
///
/// // Now switch to the Main thread to talk to some STA object.
/// // Supposing it is also important to *not* do this step on our caller's callstack,
/// // be sure we yield even if we're on the UI thread.
/// await this.JoinableTaskFactory.SwitchToMainThreadAsync(alwaysYield: true);
/// STAService.DoSomething();
/// }
/// </code>
/// </example>
/// </remarks>
public MainThreadAwaitable SwitchToMainThreadAsync(bool alwaysYield, CancellationToken cancellationToken = default(CancellationToken))
{
    // Same as the token-only overload, except the caller's yield preference is forwarded to the awaitable.
    JoinableTask? ambientJob = this.Context.AmbientTask;
    return new MainThreadAwaitable(this, ambientJob, cancellationToken, alwaysYield);
}
/// <summary>
/// Runs the specified asynchronous method to completion while synchronously blocking the calling thread.
/// </summary>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <remarks>
/// <para>Any exception thrown by the delegate is rethrown in its original type to the caller of this method.</para>
/// <para>When the delegate resumes from a yielding await, the default behavior is to resume in its original context
/// as an ordinary async method execution would. For example, if the caller was on the main thread, execution
/// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread.</para>
/// <example>
/// <code>
/// // On threadpool or Main thread, this method will block
/// // the calling thread until all async operations in the
/// // delegate complete.
/// joinableTaskFactory.Run(async delegate {
/// // still on the threadpool or Main thread as before.
/// await OperationAsync();
/// // still on the threadpool or Main thread as before.
/// await Task.Run(async delegate {
/// // Now we're on a threadpool thread.
/// await Task.Yield();
/// // still on a threadpool thread.
/// });
/// // Now back on the Main thread (or threadpool thread if that's where we started).
/// });
/// </code>
/// </example>
/// </remarks>
public void Run(Func<Task> asyncMethod)
    => this.Run(asyncMethod, JoinableTaskCreationOptions.None, entrypointOverride: null); // default options, no entrypoint override
/// <summary>
/// Runs the specified asynchronous method to completion while synchronously blocking the calling thread.
/// </summary>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <param name="creationOptions">The <see cref="JoinableTaskCreationOptions"/> used to customize the task's behavior.</param>
public void Run(Func<Task> asyncMethod, JoinableTaskCreationOptions creationOptions)
    => this.Run(asyncMethod, creationOptions, entrypointOverride: null); // caller-supplied options, no entrypoint override
/// <summary>
/// Runs the specified asynchronous method to completion while synchronously blocking the calling thread.
/// </summary>
/// <typeparam name="T">The type of value returned by the asynchronous operation.</typeparam>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <returns>The result of the Task returned by <paramref name="asyncMethod"/>.</returns>
/// <remarks>
/// <para>Any exception thrown by the delegate is rethrown in its original type to the caller of this method.</para>
/// <para>When the delegate resumes from a yielding await, the default behavior is to resume in its original context
/// as an ordinary async method execution would. For example, if the caller was on the main thread, execution
/// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread.</para>
/// <para>See the <see cref="Run(Func{Task})" /> overload documentation for an example.</para>
/// </remarks>
public T Run<T>(Func<Task<T>> asyncMethod)
    => this.Run(asyncMethod, JoinableTaskCreationOptions.None); // forwards to the options-taking overload with defaults
/// <summary>
/// Runs the specified asynchronous method to completion while synchronously blocking the calling thread.
/// </summary>
/// <typeparam name="T">The type of value returned by the asynchronous operation.</typeparam>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <param name="creationOptions">The <see cref="JoinableTaskCreationOptions"/> used to customize the task's behavior.</param>
/// <returns>The result of the Task returned by <paramref name="asyncMethod"/>.</returns>
/// <remarks>
/// <para>Any exception thrown by the delegate is rethrown in its original type to the caller of this method.</para>
/// <para>When the delegate resumes from a yielding await, the default behavior is to resume in its original context
/// as an ordinary async method execution would. For example, if the caller was on the main thread, execution
/// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread.</para>
/// </remarks>
public T Run<T>(Func<Task<T>> asyncMethod, JoinableTaskCreationOptions creationOptions)
{
    // Blocking is not permitted under certain lock-owned sync contexts; check before starting any work.
    VerifyNoNonConcurrentSyncContext();

    // Start the job flagged as synchronously blocking, then finish it on this thread and hand back its result.
    JoinableTask<T> job = this.RunAsync(asyncMethod, synchronouslyBlocking: true, creationOptions: creationOptions);
    return job.CompleteOnCurrentThread();
}
/// <summary>
/// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields.
/// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method
/// requires the main thread while the main thread is blocked waiting for the async method's completion.
/// </summary>
/// <param name="asyncMethod">The method that, when executed, will begin the async operation.</param>
/// <returns>An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary.</returns>
/// <remarks>
/// <para>Exceptions thrown by the delegate are captured by the returned <see cref="JoinableTask" />.</para>
/// <para>When the delegate resumes from a yielding await, the default behavior is to resume in its original context
/// as an ordinary async method execution would. For example, if the caller was on the main thread, execution
/// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread.</para>
/// </remarks>
public JoinableTask RunAsync(Func<Task> asyncMethod)
    => this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: JoinableTaskCreationOptions.None);
/// <summary>
/// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields.
/// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method
/// requires the main thread while the main thread is blocked waiting for the async method's completion.
/// </summary>
/// <param name="asyncMethod">The method that, when executed, will begin the async operation.</param>
/// <returns>An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary.</returns>
/// <param name="creationOptions">The <see cref="JoinableTaskCreationOptions"/> used to customize the task's behavior.</param>
/// <remarks>
/// <para>Exceptions thrown by the delegate are captured by the returned <see cref="JoinableTask" />.</para>
/// <para>When the delegate resumes from a yielding await, the default behavior is to resume in its original context
/// as an ordinary async method execution would. For example, if the caller was on the main thread, execution
/// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread.</para>
/// </remarks>
public JoinableTask RunAsync(Func<Task> asyncMethod, JoinableTaskCreationOptions creationOptions)
{
    // Non-blocking start: the caller gets the job back as soon as the delegate yields or completes.
    JoinableTask job = this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: creationOptions);
    return job;
}
/// <summary>
/// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields.
/// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method
/// requires the main thread while the main thread is blocked waiting for the async method's completion.
/// </summary>
/// <typeparam name="T">The type of value returned by the asynchronous operation.</typeparam>
/// <param name="asyncMethod">The method that, when executed, will begin the async operation.</param>
/// <returns>
/// An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary.
/// </returns>
/// <remarks>
/// <para>Exceptions thrown by the delegate are captured by the returned <see cref="JoinableTask" />.</para>
/// <para>When the delegate resumes from a yielding await, the default behavior is to resume in its original context
/// as an ordinary async method execution would. For example, if the caller was on the main thread, execution
/// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread.</para>
/// </remarks>
public JoinableTask<T> RunAsync<T>(Func<Task<T>> asyncMethod)
    => this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: JoinableTaskCreationOptions.None);
/// <summary>
/// Invokes an async delegate on the caller's thread, and yields back to the caller when the async method yields.
/// The async delegate is invoked in such a way as to mitigate deadlocks in the event that the async method
/// requires the main thread while the main thread is blocked waiting for the async method's completion.
/// </summary>
/// <typeparam name="T">The type of value returned by the asynchronous operation.</typeparam>
/// <param name="asyncMethod">The method that, when executed, will begin the async operation.</param>
/// <param name="creationOptions">The <see cref="JoinableTaskCreationOptions"/> used to customize the task's behavior.</param>
/// <returns>
/// An object that tracks the completion of the async operation, and allows for later synchronous blocking of the main thread for completion if necessary.
/// </returns>
/// <remarks>
/// <para>Exceptions thrown by the delegate are captured by the returned <see cref="JoinableTask" />.</para>
/// <para>When the delegate resumes from a yielding await, the default behavior is to resume in its original context
/// as an ordinary async method execution would. For example, if the caller was on the main thread, execution
/// resumes after an await on the main thread; but if it started on a threadpool thread it resumes on a threadpool thread.</para>
/// </remarks>
public JoinableTask<T> RunAsync<T>(Func<Task<T>> asyncMethod, JoinableTaskCreationOptions creationOptions)
{
    // Non-blocking start of a value-returning job with caller-supplied creation options.
    JoinableTask<T> job = this.RunAsync(asyncMethod, synchronouslyBlocking: false, creationOptions: creationOptions);
    return job;
}
/// <summary>
/// Responds to calls to <see cref="JoinableTaskFactory.MainThreadAwaiter.OnCompleted(Action)"/>
/// by scheduling a continuation to execute on the Main thread.
/// </summary>
/// <param name="callback">The callback to invoke.</param>
/// <returns>
/// The <see cref="SingleExecuteProtector"/> that was created around <paramref name="callback"/>
/// and posted to the job carrying this request.
/// </returns>
internal SingleExecuteProtector RequestSwitchToMainThread(Action callback)
{
Requires.NotNull(callback, nameof(callback));
// Make sure that this thread switch request is in a job that is captured by the job collection
// to which this switch request belongs.
// If an ambient job already exists and belongs to the collection, that's good enough. But if
// there is no ambient job, or the ambient job does not belong to the collection, we must create
// a (child) job and add that to this job factory's collection so that folks joining that factory
// can help this switch to complete.
JoinableTask? ambientJob = this.Context.AmbientTask;
SingleExecuteProtector? wrapper = null;
if (ambientJob is null || (this.jobCollection is object && !this.jobCollection.Contains(ambientJob)))
{
// The delegate below is invoked synchronously inside RunAsync, so "wrapper" and "ambientJob"
// have been assigned by the time RunAsync returns, unless the delegate threw.
// The original callback is recorded as the job's entrypoint in place of this wrapper delegate.
JoinableTask? transient = this.RunAsync(
delegate
{
RoslynDebug.Assert(this.Context.AmbientTask is object, $"{nameof(this.Context.AmbientTask)} is always set for {nameof(this.RunAsync)} callbacks.");
ambientJob = this.Context.AmbientTask;
wrapper = SingleExecuteProtector.Create(ambientJob, callback);
ambientJob.Post(SingleExecuteProtector.ExecuteOnce, wrapper, true);
return Task.CompletedTask;
},
synchronouslyBlocking: false,
creationOptions: JoinableTaskCreationOptions.None,
entrypointOverride: callback);
if (transient.Task.IsFaulted)
{
// rethrow the exception.
transient.Task.GetAwaiter().GetResult();
}
}
else
{
// The ambient job is usable as-is: wrap the callback and post it to that job directly.
wrapper = SingleExecuteProtector.Create(ambientJob, callback);
ambientJob.Post(SingleExecuteProtector.ExecuteOnce, wrapper, true);
}
// Both branches assign the wrapper (or throw) before reaching this point.
Assumes.NotNull(wrapper);
return wrapper;
}
/// <summary>
/// Posts a callback to the main thread via the underlying dispatcher,
/// or to the threadpool when no dispatcher exists on the main thread.
/// </summary>
internal void PostToUnderlyingSynchronizationContextOrThreadPool(SingleExecuteProtector callback)
{
    Requires.NotNull(callback, nameof(callback));

    // Without an underlying sync context there is nothing to post to, so fall back to the threadpool.
    if (this.UnderlyingSynchronizationContext is null)
    {
        ThreadPool.QueueUserWorkItem(SingleExecuteProtector.ExecuteOnceWaitCallback, callback);
        return;
    }

    this.PostToUnderlyingSynchronizationContext(SingleExecuteProtector.ExecuteOnce, callback);
}
/// <summary>Runs the specified asynchronous method.</summary>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <param name="creationOptions">The <see cref="JoinableTaskCreationOptions"/> used to customize the task's behavior.</param>
/// <param name="entrypointOverride">The delegate to record as the entrypoint for this JoinableTask.</param>
internal void Run(Func<Task> asyncMethod, JoinableTaskCreationOptions creationOptions, Delegate? entrypointOverride)
{
    // Blocking is not permitted under certain lock-owned sync contexts; check before starting any work.
    VerifyNoNonConcurrentSyncContext();

    // Start the job flagged as synchronously blocking and finish it on the calling thread.
    this.RunAsync(asyncMethod, synchronouslyBlocking: true, creationOptions: creationOptions, entrypointOverride: entrypointOverride)
        .CompleteOnCurrentThread();
}
/// <summary>
/// Schedules a callback either through a transient joinable task or directly on the threadpool.
/// </summary>
/// <param name="callback">The callback to invoke.</param>
/// <param name="state">State to pass to the callback.</param>
/// <param name="mainThreadAffinitized">
/// <c>true</c> to post the callback via a newly started joinable task;
/// <c>false</c> to queue it to the threadpool.
/// </param>
internal void Post(SendOrPostCallback callback, object? state, bool mainThreadAffinitized)
{
    Requires.NotNull(callback, nameof(callback));

    if (!mainThreadAffinitized)
    {
        ThreadPool.QueueUserWorkItem(new WaitCallback(callback), state);
        return;
    }

    // Start a short-lived job whose only purpose is to post the callback to itself.
    JoinableTask postingJob = this.RunAsync(delegate
    {
        RoslynDebug.Assert(this.Context.AmbientTask is object, $"{nameof(this.Context.AmbientTask)} is always set for {nameof(this.RunAsync)} callbacks.");
        this.Context.AmbientTask.Post(callback, state, true);
        return Task.CompletedTask;
    });

    if (postingJob.Task.IsFaulted)
    {
        // Surface the failure to our caller rather than leaving it captured in the job.
        postingJob.Task.GetAwaiter().GetResult();
    }
}
/// <summary>
/// Posts a message to the specified underlying SynchronizationContext for processing when the main thread
/// is freely available.
/// </summary>
/// <param name="callback">The callback to invoke.</param>
/// <param name="state">State to pass to the callback.</param>
protected internal virtual void PostToUnderlyingSynchronizationContext(SendOrPostCallback callback, object state)
{
    Requires.NotNull(callback, nameof(callback));

    // Callers are expected to use this only when an underlying context exists.
    SynchronizationContext? underlying = this.UnderlyingSynchronizationContext;
    Assumes.NotNull(underlying);
    underlying.Post(callback, state);
}
/// <summary>
/// Raised when a joinable task has requested a transition to the main thread.
/// </summary>
/// <param name="joinableTask">The task requesting the transition to the main thread.</param>
/// <remarks>
/// This event may be raised on any thread, including the main thread.
/// </remarks>
protected internal virtual void OnTransitioningToMainThread(JoinableTask joinableTask)
    => Requires.NotNull(joinableTask, nameof(joinableTask)); // the base implementation only validates its argument
/// <summary>
/// Raised whenever a joinable task has completed a transition to the main thread.
/// </summary>
/// <param name="joinableTask">The task whose request to transition to the main thread has completed.</param>
/// <param name="canceled">A value indicating whether the transition was cancelled before it was fulfilled.</param>
/// <remarks>
/// This event is usually raised on the main thread, but can be on another thread when <paramref name="canceled"/> is <c>true</c>.
/// </remarks>
protected internal virtual void OnTransitionedToMainThread(JoinableTask joinableTask, bool canceled)
    => Requires.NotNull(joinableTask, nameof(joinableTask)); // the base implementation only validates its argument
/// <summary>
/// Synchronously blocks the calling thread for the completion of the specified task.
/// If running on the main thread, any applicable message pump is suppressed
/// while the thread sleeps.
/// </summary>
/// <param name="task">The task whose completion is being waited on.</param>
/// <remarks>
/// Implementations should take care that exceptions from faulted or canceled tasks
/// not be thrown back to the caller.
/// </remarks>
protected internal virtual void WaitSynchronously(Task task)
{
    // Off the main thread there is no message pump to worry about; just wait.
    if (!this.Context.IsOnMainThread)
    {
        this.WaitSynchronouslyCore(task);
        return;
    }

    // On the main thread, apply the no-message-pump context for the duration of the wait
    // so that the blocking wait does not pump messages and cannot be reentered.
    using (this.Context.NoMessagePumpSynchronizationContext.Apply())
    {
        this.WaitSynchronouslyCore(task);
    }
}
/// <summary>
/// Synchronously blocks the calling thread for the completion of the specified task.
/// </summary>
/// <param name="task">The task whose completion is being waited on.</param>
/// <remarks>
/// Implementations should take care that exceptions from faulted or canceled tasks
/// not be thrown back to the caller.
/// </remarks>
protected virtual void WaitSynchronouslyCore(Task task)
{
Requires.NotNull(task, nameof(task));
int hangTimeoutsCount = 0; // useful for debugging dump files to see how many times we looped.
int hangNotificationCount = 0; // number of times a hang was actually reported to the context.
Guid hangId = Guid.Empty; // assigned on the first timeout; the same id is passed to every report for this wait.
Stopwatch? stopWatch = null;
try
{
// Wait in slices of HangDetectionTimeout; each slice that expires without completion counts as one timeout.
while (!task.Wait(this.HangDetectionTimeout))
{
if (hangTimeoutsCount == 0)
{
// Started only after the first slice has already elapsed.
stopWatch = Stopwatch.StartNew();
}
hangTimeoutsCount++;
// Computed from the slice count rather than measured: current timeout value times number of expired slices.
TimeSpan hangDuration = TimeSpan.FromMilliseconds(this.HangDetectionTimeout.TotalMilliseconds * hangTimeoutsCount);
if (hangId == Guid.Empty)
{
hangId = Guid.NewGuid();
}
// Waits that involve a long-running task are not reported as hangs.
if (!this.IsWaitingOnLongRunningTask())
{
hangNotificationCount++;
this.Context.OnHangDetected(hangDuration, hangNotificationCount, hangId);
}
}
if (hangNotificationCount > 0)
{
RoslynDebug.Assert(stopWatch is object);
// The task completed after at least one hang was reported, so this was a false alarm.
// The stopwatch was started after the first timeout, so we add the initial timeout to the total delay.
this.Context.OnFalseHangDetected(
stopWatch.Elapsed + this.HangDetectionTimeout,
hangId);
}
}
catch (AggregateException)
{
// Swallow exceptions thrown by Task.Wait().
// Our caller just wants to know when the Task completes,
// whether successfully or not.
}
}
/// <summary>
/// Check whether the current joinableTask is waiting on a long running task.
/// </summary>
/// <returns>Return true if the current synchronous task on the thread is waiting on a long running task.</returns>
protected bool IsWaitingOnLongRunningTask()
{
    JoinableTask? blockingJob = JoinableTask.TaskCompletingOnThisThread;
    if (blockingJob is null)
    {
        // Nothing is being completed on this thread, so there is nothing to inspect.
        return false;
    }

    // Cheap check first: the blocking job itself may be flagged as long running.
    if (IsLongRunning(blockingJob))
    {
        return true;
    }

    using (this.Context.NoMessagePumpSynchronizationContext.Apply())
    {
        var reachableJobs = new HashSet<JoinableTask>();
        lock (this.Context.SyncContextLock)
        {
            // Gather the job together with everything the dependency graph reports for it,
            // then look for any long-running member while still holding the lock.
            JoinableTaskDependencyGraph.AddSelfAndDescendentOrJoinedJobs(blockingJob, reachableJobs);
            return reachableJobs.Any(IsLongRunning);
        }
    }

    static bool IsLongRunning(JoinableTask candidate)
    {
        return (candidate.CreationOptions & JoinableTaskCreationOptions.LongRunning) == JoinableTaskCreationOptions.LongRunning;
    }
}
/// <summary>
/// Adds the specified joinable task to the applicable collection.
/// </summary>
protected void Add(JoinableTask joinable)
{
    Requires.NotNull(joinable, nameof(joinable));

    // A factory without a collection has nowhere to record the task; in that case this is a no-op.
    this.jobCollection?.Add(joinable);
}
/// <summary>
/// Throws an exception if an active AsyncReaderWriterLock
/// upgradeable read or write lock is held by the caller.
/// </summary>
/// <remarks>
/// This is important to call from the Run and Run{T} methods because
/// if they are called from within an ARWL upgradeable read or write lock,
/// then Run will synchronously block while inside the semaphore held
/// by the ARWL that prevents concurrency. If the delegate within Run
/// yields and then tries to reacquire the ARWL lock, it will be unable
/// to re-enter the semaphore, leading to a deadlock.
/// Instead, callers who hold UR/W locks should never call Run, or should
/// switch to the STA thread first in order to exit the semaphore before
/// calling the Run method.
/// </remarks>
private static void VerifyNoNonConcurrentSyncContext()
{
    // Don't use Verify.Operation here to avoid loading a string resource in success cases.
    if (!(SynchronizationContext.Current is AsyncReaderWriterLock.NonConcurrentSynchronizationContext))
    {
        return;
    }

    // Report first (assert dialog in CHK builds, does not throw), then fail for real.
    Report.Fail(Strings.NotAllowedUnderURorWLock);
    Verify.FailOperation(Strings.NotAllowedUnderURorWLock); // actually throws, even in RET.
}
/// <summary>
/// Starts an async method wrapped in a <see cref="JoinableTask"/>, so that it can run
/// asynchronously yet still be synchronously completed (waited on) later.
/// </summary>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <param name="synchronouslyBlocking">A value indicating whether the launching thread will synchronously block for this job's completion.</param>
/// <param name="creationOptions">The <see cref="JoinableTaskCreationOptions"/> used to customize the task's behavior.</param>
/// <param name="entrypointOverride">The entry method's info for diagnostics. When null, <paramref name="asyncMethod"/> itself is used.</param>
/// <returns>The joinable task that wraps the invocation of <paramref name="asyncMethod"/>.</returns>
private JoinableTask RunAsync(Func<Task> asyncMethod, bool synchronouslyBlocking, JoinableTaskCreationOptions creationOptions, Delegate? entrypointOverride = null)
{
    Requires.NotNull(asyncMethod, nameof(asyncMethod));

    // Diagnostics report the override when supplied, otherwise the delegate being run.
    Delegate entrypoint = entrypointOverride ?? asyncMethod;
    var joinableTask = new JoinableTask(this, synchronouslyBlocking, creationOptions, entrypoint);

    this.ExecuteJob<EmptyStruct>(asyncMethod, joinableTask);
    return joinableTask;
}
/// <summary>
/// Starts a value-returning async method wrapped in a <see cref="JoinableTask{T}"/>, so that it can run
/// asynchronously yet still be synchronously completed (waited on) later.
/// </summary>
/// <typeparam name="T">The type of value produced by the async method.</typeparam>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <param name="synchronouslyBlocking">A value indicating whether the launching thread will synchronously block for this job's completion.</param>
/// <param name="creationOptions">The <see cref="JoinableTaskCreationOptions"/> used to customize the task's behavior.</param>
/// <returns>The joinable task that wraps the invocation of <paramref name="asyncMethod"/>.</returns>
private JoinableTask<T> RunAsync<T>(Func<Task<T>> asyncMethod, bool synchronouslyBlocking, JoinableTaskCreationOptions creationOptions)
{
    Requires.NotNull(asyncMethod, nameof(asyncMethod));

    var joinableTask = new JoinableTask<T>(this, synchronouslyBlocking, creationOptions, asyncMethod);

    this.ExecuteJob<T>(asyncMethod, joinableTask);
    return joinableTask;
}
/// <summary>
/// Invokes the async delegate inside a <see cref="RunFramework"/> scope and hands the resulting task to the job.
/// </summary>
/// <typeparam name="T">The result type of the faulted task that is synthesized when <paramref name="asyncMethod"/> throws synchronously.</typeparam>
/// <param name="asyncMethod">The asynchronous method to execute.</param>
/// <param name="job">The joinable task that receives the delegate's task through <c>SetWrappedTask</c>.</param>
/// <remarks>
/// An exception thrown synchronously by <paramref name="asyncMethod"/> is captured into a faulted task rather than propagated.
/// Any other exception escaping the outer <c>try</c> block terminates the process via <c>Environment.FailFast</c>,
/// which is called from the exception filter.
/// </remarks>
private void ExecuteJob<T>(Func<Task> asyncMethod, JoinableTask job)
{
try
{
using (var framework = new RunFramework(this, job))
{
Task asyncMethodResult;
try
{
asyncMethodResult = asyncMethod();
}
catch (Exception ex)
{
// The delegate threw before returning a task: surface the failure as a faulted task
// so that the job still gets a wrapped task to observe.
var tcs = new TaskCompletionSource<T>();
tcs.SetException(ex);
asyncMethodResult = tcs.Task;
}
job.SetWrappedTask(asyncMethodResult);
}
}
catch (Exception ex) when (FailFast(ex))
{
// We use a crashing exception filter to capture all the detail possible (even before unwinding the callstack)
// when an exception is thrown from this critical method.
// In particular, we have seen the WeakReference object that is instantiated by "new RunFramework" throw OutOfMemoryException.
throw Assumes.NotReachable();
}
// Exception filter helper: crashes the process with the given exception attached.
// The trailing throw only satisfies the compiler's requirement for a return path.
static bool FailFast(Exception ex)
{
Environment.FailFast("Unexpected exception thrown in critical scheduling code.", ex);
throw Assumes.NotReachable();
}
}
/// <summary>
/// An awaitable value which, when awaited, asynchronously transitions the caller to the Main thread.
/// </summary>
public readonly struct MainThreadAwaitable
{
    private readonly JoinableTaskFactory? jobFactory;
    private readonly JoinableTask? job;
    private readonly CancellationToken cancellationToken;
    private readonly bool alwaysYield;

    /// <summary>
    /// Initializes a new instance of the <see cref="MainThreadAwaitable"/> struct.
    /// </summary>
    /// <param name="jobFactory">The factory handed to the awaiter that this awaitable produces. Must not be null.</param>
    /// <param name="job">The joinable task handed to the awaiter, or null if there is none.</param>
    /// <param name="cancellationToken">The cancellation token handed to the awaiter.</param>
    /// <param name="alwaysYield">The always-yield flag handed to the awaiter.</param>
    internal MainThreadAwaitable(JoinableTaskFactory jobFactory, JoinableTask? job, CancellationToken cancellationToken, bool alwaysYield = false)
    {
        Requires.NotNull(jobFactory, nameof(jobFactory));

        this.alwaysYield = alwaysYield;
        this.cancellationToken = cancellationToken;
        this.job = job;
        this.jobFactory = jobFactory;
    }

    /// <summary>
    /// Gets the awaiter.
    /// </summary>
    /// <returns>
    /// An awaiter built from this awaitable's state, or a default <see cref="MainThreadAwaiter"/>
    /// when this awaitable has no factory (for example, when it is a default-initialized struct).
    /// </returns>
    public MainThreadAwaiter GetAwaiter() =>
        this.jobFactory is null
            ? default(MainThreadAwaiter)
            : new MainThreadAwaiter(this.jobFactory, this.job, this.alwaysYield, this.cancellationToken);
}
/// <summary>
/// An awaiter struct that facilitates an asynchronous transition to the Main thread.
/// </summary>
/// <remarks>
/// Because this is a struct it may be copied; the cancellation registration is therefore kept
/// in a <see cref="StrongBox{T}"/> so that every copy refers to the same registration.
/// </remarks>
public readonly struct MainThreadAwaiter : ICriticalNotifyCompletion
{
/// <summary>
/// Cancellation callback that queues the state object to the thread pool using <c>ThreadPool.QueueUserWorkItem</c>.
/// Selected when the continuation was scheduled with <c>flowExecutionContext: true</c>.
/// </summary>
private static readonly Action<object> SafeCancellationAction = state => ThreadPool.QueueUserWorkItem(SingleExecuteProtector.ExecuteOnceWaitCallback, state);
/// <summary>
/// Cancellation callback that queues the state object to the thread pool using <c>ThreadPool.UnsafeQueueUserWorkItem</c>.
/// Selected when the continuation was scheduled with <c>flowExecutionContext: false</c>.
/// </summary>
private static readonly Action<object> UnsafeCancellationAction = state => ThreadPool.UnsafeQueueUserWorkItem(SingleExecuteProtector.ExecuteOnceWaitCallback, state);
/// <summary>
/// The factory used to reach the Main thread; null only for a default-initialized awaiter.
/// </summary>
private readonly JoinableTaskFactory? jobFactory;
/// <summary>
/// The token whose cancellation releases the continuation to the thread pool.
/// </summary>
private readonly CancellationToken cancellationToken;
/// <summary>
/// When true, <see cref="IsCompleted"/> always reports false so that the await yields.
/// </summary>
private readonly bool alwaysYield;
/// <summary>
/// The joinable task whose sync context is applied in <see cref="GetResult"/>; when null, the factory's is used instead.
/// </summary>
private readonly JoinableTask? job;
/// <summary>
/// True when the token was already canceled at construction time and <see cref="alwaysYield"/> is false.
/// </summary>
private readonly bool synchronousCancellation;
/// <summary>
/// Holds the reference to the <see cref="CancellationTokenRegistration"/> struct, so that all the copies of <see cref="MainThreadAwaiter"/> will hold
/// the same <see cref="CancellationTokenRegistration"/> object.
/// </summary>
/// <remarks>
/// This must be initialized to either null or a box whose <see cref="Nullable{T}"/> value holds no value.
/// If it starts as a box holding no value, then we are interested in the cancellation,
/// and its state changes following one of these 2 patterns, determined by the execution order.
/// 1. If <see cref="OnCompleted(Action)"/> finishes before <see cref="GetResult"/> is executed on the main thread,
/// then this will hold the real registered value after <see cref="OnCompleted(Action)"/>, and <see cref="GetResult"/>
/// will dispose that value and set a default value of <see cref="CancellationTokenRegistration"/>.
/// 2. If <see cref="GetResult"/> is executed on the main thread before <see cref="OnCompleted(Action)"/> registers the cancellation,
/// then this will hold a default value of <see cref="CancellationTokenRegistration"/>, and <see cref="OnCompleted(Action)"/>
/// will not touch it.
/// </remarks>
private readonly StrongBox<CancellationTokenRegistration?>? cancellationRegistrationPtr;
/// <summary>
/// Initializes a new instance of the <see cref="MainThreadAwaiter"/> struct.
/// </summary>
/// <param name="jobFactory">The factory used to request the switch to the Main thread.</param>
/// <param name="job">The joinable task associated with the switch, or null if there is none.</param>
/// <param name="alwaysYield">True to make <see cref="IsCompleted"/> always report false.</param>
/// <param name="cancellationToken">A token whose cancellation releases the continuation to the thread pool.</param>
internal MainThreadAwaiter(JoinableTaskFactory jobFactory, JoinableTask? job, bool alwaysYield, CancellationToken cancellationToken)
{
this.jobFactory = jobFactory;
this.job = job;
this.cancellationToken = cancellationToken;
this.synchronousCancellation = cancellationToken.IsCancellationRequested && !alwaysYield;
this.alwaysYield = alwaysYield;
// Don't allocate the pointer if the cancellation token can't be canceled (or already is):
this.cancellationRegistrationPtr = cancellationToken.CanBeCanceled && !this.synchronousCancellation
? new StrongBox<CancellationTokenRegistration?>()
: null;
}
/// <summary>
/// Gets a value indicating whether the caller is already on the Main thread.
/// </summary>
/// <remarks>
/// Always false when the awaiter was created with the always-yield option. Otherwise true when
/// cancellation was already requested at construction, when there is no factory, when the caller is on the
/// Main thread, or when the context has no underlying <see cref="SynchronizationContext"/>.
/// </remarks>
public bool IsCompleted
{
get
{
if (this.alwaysYield)
{
return false;
}
return this.synchronousCancellation
|| this.jobFactory is null
|| this.jobFactory.Context.IsOnMainThread
|| this.jobFactory.Context.UnderlyingSynchronizationContext is null;
}
}
/// <summary>
/// Schedules a continuation for execution on the Main thread
/// without capturing the ExecutionContext.
/// </summary>
/// <param name="continuation">The action to invoke when the operation completes.</param>
public void UnsafeOnCompleted(Action continuation)
{
this.OnCompleted(continuation, flowExecutionContext: false);
}
/// <summary>
/// Schedules a continuation for execution on the Main thread.
/// </summary>
/// <param name="continuation">The action to invoke when the operation completes.</param>
public void OnCompleted(Action continuation)
{
this.OnCompleted(continuation, flowExecutionContext: true);
}
/// <summary>
/// Called on the Main thread to prepare it to execute the continuation.
/// </summary>
/// <exception cref="JoinableTaskContextException">
/// Thrown when the caller is not on the Main thread, an underlying <see cref="SynchronizationContext"/> exists,
/// and cancellation has not been requested.
/// </exception>
/// <exception cref="OperationCanceledException">Thrown when cancellation has been requested on the token.</exception>
public void GetResult()
{
// NOTE(review): IsCompleted reports true when jobFactory is null, yet this assertion rejects that state.
// Confirm whether awaiting a default-initialized awaiter is intended to fail here.
Assumes.True(this.jobFactory is object);
if (!(this.jobFactory.Context.IsOnMainThread || this.jobFactory.Context.UnderlyingSynchronizationContext is null || this.cancellationToken.IsCancellationRequested))
{
throw new JoinableTaskContextException(Strings.SwitchToMainThreadFailedToReachExpectedThread);
}
// Release memory associated with the cancellation request.
if (this.cancellationRegistrationPtr is object)
{
CancellationTokenRegistration registration = default(CancellationTokenRegistration);
using (this.jobFactory.Context.NoMessagePumpSynchronizationContext.Apply())
{
lock (this.cancellationRegistrationPtr)
{
if (this.cancellationRegistrationPtr.Value.HasValue)
{
registration = this.cancellationRegistrationPtr.Value.Value;
}
// The reason we set this is to effectively null the struct that
// the strong box points to. Dispose does not seem to do this. If we
// have two copies of MainThreadAwaiter pointing to the same strongbox,
// then if one copy executes but the other does not, we could end
// up holding onto the memory pointed to through this pointer. By
// resetting the value here we make sure it gets cleaned.
//
// In addition, assigning default(CancellationTokenRegistration) to a field that
// stores a Nullable<CancellationTokenRegistration> gives it a HasValue status of true,
// which tells OnCompleted that we are no longer interested in the cancellation. That is an
// important hint for OnCompleted() so that it does NOT leak the cancellation registration.
this.cancellationRegistrationPtr.Value = default(CancellationTokenRegistration);
}
}
// Intentionally deferring disposal till we exit the lock to avoid executing outside code within the lock.
registration.Dispose();
}
// If this method is called in a continuation after an actual yield, then SingleExecuteProtector.TryExecute
// should have already applied the appropriate SynchronizationContext to avoid deadlocks.
// However if no yield occurred then no TryExecute would have been invoked, so to avoid deadlocks in those
// cases, we apply the synchronization context here.
// We don't have an opportunity to revert the sync context change, but it turns out we don't need to because
// this method should only be called from async methods, which automatically revert any execution context
// changes they apply (including SynchronizationContext) when they complete, thanks to the way .NET 4.5 works.
SynchronizationContext? syncContext = this.job is object ? this.job.ApplicableJobSyncContext : this.jobFactory.ApplicableJobSyncContext;
syncContext.Apply();
// Cancel if requested, even if we arrived on the main thread.
// Unlike most async methods where throwing OperationCanceledException after completing the work may not be a good idea,
// SwitchToMainThreadAsync is a scheduler method, and always precedes some work by the caller that almost certainly should
// not be carried out if cancellation was requested.
this.cancellationToken.ThrowIfCancellationRequested();
}
/// <summary>
/// Schedules a continuation for execution on the Main thread.
/// </summary>
/// <param name="continuation">The action to invoke when the operation completes.</param>
/// <param name="flowExecutionContext">A value indicating whether to capture and reapply the current ExecutionContext for the continuation.</param>
/// <remarks>
/// Any exception thrown while scheduling terminates the process via <c>Environment.FailFast</c>,
/// because a continuation that was never scheduled would otherwise hang silently.
/// </remarks>
private void OnCompleted(Action continuation, bool flowExecutionContext)
{
Assumes.True(this.jobFactory is object);
// Only suppress (and later restore) flow when the caller asked not to flow and flow is not already suppressed.
bool restoreFlow = !flowExecutionContext && !ExecutionContext.IsFlowSuppressed();
if (restoreFlow)
{
ExecutionContext.SuppressFlow();
}
try
{
// In the event of a cancellation request, it becomes a race as to whether the threadpool
// or the main thread will execute the continuation first. So we must wrap the continuation
// in a SingleExecuteProtector so that it can't be executed twice by accident.
// Success case of the main thread.
SingleExecuteProtector? wrapper = this.jobFactory.RequestSwitchToMainThread(continuation);
// Cancellation case of a threadpool thread.
if (this.cancellationRegistrationPtr is object)
{
// Store the cancellation token registration in the struct pointer. This way,
// if the awaiter has been copied (since it's a struct), each copy of the awaiter
// points to the same registration. Without this we can have a memory leak.
CancellationTokenRegistration registration = this.cancellationToken.Register(
NullableHelpers.AsNullableArgAction(flowExecutionContext ? SafeCancellationAction : UnsafeCancellationAction),
wrapper,
useSynchronizationContext: false);
// Needs a lock to avoid a race condition between this method and GetResult().
// This method is usually called on a background thread. After "this.jobFactory.RequestSwitchToMainThread()" returns,
// the continuation is scheduled and GetResult() will be called whenever it is ready on main thread.
// We have observed that GetResult() is sometimes called right after "this.jobFactory.RequestSwitchToMainThread()"
// and before "this.cancellationToken.Register()". If that happens, we are no longer interested in the cancellation
// and should not keep the registration made here. Without this protection, the registration would be leaked.
bool disposeThisRegistration = false;
using (this.jobFactory.Context.NoMessagePumpSynchronizationContext.Apply())
{
lock (this.cancellationRegistrationPtr)
{
if (!this.cancellationRegistrationPtr.Value.HasValue)
{
this.cancellationRegistrationPtr.Value = registration;
}
else
{
// GetResult() already ran and stored a default value; this registration is no longer wanted.
disposeThisRegistration = true;
}
}
}
// Dispose outside the lock, matching GetResult(), so no outside code executes while the lock is held.
if (disposeThisRegistration)
{
registration.Dispose();
}
}
}
catch (Exception ex)
{
// This is bad. It would cause a hang without a trace as to why, since if we can't
// schedule the continuation, stuff would just never happen.
// Crash now, so that a Watson report would capture the original error.
Environment.FailFast("Failed to schedule time on the UI thread. A continuation would never execute.", ex);
}
finally
{
if (restoreFlow)
{
ExecutionContext.RestoreFlow();
}
}
}
}
/// <summary>
/// A value to construct with a C# using block in all the Run method overloads
/// to setup and teardown the boilerplate stuff.
/// </summary>
private readonly struct RunFramework : IDisposable
{
private readonly JoinableTaskFactory factory;
private readonly SpecializedSyncContext syncContextRevert;
private readonly JoinableTask joinable;
private readonly JoinableTask? previousJoinable;