Skip to content

Commit

Permalink
[Akka.Streams] make default LogSources actually usable (#7168)
Browse files Browse the repository at this point in the history
* allow `LogSource.Create` to return itself

If a `LogSource` is passed into `LogSource.Create`, just return that.

* close #7126 - added default usable `LogSource`s to all stream stages

* API approvals

* enrich all stream stages with actor path
  • Loading branch information
Aaronontheweb committed Apr 24, 2024
1 parent f8a3c88 commit 906c4d1
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 18 deletions.
Expand Up @@ -3440,6 +3440,7 @@ namespace Akka.Event
public System.Type Type { get; }
public static Akka.Event.LogSource Create(object o) { }
public static Akka.Event.LogSource Create(object o, Akka.Actor.ActorSystem system) { }
public static Akka.Event.LogSource Create(string source, System.Type t) { }
public static string FromActor(Akka.Actor.IActorContext actor, Akka.Actor.ActorSystem system) { }
public static string FromActorRef(Akka.Actor.IActorRef a, Akka.Actor.ActorSystem system) { }
public static string FromString(string source, Akka.Actor.ActorSystem system) { }
Expand Down
Expand Up @@ -3430,6 +3430,7 @@ namespace Akka.Event
public System.Type Type { get; }
public static Akka.Event.LogSource Create(object o) { }
public static Akka.Event.LogSource Create(object o, Akka.Actor.ActorSystem system) { }
public static Akka.Event.LogSource Create(string source, System.Type t) { }
public static string FromActor(Akka.Actor.IActorContext actor, Akka.Actor.ActorSystem system) { }
public static string FromActorRef(Akka.Actor.IActorRef a, Akka.Actor.ActorSystem system) { }
public static string FromString(string source, Akka.Actor.ActorSystem system) { }
Expand Down
122 changes: 122 additions & 0 deletions src/core/Akka.Streams.Tests/AkkaStreamsLogSourceSpec.cs
@@ -0,0 +1,122 @@
// -----------------------------------------------------------------------
// <copyright file="AkkaStreamsLogSourceSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Streams.Tests;

public class AkkaStreamsLogSourceSpec : AkkaSpec
{
public AkkaStreamsLogSourceSpec(ITestOutputHelper output) : base(output, "akka.loglevel=DEBUG")
{
}

// create a custom Flow shape graph stage
private class TestLogStage<T> : GraphStage<FlowShape<T, T>>
{
private readonly string _name;

public TestLogStage(string name)
{
_name = name;
Shape = new FlowShape<T, T>(In, Out);
}

public Inlet<T> In { get; } = new("LogStage.in");
public Outlet<T> Out { get; } = new("LogStage.out");

public override FlowShape<T, T> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);

private sealed class Logic : InAndOutGraphStageLogic
{
private readonly TestLogStage<T> _stage;

public Logic(TestLogStage<T> stage) : base(stage.Shape)
{
_stage = stage;
SetHandler(stage.In, this);
SetHandler(stage.Out, this);
}

public override void OnPush()
{
var element = Grab(_stage.In);
Log.Info($"Element: {element}");
Push(_stage.Out, element);
}

public override void OnPull()
{
Pull(_stage.In);
}
}
}


[Fact]
public async Task LogStage_should_log_elements_with_coherent_actorpath()
{
var probe = CreateTestProbe();
var source = Source.From(Enumerable.Range(1, 5));
var flow = Flow.Create<int>().Log("log")
.To(Sink.ActorRef<int>(probe.Ref, "completed", ex => new Status.Failure(ex)));

// create a probe and subscribe it to Debug level events
var logProbe = CreateTestProbe();
Sys.EventStream.Subscribe(logProbe.Ref, typeof(Debug));


source.RunWith(flow, Sys);

await probe.ExpectMsgAsync(1);
await probe.ExpectMsgAsync(2);
await probe.ExpectMsgAsync(3);
await probe.ExpectMsgAsync(4);
await probe.ExpectMsgAsync(5);

// check just the first log message
var logMessage = logProbe.ExpectMsg<Debug>();
logMessage.LogSource.Should().Contain("StreamSupervisor");
}

[Fact]
public async Task CustomStage_should_log_elements_with_friendly_name()
{
var probe = CreateTestProbe();
var source = Source.From(Enumerable.Range(1, 5));
var flow = Flow.Create<int>().Via(new TestLogStage<int>("custom"))
.To(Sink.ActorRef<int>(probe.Ref, "completed", ex => new Status.Failure(ex)));

// create a probe and subscribe it to Debug level events
var logProbe = CreateTestProbe();
Sys.EventStream.Subscribe(logProbe.Ref, typeof(Info));


source.RunWith(flow, Sys);

await probe.ExpectMsgAsync(1);
await probe.ExpectMsgAsync(2);
await probe.ExpectMsgAsync(3);
await probe.ExpectMsgAsync(4);
await probe.ExpectMsgAsync(5);

// check just the first log message
var logMessage = logProbe.ExpectMsg<Info>();
logMessage.LogSource.Should().Contain("StreamSupervisor");
}
}
18 changes: 16 additions & 2 deletions src/core/Akka.Streams/Implementation/ActorMaterializerImpl.cs
Expand Up @@ -384,7 +384,21 @@ public override TMat Materialize<TMat>(IGraph<ClosedShape, TMat> runnable, Func<
/// </summary>
/// <param name="logSource">The source that produces the log events.</param>
/// <returns>The newly created logging adapter.</returns>
public override ILoggingAdapter MakeLogger(object logSource) => Logging.GetLogger(System, logSource);
public override ILoggingAdapter MakeLogger(object logSource)
{
string actorPath;
LogSource newSource;
if (logSource is not LogSource s)
{
actorPath = $"{s}({LogSource.FromActorRef(_supervisor, System)})";
newSource = LogSource.Create(actorPath, s.Type);
return Logging.GetLogger(System, newSource);
}

actorPath = $"{s.Source}({LogSource.FromActorRef(_supervisor, System)})";
newSource = LogSource.Create(actorPath, s.Type);
return Logging.GetLogger(System, newSource);
}

/// <summary>
/// TBD
Expand All @@ -402,7 +416,7 @@ public override void Shutdown()
Supervisor.Tell(PoisonPill.Instance);
}

