Skip to content

Commit

Permalink
Fixes GroupBy does not invoke decider
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Apr 23, 2022
1 parent b77b68e commit 810882b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 1 deletion.
Expand Up @@ -1012,6 +1012,10 @@ namespace Akka.Streams
Shaping = 0,
Enforcing = 1,
}
public class TooManySubstreamsOpenException : System.InvalidOperationException
{
public TooManySubstreamsOpenException() { }
}
public abstract class TransformerLikeBase<TIn, TOut> : Akka.Streams.ITransformerLike<TIn, TOut>
{
protected TransformerLikeBase() { }
Expand Down
14 changes: 14 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Expand Up @@ -427,6 +427,20 @@ public void GroupBy_must_fail_when_exceeding_maxSubstreams()
}, Materializer);
}

[Fact]
public void GroupBy_must_resume_when_exceeding_maxSubstreams()
{
var f = Flow.Create<int>().GroupBy(0, x => x).MergeSubstreams();
var (up, down) = ((Flow<int, int, NotUsed>)f)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(this.SourceProbe<int>(), this.SinkProbe<int>(), Materializer);

down.Request(1);

up.SendNext(1);
down.ExpectNoMsg(TimeSpan.FromSeconds(1));
}

[Fact]
public void GroupBy_must_emit_subscribe_before_completed()
{
Expand Down
Expand Up @@ -411,7 +411,7 @@ public void OnPush()
else
{
if (_activeSubstreams.Count == _stage._maxSubstreams)
Fail(new IllegalStateException($"Cannot open substream for key {key}: too many substreams open"));
throw new TooManySubstreamsOpenException();
else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In))
Pull(_stage.In);
else
Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka.Streams/TooManySubstreamsOpenException.cs
@@ -0,0 +1,22 @@
//-----------------------------------------------------------------------
// <copyright file="TooManySubstreamsOpenException.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;

namespace Akka.Streams
{
/// <summary>
/// This exception signals that the maximum number of substreams declared has been exceeded.
/// A finite limit is imposed so that memory usage is controlled.
/// </summary>
public class TooManySubstreamsOpenException : InvalidOperationException
{
public TooManySubstreamsOpenException() :
base("Cannot open a new substream as there are too many substreams open")
{ }
}
}

0 comments on commit 810882b

Please sign in to comment.