Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] experimenting with System.Memory in AkkaPduCodec #6831

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs
Expand Up @@ -50,8 +50,8 @@ public class AkkaPduCodecBenchmark

private readonly Ack _lastAck = new Ack(-1);

private ByteString _fullDecode;
private ByteString _pduDecoded;
private ReadOnlyMemory<byte> _fullDecode;
private ReadOnlyMemory<byte> _pduDecoded;
private Akka.Remote.Serialization.Proto.Msg.Payload _payloadDecoded;

[GlobalSetup]
Expand Down Expand Up @@ -79,9 +79,10 @@ public async Task Setup()

_recvCodec = new AkkaPduProtobuffCodec(_sys1);
_sendCodec = new AkkaPduProtobuffCodec(_sys2);
_fullDecode = CreatePayloadPdu();
_pduDecoded = ((Payload)_recvCodec.DecodePdu(_fullDecode)).Bytes;
_payloadDecoded = _recvCodec.DecodeMessage(_pduDecoded, _rarp, _addr1).MessageOption.SerializedMessage;
_fullDecode = CreatePayloadPdu().Memory;
_pduDecoded = ((Payload)_recvCodec.DecodePdu(ref _fullDecode)).Bytes.Memory;

_payloadDecoded = _recvCodec.DecodeMessage(ref _pduDecoded, _rarp, _addr1).MessageOption.SerializedMessage;
}

[GlobalCleanup]
Expand Down Expand Up @@ -149,10 +150,11 @@ public void DecodePayloadPdu()
{
for (var i = 0; i < Operations; i++)
{
var pdu = _recvCodec.DecodePdu(_fullDecode);
var pdu = _recvCodec.DecodePdu(ref _fullDecode);
if (pdu is Payload p)
{
var msg = _recvCodec.DecodeMessage(p.Bytes, _rarp, _addr1);
var b = p.Bytes.Memory;
var msg = _recvCodec.DecodeMessage(ref b, _rarp, _addr1);
var deserialize = MessageSerializer.Deserialize(_sys1, msg.MessageOption.SerializedMessage);
}
}
Expand All @@ -163,7 +165,7 @@ public void DecodePduOnly()
{
for (var i = 0; i < Operations; i++)
{
var pdu = _recvCodec.DecodePdu(_fullDecode);
var pdu = _recvCodec.DecodePdu(ref _fullDecode);
}
}

Expand All @@ -172,7 +174,7 @@ public void DecodeMessageOnly()
{
for (var i = 0; i < Operations; i++)
{
var msg = _recvCodec.DecodeMessage(_pduDecoded, _rarp, _addr1);
var msg = _recvCodec.DecodeMessage(ref _pduDecoded, _rarp, _addr1);
}
}

Expand Down
13 changes: 5 additions & 8 deletions src/core/Akka.Remote/Endpoint.cs
Expand Up @@ -8,17 +8,13 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
using Akka.Event;
using Akka.Pattern;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.Serialization;
using Akka.Util;
Expand Down Expand Up @@ -1942,7 +1938,7 @@ private void Reading()
}
else
{
var ackAndMessage = TryDecodeMessageAndAck(payload);
var ackAndMessage = TryDecodeMessageAndAck(ref payload);
if (ackAndMessage.AckOption != null && _reliableDeliverySupervisor != null)
_reliableDeliverySupervisor.Tell(ackAndMessage.AckOption);
if (ackAndMessage.MessageOption != null)
Expand Down Expand Up @@ -1996,7 +1992,8 @@ private void NotReading()
Receive<EndpointWriter.StopReading>(stop => stop.ReplyTo.Tell(new EndpointWriter.StoppedReading(stop.Writer)));
Receive<InboundPayload>(payload =>
{
var ackAndMessage = TryDecodeMessageAndAck(payload.Payload);
var mem = payload.Payload;
var ackAndMessage = TryDecodeMessageAndAck(ref mem);
if (ackAndMessage.AckOption != null && _reliableDeliverySupervisor != null)
_reliableDeliverySupervisor.Tell(ackAndMessage.AckOption);
});
Expand Down Expand Up @@ -2066,11 +2063,11 @@ private void DeliverAndAck()
deliverable.Deliverables.ForEach(msg => _msgDispatch.Dispatch(msg.Recipient, msg.RecipientAddress, msg.SerializedMessage, msg.SenderOptional));
}

