Skip to content

Commit

Permalink
Merge pull request #5883 from Arkatufus/cherrypick_actorref-not-compl…
Browse files Browse the repository at this point in the history
…eting

Backport of #5875: Fixes Source.ActorRef not completing
  • Loading branch information
Arkatufus committed Apr 25, 2022
1 parent 9c31d13 commit 7cf5c96
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 50 deletions.
8 changes: 4 additions & 4 deletions docs/articles/streams/builtinstages.md
Expand Up @@ -157,13 +157,13 @@ Wrap an actor extending ``ActorPublisher`` as a source.

### ActorRef

Materialize an ``IActorRef``, sending messages to it will emit them on the stream. The actor contain
Materialize an ``IActorRef``, sending messages to it will emit them on the stream. The actor contains
a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping
elements or failing the stream, the strategy is chosen by the user.
elements or failing the stream; the strategy is chosen by the user.

**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref
**emits** when there is demand and there are messages in the buffer or a message is sent to the ``IActorRef``

**completes** when the actorref is sent ``Akka.Actor.Status.Success`` or ``PoisonPill``
**completes** when the ``IActorRef`` is sent ``Akka.Actor.Status.Success``

### PreMaterialize

Expand Down
3 changes: 1 addition & 2 deletions docs/articles/streams/integration.md
Expand Up @@ -114,8 +114,7 @@ for this Source type, i.e. elements will be dropped if the buffer is filled by s
at a rate that is faster than the stream can consume. You should consider using ``Source.Queue``
if you want a backpressured actor interface.

The stream can be completed successfully by sending ``Akka.Actor.PoisonPill`` or
``Akka.Actor.Status.Success`` to the actor reference.
The stream can be completed successfully by sending `Akka.Actor.Status.Success` to the actor reference.

The stream can be completed with failure by sending ``Akka.Actor.Status.Failure`` to the
actor reference.
Expand Down
35 changes: 6 additions & 29 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs
Expand Up @@ -28,7 +28,7 @@ public ActorRefSourceSpec()
var settings = ActorMaterializerSettings.Create(Sys);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact]
public void A_ActorRefSource_must_emit_received_messages_to_the_stream()
{
Expand All @@ -45,7 +45,7 @@ public void A_ActorRefSource_must_emit_received_messages_to_the_stream()
actorRef.Tell(3);
s.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}

