forked from akkadotnet/akka.net
-
Notifications
You must be signed in to change notification settings - Fork 1
/
FlowCollectSpec.cs
122 lines (109 loc) · 4.65 KB
/
FlowCollectSpec.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//-----------------------------------------------------------------------
// <copyright file="FlowCollectSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Xunit;
using Xunit.Abstractions;
using static Akka.Streams.Tests.Dsl.TestConfig;
namespace Akka.Streams.Tests.Dsl
{
/// <summary>
/// Tests for the Collect stage, covering both the obsolete single-function overload
/// and the predicate-plus-projection overload.
/// </summary>
public class FlowCollectSpec : ScriptedTest
{
    // Seeded with a constant so each test instance draws the same sequence of inputs.
    private Random Random { get; } = new Random(12345);

    private ActorMaterializer Materializer { get; }

    /// <summary>
    /// Creates the materializer used by every test from settings built on <c>Sys</c>.
    /// </summary>
    /// <param name="helper">xUnit output helper forwarded to the base class.</param>
    public FlowCollectSpec(ITestOutputHelper helper) : base(helper)
    {
        Materializer = ActorMaterializer.Create(Sys, ActorMaterializerSettings.Create(Sys));
    }

    /// <summary>
    /// Draws one random input in [0, 10000) and pairs it with the output the tests expect
    /// from Collect: the square of the input rendered as a string when the input is even,
    /// and no output at all when it is odd.
    /// </summary>
    /// <returns>A tuple of (inputs, expected outputs) for a single script step.</returns>
    private (ICollection<int>, ICollection<string>) NextScriptStep()
    {
        var input = Random.Next(0, 10000);
        ICollection<int> inputs = new[] { input };

        ICollection<string> expected;
        if ((input & 1) == 0)
        {
            expected = new[] { (input * input).ToString() };
        }
        else
        {
            expected = new string[] { };
        }

        return (inputs, expected);
    }

    // No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync
    /// <summary>
    /// Runs a randomly generated script against the obsolete Collect overload, which is
    /// given a single function returning the squared value as a string for even inputs
    /// and null for odd inputs.
    /// </summary>
    [Fact]
    public async Task An_old_behaviour_Collect_must_collect()
    {
        var steps = RandomTestRange(Sys).Select(_ => NextScriptStep()).ToArray();
        var script = Script.Create(steps);

        foreach (var _ in RandomTestRange(Sys))
        {
            // This is intentional, testing backward compatibility with old obsolete method
#pragma warning disable CS0618
            await RunScriptAsync(this, script, Materializer.Settings,
                flow => flow.Collect(x => x % 2 == 0 ? (x * x).ToString() : null));
#pragma warning restore CS0618
        }
    }

    // No need to use AssertAllStagesStoppedAsync, it is encapsulated in RunScriptAsync
    /// <summary>
    /// Runs a randomly generated script against the Collect overload that takes a predicate
    /// (input is even) and a projection (square of the input as a string).
    /// </summary>
    [Fact]
    public async Task A_Collect_must_collect()
    {
        var steps = RandomTestRange(Sys).Select(_ => NextScriptStep()).ToArray();
        var script = Script.Create(steps);

        foreach (var _ in RandomTestRange(Sys))
        {
            await RunScriptAsync(this, script, Materializer.Settings,
                flow => flow.Collect(x => x % 2 == 0, x => (x * x).ToString()));
        }
    }

    /// <summary>
    /// Streams 1..3 through the obsolete Collect overload whose function throws on 2, with
    /// <c>Deciders.RestartingDecider</c> attached, and expects 1, then 3, then completion.
    /// </summary>
    [Fact]
    public async Task An_old_behaviour_Collect_must_restart_when_Collect_throws()
    {
        await this.AssertAllStagesStoppedAsync(async () =>
        {
            // Passes every element through unchanged except 2, which raises.
            int FailOnTwo(int element)
            {
                if (element == 2)
                {
                    throw new TestException("");
                }

                return element;
            }

            // This is intentional, testing backward compatibility with old obsolete method
#pragma warning disable CS0618
            var collected = Source.From(Enumerable.Range(1, 3)).Collect(FailOnTwo);
#pragma warning restore CS0618

            var probe = collected
                .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
                .RunWith(this.SinkProbe<int>(), Materializer);

            var expectations = probe.AsyncBuilder()
                .Request(1)
                .ExpectNext(1)
                .Request(1)
                .ExpectNext(3)
                .Request(1)
                .ExpectComplete();

            await expectations.ExecuteAsync();
        }, Materializer);
    }

    /// <summary>
    /// Streams 1..3 through the predicate-based Collect overload whose predicate throws on 2,
    /// with <c>Deciders.RestartingDecider</c> attached, and expects 1, then 3, then completion.
    /// </summary>
    [Fact]
    public async Task A_Collect_must_restart_when_Collect_throws()
    {
        await this.AssertAllStagesStoppedAsync(async () =>
        {
            // Accepts every element except 2, which raises instead of answering.
            bool FailOnTwo(int element)
            {
                if (element == 2)
                {
                    throw new TestException("");
                }

                return true;
            }

            var collected = Source.From(Enumerable.Range(1, 3))
                .Collect(FailOnTwo, element => element);

            var probe = collected
                .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
                .RunWith(this.SinkProbe<int>(), Materializer);

            var expectations = probe.AsyncBuilder()
                .Request(1)
                .ExpectNext(1)
                .Request(1)
                .ExpectNext(3)
                .Request(1)
                .ExpectComplete();

            await expectations.ExecuteAsync();
        }, Materializer);
    }
}
}