Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable ChannelTaskScheduler to work inside Akka.Cluster without cau… #5920

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,7 +27,7 @@ public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
[Fact]
public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime()
{
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys)));
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(Cluster.Get(Sys)));
heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2));
ExpectMsg<HeartbeatRsp>(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2));
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Expand Up @@ -23,7 +23,7 @@ class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
private readonly TestProbe _probe;

public TestClusterHeartbeatSender(TestProbe probe)
public TestClusterHeartbeatSender(TestProbe probe) : base(Cluster.Get(Context.System))
{
_probe = probe;
}
Expand Down
3 changes: 1 addition & 2 deletions src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -520,7 +520,7 @@ public ImmutableHashSet<string> SelfRoles
public DefaultFailureDetectorRegistry<Address> FailureDetector { get; }

/// <summary>
/// TBD
/// The downing provider used to execute automatic downing inside Akka.Cluster.
/// </summary>
public IDowningProvider DowningProvider => _downingProvider.Value;

Expand All @@ -535,7 +535,6 @@ public ImmutableHashSet<string> SelfRoles

private static IScheduler CreateScheduler(ActorSystem system)
{
//TODO: Whole load of stuff missing here!
return system.Scheduler;
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Expand Up @@ -895,7 +895,7 @@ private void CreateChildren(Cluster cluster)
_cluster = cluster;
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>(), "core");

Context.ActorOf(ClusterHeartbeatReceiver.Props(() => _cluster), "heartbeatReceiver");
Context.ActorOf(ClusterHeartbeatReceiver.Props(cluster), "heartbeatReceiver");
}

protected override void PostStop()
Expand Down Expand Up @@ -1333,7 +1333,7 @@ private void BecomeInitialized()
{
// start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received
Context.ActorOf(Props.Create<ClusterHeartbeatSender>().WithDispatcher(_cluster.Settings.UseDispatcher),
Context.ActorOf(Props.Create(() => new ClusterHeartbeatSender(_cluster)).WithDispatcher(_cluster.Settings.UseDispatcher),
"heartbeatSender");
// make sure that join process is stopped
StopSeedNodeProcess();
Expand Down
21 changes: 11 additions & 10 deletions src/core/Akka.Cluster/ClusterHeartbeat.cs
Expand Up @@ -26,16 +26,16 @@ internal sealed class ClusterHeartbeatReceiver : UntypedActor
{
// Important - don't use Cluster.Get(Context.System) in constructor because that would
// cause deadlock. See startup sequence in ClusterDaemon.
private readonly Lazy<Cluster> _cluster;
private readonly Cluster _cluster;

public bool VerboseHeartbeat => _cluster.Value.Settings.VerboseHeartbeatLogging;
public bool VerboseHeartbeat => _cluster.Settings.VerboseHeartbeatLogging;

/// <summary>
/// TBD
/// </summary>
public ClusterHeartbeatReceiver(Func<Cluster> getCluster)
public ClusterHeartbeatReceiver(Cluster cluster)
{
_cluster = new Lazy<Cluster>(getCluster);
_cluster = cluster;
}

protected override void OnReceive(object message)
Expand All @@ -44,8 +44,8 @@ protected override void OnReceive(object message)
{
case ClusterHeartbeatSender.Heartbeat hb:
// TODO log the sequence nr once serializer is enabled
if(VerboseHeartbeat) _cluster.Value.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From);
Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.Value.SelfUniqueAddress,
if(VerboseHeartbeat) _cluster.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From);
Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.SelfUniqueAddress,
hb.SequenceNr, hb.CreationTimeNanos));
break;
default:
Expand All @@ -54,7 +54,7 @@ protected override void OnReceive(object message)
}
}

public static Props Props(Func<Cluster> getCluster)
public static Props Props(Cluster getCluster)
{
return Akka.Actor.Props.Create(() => new ClusterHeartbeatReceiver(getCluster));
}
Expand All @@ -76,11 +76,12 @@ internal class ClusterHeartbeatSender : ReceiveActor
private DateTime _tickTimestamp;

/// <summary>
/// TBD
/// Create a new instance of the <see cref="ClusterHeartbeatSender"/> and pass in a reference to the <see cref="Cluster"/>
/// to which it belongs.
/// </summary>
public ClusterHeartbeatSender()
public ClusterHeartbeatSender(Cluster cluster)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
var tickInitialDelay = _cluster.Settings.PeriodicTasksInitialDelay.Max(_cluster.Settings.HeartbeatInterval);
_tickTimestamp = DateTime.UtcNow + tickInitialDelay;

Expand Down