From 906343903a1e1408b34f5d473b89c57e99353e09 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 1 Jun 2022 06:15:42 +0700 Subject: [PATCH] Convert Akka.Streams.Tests to async - Dsl.RestartSpec (#5955) Co-authored-by: Aaron Stannard --- .../Akka.Streams.Tests/Dsl/RestartSpec.cs | 924 +++++++++--------- 1 file changed, 480 insertions(+), 444 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs b/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs index 4eb9f992056..257c81b18bc 100644 --- a/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs @@ -15,8 +15,11 @@ using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.TestKit; +using Akka.TestKit.Extensions; +using Akka.Tests.Shared.Internals; using Akka.Util.Internal; using FluentAssertions; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -47,10 +50,10 @@ public RestartSpec(ITestOutputHelper output) // Source // - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_run_normally() + [Fact] + public async Task A_restart_with_backoff_source_should_run_normally() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -59,22 +62,20 @@ public void A_restart_with_backoff_source_should_run_normally() return Source.Repeat("a"); }, _shortRestartSettings).RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("a"); - probe.RequestNext("a"); - probe.RequestNext("a"); - probe.RequestNext("a"); + await probe.AsyncBuilder() + .ExpectNextN(Enumerable.Repeat("a", 5)) + .ExecuteAsync(); created.Current.Should().Be(1); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_restart_on_completion() + [Fact] + public async Task A_restart_with_backoff_source_should_restart_on_completion() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -83,22 +84,20 @@ public void A_restart_with_backoff_source_should_restart_on_completion() return Source.From(new List { "a", "b" }); }, _shortRestartSettings).RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.RequestNext("a"); + await probe.AsyncBuilder() + .ExpectNext("a", "b", "a", "b", "a") + .ExecuteAsync(); created.Current.Should().Be(3); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_restart_on_failure() + [Fact] + public async Task A_restart_with_backoff_source_should_restart_on_failure() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -113,22 +112,20 @@ public void A_restart_with_backoff_source_should_restart_on_failure() return Source.From(enumerable); }, _shortRestartSettings).RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.RequestNext("a"); + await probe.AsyncBuilder() + .ExpectNext("a", "b", "a", "b", "a") + .ExecuteAsync(); created.Current.Should().Be(3); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_backoff_before_restart() + [Fact] + public async Task A_restart_with_backoff_source_should_backoff_before_restart() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -138,26 +135,30 @@ public void A_restart_with_backoff_source_should_backoff_before_restart() }, _restartSettings) .RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("b"); + await probe.AsyncBuilder() + .ExpectNext("a", "b") + .ExecuteAsync(); // There should be a delay of at least _minBackoff before we receive the element after restart var deadline = (_minBackoff - TimeSpan.FromMilliseconds(1)).FromNow(); - probe.Request(1); - probe.ExpectNext("a"); + await probe.AsyncBuilder() + .Request(1) + .ExpectNext("a") + .ExecuteAsync(); + deadline.IsOverdue.Should().Be(true); created.Current.Should().Be(2); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_reset_exponential_backoff_back_to_minimum_when_source_runs_for_at_least_minimum_backoff_without_completing() + [Fact] + public async Task A_restart_with_backoff_source_should_reset_exponential_backoff_back_to_minimum_when_source_runs_for_at_least_minimum_backoff_without_completing() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -167,35 +168,37 @@ public void A_restart_with_backoff_source_should_reset_exponential_backoff_back_ }, _restartSettings) .RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("b"); - // There should be _minBackoff delay - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.Request(1); + await probe.AsyncBuilder() + .ExpectNext("a", "b") + // There should be _minBackoff delay + .ExpectNext("a", "b") + .Request(1) + .ExecuteAsync(); // The probe should now be backing off again with with increased backoff // Now wait for the delay to pass, then it will start the new source, we also want to wait for the // subsequent backoff to pass, so it resets the restart count - Thread.Sleep(_minBackoff + TimeSpan.FromTicks(_minBackoff.Ticks * 2) + _minBackoff + TimeSpan.FromMilliseconds(500)); + await Task.Delay(_minBackoff + TimeSpan.FromTicks(_minBackoff.Ticks * 2) + _minBackoff + TimeSpan.FromMilliseconds(500)); - probe.ExpectNext("a"); - probe.RequestNext("b"); + await probe.AsyncBuilder() + .ExpectNext("a", "b") + .ExecuteAsync(); // We should have reset, so the restart delay should be back, ie we should receive the // next element within < 2 * _minBackoff - probe.RequestNext(TimeSpan.FromTicks(_minBackoff.Ticks * 2) - TimeSpan.FromMilliseconds(10)).Should().Be("a"); + (await probe.RequestNextAsync(TimeSpan.FromTicks(_minBackoff.Ticks * 2) - TimeSpan.FromMilliseconds(10))) + .Should().Be("a"); created.Current.Should().Be(4); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_cancel_the_currently_running_source_when_cancelled() + [Fact] + public async Task A_restart_with_backoff_source_should_cancel_the_currently_running_source_when_cancelled() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var tcs = new TaskCompletionSource(); @@ -210,21 +213,23 @@ public void A_restart_with_backoff_source_should_cancel_the_currently_running_so }); }, _shortRestartSettings).RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.Cancel(); + await probe.AsyncBuilder() + .RequestNext("a") + .Cancel() + .ExecuteAsync(); tcs.Task.Result.Should().BeSameAs(Done.Instance); // Wait to ensure it isn't restarted - Thread.Sleep(200); + await Task.Delay(200); created.Current.Should().Be(1); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_not_restart_the_source_when_cancelled_while_backing_off() + [Fact] + public async Task A_restart_with_backoff_source_should_not_restart_the_source_when_cancelled_while_backing_off() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -233,21 +238,23 @@ public void A_restart_with_backoff_source_should_not_restart_the_source_when_can return Source.Single("a"); }, _restartSettings).RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.Request(1); - // Should be backing off now - probe.Cancel(); + await probe.AsyncBuilder() + .RequestNext("a") + .Request(1) + // Should be backing off now + .Cancel() + .ExecuteAsync(); // Wait to ensure it isn't restarted - Thread.Sleep(_minBackoff + TimeSpan.FromMilliseconds(100)); + await Task.Delay(_minBackoff + TimeSpan.FromMilliseconds(100)); created.Current.Should().Be(1); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_stop_on_completion_if_it_should_only_be_restarted_in_failures() + [Fact] + public async Task A_restart_with_backoff_source_should_stop_on_completion_if_it_should_only_be_restarted_in_failures() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.OnFailuresWithBackoff(() => @@ -264,23 +271,22 @@ public void A_restart_with_backoff_source_should_stop_on_completion_if_it_should return Source.From(enumerable); }, _shortRestartSettings).RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("b"); - // will fail, and will restart - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.RequestNext("c"); + await probe.AsyncBuilder() + .ExpectNext("a", "b") + // will fail, and will restart + .ExpectNext("a", "b", "c") + .ExecuteAsync(); created.Current.Should().Be(2); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_restart_on_failure_when_only_due_to_failures_should_be_restarted() + [Fact] + public async Task A_restart_with_backoff_source_should_restart_on_failure_when_only_due_to_failures_should_be_restarted() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.OnFailuresWithBackoff(() => @@ -295,24 +301,22 @@ public void A_restart_with_backoff_source_should_restart_on_failure_when_only_du return Source.From(enumerable); }, _shortRestartSettings).RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.RequestNext("a"); - probe.RequestNext("b"); - probe.RequestNext("a"); + await probe.AsyncBuilder() + .ExpectNext("a", "b", "a", "b", "a") + .ExecuteAsync(); created.Current.Should().Be(3); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } // Flaky test, ExpectComplete times out with the default 3 seconds value under heavy load. // Fail rate was 1:500 - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_not_restart_the_source_when_maxRestarts_is_reached() + [Fact] + public async Task A_restart_with_backoff_source_should_not_restart_the_source_when_maxRestarts_is_reached() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -322,20 +326,21 @@ public void A_restart_with_backoff_source_should_not_restart_the_source_when_max }, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff)) .RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("a"); - probe.ExpectComplete(); + await probe.AsyncBuilder() + .ExpectNext("a", "a") + .ExpectComplete() + .ExecuteAsync(); created.Current.Should().Be(2); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_reset_maxRestarts_when_source_runs_for_at_least_minimum_backoff_without_completing() + [Fact] + public async Task A_restart_with_backoff_source_should_reset_maxRestarts_when_source_runs_for_at_least_minimum_backoff_without_completing() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -345,29 +350,33 @@ public void A_restart_with_backoff_source_should_reset_maxRestarts_when_source_r }, _restartSettings.WithMaxRestarts(2, _minBackoff)) .RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - // There should be minBackoff delay - probe.RequestNext("a"); + await probe.AsyncBuilder() + .RequestNext("a") + // There should be minBackoff delay + .RequestNext("a") + .ExecuteAsync(); // The probe should now be backing off again with with increased backoff // Now wait for the delay to pass, then it will start the new source, we also want to wait for the // subsequent backoff to pass - Thread.Sleep(_minBackoff + TimeSpan.FromTicks(_minBackoff.Ticks * 2) + _minBackoff + TimeSpan.FromMilliseconds(500)); + await Task.Delay(_minBackoff + TimeSpan.FromTicks(_minBackoff.Ticks * 2) + _minBackoff + TimeSpan.FromMilliseconds(500)); - probe.RequestNext("a"); - // We now are able to trigger the third restart, since enough time has elapsed to reset the counter - probe.RequestNext("a"); + await probe.AsyncBuilder() + .RequestNext("a") + // We now are able to trigger the third restart, since enough time has elapsed to reset the counter + .RequestNext("a") + .ExecuteAsync(); created.Current.Should().Be(4); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_source_should_allow_using_withMaxRestarts_instead_of_minBackoff_to_determine_the_maxRestarts_reset_time() + [Fact] + public async Task A_restart_with_backoff_source_should_allow_using_withMaxRestarts_instead_of_minBackoff_to_determine_the_maxRestarts_reset_time() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => @@ -377,19 +386,21 @@ public void A_restart_with_backoff_source_should_allow_using_withMaxRestarts_ins }, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1))) .RunWith(this.SinkProbe(), Materializer); - probe.RequestNext("a"); - probe.RequestNext("a"); - - Thread.Sleep(_shortMinBackoff + TimeSpan.FromTicks(_shortMinBackoff.Ticks * 2) + _shortMinBackoff); // if using shortMinBackoff as deadline cause reset + await probe.AsyncBuilder() + .ExpectNext("a", "a") + .ExecuteAsync(); - probe.RequestNext("a"); + await Task.Delay(_shortMinBackoff + TimeSpan.FromTicks(_shortMinBackoff.Ticks * 2) + _shortMinBackoff); // if using shortMinBackoff as deadline cause reset - probe.Request(1); - probe.ExpectComplete(); + await probe.AsyncBuilder() + .RequestNext("a") + .Request(1) + .ExpectComplete() + .ExecuteAsync(); created.Current.Should().Be(3); - probe.Cancel(); + await probe.CancelAsync(); }, Materializer); } @@ -397,10 +408,10 @@ public void A_restart_with_backoff_source_should_allow_using_withMaxRestarts_ins // Sink // - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_run_normally() + [Fact] + public async Task A_restart_with_backoff_sink_should_run_normally() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var tcs = new TaskCompletionSource>(); @@ -414,20 +425,23 @@ public void A_restart_with_backoff_sink_should_run_normally() }); }, _shortRestartSettings), Keep.Left).Run(Materializer); - probe.SendNext("a"); - probe.SendNext("b"); - probe.SendNext("c"); - probe.SendComplete(); + await probe.AsyncBuilder() + .SendNext("a") + .SendNext("b") + .SendNext("c") + .SendComplete() + .ExecuteAsync(); + await tcs.Task.ShouldCompleteWithin(3.Seconds()); tcs.Task.Result.Should().ContainInOrder("a", "b", "c"); created.Current.Should().Be(1); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_restart_on_cancellation() + [Fact] + public async Task A_restart_with_backoff_sink_should_restart_on_cancellation() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (queue, sinkProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); @@ -439,27 +453,27 @@ public void A_restart_with_backoff_sink_should_restart_on_cancellation() }, _shortRestartSettings), Keep.Left) .Run(Materializer); - probe.SendNext("a"); - sinkProbe.RequestNext("a"); - probe.SendNext("b"); - sinkProbe.RequestNext("b"); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); - probe.SendNext("c"); - sinkProbe.RequestNext("c"); - probe.SendComplete(); + await probe.SendNextAsync("a"); + await sinkProbe.RequestNextAsync("a"); + await probe.SendNextAsync("b"); + await sinkProbe.RequestNextAsync("b"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); + await probe.SendNextAsync("c"); + await sinkProbe.RequestNextAsync("c"); + await probe.SendCompleteAsync(); created.Current.Should().Be(2); - sinkProbe.Cancel(); - probe.SendComplete(); + await sinkProbe.CancelAsync(); + await probe.SendCompleteAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_backoff_before_restart() + [Fact] + public async Task A_restart_with_backoff_sink_should_backoff_before_restart() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (queue, sinkProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); @@ -471,27 +485,27 @@ public void A_restart_with_backoff_sink_should_backoff_before_restart() }, _restartSettings), Keep.Left) .Run(Materializer); - probe.SendNext("a"); - sinkProbe.RequestNext("a"); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); - probe.SendNext("b"); + await probe.SendNextAsync("a"); + await sinkProbe.RequestNextAsync("a"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); + await probe.SendNextAsync("b"); var deadline = (_minBackoff - TimeSpan.FromMilliseconds(1)).FromNow(); - sinkProbe.Request(1); - sinkProbe.ExpectNext("b"); + await sinkProbe.RequestAsync(1); + await sinkProbe.ExpectNextAsync("b"); deadline.IsOverdue.Should().BeTrue(); created.Current.Should().Be(2); - sinkProbe.Cancel(); - probe.SendComplete(); + await sinkProbe.CancelAsync(); + await probe.SendCompleteAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_reset_exponential_backoff_back_to_minimum_when_source_runs_for_at_least_minimum_backoff_without_completing() + [Fact] + public async Task A_restart_with_backoff_sink_should_reset_exponential_backoff_back_to_minimum_when_source_runs_for_at_least_minimum_backoff_without_completing() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (queue, sinkProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); @@ -503,42 +517,48 @@ public void A_restart_with_backoff_sink_should_reset_exponential_backoff_back_to }, _restartSettings), Keep.Left) .Run(Materializer); - probe.SendNext("a"); - sinkProbe.RequestNext("a"); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("a"); + await sinkProbe.RequestNextAsync("a"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // There should be a minBackoff delay - probe.SendNext("b"); - sinkProbe.RequestNext("b"); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); - sinkProbe.Request(1); + await probe.SendNextAsync("b"); + await sinkProbe.RequestNextAsync("b"); + await probe.SendNextAsync("cancel"); + + await sinkProbe.AsyncBuilder() + .RequestNext("cancel") + .Request(1) + .ExecuteAsync(); // The probe should now be backing off for 2 * minBackoff // Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the // subsequent minBackoff min backoff to pass, so it resets the restart count - Thread.Sleep(_minBackoff + TimeSpan.FromTicks(_minBackoff.Ticks * 2) + _minBackoff + TimeSpan.FromMilliseconds(500)); + await Task.Delay(_minBackoff + TimeSpan.FromTicks(_minBackoff.Ticks * 2) + _minBackoff + TimeSpan.FromMilliseconds(500)); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // We should have reset, so the restart delay should be back to minBackoff, ie we should definitely receive the // next element within < 2 * minBackoff - probe.SendNext("c"); - sinkProbe.Request(1); - sinkProbe.ExpectNext(TimeSpan.FromTicks(2 * _minBackoff.Ticks) - TimeSpan.FromMilliseconds(10), "c"); + await probe.SendNextAsync("c"); + + await sinkProbe.AsyncBuilder() + .Request(1) + .ExpectNext(TimeSpan.FromTicks(2 * _minBackoff.Ticks) - TimeSpan.FromMilliseconds(10), "c") + .ExecuteAsync(); created.Current.Should().Be(4); - sinkProbe.Cancel(); - probe.SendComplete(); + await sinkProbe.CancelAsync(); + await probe.SendCompleteAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_not_restart_the_sink_when_completed_while_backing_off() + [Fact] + public async Task A_restart_with_backoff_sink_should_not_restart_the_sink_when_completed_while_backing_off() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (queue, sinkProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); @@ -550,25 +570,25 @@ public void A_restart_with_backoff_sink_should_not_restart_the_sink_when_complet }, _restartSettings), Keep.Left) .Run(Materializer); - probe.SendNext("a"); - sinkProbe.RequestNext("a"); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("a"); + await sinkProbe.RequestNextAsync("a"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // Should be backing off now - probe.SendComplete(); + await probe.SendCompleteAsync(); // Wait to ensure it isn't restarted - Thread.Sleep(_minBackoff + TimeSpan.FromMilliseconds(100)); + await Task.Delay(_minBackoff + TimeSpan.FromMilliseconds(100)); created.Current.Should().Be(1); - sinkProbe.Cancel(); + await sinkProbe.CancelAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_not_restart_the_sink_when_maxRestarts_is_reached() + [Fact] + public async Task A_restart_with_backoff_sink_should_not_restart_the_sink_when_maxRestarts_is_reached() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (queue, sinkProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); @@ -580,24 +600,24 @@ public void A_restart_with_backoff_sink_should_not_restart_the_sink_when_maxRest }, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff)), Keep.Left) .Run(Materializer); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); - probe.ExpectCancellation(); + await probe.ExpectCancellationAsync(); created.Current.Should().Be(2); - sinkProbe.Cancel(); - probe.SendComplete(); + await sinkProbe.CancelAsync(); + await probe.SendCompleteAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_reset_maxRestarts_when_sink_runs_for_at_least_minimum_backoff_without_completing() + [Fact] + public async Task A_restart_with_backoff_sink_should_reset_maxRestarts_when_sink_runs_for_at_least_minimum_backoff_without_completing() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (queue, sinkProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); @@ -609,35 +629,35 @@ public void A_restart_with_backoff_sink_should_reset_maxRestarts_when_sink_runs_ }, _restartSettings.WithMaxRestarts(2, _minBackoff)), Keep.Left) .Run(Materializer); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // There should be a minBackoff delay - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // The probe should now be backing off for 2 * minBackoff // Now wait for the 2 * minBackoff delay to pass, then it will start the new source, we also want to wait for the // subsequent minBackoff to pass, so it resets the restart count Thread.Sleep(_minBackoff + TimeSpan.FromTicks(_minBackoff.Ticks * 2) + _minBackoff + TimeSpan.FromMilliseconds(500)); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // We now are able to trigger the third restart, since enough time has elapsed to reset the counter - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); created.Current.Should().Be(4); - sinkProbe.Cancel(); - probe.SendComplete(); + await sinkProbe.CancelAsync(); + await probe.SendCompleteAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_sink_should_allow_using_withMaxRestarts_instead_of_minBackoff_to_determine_the_maxRestarts_reset_time() + [Fact] + public async Task A_restart_with_backoff_sink_should_allow_using_withMaxRestarts_instead_of_minBackoff_to_determine_the_maxRestarts_reset_time() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (queue, sinkProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); @@ -649,27 +669,29 @@ public void A_restart_with_backoff_sink_should_allow_using_withMaxRestarts_inste }, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1))), Keep.Left) .Run(Materializer); - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // There should be a shortMinBackoff delay - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // The probe should now be backing off for 2 * shortMinBackoff - Thread.Sleep(_shortMinBackoff + TimeSpan.FromTicks(_shortMinBackoff.Ticks * 2) + _shortMinBackoff); // if using shortMinBackoff as deadline cause reset + await Task.Delay(_shortMinBackoff + TimeSpan.FromTicks(_shortMinBackoff.Ticks * 2) + _shortMinBackoff); // if using shortMinBackoff as deadline cause reset - probe.SendNext("cancel"); - sinkProbe.RequestNext("cancel"); + await probe.SendNextAsync("cancel"); + await sinkProbe.RequestNextAsync("cancel"); // We cannot get a final element - probe.SendNext("cancel"); - sinkProbe.Request(1); - sinkProbe.ExpectNoMsg(); + await probe.SendNextAsync("cancel"); + await sinkProbe.AsyncBuilder() + .Request(1) + .ExpectNoMsg() + .ExecuteAsync(); created.Current.Should().Be(3); - sinkProbe.Cancel(); - probe.SendComplete(); + await sinkProbe.CancelAsync(); + await probe.SendCompleteAsync(); }, Materializer); } @@ -695,16 +717,12 @@ public void A_restart_with_backoff_sink_should_allow_using_withMaxRestarts_inste bool onlyOnFailures = false) { var created = new AtomicCounter(0); - var probe1 = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); - var flowInSource = probe1.Item1; - var flowInProbe = probe1.Item2; - var probe2 = this.SourceProbe().ToMaterialized(BroadcastHub.Sink(), Keep.Both).Run(Materializer); - var flowOutProbe = probe2.Item1; - var flowOutSource = probe2.Item2; + var (flowInSource, flowInProbe) = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var (flowOutProbe, flowOutSource) = this.SourceProbe().ToMaterialized(BroadcastHub.Sink(), Keep.Both).Run(Materializer); // We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we // simply use the probes as a message bus for feeding and capturing events. - var probe3 = this.SourceProbe().ViaMaterialized(RestartFlowFactory(() => + var (source, sink) = this.SourceProbe().ViaMaterialized(RestartFlowFactory(() => { created.IncrementAndGet(); var snk = Flow.Create() @@ -737,16 +755,14 @@ public void A_restart_with_backoff_sink_should_allow_using_withMaxRestarts_inste return Flow.FromSinkAndSource(snk, src); }, onlyOnFailures, RestartSettings.Create(minBackoff, maxBackoff, 0).WithMaxRestarts(maxRestarts, minBackoff)), Keep.Left) .ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); - var source = probe3.Item1; - var sink = probe3.Item2; return (created, source, flowInProbe, flowOutProbe, sink); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_run_normally() + [Fact] + public async Task A_restart_with_backoff_flow_should_run_normally() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var created = new AtomicCounter(0); var (source, sink) = this.SourceProbe().ViaMaterialized(RestartFlow.WithBackoff(() => @@ -755,23 +771,24 @@ public void A_restart_with_backoff_flow_should_run_normally() return Flow.Create(); }, _shortRestartSettings), Keep.Left).ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); - source.SendNext("a"); - sink.RequestNext("a"); - source.SendNext("b"); - sink.RequestNext("b"); + await source.SendNextAsync("a"); + await sink.RequestNextAsync("a"); + await source.SendNextAsync("b"); + await sink.RequestNextAsync("b"); created.Current.Should().Be(1); - source.SendComplete(); + await source.SendCompleteAsync(); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void Simplified_restart_flow_restarts_stages_test() + [Fact] + public async Task Simplified_restart_flow_restarts_stages_test() { - var created = new AtomicCounter(0); - var restarts = 4; - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { + var created = new AtomicCounter(0); + const int restarts = 4; + var flow = RestartFlowFactory(() => { created.IncrementAndGet(); @@ -798,284 +815,303 @@ public void Simplified_restart_flow_restarts_stages_test() .ToMaterialized(this.SinkProbe(), Keep.Both) .Run(Materializer); - source.SendNext(1); - source.SendNext(2); - source.SendNext(3); - source.SendNext(4); - source.SendNext(5); - for (int i = 0; i < restarts; i++) - { - source.SendNext(6); - } - - source.SendNext(7); - source.SendNext(8); - source.SendNext(9); - source.SendNext(10); - - sink.RequestNext(1); - sink.RequestNext(2); - sink.RequestNext(3); - sink.RequestNext(4); - sink.RequestNext(5); - //6 is never received since RestartFlow's do not retry - sink.RequestNext(7); - sink.RequestNext(8); - sink.RequestNext(9); - sink.RequestNext(10); - - source.SendComplete(); + await source.AsyncBuilder() + .SendNext(new[] { 1, 2, 3, 4, 5 }) + .SendNext(Enumerable.Repeat(6, restarts)) + .SendNext(new[] { 7, 8, 9, 10 }) + .ExecuteAsync(); + + await sink.AsyncBuilder() + //6 is never received since RestartFlow's do not retry + .ExpectNextN(new[] { 1, 2, 3, 4, 5, 7, 8, 9, 10 }, 3.Seconds()) + .ExecuteAsync(); + + await source.SendCompleteAsync(); + + created.Current.Should().Be(restarts + 1); }, Materializer); - created.Current.Should().Be(restarts + 1); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_restart_on_cancellation() + [Fact] + public async Task A_restart_with_backoff_flow_should_restart_on_cancellation() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff); - - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); - - source.SendNext("cancel"); - // This will complete the flow in probe and cancel the flow out probe - flowInProbe.Request(2); - ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should() - .Contain(ImmutableList.Create("in complete", "out complete")); - - // and it should restart - source.SendNext("c"); - flowInProbe.RequestNext("c"); - flowOutProbe.SendNext("d"); - sink.RequestNext("d"); - - created.Current.Should().Be(2); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff); + + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); + + await source.SendNextAsync("cancel"); + // This will complete the flow in probe and cancel the flow out probe + await flowInProbe.RequestAsync(2); + (await flowInProbe.ExpectNextNAsync(2, 3.Seconds()).ToListAsync()) + .Should().Contain(new []{"in complete", "out complete"}); + + // and it should restart + await source.SendNextAsync("c"); + await flowInProbe.RequestNextAsync("c"); + await flowOutProbe.SendNextAsync("d"); + await sink.RequestNextAsync("d"); + + created.Current.Should().Be(2); + }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_restart_on_completion() + [Fact] + public async Task A_restart_with_backoff_flow_should_restart_on_completion() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff); - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); - sink.Request(1); - flowOutProbe.SendNext("complete"); + await sink.RequestAsync(1); + await flowOutProbe.SendNextAsync("complete"); - // This will complete the flow in probe and cancel the flow out probe - flowInProbe.Request(2); - ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should() - .Contain(ImmutableList.Create("in complete", "out complete")); + // This will complete the flow in probe and cancel the flow out probe + await flowInProbe.RequestAsync(2); + (await flowInProbe.ExpectNextNAsync(2, 3.Seconds()).ToListAsync()) + .Should().Contain(new []{"in complete", "out complete"}); - // and it should restart - source.SendNext("c"); - flowInProbe.RequestNext("c"); - flowOutProbe.SendNext("d"); - sink.RequestNext("d"); + // and it should restart + await source.SendNextAsync("c"); + await flowInProbe.RequestNextAsync("c"); + await flowOutProbe.SendNextAsync("d"); + await sink.RequestNextAsync("d"); - created.Current.Should().Be(2); + created.Current.Should().Be(2); + }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_restart_on_failure() + [Fact] + public async Task A_restart_with_backoff_flow_should_restart_on_failure() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff); - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); - sink.Request(1); - flowOutProbe.SendNext("error"); + await sink.RequestAsync(1); + await flowOutProbe.SendNextAsync("error"); - // This should complete the in probe - flowInProbe.RequestNext("in complete"); + // This should complete the in probe + await flowInProbe.RequestNextAsync("in complete"); - // and it should restart - source.SendNext("c"); - flowInProbe.RequestNext("c"); - flowOutProbe.SendNext("d"); - sink.RequestNext("d"); + // and it should restart + await source.SendNextAsync("c"); + await flowInProbe.RequestNextAsync("c"); + await flowOutProbe.SendNextAsync("d"); + await sink.RequestNextAsync("d"); - created.Current.Should().Be(2); + created.Current.Should().Be(2); + }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_backoff_before_restart() + [Fact] + public async Task A_restart_with_backoff_flow_should_backoff_before_restart() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_minBackoff, _maxBackoff); - - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); - - // we need to start counting time before we issue the cancel signal, - // as starting the counter anywhere after the cancel signal, might not - // capture all of the time, that has been spent for the backoff. - var deadline = _minBackoff.FromNow(); - - source.SendNext("cancel"); - // This will complete the flow in probe and cancel the flow out probe - flowInProbe.Request(2); - ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should() - .Contain(ImmutableList.Create("in complete", "out complete")); - - source.SendNext("c"); - flowInProbe.Request(1); - flowInProbe.ExpectNext("c"); - deadline.IsOverdue.Should().BeTrue(); - - created.Current.Should().Be(2); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_minBackoff, _maxBackoff); + + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); + + // we need to start counting time before we issue the cancel signal, + // as starting the counter anywhere after the cancel signal, might not + // capture all of the time, that has been spent for the backoff. + var deadline = _minBackoff.FromNow(); + + await source.SendNextAsync("cancel"); + // This will complete the flow in probe and cancel the flow out probe + await flowInProbe.RequestAsync(2); + (await flowInProbe.ExpectNextNAsync(2, 3.Seconds()).ToListAsync()) + .Should().Contain(new []{"in complete", "out complete"}); + + await source.SendNextAsync("c"); + await flowInProbe.RequestAsync(1); + await flowInProbe.ExpectNextAsync("c"); + deadline.IsOverdue.Should().BeTrue(); + + created.Current.Should().Be(2); + }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_continue_running_flow_out_port_after_in_has_been_sent_completion() + [Fact] + public async Task A_restart_with_backoff_flow_should_continue_running_flow_out_port_after_in_has_been_sent_completion() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _maxBackoff); - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); - source.SendComplete(); - flowInProbe.RequestNext("in complete"); + await source.SendCompleteAsync(); + await flowInProbe.RequestNextAsync("in complete"); - flowOutProbe.SendNext("c"); - sink.RequestNext("c"); - flowOutProbe.SendNext("d"); - sink.RequestNext("d"); + await flowOutProbe.SendNextAsync("c"); + await sink.RequestNextAsync("c"); + await flowOutProbe.SendNextAsync("d"); + await sink.RequestNextAsync("d"); - sink.Request(1); - flowOutProbe.SendComplete(); - flowInProbe.RequestNext("out complete"); - sink.ExpectComplete(); + await sink.RequestAsync(1); + await flowOutProbe.SendCompleteAsync(); + await flowInProbe.RequestNextAsync("out complete"); + await sink.ExpectCompleteAsync(); created.Current.Should().Be(1); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_continue_running_flow_in_port_after_out_has_been_cancelled() + [Fact] + public async Task A_restart_with_backoff_flow_should_continue_running_flow_in_port_after_out_has_been_cancelled() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _maxBackoff); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _maxBackoff); - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); - sink.Cancel(); - flowInProbe.RequestNext("out complete"); + await sink.CancelAsync(); + await flowInProbe.RequestNextAsync("out complete"); - source.SendNext("c"); - flowInProbe.RequestNext("c"); - source.SendNext("d"); - flowInProbe.RequestNext("d"); + await source.SendNextAsync("c"); + await flowInProbe.RequestNextAsync("c"); + await source.SendNextAsync("d"); + await flowInProbe.RequestNextAsync("d"); - source.SendNext("cancel"); - flowInProbe.RequestNext("in complete"); - source.ExpectCancellation(); + await source.SendNextAsync("cancel"); + await flowInProbe.RequestNextAsync("in complete"); + await source.ExpectCancellationAsync(); - created.Current.Should().Be(1); + created.Current.Should().Be(1); + }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_not_restart_on_completion_when_maxRestarts_is_reached() + [Fact] + public async Task A_restart_with_backoff_flow_should_not_restart_on_completion_when_maxRestarts_is_reached() { - var (created, _, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, maxRestarts: 1); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, _, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, maxRestarts: 1); - sink.Request(1); - flowOutProbe.SendNext("complete"); + await sink.RequestAsync(1); + await flowOutProbe.SendNextAsync("complete"); - // This will complete the flow in probe and cancel the flow out probe - flowInProbe.Request(2); - ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should() - .Contain(ImmutableList.Create("in complete", "out complete")); + // This will complete the flow in probe and cancel the flow out probe + await flowInProbe.RequestAsync(2); + (await flowInProbe.ExpectNextNAsync(2, 3.Seconds()).ToListAsync()) + .Should().Contain(new []{"in complete", "out complete"}); - // and it should restart - sink.Request(1); - flowOutProbe.SendNext("complete"); + // and it should restart + await sink.RequestAsync(1); + await flowOutProbe.SendNextAsync("complete"); - // This will complete the flow in probe and cancel the flow out probe - flowInProbe.Request(2); - flowInProbe.ExpectNext("out complete"); - flowInProbe.ExpectNoMsg(TimeSpan.FromTicks(_shortMinBackoff.Ticks * 3)); - sink.ExpectComplete(); + // This will complete the flow in probe and cancel the flow out probe + await flowInProbe.AsyncBuilder() + .Request(2) + .ExpectNext("out complete") + .ExpectNoMsg(TimeSpan.FromTicks(_shortMinBackoff.Ticks * 3)) + .ExecuteAsync(); + await sink.ExpectCompleteAsync(); - created.Current.Should().Be(2); + created.Current.Should().Be(2); + + }, Materializer); } // onlyOnFailures - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_stop_on_cancellation_when_using_onlyOnFailuresWithBackoff() + [Fact] + public async Task A_restart_with_backoff_flow_should_stop_on_cancellation_when_using_onlyOnFailuresWithBackoff() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, -1, true); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, -1, true); - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); - source.SendNext("cancel"); - // This will complete the flow in probe and cancel the flow out probe - flowInProbe.Request(2); - flowInProbe.ExpectNext("in complete"); + await source.SendNextAsync("cancel"); + // This will complete the flow in probe and cancel the flow out probe + await flowInProbe.AsyncBuilder() + .Request(2) + .ExpectNext("in complete") + .ExecuteAsync(); - source.ExpectCancellation(); + await source.ExpectCancellationAsync(); - created.Current.Should().Be(1); + created.Current.Should().Be(1); + }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_stop_on_completion_when_using_onlyOnFailuresWithBackoff() + [Fact] + public async Task A_restart_with_backoff_flow_should_stop_on_completion_when_using_onlyOnFailuresWithBackoff() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, -1, true); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, -1, true); - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); - flowOutProbe.SendNext("complete"); - sink.Request(1); - sink.ExpectComplete(); + await flowOutProbe.SendNextAsync("complete"); + await sink.RequestAsync(1); + await sink.ExpectCompleteAsync(); - created.Current.Should().Be(1); + created.Current.Should().Be(1); + }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_restart_with_backoff_flow_should_restart_on_failure_when_using_onlyOnFailuresWithBackoff() + [Fact] + public async Task A_restart_with_backoff_flow_should_restart_on_failure_when_using_onlyOnFailuresWithBackoff() { - var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, -1, true); - - source.SendNext("a"); - flowInProbe.RequestNext("a"); - flowOutProbe.SendNext("b"); - sink.RequestNext("b"); - - sink.Request(1); - flowOutProbe.SendNext("error"); - - // This should complete the in probe - flowInProbe.RequestNext("in complete"); - - // and it should restart - source.SendNext("c"); - flowInProbe.RequestNext("c"); - flowOutProbe.SendNext("d"); - sink.RequestNext("d"); - sink.Request(1); - created.Current.Should().Be(2); + await this.AssertAllStagesStoppedAsync(async () => + { + var (created, source, flowInProbe, flowOutProbe, sink) = SetupFlow(_shortMinBackoff, _shortMaxBackoff, -1, true); + + await source.SendNextAsync("a"); + await flowInProbe.RequestNextAsync("a"); + await flowOutProbe.SendNextAsync("b"); + await sink.RequestNextAsync("b"); + + await sink.RequestAsync(1); + await flowOutProbe.SendNextAsync("error"); + + // This should complete the in probe + await flowInProbe.RequestNextAsync("in complete"); + + // and it should restart + await source.SendNextAsync("c"); + await flowInProbe.RequestNextAsync("c"); + await flowOutProbe.SendNextAsync("d"); + await sink.RequestNextAsync("d"); + await sink.RequestAsync(1); + await sink.ExpectCompleteAsync(); + created.Current.Should().Be(2); + }, Materializer); } } } \ No newline at end of file