diff --git a/src/core/Akka.Streams.TestKit/ChainSetup.cs b/src/core/Akka.Streams.TestKit/ChainSetup.cs index 6cafa27b8e1..635a6414924 100644 --- a/src/core/Akka.Streams.TestKit/ChainSetup.cs +++ b/src/core/Akka.Streams.TestKit/ChainSetup.cs @@ -80,6 +80,7 @@ public class ChainSetup public ActorMaterializerSettings Settings { get; } + public ActorMaterializer Materializer => _materializer; public TestPublisher.ManualProbe Upstream { get diff --git a/src/core/Akka.Streams.TestKit/ScriptedTest.cs b/src/core/Akka.Streams.TestKit/ScriptedTest.cs index f145b5a54c5..0bf70a76ca3 100644 --- a/src/core/Akka.Streams.TestKit/ScriptedTest.cs +++ b/src/core/Akka.Streams.TestKit/ScriptedTest.cs @@ -202,11 +202,6 @@ public void Request(int demand) _outstandingDemand += demand; } - [Obsolete("Will be removed after async_testkit conversion is done. Use ShakeItAsync instead")] - public bool ShakeIt() - => ShakeItAsync() - .ConfigureAwait(false).GetAwaiter().GetResult(); - public async Task ShakeItAsync() { var oneMilli = TimeSpan.FromMilliseconds(10); @@ -249,11 +244,6 @@ public async Task ShakeItAsync() return u.Concat(d).Any(x => x == marker); } - [Obsolete("Will be removed after async_testkit conversion is done. Use RunAsync instead")] - public void Run() - => RunAsync() - .ConfigureAwait(false).GetAwaiter().GetResult(); - public async Task RunAsync() { try @@ -327,11 +317,13 @@ protected ScriptedTest(ITestOutputHelper output = null) : base(output) Func, Flow> op, int maximumOverrun = 3, int maximumRequest = 3, - int maximumBuffer = 3) - => RunScriptAsync(script, settings, op, maximumOverrun, maximumRequest, maximumBuffer) + int maximumBuffer = 3, + AkkaSpec spec = null) + => RunScriptAsync(spec, script, settings, op, maximumOverrun, maximumRequest, maximumBuffer) .ConfigureAwait(false).GetAwaiter().GetResult(); protected async Task RunScriptAsync( + AkkaSpec spec, Script script, ActorMaterializerSettings settings, Func, Flow> op, @@ -339,9 +331,20 @@ protected ScriptedTest(ITestOutputHelper output = null) : base(output) int maximumRequest = 3, int maximumBuffer = 3) { - var runner = await new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this) - .InitializeAsync(); - await runner.RunAsync(); + var runner = new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this); + async Task Run() + { + await runner.InitializeAsync(); + await runner.RunAsync(); + } + + if(spec != null) + await spec.AssertAllStagesStoppedAsync(async () => + { + await Run(); + }, runner.Materializer); + else + await Run(); } protected static IPublisher ToPublisher(Source source, IMaterializer materializer) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs index 5e360534d66..f2783b169ba 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs @@ -8,10 +8,10 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.Supervision; using Akka.Streams.TestKit; -using Akka.Util.Internal; using Xunit; using Xunit.Abstractions; using static Akka.Streams.Tests.Dsl.TestConfig; @@ -20,6 +20,7 @@ namespace Akka.Streams.Tests.Dsl { public class FlowCollectSpec : ScriptedTest { + private Random Random { get; } = new Random(12345); private ActorMaterializer Materializer { get; } public FlowCollectSpec(ITestOutputHelper helper) : base(helper) @@ -28,76 +29,94 @@ public FlowCollectSpec(ITestOutputHelper helper) : base(helper) Materializer = ActorMaterializer.Create(Sys, settings); } + // No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync [Fact] - public void An_old_behaviour_Collect_must_collect() + public async Task An_old_behaviour_Collect_must_collect() { - var random = new Random(); var script = Script.Create(RandomTestRange(Sys).Select(_ => { - var x = random.Next(0, 10000); + var x = Random.Next(0, 10000); return ((ICollection)new[] { x }, - (x & 1) == 0 ? (ICollection)new[] { (x * x).ToString() } : (ICollection)new string[] { }); + (x & 1) == 0 ? (ICollection)new[] { (x * x).ToString() } : new string[] { }); }).ToArray()); - - RandomTestRange(Sys).ForEach(_ => RunScript(script, Materializer.Settings, flow => flow.Collect(x => x % 2 == 0 ? (x * x).ToString() : null))); + + foreach (var _ in RandomTestRange(Sys)) + { + await RunScriptAsync(this, script, Materializer.Settings, + // This is intentional, testing backward compatibility with old obsolete method +#pragma warning disable CS0618 + flow => flow.Collect(x => x % 2 == 0 ? (x * x).ToString() : null)); +#pragma warning restore CS0618 + } } + // No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync [Fact] - public void A_Collect_must_collect() + public async Task A_Collect_must_collect() { - var random = new Random(); var script = Script.Create(RandomTestRange(Sys).Select(_ => { - var x = random.Next(0, 10000); + var x = Random.Next(0, 10000); return ((ICollection)new[] { x }, - (x & 1) == 0 ? (ICollection)new[] { (x*x).ToString() } : (ICollection)new string[] {}); + (x & 1) == 0 ? (ICollection)new[] { (x*x).ToString() } : new string[] {}); }).ToArray()); - RandomTestRange(Sys).ForEach(_=>RunScript( - script, - Materializer.Settings, - flow => flow.Collect(x => x % 2 == 0, x => (x*x).ToString()))); + foreach (var _ in RandomTestRange(Sys)) + { + await RunScriptAsync(this, script, Materializer.Settings, + flow => flow.Collect(x => x % 2 == 0, x => (x * x).ToString())); + } } [Fact] - public void An_old_behaviour_Collect_must_restart_when_Collect_throws() + public async Task An_old_behaviour_Collect_must_restart_when_Collect_throws() { - Func throwOnTwo = x => + await this.AssertAllStagesStoppedAsync(async () => { - if (x == 2) - throw new TestException(""); - return x; - }; + int ThrowOnTwo(int x) => x == 2 ? throw new TestException("") : x; - var probe = - Source.From(Enumerable.Range(1, 3)) - .Collect(throwOnTwo) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) - .RunWith(this.SinkProbe(), Materializer); - probe.Request(1); - probe.ExpectNext(1); - probe.Request(1); - probe.ExpectNext(3); - probe.Request(1); - probe.ExpectComplete(); + var probe = + Source.From(Enumerable.Range(1, 3)) + // This is intentional, testing backward compatibility with old obsolete method +#pragma warning disable CS0618 + .Collect(ThrowOnTwo) +#pragma warning restore CS0618 + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(this.SinkProbe(), Materializer); + + await probe.AsyncBuilder() + .Request(1) + .ExpectNext(1) + .Request(1) + .ExpectNext(3) + .Request(1) + .ExpectComplete() + .ExecuteAsync(); + }, Materializer); } [Fact] - public void A_Collect_must_restart_when_Collect_throws() + public async Task A_Collect_must_restart_when_Collect_throws() { - bool ThrowOnTwo(int x) => x == 2 ? throw new TestException("") : true; + await this.AssertAllStagesStoppedAsync(async () => + { + bool ThrowOnTwo(int x) => x == 2 ? throw new TestException("") : true; + + var probe = + Source.From(Enumerable.Range(1, 3)) + .Collect(ThrowOnTwo, x => x) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(this.SinkProbe(), Materializer); - var probe = - Source.From(Enumerable.Range(1, 3)) - .Collect(ThrowOnTwo, x => x) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) - .RunWith(this.SinkProbe(), Materializer); - probe.Request(1); - probe.ExpectNext(1); - probe.Request(1); - probe.ExpectNext(3); - probe.Request(1); - probe.ExpectComplete(); + await probe.AsyncBuilder() + .Request(1) + .ExpectNext(1) + .Request(1) + .ExpectNext(3) + .Request(1) + .ExpectComplete() + .ExecuteAsync(); + }, Materializer); } } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs index 1bdc06ad6d0..894e9e15ea1 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.Util.Internal; @@ -19,6 +20,7 @@ namespace Akka.Streams.Tests.Dsl { public class FlowGroupedSpec : ScriptedTest { + private ActorMaterializerSettings Settings { get; } public FlowGroupedSpec(ITestOutputHelper output = null) : base(output) @@ -26,29 +28,37 @@ public FlowGroupedSpec(ITestOutputHelper output = null) : base(output) Settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16); } - private static readonly Random Random = new Random(); - private static ICollection RandomSeq(int n) => Enumerable.Range(1, n).Select(_ => Random.Next()).ToList(); + private readonly Random _random = new Random(12345); + private ICollection RandomSeq(int n) => Enumerable.Range(1, n).Select(_ => _random.Next()).ToList(); - private static (ICollection, ICollection>) RandomTest(int n) + private (ICollection, ICollection>) RandomTest(int n) { var s = RandomSeq(n); return (s, new[] {s}); } + // No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync [Fact] - public void A_Grouped_must_group_evenly() + public async Task A_Grouped_must_group_evenly() { - var testLength = Random.Next(1, 16); + var testLength = _random.Next(1, 16); var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).ToArray()); - RandomTestRange(Sys).ForEach(_ => RunScript(script, Settings, flow => flow.Grouped(testLength))); + foreach (var _ in RandomTestRange(Sys)) + { + await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength)); + } } + // No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync [Fact] - public void A_Grouped_must_group_with_rest() + public async Task A_Grouped_must_group_with_rest() { - var testLength = Random.Next(1, 16); + var testLength = _random.Next(1, 16); var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).Concat(RandomTest(1)).ToArray()); - RandomTestRange(Sys).ForEach(_ => RunScript(script, Settings, flow => flow.Grouped(testLength))); + foreach (var _ in RandomTestRange(Sys)) + { + await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength)); + } } } }