diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.verified.txt index da4d3dae1d1..f904b9eea13 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.verified.txt @@ -19,7 +19,7 @@ namespace Akka.Cluster { public sealed class AutoDowning : Akka.Cluster.IDowningProvider { - public AutoDowning(Akka.Actor.ActorSystem system) { } + public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { } public System.TimeSpan DownRemovalMargin { get; } public Akka.Actor.Props DowningActorProps { get; } } @@ -266,13 +266,13 @@ namespace Akka.Cluster } public sealed class NoDowning : Akka.Cluster.IDowningProvider { - public NoDowning(Akka.Actor.ActorSystem system) { } + public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { } public System.TimeSpan DownRemovalMargin { get; } public Akka.Actor.Props DowningActorProps { get; } } public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider { - public SplitBrainResolver(Akka.Actor.ActorSystem system) { } + public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { } public System.TimeSpan DownRemovalMargin { get; } public Akka.Actor.Props DowningActorProps { get; } public System.TimeSpan StableAfter { get; } @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR } public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider { - public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { } + public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { } public System.TimeSpan DownRemovalMargin { get; } public Akka.Actor.Props DowningActorProps { get; } } diff --git a/src/core/Akka.Cluster.Tests/Bugfix5962Spec.cs b/src/core/Akka.Cluster.Tests/Bugfix5962Spec.cs new file mode 100644 index 00000000000..3092fb47bea --- /dev/null +++ 
b/src/core/Akka.Cluster.Tests/Bugfix5962Spec.cs @@ -0,0 +1,88 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Configuration; +using Akka.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; +using static FluentAssertions.FluentActions; + + +namespace Akka.Cluster.Tests +{ + public class Bugfix5962Spec : TestKit.Xunit2.TestKit + { + private static readonly Config Config = ConfigurationFactory.ParseString(@" +akka { + loglevel = INFO + actor { + provider = cluster + default-dispatcher = { + executor = channel-executor + channel-executor.priority = normal + } + # Adding this part in combination with the SplitBrainResolverProvider causes the error + internal-dispatcher = { + executor = channel-executor + channel-executor.priority = high + } + } + remote { + dot-netty.tcp { + port = 15508 + hostname = ""127.0.0.1"" + } + default-remote-dispatcher { + executor = channel-executor + channel-executor.priority = high + } + backoff-remote-dispatcher { + executor = channel-executor + channel-executor.priority = low + } + } + cluster { + seed-nodes = [""akka.tcp://Bugfix5962Spec@127.0.0.1:15508""] + downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster"" + } +}"); + + private readonly Type _timerMsgType; + + public Bugfix5962Spec(ITestOutputHelper output): base(Config, nameof(Bugfix5962Spec), output) + { + _timerMsgType = Type.GetType("Akka.Actor.Scheduler.TimerScheduler+TimerMsg, Akka"); + } + + [Fact] + public async Task SBR_Should_work_with_channel_executor() + { + var latch = new TestLatch(1); + var cluster = Cluster.Get(Sys); + cluster.RegisterOnMemberUp(() => + { + latch.CountDown(); + }); + + var selection = 
Sys.ActorSelection("akka://Bugfix5962Spec/system/cluster/core/daemon/downingProvider"); + + await Awaiting(() => selection.ResolveOne(1.Seconds())) + .Should().NotThrowAsync("Downing provider should be alive. ActorSelection will throw an ActorNotFoundException if this fails"); + + // No TimerMsg should be sent to dead letters; a dead-lettered TimerMsg would signal that the downing provider is dead + await EventFilter.DeadLetter(_timerMsgType).ExpectAsync(0, async () => + { + latch.Ready(1.Seconds()); + await Task.Delay(2.Seconds()); + }); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Cluster.Tests/DowningProviderSpec.cs b/src/core/Akka.Cluster.Tests/DowningProviderSpec.cs index 6a59d13700d..6e524afa583 100644 --- a/src/core/Akka.Cluster.Tests/DowningProviderSpec.cs +++ b/src/core/Akka.Cluster.Tests/DowningProviderSpec.cs @@ -16,9 +16,9 @@ namespace Akka.Cluster.Tests { - class FailingDowningProvider : IDowningProvider + internal class FailingDowningProvider : IDowningProvider { - public FailingDowningProvider(ActorSystem system) + public FailingDowningProvider(ActorSystem system, Cluster cluster) { } @@ -36,7 +36,7 @@ public Props DowningActorProps class DummyDowningProvider : IDowningProvider { public readonly AtomicBoolean ActorPropsAccessed = new AtomicBoolean(false); - public DummyDowningProvider(ActorSystem system) + public DummyDowningProvider(ActorSystem system, Cluster cluster) { } diff --git a/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs b/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs index 942a46b0c19..b20e70d60b3 100644 --- a/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs +++ b/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs @@ -7,6 +7,7 @@ using System; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Actor.Dsl; using Akka.Configuration; @@ -14,6 +15,7 @@ using Akka.TestKit; using Akka.Util; using Xunit; +using Xunit.Abstractions; namespace Akka.Cluster.Tests { @@ -25,12
+27,13 @@ public class StartupWithOneThreadSpec : AkkaSpec akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1 akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" akka.remote.dot-netty.tcp.port = 0 + akka.cluster.downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster"" + akka.cluster.split-brain-resolver.active-strategy = keep-majority "); private long _startTime; - public StartupWithOneThreadSpec() : base(Configuration) - { + public StartupWithOneThreadSpec(ITestOutputHelper output) : base(Configuration, output) { _startTime = MonotonicClock.GetTicks(); } @@ -53,7 +56,7 @@ private Props TestProps } [Fact] - public void A_cluster_must_startup_with_one_dispatcher_thread() + public async Task A_cluster_must_startup_with_one_dispatcher_thread() { // This test failed before fixing https://github.com/akkadotnet/akka.net/issues/1959 when adding a sleep before the // Await of GetClusterCoreRef in the Cluster extension constructor. 
@@ -75,6 +78,11 @@ public void A_cluster_must_startup_with_one_dispatcher_thread() ExpectMsg("hello"); ExpectMsg("hello"); ExpectMsg("hello"); + + // perform a self-join + var cts = new CancellationTokenSource(TimeSpan.FromSeconds((3))); + var selfAddress = cluster.SelfAddress; + await cluster.JoinSeedNodesAsync(new[] { selfAddress }, cts.Token); } } } diff --git a/src/core/Akka.Cluster/AutoDown.cs b/src/core/Akka.Cluster/AutoDown.cs index a78ebb93250..2b2cd673f65 100644 --- a/src/core/Akka.Cluster/AutoDown.cs +++ b/src/core/Akka.Cluster/AutoDown.cs @@ -30,10 +30,11 @@ internal sealed class AutoDown : AutoDownBase /// TBD /// /// TBD + /// /// TBD - public static Props Props(TimeSpan autoDownUnreachableAfter) + public static Props Props(TimeSpan autoDownUnreachableAfter, Cluster cluster) { - return Actor.Props.Create(autoDownUnreachableAfter); + return Actor.Props.Create(() => new AutoDown(autoDownUnreachableAfter, cluster)); } /// @@ -76,14 +77,10 @@ public override int GetHashCode() } private readonly Cluster _cluster; - - /// - /// TBD - /// - /// TBD - public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter) + + public AutoDown(TimeSpan autoDownUnreachableAfter, Cluster cluster) : base(autoDownUnreachableAfter) { - _cluster = Cluster.Get(Context.System); + _cluster = cluster; } /// @@ -276,20 +273,18 @@ private void Remove(UniqueAddress node) public sealed class AutoDowning : IDowningProvider { private readonly ActorSystem _system; - - /// - /// TBD - /// - /// TBD - public AutoDowning(ActorSystem system) + private readonly Cluster _cluster; + + public AutoDowning(ActorSystem system, Cluster cluster) { _system = system; + _cluster = cluster; } /// /// TBD /// - public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin; + public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin; /// /// TBD @@ -301,11 +296,11 @@ public Props DowningActorProps { get { - var autoDownUnreachableAfter = 
Cluster.Get(_system).Settings.AutoDownUnreachableAfter; + var autoDownUnreachableAfter = _cluster.Settings.AutoDownUnreachableAfter; if (!autoDownUnreachableAfter.HasValue) throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set"); - return AutoDown.Props(autoDownUnreachableAfter.Value); + return AutoDown.Props(autoDownUnreachableAfter.Value, _cluster); } } } diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index c9c2d7dfbac..966a6bac633 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system) Scheduler = CreateScheduler(system); // it has to be lazy - otherwise, if the downing provider initializes a cluster itself, it will deadlock - _downingProvider = new Lazy(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication); + _downingProvider = new Lazy(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication); //create supervisor for daemons under path "/system/cluster" _clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster"); diff --git a/src/core/Akka.Cluster/Configuration/Cluster.conf b/src/core/Akka.Cluster/Configuration/Cluster.conf index ad7557168fa..a708faee3af 100644 --- a/src/core/Akka.Cluster/Configuration/Cluster.conf +++ b/src/core/Akka.Cluster/Configuration/Cluster.conf @@ -59,8 +59,10 @@ akka { # * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed # * if it is set to a duration the `AutoDowning` provider is used with the configured downing duration # - # If specified the value must be the fully qualified class name of a subclass of - # `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem` + #
If specified the value must be the fully qualified class name of an implementation of + # `Akka.Cluster.IDowningProvider` having a public two-argument constructor: + # - argument 1: accepting an `ActorSystem` + # - argument 2: accepting an `Akka.Cluster.Cluster` downing-provider-class = "" # If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network diff --git a/src/core/Akka.Cluster/DowningProvider.cs b/src/core/Akka.Cluster/DowningProvider.cs index 9d5f374b7b5..eeafd4c8e61 100644 --- a/src/core/Akka.Cluster/DowningProvider.cs +++ b/src/core/Akka.Cluster/DowningProvider.cs @@ -45,20 +45,18 @@ public interface IDowningProvider public sealed class NoDowning : IDowningProvider { private readonly ActorSystem _system; - - /// - /// TBD - /// - /// TBD - public NoDowning(ActorSystem system) + private readonly Cluster _cluster; + + public NoDowning(ActorSystem system, Cluster cluster) { _system = system; + _cluster = cluster; } /// /// TBD /// - public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin; + public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin; /// /// TBD @@ -72,20 +70,25 @@ public NoDowning(ActorSystem system) internal static class DowningProvider { /// - /// TBD + /// Loads the IDowningProvider implementation from configuration and instantiates it via reflection. /// /// TBD /// TBD + /// The current cluster object. /// /// This exception is thrown when the specified downingProviderType does not implement IDowningProvider. /// - /// TBD - public static IDowningProvider Load(Type downingProviderType, ActorSystem system) + /// The activated IDowningProvider + /// + /// Required to pass in manually here since https://github.com/akkadotnet/akka.net/issues/5962 + /// can cause the SBR startup to fail when running with the `channel-executor`.
+ /// + public static IDowningProvider Load(Type downingProviderType, ActorSystem system, Cluster cluster) { var extendedSystem = system as ExtendedActorSystem; try { - return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem); + return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem, cluster); } catch (Exception e) { diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index 12c203e2cee..16ab1a6325c 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -27,22 +27,22 @@ internal class SplitBrainResolver : SplitBrainResolverBase { private Cluster _cluster; - public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy) + public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster) : base(stableAfter, strategy) { + _cluster = cluster; } public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress; - public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy) + public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster) { - return Props.Create(() => new SplitBrainResolver(stableAfter, strategy)); + return Props.Create(() => new SplitBrainResolver(stableAfter, strategy, cluster)); } // re-subscribe when restart protected override void PreStart() { - _cluster = Cluster.Get(Context.System); _cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent)); base.PreStart(); diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs index b1a809229c7..ea671410109 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs @@ -21,11 +21,13 @@ public class SplitBrainResolverProvider : IDowningProvider { private readonly SplitBrainResolverSettings 
_settings; private readonly ActorSystem _system; + private readonly Cluster _cluster; - public SplitBrainResolverProvider(ActorSystem system) + public SplitBrainResolverProvider(ActorSystem system, Cluster cluster) { _system = system; _settings = new SplitBrainResolverSettings(system.Settings.Config); + _cluster = cluster; } public TimeSpan DownRemovalMargin @@ -77,7 +79,7 @@ public Props DowningActorProps throw new InvalidOperationException(); } - return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy); + return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy, _cluster); } } } diff --git a/src/core/Akka.Cluster/SplitBrainResolver.cs b/src/core/Akka.Cluster/SplitBrainResolver.cs index 0eb10106828..922e5f61fc3 100644 --- a/src/core/Akka.Cluster/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SplitBrainResolver.cs @@ -18,8 +18,9 @@ namespace Akka.Cluster public sealed class SplitBrainResolver : IDowningProvider { private readonly ActorSystem _system; + private readonly Cluster _cluster; - public SplitBrainResolver(ActorSystem system) + public SplitBrainResolver(ActorSystem system, Cluster cluster) { _system = system; var config = system.Settings.Config.GetConfig("akka.cluster.split-brain-resolver"); @@ -28,11 +29,12 @@ public SplitBrainResolver(ActorSystem system) StableAfter = config.GetTimeSpan("stable-after", null); Strategy = ResolveSplitBrainStrategy(config); + _cluster = cluster; } public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin; public TimeSpan StableAfter { get; } - public Props DowningActorProps => SplitBrainDecider.Props(StableAfter, Strategy); + public Props DowningActorProps => SplitBrainDecider.Props(StableAfter, Strategy, _cluster); internal ISplitBrainStrategy Strategy { get; } @@ -176,7 +178,7 @@ public IEnumerable Apply(NetworkPartitionContext context) if (remaining.IsEmpty && unreachable.IsEmpty) // prevent exception due to both lists being empty { - return new Member[0]; + 
return Array.Empty(); } var oldest = remaining.Union(unreachable).ToImmutableSortedSet(Member.AgeOrdering).First(); @@ -241,8 +243,8 @@ private sealed class StabilityReached #endregion - public static Actor.Props Props(TimeSpan stableAfter, ISplitBrainStrategy strategy) => - Actor.Props.Create(() => new SplitBrainDecider(stableAfter, strategy)).WithDeploy(Deploy.Local); + public static Actor.Props Props(TimeSpan stableAfter, ISplitBrainStrategy strategy, Cluster cluster) => + Actor.Props.Create(() => new SplitBrainDecider(stableAfter, strategy, cluster)).WithDeploy(Deploy.Local); private readonly Cluster _cluster; private readonly TimeSpan _stabilityTimeout; @@ -253,13 +255,13 @@ private sealed class StabilityReached private ICancelable _stabilityTask; private ILoggingAdapter _log; - public SplitBrainDecider(TimeSpan stableAfter, ISplitBrainStrategy strategy) + public SplitBrainDecider(TimeSpan stableAfter, ISplitBrainStrategy strategy, Cluster cluster) { if (strategy == null) throw new ArgumentNullException(nameof(strategy)); _stabilityTimeout = stableAfter; _strategy = strategy; - _cluster = Cluster.Get(Context.System); + _cluster = cluster; } public ILoggingAdapter Log => _log ?? (_log = Context.GetLogger());