Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable ChannelTaskScheduler to work inside Akka.Cluster without causing errors inside /system actors #5861

Merged
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
@@ -1,3 +1,6 @@
#### 1.4.38 April 14 2022 ####
**Placeholder for nightlies**

#### 1.4.37 April 14 2022 ####
Akka.NET v1.4.37 is a minor release that contains some minor bug fixes.

Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2021 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.4.34</VersionPrefix>
<VersionPrefix>1.4.38</VersionPrefix>
<PackageIcon>akkalogo.png</PackageIcon>
<PackageProjectUrl>https://github.com/akkadotnet/akka.net</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/akka.net/blob/master/LICENSE</PackageLicenseUrl>
Expand Down
Expand Up @@ -27,7 +27,7 @@ public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
[Fact]
public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime()
{
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys)));
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(Cluster.Get(Sys)));
heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2));
ExpectMsg<HeartbeatRsp>(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2));
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Expand Up @@ -23,7 +23,7 @@ class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
private readonly TestProbe _probe;

public TestClusterHeartbeatSender(TestProbe probe)
public TestClusterHeartbeatSender(TestProbe probe) : base(Cluster.Get(Context.System))
{
_probe = probe;
}
Expand Down
3 changes: 1 addition & 2 deletions src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -520,7 +520,7 @@ public ImmutableHashSet<string> SelfRoles
public DefaultFailureDetectorRegistry<Address> FailureDetector { get; }

/// <summary>
/// TBD
/// The downing provider used to execute automatic downing inside Akka.Cluster.
/// </summary>
public IDowningProvider DowningProvider => _downingProvider.Value;

Expand All @@ -535,7 +535,6 @@ public ImmutableHashSet<string> SelfRoles

private static IScheduler CreateScheduler(ActorSystem system)
{
//TODO: Whole load of stuff missing here!
return system.Scheduler;
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Expand Up @@ -895,7 +895,7 @@ private void CreateChildren(Cluster cluster)
_cluster = cluster;
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>(), "core");

Context.ActorOf(ClusterHeartbeatReceiver.Props(() => _cluster), "heartbeatReceiver");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid creating a delegate for the Lazy<Cluster> that we're removing from the HeartbeatReceiver

Context.ActorOf(ClusterHeartbeatReceiver.Props(cluster), "heartbeatReceiver");
}

protected override void PostStop()
Expand Down Expand Up @@ -1333,7 +1333,7 @@ private void BecomeInitialized()
{
// start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received
Context.ActorOf(Props.Create<ClusterHeartbeatSender>().WithDispatcher(_cluster.Settings.UseDispatcher),
Context.ActorOf(Props.Create(() => new ClusterHeartbeatSender(_cluster)).WithDispatcher(_cluster.Settings.UseDispatcher),
"heartbeatSender");
// make sure that join process is stopped
StopSeedNodeProcess();
Expand Down
21 changes: 11 additions & 10 deletions src/core/Akka.Cluster/ClusterHeartbeat.cs
Expand Up @@ -26,16 +26,16 @@ internal sealed class ClusterHeartbeatReceiver : UntypedActor
{
// Important - don't use Cluster.Get(Context.System) in constructor because that would
// cause deadlock. See startup sequence in ClusterDaemon.
private readonly Lazy<Cluster> _cluster;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No real reason for a Lazy<Cluster> here since we're already receiving an instance in this actor's constructor at the time it's created.

private readonly Cluster _cluster;

public bool VerboseHeartbeat => _cluster.Value.Settings.VerboseHeartbeatLogging;
public bool VerboseHeartbeat => _cluster.Settings.VerboseHeartbeatLogging;

/// <summary>
/// TBD
/// </summary>
public ClusterHeartbeatReceiver(Func<Cluster> getCluster)
public ClusterHeartbeatReceiver(Cluster cluster)
{
_cluster = new Lazy<Cluster>(getCluster);
_cluster = cluster;
}

protected override void OnReceive(object message)
Expand All @@ -44,8 +44,8 @@ protected override void OnReceive(object message)
{
case ClusterHeartbeatSender.Heartbeat hb:
// TODO log the sequence nr once serializer is enabled
if(VerboseHeartbeat) _cluster.Value.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From);
Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.Value.SelfUniqueAddress,
if(VerboseHeartbeat) _cluster.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From);
Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.SelfUniqueAddress,
hb.SequenceNr, hb.CreationTimeNanos));
break;
default:
Expand All @@ -54,7 +54,7 @@ protected override void OnReceive(object message)
}
}

public static Props Props(Func<Cluster> getCluster)
public static Props Props(Cluster getCluster)
{
return Akka.Actor.Props.Create(() => new ClusterHeartbeatReceiver(getCluster));
}
Expand All @@ -76,11 +76,12 @@ internal class ClusterHeartbeatSender : ReceiveActor
private DateTime _tickTimestamp;

/// <summary>
/// TBD
/// Create a new instance of the <see cref="ClusterHeartbeatSender"/> and pass in a reference to the <see cref="Cluster"/>
/// to which it belongs.
/// </summary>
public ClusterHeartbeatSender()
public ClusterHeartbeatSender(Cluster cluster)
{
_cluster = Cluster.Get(Context.System);
_cluster = cluster;
var tickInitialDelay = _cluster.Settings.PeriodicTasksInitialDelay.Max(_cluster.Settings.HeartbeatInterval);
_tickTimestamp = DateTime.UtcNow + tickInitialDelay;

Expand Down