Skip to content

Commit

Permalink
Backport of akkadotnet#5965 - pass Akka.Cluster.Cluster into IDowning…
Browse files Browse the repository at this point in the history
…Provider directly
  • Loading branch information
Arkatufus committed May 26, 2022
1 parent a4a6fdd commit 7149cb3
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 63 deletions.
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
88 changes: 88 additions & 0 deletions src/core/Akka.Cluster.Tests/Bugfix5962Spec.cs
@@ -0,0 +1,88 @@
// //-----------------------------------------------------------------------
// // <copyright file="Bugfix5962Spec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;


namespace Akka.Cluster.Tests
{
public class Bugfix5962Spec : TestKit.Xunit2.TestKit
{
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka {
loglevel = INFO
actor {
provider = cluster
default-dispatcher = {
executor = channel-executor
channel-executor.priority = normal
}
# Adding this part in combination with the SplitBrainResolverProvider causes the error
internal-dispatcher = {
executor = channel-executor
channel-executor.priority = high
}
}
remote {
dot-netty.tcp {
port = 15508
hostname = ""127.0.0.1""
}
default-remote-dispatcher {
executor = channel-executor
channel-executor.priority = high
}
backoff-remote-dispatcher {
executor = channel-executor
channel-executor.priority = low
}
}
cluster {
seed-nodes = [""akka.tcp://Bugfix5962Spec@127.0.0.1:15508""]
downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
}
}");

private readonly Type _timerMsgType;

public Bugfix5962Spec(ITestOutputHelper output): base(Config, nameof(Bugfix5962Spec), output)
{
_timerMsgType = Type.GetType("Akka.Actor.Scheduler.TimerScheduler+TimerMsg, Akka");
}

[Fact]
public async Task SBR_Should_work_with_channel_executor()
{
var latch = new TestLatch(1);
var cluster = Cluster.Get(Sys);
cluster.RegisterOnMemberUp(() =>
{
latch.CountDown();
});

var selection = Sys.ActorSelection("akka://Bugfix5962Spec/system/cluster/core/daemon/downingProvider");

await Awaiting(() => selection.ResolveOne(1.Seconds()))
.Should().NotThrowAsync("Downing provider should be alive. ActorSelection will throw an ActorNotFoundException if this fails");

// There should be no TimerMsg being sent to dead letter, this signals that the downing provider is dead
await EventFilter.DeadLetter(_timerMsgType).ExpectAsync(0, async () =>
{
latch.Ready(1.Seconds());
await Task.Delay(2.Seconds());
});
}
}
}
6 changes: 3 additions & 3 deletions src/core/Akka.Cluster.Tests/DowningProviderSpec.cs
Expand Up @@ -16,9 +16,9 @@

namespace Akka.Cluster.Tests
{
class FailingDowningProvider : IDowningProvider
internal class FailingDowningProvider : IDowningProvider
{
public FailingDowningProvider(ActorSystem system)
public FailingDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand All @@ -36,7 +36,7 @@ public Props DowningActorProps
class DummyDowningProvider : IDowningProvider
{
public readonly AtomicBoolean ActorPropsAccessed = new AtomicBoolean(false);
public DummyDowningProvider(ActorSystem system)
public DummyDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand Down
14 changes: 11 additions & 3 deletions src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs
Expand Up @@ -7,13 +7,15 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -25,12 +27,13 @@ public class StartupWithOneThreadSpec : AkkaSpec
akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.dot-netty.tcp.port = 0
akka.cluster.downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
akka.cluster.split-brain-resolver.active-strategy = keep-majority
");

private long _startTime;

public StartupWithOneThreadSpec() : base(Configuration)
{
public StartupWithOneThreadSpec(ITestOutputHelper output) : base(Configuration, output) {
_startTime = MonotonicClock.GetTicks();
}

Expand All @@ -53,7 +56,7 @@ private Props TestProps
}

[Fact]
public void A_cluster_must_startup_with_one_dispatcher_thread()
public async Task A_cluster_must_startup_with_one_dispatcher_thread()
{
// This test failed before fixing https://github.com/akkadotnet/akka.net/issues/1959 when adding a sleep before the
// Await of GetClusterCoreRef in the Cluster extension constructor.
Expand All @@ -75,6 +78,11 @@ public void A_cluster_must_startup_with_one_dispatcher_thread()
ExpectMsg("hello");
ExpectMsg("hello");
ExpectMsg("hello");

// perform a self-join
var cts = new CancellationTokenSource(TimeSpan.FromSeconds((3)));
var selfAddress = cluster.SelfAddress;
await cluster.JoinSeedNodesAsync(new[] { selfAddress }, cts.Token);
}
}
}
31 changes: 13 additions & 18 deletions src/core/Akka.Cluster/AutoDown.cs
Expand Up @@ -30,10 +30,11 @@ internal sealed class AutoDown : AutoDownBase
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
/// <param name="cluster"></param>
/// <returns>TBD</returns>
public static Props Props(TimeSpan autoDownUnreachableAfter)
public static Props Props(TimeSpan autoDownUnreachableAfter, Cluster cluster)
{
return Actor.Props.Create<AutoDown>(autoDownUnreachableAfter);
return Actor.Props.Create(() => new AutoDown(autoDownUnreachableAfter, cluster));
}

/// <summary>
Expand Down Expand Up @@ -76,14 +77,10 @@ public override int GetHashCode()
}

private readonly Cluster _cluster;

/// <summary>
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter)

public AutoDown(TimeSpan autoDownUnreachableAfter, Cluster cluster) : base(autoDownUnreachableAfter)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
}

/// <summary>
Expand Down Expand Up @@ -276,20 +273,18 @@ private void Remove(UniqueAddress node)
public sealed class AutoDowning : IDowningProvider
{
private readonly ActorSystem _system;

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
public AutoDowning(ActorSystem system)
private readonly Cluster _cluster;

public AutoDowning(ActorSystem system, Cluster cluster)
{
_system = system;
_cluster = cluster;
}

/// <summary>
/// TBD
/// </summary>
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin;

/// <summary>
/// TBD
Expand All @@ -301,11 +296,11 @@ public Props DowningActorProps
{
get
{
var autoDownUnreachableAfter = Cluster.Get(_system).Settings.AutoDownUnreachableAfter;
var autoDownUnreachableAfter = _cluster.Settings.AutoDownUnreachableAfter;
if (!autoDownUnreachableAfter.HasValue)
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set");

return AutoDown.Props(autoDownUnreachableAfter.Value);
return AutoDown.Props(autoDownUnreachableAfter.Value, _cluster);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system)
Scheduler = CreateScheduler(system);

// it has to be lazy - otherwise if downing provider will init a cluster itself, it will deadlock
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication);
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication);

//create supervisor for daemons under path "/system/cluster"
_clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster");
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Expand Up @@ -59,8 +59,10 @@ akka {
# * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
#
# If specified the value must be the fully qualified class name of a subclass of
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
# If specified the value must be the fully qualified class name of an implementation of
# `Akka.Cluster.IDowningProvider` having two argument constructor:
# - argument 1: accepting an `ActorSystem`
# - argument 2: accepting an `Akka.Cluster.Cluster`
downing-provider-class = ""

# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
Expand Down

0 comments on commit 7149cb3

Please sign in to comment.