Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass Akka.Cluster.Cluster into IDowningProvider directly #5965

Merged
merged 8 commits into from May 26, 2022
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking API change, by design - forces the Cluster to be passed into the SBR directly rather than resolved via Cluster.Get, which is what triggers this issue.

public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
88 changes: 88 additions & 0 deletions src/core/Akka.Cluster.Tests/Bugfix5962Spec.cs
@@ -0,0 +1,88 @@
// //-----------------------------------------------------------------------
// // <copyright file="Bugfix5962Spec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;


namespace Akka.Cluster.Tests
{
public class Bugfix5962Spec : TestKit.Xunit2.TestKit
{
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka {
loglevel = INFO
actor {
provider = cluster
default-dispatcher = {
executor = channel-executor
channel-executor.priority = normal
}
# Adding this part in combination with the SplitBrainResolverProvider causes the error
internal-dispatcher = {
executor = channel-executor
channel-executor.priority = high
}
}
remote {
dot-netty.tcp {
port = 15508
hostname = ""127.0.0.1""
}
default-remote-dispatcher {
executor = channel-executor
channel-executor.priority = high
}
backoff-remote-dispatcher {
executor = channel-executor
channel-executor.priority = low
}
}
cluster {
seed-nodes = [""akka.tcp://Bugfix5962Spec@127.0.0.1:15508""]
downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
}
}");

private readonly Type _timerMsgType;

public Bugfix5962Spec(ITestOutputHelper output): base(Config, nameof(Bugfix5962Spec), output)
{
_timerMsgType = Type.GetType("Akka.Actor.Scheduler.TimerScheduler+TimerMsg, Akka");
}

[Fact]
public async Task SBR_Should_work_with_channel_executor()
{
var latch = new TestLatch(1);
var cluster = Cluster.Get(Sys);
cluster.RegisterOnMemberUp(() =>
{
latch.CountDown();
});

var selection = Sys.ActorSelection("akka://Bugfix5962Spec/system/cluster/core/daemon/downingProvider");

await Awaiting(() => selection.ResolveOne(1.Seconds()))
.Should().NotThrowAsync("Downing provider should be alive. ActorSelection will throw an ActorNotFoundException if this fails");

// There should be no TimerMsg being sent to dead letter, this signals that the downing provider is dead
await EventFilter.DeadLetter(_timerMsgType).ExpectAsync(0, async () =>
{
latch.Ready(1.Seconds());
await Task.Delay(2.Seconds());
});
}
}
}
6 changes: 3 additions & 3 deletions src/core/Akka.Cluster.Tests/DowningProviderSpec.cs
Expand Up @@ -16,9 +16,9 @@

namespace Akka.Cluster.Tests
{
class FailingDowningProvider : IDowningProvider
internal class FailingDowningProvider : IDowningProvider
{
public FailingDowningProvider(ActorSystem system)
public FailingDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand All @@ -36,7 +36,7 @@ public Props DowningActorProps
class DummyDowningProvider : IDowningProvider
{
public readonly AtomicBoolean ActorPropsAccessed = new AtomicBoolean(false);
public DummyDowningProvider(ActorSystem system)
public DummyDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand Down
14 changes: 11 additions & 3 deletions src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs
Expand Up @@ -7,13 +7,15 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -25,12 +27,13 @@ public class StartupWithOneThreadSpec : AkkaSpec
akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.dot-netty.tcp.port = 0
akka.cluster.downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added SBR to the single thread startup spec to see if the deadletter'd TimerMsgs would show up here.

akka.cluster.split-brain-resolver.active-strategy = keep-majority
");

private long _startTime;

public StartupWithOneThreadSpec() : base(Configuration)
{
public StartupWithOneThreadSpec(ITestOutputHelper output) : base(Configuration, output) {
_startTime = MonotonicClock.GetTicks();
}

Expand All @@ -53,7 +56,7 @@ private Props TestProps
}

[Fact]
public void A_cluster_must_startup_with_one_dispatcher_thread()
public async Task A_cluster_must_startup_with_one_dispatcher_thread()
{
// This test failed before fixing https://github.com/akkadotnet/akka.net/issues/1959 when adding a sleep before the
// Await of GetClusterCoreRef in the Cluster extension constructor.
Expand All @@ -75,6 +78,11 @@ public void A_cluster_must_startup_with_one_dispatcher_thread()
ExpectMsg("hello");
ExpectMsg("hello");
ExpectMsg("hello");

// perform a self-join
var cts = new CancellationTokenSource(TimeSpan.FromSeconds((3)));
var selfAddress = cluster.SelfAddress;
await cluster.JoinSeedNodesAsync(new[] { selfAddress }, cts.Token);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Force actual cluster formation to complete successfully as part of spec.

}
}
}
31 changes: 13 additions & 18 deletions src/core/Akka.Cluster/AutoDown.cs
Expand Up @@ -30,10 +30,11 @@ internal sealed class AutoDown : AutoDownBase
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
/// <param name="cluster"></param>
/// <returns>TBD</returns>
public static Props Props(TimeSpan autoDownUnreachableAfter)
public static Props Props(TimeSpan autoDownUnreachableAfter, Cluster cluster)
{
return Actor.Props.Create<AutoDown>(autoDownUnreachableAfter);
return Actor.Props.Create(() => new AutoDown(autoDownUnreachableAfter, cluster));
}

/// <summary>
Expand Down Expand Up @@ -76,14 +77,10 @@ public override int GetHashCode()
}

private readonly Cluster _cluster;

/// <summary>
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter)

