Skip to content

Commit

Permalink
restore NodeCode subtly different semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
majocha committed Feb 4, 2024
1 parent d84c21c commit 32975e2
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/Compiler/Driver/CompilerImports.fs
Expand Up @@ -2236,7 +2236,7 @@ and [<Sealed>] TcImports
let runMethod =
match tcConfig.parallelReferenceResolution with
| ParallelReferenceResolution.On -> Async.Parallel
| ParallelReferenceResolution.Off -> Async.Sequential
| ParallelReferenceResolution.Off -> Async.SequentialFailFast

let diagnosticsLogger = DiagnosticsThreadStatics.DiagnosticsLogger
let! results =
Expand Down
21 changes: 20 additions & 1 deletion src/Compiler/Facilities/BuildGraph.fs
Expand Up @@ -11,13 +11,32 @@ open Internal.Utilities.Library
[<AbstractClass; Sealed>]
type Async =
static member RunImmediateWithoutCancellation(computation) =
Async.RunImmediate(computation, CancellationToken.None)
try
let work = async { return! computation }

Async
.StartImmediateAsTask(work, cancellationToken = CancellationToken.None)
.Result

with :? AggregateException as ex when ex.InnerExceptions.Count = 1 ->
raise (ex.InnerExceptions[0])

static member FromCancellable(computation: Cancellable<'T>) = Cancellable.toAsync computation

static member StartAsTask_ForTesting(computation: Async<'T>, ?ct: CancellationToken) =
Async.StartAsTask(computation, cancellationToken = defaultArg ct CancellationToken.None)

static member SequentialFailFast(computations: Async<'T> seq) =
async {
let results = ResizeArray()

for computation in computations do
let! result = computation
results.Add result

return results.ToArray()
}

[<RequireQualifiedAccess>]
module GraphNode =

Expand Down
2 changes: 2 additions & 0 deletions src/Compiler/Facilities/BuildGraph.fsi
Expand Up @@ -21,6 +21,8 @@ type Async =
/// Only used for testing, do not use
static member StartAsTask_ForTesting: computation: Async<'T> * ?ct: CancellationToken -> Task<'T>

static member SequentialFailFast: computations: Async<'T> seq -> Async<'T array>

/// Contains helpers related to the build graph
[<RequireQualifiedAccess>]
module internal GraphNode =
Expand Down
4 changes: 3 additions & 1 deletion src/Compiler/Facilities/DiagnosticsLogger.fs
Expand Up @@ -543,7 +543,9 @@ let UseDiagnosticsLogger newLogger =
UseTransformedDiagnosticsLogger(fun _ -> newLogger)

let CaptureDiagnosticsConcurrently () =
let newLogger = ConcurrentCapturingDiagnosticsLogger("CaptureDiagnosticsConcurrently")
let newLogger =
ConcurrentCapturingDiagnosticsLogger("CaptureDiagnosticsConcurrently")

let oldLogger = DiagnosticsThreadStatics.DiagnosticsLogger
DiagnosticsThreadStatics.DiagnosticsLogger <- newLogger

Expand Down
3 changes: 2 additions & 1 deletion src/Compiler/Service/FSharpProjectSnapshot.fs
Expand Up @@ -19,6 +19,7 @@ open System.Runtime.CompilerServices
open FSharp.Compiler.Syntax
open FSharp.Compiler.Diagnostics
open FSharp.Compiler.DiagnosticsLogger
open FSharp.Compiler.BuildGraph

type internal ProjectIdentifier = string * string

Expand Down Expand Up @@ -567,7 +568,7 @@ and [<Experimental("This FCS API is experimental and subject to change.")>] FSha
async.Return
<| FSharpReferencedProjectSnapshot.ILModuleReference(outputName, getStamp, getReader))

|> Async.Sequential
|> Async.SequentialFailFast

let referencesOnDisk, otherOptions =
options.OtherOptions
Expand Down
4 changes: 2 additions & 2 deletions src/Compiler/Service/IncrementalBuild.fs
Expand Up @@ -740,12 +740,12 @@ module IncrementalBuilderHelpers =
let diagnosticsLogger = CompilationDiagnosticLogger("FinalizeTypeCheckTask", tcConfig.diagnosticsOptions)
use _ = new CompilationGlobalsScope(diagnosticsLogger, BuildPhase.TypeCheck)

let! computedBoundModels = boundModels |> Seq.map (fun g -> g.GetOrComputeValue()) |> Async.Sequential
let! computedBoundModels = boundModels |> Seq.map (fun g -> g.GetOrComputeValue()) |> Async.SequentialFailFast

let! tcInfos =
computedBoundModels
|> Seq.map (fun boundModel -> async { return! boundModel.GetOrComputeTcInfo() })
|> Async.Sequential
|> Async.SequentialFailFast

// tcInfoExtras can be computed in parallel. This will check any previously skipped implementation files in parallel, too.
let! latestImplFiles =
Expand Down
6 changes: 3 additions & 3 deletions src/Compiler/Service/TransparentCompiler.fs
Expand Up @@ -542,8 +542,8 @@ type internal TransparentCompiler
| FSharpReferencedProjectSnapshot.PEReference(getStamp, delayedReader) ->
{ new IProjectReference with
member x.EvaluateRawContents() =
node {
let! ilReaderOpt = delayedReader.TryGetILModuleReader() |> NodeCode.FromCancellable
async {
let! ilReaderOpt = delayedReader.TryGetILModuleReader() |> Cancellable.toAsync

match ilReaderOpt with
| Some ilReader ->
Expand All @@ -569,7 +569,7 @@ type internal TransparentCompiler
let data = RawFSharpAssemblyData(ilModuleDef, ilAsmRefs) :> IRawFSharpAssemblyData
return ProjectAssemblyDataResult.Available data
}
|> NodeCode.FromCancellable
|> Async.FromCancellable

member x.TryGetLogicalTimeStamp _ = getStamp () |> Some
member x.FileName = nm
Expand Down
6 changes: 5 additions & 1 deletion src/Compiler/Utilities/illib.fs
Expand Up @@ -160,11 +160,15 @@ module internal PervasiveAutoOpens =

static member RunImmediate(computation: Async<'T>, ?cancellationToken) =
let cancellationToken = defaultArg cancellationToken Async.DefaultCancellationToken
#if DEBUG
let ts = TaskCompletionSource<'T>()
let task = ts.Task

Async.StartWithContinuations(computation, (ts.SetResult), (ts.SetException), (fun _ -> ts.SetCanceled()), cancellationToken)

let task = ts.Task
#else
let task = Async.StartAsTask(computation, cancellationToken = cancellationToken)
#endif
task.Result

[<AbstractClass>]
Expand Down
Expand Up @@ -28,9 +28,9 @@ let waitUntil condition value =
}

let rec internal spinFor (duration: TimeSpan) =
node {
async {
let sw = Stopwatch.StartNew()
do! Async.Sleep 10 |> NodeCode.AwaitAsync
do! Async.Sleep 10
let remaining = duration - sw.Elapsed
if remaining > TimeSpan.Zero then
return! spinFor remaining
Expand Down Expand Up @@ -58,8 +58,8 @@ type internal EventRecorder<'a, 'b, 'c when 'a : equality and 'b : equality>(mem
[<Fact>]
let ``Basics``() =

let computation key = node {
do! Async.Sleep 1 |> NodeCode.AwaitAsync
let computation key = async {
do! Async.Sleep 1
return key * 2
}

Expand All @@ -77,8 +77,8 @@ let ``Basics``() =
memoize.Get'(3, computation 3)
memoize.Get'(2, computation 2)
}
|> NodeCode.Parallel
|> NodeCode.RunImmediateWithoutCancellation
|> Async.Parallel
|> Async.RunImmediateWithoutCancellation

let expected = [| 10; 10; 4; 10; 6; 4|]

Expand All @@ -95,7 +95,7 @@ let ``We can cancel a job`` () =

let jobStarted = new ManualResetEvent(false)

let computation action = node {
let computation action = async {
action() |> ignore
do! spinFor timeout
failwith "Should be canceled before it gets here"
Expand All @@ -110,13 +110,13 @@ let ``We can cancel a job`` () =

let key = 1

let _task1 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation jobStarted.Set), ct = cts1.Token)
let _task1 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation jobStarted.Set), ct = cts1.Token)

waitFor jobStarted
jobStarted.Reset() |> ignore

let _task2 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation ignore), ct = cts2.Token)
let _task3 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation ignore), ct = cts3.Token)
let _task2 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation ignore), ct = cts2.Token)
let _task3 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation ignore), ct = cts3.Token)

