Skip to content

Commit

Permalink
added f# version and net472
Browse files Browse the repository at this point in the history
the net472 version reproduces the issue akkadotnet/akka.net#6931
  • Loading branch information
anpin committed Sep 27, 2023
1 parent 19a9806 commit 3a0c862
Show file tree
Hide file tree
Showing 4 changed files with 360 additions and 124 deletions.
9 changes: 8 additions & 1 deletion ConsoleApp1/ConsoleApp1.csproj
Expand Up @@ -2,14 +2,21 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<TargetFrameworks>net7.0; net472</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.5.13" />
<PackageReference Include="Akka.Streams" Version="1.5.12" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net472'">
<Compile Include="$(MSBuildThisFileDirectory)IsExternalInit.cs" Visible="true" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net7.0'">
<Compile Remove="$(MSBuildThisFileDirectory)IsExternalInit.cs" Visible="true" />
</ItemGroup>

</Project>
7 changes: 7 additions & 0 deletions ConsoleApp1/IsExternalInit.cs
@@ -0,0 +1,7 @@
using System.ComponentModel;

namespace System.Runtime.CompilerServices
{
[EditorBrowsable(EditorBrowsableState.Never)]
internal class IsExternalInit{}
}
256 changes: 133 additions & 123 deletions ConsoleApp1/Program.cs
Expand Up @@ -9,142 +9,152 @@
using Akka.Util;
using Tcp = Akka.Streams.Dsl.Tcp;

var run = true;
var config = Akka.Configuration.ConfigurationFactory.ParseString(
"""
akka {
loglevel = DEBUG
stdout-loglevel = DEBUG
log-config-on-start = on
actor {
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
namespace ConsoleApp1
{
public static class Program
{
static bool run = true;
static Config config = Akka.Configuration.ConfigurationFactory.ParseString(
"""
akka {
loglevel = DEBUG
stdout-loglevel = DEBUG
log-config-on-start = on
actor {
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
}
}
""");
var actorSystem = ActorSystem.Create("my-system", config );
}
""");
static readonly ActorSystem actorSystem = ActorSystem.Create("my-system", config);
static void Main(string[] args)
{
Console.CancelKeyPress += (sender, eventArgs) =>
{
Console.WriteLine("Ctrl+C pressed - exiting...");
eventArgs.Cancel = true;
actorSystem.Dispose();
run = false;
};
Console.WriteLine("Press Ctrl+C to exit...");

Console.CancelKeyPress += (sender, eventArgs) =>
{
Console.WriteLine("Ctrl+C pressed - exiting...");
eventArgs.Cancel = true;
actorSystem.Dispose();
run = false;
};
Console.WriteLine( "Press Ctrl+C to exit...");

var module = actorSystem.ActorOf(SocketActor.Props);

var module = actorSystem.ActorOf(SocketActor.Props);

module.Tell(new ConnectTo(new IPEndPoint(IPAddress.Loopback, 8888)));

module.Tell(new ConnectTo(new IPEndPoint(IPAddress.Loopback, 8888)));
Task.Delay(1000).Wait();

Task.Delay(1000).Wait();
module.Tell(new StopConnections());

module.Tell(new StopConnections());

do
{
Thread.Sleep(1);
} while (run);

do
{
Thread.Sleep(1);
} while (run) ;

public record ConnectTo(IPEndPoint endpoint);
public record StopConnections();
}
public record ConnectTo(IPEndPoint endpoint);
public record StopConnections();


