Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Akka.Streams] make default LogSources actually usable #7168

Merged

Conversation

Aaronontheweb
Copy link
Member

@Aaronontheweb Aaronontheweb commented Apr 24, 2024

Changes

Fixes #7126

So TL;DR;, logging from within an Akka.Streams graph stage is basically unusable and has been since the inception of Akka.Streams.

Behold, what my logs look like from my Akka.Streams-powered MQTT client library:

[DEBUG][04/24/2024 00:50:51.923Z][Thread 0046][akka://test/user/turbomqtt-clients/tcp] Creating new TCP transport for [CreateTcpTransport { Options = MqttClientTcpOptions { AddressFamily = Unspecified, MaxFrameSize = 131072, Host = localhost, Port = 21883, ReconnectInterval = 00:00:05, MaxReconnectAttempts = 10 }, ProtocolVersion = V3_1_1 }]
[DEBUG][04/24/2024 00:50:51.924Z][Thread 0046][akka://test/user/turbomqtt-clients/tcp/$a] Created new TCP transport for client connecting to [localhost:21883]
[INFO][04/24/2024 00:50:51.928Z][Thread 0013][akka://test/user/turbomqtt-clients/tcp/$a] Attempting to connect to [localhost:21883]
[DEBUG][04/24/2024 00:50:51.929Z][Thread 0037][akka://test/user/turbomqtt-clients/tcp/$a] Attempting to connect to [localhost:21883] - resolved to [::1, 127.0.0.1]
[INFO][04/24/2024 00:50:51.930Z][Thread 0037][akka://test/user/turbomqtt-clients/tcp/$a] Successfully connected to [localhost:21883]
[DEBUG][04/24/2024 00:50:51.930Z][Thread 0013][LogSource (akka://test)] Encoded 1 messages using 37 bytes
[DEBUG][04/24/2024 00:50:51.930Z][Thread 0046][TcpMqtt311End2EndSpecs (akka://test)] Decoded 1 packets from transport.
[INFO][04/24/2024 00:50:51.930Z][Thread 0046][TcpMqtt311End2EndSpecs (akka://test)] Received packet of type Connect
[DEBUG][04/24/2024 00:50:51.930Z][Thread 0046][TcpMqtt311End2EndSpecs (akka://test)] Sending packet of type ConnAck using V3_1_1
[DEBUG][04/24/2024 00:50:51.932Z][Thread 0046][TcpMqtt311End2EndSpecs (akka://test)] Successfully wrote packet of type ConnAck [4 bytes] to transport.
[DEBUG][04/24/2024 00:50:51.932Z][Thread 0013][LogSource (akka://test)] Decoded [1] packets totaling [4] bytes
[DEBUG][04/24/2024 00:50:51.933Z][Thread 0013][LogSource (akka://test)] Received packet of type [ConnAck] from client.
[DEBUG][04/24/2024 00:50:51.933Z][Thread 0046][LogSource (akka://test)] Encoded 1 messages using 12 bytes
[DEBUG][04/24/2024 00:50:51.933Z][Thread 0013][TcpMqtt311End2EndSpecs (akka://test)] Decoded 1 packets from transport.
[INFO][04/24/2024 00:50:51.933Z][Thread 0013][TcpMqtt311End2EndSpecs (akka://test)] Received packet of type Subscribe
[DEBUG][04/24/2024 00:50:51.933Z][Thread 0013][TcpMqtt311End2EndSpecs (akka://test)] Sending packet of type SubAck using V3_1_1
[DEBUG][04/24/2024 00:50:51.933Z][Thread 0013][TcpMqtt311End2EndSpecs (akka://test)] Successfully wrote packet of type SubAck [5 bytes] to transport.
[DEBUG][04/24/2024 00:50:51.933Z][Thread 0032][LogSource (akka://test)] Decoded [1] packets totaling [5] bytes
[DEBUG][04/24/2024 00:50:51.933Z][Thread 0032][LogSource (akka://test)] Received packet of type [SubAck] from client.
[DEBUG][04/24/2024 00:50:51.934Z][Thread 0013][LogSource (akka://test)] Encoded 1 messages using 24 bytes
[WARNING][04/24/2024 00:50:51.934Z][Thread 0039][TcpMqtt311End2EndSpecs (akka://test)] Client test-client is being disconnected from server.
[DEBUG][04/24/2024 00:50:51.934Z][Thread 0039][TcpMqtt311End2EndSpecs (akka://test)] Sending packet of type Disconnect using V3_1_1
[ERROR][04/24/2024 00:50:51.934Z][Thread 0039][TcpMqtt311End2EndSpecs (akka://test)] Failed to write packet of type Disconnect to transport.
[ERROR][04/24/2024 00:50:51.934Z][Thread 0037][akka://test/user/turbomqtt-clients/tcp/$a] Failed to read from socket.

There are about 10 different stream stages in there, each of which get created twice (rebooting a failed connection) - I can't tell any of them apart, and I know I have a bug in that code somewhere.

Thus, I've made this PR and now these logs will look more like this going forward:

[INFO][04/24/2024 02:25:06.096Z][Thread 0016][FlowShape`2([LogStage.in] [LogStage.out])(akka://AkkaStreamsLogSourceSpec-1/user/StreamSupervisor-0)] Element: 1
[INFO][04/24/2024 02:25:06.097Z][Thread 0016][FlowShape`2([LogStage.in] [LogStage.out])(akka://AkkaStreamsLogSourceSpec-1/user/StreamSupervisor-0)] Element: 2
[INFO][04/24/2024 02:25:06.097Z][Thread 0016][FlowShape`2([LogStage.in] [LogStage.out])(akka://AkkaStreamsLogSourceSpec-1/user/StreamSupervisor-0)] Element: 3
[INFO][04/24/2024 02:25:06.098Z][Thread 0016][FlowShape`2([LogStage.in] [LogStage.out])(akka://AkkaStreamsLogSourceSpec-1/user/StreamSupervisor-0)] Element: 4
[INFO][04/24/2024 02:25:06.098Z][Thread 0016][FlowShape`2([LogStage.in] [LogStage.out])(akka://AkkaStreamsLogSourceSpec-1/user/StreamSupervisor-0)] Element: 5
[DEBUG][04/24/2024 02:25:06.119Z][Thread 0012][CoordinatedShutdown (akka://AkkaStreamsLogSourceSpec-1)] Performing phase [before-service-unbind] with [0] tasks.
[DEBUG][04/24/2024 02:25:06.119Z][Thread 0015][CoordinatedShutdown (akka://AkkaStreamsLogSourceSpec-1)] Performing phase [service-unbind] with [0] tasks.
[DEBUG][04/24/2024 02:25:06.120Z][Thread 0007][CoordinatedShutdown (akka://AkkaStreamsLogSourceSpec-1)] Performing phase [service-requests-done] with [0] tasks.
[DEBUG][04/24/2024 02:25:06.120Z][Thread 0017][CoordinatedShutdown (akka://AkkaStreamsLogSourceSpec-1)] Performing phase [service-stop] with [0] tasks.
[DEBUG][04/24/2024 02:25:06.120Z][Thread 0017][CoordinatedShutdown (akka://AkkaStreamsLogSourceSpec-1)] Performing phase [before-cluster-shutdown] with [0] tasks.

The default LogSource for any Akka.Streams stage, including built in ones is now {shape.ToString()}({streamSupervisorActorPath}).

The shape.ToString() method does the following by default:

public sealed override string ToString() => $"{GetType().Name}([{string.Join(", ", Inlets)}] [{string.Join(", ", Outlets)}])";

Which will print out the type of stage and a rough description of its inputs and outputs, in accordance with the names given to those Inlet and Outlet types. That will add a small amount of overhead to each stage we create (a .ToString() call that iterates over some types that return hard-coded strings back) but in return you can now immediately understand:

  1. Where in the actor hierarchy this stream is
  2. What type of stage it is
  3. What its inputs and outputs are

Moreover, if you override the GraphStage.LogSource with a friendlier name we will still append the StreamSupervisor.Path to it during logger construction, so you can still keep track of where this stream is within the actor hierarchy.

TL;DR; this is a substantial usability improvement for Akka.Streams.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Copy link
Member Author

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detailed my changes

@@ -3440,6 +3440,7 @@ namespace Akka.Event
public System.Type Type { get; }
public static Akka.Event.LogSource Create(object o) { }
public static Akka.Event.LogSource Create(object o, Akka.Actor.ActorSystem system) { }
public static Akka.Event.LogSource Create(string source, System.Type t) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public API - this basically calls the CTOR of LogSource.


// check just the first log message
var logMessage = logProbe.ExpectMsg<Debug>();
logMessage.LogSource.Should().Contain("StreamSupervisor");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Laying down some regression tests here to make sure someone doesn't cleverly optimize-away useful data out of the LogSource name for these stages.

public override ILoggingAdapter MakeLogger(object logSource) => Logging.GetLogger(System, logSource);
public override ILoggingAdapter MakeLogger(object logSource)
{
if (logSource is not LogSource s) return Logging.GetLogger(System, logSource);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the old code if someone's not using a LogSource - I'm open to changing this so everything does it the new way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, went ahead and made this change.

@@ -402,7 +408,7 @@ public override void Shutdown()
Supervisor.Tell(PoisonPill.Instance);
}

private ILoggingAdapter GetLogger() => _system.Log;
private ILoggingAdapter GetLogger() => MakeLogger(_supervisor);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always create a new logger, don't filch and just use the ActorSystem's one.

@@ -840,7 +840,7 @@ protected GraphStageLogic(int inCount, int outCount)
/// <param name="shape">TBD</param>
protected GraphStageLogic(Shape shape) : this(shape.Inlets.Count(), shape.Outlets.Count())
{
LogSource = Akka.Event.LogSource.Create(shape);
LogSource = Akka.Event.LogSource.Create(shape.ToString());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get the full details on the shape so we can see input and output ports too.

return new LogSource(FromType(t, system), t);
case LogSource logSource:
return logSource; // if someone's already created a LogSource, just use it
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone calls LogSource.Create with a ... LogSource... as input, just use the LogSource. This was "eating" user-defined LogSource names before.

@Aaronontheweb Aaronontheweb mentioned this pull request Apr 24, 2024
3 tasks
Copy link
Contributor

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Arkatufus Arkatufus enabled auto-merge (squash) April 24, 2024 15:02
@Aaronontheweb Aaronontheweb merged commit 906c4d1 into akkadotnet:dev Apr 24, 2024
9 of 12 checks passed
@Aaronontheweb Aaronontheweb deleted the fix-7126-AkkaStreams-LogSourceName branch April 24, 2024 16:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Akka.Streams: make it easier to pass in a friendly LogSource name
3 participants