private AckAndMessage TryDecodeMessageAndAck(ByteString pdu)
private AckAndMessage TryDecodeMessageAndAck(ref ReadOnlyMemory<byte> pdu)
{
try
{
return _codec.DecodeMessage(pdu, _provider, LocalAddress);
return _codec.DecodeMessage(ref pdu, _provider, LocalAddress);
}
catch (Exception ex)
{
Expand Down
69 changes: 32 additions & 37 deletions src/core/Akka.Remote/Transport/AkkaPduCodec.cs
Expand Up @@ -111,7 +111,7 @@ public Payload(ByteString bytes)
/// <summary>
/// TBD
/// </summary>
public ByteString Bytes { get; private set; }
public ByteString Bytes { get; }
}

/// <summary>
Expand Down Expand Up @@ -215,8 +215,8 @@ protected AkkaPduCodec(ActorSystem system)
/// <see cref="ByteString"/>.
/// </summary>
/// <param name="raw">Encoded raw byte representation of an Akka PDU</param>
/// <returns>Class representation of a PDU that can be used in a <see cref="PatternMatch"/>.</returns>
public abstract IAkkaPdu DecodePdu(ByteString raw);
/// <returns>Class representation of a PDU.</returns>
public abstract IAkkaPdu DecodePdu(ref ReadOnlyMemory<byte> raw);

/// <summary>
/// Takes an <see cref="IAkkaPdu"/> representation of an Akka PDU and returns its encoded form
Expand All @@ -229,7 +229,7 @@ public virtual ByteString EncodePdu(IAkkaPdu pdu)
switch (pdu)
{
case Payload p:
return ConstructPayload(p.Bytes);
return ConstructPayload(ByteString.CopyFrom(p.Bytes.Span));
case Heartbeat h:
return ConstructHeartbeat();
case Associate a:
Expand Down Expand Up @@ -275,7 +275,7 @@ public virtual ByteString EncodePdu(IAkkaPdu pdu)
/// <param name="provider">TBD</param>
/// <param name="localAddress">TBD</param>
/// <returns>TBD</returns>
public abstract AckAndMessage DecodeMessage(ByteString raw, IRemoteActorRefProvider provider, Address localAddress);
public abstract AckAndMessage DecodeMessage(ref ReadOnlyMemory<byte> raw, IRemoteActorRefProvider provider, Address localAddress);

/// <summary>
/// TBD
Expand All @@ -299,9 +299,9 @@ public virtual ByteString EncodePdu(IAkkaPdu pdu)
}

/// <summary>
/// TBD
/// INTERNAL API
/// </summary>
internal class AkkaPduProtobuffCodec : AkkaPduCodec
internal sealed class AkkaPduProtobuffCodec : AkkaPduCodec
{
/// <summary>
/// TBD
Expand All @@ -316,14 +316,14 @@ internal class AkkaPduProtobuffCodec : AkkaPduCodec
/// </ul>
/// </exception>
/// <returns>TBD</returns>
public override IAkkaPdu DecodePdu(ByteString raw)
public override IAkkaPdu DecodePdu(ref ReadOnlyMemory<byte> raw)
{
try
{
var pdu = AkkaProtocolMessage.Parser.ParseFrom(raw);
var pdu = AkkaProtocolMessage.Parser.ParseFrom(raw.Span);
if (pdu.Instruction != null) return DecodeControlPdu(pdu.Instruction);
else if (!pdu.Payload.IsEmpty) return new Payload(pdu.Payload); // TODO HasPayload
else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained");
if (!pdu.Payload.IsEmpty) return new Payload(pdu.Payload); // TODO HasPayload
throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained");
}
catch (InvalidProtocolBufferException ex)
{
Expand Down Expand Up @@ -407,9 +407,9 @@ public override ByteString ConstructHeartbeat()
/// <param name="provider">TBD</param>
/// <param name="localAddress">TBD</param>
/// <returns>TBD</returns>
public override AckAndMessage DecodeMessage(ByteString raw, IRemoteActorRefProvider provider, Address localAddress)
public override AckAndMessage DecodeMessage(ref ReadOnlyMemory<byte> raw, IRemoteActorRefProvider provider, Address localAddress)
{
var ackAndEnvelope = AckAndEnvelopeContainer.Parser.ParseFrom(raw);
var ackAndEnvelope = AckAndEnvelopeContainer.Parser.ParseFrom(raw.Span);

Ack ackOption = null;

Expand All @@ -420,39 +420,34 @@ public override AckAndMessage DecodeMessage(ByteString raw, IRemoteActorRefProvi

Message messageOption = null;

if (ackAndEnvelope.Envelope != null)
{
var envelopeContainer = ackAndEnvelope.Envelope;
if (envelopeContainer != null)
{
var recipient = provider.ResolveActorRefWithLocalAddress(envelopeContainer.Recipient.Path, localAddress);
var envelopeContainer = ackAndEnvelope.Envelope;
if (envelopeContainer == null) return new AckAndMessage(ackOption, null);
var recipient = provider.ResolveActorRefWithLocalAddress(envelopeContainer.Recipient.Path, localAddress);

//todo get parsed address from provider
var recipientAddress = ActorPathCache.Cache.GetOrCompute(envelopeContainer.Recipient.Path).Address;
//todo get parsed address from provider
var recipientAddress = ActorPathCache.Cache.GetOrCompute(envelopeContainer.Recipient.Path).Address;

var serializedMessage = envelopeContainer.Message;
IActorRef senderOption = null;
if (envelopeContainer.Sender != null)
senderOption = provider.ResolveActorRefWithLocalAddress(envelopeContainer.Sender.Path, localAddress);
var serializedMessage = envelopeContainer.Message;
IActorRef senderOption = null;
if (envelopeContainer.Sender != null)
senderOption = provider.ResolveActorRefWithLocalAddress(envelopeContainer.Sender.Path, localAddress);

SeqNo seqOption = null;
if (envelopeContainer.Seq != SeqUndefined)
{
unchecked
{
seqOption = new SeqNo((long)envelopeContainer.Seq); //proto takes a ulong
}
}

messageOption = new Message(recipient, recipientAddress, serializedMessage, senderOption, seqOption);
SeqNo seqOption = null;
if (envelopeContainer.Seq != SeqUndefined)
{
unchecked
{
seqOption = new SeqNo((long)envelopeContainer.Seq); //proto takes a ulong
}
}

messageOption = new Message(recipient, recipientAddress, serializedMessage, senderOption, seqOption);


return new AckAndMessage(ackOption, messageOption);
}

private AcknowledgementInfo AckBuilder(Ack ack)
private static AcknowledgementInfo AckBuilder(Ack ack)
{
var acki = new AcknowledgementInfo();
acki.CumulativeAck = (ulong)ack.CumulativeAck.RawValue;
Expand Down Expand Up @@ -496,7 +491,7 @@ public override ByteString ConstructPureAck(Ack ack)
}

#region Internal methods
private IAkkaPdu DecodeControlPdu(AkkaControlMessage controlPdu)
private static IAkkaPdu DecodeControlPdu(AkkaControlMessage controlPdu)
{
switch (controlPdu.CommandType)
{
Expand Down
25 changes: 15 additions & 10 deletions src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs
Expand Up @@ -894,8 +894,9 @@ private void InitializeFSM()
case Disassociated d:
return Stop(new Failure(d.Info));
case InboundPayload p when @event.StateData is OutboundUnderlyingAssociated ola:
{
var pdu = DecodePdu(p.Payload);
{
var bytes = p.Payload;
var pdu = DecodePdu(ref bytes);
/*
* This state is used for OutboundProtocolState actors when they receive
* a reply back from the inbound end of the association.
Expand Down Expand Up @@ -941,8 +942,9 @@ private void InitializeFSM()

// Events for inbound associations
case InboundPayload p when @event.StateData is InboundUnassociated iu:
{
var pdu = DecodePdu(p.Payload);
{
var bytes = p.Payload;
var pdu = DecodePdu(ref bytes);
/*
* This state is used by inbound protocol state actors
* when they receive an association attempt from the
Expand Down Expand Up @@ -1002,8 +1004,9 @@ private void InitializeFSM()
case Disassociated d:
return Stop(new Failure(d.Info));
case InboundPayload ip:
{
var pdu = DecodePdu(ip.Payload);
{
var bytes = ip.Payload;
var pdu = DecodePdu(ref bytes);
switch (pdu)
{
case Disassociate d:
Expand All @@ -1024,7 +1027,8 @@ private void InitializeFSM()
.Using(new AssociatedWaitHandler(awh.HandlerListener, awh.WrappedHandle,
nQueue));
case ListenerReady lr:
lr.Listener.Notify(new InboundPayload(p.Bytes));
var mem = p.Bytes.Memory;
lr.Listener.Notify(new InboundPayload(ref mem));
return Stay();
default:
throw new AkkaProtocolException(
Expand Down Expand Up @@ -1064,7 +1068,8 @@ AssociationHandle GetHandle(ProtocolStateData data)
case HandleListenerRegistered hlr when @event.StateData is AssociatedWaitHandler awh:
foreach (var p in awh.Queue)
{
hlr.Listener.Notify(new InboundPayload(p));
var mem = p.Memory;
hlr.Listener.Notify(new InboundPayload(ref mem));
}

return Stay().Using(new ListenerReady(hlr.Listener, awh.WrappedHandle));
Expand Down Expand Up @@ -1299,11 +1304,11 @@ private void ListenForListenerRegistration(TaskCompletionSource<IHandleEventList
return readHandlerPromise.Task;
}

private IAkkaPdu DecodePdu(ByteString pdu)
private IAkkaPdu DecodePdu(ref ReadOnlyMemory<byte> pdu)
{
try
{
return _codec.DecodePdu(pdu);
return _codec.DecodePdu(ref pdu);
}
catch (Exception ex)
{
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs
Expand Up @@ -55,8 +55,10 @@ public override void ChannelRead(IChannelHandlerContext context, object message)
if (buf.ReadableBytes > 0)
{
// no need to copy the byte buffer contents; ByteString does that automatically
// DEFENSIVE COPY
var bytes = ByteString.CopyFrom(buf.Array, buf.ArrayOffset + buf.ReaderIndex, buf.ReadableBytes);
NotifyListener(new InboundPayload(bytes));
var mem = bytes.Memory;
NotifyListener(new InboundPayload(ref mem));
}

// decrease the reference count to 0 (releases buffer)
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Remote/Transport/TestTransport.cs
Expand Up @@ -275,7 +275,8 @@ private Task<bool> DefaultWriteBehavior(TestAssociationHandle handle, ByteString

if (remoteReadHandler != null)
{
remoteReadHandler.Notify(new InboundPayload(payload));
var mem = payload.Memory;
remoteReadHandler.Notify(new InboundPayload(ref mem));
return Task.FromResult(true);
}

Expand Down