Skip to content

Commit

Permalink
[Async TestKit] Convert Akka.Streams.Tests to async - Dsl.FlowCollect…
Browse files Browse the repository at this point in the history
…Spec (akkadotnet#5960)

* Convert Akka.Streams.Tests to async - Dsl.FlowCollectSpec

* Update ScriptedTest to use AssertAllStagesStoppedAsync

* Rearrange parameter positions

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed May 25, 2022
1 parent a3030ff commit a4a6fdd
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 69 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Streams.TestKit/ChainSetup.cs
Expand Up @@ -80,6 +80,7 @@ public class ChainSetup<TIn, TOut, TMat>

public ActorMaterializerSettings Settings { get; }

public ActorMaterializer Materializer => _materializer;
public TestPublisher.ManualProbe<TIn> Upstream
{
get
Expand Down
33 changes: 18 additions & 15 deletions src/core/Akka.Streams.TestKit/ScriptedTest.cs
Expand Up @@ -202,11 +202,6 @@ public void Request(int demand)
_outstandingDemand += demand;
}

[Obsolete("Will be removed after async_testkit conversion is done. Use ShakeItAsync instead")]
public bool ShakeIt()
=> ShakeItAsync()
.ConfigureAwait(false).GetAwaiter().GetResult();

public async Task<bool> ShakeItAsync()
{
var oneMilli = TimeSpan.FromMilliseconds(10);
Expand Down Expand Up @@ -249,11 +244,6 @@ public async Task<bool> ShakeItAsync()
return u.Concat(d).Any(x => x == marker);
}

[Obsolete("Will be removed after async_testkit conversion is done. Use RunAsync instead")]
public void Run()
=> RunAsync()
.ConfigureAwait(false).GetAwaiter().GetResult();

public async Task RunAsync()
{
try
Expand Down Expand Up @@ -327,21 +317,34 @@ protected ScriptedTest(ITestOutputHelper output = null) : base(output)
Func<Flow<TIn2, TIn2, NotUsed>, Flow<TIn2, TOut2, TMat2>> op,
int maximumOverrun = 3,
int maximumRequest = 3,
int maximumBuffer = 3)
=> RunScriptAsync(script, settings, op, maximumOverrun, maximumRequest, maximumBuffer)
int maximumBuffer = 3,
AkkaSpec spec = null)
=> RunScriptAsync(spec, script, settings, op, maximumOverrun, maximumRequest, maximumBuffer)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task RunScriptAsync<TIn2, TOut2, TMat2>(
AkkaSpec spec,
Script<TIn2, TOut2> script,
ActorMaterializerSettings settings,
Func<Flow<TIn2, TIn2, NotUsed>, Flow<TIn2, TOut2, TMat2>> op,
int maximumOverrun = 3,
int maximumRequest = 3,
int maximumBuffer = 3)
{
var runner = await new ScriptRunner<TIn2, TOut2, TMat2>(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this)
.InitializeAsync();
await runner.RunAsync();
var runner = new ScriptRunner<TIn2, TOut2, TMat2>(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this);
async Task Run()
{
await runner.InitializeAsync();
await runner.RunAsync();
}

if(spec != null)
await spec.AssertAllStagesStoppedAsync(async () =>
{
await Run();
}, runner.Materializer);
else
await Run();
}

protected static IPublisher<TOut> ToPublisher<TOut>(Source<TOut, NotUsed> source, IMaterializer materializer)
Expand Down
109 changes: 64 additions & 45 deletions src/core/Akka.Streams.Tests/Dsl/FlowCollectSpec.cs
Expand Up @@ -8,10 +8,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
using Xunit;
using Xunit.Abstractions;
using static Akka.Streams.Tests.Dsl.TestConfig;
Expand All @@ -20,6 +20,7 @@ namespace Akka.Streams.Tests.Dsl
{
public class FlowCollectSpec : ScriptedTest
{
private Random Random { get; } = new Random(12345);
private ActorMaterializer Materializer { get; }

public FlowCollectSpec(ITestOutputHelper helper) : base(helper)
Expand All @@ -28,76 +29,94 @@ public FlowCollectSpec(ITestOutputHelper helper) : base(helper)
Materializer = ActorMaterializer.Create(Sys, settings);
}

// No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync
[Fact]
public void An_old_behaviour_Collect_must_collect()
public async Task An_old_behaviour_Collect_must_collect()
{
var random = new Random();
var script = Script.Create(RandomTestRange(Sys).Select(_ =>
{
var x = random.Next(0, 10000);
var x = Random.Next(0, 10000);
return ((ICollection<int>)new[] { x },
(x & 1) == 0 ? (ICollection<string>)new[] { (x * x).ToString() } : (ICollection<string>)new string[] { });
(x & 1) == 0 ? (ICollection<string>)new[] { (x * x).ToString() } : new string[] { });
}).ToArray());

RandomTestRange(Sys).ForEach(_ => RunScript(script, Materializer.Settings, flow => flow.Collect(x => x % 2 == 0 ? (x * x).ToString() : null)));

foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Materializer.Settings,
// This is intentional, testing backward compatibility with old obsolete method
#pragma warning disable CS0618
flow => flow.Collect(x => x % 2 == 0 ? (x * x).ToString() : null));
#pragma warning restore CS0618
}
}

