Skip to content

Commit

Permalink
Introduce a conservative parallelization algorithm (semaphore instead…
Browse files Browse the repository at this point in the history
… of sync context)
  • Loading branch information
bradwilson committed Apr 18, 2024
1 parent e596742 commit 0cdf854
Show file tree
Hide file tree
Showing 24 changed files with 254 additions and 71 deletions.
29 changes: 29 additions & 0 deletions src/common/ParallelAlgorithm.cs
@@ -0,0 +1,29 @@
#if XUNIT_FRAMEWORK
namespace Xunit.Sdk
#else
namespace Xunit
#endif
{
/// <summary>
/// Indicates the parallelization algorithm to use.
/// </summary>
public enum ParallelAlgorithm
{
/// <summary>
/// The conservative parallelization algorithm uses a semaphore to limit the number of started tests to be equal
/// to the desired parallel thread count. This has the effect of allowing tests that have started to finish faster,
/// since there are no extra tests competing for a chance to run, at the expense that CPU utilization will be lowered
/// if the test project spaws a lot of async tests that have significant wait times.
/// </summary>
Conservative = 0,

/// <summary>
/// The aggressive parallelization algorithm uses a synchronization context to limit the number of running tests
/// to be equal to the desired parallel thread count. This has the effect of being able to use the CPU more
/// effectively since there are typically most tests capable of running than there are CPU cores, at the
/// expense of tests that have already started being put into the back of a long queue before they can run
/// again.
/// </summary>
Aggressive = 1,
}
}
1 change: 1 addition & 0 deletions src/common/TestOptionsNames.cs
Expand Up @@ -17,6 +17,7 @@ internal static class Execution
public static readonly string InternalDiagnosticMessages = "xunit.execution.InternalDiagnosticMessages";
public static readonly string DisableParallelization = "xunit.execution.DisableParallelization";
public static readonly string MaxParallelThreads = "xunit.execution.MaxParallelThreads";
public static readonly string ParallelAlgorithm = "xunit.execution.ParallelAlgorithm";
public static readonly string SynchronousMessageReporting = "xunit.execution.SynchronousMessageReporting";
}
}
12 changes: 12 additions & 0 deletions src/xunit.console/CommandLine.cs
Expand Up @@ -46,6 +46,8 @@ protected CommandLine(string[] args, Predicate<string> fileExists = null)

public XunitProject Project { get; protected set; }

public ParallelAlgorithm? ParallelAlgorithm { get; protected set; }

public bool? ParallelizeAssemblies { get; protected set; }