private ILoggingAdapter GetLogger() => _system.Log;
private ILoggingAdapter GetLogger() => MakeLogger(_supervisor);
}

/// <summary>
Expand Down
14 changes: 3 additions & 11 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Expand Up @@ -2905,6 +2905,8 @@ private sealed class Logic : InAndOutGraphStageLogic
private Attributes.LogLevels _logLevels;
private ILoggingAdapter _log;

protected override object LogSource => Akka.Event.LogSource.Create(_stage._name);

public Logic(Log<T> stage, Attributes inheritedAttributes) : base(stage.Shape)
{
_stage = stage;
Expand Down Expand Up @@ -2981,17 +2983,7 @@ public override void PreStart()
_log = _stage._adapter;
else
{
try
{
var materializer = ActorMaterializerHelper.Downcast(Materializer);
_log = new BusLogging(materializer.System.EventStream, _stage._name, GetType(), materializer.System.Settings.LogFormatter);
}
catch (Exception ex)
{
throw new Exception(
"Log stage can only provide LoggingAdapter when used with ActorMaterializer! Provide a LoggingAdapter explicitly or use the actor based flow materializer.",
ex);
}
_log = Log;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams/Stage/GraphStage.cs
Expand Up @@ -840,7 +840,7 @@ protected GraphStageLogic(int inCount, int outCount)
/// <param name="shape">TBD</param>
protected GraphStageLogic(Shape shape) : this(shape.Inlets.Count(), shape.Outlets.Count())
{
LogSource = Akka.Event.LogSource.Create(shape);
LogSource = Akka.Event.LogSource.Create(shape.ToString());
}

/// <summary>
Expand Down Expand Up @@ -1708,7 +1708,7 @@ protected StageActor GetStageActor(StageActorRef.Receive receive)
/// Override and return a name to be given to the StageActor of this stage.
///
/// This method will be only invoked and used once, during the first <see cref="GetStageActor"/>
/// invocation whichc reates the actor, since subsequent `getStageActors` calls function
/// invocation which creates the actor, since subsequent `getStageActors` calls function
/// like `become`, rather than creating new actors.
///
/// Returns an empty string by default, which means that the name will a unique generated String (e.g. "$$a").
Expand Down
13 changes: 10 additions & 3 deletions src/core/Akka/Event/Logging.cs
Expand Up @@ -47,7 +47,7 @@ public static LogSource Create(object o)
return new LogSource(actorRef.Path.ToString(), SourceType(actorRef));
case string str:
return new LogSource(str, SourceType(str));
case System.Type t:
case Type t:
return new LogSource(Logging.SimpleName(t), t);
default:
return new LogSource(Logging.SimpleName(o), SourceType(o));
Expand All @@ -64,18 +64,25 @@ public static LogSource Create(object o, ActorSystem system)
return new LogSource(FromActorRef(actorRef, system), SourceType(actorRef));
case string str:
return new LogSource(FromString(str, system), SourceType(str));
case System.Type t:
case Type t:
return new LogSource(FromType(t, system), t);
case LogSource logSource:
return logSource; // if someone's already created a LogSource, just use it
default:
return new LogSource(FromType(o.GetType(), system), SourceType(o));
}
}

public static LogSource Create(string source, Type t)
{
return new LogSource(source, t);
}

public static Type SourceType(object o)
{
switch (o)
{
case System.Type t:
case Type t:
return t;
case IActorContext context:
return context.Props.Type;
Expand Down

0 comments on commit 906c4d1

Please sign in to comment.