Skip to content

Commit

Permalink
Introduce a conservative parallelization algorithm (semaphore instead…
Browse files Browse the repository at this point in the history
… of sync context)
  • Loading branch information
bradwilson committed Apr 25, 2024
1 parent 7c8978d commit ca1afcc
Show file tree
Hide file tree
Showing 25 changed files with 256 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/Versions.props
Expand Up @@ -37,7 +37,7 @@
<SystemMemoryVersion>4.5.5</SystemMemoryVersion>
<XunitAbstractionsVersion>2.0.3</XunitAbstractionsVersion>
<XunitV1Version>1.9.2</XunitV1Version>
<XunitV2Version>2.7.2-pre.2</XunitV2Version>
<XunitV2Version>2.7.2-pre.21</XunitV2Version>

</PropertyGroup>

Expand Down
2 changes: 2 additions & 0 deletions src/common.tests/TestDoubles/TestData.cs
Expand Up @@ -295,6 +295,7 @@ public static class TestData
ExplicitOption? explicitOption = null,
bool internalDiagnosticMessages = false,
int maxParallelThreads = 2600,
ParallelAlgorithm? parallelAlgorithm = null,
bool? parallelizeTestCollections = null,
bool? stopOnFail = null,
int? seed = null,
Expand All @@ -315,6 +316,7 @@ public static class TestData
ExplicitOption = explicitOption,
InternalDiagnosticMessages = internalDiagnosticMessages,
MaxParallelThreads = maxParallelThreads,
ParallelAlgorithm = parallelAlgorithm,
ParallelizeTestCollections = parallelizeTestCollections,
StopOnFail = stopOnFail,
});
Expand Down
Expand Up @@ -7,6 +7,7 @@
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;
using ParallelAlgorithm = Xunit.ParallelAlgorithm;

