From 51c446ddfe3d82b70dd98edd9376c81af6576937 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 5 May 2022 18:03:19 -0500 Subject: [PATCH] enable `ChannelTaskScheduler` to work inside Akka.Cluster without causing errors inside `/system` actors (#5861) * close #5498 enable `ChannelTaskScheduler` to work inside Akka.Cluster without causing errors inside `/system` actors * fix `HeartbeatSender` * cleaned up SBR internals (style) * cleaned up some comments * asynchronously attempt to acquire `Cluster` inside SBR * fixed SBR compilation * Update SplitBrainResolver.cs * subscribe on PreStart --- .../ClusterHeartbeatReceiverSpec.cs | 2 +- .../ClusterHeartbeatSenderSpec.cs | 2 +- src/core/Akka.Cluster/Cluster.cs | 3 +- src/core/Akka.Cluster/ClusterDaemon.cs | 4 +- src/core/Akka.Cluster/ClusterHeartbeat.cs | 21 ++-- .../Akka.Cluster/SBR/SplitBrainResolver.cs | 104 +++++++++--------- .../SBR/SplitBrainResolverProvider.cs | 30 ++--- 7 files changed, 80 insertions(+), 86 deletions(-) diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs index 90dd2c6d90c..58963c44875 100644 --- a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs @@ -27,7 +27,7 @@ public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) [Fact] public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime() { - var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys))); + var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(Cluster.Get(Sys))); heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2)); ExpectMsg(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2)); } diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs index fdea539c91b..9fad2354911 100644 --- a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs @@ -23,7 +23,7 @@ class TestClusterHeartbeatSender : ClusterHeartbeatSender { private readonly TestProbe _probe; - public TestClusterHeartbeatSender(TestProbe probe) + public TestClusterHeartbeatSender(TestProbe probe) : base(Cluster.Get(Context.System)) { _probe = probe; } diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 4beb07754c7..c9c2d7dfbac 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -520,7 +520,7 @@ public ImmutableHashSet SelfRoles public DefaultFailureDetectorRegistry
FailureDetector { get; } /// - /// TBD + /// The downing provider used to execute automatic downing inside Akka.Cluster. /// public IDowningProvider DowningProvider => _downingProvider.Value; @@ -535,7 +535,6 @@ public ImmutableHashSet SelfRoles private static IScheduler CreateScheduler(ActorSystem system) { - //TODO: Whole load of stuff missing here! return system.Scheduler; } diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 188bfe4f0b2..99666f47e75 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -895,7 +895,7 @@ private void CreateChildren(Cluster cluster) _cluster = cluster; _coreSupervisor = Context.ActorOf(Props.Create(), "core"); - Context.ActorOf(ClusterHeartbeatReceiver.Props(() => _cluster), "heartbeatReceiver"); + Context.ActorOf(ClusterHeartbeatReceiver.Props(cluster), "heartbeatReceiver"); } protected override void PostStop() @@ -1333,7 +1333,7 @@ private void BecomeInitialized() { // start heartbeatSender here, and not in constructor to make sure that // heartbeating doesn't start before Welcome is received - Context.ActorOf(Props.Create().WithDispatcher(_cluster.Settings.UseDispatcher), + Context.ActorOf(Props.Create(() => new ClusterHeartbeatSender(_cluster)).WithDispatcher(_cluster.Settings.UseDispatcher), "heartbeatSender"); // make sure that join process is stopped StopSeedNodeProcess(); diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs index a511b68d875..501dcc3517b 100644 --- a/src/core/Akka.Cluster/ClusterHeartbeat.cs +++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs @@ -26,16 +26,16 @@ internal sealed class ClusterHeartbeatReceiver : UntypedActor { // Important - don't use Cluster.Get(Context.System) in constructor because that would // cause deadlock. See startup sequence in ClusterDaemon. - private readonly Lazy _cluster; + private readonly Cluster _cluster; - public bool VerboseHeartbeat => _cluster.Value.Settings.VerboseHeartbeatLogging; + public bool VerboseHeartbeat => _cluster.Settings.VerboseHeartbeatLogging; /// /// TBD /// - public ClusterHeartbeatReceiver(Func getCluster) + public ClusterHeartbeatReceiver(Cluster cluster) { - _cluster = new Lazy(getCluster); + _cluster = cluster; } protected override void OnReceive(object message) @@ -44,8 +44,8 @@ protected override void OnReceive(object message) { case ClusterHeartbeatSender.Heartbeat hb: // TODO log the sequence nr once serializer is enabled - if(VerboseHeartbeat) _cluster.Value.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From); - Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.Value.SelfUniqueAddress, + if(VerboseHeartbeat) _cluster.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From); + Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.SelfUniqueAddress, hb.SequenceNr, hb.CreationTimeNanos)); break; default: @@ -54,7 +54,7 @@ protected override void OnReceive(object message) } } - public static Props Props(Func getCluster) + public static Props Props(Cluster getCluster) { return Akka.Actor.Props.Create(() => new ClusterHeartbeatReceiver(getCluster)); } @@ -76,11 +76,12 @@ internal class ClusterHeartbeatSender : ReceiveActor private DateTime _tickTimestamp; /// - /// TBD + /// Create a new instance of the and pass in a reference to the + /// to which it belongs. /// - public ClusterHeartbeatSender() + public ClusterHeartbeatSender(Cluster cluster) { - _cluster = Cluster.Get(Context.System); + _cluster = cluster; var tickInitialDelay = _cluster.Settings.PeriodicTasksInitialDelay.Max(_cluster.Settings.HeartbeatInterval); _tickTimestamp = DateTime.UtcNow + tickInitialDelay; diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index feb711f139c..12c203e2cee 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -25,23 +25,14 @@ namespace Akka.Cluster.SBR /// internal class SplitBrainResolver : SplitBrainResolverBase { - private readonly Cluster cluster; + private Cluster _cluster; public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy) : base(stableAfter, strategy) { - cluster = Cluster.Get(Context.System); - Log.Info( - "SBR started. Config: strategy [{0}], stable-after [{1}], down-all-when-unstable [{2}], selfUniqueAddress [{3}].", - Logging.SimpleName(strategy.GetType()), - stableAfter, - // ReSharper disable VirtualMemberCallInConstructor - DownAllWhenUnstable == TimeSpan.Zero ? "off" : DownAllWhenUnstable.ToString(), - SelfUniqueAddress.Address); - // ReSharper restore VirtualMemberCallInConstructor } - public override UniqueAddress SelfUniqueAddress => cluster.SelfUniqueAddress; + public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress; public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy) { @@ -51,20 +42,23 @@ public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy) // re-subscribe when restart protected override void PreStart() { - cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); + _cluster = Cluster.Get(Context.System); + _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); + base.PreStart(); } + protected override void PostStop() { - cluster.Unsubscribe(Self); + _cluster.Unsubscribe(Self); base.PostStop(); } public override void Down(UniqueAddress node, IDecision decision) { Log.Info("SBR is downing [{0}]", node); - cluster.Down(node.Address); + _cluster.Down(node.Address); } } @@ -75,24 +69,24 @@ public override void Down(UniqueAddress node, IDecision decision) internal abstract class SplitBrainResolverBase : ActorBase, IWithUnboundedStash, IWithTimers { // would be better as constructor parameter, but don't want to break Cinnamon instrumentation - private readonly SplitBrainResolverSettings settings; + private readonly SplitBrainResolverSettings _settings; private ILoggingAdapter _log; - private ReachabilityChangedStats reachabilityChangedStats = + private ReachabilityChangedStats _reachabilityChangedStats = new ReachabilityChangedStats(DateTime.UtcNow, DateTime.UtcNow, 0); - private IReleaseLeaseCondition releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; - private bool selfMemberAdded; + private IReleaseLeaseCondition _releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; + private bool _selfMemberAdded; - private Deadline stableDeadline; + private Deadline _stableDeadline; protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) { StableAfter = stableAfter; Strategy = strategy; - settings = new SplitBrainResolverSettings(Context.System.Settings.Config); + _settings = new SplitBrainResolverSettings(Context.System.Settings.Config); // ReSharper disable once VirtualMemberCallInConstructor Timers.StartPeriodicTimer(Tick.Instance, Tick.Instance, TickInterval); @@ -110,13 +104,13 @@ protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) public abstract UniqueAddress SelfUniqueAddress { get; } - public virtual TimeSpan DownAllWhenUnstable => settings.DownAllWhenUnstable; + public virtual TimeSpan DownAllWhenUnstable => _settings.DownAllWhenUnstable; public virtual TimeSpan TickInterval => TimeSpan.FromSeconds(1); protected bool Leader { get; private set; } - public bool IsResponsible => Leader && selfMemberAdded; + public bool IsResponsible => Leader && _selfMemberAdded; public ITimerScheduler Timers { get; set; } @@ -135,18 +129,18 @@ protected virtual Deadline NewStableDeadline() public void ResetStableDeadline() { - stableDeadline = NewStableDeadline(); + _stableDeadline = NewStableDeadline(); } private void ResetReachabilityChangedStats() { var now = DateTime.UtcNow; - reachabilityChangedStats = new ReachabilityChangedStats(now, now, 0); + _reachabilityChangedStats = new ReachabilityChangedStats(now, now, 0); } private void ResetReachabilityChangedStatsIfAllUnreachableDowned() { - if (!reachabilityChangedStats.IsEmpty && Strategy.IsAllUnreachableDownOrExiting) + if (!_reachabilityChangedStats.IsEmpty && Strategy.IsAllUnreachableDownOrExiting) { Log.Debug("SBR resetting reachability stats, after all unreachable healed, downed or removed"); ResetReachabilityChangedStats(); @@ -215,13 +209,13 @@ public void MutateResponsibilityInfo(Action f) else if (responsibleBefore && !responsibleAfter) Log.Info("This node is not the leader any more and not responsible for taking SBR decisions."); - if (Leader && !selfMemberAdded) + if (Leader && !_selfMemberAdded) Log.Debug("This node is leader but !selfMemberAdded."); } protected override void PostStop() { - if (!(releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) + if (!(_releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) Log.Info( "SBR is stopped and owns the lease. The lease will not be released until after the " + "lease heartbeat-timeout."); @@ -292,11 +286,11 @@ private void OnTick() { // note the DownAll due to instability is running on all nodes to make that decision as quickly and // aggressively as possible if time is out - if (reachabilityChangedStats.ChangeCount > 0) + if (_reachabilityChangedStats.ChangeCount > 0) { var now = DateTime.UtcNow; - var durationSinceLatestChange = now - reachabilityChangedStats.LatestChangeTimestamp; - var durationSinceFirstChange = now - reachabilityChangedStats.FirstChangeTimestamp; + var durationSinceLatestChange = now - _reachabilityChangedStats.LatestChangeTimestamp; + var durationSinceFirstChange = now - _reachabilityChangedStats.FirstChangeTimestamp; var downAllWhenUnstableEnabled = DownAllWhenUnstable > TimeSpan.Zero; if (downAllWhenUnstableEnabled && durationSinceFirstChange > StableAfter + DownAllWhenUnstable) @@ -304,7 +298,7 @@ private void OnTick() Log.Warning( //ClusterLogMarker.sbrInstability, "SBR detected instability and will down all nodes: {0}", - reachabilityChangedStats); + _reachabilityChangedStats); ActOnDecision(DownAll.Instance); } else if (!downAllWhenUnstableEnabled && durationSinceLatestChange > StableAfter + StableAfter) @@ -316,7 +310,7 @@ private void OnTick() } } - if (IsResponsible && !Strategy.Unreachable.IsEmpty && stableDeadline.IsOverdue) + if (IsResponsible && !Strategy.Unreachable.IsEmpty && _stableDeadline.IsOverdue) switch (Strategy.Decide()) { case IAcquireLeaseDecision decision: @@ -360,7 +354,7 @@ private void OnTick() break; } - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenTimeElapsed rlc: if (rlc.Deadline.IsOverdue) @@ -398,18 +392,18 @@ bool ReceiveLease(object message) { Log.Info("SBR acquired lease for decision [{0}]", decision); var downedNodes = ActOnDecision(decision); - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenMembersRemoved rlc: - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(rlc.Nodes.Union(downedNodes)); break; default: if (downedNodes.IsEmpty) - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + ReleaseLeaseAfter); else - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(downedNodes); break; } @@ -422,7 +416,7 @@ bool ReceiveLease(object message) decision, reverseDecision); ActOnDecision(reverseDecision); - releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; + _releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; } Stash.UnstashAll(); @@ -446,13 +440,13 @@ bool ReceiveLease(object message) private void OnReleaseLeaseResult(bool released) { - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenTimeElapsed rlc: if (released && rlc.Deadline.IsOverdue) { Log.Info("SBR released lease."); - releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; // released successfully + _releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; // released successfully } break; @@ -524,8 +518,8 @@ public void UnreachableMember(Member m) Strategy.AddUnreachable(m); UpdateReachabilityChangedStats(); ResetReachabilityChangedStatsIfAllUnreachableDowned(); - if (!reachabilityChangedStats.IsEmpty) - Log.Debug("SBR noticed {0}", reachabilityChangedStats); + if (!_reachabilityChangedStats.IsEmpty) + Log.Debug("SBR noticed {0}", _reachabilityChangedStats); }); } } @@ -540,8 +534,8 @@ public void ReachableMember(Member m) Strategy.AddReachable(m); UpdateReachabilityChangedStats(); ResetReachabilityChangedStatsIfAllUnreachableDowned(); - if (!reachabilityChangedStats.IsEmpty) - Log.Debug("SBR noticed {0}", reachabilityChangedStats); + if (!_reachabilityChangedStats.IsEmpty) + Log.Debug("SBR noticed {0}", _reachabilityChangedStats); }); } } @@ -554,13 +548,13 @@ private void ReachabilityChanged(Reachability r) private void UpdateReachabilityChangedStats() { var now = DateTime.UtcNow; - if (reachabilityChangedStats.ChangeCount == 0) - reachabilityChangedStats = new ReachabilityChangedStats(now, now, 1); + if (_reachabilityChangedStats.ChangeCount == 0) + _reachabilityChangedStats = new ReachabilityChangedStats(now, now, 1); else - reachabilityChangedStats = new ReachabilityChangedStats( - reachabilityChangedStats.FirstChangeTimestamp, + _reachabilityChangedStats = new ReachabilityChangedStats( + _reachabilityChangedStats.FirstChangeTimestamp, now, - reachabilityChangedStats.ChangeCount + 1 + _reachabilityChangedStats.ChangeCount + 1 ); } @@ -576,7 +570,7 @@ public void AddUp(Member m) { Strategy.Add(m); if (m.UniqueAddress.Equals(SelfUniqueAddress)) - MutateResponsibilityInfo(() => { selfMemberAdded = true; }); + MutateResponsibilityInfo(() => { _selfMemberAdded = true; }); }); switch (Strategy) { @@ -617,7 +611,7 @@ public void AddJoining(Member m) public void AddWeaklyUp(Member m) { if (m.UniqueAddress.Equals(SelfUniqueAddress)) - MutateResponsibilityInfo(() => { selfMemberAdded = true; }); + MutateResponsibilityInfo(() => { _selfMemberAdded = true; }); // treat WeaklyUp in same way as joining AddJoining(m); } @@ -634,16 +628,16 @@ public void Remove(Member m) ResetReachabilityChangedStatsIfAllUnreachableDowned(); - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenMembersRemoved rlc: var remainingDownedNodes = rlc.Nodes.Remove(m.UniqueAddress); if (remainingDownedNodes.IsEmpty) - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + ReleaseLeaseAfter); else - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(remainingDownedNodes); break; } @@ -654,7 +648,7 @@ private void ReleaseLease() { // implicit val ec: ExecutionContext = internalDispatcher if (Strategy.Lease != null) - if (!(releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) + if (!(_releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) { Log.Debug("SBR releasing lease"); Strategy.Lease.Release().ContinueWith(r => new ReleaseLeaseResult(!r.IsFaulted ? r.Result : false)) diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs index b150976820a..b1a809229c7 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs @@ -19,13 +19,13 @@ namespace Akka.Cluster.SBR /// public class SplitBrainResolverProvider : IDowningProvider { - private readonly SplitBrainResolverSettings settings; - private readonly ActorSystem system; + private readonly SplitBrainResolverSettings _settings; + private readonly ActorSystem _system; public SplitBrainResolverProvider(ActorSystem system) { - this.system = system; - settings = new SplitBrainResolverSettings(system.Settings.Config); + _system = system; + _settings = new SplitBrainResolverSettings(system.Settings.Config); } public TimeSpan DownRemovalMargin @@ -35,11 +35,11 @@ public TimeSpan DownRemovalMargin // if down-removal-margin is defined we let it trump stable-after to allow // for two different values for SBR downing and cluster tool stop/start after downing #pragma warning disable CS0618 // Type or member is obsolete - var drm = Cluster.Get(system).Settings.DownRemovalMargin; + var drm = Cluster.Get(_system).Settings.DownRemovalMargin; #pragma warning restore CS0618 // Type or member is obsolete if (drm != TimeSpan.Zero) return drm; - return settings.DowningStableAfter; + return _settings.DowningStableAfter; } } @@ -48,28 +48,28 @@ public Props DowningActorProps get { DowningStrategy strategy; - switch (settings.DowningStrategy) + switch (_settings.DowningStrategy) { case SplitBrainResolverSettings.KeepMajorityName: - strategy = new KeepMajority(settings.KeepMajorityRole); + strategy = new KeepMajority(_settings.KeepMajorityRole); break; case SplitBrainResolverSettings.StaticQuorumName: - var sqs = settings.StaticQuorumSettings; + var sqs = _settings.StaticQuorumSettings; strategy = new StaticQuorum(sqs.Size, sqs.Role); break; case SplitBrainResolverSettings.KeepOldestName: - var kos = settings.KeepOldestSettings; + var kos = _settings.KeepOldestSettings; strategy = new KeepOldest(kos.DownIfAlone, kos.Role); break; case SplitBrainResolverSettings.DownAllName: strategy = new DownAllNodes(); break; case SplitBrainResolverSettings.LeaseMajorityName: - var lms = settings.LeaseMajoritySettings; - var leaseOwnerName = Cluster.Get(system).SelfUniqueAddress.Address.HostPort(); + var lms = _settings.LeaseMajoritySettings; + var leaseOwnerName = Cluster.Get(_system).SelfUniqueAddress.Address.HostPort(); - var leaseName = lms.SafeLeaseName(system.Name); - var lease = LeaseProvider.Get(system).GetLease(leaseName, lms.LeaseImplementation, leaseOwnerName); + var leaseName = lms.SafeLeaseName(_system.Name); + var lease = LeaseProvider.Get(_system).GetLease(leaseName, lms.LeaseImplementation, leaseOwnerName); strategy = new LeaseMajority(lms.Role, lease, lms.AcquireLeaseDelayForMinority, lms.ReleaseAfter); break; @@ -77,7 +77,7 @@ public Props DowningActorProps throw new InvalidOperationException(); } - return SplitBrainResolver.Props2(settings.DowningStableAfter, strategy); + return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy); } } }