Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Akka.Actor][Perf] Reduce Envelope copying in message-processing hotpaths #6830

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
8 changes: 4 additions & 4 deletions src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs
Expand Up @@ -16,15 +16,15 @@
namespace Akka.Benchmarks.Actor
{
[Config(typeof(MonitoringConfig))]
[SimpleJob(RunStrategy.Monitoring, launchCount: 3, warmupCount: 3, targetCount: 3)]
[SimpleJob(RunStrategy.Monitoring, launchCount: 10, warmupCount: 10, targetCount: 10)]
public class PingPongBenchmarks
{
public const int Operations = 1_000_000;
private TimeSpan timeout;
private ActorSystem system;
private IActorRef ping;

[GlobalSetup]
[IterationSetup]
public void Setup()
{
timeout = TimeSpan.FromMinutes(1);
Expand All @@ -33,13 +33,13 @@ public void Setup()
ping = system.ActorOf(Props.Create(() => new Ping(pong)));
}

[GlobalCleanup]
[IterationCleanup]
public void Cleanup()
{
system.Dispose();
}

[Benchmark(OperationsPerInvoke = Operations)]
[Benchmark(OperationsPerInvoke = Operations * 2)]
public async Task Actor_ping_pong_single_pair_in_memory()
{
await ping.Ask(StartTest.Instance, timeout);
Expand Down
42 changes: 39 additions & 3 deletions src/benchmark/Akka.Benchmarks/Configurations/Configs.cs
Expand Up @@ -5,22 +5,57 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Reflection;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Exporters;
using BenchmarkDotNet.Loggers;
using BenchmarkDotNet.Reports;
using BenchmarkDotNet.Running;

namespace Akka.Benchmarks.Configurations
{
public class RequestsPerSecondColumn : IColumn
{
public string Id => nameof(RequestsPerSecondColumn);
public string ColumnName => "Req/sec";

public bool IsDefault(Summary summary, BenchmarkCase benchmarkCase) => false;
public string GetValue(Summary summary, BenchmarkCase benchmarkCase) => GetValue(summary, benchmarkCase, null);
public bool IsAvailable(Summary summary) => true;
public bool AlwaysShow => true;
public ColumnCategory Category => ColumnCategory.Custom;
public int PriorityInCategory => -1;
public bool IsNumeric => true;
public UnitType UnitType => UnitType.Dimensionless;
public string Legend => "Requests per Second";

public string GetValue(Summary summary, BenchmarkCase benchmarkCase, SummaryStyle style)
{
var benchmarkAttribute = benchmarkCase.Descriptor.WorkloadMethod.GetCustomAttribute<BenchmarkAttribute>();
var totalOperations = benchmarkAttribute?.OperationsPerInvoke ?? 1;

var report = summary[benchmarkCase];
var nsPerOperation = report.ResultStatistics.Mean;
var operationsPerSecond = 1 / (nsPerOperation / 1e9);


return operationsPerSecond.ToString("N2"); // or format as you like
}
}


/// <summary>
/// Basic BenchmarkDotNet configuration used for microbenchmarks.
/// </summary>
public class MicroBenchmarkConfig : ManualConfig
{
public MicroBenchmarkConfig()
{
this.Add(MemoryDiagnoser.Default);
this.Add(MarkdownExporter.GitHub);
AddDiagnoser(MemoryDiagnoser.Default);
AddExporter(MarkdownExporter.GitHub);
AddLogger(ConsoleLogger.Default);
}
}
Expand All @@ -32,7 +67,8 @@ public class MonitoringConfig : ManualConfig
{
public MonitoringConfig()
{
this.Add(MarkdownExporter.GitHub);
AddExporter(MarkdownExporter.GitHub);
AddColumn(new RequestsPerSecondColumn());
}
}
}
Expand Up @@ -87,7 +87,7 @@ namespace Akka.Actor
protected void AddWatcher(Akka.Actor.IActorRef watchee, Akka.Actor.IActorRef watcher) { }
protected void AddressTerminated(Akka.Actor.Address address) { }
public virtual Akka.Actor.IActorRef AttachChild(Akka.Actor.Props props, bool isSystemService, string name = null) { }
protected virtual void AutoReceiveMessage(Akka.Actor.Envelope envelope) { }
protected virtual void AutoReceiveMessage([System.Runtime.CompilerServices.IsReadOnlyAttribute()] Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope) { }
public void Become(Akka.Actor.Receive receive) { }
public void BecomeStacked(Akka.Actor.Receive receive) { }
public void CheckReceiveTimeout(bool reschedule = True) { }
Expand All @@ -102,7 +102,7 @@ namespace Akka.Actor
public Akka.Actor.IInternalActorRef GetSingleChild(string name) { }
public void Init(bool sendSupervise, Akka.Dispatch.MailboxType mailboxType) { }
public Akka.Actor.Internal.ChildRestartStats InitChild(Akka.Actor.IInternalActorRef actor) { }
public void Invoke(Akka.Actor.Envelope envelope) { }
public void Invoke([System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
protected virtual void PreStart() { }
protected void PrepareForNewActor() { }
protected virtual void ReceiveMessage(object message) { }
Expand All @@ -113,7 +113,7 @@ namespace Akka.Actor
public void ReserveChild(string name) { }
public void Restart(System.Exception cause) { }
public void Resume(System.Exception causedByFailure) { }
public virtual void SendMessage(Akka.Actor.Envelope message) { }
public virtual void SendMessage([System.Runtime.CompilerServices.IsReadOnlyAttribute()] Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) message) { }
public virtual void SendMessage(Akka.Actor.IActorRef sender, object message) { }
public virtual void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage systemMessage) { }
protected void SetActorFields(Akka.Actor.ActorBase actor) { }
Expand Down Expand Up @@ -2892,7 +2892,7 @@ namespace Akka.Dispatch
}
public interface IDequeBasedMessageQueueSemantics : Akka.Dispatch.ISemantics
{
void EnqueueFirst(Akka.Actor.Envelope envelope);
void EnqueueFirst([System.Runtime.CompilerServices.IsReadOnlyAttribute()] Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope);
}
public interface IDispatcherPrerequisites
{
Expand Down Expand Up @@ -2966,7 +2966,7 @@ namespace Akka.Dispatch
public System.Nullable<long> ThroughputDeadlineTime { get; set; }
public virtual void Attach(Akka.Actor.ActorCell cell) { }
public virtual void Detach(Akka.Actor.ActorCell cell) { }
public virtual void Dispatch(Akka.Actor.ActorCell cell, Akka.Actor.Envelope envelope) { }
public virtual void Dispatch(Akka.Actor.ActorCell cell, [System.Runtime.CompilerServices.IsReadOnlyAttribute()] Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope) { }
protected abstract void ExecuteTask(Akka.Dispatch.IRunnable run);
protected void ReportFailure(System.Exception ex) { }
public void Schedule(System.Action run) { }
Expand Down Expand Up @@ -3050,7 +3050,7 @@ namespace Akka.Dispatch.MessageQueues
public bool HasMessages { get; }
protected abstract int LockedCount { get; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, [System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
protected abstract void LockedEnqueue(Akka.Actor.Envelope envelope);
protected abstract bool LockedTryDequeue(out Akka.Actor.Envelope envelope);
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
Expand All @@ -3068,7 +3068,7 @@ namespace Akka.Dispatch.MessageQueues
public bool HasMessages { get; }
public System.TimeSpan PushTimeOut { get; set; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, [System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
}
public class DequeWrapperMessageQueue : Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.MessageQueues.IMessageQueue
Expand All @@ -3078,16 +3078,16 @@ namespace Akka.Dispatch.MessageQueues
public int Count { get; }
public bool HasMessages { get; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void EnqueueFirst(Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, [System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
public void EnqueueFirst([System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
}
public interface IMessageQueue
{
int Count { get; }
bool HasMessages { get; }
void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters);
void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope);
void Enqueue(Akka.Actor.IActorRef receiver, [System.Runtime.CompilerServices.IsReadOnlyAttribute()] Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope);
bool TryDequeue(out Akka.Actor.Envelope envelope);
}
public class UnboundedDequeMessageQueue : Akka.Dispatch.MessageQueues.DequeWrapperMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics
Expand All @@ -3100,22 +3100,22 @@ namespace Akka.Dispatch.MessageQueues
public int Count { get; }
public bool HasMessages { get; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, [System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
}
public class UnboundedPriorityMessageQueue : Akka.Dispatch.MessageQueues.BlockingMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics
{
public UnboundedPriorityMessageQueue(System.Func<object, int> priorityGenerator, int initialCapacity) { }
protected override int LockedCount { get; }
public void EnqueueFirst(Akka.Actor.Envelope envelope) { }
public void EnqueueFirst([System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { }
protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { }
}
public class UnboundedStablePriorityMessageQueue : Akka.Dispatch.MessageQueues.BlockingMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics
{
public UnboundedStablePriorityMessageQueue(System.Func<object, int> priorityGenerator, int initialCapacity) { }
protected override int LockedCount { get; }
public void EnqueueFirst(Akka.Actor.Envelope envelope) { }
public void EnqueueFirst([System.Runtime.CompilerServices.IsReadOnlyAttribute()] ref Akka.Actor.Envelope envelope) { }
protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { }
protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { }
}
Expand Down
Expand Up @@ -87,7 +87,7 @@ namespace Akka.Actor
protected void AddWatcher(Akka.Actor.IActorRef watchee, Akka.Actor.IActorRef watcher) { }
protected void AddressTerminated(Akka.Actor.Address address) { }
public virtual Akka.Actor.IActorRef AttachChild(Akka.Actor.Props props, bool isSystemService, string name = null) { }
protected virtual void AutoReceiveMessage(Akka.Actor.Envelope envelope) { }
protected virtual void AutoReceiveMessage(Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope) { }
public void Become(Akka.Actor.Receive receive) { }
public void BecomeStacked(Akka.Actor.Receive receive) { }
public void CheckReceiveTimeout(bool reschedule = True) { }
Expand All @@ -102,7 +102,7 @@ namespace Akka.Actor
public Akka.Actor.IInternalActorRef GetSingleChild(string name) { }
public void Init(bool sendSupervise, Akka.Dispatch.MailboxType mailboxType) { }
public Akka.Actor.Internal.ChildRestartStats InitChild(Akka.Actor.IInternalActorRef actor) { }
public void Invoke(Akka.Actor.Envelope envelope) { }
public void Invoke(ref Akka.Actor.Envelope envelope) { }
protected virtual void PreStart() { }
protected void PrepareForNewActor() { }
protected virtual void ReceiveMessage(object message) { }
Expand All @@ -113,7 +113,7 @@ namespace Akka.Actor
public void ReserveChild(string name) { }
public void Restart(System.Exception cause) { }
public void Resume(System.Exception causedByFailure) { }
public virtual void SendMessage(Akka.Actor.Envelope message) { }
public virtual void SendMessage(Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) message) { }
public virtual void SendMessage(Akka.Actor.IActorRef sender, object message) { }
public virtual void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage systemMessage) { }
protected void SetActorFields(Akka.Actor.ActorBase actor) { }
Expand Down Expand Up @@ -2885,7 +2885,7 @@ namespace Akka.Dispatch
}
public interface IDequeBasedMessageQueueSemantics : Akka.Dispatch.ISemantics
{
void EnqueueFirst(Akka.Actor.Envelope envelope);
void EnqueueFirst(Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope);
}
public interface IDispatcherPrerequisites
{
Expand Down Expand Up @@ -2958,7 +2958,7 @@ namespace Akka.Dispatch
public System.Nullable<long> ThroughputDeadlineTime { get; set; }
public virtual void Attach(Akka.Actor.ActorCell cell) { }
public virtual void Detach(Akka.Actor.ActorCell cell) { }
public virtual void Dispatch(Akka.Actor.ActorCell cell, Akka.Actor.Envelope envelope) { }
public virtual void Dispatch(Akka.Actor.ActorCell cell, Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope) { }
protected abstract void ExecuteTask(Akka.Dispatch.IRunnable run);
protected void ReportFailure(System.Exception ex) { }
public void Schedule(System.Action run) { }
Expand Down Expand Up @@ -3042,7 +3042,7 @@ namespace Akka.Dispatch.MessageQueues
public bool HasMessages { get; }
protected abstract int LockedCount { get; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, ref Akka.Actor.Envelope envelope) { }
protected abstract void LockedEnqueue(Akka.Actor.Envelope envelope);
protected abstract bool LockedTryDequeue(out Akka.Actor.Envelope envelope);
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
Expand All @@ -3060,7 +3060,7 @@ namespace Akka.Dispatch.MessageQueues
public bool HasMessages { get; }
public System.TimeSpan PushTimeOut { get; set; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, ref Akka.Actor.Envelope envelope) { }
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
}
public class DequeWrapperMessageQueue : Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.MessageQueues.IMessageQueue
Expand All @@ -3070,16 +3070,16 @@ namespace Akka.Dispatch.MessageQueues
public int Count { get; }
public bool HasMessages { get; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void EnqueueFirst(Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, ref Akka.Actor.Envelope envelope) { }
public void EnqueueFirst(ref Akka.Actor.Envelope envelope) { }
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
}
public interface IMessageQueue
{
int Count { get; }
bool HasMessages { get; }
void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters);
void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope);
void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope& modreq(System.Runtime.InteropServices.InAttribute) envelope);
bool TryDequeue(out Akka.Actor.Envelope envelope);
}
public class UnboundedDequeMessageQueue : Akka.Dispatch.MessageQueues.DequeWrapperMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics
Expand All @@ -3092,22 +3092,22 @@ namespace Akka.Dispatch.MessageQueues
public int Count { get; }
public bool HasMessages { get; }
public void CleanUp(Akka.Actor.IActorRef owner, Akka.Dispatch.MessageQueues.IMessageQueue deadletters) { }
public void Enqueue(Akka.Actor.IActorRef receiver, Akka.Actor.Envelope envelope) { }
public void Enqueue(Akka.Actor.IActorRef receiver, ref Akka.Actor.Envelope envelope) { }
public bool TryDequeue(out Akka.Actor.Envelope envelope) { }
}
public class UnboundedPriorityMessageQueue : Akka.Dispatch.MessageQueues.BlockingMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics
{
public UnboundedPriorityMessageQueue(System.Func<object, int> priorityGenerator, int initialCapacity) { }
protected override int LockedCount { get; }
public void EnqueueFirst(Akka.Actor.Envelope envelope) { }
public void EnqueueFirst(ref Akka.Actor.Envelope envelope) { }
protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { }
protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { }
}
public class UnboundedStablePriorityMessageQueue : Akka.Dispatch.MessageQueues.BlockingMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics
{
public UnboundedStablePriorityMessageQueue(System.Func<object, int> priorityGenerator, int initialCapacity) { }
protected override int LockedCount { get; }
public void EnqueueFirst(Akka.Actor.Envelope envelope) { }
public void EnqueueFirst(ref Akka.Actor.Envelope envelope) { }
protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { }
protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { }
}
Expand Down