public AutoDown(TimeSpan autoDownUnreachableAfter, Cluster cluster) : base(autoDownUnreachableAfter)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
}

/// <summary>
Expand Down Expand Up @@ -276,20 +273,18 @@ private void Remove(UniqueAddress node)
public sealed class AutoDowning : IDowningProvider
{
private readonly ActorSystem _system;

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
public AutoDowning(ActorSystem system)
private readonly Cluster _cluster;

public AutoDowning(ActorSystem system, Cluster cluster)
{
_system = system;
_cluster = cluster;
}

/// <summary>
/// TBD
/// </summary>
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin;

/// <summary>
/// TBD
Expand All @@ -301,11 +296,11 @@ public Props DowningActorProps
{
get
{
var autoDownUnreachableAfter = Cluster.Get(_system).Settings.AutoDownUnreachableAfter;
var autoDownUnreachableAfter = _cluster.Settings.AutoDownUnreachableAfter;
if (!autoDownUnreachableAfter.HasValue)
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set");

return AutoDown.Props(autoDownUnreachableAfter.Value);
return AutoDown.Props(autoDownUnreachableAfter.Value, _cluster);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system)
Scheduler = CreateScheduler(system);

// it has to be lazy - otherwise if downing provider will init a cluster itself, it will deadlock
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication);
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where Cluster is passed to the IDowningProvider


//create supervisor for daemons under path "/system/cluster"
_clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster");
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Expand Up @@ -59,8 +59,10 @@ akka {
# * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
#
# If specified the value must be the fully qualified class name of a subclass of
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
# If specified the value must be the fully qualified class name of an implementation of
# `Akka.Cluster.IDowningProvider` having two argument constructor:
# - argument 1: accepting an `ActorSystem`
# - argument 2: accepting an `Akka.Cluster.Cluster`
downing-provider-class = ""

# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
Expand Down
25 changes: 14 additions & 11 deletions src/core/Akka.Cluster/DowningProvider.cs
Expand Up @@ -45,20 +45,18 @@ public interface IDowningProvider
public sealed class NoDowning : IDowningProvider
{
private readonly ActorSystem _system;

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
public NoDowning(ActorSystem system)
private readonly Cluster _cluster;

public NoDowning(ActorSystem system, Cluster cluster)
{
_system = system;
_cluster = cluster;
}

/// <summary>
/// TBD
/// </summary>
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin;

/// <summary>
/// TBD
Expand All @@ -72,20 +70,25 @@ public NoDowning(ActorSystem system)
internal static class DowningProvider
{
/// <summary>
/// TBD
/// Loads the <see cref="IDowningProvider"/> from configuration and instantiates it via reflection.
/// </summary>
/// <param name="downingProviderType">TBD</param>
/// <param name="system">TBD</param>
/// <param name="cluster">The current cluster object.</param>
/// <exception cref="ConfigurationException">
/// This exception is thrown when the specified <paramref name="downingProviderType"/> does not implement <see cref="IDowningProvider"/>.
/// </exception>
/// <returns>TBD</returns>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system)
/// <returns>The activated <see cref="IDowningProvider"/></returns>
/// <remarks>
/// Required to pass in <see cref="Akka.Cluster.Cluster"/> manually here since https://github.com/akkadotnet/akka.net/issues/5962
/// can cause the SBR startup to fail when running with the `channel-executor`.
/// </remarks>
public static IDowningProvider Load(Type downingProviderType, ActorSystem system, Cluster cluster)
{
var extendedSystem = system as ExtendedActorSystem;
try
{
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem);
return (IDowningProvider)Activator.CreateInstance(downingProviderType, extendedSystem, cluster);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass this into the new constructor via reflection - this line will throw and fail the Akka.Cluster startup sequence for any IDowningProvider implementations that don't have a constructor with both an ActorSystem parameter and a second Akka.Cluster.Cluster parameter.

}
catch (Exception e)
{
Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster/SBR/SplitBrainResolver.cs
Expand Up @@ -27,22 +27,22 @@ internal class SplitBrainResolver : SplitBrainResolverBase
{
private Cluster _cluster;

public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy)
public SplitBrainResolver(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modern SBR updates.

: base(stableAfter, strategy)
{
_cluster = cluster;
}

public override UniqueAddress SelfUniqueAddress => _cluster.SelfUniqueAddress;

public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy)
public static Props Props2(TimeSpan stableAfter, DowningStrategy strategy, Cluster cluster)
{
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy));
return Props.Create(() => new SplitBrainResolver(stableAfter, strategy, cluster));
}

// re-subscribe when restart
protected override void PreStart()
{
_cluster = Cluster.Get(Context.System);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugfix 2.

_cluster.Subscribe(Self, InitialStateAsEvents, typeof(IClusterDomainEvent));

base.PreStart();
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs
Expand Up @@ -21,11 +21,13 @@ public class SplitBrainResolverProvider : IDowningProvider
{
private readonly SplitBrainResolverSettings _settings;
private readonly ActorSystem _system;
private readonly Cluster _cluster;

public SplitBrainResolverProvider(ActorSystem system)
public SplitBrainResolverProvider(ActorSystem system, Cluster cluster)
{
_system = system;
_settings = new SplitBrainResolverSettings(system.Settings.Config);
_cluster = cluster;
}

public TimeSpan DownRemovalMargin
Expand Down Expand Up @@ -77,7 +79,7 @@ public Props DowningActorProps
throw new InvalidOperationException();
}

return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy);
return SplitBrainResolver.Props2(_settings.DowningStableAfter, strategy, _cluster);
}
}
}
Expand Down