do! waitUntil (events.CountOf Requested) 3

Expand Down Expand Up @@ -146,7 +146,7 @@ let ``Job is restarted if first requestor cancels`` () =

let jobCanComplete = new ManualResetEvent(false)

let computation key = node {
let computation key = async {
jobStarted.Set() |> ignore
waitFor jobCanComplete
return key * 2
Expand All @@ -162,13 +162,13 @@ let ``Job is restarted if first requestor cancels`` () =

let key = 1

let _task1 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts1.Token)
let _task1 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts1.Token)

waitFor jobStarted
jobStarted.Reset() |> ignore

let _task2 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts2.Token)
let _task3 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts3.Token)
let _task2 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts2.Token)
let _task3 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts3.Token)

do! waitUntil (events.CountOf Requested) 3

Expand Down Expand Up @@ -197,7 +197,7 @@ let ``Job is restarted if first requestor cancels but keeps running if second re

let jobCanComplete = new ManualResetEvent(false)

let computation key = node {
let computation key = async {
jobStarted.Set() |> ignore
waitFor jobCanComplete
return key * 2
Expand All @@ -213,13 +213,13 @@ let ``Job is restarted if first requestor cancels but keeps running if second re

let key = 1

let _task1 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts1.Token)
let _task1 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts1.Token)

