Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Akka.Streams] make default LogSources actually usable #7168

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -3440,6 +3440,7 @@ namespace Akka.Event
public System.Type Type { get; }
public static Akka.Event.LogSource Create(object o) { }
public static Akka.Event.LogSource Create(object o, Akka.Actor.ActorSystem system) { }
public static Akka.Event.LogSource Create(string source, System.Type t) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public API - this basically calls the CTOR of LogSource.

public static string FromActor(Akka.Actor.IActorContext actor, Akka.Actor.ActorSystem system) { }
public static string FromActorRef(Akka.Actor.IActorRef a, Akka.Actor.ActorSystem system) { }
public static string FromString(string source, Akka.Actor.ActorSystem system) { }
Expand Down
Expand Up @@ -3430,6 +3430,7 @@ namespace Akka.Event
public System.Type Type { get; }
public static Akka.Event.LogSource Create(object o) { }
public static Akka.Event.LogSource Create(object o, Akka.Actor.ActorSystem system) { }
public static Akka.Event.LogSource Create(string source, System.Type t) { }
public static string FromActor(Akka.Actor.IActorContext actor, Akka.Actor.ActorSystem system) { }
public static string FromActorRef(Akka.Actor.IActorRef a, Akka.Actor.ActorSystem system) { }
public static string FromString(string source, Akka.Actor.ActorSystem system) { }
Expand Down
122 changes: 122 additions & 0 deletions src/core/Akka.Streams.Tests/AkkaStreamsLogSourceSpec.cs
@@ -0,0 +1,122 @@
// -----------------------------------------------------------------------
// <copyright file="AkkaStreamsLogSourceSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Streams.Tests;

public class AkkaStreamsLogSourceSpec : AkkaSpec
{
public AkkaStreamsLogSourceSpec(ITestOutputHelper output) : base(output, "akka.loglevel=DEBUG")
{
}

// create a custom Flow shape graph stage
private class TestLogStage<T> : GraphStage<FlowShape<T, T>>
{
private readonly string _name;

public TestLogStage(string name)
{
_name = name;
Shape = new FlowShape<T, T>(In, Out);
}

public Inlet<T> In { get; } = new("LogStage.in");
public Outlet<T> Out { get; } = new("LogStage.out");

public override FlowShape<T, T> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);

private sealed class Logic : InAndOutGraphStageLogic
{
private readonly TestLogStage<T> _stage;

public Logic(TestLogStage<T> stage) : base(stage.Shape)
{
_stage = stage;
SetHandler(stage.In, this);
SetHandler(stage.Out, this);
}

public override void OnPush()
{
var element = Grab(_stage.In);
Log.Info($"Element: {element}");
Push(_stage.Out, element);
}

public override void OnPull()
{
Pull(_stage.In);
}
}
}


[Fact]
public async Task LogStage_should_log_elements_with_coherent_actorpath()
{
var probe = CreateTestProbe();
var source = Source.From(Enumerable.Range(1, 5));
var flow = Flow.Create<int>().Log("log")
.To(Sink.ActorRef<int>(probe.Ref, "completed", ex => new Status.Failure(ex)));

// create a probe and subscribe it to Debug level events
var logProbe = CreateTestProbe();
Sys.EventStream.Subscribe(logProbe.Ref, typeof(Debug));


source.RunWith(flow, Sys);

await probe.ExpectMsgAsync(1);
await probe.ExpectMsgAsync(2);
await probe.ExpectMsgAsync(3);
await probe.ExpectMsgAsync(4);
await probe.ExpectMsgAsync(5);

// check just the first log message
var logMessage = logProbe.ExpectMsg<Debug>();
logMessage.LogSource.Should().Contain("StreamSupervisor");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Laying down some regression tests here to make sure someone doesn't cleverly optimize-away useful data out of the LogSource name for these stages.

}