// No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync
[Fact]
public void A_Collect_must_collect()
public async Task A_Collect_must_collect()
{
var random = new Random();
var script = Script.Create(RandomTestRange(Sys).Select(_ =>
{
var x = random.Next(0, 10000);
var x = Random.Next(0, 10000);
return ((ICollection<int>)new[] { x },
(x & 1) == 0 ? (ICollection<string>)new[] { (x*x).ToString() } : (ICollection<string>)new string[] {});
(x & 1) == 0 ? (ICollection<string>)new[] { (x*x).ToString() } : new string[] {});
}).ToArray());

RandomTestRange(Sys).ForEach(_=>RunScript(
script,
Materializer.Settings,
flow => flow.Collect(x => x % 2 == 0, x => (x*x).ToString())));
foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Materializer.Settings,
flow => flow.Collect(x => x % 2 == 0, x => (x * x).ToString()));
}
}

[Fact]
public void An_old_behaviour_Collect_must_restart_when_Collect_throws()
public async Task An_old_behaviour_Collect_must_restart_when_Collect_throws()
{
Func<int, int> throwOnTwo = x =>
await this.AssertAllStagesStoppedAsync(async () =>
{
if (x == 2)
throw new TestException("");
return x;
};
int ThrowOnTwo(int x) => x == 2 ? throw new TestException("") : x;
var probe =
Source.From(Enumerable.Range(1, 3))
.Collect(throwOnTwo)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
.RunWith(this.SinkProbe<int>(), Materializer);
probe.Request(1);
probe.ExpectNext(1);
probe.Request(1);
probe.ExpectNext(3);
probe.Request(1);
probe.ExpectComplete();
var probe =
Source.From(Enumerable.Range(1, 3))
// This is intentional, testing backward compatibility with old obsolete method
#pragma warning disable CS0618
.Collect(ThrowOnTwo)
#pragma warning restore CS0618
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
.RunWith(this.SinkProbe<int>(), Materializer);
await probe.AsyncBuilder()
.Request(1)
.ExpectNext(1)
.Request(1)
.ExpectNext(3)
.Request(1)
.ExpectComplete()
.ExecuteAsync();
}, Materializer);
}

[Fact]
public void A_Collect_must_restart_when_Collect_throws()
public async Task A_Collect_must_restart_when_Collect_throws()
{
bool ThrowOnTwo(int x) => x == 2 ? throw new TestException("") : true;
await this.AssertAllStagesStoppedAsync(async () =>
{
bool ThrowOnTwo(int x) => x == 2 ? throw new TestException("") : true;
var probe =
Source.From(Enumerable.Range(1, 3))
.Collect(ThrowOnTwo, x => x)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
.RunWith(this.SinkProbe<int>(), Materializer);
var probe =
Source.From(Enumerable.Range(1, 3))
.Collect(ThrowOnTwo, x => x)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
.RunWith(this.SinkProbe<int>(), Materializer);
probe.Request(1);
probe.ExpectNext(1);
probe.Request(1);
probe.ExpectNext(3);
probe.Request(1);
probe.ExpectComplete();
await probe.AsyncBuilder()
.Request(1)
.ExpectNext(1)
.Request(1)
.ExpectNext(3)
.Request(1)
.ExpectComplete()
.ExecuteAsync();
}, Materializer);
}
}
}
28 changes: 19 additions & 9 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupedSpec.cs
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
Expand All @@ -19,36 +20,45 @@ namespace Akka.Streams.Tests.Dsl
{
public class FlowGroupedSpec : ScriptedTest
{

private ActorMaterializerSettings Settings { get; }

public FlowGroupedSpec(ITestOutputHelper output = null) : base(output)
{
Settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
}

private static readonly Random Random = new Random();
private static ICollection<int> RandomSeq(int n) => Enumerable.Range(1, n).Select(_ => Random.Next()).ToList();
private readonly Random _random = new Random(12345);
private ICollection<int> RandomSeq(int n) => Enumerable.Range(1, n).Select(_ => _random.Next()).ToList();

private static (ICollection<int>, ICollection<IEnumerable<int>>) RandomTest(int n)
private (ICollection<int>, ICollection<IEnumerable<int>>) RandomTest(int n)
{
var s = RandomSeq(n);
return (s, new[] {s});
}

// No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync
[Fact]
public void A_Grouped_must_group_evenly()
public async Task A_Grouped_must_group_evenly()
{
var testLength = Random.Next(1, 16);
var testLength = _random.Next(1, 16);
var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).ToArray());
RandomTestRange(Sys).ForEach(_ => RunScript(script, Settings, flow => flow.Grouped(testLength)));
foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength));
}
}

// No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync
[Fact]
public void A_Grouped_must_group_with_rest()
public async Task A_Grouped_must_group_with_rest()
{
var testLength = Random.Next(1, 16);
var testLength = _random.Next(1, 16);
var script = Script.Create(RandomTestRange(Sys).Select(_ => RandomTest(testLength)).Concat(RandomTest(1)).ToArray());
RandomTestRange(Sys).ForEach(_ => RunScript(script, Settings, flow => flow.Grouped(testLength)));
foreach (var _ in RandomTestRange(Sys))
{
await RunScriptAsync(this, script, Settings, flow => flow.Grouped(testLength));
}
}
}
}

0 comments on commit a4a6fdd

Please sign in to comment.