Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of #5965 - pass Akka.Cluster.Cluster into IDowningProvider directly #5968

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
Expand Up @@ -19,7 +19,7 @@ namespace Akka.Cluster
{
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
{
public AutoDowning(Akka.Actor.ActorSystem system) { }
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down Expand Up @@ -266,13 +266,13 @@ namespace Akka.Cluster
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
public NoDowning(Akka.Actor.ActorSystem system) { }
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
{
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
public System.TimeSpan StableAfter { get; }
Expand Down Expand Up @@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
}
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
{
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
public System.TimeSpan DownRemovalMargin { get; }
public Akka.Actor.Props DowningActorProps { get; }
}
Expand Down
88 changes: 88 additions & 0 deletions src/core/Akka.Cluster.Tests/Bugfix5962Spec.cs
@@ -0,0 +1,88 @@
// //-----------------------------------------------------------------------
// // <copyright file="Bugfix5962Spec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;


namespace Akka.Cluster.Tests
{
public class Bugfix5962Spec : TestKit.Xunit2.TestKit
{
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka {
loglevel = INFO
actor {
provider = cluster
default-dispatcher = {
executor = channel-executor
channel-executor.priority = normal
}
# Adding this part in combination with the SplitBrainResolverProvider causes the error
internal-dispatcher = {
executor = channel-executor
channel-executor.priority = high
}
}
remote {
dot-netty.tcp {
port = 15508
hostname = ""127.0.0.1""
}
default-remote-dispatcher {
executor = channel-executor
channel-executor.priority = high
}
backoff-remote-dispatcher {
executor = channel-executor
channel-executor.priority = low
}
}
cluster {
seed-nodes = [""akka.tcp://Bugfix5962Spec@127.0.0.1:15508""]
downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
}
}");

private readonly Type _timerMsgType;

public Bugfix5962Spec(ITestOutputHelper output): base(Config, nameof(Bugfix5962Spec), output)
{
_timerMsgType = Type.GetType("Akka.Actor.Scheduler.TimerScheduler+TimerMsg, Akka");
}

[Fact]
public async Task SBR_Should_work_with_channel_executor()
{
var latch = new TestLatch(1);
var cluster = Cluster.Get(Sys);
cluster.RegisterOnMemberUp(() =>
{
latch.CountDown();
});

var selection = Sys.ActorSelection("akka://Bugfix5962Spec/system/cluster/core/daemon/downingProvider");

await Awaiting(() => selection.ResolveOne(1.Seconds()))
.Should().NotThrowAsync("Downing provider should be alive. ActorSelection will throw an ActorNotFoundException if this fails");

// There should be no TimerMsg being sent to dead letter, this signals that the downing provider is dead
await EventFilter.DeadLetter(_timerMsgType).ExpectAsync(0, async () =>
{
latch.Ready(1.Seconds());
await Task.Delay(2.Seconds());
});
}
}
}
6 changes: 3 additions & 3 deletions src/core/Akka.Cluster.Tests/DowningProviderSpec.cs
Expand Up @@ -16,9 +16,9 @@

namespace Akka.Cluster.Tests
{
class FailingDowningProvider : IDowningProvider
internal class FailingDowningProvider : IDowningProvider
{
public FailingDowningProvider(ActorSystem system)
public FailingDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand All @@ -36,7 +36,7 @@ public Props DowningActorProps
class DummyDowningProvider : IDowningProvider
{
public readonly AtomicBoolean ActorPropsAccessed = new AtomicBoolean(false);
public DummyDowningProvider(ActorSystem system)
public DummyDowningProvider(ActorSystem system, Cluster cluster)
{
}

Expand Down
14 changes: 11 additions & 3 deletions src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs
Expand Up @@ -7,13 +7,15 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -25,12 +27,13 @@ public class StartupWithOneThreadSpec : AkkaSpec
akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.dot-netty.tcp.port = 0
akka.cluster.downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
akka.cluster.split-brain-resolver.active-strategy = keep-majority
");

private long _startTime;

public StartupWithOneThreadSpec() : base(Configuration)
{
public StartupWithOneThreadSpec(ITestOutputHelper output) : base(Configuration, output) {
_startTime = MonotonicClock.GetTicks();
}

Expand All @@ -53,7 +56,7 @@ private Props TestProps
}

[Fact]
public void A_cluster_must_startup_with_one_dispatcher_thread()
public async Task A_cluster_must_startup_with_one_dispatcher_thread()
{
// This test failed before fixing https://github.com/akkadotnet/akka.net/issues/1959 when adding a sleep before the
// Await of GetClusterCoreRef in the Cluster extension constructor.
Expand All @@ -75,6 +78,11 @@ public void A_cluster_must_startup_with_one_dispatcher_thread()
ExpectMsg("hello");
ExpectMsg("hello");
ExpectMsg("hello");

// perform a self-join
var cts = new CancellationTokenSource(TimeSpan.FromSeconds((3)));
var selfAddress = cluster.SelfAddress;
await cluster.JoinSeedNodesAsync(new[] { selfAddress }, cts.Token);
}
}
}
31 changes: 13 additions & 18 deletions src/core/Akka.Cluster/AutoDown.cs
Expand Up @@ -30,10 +30,11 @@ internal sealed class AutoDown : AutoDownBase
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
/// <param name="cluster"></param>
/// <returns>TBD</returns>
public static Props Props(TimeSpan autoDownUnreachableAfter)
public static Props Props(TimeSpan autoDownUnreachableAfter, Cluster cluster)
{
return Actor.Props.Create<AutoDown>(autoDownUnreachableAfter);
return Actor.Props.Create(() => new AutoDown(autoDownUnreachableAfter, cluster));
}

/// <summary>
Expand Down Expand Up @@ -76,14 +77,10 @@ public override int GetHashCode()
}

private readonly Cluster _cluster;

/// <summary>
/// TBD
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter)

public AutoDown(TimeSpan autoDownUnreachableAfter, Cluster cluster) : base(autoDownUnreachableAfter)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
}

/// <summary>
Expand Down Expand Up @@ -276,20 +273,18 @@ private void Remove(UniqueAddress node)
public sealed class AutoDowning : IDowningProvider
{
private readonly ActorSystem _system;

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
public AutoDowning(ActorSystem system)
private readonly Cluster _cluster;

public AutoDowning(ActorSystem system, Cluster cluster)
{
_system = system;
_cluster = cluster;
}

/// <summary>
/// TBD
/// </summary>
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin;

/// <summary>
/// TBD
Expand All @@ -301,11 +296,11 @@ public Props DowningActorProps
{
get
{
var autoDownUnreachableAfter = Cluster.Get(_system).Settings.AutoDownUnreachableAfter;
var autoDownUnreachableAfter = _cluster.Settings.AutoDownUnreachableAfter;
if (!autoDownUnreachableAfter.HasValue)
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set");

return AutoDown.Props(autoDownUnreachableAfter.Value);
return AutoDown.Props(autoDownUnreachableAfter.Value, _cluster);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system)
Scheduler = CreateScheduler(system);

// it has to be lazy - otherwise if downing provider will init a cluster itself, it will deadlock
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication);
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication);

//create supervisor for daemons under path "/system/cluster"
_clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster");
Expand Down
6 changes: 4 additions & 2 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Expand Up @@ -59,8 +59,10 @@ akka {
# * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
#
# If specified the value must be the fully qualified class name of a subclass of
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
# If specified the value must be the fully qualified class name of an implementation of
# `Akka.Cluster.IDowningProvider` having two argument constructor:
# - argument 1: accepting an `ActorSystem`
# - argument 2: accepting an `Akka.Cluster.Cluster`
downing-provider-class = ""

# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
Expand Down