public bool? ParallelizeTestCollections { get; set; }
Expand Down Expand Up @@ -298,6 +300,16 @@ protected XunitProject Parse(Predicate<string> fileExists)
break;
}
}
else if (optionName == "parallelalgorithm")
{
if (option.Value == null)
throw new ArgumentException("missing argument for -parallelAlgorithm");

if (!Enum.TryParse(option.Value, ignoreCase: true, out ParallelAlgorithm parallelAlgorithm))
throw new ArgumentException("incorrect argument value for -parallelAlgorithm");

ParallelAlgorithm = parallelAlgorithm;
}
else if (optionName == "noshadow")
{
GuardNoOptionValue(option);
Expand Down
120 changes: 64 additions & 56 deletions src/xunit.console/ConsoleRunner.cs

Large diffs are not rendered by default.

Expand Up @@ -119,6 +119,24 @@ public static bool DiagnosticMessagesOrDefault(this ITestFrameworkExecutionOptio
return executionOptions.DiagnosticMessages() ?? false;
}

/// <summary>
/// Gets the parallel algorithm to be used.
/// </summary>
public static ParallelAlgorithm? ParallelAlgorithm(this ITestFrameworkExecutionOptions executionOptions)
{
var parallelAlgorithmString = executionOptions.GetValue<string>(TestOptionsNames.Execution.ParallelAlgorithm);
return parallelAlgorithmString != null ? (ParallelAlgorithm?)Enum.Parse(typeof(ParallelAlgorithm), parallelAlgorithmString) : null;
}

/// <summary>
/// Gets the parallel algorithm to be used. If the flag is not present, return the default
/// value (<see cref="ParallelAlgorithm.Conservative"/>).
/// </summary>
public static ParallelAlgorithm ParallelAlgorithmOrDefault(this ITestFrameworkExecutionOptions executionOptions)
{
return executionOptions.ParallelAlgorithm() ?? Xunit.Sdk.ParallelAlgorithm.Conservative;
}

/// <summary>
/// Gets a flag to disable parallelization.
/// </summary>
Expand Down
Expand Up @@ -2,14 +2,17 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;
using Xunit.Abstractions;

#if NETFRAMEWORK
using System.IO;
#endif

namespace Xunit.Sdk
{
/// <summary>
Expand Down
Expand Up @@ -18,7 +18,9 @@ public class XunitTestAssemblyRunner : TestAssemblyRunner<IXunitTestCase>
bool disableParallelization;
bool initialized;
int maxParallelThreads;
ParallelAlgorithm parallelAlgorithm;
SynchronizationContext originalSyncContext;
SemaphoreSlim parallelSemaphore;
MaxConcurrencySyncContext syncContext;

/// <summary>
Expand Down Expand Up @@ -102,6 +104,8 @@ protected void Initialize()
if (maxParallelThreads == 0)
maxParallelThreads = Environment.ProcessorCount;

parallelAlgorithm = ExecutionOptions.ParallelAlgorithmOrDefault();

var testCaseOrdererAttribute = TestAssembly.Assembly.GetCustomAttributes(typeof(TestCaseOrdererAttribute)).SingleOrDefault();
if (testCaseOrdererAttribute != null)
{
Expand Down Expand Up @@ -191,7 +195,10 @@ protected override async Task<RunSummary> RunTestCollectionsAsync(IMessageBus me
if (disableParallelization)
return await base.RunTestCollectionsAsync(messageBus, cancellationTokenSource);

SetupSyncContext(maxParallelThreads);
if (parallelAlgorithm == ParallelAlgorithm.Aggressive)
SetupSyncContext(maxParallelThreads);
else
parallelSemaphore = new SemaphoreSlim(maxParallelThreads);

Func<Func<Task<RunSummary>>, Task<RunSummary>> taskRunner;
if (SynchronizationContext.Current != null)
Expand Down Expand Up @@ -259,7 +266,19 @@ protected override async Task<RunSummary> RunTestCollectionsAsync(IMessageBus me

/// <inheritdoc/>
protected override Task<RunSummary> RunTestCollectionAsync(IMessageBus messageBus, ITestCollection testCollection, IEnumerable<IXunitTestCase> testCases, CancellationTokenSource cancellationTokenSource)
=> new XunitTestCollectionRunner(testCollection, testCases, DiagnosticMessageSink, messageBus, TestCaseOrderer, new ExceptionAggregator(Aggregator), cancellationTokenSource).RunAsync();
{

parallelSemaphore?.Wait();

try
{
return new XunitTestCollectionRunner(testCollection, testCases, DiagnosticMessageSink, messageBus, TestCaseOrderer, new ExceptionAggregator(Aggregator), cancellationTokenSource).RunAsync();
}
finally
{
parallelSemaphore?.Release();
}
}

[SecuritySafeCritical]
static void SetSynchronizationContext(SynchronizationContext context)
Expand Down
1 change: 1 addition & 0 deletions src/xunit.execution/xunit.execution.csproj
Expand Up @@ -20,6 +20,7 @@
<Compile Include="..\common\LongLivedMarshalByRefObject.cs" LinkBase="Common" />
<Compile Include="..\common\NewReflectionExtensions.cs" LinkBase="Common" />
<Compile Include="..\common\NullMessageSink.cs" LinkBase="Common" />
<Compile Include="..\common\ParallelAlgorithm.cs" LinkBase="Common" />
<Compile Include="..\common\SerializationHelper.cs" LinkBase="Common" />
<Compile Include="..\common\SourceInformation.cs" LinkBase="Common" />
<Compile Include="..\common\TestOptionsNames.cs" LinkBase="Common" />
Expand Down
Expand Up @@ -62,6 +62,7 @@ public static TestAssemblyConfiguration Load(string assemblyFileName, string con
result.MaxParallelThreads = GetInt(settings, Configuration.MaxParallelThreads) ?? result.MaxParallelThreads;
result.MethodDisplay = GetEnum<TestMethodDisplay>(settings, Configuration.MethodDisplay) ?? result.MethodDisplay;
result.MethodDisplayOptions = GetEnum<TestMethodDisplayOptions>(settings, Configuration.MethodDisplayOptions) ?? result.MethodDisplayOptions;
result.ParallelAlgorithm = GetEnum<ParallelAlgorithm>(settings, Configuration.ParallelAlgorithm) ?? result.ParallelAlgorithm;
result.ParallelizeAssembly = GetBoolean(settings, Configuration.ParallelizeAssembly) ?? result.ParallelizeAssembly;
result.ParallelizeTestCollections = GetBoolean(settings, Configuration.ParallelizeTestCollections) ?? result.ParallelizeTestCollections;
result.PreEnumerateTheories = GetBoolean(settings, Configuration.PreEnumerateTheories) ?? result.PreEnumerateTheories;
Expand Down Expand Up @@ -134,6 +135,7 @@ static class Configuration
public const string MaxParallelThreads = "xunit.maxParallelThreads";
public const string MethodDisplay = "xunit.methodDisplay";
public const string MethodDisplayOptions = "xunit.methodDisplayOptions";
public const string ParallelAlgorithm = "xunit.parallelAlgorithm";
public const string ParallelizeAssembly = "xunit.parallelizeAssembly";
public const string ParallelizeTestCollections = "xunit.parallelizeTestCollections";
public const string PreEnumerateTheories = "xunit.preEnumerateTheories";
Expand Down
14 changes: 14 additions & 0 deletions src/xunit.runner.utility/Configuration/ConfigReader_Json.cs
Expand Up @@ -160,6 +160,19 @@ static TestAssemblyConfiguration LoadConfiguration(Stream configStream, string c
catch { }
}
}
else if (string.Equals(propertyName, Configuration.ParallelAlgorithm, StringComparison.OrdinalIgnoreCase))
{
var stringValue = propertyValue as JsonString;
if (stringValue != null)
{
try
{
var parallelAlgorithm = Enum.Parse(typeof(ParallelAlgorithm), stringValue, true);
result.ParallelAlgorithm = (ParallelAlgorithm)parallelAlgorithm;
}
catch { }
}
}
else if (string.Equals(propertyName, Configuration.AppDomain, StringComparison.OrdinalIgnoreCase))
{
var stringValue = propertyValue as JsonString;
Expand Down Expand Up @@ -237,6 +250,7 @@ static class Configuration
public const string MaxParallelThreads = "maxParallelThreads";
public const string MethodDisplay = "methodDisplay";
public const string MethodDisplayOptions = "methodDisplayOptions";
public const string ParallelAlgorithm = "parallelAlgorithm";
public const string ParallelizeAssembly = "parallelizeAssembly";
public const string ParallelizeTestCollections = "parallelizeTestCollections";
public const string PreEnumerateTheories = "preEnumerateTheories";
Expand Down
Expand Up @@ -242,6 +242,24 @@ public static int GetMaxParallelThreadsOrDefault(this ITestFrameworkExecutionOpt
return result.GetValueOrDefault();
}

/// <summary>
/// Gets the parallel algorithm to be used.
/// </summary>
public static ParallelAlgorithm? GetParallelAlgorithm(this ITestFrameworkExecutionOptions executionOptions)
{
var parallelAlgorithmString = executionOptions.GetValue<string>(TestOptionsNames.Execution.ParallelAlgorithm);
return parallelAlgorithmString != null ? (ParallelAlgorithm?)Enum.Parse(typeof(ParallelAlgorithm), parallelAlgorithmString) : null;
}

/// <summary>
/// Gets the parallel algorithm to be used. If the flag is not present, return the default
/// value (<see cref="ParallelAlgorithm.Conservative"/>).
/// </summary>
public static ParallelAlgorithm GetParallelAlgorithmOrDefault(this ITestFrameworkExecutionOptions executionOptions)
{
return executionOptions.GetParallelAlgorithm() ?? ParallelAlgorithm.Conservative;
}

/// <summary>
/// Gets a flag that determines whether xUnit.net stop testing when a test fails.
/// </summary>
Expand Down Expand Up @@ -292,6 +310,14 @@ public static void SetInternalDiagnosticMessages(this ITestFrameworkExecutionOpt
executionOptions.SetValue(TestOptionsNames.Execution.InternalDiagnosticMessages, value);
}

/// <summary>
/// Sets the parallel algorith to be used.
/// </summary>
public static void SetParallelAlgorithm(this ITestFrameworkExecutionOptions executionOptions, ParallelAlgorithm? value)
{
executionOptions.SetValue(TestOptionsNames.Execution.ParallelAlgorithm, value.HasValue ? value.GetValueOrDefault().ToString() : null);
}

/// <summary>
/// Sets a flag that determines whether xUnit.net stop testing when a test fails.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions src/xunit.runner.utility/Frameworks/TestAssemblyConfiguration.cs
Expand Up @@ -110,6 +110,16 @@ public int MaxParallelThreadsOrDefault
/// </summary>
public TestMethodDisplayOptions MethodDisplayOptionsOrDefault { get { return MethodDisplayOptions ?? TestMethodDisplayOptions.None; } }

/// <summary>
/// Gets or sets the algorithm to be used for parallelization.
/// </summary>
public ParallelAlgorithm? ParallelAlgorithm { get; set; }

/// <summary>
/// Gets or sets the algorithm to be used for parallelization.
/// </summary>
public ParallelAlgorithm ParallelAlgorithmOrDefault { get { return ParallelAlgorithm ?? Xunit.ParallelAlgorithm.Conservative; } }

/// <summary>
/// Gets or sets a flag indicating that this assembly is safe to parallelize against
/// other assemblies.
Expand Down
Expand Up @@ -52,6 +52,7 @@ public static ITestFrameworkExecutionOptions ForExecution(TestAssemblyConfigurat
{
result.SetDiagnosticMessages(configuration.DiagnosticMessages);
result.SetInternalDiagnosticMessages(configuration.InternalDiagnosticMessages);
result.SetParallelAlgorithm(configuration.ParallelAlgorithm);
result.SetDisableParallelization(!configuration.ParallelizeTestCollections);
result.SetMaxParallelThreads(configuration.MaxParallelThreads);
result.SetStopOnTestFail(configuration.StopOnFail);
Expand Down
Expand Up @@ -230,14 +230,16 @@ protected override bool Visit(ITestAssemblyExecutionStarting executionStarting)
if (executionStarting.ExecutionOptions.GetDiagnosticMessagesOrDefault())
{
var threadCount = executionStarting.ExecutionOptions.GetMaxParallelThreadsOrDefault();
var parallelAlgorithm = executionStarting.ExecutionOptions.GetParallelAlgorithmOrDefault();
var parallelTestCollections =
executionStarting.ExecutionOptions.GetDisableParallelizationOrDefault()
? "off"
: string.Format(
CultureInfo.CurrentCulture,
"on [{0} thread{1}]",
"on [{0} thread{1}{2}]",
threadCount < 0 ? "unlimited" : threadCount.ToString(CultureInfo.CurrentCulture),
threadCount == 1 ? string.Empty : "s"
threadCount == 1 ? string.Empty : "s",
parallelAlgorithm == ParallelAlgorithm.Aggressive ? "/aggressive" : string.Empty
);

Logger.LogImportantMessage(
Expand Down
Expand Up @@ -252,14 +252,16 @@ protected virtual void HandleTestAssemblyExecutionStarting(MessageHandlerArgs<IT
if (executionStarting.ExecutionOptions.GetDiagnosticMessagesOrDefault())
{
var threadCount = executionStarting.ExecutionOptions.GetMaxParallelThreadsOrDefault();
var parallelAlgorithm = executionStarting.ExecutionOptions.GetParallelAlgorithmOrDefault();
var parallelTestCollections =
executionStarting.ExecutionOptions.GetDisableParallelizationOrDefault()
? "off"
: string.Format(
CultureInfo.CurrentCulture,
"on [{0} thread{1}]",
"on [{0} thread{1}{2}]",
threadCount < 0 ? "unlimited" : threadCount.ToString(CultureInfo.CurrentCulture),
threadCount == 1 ? string.Empty : "s"
threadCount == 1 ? string.Empty : "s",
parallelAlgorithm == ParallelAlgorithm.Aggressive ? "/aggressive" : string.Empty
);

Logger.LogImportantMessage(
Expand Down

0 comments on commit 0cdf854

Please sign in to comment.