Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass Akka.Cluster.Cluster into IDowningProvider directly #5965

Merged
merged 8 commits into from May 26, 2022
Expand Up @@ -272,7 +272,7 @@ namespace Akka.Cluster
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking API change, by design - forces the Cluster to be passed into the SBR directly rather than resolved via Cluster.Get, which is what triggers this issue.

public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
14 changes: 11 additions & 3 deletions src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs
Expand Up @@ -7,13 +7,15 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -25,12 +27,13 @@ public class StartupWithOneThreadSpec : AkkaSpec
akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.dot-netty.tcp.port = 0
akka.cluster.downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added SBR to the single thread startup spec to see if the deadletter'd TimerMsgs would show up here.

akka.cluster.split-brain-resolver.active-strategy = keep-majority
");

private long _startTime;

public StartupWithOneThreadSpec() : base(Configuration)
{
public StartupWithOneThreadSpec(ITestOutputHelper output) : base(Configuration, output) {
_startTime = MonotonicClock.GetTicks();
}

Expand All @@ -53,7 +56,7 @@ private Props TestProps
}

[Fact]
public void A_cluster_must_startup_with_one_dispatcher_thread()
public async Task A_cluster_must_startup_with_one_dispatcher_thread()
{
// This test failed before fixing https://github.com/akkadotnet/akka.net/issues/1959 when adding a sleep before the
// Await of GetClusterCoreRef in the Cluster extension constructor.
Expand All @@ -75,6 +78,11 @@ public void A_cluster_must_startup_with_one_dispatcher_thread()
ExpectMsg("hello");
ExpectMsg("hello");
ExpectMsg("hello");

// perform a self-join
var cts = new CancellationTokenSource(TimeSpan.FromSeconds((3)));
var selfAddress = cluster.SelfAddress;
await cluster.JoinSeedNodesAsync(new[] { selfAddress }, cts.Token);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Force actual cluster formation to complete successfully as part of spec.

}
}
}
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system)
Scheduler = CreateScheduler(system);

// it has to be lazy - otherwise if downing provider will init a cluster itself, it will deadlock
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication);
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where Cluster is passed to the IDowningProvider


//create supervisor for daemons under path "/system/cluster"
_clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster");
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Cluster/Configuration/Cluster.conf
Expand Up @@ -60,7 +60,9 @@ akka {
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
#
# If specified the value must be the fully qualified class name of a subclass of
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
# `akka.cluster.DowningProvider` having two argument constructor:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented API changes, in case a user has a custom IDowningProvider implementation. I have never seen one in the wild, which is why I think this breaking API change is probably safe for a v1.4.39 release.

# - argument 1: accepting an `ActorSystem`
# - argument 2: accepting an `Akka.Cluster.Cluster`
downing-provider-class = ""

# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
Expand Down
13 changes: 9 additions & 4 deletions src/core/Akka.Cluster/DowningProvider.cs
Expand Up @@ -72,20 +72,25 @@ public NoDowning(ActorSystem system)
internal static class DowningProvider
{
/// <summary>
/// TBD
/// Loads the <see cref="IDowningProvider"/> from configuration and instantiates it via reflection.
/// </summary>
/// <param name="downingProviderType">TBD</param>
/// <param name="system">TBD</param>
/// <param name="cluster">The current cluster object.</param>
/// <exception cref="ConfigurationException">
/// This exception is thrown when the specified <paramref name="downingProviderType"/> does not implement <see cref="IDowningProvider"/>.
/// </exception>
/// <returns>TBD</returns>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system)
/// <returns>The activated <see cref="IDowningProvider"/></returns>
/// <remarks>
/// Required to pass in <see cref="Akka.Cluster.Cluster"/> manually here since https://github.com/akkadotnet/akka.net/issues/5962
/// can cause the SBR startup to fail when running with the `channel-executor`.
/// </remarks>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system, Cluster cluster)
{
var extendedSystem = system as ExtendedActorSystem;
try
{
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem);
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem, cluster);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass this into the new constructor via reflection - this line will throw and fail the Akka.Cluster startup sequence for any IDowningProvider implementations that don't have a constructor with both an ActorSystem parameter and a second Akka.Cluster.Cluster parameter.

}
catch (Exception e)
{
Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster/SBR/SplitBrainResolver.cs
Expand Up @@ -27,22 +27,22 @@ internal class SplitBrainResolver : SplitBrainResolverBase
{
private Cluster _cluster;

public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy)
public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modern SBR updates.

: base(stableAfter, strategy)
{
_cluster = cluster;
}

public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress;

public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy)
public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
{
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy));
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy, cluster));
}

