From 810882bd08f8de1953398d6826f89dd70949b1f3 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 23 Apr 2022 12:44:25 +0200 Subject: [PATCH] Fixes GroupBy does not invoke decider --- .../CoreAPISpec.ApproveStreams.verified.txt | 4 ++++ .../Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 14 ++++++++++++ .../Implementation/Fusing/StreamOfStreams.cs | 2 +- .../TooManySubstreamsOpenException.cs | 22 +++++++++++++++++++ 4 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 src/core/Akka.Streams/TooManySubstreamsOpenException.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt index 8eeba130ac5..f5ea9a0ead2 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt @@ -1012,6 +1012,10 @@ namespace Akka.Streams Shaping = 0, Enforcing = 1, } + public class TooManySubstreamsOpenException : System.InvalidOperationException + { + public TooManySubstreamsOpenException() { } + } public abstract class TransformerLikeBase : Akka.Streams.ITransformerLike { protected TransformerLikeBase() { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index f7193503b1e..186fe7399ea 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -427,6 +427,20 @@ public void GroupBy_must_fail_when_exceeding_maxSubstreams() }, Materializer); } + [Fact] + public void GroupBy_must_resume_when_exceeding_maxSubstreams() + { + var f = Flow.Create().GroupBy(0, x => x).MergeSubstreams(); + var (up, down) = ((Flow)f) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(this.SourceProbe(), this.SinkProbe(), Materializer); + + down.Request(1); + + up.SendNext(1); + down.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + [Fact] public void GroupBy_must_emit_subscribe_before_completed() { diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 5b8833c2d25..2f0161f9550 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -411,7 +411,7 @@ public void OnPush() else { if (_activeSubstreams.Count == _stage._maxSubstreams) - Fail(new IllegalStateException($"Cannot open substream for key {key}: too many substreams open")); + throw new TooManySubstreamsOpenException(); else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) Pull(_stage.In); else diff --git a/src/core/Akka.Streams/TooManySubstreamsOpenException.cs b/src/core/Akka.Streams/TooManySubstreamsOpenException.cs new file mode 100644 index 00000000000..cbb9ab20f65 --- /dev/null +++ b/src/core/Akka.Streams/TooManySubstreamsOpenException.cs @@ -0,0 +1,22 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; + +namespace Akka.Streams +{ + /// + /// This exception signals that the maximum number of substreams declared has been exceeded. + /// A finite limit is imposed so that memory usage is controlled. + /// + public class TooManySubstreamsOpenException : InvalidOperationException + { + public TooManySubstreamsOpenException() : + base("Cannot open a new substream as there are too many substreams open") + { } + } +}