-
Notifications
You must be signed in to change notification settings - Fork 772
/
BuildGraph.fs
141 lines (111 loc) · 5.31 KB
/
BuildGraph.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
module FSharp.Compiler.BuildGraph
open System
open System.Threading
open System.Threading.Tasks
open System.Globalization
open Internal.Utilities.Library
/// Static helpers layered on top of F# `Async` for running and combining computations.
[<AbstractClass; Sealed>]
type Async =

    /// Starts `computation` immediately on the calling thread with `CancellationToken.None`
    /// and blocks until the resulting task completes, returning its result.
    ///
    /// When the task fails with an `AggregateException` wrapping exactly one exception, that
    /// inner exception is rethrown in its place; any other failure propagates unchanged.
    static member RunImmediateWithoutCancellation(computation) =
        try
            let work = async { return! computation }

            Async
                .StartImmediateAsTask(work, cancellationToken = CancellationToken.None)
                .Result

        with :? AggregateException as ex when ex.InnerExceptions.Count = 1 ->
            // Rethrow through ExceptionDispatchInfo instead of 'raise': raising an existing
            // exception object again resets its stack trace to this line, which hides where
            // the computation actually failed. The exception object and type are unchanged.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo
                .Capture(ex.InnerExceptions[0])
                .Throw()

            // Unreachable: Throw() never returns. Present only to give this branch the result type.
            Unchecked.defaultof<_>

    /// Converts a `Cancellable` computation into an `Async` by delegating to `Cancellable.toAsync`.
    static member FromCancellable(computation: Cancellable<'T>) = Cancellable.toAsync computation

    /// Starts `computation` as a task via `Async.StartAsTask`.
    /// Uses `ct` when supplied, otherwise `CancellationToken.None`.
    static member StartAsTask_ForTesting(computation: Async<'T>, ?ct: CancellationToken) =
        Async.StartAsTask(computation, cancellationToken = defaultArg ct CancellationToken.None)

    /// Runs `computations` one at a time, in sequence order, and returns their results as an
    /// array in that same order. An exception raised by one computation propagates out of the
    /// loop, so the computations after it are never started.
    static member SequentialFailFast(computations: Async<'T> seq) =
        async {
            let results = ResizeArray()

            for computation in computations do
                let! result = computation
                results.Add result

            return results.ToArray()
        }
/// Module-level UI culture state for graph nodes.
[<RequireQualifiedAccess>]
module GraphNode =

    // The culture of the VS thread that is executing now is remembered here so that, when
    // the agent in the async lazy object later picks up a thread from the thread pool,
    // the same culture can be set on that thread.
    let mutable culture = CultureInfo(CultureInfo.CurrentUICulture.Name)

    /// When a preferred UI language name is supplied, stores the matching `CultureInfo` in
    /// `culture` and makes it the current UI culture of the calling thread.
    /// Does nothing for `None`.
    let SetPreferredUILang (preferredUiLang: string option) =
        let applyCulture (langName: string) =
            culture <- CultureInfo langName
#if FX_RESHAPED_GLOBALIZATION
            CultureInfo.CurrentUICulture <- culture
#else
            Thread.CurrentThread.CurrentUICulture <- culture
#endif

        Option.iter applyCulture preferredUiLang
/// Wraps an `Async<'T>` computation and caches its value once a run has completed
/// successfully, so that later requests are answered from the cache.
[<Sealed>]
type GraphNode<'T> private (computation: Async<'T>, cachedResult: ValueOption<'T>, cachedResultNode: Async<'T>) =

    // The computation to run; reset to its default value once a result has been cached.
    let mutable computation = computation

    // Number of callers currently inside the slow path of GetOrComputeValue.
    let mutable requestCount = 0

    // The cached value, or ValueNone while no run has completed successfully.
    let mutable cachedResult = cachedResult

    // An async that immediately returns the cached value; null until a value is cached.
    let mutable cachedResultNode: Async<'T> = cachedResultNode

    let isCachedResultNodeNotNull () =
        not (obj.ReferenceEquals(cachedResultNode, null))

    // Created with one permit out of a maximum of one, so only one caller at a time
    // gets past the wait in GetOrComputeValue.
    let semaphore = new SemaphoreSlim(1, 1)

    /// Returns an async that yields this node's value.
    ///
    /// If a value is already cached, the cached async is returned directly. Otherwise the
    /// returned async waits for the semaphore, re-checks the cache, and if it is still empty
    /// runs the computation under the caller's cancellation token. Only a successful run is
    /// cached; a failure or cancellation is surfaced through the awaited task and leaves the
    /// cache empty.
    member _.GetOrComputeValue() =
        // fast path
        if isCachedResultNodeNotNull () then
            cachedResultNode
        else
            async {
                Interlocked.Increment(&requestCount) |> ignore

                try
                    let! ct = Async.CancellationToken

                    // We must set 'taken' before any implicit cancellation checks
                    // occur, making sure we are under the protection of the 'try'.
                    // For example, NodeCode's 'try/finally' (TryFinally) uses async.TryFinally which does
                    // implicit cancellation checks even before the try is entered, as do the
                    // de-sugaring of 'do!' and other NodeCode constructs.
                    let mutable taken = false

                    try
                        // The continuation is skipped when the wait is cancelled or faults,
                        // so 'taken' becomes true only if the semaphore was actually acquired.
                        do!
                            semaphore
                                .WaitAsync(ct)
                                .ContinueWith(
                                    (fun _ -> taken <- true),
                                    (TaskContinuationOptions.NotOnCanceled
                                     ||| TaskContinuationOptions.NotOnFaulted
                                     ||| TaskContinuationOptions.ExecuteSynchronously)
                                )
                            |> Async.AwaitTask

                        // Another caller may have cached a value while we were waiting.
                        match cachedResult with
                        | ValueSome value -> return value
                        | _ ->
                            let tcs = TaskCompletionSource<'T>()
                            let p = computation

                            Async.StartWithContinuations(
                                async {
                                    // Assign through CultureInfo.CurrentUICulture (as the
                                    // FX_RESHAPED_GLOBALIZATION branch of GraphNode.SetPreferredUILang
                                    // does) so this line does not rely on the
                                    // Thread.CurrentUICulture setter that the other site has to
                                    // guard with #if. Both forms set the current thread's UI culture.
                                    CultureInfo.CurrentUICulture <- GraphNode.culture
                                    return! p
                                },
                                (fun res ->
                                    // Publish the cached state before completing the task that
                                    // the waiting caller is awaiting.
                                    cachedResult <- ValueSome res
                                    cachedResultNode <- async.Return res
                                    computation <- Unchecked.defaultof<_>
                                    tcs.SetResult(res)),
                                (fun ex -> tcs.SetException(ex)),
                                (fun _ -> tcs.SetCanceled()),
                                ct
                            )

                            return! tcs.Task |> Async.AwaitTask
                    finally
                        // Release only if this caller actually acquired the semaphore.
                        if taken then
                            semaphore.Release() |> ignore
                finally
                    Interlocked.Decrement(&requestCount) |> ignore
            }

    /// Returns the cached value if there is one, without running the computation.
    member _.TryPeekValue() = cachedResult

    /// True when a value has been cached.
    member _.HasValue = cachedResult.IsSome

    /// True while at least one caller is inside the slow path of `GetOrComputeValue`.
    member _.IsComputing = requestCount > 0

    /// Creates a node whose value is already available.
    static member FromResult(result: 'T) =
        let nodeResult = async.Return result
        GraphNode(nodeResult, ValueSome result, nodeResult)

    /// Creates a node with no cached value that will run `computation` on demand.
    new(computation) = GraphNode(computation, ValueNone, Unchecked.defaultof<_>)