Skip to content

Commit

Permalink
pass Akka.Cluster.Cluster into IDowningProvider directly
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed May 26, 2022
1 parent 67f6737 commit ddfe27f
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 21 deletions.
Expand Up @@ -272,7 +272,7 @@ namespace Akka.Cluster
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system)
Scheduler = CreateScheduler(system);

// it has to be lazy - otherwise if downing provider will init a cluster itself, it will deadlock
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication);
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication);

//create supervisor for daemons under path "/system/cluster"
_clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster");
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Cluster/Configuration/Cluster.conf
Expand Up @@ -60,7 +60,9 @@ akka {
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
#
# If specified the value must be the fully qualified class name of a subclass of
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
# `akka.cluster.DowningProvider` having two argument constructor:
# - argument 1: accepting an `ActorSystem`
# - argument 2: accepting an `Akka.Cluster.Cluster`
downing-provider-class = ""

# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
Expand Down
13 changes: 9 additions & 4 deletions src/core/Akka.Cluster/DowningProvider.cs
Expand Up @@ -72,20 +72,25 @@ public NoDowning(ActorSystem system)
internal static class DowningProvider
{
/// <summary>
/// TBD
/// Loads the <see cref="IDowningProvider"/> from configuration and instantiates it via reflection.
/// </summary>
/// <param name="downingProviderType">TBD</param>
/// <param name="system">TBD</param>
/// <param name="cluster">The current cluster object.</param>
/// <exception cref="ConfigurationException">
/// This exception is thrown when the specified <paramref name="downingProviderType"/> does not implement <see cref="IDowningProvider"/>.
/// </exception>
/// <returns>TBD</returns>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system)
/// <returns>The activated <see cref="IDowningProvider"/></returns>
/// <remarks>
/// Required to pass in <see cref="Akka.Cluster.Cluster"/> manually here since https://github.com/akkadotnet/akka.net/issues/5962
/// can cause the SBR startup to fail when running with the `channel-executor`.
/// </remarks>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system, Cluster cluster)
{
var extendedSystem = system as ExtendedActorSystem;
try
{
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem);
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem, cluster);
}
catch (Exception e)
{
Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster/SBR/SplitBrainResolver.cs
Expand Up @@ -27,22 +27,22 @@ internal class SplitBrainResolver : SplitBrainResolverBase
{
private Cluster _cluster;

public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy)
public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
: base(stableAfter, strategy)
{
_cluster = cluster;
}

public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress;

public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy)
public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
{
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy));
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy, cluster));
}

// re-subscribe when restart
protected override void PreStart()
{
_cluster = Cluster.Get(Context.System);
_cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent));

base.PreStart();
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs
Expand Up @@ -21,11 +21,13 @@ public class SplitBrainResolverProvider : IDowningProvider
{
private readonly SplitBrainResolverSettings _settings;
private readonly ActorSystem _system;
private readonly Cluster _cluster;

public SplitBrainResolverProvider(ActorSystem system)
public SplitBrainResolverProvider(ActorSystem system, Cluster cluster)
{
_system = system;
_settings = new SplitBrainResolverSettings(system.Settings.Config);
_cluster = cluster;
}

public TimeSpan DownRemovalMargin
Expand Down Expand Up @@ -77,7 +79,7 @@ public Props DowningActorProps
throw new InvalidOperationException();
}

return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy);
return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy, _cluster);
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions src/core/Akka.Cluster/SplitBrainResolver.cs
Expand Up @@ -18,8 +18,9 @@ namespace Akka.Cluster
public sealed class SplitBrainResolver : IDowningProvider
{
private readonly ActorSystem _system;
private readonly Cluster _cluster;

public SplitBrainResolver(ActorSystem system)
public SplitBrainResolver(ActorSystem system, Cluster cluster)
{
_system = system;
var config = system.Settings.Config.GetConfig("akka.cluster.split-brain-resolver");
Expand All @@ -28,11 +29,12 @@ public SplitBrainResolver(ActorSystem system)

StableAfter = config.GetTimeSpan("stable-after", null);
Strategy = ResolveSplitBrainStrategy(config);
_cluster = cluster;
}

public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan StableAfter { get; }
public Props DowningActorProps => SplitBrainDecider.Props(StableAfter, Strategy);
public Props DowningActorProps => SplitBrainDecider.Props(StableAfter, Strategy, _cluster);

internal ISplitBrainStrategy Strategy { get; }

Expand Down Expand Up @@ -176,7 +178,7 @@ public IEnumerable<Member> Apply(NetworkPartitionContext context)

if (remaining.IsEmpty && unreachable.IsEmpty) // prevent exception due to both lists being empty
{
return new Member[0];
return Array.Empty<Member>();
}

var oldest = remaining.Union(unreachable).ToImmutableSortedSet(Member.AgeOrdering).First();
Expand Down Expand Up @@ -241,8 +243,8 @@ private sealed class StabilityReached

#endregion

public static Actor.Props Props(TimeSpan stableAfter, ISplitBrainStrategy strategy) =>
Actor.Props.Create(() => new SplitBrainDecider(stableAfter, strategy)).WithDeploy(Deploy.Local);
public static Actor.Props Props(TimeSpan stableAfter, ISplitBrainStrategy strategy, Cluster cluster) =>
Actor.Props.Create(() => new SplitBrainDecider(stableAfter, strategy, cluster)).WithDeploy(Deploy.Local);

private readonly Cluster _cluster;
private readonly TimeSpan _stabilityTimeout;
Expand All @@ -253,13 +255,13 @@ private sealed class StabilityReached
private ICancelable _stabilityTask;
private ILoggingAdapter _log;

public SplitBrainDecider(TimeSpan stableAfter, ISplitBrainStrategy strategy)
public SplitBrainDecider(TimeSpan stableAfter, ISplitBrainStrategy strategy, Cluster cluster)
{
if (strategy == null) throw new ArgumentNullException(nameof(strategy));

_stabilityTimeout = stableAfter;
_strategy = strategy;
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
}

public ILoggingAdapter Log => _log ?? (_log = Context.GetLogger());
Expand Down

0 comments on commit ddfe27f

Please sign in to comment.