[Fact]
public void A_ActorRefSource_must_buffer_when_needed()
{
Expand Down Expand Up @@ -118,21 +118,6 @@ public void A_ActorRefSource_must_not_fail_when_0_buffer_space_and_demand_is_sig
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_completes_the_stream_immediately_when_receiving_PoisonPill()
{
this.AssertAllStagesStopped(() =>
{
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(10, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
s.ExpectSubscription();
actorRef.Tell(PoisonPill.Instance);
s.ExpectComplete();
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_signal_buffered_elements_and_complete_the_stream_after_receiving_Status_Success()
{
Expand Down Expand Up @@ -178,23 +163,15 @@ public void A_ActorRefSource_must_not_buffer_elements_after_receiving_Status_Suc
}

[Fact]
public void A_ActorRefSource_must_after_receiving_Status_Success_allow_for_earlier_completion_with_PoisonPill()
public void A_ActorRefSource_must_complete_and_materialize_the_stream_after_receiving_Status_Success()
{
this.AssertAllStagesStopped(() =>
{
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(3, OverflowStrategy.DropBuffer)
.To(Sink.FromSubscriber(s))
var (actorRef, done) = Source.ActorRef<int>(3, OverflowStrategy.DropBuffer)
.ToMaterialized(Sink.Ignore<int>(), Keep.Both)
.Run(Materializer);
var sub = s.ExpectSubscription();
actorRef.Tell(1);
actorRef.Tell(2);
actorRef.Tell(3);
actorRef.Tell(new Status.Success("ok"));
sub.Request(2); // not all elements drained yet
s.ExpectNext(1, 2);
actorRef.Tell(PoisonPill.Instance);
s.ExpectComplete(); // element `3` not signaled
done.ContinueWith(_ => Done.Instance).Result.Should().Be(Done.Instance);
}, Materializer);
}

Expand Down
26 changes: 19 additions & 7 deletions src/core/Akka.Streams/Dsl/Source.cs
Expand Up @@ -752,30 +752,42 @@ public static class Source
/// Creates a <see cref="Source{TOut,TMat}"/> that is materialized as an <see cref="IActorRef"/>.
/// Messages sent to this actor will be emitted to the stream if there is demand from downstream,
/// otherwise they will be buffered until request for demand is received.
///
/// <para>
/// Depending on the defined <see cref="OverflowStrategy"/> it might drop elements if
/// there is no space available in the buffer.
///
/// </para>
/// <para>
/// The strategy <see cref="OverflowStrategy.Backpressure"/> is not supported, and an
/// IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
///
/// </para>
/// <para>
/// The buffer can be disabled by using <paramref name="bufferSize"/> of 0 and then received messages are dropped
/// if there is no demand from downstream. When <paramref name="bufferSize"/> is 0 the <paramref name="overflowStrategy"/> does
/// not matter. An async boundary is added after this Source; as such, it is never safe to assume the downstream will always generate demand.
///
/// </para>
/// <para>
/// The stream can be completed successfully by sending the actor reference a <see cref="Status.Success"/>
/// message (whose content will be ignored) in which case already buffered elements will be signaled before signaling completion,
/// or by sending <see cref="PoisonPill"/> in which case completion will be signaled immediately.
///
/// </para>
/// <para>
/// The stream can be completed with failure by sending a <see cref="Status.Failure"/> to the
/// actor reference. In case the Actor is still draining its internal buffer (after having received
/// a <see cref="Status.Success"/>) before signaling completion and it receives a <see cref="Status.Failure"/>,
/// the failure will be signaled downstream immediately (instead of the completion signal).
///
/// </para>
/// <para>
/// Note that terminating the actor without first completing it, either with a success or a
/// failure, will prevent the actor triggering downstream completion and the stream will continue
/// to run even though the source actor is dead. Therefore you should **not** attempt to
/// manually terminate the actor such as with a <see cref="PoisonPill"/>.
/// </para>
/// <para>
/// The actor will be stopped when the stream is completed, failed or canceled from downstream,
/// i.e. you can watch it to get notified when that happens.
/// </para>
/// See also <seealso cref="Queue{T}"/>
/// </summary>
/// <seealso cref="Queue{T}"/>
/// <typeparam name="T">TBD</typeparam>
/// <param name="bufferSize">The size of the buffer in element count</param>
/// <param name="overflowStrategy">Strategy that is used when incoming elements cannot fit inside the buffer</param>
Expand Down
18 changes: 10 additions & 8 deletions src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs
Expand Up @@ -62,7 +62,7 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
{
BufferSize = bufferSize;
OverflowStrategy = overflowStrategy;
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
}

/// <summary>
Expand All @@ -76,7 +76,7 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
protected override bool Receive(object message)
=> DefaultReceive(message) || RequestElement(message) || (message is T && ReceiveElement((T) message));
=> DefaultReceive(message) || RequestElement(message) || (message is T && ReceiveElement((T)message));

/// <summary>
/// TBD
Expand All @@ -90,12 +90,12 @@ protected bool DefaultReceive(object message)
else if (message is Status.Success)
{
if (BufferSize == 0 || Buffer.IsEmpty)
Context.Stop(Self); // will complete the stream successfully
OnCompleteThenStop(); // will complete the stream successfully
else
Context.Become(DrainBufferThenComplete);
}
else if (message is Status.Failure && IsActive)
OnErrorThenStop(((Status.Failure)message).Cause);
else if (message is Status.Failure failure && IsActive)
OnErrorThenStop(failure.Cause);
else
return false;
return true;
Expand Down Expand Up @@ -179,12 +179,14 @@ protected virtual bool ReceiveElement(T message)
private bool DrainBufferThenComplete(object message)
{
if (message is Cancel)
{
Context.Stop(Self);
else if (message is Status.Failure && IsActive)
}
else if (message is Status.Failure failure && IsActive)
{
// errors must be signaled as soon as possible,
// even if previously valid completion was requested via Status.Success
OnErrorThenStop(((Status.Failure)message).Cause);
OnErrorThenStop(failure.Cause);
}
else if (message is Request)
{
Expand All @@ -193,7 +195,7 @@ private bool DrainBufferThenComplete(object message)
OnNext(Buffer.Dequeue());

if (Buffer.IsEmpty)
Context.Stop(Self); // will complete the stream successfully
OnCompleteThenStop(); // will complete the stream successfully
}
else if (IsActive)
Log.Debug(
Expand Down

0 comments on commit 7cf5c96

Please sign in to comment.