// re-subscribe when restart
protected override void PreStart()
{
_cluster = Cluster.Get(Context.System);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugfix 2.

_cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent));

base.PreStart();
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs
Expand Up @@ -21,11 +21,13 @@ public class SplitBrainResolverProvider : IDowningProvider
{
private readonly SplitBrainResolverSettings _settings;
private readonly ActorSystem _system;
private readonly Cluster _cluster;

public SplitBrainResolverProvider(ActorSystem system)
public SplitBrainResolverProvider(ActorSystem system, Cluster cluster)
{
_system = system;
_settings = new SplitBrainResolverSettings(system.Settings.Config);
_cluster = cluster;
}

public TimeSpan DownRemovalMargin
Expand Down Expand Up @@ -77,7 +79,7 @@ public Props DowningActorProps
throw new InvalidOperationException();
}

return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy);
return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy, _cluster);
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions src/core/Akka.Cluster/SplitBrainResolver.cs
Expand Up @@ -18,8 +18,9 @@ namespace Akka.Cluster
public sealed class SplitBrainResolver : IDowningProvider
{
private readonly ActorSystem _system;
private readonly Cluster _cluster;

public SplitBrainResolver(ActorSystem system)
public SplitBrainResolver(ActorSystem system, Cluster cluster)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legacy SBR updates.

{
_system = system;
var config = system.Settings.Config.GetConfig("akka.cluster.split-brain-resolver");
Expand All @@ -28,11 +29,12 @@ public SplitBrainResolver(ActorSystem system)

StableAfter = config.GetTimeSpan("stable-after", null);
Strategy = ResolveSplitBrainStrategy(config);
_cluster = cluster;
}

public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan StableAfter { get; }
public Props DowningActorProps => SplitBrainDecider.Props(StableAfter, Strategy);
public Props DowningActorProps => SplitBrainDecider.Props(StableAfter, Strategy, _cluster);

internal ISplitBrainStrategy Strategy { get; }

Expand Down Expand Up @@ -176,7 +178,7 @@ public IEnumerable<Member> Apply(NetworkPartitionContext context)

if (remaining.IsEmpty && unreachable.IsEmpty) // prevent exception due to both lists being empty
{
return new Member[0];
return Array.Empty<Member>();
}

var oldest = remaining.Union(unreachable).ToImmutableSortedSet(Member.AgeOrdering).First();
Expand Down Expand Up @@ -241,8 +243,8 @@ private sealed class StabilityReached

#endregion

public static Actor.Props Props(TimeSpan stableAfter, ISplitBrainStrategy strategy) =>
Actor.Props.Create(() => new SplitBrainDecider(stableAfter, strategy)).WithDeploy(Deploy.Local);
public static Actor.Props Props(TimeSpan stableAfter, ISplitBrainStrategy strategy, Cluster cluster) =>
Actor.Props.Create(() => new SplitBrainDecider(stableAfter, strategy, cluster)).WithDeploy(Deploy.Local);

private readonly Cluster _cluster;
private readonly TimeSpan _stabilityTimeout;
Expand All @@ -253,13 +255,13 @@ private sealed class StabilityReached
private ICancelable _stabilityTask;
private ILoggingAdapter _log;

public SplitBrainDecider(TimeSpan stableAfter, ISplitBrainStrategy strategy)
public SplitBrainDecider(TimeSpan stableAfter, ISplitBrainStrategy strategy, Cluster cluster)
{
if (strategy == null) throw new ArgumentNullException(nameof(strategy));

_stabilityTimeout = stableAfter;
_strategy = strategy;
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugfix 1.

}

public ILoggingAdapter Log => _log ?? (_log = Context.GetLogger());
Expand Down