public class XunitTestAssemblyRunnerTests
{
Expand Down Expand Up @@ -165,12 +166,13 @@ public static void TestOptionsOverrideAttribute()
public class RunAsync
{
[Fact]
public static async void Parallel_SingleThread()
public static async void Parallel_SingleThread_Aggressive()
{
var passing = Mocks.XunitTestCase<ClassUnderTest>("Passing");
var other = Mocks.XunitTestCase<ClassUnderTest>("Other");
var options = TestFrameworkOptions.ForExecution();
options.SetMaxParallelThreads(1);
options.SetParallelAlgorithm(ParallelAlgorithm.Aggressive);
var runner = TestableXunitTestAssemblyRunner.Create(testCases: new[] { passing, other }, executionOptions: options);

await runner.RunAsync();
Expand Down
2 changes: 2 additions & 0 deletions src/xunit.v3.common/Internal/TestOptionsNames.cs
Expand Up @@ -44,6 +44,8 @@ public static class Execution
/// <summary/>
public static readonly string MaxParallelThreads = "xunit.execution.MaxParallelThreads";
/// <summary/>
public static readonly string ParallelAlgorithm = "xunit.execution.ParallelAlgorithm";
/// <summary/>
public static readonly string Seed = "xunit.execution.Seed";
/// <summary/>
public static readonly string SynchronousMessageReporting = "xunit.execution.SynchronousMessageReporting";
Expand Down
24 changes: 24 additions & 0 deletions src/xunit.v3.common/Options/ParallelAlgorithm.cs
@@ -0,0 +1,24 @@
namespace Xunit.Sdk;

/// <summary>
/// Indicates the parallelization algorithm to use.
/// </summary>
public enum ParallelAlgorithm
{
/// <summary>
/// The conservative parallelization algorithm uses a semaphore to limit the number of started tests to be equal
/// to the desired parallel thread count. This has the effect of allowing tests that have started to finish faster,
/// since there are no extra tests competing for a chance to run, at the expense that CPU utilization will be lowered
/// if the test project spaws a lot of async tests that have significant wait times.
/// </summary>
Conservative = 0,

/// <summary>
/// The aggressive parallelization algorithm uses a synchronization context to limit the number of running tests
/// to be equal to the desired parallel thread count. This has the effect of being able to use the CPU more
/// effectively since there are typically most tests capable of running than there are CPU cores, at the
/// expense of tests that have already started being put into the back of a long queue before they can run
/// again.
/// </summary>
Aggressive = 1,
}
Expand Up @@ -51,25 +51,33 @@ public static async ValueTask Attribute_NonParallel()
Assert.EndsWith("[collection-per-class, non-parallel]", result);
}

[Fact]
public static async ValueTask Attribute_MaxThreads()
[Theory]
[InlineData(1, null, "1 thread")]
[InlineData(3, ParallelAlgorithm.Conservative, "3 threads")]
[InlineData(42, ParallelAlgorithm.Aggressive, "42 threads/aggressive")]
public static async ValueTask Attribute_MaxThreads(
int maxThreads,
ParallelAlgorithm? parallelAlgorithm,
string expected)
{
var attribute = Mocks.CollectionBehaviorAttribute(maxParallelThreads: 3);
var attribute = Mocks.CollectionBehaviorAttribute(maxParallelThreads: maxThreads);
var assembly = Mocks.TestAssembly("assembly.dll", assemblyAttributes: new[] { attribute });
await using var context = TestableXunitTestAssemblyRunnerContext.Create(assembly: assembly);
var options = _TestFrameworkOptions.ForExecution(parallelAlgorithm: parallelAlgorithm);
await using var context = TestableXunitTestAssemblyRunnerContext.Create(assembly: assembly, executionOptions: options);
await context.InitializeAsync();

var result = context.TestFrameworkEnvironment;

Assert.EndsWith("[collection-per-class, parallel (3 threads)]", result);
Assert.EndsWith($"[collection-per-class, parallel ({expected})]", result);
}

[Fact]
public static async ValueTask Attribute_Unlimited()
{
var attribute = Mocks.CollectionBehaviorAttribute(maxParallelThreads: -1);
var assembly = Mocks.TestAssembly("assembly.dll", assemblyAttributes: new[] { attribute });
await using var context = TestableXunitTestAssemblyRunnerContext.Create(assembly: assembly);
var options = _TestFrameworkOptions.ForExecution(parallelAlgorithm: ParallelAlgorithm.Aggressive); // Shouldn't show for unlimited threads
await using var context = TestableXunitTestAssemblyRunnerContext.Create(assembly: assembly, executionOptions: options);
await context.InitializeAsync();

var result = context.TestFrameworkEnvironment;
Expand Down
Expand Up @@ -13,13 +13,17 @@ public class XunitTestAssemblyRunnerTests
{
public class RunAsync
{
// This test is forced to use the aggressive algorithm so that we know we're running in a thread pool with
// a single thread. The default conserative algorithm runs in the .NET Thread Pool, so our async continuation
// could end up on any thread, despite the fact that are limited to running one test at a time.
[Fact]
public static async ValueTask Parallel_SingleThread()
public static async ValueTask Parallel_SingleThread_Aggressive()
{
var passing = TestData.XunitTestCase<ClassUnderTest>("Passing");
var other = TestData.XunitTestCase<ClassUnderTest>("Other");
var options = _TestFrameworkOptions.ForExecution();
options.SetMaxParallelThreads(1);
options.SetParallelAlgorithm(ParallelAlgorithm.Aggressive);
var runner = TestableXunitTestAssemblyRunner.Create(testCases: new[] { passing, other }, executionOptions: options);

await runner.RunAsync();
Expand Down
18 changes: 18 additions & 0 deletions src/xunit.v3.core/Extensions/TestFrameworkOptionsReadExtensions.cs
Expand Up @@ -239,6 +239,24 @@ public static int MaxParallelThreadsOrDefault(this _ITestFrameworkExecutionOptio
return result.GetValueOrDefault();
}

/// <summary>
/// Gets the parallel algorithm to be used.
/// </summary>
public static ParallelAlgorithm? ParallelAlgorithm(this _ITestFrameworkExecutionOptions executionOptions)
{
Guard.ArgumentNotNull(executionOptions);

var parallelAlgorithmString = executionOptions.GetValue<string>(TestOptionsNames.Execution.ParallelAlgorithm);
return parallelAlgorithmString != null ? (ParallelAlgorithm?)Enum.Parse(typeof(ParallelAlgorithm), parallelAlgorithmString) : null;
}

/// <summary>
/// Gets the parallel algorithm to be used. If the flag is not present, return the default
/// value (<see cref="ParallelAlgorithm.Conservative"/>).
/// </summary>
public static ParallelAlgorithm ParallelAlgorithmOrDefault(this _ITestFrameworkExecutionOptions executionOptions) =>
ParallelAlgorithm(executionOptions) ?? Sdk.ParallelAlgorithm.Conservative;

/// <summary>
/// Gets the value that should be used to seed randomness.
/// </summary>
Expand Down
15 changes: 2 additions & 13 deletions src/xunit.v3.core/Sdk/v3/Runners/XunitTestAssemblyRunner.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Xunit.Internal;
Expand Down Expand Up @@ -54,7 +53,6 @@ protected override async ValueTask BeforeTestAssemblyFinishedAsync(XunitTestAsse
await base.BeforeTestAssemblyFinishedAsync(ctxt);
}


/// <inheritdoc/>
protected override ITestCaseOrderer GetTestCaseOrderer(XunitTestAssemblyRunnerContext ctxt) =>
Guard.ArgumentNotNull(ctxt).AssemblyTestCaseOrderer ?? base.GetTestCaseOrderer(ctxt);
Expand Down Expand Up @@ -97,7 +95,7 @@ protected override async ValueTask<RunSummary> RunTestCollectionsAsync(XunitTest
if (ctxt.DisableParallelization)
return await base.RunTestCollectionsAsync(ctxt);

ctxt.SetupMaxConcurrencySyncContext();
ctxt.SetupParallelism();

Func<Func<ValueTask<RunSummary>>, ValueTask<RunSummary>> taskRunner;
if (SynchronizationContext.Current is not null)
Expand Down Expand Up @@ -162,15 +160,6 @@ protected override async ValueTask<RunSummary> RunTestCollectionsAsync(XunitTest
Guard.ArgumentNotNull(testCollection);
Guard.ArgumentNotNull(testCases);

return XunitTestCollectionRunner.Instance.RunAsync(
testCollection,
testCases,
ctxt.ExplicitOption,
ctxt.MessageBus,
GetTestCaseOrderer(ctxt),
ctxt.Aggregator.Clone(),
ctxt.CancellationTokenSource,
ctxt.AssemblyFixtureMappings
);
return ctxt.RunTestCollectionAsync(testCollection, testCases, GetTestCaseOrderer(ctxt));
}
}
74 changes: 66 additions & 8 deletions src/xunit.v3.core/Sdk/v3/Runners/XunitTestAssemblyRunnerContext.cs
Expand Up @@ -16,6 +16,7 @@ namespace Xunit.v3;
public class XunitTestAssemblyRunnerContext : TestAssemblyRunnerContext<IXunitTestCase>
{
_IAttributeInfo? collectionBehaviorAttribute;
SemaphoreSlim? parallelSemaphore;
MaxConcurrencySyncContext? syncContext;

/// <summary>
Expand Down Expand Up @@ -56,6 +57,11 @@ public class XunitTestAssemblyRunnerContext : TestAssemblyRunnerContext<IXunitTe
/// </summary>
public int MaxParallelThreads { get; private set; }

/// <summary>
/// Gets the algorithm used for parallelism.
/// </summary>
public ParallelAlgorithm ParallelAlgorithm { get; private set; }

/// <inheritdoc/>
public override string TestFrameworkDisplayName =>
XunitTestFrameworkDiscoverer.DisplayName;
Expand All @@ -69,18 +75,21 @@ public override string TestFrameworkEnvironment
ExtensibilityPointFactory.GetXunitTestCollectionFactory(collectionBehaviorAttribute, TestAssembly)
?? new CollectionPerClassTestCollectionFactory(TestAssembly);

var threadCountText = MaxParallelThreads < 0 ? "unlimited" : MaxParallelThreads.ToString(CultureInfo.CurrentCulture);
threadCountText += " thread";
if (MaxParallelThreads != 1)
threadCountText += 's';
if (MaxParallelThreads > 0 && ParallelAlgorithm == ParallelAlgorithm.Aggressive)
threadCountText += "/aggressive";

return string.Format(
CultureInfo.CurrentCulture,
"{0} [{1}, {2}]",
base.TestFrameworkEnvironment,
testCollectionFactory.DisplayName,
DisableParallelization
? "non-parallel"
: string.Format(
CultureInfo.CurrentCulture,
"parallel ({0} threads)",
MaxParallelThreads < 0 ? "unlimited" : MaxParallelThreads.ToString(CultureInfo.CurrentCulture)
)
: string.Format(CultureInfo.CurrentCulture, "parallel ({0})", threadCountText)
);
}
}
Expand All @@ -95,6 +104,8 @@ public override async ValueTask DisposeAsync()
else if (syncContext is IDisposable disposable)
disposable.Dispose();

parallelSemaphore?.Dispose();

await base.DisposeAsync();
}

Expand All @@ -110,6 +121,7 @@ public override async ValueTask InitializeAsync()
MaxParallelThreads = collectionBehaviorAttribute.GetNamedArgument<int>(nameof(CollectionBehaviorAttribute.MaxParallelThreads));
}

ParallelAlgorithm = ExecutionOptions.ParallelAlgorithm() ?? ParallelAlgorithm;
DisableParallelization = ExecutionOptions.DisableParallelization() ?? DisableParallelization;
MaxParallelThreads = ExecutionOptions.MaxParallelThreads() ?? MaxParallelThreads;
if (MaxParallelThreads == 0)
Expand Down Expand Up @@ -175,15 +187,61 @@ public override async ValueTask InitializeAsync()
}

/// <summary>
/// Sets up the sync context needed for limiting maximum concurrency, if so configured.
/// Delegation of <see cref="XunitTestAssemblyRunner.RunTestCollectionAsync"/> that properly obeys the parallel
/// algorithm requirements.
/// </summary>
public virtual void SetupMaxConcurrencySyncContext()
public async ValueTask<RunSummary> RunTestCollectionAsync(
_ITestCollection testCollection,
IReadOnlyCollection<IXunitTestCase> testCases,
ITestCaseOrderer testCaseOrderer)
{
if (MaxConcurrencySyncContext.IsSupported && MaxParallelThreads > 0)
if (parallelSemaphore is not null)
await parallelSemaphore.WaitAsync(CancellationTokenSource.Token);

try
{
return await XunitTestCollectionRunner.Instance.RunAsync(
testCollection,
testCases,
ExplicitOption,
MessageBus,
testCaseOrderer,
Aggregator.Clone(),
CancellationTokenSource,
AssemblyFixtureMappings
);
}
finally
{
parallelSemaphore?.Release();
}
}

/// <summary>
/// Sets up the mechanics for parallelism.
/// </summary>
public virtual void SetupParallelism()
{
// When unlimited, we just launch everything and let the .NET Thread Pool sort it out
if (MaxParallelThreads < 0)
return;

// For aggressive, we launch everything and let our sync context limit what's allowed to run
if (ParallelAlgorithm == ParallelAlgorithm.Aggressive && MaxConcurrencySyncContext.IsSupported)
{
syncContext = new MaxConcurrencySyncContext(MaxParallelThreads);
SetupSyncContextInternal(syncContext);
}
// For conversative, we use a semaphore to limit the number of launched tests, and ensure
// that the .NET Thread Pool has enough threads based on the user's requested maximum
else
{
parallelSemaphore = new(initialCount: MaxParallelThreads);

ThreadPool.GetMinThreads(out var minThreads, out var minIOPorts);
if (minThreads < MaxParallelThreads)
ThreadPool.SetMinThreads(MaxParallelThreads, minIOPorts);
}
}

[SecuritySafeCritical]
Expand Down
Expand Up @@ -298,22 +298,25 @@ public static void LogsMessage()
public class OnMessage_TestAssemblyExecutionStarting
{
[Theory]
[InlineData(false, null, null, null, null, null, "[Imp] => Starting: test-assembly")]
[InlineData(true, false, null, null, null, null, "[Imp] => Starting: test-assembly (parallel test collections = off, stop on fail = off, explicit = only)")]
[InlineData(true, null, -1, null, null, null, "[Imp] => Starting: test-assembly (parallel test collections = on [unlimited threads], stop on fail = off, explicit = only)")]
[InlineData(true, null, 1, null, null, null, "[Imp] => Starting: test-assembly (parallel test collections = on [1 thread], stop on fail = off, explicit = only)")]
[InlineData(true, null, null, true, null, null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = on, explicit = only)")]
[InlineData(true, null, null, null, null, null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only)")]
[InlineData(true, null, null, null, 2112, null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only, seed = 2112)")]
[InlineData(true, null, null, null, null, "", "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only, culture = invariant)")]
[InlineData(true, null, null, null, null, "en-US", "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only, culture = en-US)")]
[InlineData(false, null, null, null, null, null, null, "[Imp] => Starting: test-assembly")]
[InlineData(true, false, null, null, null, null, ParallelAlgorithm.Aggressive, "[Imp] => Starting: test-assembly (parallel test collections = off, stop on fail = off, explicit = only)")]
[InlineData(true, null, -1, null, null, null, ParallelAlgorithm.Conservative, "[Imp] => Starting: test-assembly (parallel test collections = on [unlimited threads], stop on fail = off, explicit = only)")]
[InlineData(true, null, -1, null, null, null, ParallelAlgorithm.Aggressive, "[Imp] => Starting: test-assembly (parallel test collections = on [unlimited threads], stop on fail = off, explicit = only)")]
[InlineData(true, null, 1, null, null, null, ParallelAlgorithm.Conservative, "[Imp] => Starting: test-assembly (parallel test collections = on [1 thread], stop on fail = off, explicit = only)")]
[InlineData(true, null, 1, null, null, null, ParallelAlgorithm.Aggressive, "[Imp] => Starting: test-assembly (parallel test collections = on [1 thread/aggressive], stop on fail = off, explicit = only)")]
[InlineData(true, null, null, true, null, null, null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = on, explicit = only)")]
[InlineData(true, null, null, null, null, null, null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only)")]
[InlineData(true, null, null, null, 2112, null, null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only, seed = 2112)")]
[InlineData(true, null, null, null, null, "", null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only, culture = invariant)")]
[InlineData(true, null, null, null, null, "en-US", null, "[Imp] => Starting: test-assembly (parallel test collections = on [42 threads], stop on fail = off, explicit = only, culture = en-US)")]
public static void LogsMessage(
bool diagnosticMessages,
bool? parallelizeTestCollections,
int? maxThreads,
bool? stopOnFail,
int? seed,
string? culture,
ParallelAlgorithm? parallelAlgorithm,
string expectedResult)
{
var message = TestData.TestAssemblyExecutionStarting(
Expand All @@ -322,6 +325,7 @@ public class OnMessage_TestAssemblyExecutionStarting
maxParallelThreads: maxThreads ?? 42,
stopOnFail: stopOnFail,
explicitOption: ExplicitOption.Only,
parallelAlgorithm: parallelAlgorithm,
seed: seed,
culture: culture
);
Expand Down

0 comments on commit ca1afcc

Please sign in to comment.