Skip to content

Commit

Permalink
enable ChannelTaskScheduler to work inside Akka.Cluster without cau…
Browse files Browse the repository at this point in the history
…sing errors inside `/system` actors (akkadotnet#5861)

* close akkadotnet#5498

enable `ChannelTaskScheduler` to work inside Akka.Cluster without causing errors inside `/system` actors

* fix `HeartbeatSender`

* cleaned up SBR internals (style)

* cleaned up some comments

* asynchronously attempt to acquire `Cluster` inside SBR

* fixed SBR compilation

* Update SplitBrainResolver.cs

* subscribe on PreStart
  • Loading branch information
Aaronontheweb committed May 5, 2022
1 parent c2460eb commit 51c446d
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 86 deletions.
Expand Up @@ -27,7 +27,7 @@ public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
[Fact]
public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime()
{
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys)));
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(Cluster.Get(Sys)));
heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2));
ExpectMsg<HeartbeatRsp>(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2));
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Expand Up @@ -23,7 +23,7 @@ class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
private readonly TestProbe _probe;

public TestClusterHeartbeatSender(TestProbe probe)
public TestClusterHeartbeatSender(TestProbe probe) : base(Cluster.Get(Context.System))
{
_probe = probe;
}
Expand Down
3 changes: 1 addition & 2 deletions src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -520,7 +520,7 @@ public ImmutableHashSet<string> SelfRoles
public DefaultFailureDetectorRegistry<Address> FailureDetector { get; }

/// <summary>
/// TBD
/// The downing provider used to execute automatic downing inside Akka.Cluster.
/// </summary>
public IDowningProvider DowningProvider => _downingProvider.Value;

Expand All @@ -535,7 +535,6 @@ public ImmutableHashSet<string> SelfRoles

private static IScheduler CreateScheduler(ActorSystem system)
{
//TODO: Whole load of stuff missing here!
return system.Scheduler;
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Expand Up @@ -895,7 +895,7 @@ private void CreateChildren(Cluster cluster)
_cluster = cluster;
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>(), "core");

Context.ActorOf(ClusterHeartbeatReceiver.Props(() => _cluster), "heartbeatReceiver");
Context.ActorOf(ClusterHeartbeatReceiver.Props(cluster), "heartbeatReceiver");
}

protected override void PostStop()
Expand Down Expand Up @@ -1333,7 +1333,7 @@ private void BecomeInitialized()
{
// start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received
Context.ActorOf(Props.Create<ClusterHeartbeatSender>().WithDispatcher(_cluster.Settings.UseDispatcher),
Context.ActorOf(Props.Create(() => new ClusterHeartbeatSender(_cluster)).WithDispatcher(_cluster.Settings.UseDispatcher),
"heartbeatSender");
// make sure that join process is stopped
StopSeedNodeProcess();
Expand Down
21 changes: 11 additions & 10 deletions src/core/Akka.Cluster/ClusterHeartbeat.cs
Expand Up @@ -26,16 +26,16 @@ internal sealed class ClusterHeartbeatReceiver : UntypedActor
{
// Important - don't use Cluster.Get(Context.System) in constructor because that would
// cause deadlock. See startup sequence in ClusterDaemon.
private readonly Lazy<Cluster> _cluster;
private readonly Cluster _cluster;

public bool VerboseHeartbeat => _cluster.Value.Settings.VerboseHeartbeatLogging;
public bool VerboseHeartbeat => _cluster.Settings.VerboseHeartbeatLogging;

/// <summary>
/// TBD
/// </summary>
public ClusterHeartbeatReceiver(Func<Cluster> getCluster)
public ClusterHeartbeatReceiver(Cluster cluster)
{
_cluster = new Lazy<Cluster>(getCluster);
_cluster = cluster;
}

protected override void OnReceive(object message)
Expand All @@ -44,8 +44,8 @@ protected override void OnReceive(object message)
{
case ClusterHeartbeatSender.Heartbeat hb:
// TODO log the sequence nr once serializer is enabled
if(VerboseHeartbeat) _cluster.Value.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From);
Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.Value.SelfUniqueAddress,
if(VerboseHeartbeat) _cluster.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From);
Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.SelfUniqueAddress,
hb.SequenceNr, hb.CreationTimeNanos));
break;
default:
Expand All @@ -54,7 +54,7 @@ protected override void OnReceive(object message)
}
}

public static Props Props(Func<Cluster> getCluster)
public static Props Props(Cluster getCluster)
{
return Akka.Actor.Props.Create(() => new ClusterHeartbeatReceiver(getCluster));
}
Expand All @@ -76,11 +76,12 @@ internal class ClusterHeartbeatSender : ReceiveActor
private DateTime _tickTimestamp;

/// <summary>
/// TBD
/// Create a new instance of the <see cref="ClusterHeartbeatSender"/> and pass in a reference to the <see cref="Cluster"/>
/// to which it belongs.
/// </summary>
public ClusterHeartbeatSender()
public ClusterHeartbeatSender(Cluster cluster)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
var tickInitialDelay = _cluster.Settings.PeriodicTasksInitialDelay.Max(_cluster.Settings.HeartbeatInterval);
_tickTimestamp = DateTime.UtcNow + tickInitialDelay;

Expand Down

0 comments on commit 51c446d

Please sign in to comment.