diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index 5ac675b47b1..e85abb667c6 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -157,13 +157,13 @@ Wrap an actor extending ``ActorPublisher`` as a source. ### ActorRef -Materialize an ``IActorRef``, sending messages to it will emit them on the stream. The actor contain +Materialize an ``IActorRef``, sending messages to it will emit them on the stream. The actor contains a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping -elements or failing the stream, the strategy is chosen by the user. +elements or failing the stream; the strategy is chosen by the user. -**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref +**emits** when there is demand and there are messages in the buffer or a message is sent to the ``IActorRef`` -**completes** when the actorref is sent ``Akka.Actor.Status.Success`` or ``PoisonPill`` +**completes** when the ``IActorRef`` is sent ``Akka.Actor.Status.Success`` ### PreMaterialize diff --git a/docs/articles/streams/integration.md b/docs/articles/streams/integration.md index 779bf0ec421..da273c42b19 100644 --- a/docs/articles/streams/integration.md +++ b/docs/articles/streams/integration.md @@ -114,8 +114,7 @@ for this Source type, i.e. elements will be dropped if the buffer is filled by s at a rate that is faster than the stream can consume. You should consider using ``Source.Queue`` if you want a backpressured actor interface. -The stream can be completed successfully by sending ``Akka.Actor.PoisonPill`` or -``Akka.Actor.Status.Success`` to the actor reference. +The stream can be completed successfully by sending `Akka.Actor.Status.Success` to the actor reference. The stream can be completed with failure by sending ``Akka.Actor.Status.Failure`` to the actor reference. diff --git a/src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs index 54f774ce755..f22255de95d 100644 --- a/src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs @@ -28,7 +28,7 @@ public ActorRefSourceSpec() var settings = ActorMaterializerSettings.Create(Sys); Materializer = ActorMaterializer.Create(Sys, settings); } - + [Fact] public void A_ActorRefSource_must_emit_received_messages_to_the_stream() { @@ -45,7 +45,7 @@ public void A_ActorRefSource_must_emit_received_messages_to_the_stream() actorRef.Tell(3); s.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); } - + [Fact] public void A_ActorRefSource_must_buffer_when_needed() { @@ -118,21 +118,6 @@ public void A_ActorRefSource_must_not_fail_when_0_buffer_space_and_demand_is_sig }, Materializer); } - [Fact] - public void A_ActorRefSource_must_completes_the_stream_immediately_when_receiving_PoisonPill() - { - this.AssertAllStagesStopped(() => - { - var s = this.CreateManualSubscriberProbe(); - var actorRef = Source.ActorRef(10, OverflowStrategy.Fail) - .To(Sink.FromSubscriber(s)) - .Run(Materializer); - s.ExpectSubscription(); - actorRef.Tell(PoisonPill.Instance); - s.ExpectComplete(); - }, Materializer); - } - [Fact] public void A_ActorRefSource_must_signal_buffered_elements_and_complete_the_stream_after_receiving_Status_Success() { @@ -178,23 +163,15 @@ public void A_ActorRefSource_must_not_buffer_elements_after_receiving_Status_Suc } [Fact] - public void A_ActorRefSource_must_after_receiving_Status_Success_allow_for_earlier_completion_with_PoisonPill() + public void A_ActorRefSource_must_complete_and_materialize_the_stream_after_receiving_Status_Success() { this.AssertAllStagesStopped(() => { - var s = this.CreateManualSubscriberProbe(); - var actorRef = Source.ActorRef(3, OverflowStrategy.DropBuffer) - .To(Sink.FromSubscriber(s)) + var (actorRef, done) = Source.ActorRef(3, OverflowStrategy.DropBuffer) + .ToMaterialized(Sink.Ignore(), Keep.Both) .Run(Materializer); - var sub = s.ExpectSubscription(); - actorRef.Tell(1); - actorRef.Tell(2); - actorRef.Tell(3); actorRef.Tell(new Status.Success("ok")); - sub.Request(2); // not all elements drained yet - s.ExpectNext(1, 2); - actorRef.Tell(PoisonPill.Instance); - s.ExpectComplete(); // element `3` not signaled + done.ContinueWith(_ => Done.Instance).Result.Should().Be(Done.Instance); }, Materializer); } diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index 2433f985c69..1173114c380 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -752,30 +752,42 @@ public static class Source /// Creates a that is materialized as an . /// Messages sent to this actor will be emitted to the stream if there is demand from downstream, /// otherwise they will be buffered until request for demand is received. - /// + /// /// Depending on the defined it might drop elements if /// there is no space available in the buffer. - /// + /// + /// /// The strategy is not supported, and an /// IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. - /// + /// + /// /// The buffer can be disabled by using of 0 and then received messages are dropped /// if there is no demand from downstream. When is 0 the does /// not matter. An async boundary is added after this Source; as such, it is never safe to assume the downstream will always generate demand. - /// + /// + /// /// The stream can be completed successfully by sending the actor reference a /// message (whose content will be ignored) in which case already buffered elements will be signaled before signaling completion, /// or by sending in which case completion will be signaled immediately. - /// + /// + /// /// The stream can be completed with failure by sending a to the /// actor reference. In case the Actor is still draining its internal buffer (after having received /// a ) before signaling completion and it receives a , /// the failure will be signaled downstream immediately (instead of the completion signal). - /// + /// + /// + /// Note that terminating the actor without first completing it, either with a success or a + /// failure, will prevent the actor triggering downstream completion and the stream will continue + /// to run even though the source actor is dead. Therefore you should **not** attempt to + /// manually terminate the actor such as with a . + /// + /// /// The actor will be stopped when the stream is completed, failed or canceled from downstream, /// i.e. you can watch it to get notified when that happens. + /// + /// See also /// - /// /// TBD /// The size of the buffer in element count /// Strategy that is used when incoming elements cannot fit inside the buffer diff --git a/src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs b/src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs index 6a63c3cb25e..6822bc5d002 100644 --- a/src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs +++ b/src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs @@ -62,7 +62,7 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in { BufferSize = bufferSize; OverflowStrategy = overflowStrategy; - Buffer = bufferSize != 0 ? Implementation.Buffer.Create(bufferSize, maxFixedBufferSize) : null; + Buffer = bufferSize != 0 ? Implementation.Buffer.Create(bufferSize, maxFixedBufferSize) : null; } /// @@ -76,7 +76,7 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in /// TBD /// TBD protected override bool Receive(object message) - => DefaultReceive(message) || RequestElement(message) || (message is T && ReceiveElement((T) message)); + => DefaultReceive(message) || RequestElement(message) || (message is T && ReceiveElement((T)message)); /// /// TBD @@ -90,12 +90,12 @@ protected bool DefaultReceive(object message) else if (message is Status.Success) { if (BufferSize == 0 || Buffer.IsEmpty) - Context.Stop(Self); // will complete the stream successfully + OnCompleteThenStop(); // will complete the stream successfully else Context.Become(DrainBufferThenComplete); } - else if (message is Status.Failure && IsActive) - OnErrorThenStop(((Status.Failure)message).Cause); + else if (message is Status.Failure failure && IsActive) + OnErrorThenStop(failure.Cause); else return false; return true; @@ -179,12 +179,14 @@ protected virtual bool ReceiveElement(T message) private bool DrainBufferThenComplete(object message) { if (message is Cancel) + { Context.Stop(Self); - else if (message is Status.Failure && IsActive) + } + else if (message is Status.Failure failure && IsActive) { // errors must be signaled as soon as possible, // even if previously valid completion was requested via Status.Success - OnErrorThenStop(((Status.Failure)message).Cause); + OnErrorThenStop(failure.Cause); } else if (message is Request) { @@ -193,7 +195,7 @@ private bool DrainBufferThenComplete(object message) OnNext(Buffer.Dequeue()); if (Buffer.IsEmpty) - Context.Stop(Self); // will complete the stream successfully + OnCompleteThenStop(); // will complete the stream successfully } else if (IsActive) Log.Debug(