public class SocketActor : UntypedActor
{
Sink<ByteString, NotUsed> SharedSink;
Source<ByteString, NotUsed> SharedSource;
SharedKillSwitch ks;
public SocketActor()
{
var mat = Context.Materializer();
ks = KillSwitches.Shared("ks");
var rxTx =
Flow.Create<ByteString>()
.Select(x => x.ToString())
.Log("rx")
.Select(x => ByteString.FromString($"{Random.Shared.Next(0, 255)}\n"))
.Via(ks.Flow<ByteString>())
.Recover(logFailure)
.Log("tx");
;
var (rxSink, rxSource) = MergeHub
.Source<ByteString>(perProducerBufferSize: 16)
.Via (ks.Flow<ByteString>())
.ToMaterialized(BroadcastHub.Sink<ByteString>(bufferSize: 256), Keep.Both)
.Run(mat);
var (txSink, txSource) = MergeHub
.Source<ByteString>(perProducerBufferSize: 16)
.Via (ks.Flow<ByteString>())
.ToMaterialized(BroadcastHub.Sink<ByteString>(bufferSize: 256), Keep.Both)
.Run(mat);
rxSource.Recover(logFailure).Via(rxTx).To(txSink).Run(mat);
SharedSink = rxSink;
SharedSource = txSource;


Source<Tcp.IncomingConnection, Task<Tcp.ServerBinding>> connections =
Context.System.TcpStream().Bind("127.0.0.1", 8888);

connections.RunForeach(connection =>
public class SocketActor : UntypedActor
{
Console.WriteLine($"New connection from: {connection.RemoteAddress}");
var echo = Flow.Create<ByteString>()
.Via(Framing.Delimiter(
ByteString.FromString("\n"),
maximumFrameLength: 256,
allowTruncation: true))
.Select(c => c.ToString())
.Select(c =>
{
var x = Int32.Parse(c);
if (x < 100)
{
return (x - 1).ToString();
}
else
{
throw new ArgumentOutOfRangeException();
}
})
.Merge(Source.Single("HI!\n") )
.Select(ByteString.FromString);
connection.HandleWith(echo, mat);
}, mat);
}
static Option<ByteString> logFailure(Exception ex)
{
Sink<ByteString, NotUsed> SharedSink;
Source<ByteString, NotUsed> SharedSource;
SharedKillSwitch ks;
public SocketActor()
{
var rnd = new Random();
var mat = Context.Materializer();
ks = KillSwitches.Shared("ks");
var rxTx =
Flow.Create<ByteString>()
.Select(x => x.ToString())
.Log("rx")
.Select(x => ByteString.FromString($"{rnd.Next(0, 255)}\n"))
.Via(ks.Flow<ByteString>())
.Recover(logFailure)
.Log("tx");
;
var (rxSink, rxSource) = MergeHub
.Source<ByteString>(perProducerBufferSize: 16)
.Via(ks.Flow<ByteString>())
.ToMaterialized(BroadcastHub.Sink<ByteString>(bufferSize: 256), Keep.Both)
.Run(mat);
var (txSink, txSource) = MergeHub
.Source<ByteString>(perProducerBufferSize: 16)
.Via(ks.Flow<ByteString>())
.ToMaterialized(BroadcastHub.Sink<ByteString>(bufferSize: 256), Keep.Both)
.Run(mat);
rxSource.Recover(logFailure).Via(rxTx).To(txSink).Run(mat);
SharedSink = rxSink;
SharedSource = txSource;


Source<Tcp.IncomingConnection, Task<Tcp.ServerBinding>> connections =
Context.System.TcpStream().Bind("127.0.0.1", 8888);

connections.RunForeach(connection =>
{
Console.WriteLine($"New connection from: {connection.RemoteAddress}");
var echo = Flow.Create<ByteString>()
.Via(Framing.Delimiter(
ByteString.FromString("\n"),
maximumFrameLength: 256,
allowTruncation: true))
.Select(c => c.ToString())
.Select(c =>
{
var x = Int32.Parse(c);
if (x < 100)
{
return (x - 1).ToString();
}
else
{
throw new ArgumentOutOfRangeException();
}
})
.Merge(Source.Single("HI!\n"))
.Select(ByteString.FromString);
connection.HandleWith(echo, mat);
}, mat);
}
static Option<ByteString> logFailure(Exception ex)
{

Console.WriteLine($"Error: {ex}");
return Option<ByteString>.None;
}
protected override void OnReceive(object message)
{
switch (message)
{
case StopConnections :
ks.Shutdown();
break;
case ConnectTo msg :
Context.System.TcpStream()
.OutgoingConnection(msg.endpoint)
.Recover(logFailure)
.Via (ks.Flow<ByteString>())
.Join(Flow.FromSinkAndSource(SharedSink, SharedSource).Recover(logFailure))
.Run(Context.Materializer());
break;
Console.WriteLine($"Error: {ex}");
return Option<ByteString>.None;
}
protected override void OnReceive(object message)
{
switch (message)
{
case StopConnections:
ks.Shutdown();
break;
case ConnectTo msg:
Context.System.TcpStream()
.OutgoingConnection(msg.endpoint)
.Recover(logFailure)
.Via(ks.Flow<ByteString>())
.Join(Flow.FromSinkAndSource(SharedSink, SharedSource).Recover(logFailure))
.Run(Context.Materializer());
break;
}
}

public static Props Props => Props.Create<SocketActor>();
}
}

public static Props Props => Props.Create<SocketActor>();
}
}
}

0 comments on commit 3a0c862

Please sign in to comment.