From 7b2fe34c12c6619cdec946c59a9c3fe88d870273 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 20 Apr 2022 19:22:24 -0500 Subject: [PATCH 1/8] close #5498 enable `ChannelTaskScheduler` to work inside Akka.Cluster without causing errors inside `/system` actors --- src/core/Akka.Cluster/ClusterDaemon.cs | 2 +- src/core/Akka.Cluster/ClusterHeartbeat.cs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 188bfe4f0b2..4dc9c27eb7d 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -895,7 +895,7 @@ private void CreateChildren(Cluster cluster) _cluster = cluster; _coreSupervisor = Context.ActorOf(Props.Create(), "core"); - Context.ActorOf(ClusterHeartbeatReceiver.Props(() => _cluster), "heartbeatReceiver"); + Context.ActorOf(ClusterHeartbeatReceiver.Props(cluster), "heartbeatReceiver"); } protected override void PostStop() diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs index a511b68d875..a05754215d3 100644 --- a/src/core/Akka.Cluster/ClusterHeartbeat.cs +++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs @@ -26,16 +26,16 @@ internal sealed class ClusterHeartbeatReceiver : UntypedActor { // Important - don't use Cluster.Get(Context.System) in constructor because that would // cause deadlock. See startup sequence in ClusterDaemon. - private readonly Lazy _cluster; + private readonly Cluster _cluster; - public bool VerboseHeartbeat => _cluster.Value.Settings.VerboseHeartbeatLogging; + public bool VerboseHeartbeat => _cluster.Settings.VerboseHeartbeatLogging; /// /// TBD /// - public ClusterHeartbeatReceiver(Func getCluster) + public ClusterHeartbeatReceiver(Cluster cluster) { - _cluster = new Lazy(getCluster); + _cluster = cluster; } protected override void OnReceive(object message) @@ -44,8 +44,8 @@ protected override void OnReceive(object message) { case ClusterHeartbeatSender.Heartbeat hb: // TODO log the sequence nr once serializer is enabled - if(VerboseHeartbeat) _cluster.Value.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From); - Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.Value.SelfUniqueAddress, + if(VerboseHeartbeat) _cluster.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From); + Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.SelfUniqueAddress, hb.SequenceNr, hb.CreationTimeNanos)); break; default: @@ -54,7 +54,7 @@ protected override void OnReceive(object message) } } - public static Props Props(Func getCluster) + public static Props Props(Cluster getCluster) { return Akka.Actor.Props.Create(() => new ClusterHeartbeatReceiver(getCluster)); } From f4e5e11cd9363cf4d901906567cfe07be038980f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 20 Apr 2022 19:32:03 -0500 Subject: [PATCH 2/8] fix `HeartbeatSender` --- .../Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs | 2 +- src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs | 2 +- src/core/Akka.Cluster/ClusterDaemon.cs | 2 +- src/core/Akka.Cluster/ClusterHeartbeat.cs | 7 ++++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs index 90dd2c6d90c..58963c44875 100644 --- a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs @@ -27,7 +27,7 @@ public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) [Fact] public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime() { - var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys))); + var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(Cluster.Get(Sys))); heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2)); ExpectMsg(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2)); } diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs index fdea539c91b..9fad2354911 100644 --- a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs @@ -23,7 +23,7 @@ class TestClusterHeartbeatSender : ClusterHeartbeatSender { private readonly TestProbe _probe; - public TestClusterHeartbeatSender(TestProbe probe) + public TestClusterHeartbeatSender(TestProbe probe) : base(Cluster.Get(Context.System)) { _probe = probe; } diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 4dc9c27eb7d..99666f47e75 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -1333,7 +1333,7 @@ private void BecomeInitialized() { // start heartbeatSender here, and not in constructor to make sure that // heartbeating doesn't start before Welcome is received - Context.ActorOf(Props.Create().WithDispatcher(_cluster.Settings.UseDispatcher), + Context.ActorOf(Props.Create(() => new ClusterHeartbeatSender(_cluster)).WithDispatcher(_cluster.Settings.UseDispatcher), "heartbeatSender"); // make sure that join process is stopped StopSeedNodeProcess(); diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs index a05754215d3..501dcc3517b 100644 --- a/src/core/Akka.Cluster/ClusterHeartbeat.cs +++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs @@ -76,11 +76,12 @@ internal class ClusterHeartbeatSender : ReceiveActor private DateTime _tickTimestamp; /// - /// TBD + /// Create a new instance of the and pass in a reference to the + /// to which it belongs. /// - public ClusterHeartbeatSender() + public ClusterHeartbeatSender(Cluster cluster) { - _cluster = Cluster.Get(Context.System); + _cluster = cluster; var tickInitialDelay = _cluster.Settings.PeriodicTasksInitialDelay.Max(_cluster.Settings.HeartbeatInterval); _tickTimestamp = DateTime.UtcNow + tickInitialDelay; From 66ddd887d08d0bbc353b97a4857e47ef32926877 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 20 Apr 2022 19:37:16 -0500 Subject: [PATCH 3/8] cleaned up SBR internals (style) --- .../Akka.Cluster/SBR/SplitBrainResolver.cs | 94 +++++++++---------- .../SBR/SplitBrainResolverProvider.cs | 30 +++--- 2 files changed, 62 insertions(+), 62 deletions(-) diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index feb711f139c..5a3ba9a6904 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -25,12 +25,12 @@ namespace Akka.Cluster.SBR /// internal class SplitBrainResolver : SplitBrainResolverBase { - private readonly Cluster cluster; + private readonly Cluster _cluster; public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy) : base(stableAfter, strategy) { - cluster = Cluster.Get(Context.System); + _cluster = Cluster.Get(Context.System); Log.Info( "SBR started. Config: strategy [{0}], stable-after [{1}], down-all-when-unstable [{2}], selfUniqueAddress [{3}].", Logging.SimpleName(strategy.GetType()), @@ -41,7 +41,7 @@ public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy) // ReSharper restore VirtualMemberCallInConstructor } - public override UniqueAddress SelfUniqueAddress => cluster.SelfUniqueAddress; + public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress; public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy) { @@ -51,20 +51,20 @@ public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy) // re-subscribe when restart protected override void PreStart() { - cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); + _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); base.PreStart(); } protected override void PostStop() { - cluster.Unsubscribe(Self); + _cluster.Unsubscribe(Self); base.PostStop(); } public override void Down(UniqueAddress node, IDecision decision) { Log.Info("SBR is downing [{0}]", node); - cluster.Down(node.Address); + _cluster.Down(node.Address); } } @@ -75,24 +75,24 @@ public override void Down(UniqueAddress node, IDecision decision) internal abstract class SplitBrainResolverBase : ActorBase, IWithUnboundedStash, IWithTimers { // would be better as constructor parameter, but don't want to break Cinnamon instrumentation - private readonly SplitBrainResolverSettings settings; + private readonly SplitBrainResolverSettings _settings; private ILoggingAdapter _log; - private ReachabilityChangedStats reachabilityChangedStats = + private ReachabilityChangedStats _reachabilityChangedStats = new ReachabilityChangedStats(DateTime.UtcNow, DateTime.UtcNow, 0); - private IReleaseLeaseCondition releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; - private bool selfMemberAdded; + private IReleaseLeaseCondition _releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; + private bool _selfMemberAdded; - private Deadline stableDeadline; + private Deadline _stableDeadline; protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) { StableAfter = stableAfter; Strategy = strategy; - settings = new SplitBrainResolverSettings(Context.System.Settings.Config); + _settings = new SplitBrainResolverSettings(Context.System.Settings.Config); // ReSharper disable once VirtualMemberCallInConstructor Timers.StartPeriodicTimer(Tick.Instance, Tick.Instance, TickInterval); @@ -110,13 +110,13 @@ protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) public abstract UniqueAddress SelfUniqueAddress { get; } - public virtual TimeSpan DownAllWhenUnstable => settings.DownAllWhenUnstable; + public virtual TimeSpan DownAllWhenUnstable => _settings.DownAllWhenUnstable; public virtual TimeSpan TickInterval => TimeSpan.FromSeconds(1); protected bool Leader { get; private set; } - public bool IsResponsible => Leader && selfMemberAdded; + public bool IsResponsible => Leader && _selfMemberAdded; public ITimerScheduler Timers { get; set; } @@ -135,18 +135,18 @@ protected virtual Deadline NewStableDeadline() public void ResetStableDeadline() { - stableDeadline = NewStableDeadline(); + _stableDeadline = NewStableDeadline(); } private void ResetReachabilityChangedStats() { var now = DateTime.UtcNow; - reachabilityChangedStats = new ReachabilityChangedStats(now, now, 0); + _reachabilityChangedStats = new ReachabilityChangedStats(now, now, 0); } private void ResetReachabilityChangedStatsIfAllUnreachableDowned() { - if (!reachabilityChangedStats.IsEmpty && Strategy.IsAllUnreachableDownOrExiting) + if (!_reachabilityChangedStats.IsEmpty && Strategy.IsAllUnreachableDownOrExiting) { Log.Debug("SBR resetting reachability stats, after all unreachable healed, downed or removed"); ResetReachabilityChangedStats(); @@ -215,13 +215,13 @@ public void MutateResponsibilityInfo(Action f) else if (responsibleBefore && !responsibleAfter) Log.Info("This node is not the leader any more and not responsible for taking SBR decisions."); - if (Leader && !selfMemberAdded) + if (Leader && !_selfMemberAdded) Log.Debug("This node is leader but !selfMemberAdded."); } protected override void PostStop() { - if (!(releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) + if (!(_releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) Log.Info( "SBR is stopped and owns the lease. The lease will not be released until after the " + "lease heartbeat-timeout."); @@ -292,11 +292,11 @@ private void OnTick() { // note the DownAll due to instability is running on all nodes to make that decision as quickly and // aggressively as possible if time is out - if (reachabilityChangedStats.ChangeCount > 0) + if (_reachabilityChangedStats.ChangeCount > 0) { var now = DateTime.UtcNow; - var durationSinceLatestChange = now - reachabilityChangedStats.LatestChangeTimestamp; - var durationSinceFirstChange = now - reachabilityChangedStats.FirstChangeTimestamp; + var durationSinceLatestChange = now - _reachabilityChangedStats.LatestChangeTimestamp; + var durationSinceFirstChange = now - _reachabilityChangedStats.FirstChangeTimestamp; var downAllWhenUnstableEnabled = DownAllWhenUnstable > TimeSpan.Zero; if (downAllWhenUnstableEnabled && durationSinceFirstChange > StableAfter + DownAllWhenUnstable) @@ -304,7 +304,7 @@ private void OnTick() Log.Warning( //ClusterLogMarker.sbrInstability, "SBR detected instability and will down all nodes: {0}", - reachabilityChangedStats); + _reachabilityChangedStats); ActOnDecision(DownAll.Instance); } else if (!downAllWhenUnstableEnabled && durationSinceLatestChange > StableAfter + StableAfter) @@ -316,7 +316,7 @@ private void OnTick() } } - if (IsResponsible && !Strategy.Unreachable.IsEmpty && stableDeadline.IsOverdue) + if (IsResponsible && !Strategy.Unreachable.IsEmpty && _stableDeadline.IsOverdue) switch (Strategy.Decide()) { case IAcquireLeaseDecision decision: @@ -360,7 +360,7 @@ private void OnTick() break; } - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenTimeElapsed rlc: if (rlc.Deadline.IsOverdue) @@ -398,18 +398,18 @@ bool ReceiveLease(object message) { Log.Info("SBR acquired lease for decision [{0}]", decision); var downedNodes = ActOnDecision(decision); - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenMembersRemoved rlc: - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(rlc.Nodes.Union(downedNodes)); break; default: if (downedNodes.IsEmpty) - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + ReleaseLeaseAfter); else - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(downedNodes); break; } @@ -422,7 +422,7 @@ bool ReceiveLease(object message) decision, reverseDecision); ActOnDecision(reverseDecision); - releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; + _releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; } Stash.UnstashAll(); @@ -446,13 +446,13 @@ bool ReceiveLease(object message) private void OnReleaseLeaseResult(bool released) { - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenTimeElapsed rlc: if (released && rlc.Deadline.IsOverdue) { Log.Info("SBR released lease."); - releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; // released successfully + _releaseLeaseCondition = ReleaseLeaseCondition.NoLease.Instance; // released successfully } break; @@ -524,8 +524,8 @@ public void UnreachableMember(Member m) Strategy.AddUnreachable(m); UpdateReachabilityChangedStats(); ResetReachabilityChangedStatsIfAllUnreachableDowned(); - if (!reachabilityChangedStats.IsEmpty) - Log.Debug("SBR noticed {0}", reachabilityChangedStats); + if (!_reachabilityChangedStats.IsEmpty) + Log.Debug("SBR noticed {0}", _reachabilityChangedStats); }); } } @@ -540,8 +540,8 @@ public void ReachableMember(Member m) Strategy.AddReachable(m); UpdateReachabilityChangedStats(); ResetReachabilityChangedStatsIfAllUnreachableDowned(); - if (!reachabilityChangedStats.IsEmpty) - Log.Debug("SBR noticed {0}", reachabilityChangedStats); + if (!_reachabilityChangedStats.IsEmpty) + Log.Debug("SBR noticed {0}", _reachabilityChangedStats); }); } } @@ -554,13 +554,13 @@ private void ReachabilityChanged(Reachability r) private void UpdateReachabilityChangedStats() { var now = DateTime.UtcNow; - if (reachabilityChangedStats.ChangeCount == 0) - reachabilityChangedStats = new ReachabilityChangedStats(now, now, 1); + if (_reachabilityChangedStats.ChangeCount == 0) + _reachabilityChangedStats = new ReachabilityChangedStats(now, now, 1); else - reachabilityChangedStats = new ReachabilityChangedStats( - reachabilityChangedStats.FirstChangeTimestamp, + _reachabilityChangedStats = new ReachabilityChangedStats( + _reachabilityChangedStats.FirstChangeTimestamp, now, - reachabilityChangedStats.ChangeCount + 1 + _reachabilityChangedStats.ChangeCount + 1 ); } @@ -576,7 +576,7 @@ public void AddUp(Member m) { Strategy.Add(m); if (m.UniqueAddress.Equals(SelfUniqueAddress)) - MutateResponsibilityInfo(() => { selfMemberAdded = true; }); + MutateResponsibilityInfo(() => { _selfMemberAdded = true; }); }); switch (Strategy) { @@ -617,7 +617,7 @@ public void AddJoining(Member m) public void AddWeaklyUp(Member m) { if (m.UniqueAddress.Equals(SelfUniqueAddress)) - MutateResponsibilityInfo(() => { selfMemberAdded = true; }); + MutateResponsibilityInfo(() => { _selfMemberAdded = true; }); // treat WeaklyUp in same way as joining AddJoining(m); } @@ -634,16 +634,16 @@ public void Remove(Member m) ResetReachabilityChangedStatsIfAllUnreachableDowned(); - switch (releaseLeaseCondition) + switch (_releaseLeaseCondition) { case ReleaseLeaseCondition.WhenMembersRemoved rlc: var remainingDownedNodes = rlc.Nodes.Remove(m.UniqueAddress); if (remainingDownedNodes.IsEmpty) - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + ReleaseLeaseAfter); else - releaseLeaseCondition = + _releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(remainingDownedNodes); break; } @@ -654,7 +654,7 @@ private void ReleaseLease() { // implicit val ec: ExecutionContext = internalDispatcher if (Strategy.Lease != null) - if (!(releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) + if (!(_releaseLeaseCondition is ReleaseLeaseCondition.NoLease)) { Log.Debug("SBR releasing lease"); Strategy.Lease.Release().ContinueWith(r => new ReleaseLeaseResult(!r.IsFaulted ? r.Result : false)) diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs index b150976820a..b1a809229c7 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs @@ -19,13 +19,13 @@ namespace Akka.Cluster.SBR /// public class SplitBrainResolverProvider : IDowningProvider { - private readonly SplitBrainResolverSettings settings; - private readonly ActorSystem system; + private readonly SplitBrainResolverSettings _settings; + private readonly ActorSystem _system; public SplitBrainResolverProvider(ActorSystem system) { - this.system = system; - settings = new SplitBrainResolverSettings(system.Settings.Config); + _system = system; + _settings = new SplitBrainResolverSettings(system.Settings.Config); } public TimeSpan DownRemovalMargin @@ -35,11 +35,11 @@ public TimeSpan DownRemovalMargin // if down-removal-margin is defined we let it trump stable-after to allow // for two different values for SBR downing and cluster tool stop/start after downing #pragma warning disable CS0618 // Type or member is obsolete - var drm = Cluster.Get(system).Settings.DownRemovalMargin; + var drm = Cluster.Get(_system).Settings.DownRemovalMargin; #pragma warning restore CS0618 // Type or member is obsolete if (drm != TimeSpan.Zero) return drm; - return settings.DowningStableAfter; + return _settings.DowningStableAfter; } } @@ -48,28 +48,28 @@ public Props DowningActorProps get { DowningStrategy strategy; - switch (settings.DowningStrategy) + switch (_settings.DowningStrategy) { case SplitBrainResolverSettings.KeepMajorityName: - strategy = new KeepMajority(settings.KeepMajorityRole); + strategy = new KeepMajority(_settings.KeepMajorityRole); break; case SplitBrainResolverSettings.StaticQuorumName: - var sqs = settings.StaticQuorumSettings; + var sqs = _settings.StaticQuorumSettings; strategy = new StaticQuorum(sqs.Size, sqs.Role); break; case SplitBrainResolverSettings.KeepOldestName: - var kos = settings.KeepOldestSettings; + var kos = _settings.KeepOldestSettings; strategy = new KeepOldest(kos.DownIfAlone, kos.Role); break; case SplitBrainResolverSettings.DownAllName: strategy = new DownAllNodes(); break; case SplitBrainResolverSettings.LeaseMajorityName: - var lms = settings.LeaseMajoritySettings; - var leaseOwnerName = Cluster.Get(system).SelfUniqueAddress.Address.HostPort(); + var lms = _settings.LeaseMajoritySettings; + var leaseOwnerName = Cluster.Get(_system).SelfUniqueAddress.Address.HostPort(); - var leaseName = lms.SafeLeaseName(system.Name); - var lease = LeaseProvider.Get(system).GetLease(leaseName, lms.LeaseImplementation, leaseOwnerName); + var leaseName = lms.SafeLeaseName(_system.Name); + var lease = LeaseProvider.Get(_system).GetLease(leaseName, lms.LeaseImplementation, leaseOwnerName); strategy = new LeaseMajority(lms.Role, lease, lms.AcquireLeaseDelayForMinority, lms.ReleaseAfter); break; @@ -77,7 +77,7 @@ public Props DowningActorProps throw new InvalidOperationException(); } - return SplitBrainResolver.Props2(settings.DowningStableAfter, strategy); + return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy); } } } From 70f7248f11868b973d30e508ba612f65fe3e16fc Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 20 Apr 2022 19:40:20 -0500 Subject: [PATCH 4/8] cleaned up some comments --- src/core/Akka.Cluster/Cluster.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 4beb07754c7..c9c2d7dfbac 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -520,7 +520,7 @@ public ImmutableHashSet SelfRoles public DefaultFailureDetectorRegistry
FailureDetector { get; } /// - /// TBD + /// The downing provider used to execute automatic downing inside Akka.Cluster. /// public IDowningProvider DowningProvider => _downingProvider.Value; @@ -535,7 +535,6 @@ public ImmutableHashSet SelfRoles private static IScheduler CreateScheduler(ActorSystem system) { - //TODO: Whole load of stuff missing here! return system.Scheduler; } From 26d12af5c8d3fb517b0bc4d3d98b3e865a3b06a2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 20 Apr 2022 20:12:22 -0500 Subject: [PATCH 5/8] asynchronously attempt to acquire `Cluster` inside SBR --- RELEASE_NOTES.md | 3 + src/common.props | 2 +- .../Akka.Cluster/SBR/SplitBrainResolver.cs | 59 +++++++++++++++---- 3 files changed, 53 insertions(+), 11 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 29fd339617a..79c9769deaa 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +#### 1.4.38 April 14 2022 #### +**Placeholder for nightlies** + #### 1.4.37 April 14 2022 #### Akka.NET v1.4.37 is a minor release that contains some minor bug fixes. diff --git a/src/common.props b/src/common.props index 264892c85b5..38bfeffa7f9 100644 --- a/src/common.props +++ b/src/common.props @@ -2,7 +2,7 @@ Copyright © 2013-2021 Akka.NET Team Akka.NET Team - 1.4.34 + 1.4.38 akkalogo.png https://github.com/akkadotnet/akka.net https://github.com/akkadotnet/akka.net/blob/master/LICENSE diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index 5a3ba9a6904..ef287f37cc7 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -25,20 +25,44 @@ namespace Akka.Cluster.SBR /// internal class SplitBrainResolver : SplitBrainResolverBase { - private readonly Cluster _cluster; + private sealed class AcquireCluster : INoSerializationVerificationNeeded + { + public static readonly AcquireCluster Instance = new AcquireCluster(); + private AcquireCluster(){} + } + + private const string AcquireClusterKey = "acquire-cluster-key"; + private Cluster _cluster; public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy) : base(stableAfter, strategy) { - _cluster = Cluster.Get(Context.System); - Log.Info( - "SBR started. Config: strategy [{0}], stable-after [{1}], down-all-when-unstable [{2}], selfUniqueAddress [{3}].", - Logging.SimpleName(strategy.GetType()), - stableAfter, - // ReSharper disable VirtualMemberCallInConstructor - DownAllWhenUnstable == TimeSpan.Zero ? "off" : DownAllWhenUnstable.ToString(), - SelfUniqueAddress.Address); + // ReSharper restore VirtualMemberCallInConstructor + + Context.Become(WaitingForCluster); + } + + private bool WaitingForCluster(object message) + { + if (message is AcquireCluster) + { + if (!CanAcquireCluster()) return true; + + Log.Info( + "SBR started. Config: strategy [{0}], stable-after [{1}], down-all-when-unstable [{2}], selfUniqueAddress [{3}].", + Logging.SimpleName(Strategy.GetType()), + StableAfter, + // ReSharper disable VirtualMemberCallInConstructor + DownAllWhenUnstable == TimeSpan.Zero ? "off" : DownAllWhenUnstable.ToString(), + SelfUniqueAddress.Address); + Context.Become(base.Receive); + return true; + } + else + { + return false; + } } public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress; @@ -51,10 +75,25 @@ public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy) // re-subscribe when restart protected override void PreStart() { - _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); + CanAcquireCluster(); + base.PreStart(); } + private bool CanAcquireCluster() + { + try + { + _cluster = Cluster.Get(Context.System); + _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); + } + catch (Exception ex) + { + Timers.StartSingleTimer(AcquireClusterKey, AcquireCluster.Instance, TimeSpan.FromSeconds(0.5)); + Log.Warning("Received error when trying to resolve Cluster - retrying in 500ms"); + } + } + protected override void PostStop() { _cluster.Unsubscribe(Self); From 1002e2b154360e35ba7143367b77ecbdb0f7d71c Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 20 Apr 2022 20:18:46 -0500 Subject: [PATCH 6/8] fixed SBR compilation --- src/core/Akka.Cluster/SBR/SplitBrainResolver.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index ef287f37cc7..bc900f0c492 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -86,11 +86,13 @@ private bool CanAcquireCluster() { _cluster = Cluster.Get(Context.System); _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); + return true; } catch (Exception ex) { Timers.StartSingleTimer(AcquireClusterKey, AcquireCluster.Instance, TimeSpan.FromSeconds(0.5)); Log.Warning("Received error when trying to resolve Cluster - retrying in 500ms"); + return false; } } From 5966640f403a2bc37d9748ceb94ba250576cf4e3 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 21 Apr 2022 08:30:16 -0500 Subject: [PATCH 7/8] Update SplitBrainResolver.cs --- src/core/Akka.Cluster/SBR/SplitBrainResolver.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index bc900f0c492..85e700fc688 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -90,8 +90,8 @@ private bool CanAcquireCluster() } catch (Exception ex) { - Timers.StartSingleTimer(AcquireClusterKey, AcquireCluster.Instance, TimeSpan.FromSeconds(0.5)); - Log.Warning("Received error when trying to resolve Cluster - retrying in 500ms"); + Timers.StartSingleTimer(AcquireClusterKey, AcquireCluster.Instance, TimeSpan.FromMilliseconds(20)); + Log.Warning("Received error when trying to resolve Cluster - retrying in 20ms"); return false; } } From 08a7b5cbaf1fc788bbdcd8e87982b55fdf962fe0 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 5 May 2022 14:51:22 -0500 Subject: [PATCH 8/8] subscribe on PreStart --- .../Akka.Cluster/SBR/SplitBrainResolver.cs | 53 ++----------------- 1 file changed, 3 insertions(+), 50 deletions(-) diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index 85e700fc688..12c203e2cee 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -25,44 +25,11 @@ namespace Akka.Cluster.SBR /// internal class SplitBrainResolver : SplitBrainResolverBase { - private sealed class AcquireCluster : INoSerializationVerificationNeeded - { - public static readonly AcquireCluster Instance = new AcquireCluster(); - private AcquireCluster(){} - } - - private const string AcquireClusterKey = "acquire-cluster-key"; private Cluster _cluster; public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy) : base(stableAfter, strategy) { - - // ReSharper restore VirtualMemberCallInConstructor - - Context.Become(WaitingForCluster); - } - - private bool WaitingForCluster(object message) - { - if (message is AcquireCluster) - { - if (!CanAcquireCluster()) return true; - - Log.Info( - "SBR started. Config: strategy [{0}], stable-after [{1}], down-all-when-unstable [{2}], selfUniqueAddress [{3}].", - Logging.SimpleName(Strategy.GetType()), - StableAfter, - // ReSharper disable VirtualMemberCallInConstructor - DownAllWhenUnstable == TimeSpan.Zero ? "off" : DownAllWhenUnstable.ToString(), - SelfUniqueAddress.Address); - Context.Become(base.Receive); - return true; - } - else - { - return false; - } } public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress; @@ -75,26 +42,12 @@ public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy) // re-subscribe when restart protected override void PreStart() { - CanAcquireCluster(); + _cluster = Cluster.Get(Context.System); + _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); base.PreStart(); } - - private bool CanAcquireCluster() - { - try - { - _cluster = Cluster.Get(Context.System); - _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); - return true; - } - catch (Exception ex) - { - Timers.StartSingleTimer(AcquireClusterKey, AcquireCluster.Instance, TimeSpan.FromMilliseconds(20)); - Log.Warning("Received error when trying to resolve Cluster - retrying in 20ms"); - return false; - } - } + protected override void PostStop() {