diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index e4dc3ee41ec..cb23df85335 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -9,7 +9,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading; using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; @@ -17,95 +16,97 @@ using FluentAssertions; using Xunit; using Akka.Actor; +using Akka.TestKit.Extensions; using Akka.Util.Internal; +using FluentAssertions.Extensions; using Xunit.Abstractions; +using static FluentAssertions.FluentActions; namespace Akka.Streams.Tests.Dsl { public class HubSpec : AkkaSpec { - public HubSpec(ITestOutputHelper helper) : base(helper) + public HubSpec(ITestOutputHelper helper) : base(FullDebugConfig, helper) { Materializer = Sys.Materializer(); } - public ActorMaterializer Materializer { get; } + private ActorMaterializer Materializer { get; } [Fact] - public void MergeHub_must_work_in_the_happy_case() + public async Task MergeHub_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(16).Take(20).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); - var sink = t.Item1; - var result = t.Item2; + var (sink, task) = MergeHub.Source(16).Take(20).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer); Source.From(Enumerable.Range(11, 10)).RunWith(sink, Materializer); - result.AwaitResult().OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20)); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20)); }, Materializer); } [Fact] - public void MergeHub_must_notify_new_producers_if_consumer_cancels_before_first_producer() + public async Task MergeHub_must_notify_new_producers_if_consumer_cancels_before_first_producer() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var sink = Sink.Cancelled().RunWith(MergeHub.Source(16), Materializer); var upstream = this.CreatePublisherProbe(); Source.FromPublisher(upstream).RunWith(sink, Materializer); - upstream.ExpectCancellation(); + await upstream.ExpectCancellationAsync(); }, Materializer); } [Fact] - public void MergeHub_must_notify_existing_producers_if_consumer_cancels_after_a_few_elements() + public async Task MergeHub_must_notify_existing_producers_if_consumer_cancels_after_a_few_elements() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(16).Take(5).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); - var sink = t.Item1; - var result = t.Item2; + var (sink, task) = MergeHub.Source(16).Take(5).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); var upstream = this.CreatePublisherProbe(); Source.FromPublisher(upstream).RunWith(sink, Materializer); - for (var i = 1; i < 6; i++) - upstream.SendNext(i); - - upstream.ExpectCancellation(); - result.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 5)); + await upstream.AsyncBuilder() + .SendNext(Enumerable.Range(1, 5)) + .ExpectCancellation() + .ExecuteAsync(); + + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.Should().BeEquivalentTo(Enumerable.Range(1, 5)); }, Materializer); } [Fact] - public void MergeHub_must_notify_new_producers_if_consumer_cancels_after_a_few_elements() + public async Task MergeHub_must_notify_new_producers_if_consumer_cancels_after_a_few_elements() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(16).Take(5).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); - var sink = t.Item1; - var result = t.Item2; + var (sink, task) = MergeHub.Source(16).Take(5).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); var upstream1 = this.CreatePublisherProbe(); var upstream2 = this.CreatePublisherProbe(); Source.FromPublisher(upstream1).RunWith(sink, Materializer); - for (var i = 1; i < 6; i++) - upstream1.SendNext(i); + await upstream1.AsyncBuilder() + .SendNext(Enumerable.Range(1, 5)) + .ExpectCancellation() + .ExecuteAsync(); - upstream1.ExpectCancellation(); - result.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 5)); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.Should().BeEquivalentTo(Enumerable.Range(1, 5)); Source.FromPublisher(upstream2).RunWith(sink, Materializer); - upstream2.ExpectCancellation(); + await upstream2.ExpectCancellationAsync(); }, Materializer); } [Fact] - public void MergeHub_must_respect_the_buffer_size() + public async Task MergeHub_must_respect_the_buffer_size() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var downstream = this.CreateManualSubscriberProbe(); var sink = Sink.FromSubscriber(downstream).RunWith(MergeHub.Source(3), Materializer); @@ -116,28 +117,28 @@ public void MergeHub_must_respect_the_buffer_size() return i; }).RunWith(sink, Materializer); - var sub = downstream.ExpectSubscription(); + var sub = await downstream.ExpectSubscriptionAsync(); sub.Request(1); // Demand starts from 3 - ExpectMsg(1); - ExpectMsg(2); - ExpectMsg(3); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(1); + await ExpectMsgAsync(2); + await ExpectMsgAsync(3); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); // One element consumed (it was requested), demand 0 remains at producer - downstream.ExpectNext(1); + await downstream.ExpectNextAsync(1); // Requesting next element, results in next element to be consumed. sub.Request(1); - downstream.ExpectNext(2); + await downstream.ExpectNextAsync(2); // Two elements have been consumed, so threshold of 2 is reached, additional 2 demand is dispatched. // There is 2 demand at the producer now - ExpectMsg(4); - ExpectMsg(5); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(4); + await ExpectMsgAsync(5); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); // Two additional elements have been sent: // - 3, 4, 5 are pending @@ -147,97 +148,93 @@ public void MergeHub_must_respect_the_buffer_size() // Requesting next gives the next element // Demand is not yet refreshed for the producer as there is one more element until threshold is met sub.Request(1); - downstream.ExpectNext(3); + await downstream.ExpectNextAsync(3); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); sub.Request(1); - downstream.ExpectNext(4); - ExpectMsg(6); - ExpectMsg(7); + await downstream.ExpectNextAsync(4); + await ExpectMsgAsync(6); + await ExpectMsgAsync(7); sub.Cancel(); }, Materializer); } [Fact] - public void MergeHub_must_work_with_long_streams() + public async Task MergeHub_must_work_with_long_streams() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(16).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both) + var (sink, task) = MergeHub.Source(16).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both) .Run(Materializer); - var sink = t.Item1; - var result = t.Item2; Source.From(Enumerable.Range(1, 10000)).RunWith(sink, Materializer); Source.From(Enumerable.Range(10001, 10000)).RunWith(sink, Materializer); - result.AwaitResult().OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); }, Materializer); } [Fact] - public void MergeHub_must_work_with_long_streams_when_buffer_size_is_1() + public async Task MergeHub_must_work_with_long_streams_when_buffer_size_is_1() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(1).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both) + var (sink, task) = MergeHub.Source(1).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both) .Run(Materializer); - var sink = t.Item1; - var result = t.Item2; Source.From(Enumerable.Range(1, 10000)).RunWith(sink, Materializer); Source.From(Enumerable.Range(10001, 10000)).RunWith(sink, Materializer); - result.AwaitResult().OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); }, Materializer); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void MergeHub_must_work_with_long_streams_when_consumer_is_slower() + [Fact] + public async Task MergeHub_must_work_with_long_streams_when_consumer_is_slower() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(16) + var (sink, task) = MergeHub.Source(16) .Take(2000) .Throttle(10, TimeSpan.FromMilliseconds(1), 200, ThrottleMode.Shaping) .ToMaterialized(Sink.Seq(), Keep.Both) .Run(Materializer); - var sink = t.Item1; - var result = t.Item2; Source.From(Enumerable.Range(1, 1000)).RunWith(sink, Materializer); Source.From(Enumerable.Range(1001, 1000)).RunWith(sink, Materializer); - result.AwaitResult().OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 2000)); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 2000)); }, Materializer); } [Fact] - public void MergeHub_must_work_with_long_streams_if_one_of_the_producers_is_slower() + public async Task MergeHub_must_work_with_long_streams_if_one_of_the_producers_is_slower() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(16).Take(2000).ToMaterialized(Sink.Seq(), Keep.Both) + var (sink, task) = MergeHub.Source(16).Take(2000).ToMaterialized(Sink.Seq(), Keep.Both) .Run(Materializer); - var sink = t.Item1; - var result = t.Item2; Source.From(Enumerable.Range(1, 1000)) .Throttle(10, TimeSpan.FromMilliseconds(1), 100, ThrottleMode.Shaping) .RunWith(sink, Materializer); Source.From(Enumerable.Range(1001, 1000)).RunWith(sink, Materializer); - result.AwaitResult().OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 2000)); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 2000)); }, Materializer); } [Fact] - public void MergeHub_must_work_with_different_producers_separated_over_time() + public async Task MergeHub_must_work_with_different_producers_separated_over_time() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var downstream = this.CreateSubscriberProbe>(); var sink = MergeHub.Source(16) @@ -245,179 +242,238 @@ public void MergeHub_must_work_with_different_producers_separated_over_time() .ToMaterialized(Sink.FromSubscriber(downstream), Keep.Left) .Run(Materializer); Source.From(Enumerable.Range(1, 100)).RunWith(sink, Materializer); - downstream.RequestNext().Should().BeEquivalentTo(Enumerable.Range(1, 100)); + (await downstream.RequestNextAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 100)); Source.From(Enumerable.Range(101, 100)).RunWith(sink, Materializer); - downstream.RequestNext().Should().BeEquivalentTo(Enumerable.Range(101, 100)); + (await downstream.RequestNextAsync()).Should().BeEquivalentTo(Enumerable.Range(101, 100)); - downstream.Cancel(); + await downstream.CancelAsync(); }, Materializer); } [Fact] - public void MergeHub_must_keep_working_even_if_one_of_the_producers_fail() + public async Task MergeHub_must_keep_working_even_if_one_of_the_producers_fail() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = MergeHub.Source(16).Take(10).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); - var sink = t.Item1; - var result = t.Item2; + var (sink, task) = MergeHub.Source(16).Take(10).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); - EventFilter.Error(contains: "Upstream producer failed with exception").ExpectOne(() => + await EventFilter.Error(contains: "Upstream producer failed with exception").ExpectOneAsync(() => { Source.Failed(new TestException("failing")).RunWith(sink, Materializer); Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer); }); - result.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); + var result = await task.ShouldCompleteWithin(3.Seconds()); + result.Should().BeEquivalentTo(Enumerable.Range(1, 10)); }, Materializer); } [Fact] - public void BroadcastHub_must_work_in_the_happy_case() + public async Task BroadcastHub_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.From(Enumerable.Range(1, 10)).RunWith(BroadcastHub.Sink(8), Materializer); - source.RunWith(Sink.Seq(), Materializer) - .AwaitResult() - .Should().BeEquivalentTo(Enumerable.Range(1, 10)); + var result = await source.RunWith(Sink.Seq(), Materializer).ShouldCompleteWithin(3.Seconds()); + result.Should().BeEquivalentTo(Enumerable.Range(1, 10)); }, Materializer); } [Fact] - public void BroadcastHub_must_send_the_same_elements_to_consumers_attaching_around_the_same_time() + public async Task BroadcastHub_must_send_the_same_elements_to_consumers_attaching_around_the_same_time() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var other = Source.From(Enumerable.Range(2, 9)) .MapMaterializedValue>(_ => null); - var t = Source.Maybe() + var (firstElement, source) = Source.Maybe() .Concat(other) .ToMaterialized(BroadcastHub.Sink(8), Keep.Both) .Run(Materializer); - var firstElement = t.Item1; - var source = t.Item2; + /* + // Original code var f1 = source.RunWith(Sink.Seq(), Materializer); var f2 = source.RunWith(Sink.Seq(), Materializer); // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. - Thread.Sleep(500); + await Task.Delay(500); + firstElement.SetResult(1); - f1.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); - f2.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + */ + + var f1 = source.RunWith(this.SinkProbe(), Materializer); + var f2 = source.RunWith(this.SinkProbe(), Materializer); + + // Ensure subscription of Sinks. + await Task.WhenAll( + f1.EnsureSubscriptionAsync(), + f2.EnsureSubscriptionAsync()) + .ShouldCompleteWithin(3.Seconds()); + + firstElement.SetResult(1); + (await f1.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }, Materializer); } [Fact] - public void BroadcastHub_must_send_the_same_prefix_to_consumers_attaching_around_the_same_time_if_one_cancels_earlier() + public async Task BroadcastHub_must_send_the_same_prefix_to_consumers_attaching_around_the_same_time_if_one_cancels_earlier() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var other = Source.From(Enumerable.Range(2, 19)) .MapMaterializedValue>(_ => null); - var t = Source.Maybe() + var (firstElement, source) = Source.Maybe() .Concat(other) .ToMaterialized(BroadcastHub.Sink(8), Keep.Both) .Run(Materializer); - var firstElement = t.Item1; - var source = t.Item2; + /* + // Original code var f1 = source.RunWith(Sink.Seq(), Materializer); var f2 = source.Take(10).RunWith(Sink.Seq(), Materializer); // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. - Thread.Sleep(500); + await Task.Delay(500); + + firstElement.SetResult(1); + (await f1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 20)); + (await f2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + */ + + var f1 = source.RunWith(this.SinkProbe(), Materializer); + var f2 = source.Take(10).RunWith(this.SinkProbe(), Materializer); + + // Ensure subscription of Sinks. + await Task.WhenAll( + f1.EnsureSubscriptionAsync(), + f2.EnsureSubscriptionAsync()) + .ShouldCompleteWithin(3.Seconds()); + firstElement.SetResult(1); - f1.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 20)); - f2.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f1.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 20)); + (await f2.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }, Materializer); } [Fact] - public void BroadcastHub_must_ensure_that_subsequent_consumers_see_subsequent_elements_without_gap() + public async Task BroadcastHub_must_ensure_that_subsequent_consumers_see_subsequent_elements_without_gap() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.From(Enumerable.Range(1, 20)).RunWith(BroadcastHub.Sink(8), Materializer); - source.Take(10) - .RunWith(Sink.Seq(), Materializer) - .AwaitResult() + (await source.Take(10).RunWith(Sink.Seq(), Materializer).ShouldCompleteWithin(3.Seconds())) .Should().BeEquivalentTo(Enumerable.Range(1, 10)); - source.Take(10) - .RunWith(Sink.Seq(), Materializer) - .AwaitResult() + (await source.Take(10).RunWith(Sink.Seq(), Materializer).ShouldCompleteWithin(3.Seconds())) .Should().BeEquivalentTo(Enumerable.Range(11, 10)); }, Materializer); } [Fact] - public void BroadcastHub_must_send_the_same_elements_to_consumers_of_different_speed_attaching_around_the_same_time() + public async Task BroadcastHub_must_send_the_same_elements_to_consumers_of_different_speed_attaching_around_the_same_time() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var other = Source.From(Enumerable.Range(2, 9)) .MapMaterializedValue>(_ => null); - var t = Source.Maybe() + var (firstElement, source) = Source.Maybe() .Concat(other) .ToMaterialized(BroadcastHub.Sink(8), Keep.Both) .Run(Materializer); - var firstElement = t.Item1; - var source = t.Item2; + /* + // Original code var f1 = source.Throttle(1, TimeSpan.FromMilliseconds(10), 3, ThrottleMode.Shaping) .RunWith(Sink.Seq(), Materializer); var f2 = source.RunWith(Sink.Seq(), Materializer); // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. - Thread.Sleep(500); + await Task.Delay(500); + firstElement.SetResult(1); - f1.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); - f2.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + */ + + var f1 = source.Throttle(1, TimeSpan.FromMilliseconds(10), 3, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + var f2 = source.RunWith(this.SinkProbe(), Materializer); + + // Ensure subscription of Sinks. + await Task.WhenAll( + f1.EnsureSubscriptionAsync(), + f2.EnsureSubscriptionAsync()) + .ShouldCompleteWithin(3.Seconds()); + + firstElement.SetResult(1); + (await f1.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }, Materializer); } [Fact] - public void BroadcastHub_must_send_the_same_elements_to_consumers_of_attaching_around_the_same_time_if_the_producer_is_slow() + public async Task BroadcastHub_must_send_the_same_elements_to_consumers_of_attaching_around_the_same_time_if_the_producer_is_slow() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var other = Source.From(Enumerable.Range(2, 9)) .MapMaterializedValue>(_ => null); - var t = Source.Maybe() + var (firstElement, source) = Source.Maybe() .Concat(other) .Throttle(1, TimeSpan.FromMilliseconds(10), 3, ThrottleMode.Shaping) .ToMaterialized(BroadcastHub.Sink(8), Keep.Both) .Run(Materializer); - var firstElement = t.Item1; - var source = t.Item2; + /* + // Original code var f1 = source.RunWith(Sink.Seq(), Materializer); var f2 = source.RunWith(Sink.Seq(), Materializer); // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. - Thread.Sleep(500); + await Task.Delay(500); + + firstElement.SetResult(1); + (await f1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + */ + + var f1 = source.RunWith(this.SinkProbe(), Materializer); + var f2 = source.RunWith(this.SinkProbe(), Materializer); + + // Ensure subscription of Sinks. + await Task.WhenAll( + f1.EnsureSubscriptionAsync(), + f2.EnsureSubscriptionAsync()) + .ShouldCompleteWithin(3.Seconds()); + firstElement.SetResult(1); - f1.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); - f2.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f1.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }, Materializer); } [Fact] - public void BroadcastHub_must_ensure_that_from_two_different_speed_consumers_the_slower_controls_the_rate() + public async Task BroadcastHub_must_ensure_that_from_two_different_speed_consumers_the_slower_controls_the_rate() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var other = Source.From(Enumerable.Range(2, 19)) .MapMaterializedValue>(_ => null); - var t = Source.Maybe() + var (firstElement, source) = Source.Maybe() .Concat(other) .ToMaterialized(BroadcastHub.Sink(1), Keep.Both) .Run(Materializer); - var firstElement = t.Item1; - var source = t.Item2; + /* + // Original code var f1 = source .Throttle(1, TimeSpan.FromMilliseconds(10), 1, ThrottleMode.Shaping) .RunWith(Sink.Seq(), Materializer); @@ -427,42 +483,78 @@ public void BroadcastHub_must_ensure_that_from_two_different_speed_consumers_the .RunWith(Sink.Seq(), Materializer); // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. - Thread.Sleep(100); + await Task.Delay(500); + + firstElement.SetResult(1); + (await f1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 20)); + (await f2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 20)); + */ + + var f1 = source + .Throttle(1, TimeSpan.FromMilliseconds(10), 1, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + // Second cannot be overwhelmed since the first one throttles the overall rate, and second allows a higher rate + var f2 = source + .Throttle(10, TimeSpan.FromMilliseconds(10), 8, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + + // Ensure subscription of Sinks. + await Task.WhenAll( + f1.EnsureSubscriptionAsync(), + f2.EnsureSubscriptionAsync()) + .ShouldCompleteWithin(3.Seconds()); + firstElement.SetResult(1); - f1.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 20)); - f2.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 20)); + (await f1.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 20)); + (await f2.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 20)); }, Materializer); } [Fact] - public void BroadcastHub_must_send_the_same_elements_to_consumers_attaching_around_the_same_time_with_a_buffer_size_of_one() + public async Task BroadcastHub_must_send_the_same_elements_to_consumers_attaching_around_the_same_time_with_a_buffer_size_of_one() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var other = Source.From(Enumerable.Range(2, 9)) .MapMaterializedValue>(_ => null); - var t = Source.Maybe() + var (firstElement, source) = Source.Maybe() .Concat(other) .ToMaterialized(BroadcastHub.Sink(1), Keep.Both) .Run(Materializer); - var firstElement = t.Item1; - var source = t.Item2; + /* + // Original code var f1 = source.RunWith(Sink.Seq(), Materializer); var f2 = source.RunWith(Sink.Seq(), Materializer); // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. - Thread.Sleep(500); + await Task.Delay(500); + + firstElement.SetResult(1); + (await f1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + */ + + var f1 = source.RunWith(this.SinkProbe(), Materializer); + var f2 = source.RunWith(this.SinkProbe(), Materializer); + + // Ensure subscription of Sinks. + await Task.WhenAll( + f1.EnsureSubscriptionAsync(), + f2.EnsureSubscriptionAsync()) + .ShouldCompleteWithin(3.Seconds()); + firstElement.SetResult(1); - f1.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); - f2.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f1.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + (await f2.ToStrictAsync(3.Seconds()).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 10)); + }, Materializer); } [Fact] - public void BroadcastHub_must_be_able_to_implement_a_keep_dropping_if_unsubscribed_policy_with_a_simple_SinkIgnore() + public async Task BroadcastHub_must_be_able_to_implement_a_keep_dropping_if_unsubscribed_policy_with_a_simple_SinkIgnore() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var killSwitch = KillSwitches.Shared("test-switch"); var source = Source.From(Enumerable.Range(1, int.MaxValue)) @@ -470,52 +562,63 @@ public void BroadcastHub_must_be_able_to_implement_a_keep_dropping_if_unsubscrib .RunWith(BroadcastHub.Sink(8), Materializer); // Now the Hub "drops" elements until we attach a new consumer (Source.ignore consumes as fast as possible) - source.RunWith(Sink.Ignore(), Materializer); + var ignoredTask = source.RunWith(Sink.Ignore(), Materializer); // Now we attached a subscriber which will block the Sink.ignore to "take away" and drop elements anymore, // turning the BroadcastHub to a normal non-dropping mode var downstream = this.CreateSubscriberProbe(); source.RunWith(Sink.FromSubscriber(downstream), Materializer); - downstream.Request(1); - var first = downstream.ExpectNext(); + var first = await downstream.AsyncBuilder() + .Request(1) + .ExpectNextAsync(); - for (var i = first + 1; i < first + 11; i++) + foreach (var i in Enumerable.Range(first, 10)) { - downstream.Request(1); - downstream.ExpectNext(i); + await downstream.AsyncBuilder() + .Request(1) + .ExpectNext(i) + .ExecuteAsync(); } - downstream.Cancel(); + await downstream.CancelAsync(); killSwitch.Shutdown(); + await ignoredTask.ShouldCompleteWithin(3.Seconds()); }, Materializer); } [Fact] - public void BroadcastHub_must_remember_completion_for_materialisations_after_completion() + public async Task BroadcastHub_must_remember_completion_for_materialization_after_completion() { - var t = this.SourceProbe() - .ToMaterialized(BroadcastHub.Sink(), Keep.Both) - .Run(Materializer); - var sourceProbe = t.Item1; - var source = t.Item2; - var sinkProbe = source.RunWith(this.SinkProbe(), Materializer); + await this.AssertAllStagesStoppedAsync(async () => + { + var (sourceProbe, source) = this.SourceProbe() + .ToMaterialized(BroadcastHub.Sink(), Keep.Both) + .Run(Materializer); + var sinkProbe = source.RunWith(this.SinkProbe(), Materializer); - sourceProbe.SendComplete(); + sourceProbe.SendComplete(); - sinkProbe.Request(1).ExpectComplete(); + await sinkProbe.AsyncBuilder() + .Request(1) + .ExpectComplete() + .ExecuteAsync(); - // Materialize a second time. There was a race here, where we managed to enqueue our Source registration just - // immediately before the Hub shut down. - var sink2Probe = source.RunWith(this.SinkProbe(), Materializer); + // Materialize a second time. There was a race here, where we managed to enqueue our Source registration just + // immediately before the Hub shut down. + var sink2Probe = source.RunWith(this.SinkProbe(), Materializer); - sink2Probe.Request(1).ExpectComplete(); + await sink2Probe.AsyncBuilder() + .Request(1) + .ExpectComplete() + .ExecuteAsync(); + }, Materializer); } [Fact] - public void BroadcastHub_must_properly_signal_error_to_consumers() + public async Task BroadcastHub_must_properly_signal_error_to_consumers() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var upstream = this.CreatePublisherProbe(); var source = Source.FromPublisher(upstream).RunWith(BroadcastHub.Sink(8), Materializer); @@ -525,86 +628,96 @@ public void BroadcastHub_must_properly_signal_error_to_consumers() source.RunWith(Sink.FromSubscriber(downstream1), Materializer); source.RunWith(Sink.FromSubscriber(downstream2), Materializer); - downstream1.Request(4); - downstream2.Request(8); - - Enumerable.Range(1, 8).ForEach(x => upstream.SendNext(x)); - - downstream1.ExpectNext(1, 2, 3, 4); - downstream2.ExpectNext(1, 2, 3, 4, 5, 6, 7, 8); - - downstream1.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - downstream2.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - - upstream.SendError(new TestException("failed")); - downstream1.ExpectError().Message.Should().Be("failed"); - downstream2.ExpectError().Message.Should().Be("failed"); + await downstream1.RequestAsync(4); + await downstream2.RequestAsync(8); + + await upstream.AsyncBuilder() + .SendNext(Enumerable.Range(1, 8)) + .ExecuteAsync(); + + await downstream1.AsyncBuilder() + .ExpectNext(1, 2, 3, 4) + .ExpectNoMsg(100.Milliseconds()) + .ExecuteAsync(); + await downstream2.AsyncBuilder() + .ExpectNext(1, 2, 3, 4, 5, 6, 7, 8) + .ExpectNoMsg(100.Milliseconds()) + .ExecuteAsync(); + + await upstream.SendErrorAsync(new TestException("failed")); + (await downstream1.ExpectErrorAsync()).Message.Should().Be("failed"); + (await downstream2.ExpectErrorAsync()).Message.Should().Be("failed"); }, Materializer); } [Fact] - public void BroadcastHub_must_properly_signal_completion_to_consumers_arriving_after_producer_finished() + public async Task BroadcastHub_must_properly_signal_completion_to_consumers_arriving_after_producer_finished() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.Empty().RunWith(BroadcastHub.Sink(8), Materializer); // Wait enough so the Hub gets the completion. This is racy, but this is fine because both // cases should work in the end - Thread.Sleep(50); + await Task.Delay(50); - source.RunWith(Sink.Seq(), Materializer).AwaitResult().Should().BeEmpty(); + (await source.RunWith(Sink.Seq(), Materializer).ShouldCompleteWithin(3.Seconds())) + .Should().BeEmpty(); }, Materializer); } [Fact] - public void BroadcastHub_must_properly_signal_error_to_consumers_arriving_after_producer_finished() + public async Task BroadcastHub_must_properly_signal_error_to_consumers_arriving_after_producer_finished() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.Failed(new TestException("Fail!")) .RunWith(BroadcastHub.Sink(8), Materializer); // Wait enough so the Hub gets the completion. This is racy, but this is fine because both // cases should work in the end - Thread.Sleep(50); + await Task.Delay(50); - var task = source.RunWith(Sink.Seq(), Materializer); - task.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).Should().Throw(); + await Awaiting(async () => + { + await source.RunWith(Sink.Seq(), Materializer); + }).Should().ThrowAsync().ShouldCompleteWithin(3.Seconds()); }, Materializer); } [Fact] - public void PartitionHub_must_work_in_the_happy_case_with_one_stream() + public async Task PartitionHub_must_work_in_the_happy_case_with_one_stream() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var items = Enumerable.Range(1, 10).ToList(); var source = Source.From(items) .RunWith(PartitionHub.Sink((size, e) => 0, 0, 8), Materializer); - var result = source.RunWith(Sink.Seq(), Materializer).AwaitResult(); + var result = await source.RunWith(Sink.Seq(), Materializer).ShouldCompleteWithin(3.Seconds()); result.Should().BeEquivalentTo(items); }, Materializer); } [Fact] - public void PartitionHub_must_work_in_the_happy_case_with_two_streams() + public async Task PartitionHub_must_work_in_the_happy_case_with_two_streams() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.From(Enumerable.Range(0, 10)) .RunWith(PartitionHub.Sink((size, e) => e % size, 2, 8), Materializer); + var result1 = source.RunWith(Sink.Seq(), Materializer); // it should not start publishing until startAfterNrOfConsumers = 2 - Thread.Sleep(50); + await Task.Delay(50); var result2 = source.RunWith(Sink.Seq(), Materializer); - result1.AwaitResult().Should().BeEquivalentTo(new[] { 0, 2, 4, 6, 8 }); - result2.AwaitResult().Should().BeEquivalentTo(new[] { 1, 3, 5, 7, 9 }); + + (await result1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(new[] { 0, 2, 4, 6, 8 }); + (await result2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(new[] { 1, 3, 5, 7, 9 }); }, Materializer); } [Fact] - public void PartitionHub_must_be_able_to_use_as_rount_robin_router() + public async Task PartitionHub_must_be_able_to_be_used_as_round_robin_router() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.From(Enumerable.Range(0, 10)) .RunWith(PartitionHub.StatefulSink(() => @@ -616,17 +729,19 @@ public void PartitionHub_must_be_able_to_use_as_rount_robin_router() return info.ConsumerByIndex((int)n % info.Size); }); }, 2, 8), Materializer); + var result1 = source.RunWith(Sink.Seq(), Materializer); var result2 = source.RunWith(Sink.Seq(), Materializer); - result1.AwaitResult().Should().BeEquivalentTo(new[] { 1, 3, 5, 7, 9 }); - result2.AwaitResult().Should().BeEquivalentTo(new[] { 0, 2, 4, 6, 8 }); + + (await result1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(new[] { 1, 3, 5, 7, 9 }); + (await result2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(new[] { 0, 2, 4, 6, 8 }); }, Materializer); } [Fact] - public void PartitionHub_must_be_able_to_use_as__sticky_session_rount_robin_router() + public async Task PartitionHub_must_be_able_to_be_used_as_sticky_session_round_robin_router() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.From(new[] { "usr-1", "usr-2", "usr-1", "usr-3" }) .RunWith(PartitionHub.StatefulSink(() => @@ -643,17 +758,19 @@ public void PartitionHub_must_be_able_to_use_as__sticky_session_rount_robin_rout return id; }); }, 2, 8), Materializer); + var result1 = source.RunWith(Sink.Seq(), Materializer); var result2 = source.RunWith(Sink.Seq(), Materializer); - result1.AwaitResult().Should().BeEquivalentTo(new[] { "usr-2" }); - result2.AwaitResult().Should().BeEquivalentTo(new[] { "usr-1", "usr-1", "usr-3" }); + + (await result1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo("usr-2"); + (await result2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo("usr-1", "usr-1", "usr-3"); }, Materializer); } [Fact] - public void PartitionHub_must_be_able_to_use_as_fastest_consumer_router() + public async Task PartitionHub_must_be_able_to_use_as_fastest_consumer_router() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var items = Enumerable.Range(0, 999).ToList(); var source = Source.From(items) @@ -664,66 +781,66 @@ public void PartitionHub_must_be_able_to_use_as_fastest_consumer_router() var result2 = source.Throttle(10, TimeSpan.FromMilliseconds(100), 10, ThrottleMode.Shaping) .RunWith(Sink.Seq(), Materializer); - result1.AwaitResult().Count.ShouldBeGreaterThan(result2.AwaitResult().Count); + var count1 = (await result1.ShouldCompleteWithin(3.Seconds())).Count; + var count2 = (await result2.ShouldCompleteWithin(3.Seconds())).Count; + count1.ShouldBeGreaterThan(count2); }, Materializer); } [Fact] - public void PartitionHub_must_route_evenly() + public async Task PartitionHub_must_route_evenly() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = this.SourceProbe() + var (testSource, hub) = this.SourceProbe() .ToMaterialized(PartitionHub.Sink((size, e) => e % size, 2, 8), Keep.Both) .Run(Materializer); - var testSource = t.Item1; - var hub = t.Item2; var probe0 = hub.RunWith(this.SinkProbe(), Materializer); var probe1 = hub.RunWith(this.SinkProbe(), Materializer); - probe0.Request(3); - probe1.Request(10); - testSource.SendNext(0); - probe0.ExpectNext(0); - testSource.SendNext(1); - probe1.ExpectNext(1); + await probe0.RequestAsync(3); + await probe1.RequestAsync(10); + await testSource.SendNextAsync(0); + await probe0.ExpectNextAsync(0); + await testSource.SendNextAsync(1); + await probe1.ExpectNextAsync(1); - testSource.SendNext(2); - testSource.SendNext(3); - testSource.SendNext(4); - probe0.ExpectNext(2); - probe1.ExpectNext(3); - probe0.ExpectNext(4); + await testSource.SendNextAsync(2); + await testSource.SendNextAsync(3); + await testSource.SendNextAsync(4); + await probe0.ExpectNextAsync(2); + await probe1.ExpectNextAsync(3); + await probe0.ExpectNextAsync(4); // probe1 has not requested more - testSource.SendNext(5); - testSource.SendNext(6); - testSource.SendNext(7); - probe1.ExpectNext(5); - probe1.ExpectNext(7); - probe0.ExpectNoMsg(TimeSpan.FromMilliseconds(50)); - probe0.Request(10); - probe0.ExpectNext(6); - - testSource.SendComplete(); - probe0.ExpectComplete(); - probe1.ExpectComplete(); + await testSource.SendNextAsync(5); + await testSource.SendNextAsync(6); + await testSource.SendNextAsync(7); + await probe1.ExpectNextAsync(5); + await probe1.ExpectNextAsync(7); + await probe0.AsyncBuilder() + .ExpectNoMsg(TimeSpan.FromMilliseconds(50)) + .Request(10) + .ExpectNext(6) + .ExecuteAsync(); + + await testSource.SendCompleteAsync(); + await probe0.ExpectCompleteAsync(); + await probe1.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void PartitionHub_must_route_unevenly() + public async Task PartitionHub_must_route_unevenly() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = this.SourceProbe() + var (testSource, hub) = this.SourceProbe() .ToMaterialized(PartitionHub.Sink((size, e) => (e % 3) % 2, 2, 8), Keep.Both) .Run(Materializer); - var testSource = t.Item1; - var hub = t.Item2; var probe0 = hub.RunWith(this.SinkProbe(), Materializer); var probe1 = hub.RunWith(this.SinkProbe(), Materializer); @@ -734,71 +851,69 @@ public void PartitionHub_must_route_unevenly() // 3 => 0 // 4 => 1 - probe0.Request(10); - probe1.Request(10); - testSource.SendNext(0); - probe0.ExpectNext(0); - testSource.SendNext(1); - probe1.ExpectNext(1); - testSource.SendNext(2); - probe0.ExpectNext(2); - testSource.SendNext(3); - probe0.ExpectNext(3); - testSource.SendNext(4); - probe1.ExpectNext(4); - - testSource.SendComplete(); - probe0.ExpectComplete(); - probe1.ExpectComplete(); + await probe0.RequestAsync(10); + await probe1.RequestAsync(10); + await testSource.SendNextAsync(0); + await probe0.ExpectNextAsync(0); + await testSource.SendNextAsync(1); + await probe1.ExpectNextAsync(1); + await testSource.SendNextAsync(2); + await probe0.ExpectNextAsync(2); + await testSource.SendNextAsync(3); + await probe0.ExpectNextAsync(3); + await testSource.SendNextAsync(4); + await probe1.ExpectNextAsync(4); + + await testSource.SendCompleteAsync(); + await probe0.ExpectCompleteAsync(); + await probe1.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void PartitionHub_must_backpressure() + public async Task PartitionHub_must_backpressure() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = this.SourceProbe() + var (testSource, hub) = this.SourceProbe() .ToMaterialized(PartitionHub.Sink((size, e) => 0, 2, 4), Keep.Both) .Run(Materializer); - var testSource = t.Item1; - var hub = t.Item2; var probe0 = hub.RunWith(this.SinkProbe(), Materializer); var probe1 = hub.RunWith(this.SinkProbe(), Materializer); - probe0.Request(10); - probe1.Request(10); - testSource.SendNext(0); - probe0.ExpectNext(0); - testSource.SendNext(1); - probe0.ExpectNext(1); - testSource.SendNext(2); - probe0.ExpectNext(2); - testSource.SendNext(3); - probe0.ExpectNext(3); - testSource.SendNext(4); - probe0.ExpectNext(4); - - testSource.SendComplete(); - probe0.ExpectComplete(); - probe1.ExpectComplete(); + await probe0.RequestAsync(10); + await probe1.RequestAsync(10); + await testSource.SendNextAsync(0); + await probe0.ExpectNextAsync(0); + await testSource.SendNextAsync(1); + await probe0.ExpectNextAsync(1); + await testSource.SendNextAsync(2); + await probe0.ExpectNextAsync(2); + await testSource.SendNextAsync(3); + await probe0.ExpectNextAsync(3); + await testSource.SendNextAsync(4); + await probe0.ExpectNextAsync(4); + + await testSource.SendCompleteAsync(); + await probe0.ExpectCompleteAsync(); + await probe1.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void PartitionHub_must_ensure_that_from_two_different_speed_consumers_the_slower_controls_the_rate() + public async Task PartitionHub_must_ensure_that_from_two_different_speed_consumers_the_slower_controls_the_rate() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { - var t = Source.Maybe().ConcatMaterialized(Source.From(Enumerable.Range(1, 19)), Keep.Left) + /* + var (firstElement, source) = Source.Maybe().ConcatMaterialized(Source.From(Enumerable.Range(1, 19)), Keep.Left) .ToMaterialized(PartitionHub.Sink((size, e) => e % size, 2, 1), Keep.Both) .Run(Materializer); - var firstElement = t.Item1; - var source = t.Item2; + // Original code var f1 = source.Throttle(1, TimeSpan.FromMilliseconds(10), 1, ThrottleMode.Shaping) .RunWith(Sink.Seq(), Materializer); @@ -807,7 +922,7 @@ public void PartitionHub_must_ensure_that_from_two_different_speed_consumers_the .RunWith(Sink.Seq(), Materializer); // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. - Thread.Sleep(100); + await Task.Delay(100); // on the jvm Some 0 is used, unfortunately haven't we used Option for the Maybe source // and therefore firstElement.SetResult(0) will complete the source without pushing an element // since 0 is the default value for int and if you set the result to default(T) it will ignore @@ -817,15 +932,39 @@ public void PartitionHub_must_ensure_that_from_two_different_speed_consumers_the var expectationF1 = Enumerable.Range(1, 18).Where(v => v % 2 == 0).ToList(); expectationF1.Insert(0, 50); - f1.AwaitResult().Should().BeEquivalentTo(expectationF1); - f2.AwaitResult().Should().BeEquivalentTo(Enumerable.Range(1, 19).Where(v => v % 2 != 0)); + (await f1.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(expectationF1); + (await f2.ShouldCompleteWithin(3.Seconds())).Should().BeEquivalentTo(Enumerable.Range(1, 19).Where(v => v % 2 != 0)); + */ + + var (firstElement, source) = Source.Maybe().ConcatMaterialized(Source.From(Enumerable.Range(1, 19).Select(i => (int?)i)), Keep.Left) + .ToMaterialized(PartitionHub.Sink((size, e) => (e ?? 0) % size, 2, 1), Keep.Both) + .Run(Materializer); + + var f1 = source.Throttle(1, TimeSpan.FromMilliseconds(10), 1, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + + // Second cannot be overwhelmed since the first one throttles the overall rate, and second allows a higher rate + var f2 = source.Throttle(10, TimeSpan.FromMilliseconds(10), 8, ThrottleMode.Enforcing) + .RunWith(this.SinkProbe(), Materializer); + + await f1.ExpectSubscriptionAsync(); + await f2.ExpectSubscriptionAsync(); + + firstElement.SetResult(0); + + var expectationF1 = Enumerable.Range(0, 18).Where(v => v % 2 == 0).ToList(); + + var result1 = await f1.ToStrictAsync(3.Seconds()).ToListAsync(); + result1.Should().BeEquivalentTo(expectationF1); + var result2 = await f2.ToStrictAsync(3.Seconds()).ToListAsync(); + result2.Should().BeEquivalentTo(Enumerable.Range(1, 19).Where(v => v % 2 != 0)); }, Materializer); } [Fact] - public void PartitionHub_must_properly_signal_error_to_consumer() + public async Task PartitionHub_must_properly_signal_error_to_consumer() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var upstream = this.CreatePublisherProbe(); var source = Source.FromPublisher(upstream) @@ -836,74 +975,85 @@ public void PartitionHub_must_properly_signal_error_to_consumer() var downstream2 = this.CreateSubscriberProbe(); source.RunWith(Sink.FromSubscriber(downstream2), Materializer); - downstream1.Request(4); - downstream2.Request(8); + await downstream1.RequestAsync(4); + await downstream2.RequestAsync(8); Enumerable.Range(0, 16).ForEach(i => upstream.SendNext(i)); - downstream1.ExpectNext(0, 2, 4, 6); - downstream2.ExpectNext(1, 3, 5, 7, 9, 11, 13, 15); - - downstream1.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - downstream2.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await downstream1.AsyncBuilder() + .ExpectNext(0, 2, 4, 6) + .ExpectNoMsg(TimeSpan.FromMilliseconds(100)) + .ExecuteAsync(); + await downstream2.AsyncBuilder() + .ExpectNext(1, 3, 5, 7, 9, 11, 13, 15) + .ExpectNoMsg(TimeSpan.FromMilliseconds(100)) + .ExecuteAsync(); var failure = new TestException("Failed"); - upstream.SendError(failure); + await upstream.SendErrorAsync(failure); - downstream1.ExpectError().Should().Be(failure); - downstream2.ExpectError().Should().Be(failure); + (await downstream1.ExpectErrorAsync()).Should().Be(failure); + (await downstream2.ExpectErrorAsync()).Should().Be(failure); }, Materializer); } [Fact] - public void PartitionHub_must_properly_signal_completion_to_consumers_arriving_after_producer_finished() + public async Task PartitionHub_must_properly_signal_completion_to_consumers_arriving_after_producer_finished() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var source = Source.Empty().RunWith(PartitionHub.Sink((s, e) => e % s, 0), Materializer); // Wait enough so the Hub gets the completion. This is racy, but this is fine because both // cases should work in the end - Thread.Sleep(50); + await Task.Delay(50); - source.RunWith(Sink.Seq(), Materializer).AwaitResult().Should().BeEmpty(); + (await source.RunWith(Sink.Seq(), Materializer).ShouldCompleteWithin(3.Seconds())) + .Should().BeEmpty(); }, Materializer); } [Fact] - public void PartitionHub_must_remeber_completion_for_materialisations_after_completion() + public async Task PartitionHub_must_remember_completion_for_materialization_after_completion() { - var t = this.SourceProbe().ToMaterialized(PartitionHub.Sink((s, e) => 0, 0), Keep.Both) - .Run(Materializer); - var sourceProbe = t.Item1; - var source = t.Item2; - var sinkProbe = source.RunWith(this.SinkProbe(), Materializer); + await this.AssertAllStagesStoppedAsync(async () => + { + var (sourceProbe, source) = this.SourceProbe().ToMaterialized(PartitionHub.Sink((s, e) => 0, 0), Keep.Both) + .Run(Materializer); + var sinkProbe = source.RunWith(this.SinkProbe(), Materializer); - sourceProbe.SendComplete(); + await sourceProbe.SendCompleteAsync(); - sinkProbe.Request(1); - sinkProbe.ExpectComplete(); + await sinkProbe.AsyncBuilder() + .Request(1) + .ExpectComplete() + .ExecuteAsync(); - // Materialize a second time. There was a race here, where we managed to enqueue our Source registration just - // immediately before the Hub shut down. - var sink2Probe = source.RunWith(this.SinkProbe(), Materializer); + // Materialize a second time. There was a race here, where we managed to enqueue our Source registration just + // immediately before the Hub shut down. + var sink2Probe = source.RunWith(this.SinkProbe(), Materializer); - sink2Probe.Request(1); - sink2Probe.ExpectComplete(); + await sink2Probe.AsyncBuilder() + .Request(1) + .ExpectComplete() + .ExecuteAsync(); + }, Materializer); } [Fact] - public void PartitionHub_must_properly_signal_error_to_consumer_arriving_after_producer_finished() + public async Task PartitionHub_must_properly_signal_error_to_consumer_arriving_after_producer_finished() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async () => { var failure = new TestException("Fail!"); var source = Source.Failed(failure).RunWith(PartitionHub.Sink((s, e) => 0, 0), Materializer); // Wait enough so the Hub gets the completion. This is racy, but this is fine because both // cases should work in the end - Thread.Sleep(50); + await Task.Delay(50); - Action a = () => source.RunWith(Sink.Seq(), Materializer).AwaitResult(); - a.Should().Throw().WithMessage("Fail!"); + await Awaiting(async () => + { + await source.RunWith(Sink.Seq(), Materializer); + }).Should().ThrowAsync().WithMessage("Fail!").ShouldCompleteWithin(3.Seconds()); }, Materializer); } }