[Fact]
public async Task CustomStage_should_log_elements_with_friendly_name()
{
var probe = CreateTestProbe();
var source = Source.From(Enumerable.Range(1, 5));
var flow = Flow.Create<int>().Via(new TestLogStage<int>("custom"))
.To(Sink.ActorRef<int>(probe.Ref, "completed", ex => new Status.Failure(ex)));

// create a probe and subscribe it to Debug level events
var logProbe = CreateTestProbe();
Sys.EventStream.Subscribe(logProbe.Ref, typeof(Info));


source.RunWith(flow, Sys);

await probe.ExpectMsgAsync(1);
await probe.ExpectMsgAsync(2);
await probe.ExpectMsgAsync(3);
await probe.ExpectMsgAsync(4);
await probe.ExpectMsgAsync(5);

// check just the first log message
var logMessage = logProbe.ExpectMsg<Info>();
logMessage.LogSource.Should().Contain("StreamSupervisor");
}
}
18 changes: 16 additions & 2 deletions src/core/Akka.Streams/Implementation/ActorMaterializerImpl.cs
Expand Up @@ -384,7 +384,21 @@ public override TMat Materialize<TMat>(IGraph<ClosedShape, TMat> runnable, Func<
/// </summary>
/// <param name="logSource">The source that produces the log events.</param>
/// <returns>The newly created logging adapter.</returns>
public override ILoggingAdapter MakeLogger(object logSource) => Logging.GetLogger(System, logSource);
public override ILoggingAdapter MakeLogger(object logSource)
{
string actorPath;
LogSource newSource;
if (logSource is not LogSource s)
{
actorPath = $"{s}({LogSource.FromActorRef(_supervisor, System)})";
newSource = LogSource.Create(actorPath, s.Type);
return Logging.GetLogger(System, newSource);
}

actorPath = $"{s.Source}({LogSource.FromActorRef(_supervisor, System)})";
newSource = LogSource.Create(actorPath, s.Type);
return Logging.GetLogger(System, newSource);
}

/// <summary>
/// TBD
Expand All @@ -402,7 +416,7 @@ public override void Shutdown()
Supervisor.Tell(PoisonPill.Instance);
}

private ILoggingAdapter GetLogger() => _system.Log;
private ILoggingAdapter GetLogger() => MakeLogger(_supervisor);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always create a new logger, don't filch and just use the ActorSystem's one.

}

/// <summary>
Expand Down
14 changes: 3 additions & 11 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Expand Up @@ -2905,6 +2905,8 @@ private sealed class Logic : InAndOutGraphStageLogic
private Attributes.LogLevels _logLevels;
private ILoggingAdapter _log;

protected override object LogSource => Akka.Event.LogSource.Create(_stage._name);

public Logic(Log<T> stage, Attributes inheritedAttributes) : base(stage.Shape)
{
_stage = stage;
Expand Down Expand Up @@ -2981,17 +2983,7 @@ public override void PreStart()
_log = _stage._adapter;
else
{
try
{
var materializer = ActorMaterializerHelper.Downcast(Materializer);
_log = new BusLogging(materializer.System.EventStream, _stage._name, GetType(), materializer.System.Settings.LogFormatter);
}
catch (Exception ex)
{
throw new Exception(
"Log stage can only provide LoggingAdapter when used with ActorMaterializer! Provide a LoggingAdapter explicitly or use the actor based flow materializer.",
ex);
}
_log = Log;
Danthar marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams/Stage/GraphStage.cs
Expand Up @@ -840,7 +840,7 @@ protected GraphStageLogic(int inCount, int outCount)
/// <param name="shape">TBD</param>
protected GraphStageLogic(Shape shape) : this(shape.Inlets.Count(), shape.Outlets.Count())
{
LogSource = Akka.Event.LogSource.Create(shape);
LogSource = Akka.Event.LogSource.Create(shape.ToString());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get the full details on the shape so we can see input and output ports too.

}

/// <summary>
Expand Down Expand Up @@ -1708,7 +1708,7 @@ protected StageActor GetStageActor(StageActorRef.Receive receive)
/// Override and return a name to be given to the StageActor of this stage.
///
/// This method will be only invoked and used once, during the first <see cref="GetStageActor"/>
/// invocation whichc reates the actor, since subsequent `getStageActors` calls function
/// invocation which creates the actor, since subsequent `getStageActors` calls function
/// like `become`, rather than creating new actors.
///
/// Returns an empty string by default, which means that the name will a unique generated String (e.g. "$$a").
Expand Down
13 changes: 10 additions & 3 deletions src/core/Akka/Event/Logging.cs
Expand Up @@ -47,7 +47,7 @@ public static LogSource Create(object o)
return new LogSource(actorRef.Path.ToString(), SourceType(actorRef));
case string str:
return new LogSource(str, SourceType(str));
case System.Type t:
case Type t:
return new LogSource(Logging.SimpleName(t), t);
default:
return new LogSource(Logging.SimpleName(o), SourceType(o));
Expand All @@ -64,18 +64,25 @@ public static LogSource Create(object o, ActorSystem system)
return new LogSource(FromActorRef(actorRef, system), SourceType(actorRef));
case string str:
return new LogSource(FromString(str, system), SourceType(str));
case System.Type t:
case Type t:
return new LogSource(FromType(t, system), t);
case LogSource logSource:
return logSource; // if someone's already created a LogSource, just use it
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone calls LogSource.Create with a ... LogSource... as input, just use the LogSource. This was "eating" user-defined LogSource names before.

default:
return new LogSource(FromType(o.GetType(), system), SourceType(o));
}
}

public static LogSource Create(string source, Type t)
{
return new LogSource(source, t);
}

public static Type SourceType(object o)
{
switch (o)
{
case System.Type t:
case Type t:
return t;
case IActorContext context:
return context.Props.Type;
Expand Down