waitFor jobStarted
jobStarted.Reset() |> ignore

let _task2 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts2.Token)
let _task3 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts3.Token)
let _task2 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts2.Token)
let _task3 = Async.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts3.Token)

do! waitUntil (events.CountOf Requested) 3

Expand Down Expand Up @@ -275,21 +275,21 @@ let ``Stress test`` () =
while (int s.ElapsedMilliseconds) < durationMs do
number <- number + 1 % 12345
return [result]
} |> NodeCode.AwaitAsync
}

let rec sleepyComputation durationMs result =
node {
async {
if rng.NextDouble() < (exceptionProbability / (float durationMs / float stepMs)) then
raise (ExpectedException())
if durationMs > 0 then
do! Async.Sleep (min stepMs durationMs) |> NodeCode.AwaitAsync
do! Async.Sleep (min stepMs durationMs)
return! sleepyComputation (durationMs - stepMs) result
else
return [result]
}

let rec mixedComputation durationMs result =
node {
async {
if durationMs > 0 then
if rng.NextDouble() < 0.5 then
let! _ = intenseComputation (min stepMs durationMs) ()
Expand Down Expand Up @@ -331,7 +331,7 @@ let ``Stress test`` () =
let result = key * 2
let job = cache.Get'(key, computation durationMs result)
let cts = new CancellationTokenSource()
let runningJob = NodeCode.StartAsTask_ForTesting(job, ct = cts.Token)
let runningJob = Async.StartAsTask_ForTesting(job, ct = cts.Token)
cts.CancelAfter timeoutMs
Interlocked.Increment &started |> ignore
try
Expand Down Expand Up @@ -385,7 +385,7 @@ let ``Cancel running jobs with the same key`` cancelDuplicate expectFinished =
let job2started = new ManualResetEvent(false)
let job2finished = new ManualResetEvent(false)

let work onStart onFinish = node {
let work onStart onFinish = async {
Interlocked.Increment &started |> ignore
onStart() |> ignore
waitFor jobCanContinue
Expand All @@ -400,7 +400,7 @@ let ``Cancel running jobs with the same key`` cancelDuplicate expectFinished =
member _.GetVersion() = 1
member _.GetLabel() = "key1" }

cache.Get(key1, work job1started.Set job1finished.Set) |> Async.AwaitNodeCode |> Async.Start
cache.Get(key1, work job1started.Set job1finished.Set) |> Async.Start

waitFor job1started

Expand All @@ -410,7 +410,7 @@ let ``Cancel running jobs with the same key`` cancelDuplicate expectFinished =
member _.GetVersion() = key1.GetVersion() + 1
member _.GetLabel() = "key2" }

cache.Get(key2, work job2started.Set job2finished.Set ) |> Async.AwaitNodeCode |> Async.Start
cache.Get(key2, work job2started.Set job2finished.Set ) |> Async.Start

waitFor job2started

Expand Down Expand Up @@ -438,18 +438,18 @@ let ``Preserve thread static diagnostics`` () =
let job1Cache = AsyncMemoize()
let job2Cache = AsyncMemoize()

let job1 (input: string) = node {
let! _ = Async.Sleep (rng.Next(1, 30)) |> NodeCode.AwaitAsync
let job1 (input: string) = async {
let! _ = Async.Sleep (rng.Next(1, 30))
let ex = DummyException("job1 error")
DiagnosticsThreadStatics.DiagnosticsLogger.ErrorR(ex)
return Ok input
}

let job2 (input: int) = node {
let job2 (input: int) = async {

DiagnosticsThreadStatics.DiagnosticsLogger.Warning(DummyException("job2 error 1"))

let! _ = Async.Sleep (rng.Next(1, 30)) |> NodeCode.AwaitAsync
let! _ = Async.Sleep (rng.Next(1, 30))

let key = { new ICacheKey<_, _> with
member _.GetKey() = "job1"
Expand Down Expand Up @@ -481,7 +481,7 @@ let ``Preserve thread static diagnostics`` () =
member _.GetVersion() = rng.Next(1, 10)
member _.GetLabel() = "job2" }

let! result = job2Cache.Get(key, job2 (i % 10)) |> Async.AwaitNodeCode
let! result = job2Cache.Get(key, job2 (i % 10))

let diagnostics = diagnosticsLogger.GetDiagnostics()

Expand Down Expand Up @@ -512,7 +512,7 @@ let ``Preserve thread static diagnostics already completed job`` () =
member _.GetVersion() = 1
member _.GetLabel() = "job1" }

let job (input: string) = node {
let job (input: string) = async {
let ex = DummyException($"job {input} error")
DiagnosticsThreadStatics.DiagnosticsLogger.ErrorR(ex)
return Ok input
Expand All @@ -524,8 +524,8 @@ let ``Preserve thread static diagnostics already completed job`` () =

use _ = new CompilationGlobalsScope(diagnosticsLogger, BuildPhase.Optimize)

let! _ = cache.Get(key, job "1" ) |> Async.AwaitNodeCode
let! _ = cache.Get(key, job "2" ) |> Async.AwaitNodeCode
let! _ = cache.Get(key, job "1" )
let! _ = cache.Get(key, job "2" )

let diagnosticMessages = diagnosticsLogger.GetDiagnostics() |> Array.map (fun (d, _) -> d.Exception.Message) |> Array.toList

Expand All @@ -545,9 +545,9 @@ let ``We get diagnostics from the job that failed`` () =
member _.GetVersion() = 1
member _.GetLabel() = "job1" }

let job (input: int) = node {
let job (input: int) = async {
let ex = DummyException($"job {input} error")
do! Async.Sleep 100 |> NodeCode.AwaitAsync
do! Async.Sleep 100
DiagnosticsThreadStatics.DiagnosticsLogger.Error(ex)
return 5
}
Expand All @@ -560,7 +560,7 @@ let ``We get diagnostics from the job that failed`` () =

use _ = new CompilationGlobalsScope(diagnosticsLogger, BuildPhase.Optimize)
try
let! _ = cache.Get(key, job i ) |> Async.AwaitNodeCode
let! _ = cache.Get(key, job i )
()
with _ ->
()
Expand Down
3 changes: 2 additions & 1 deletion tests/FSharp.Test.Utilities/ProjectGeneration.fs
Expand Up @@ -27,6 +27,7 @@ open System.Xml
open FSharp.Compiler.CodeAnalysis
open FSharp.Compiler.CodeAnalysis.ProjectSnapshot
open FSharp.Compiler.Diagnostics
open FSharp.Compiler.BuildGraph
open FSharp.Compiler.Text

open Xunit
Expand Down Expand Up @@ -792,7 +793,7 @@ module ProjectOperations =
let! projects =
p.DependsOn
|> Seq.map (absorbAutoGeneratedSignatures checker)
|> Async.Sequential
|> Async.SequentialFailFast
return
{ p with
SourceFiles = files
Expand Down

0 comments on commit 32975e2

Please sign in to comment.