Cluster startup hard block #5515

Open: wants to merge 6 commits into base: dev (changes from 4 commits)

74 changes: 74 additions & 0 deletions src/core/Akka.Cluster.Tests/StartupWithChannelExecutorSpec.cs
@@ -0,0 +1,74 @@
//-----------------------------------------------------------------------
// <copyright file="StartupWithChannelExecutorSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using Xunit;

namespace Akka.Cluster.Tests
{
public sealed class StartupWithChannelExecutorSpec : AkkaSpec
{
private static readonly Config Configuration = ConfigurationFactory.ParseString(@"
akka.actor.creation-timeout = 10s
akka.actor.provider = cluster
akka.actor.default-dispatcher.executor = channel-executor
akka.actor.internal-dispatcher.executor = channel-executor
akka.remote.default-remote-dispatcher.executor = channel-executor
akka.remote.backoff-remote-dispatcher.executor = channel-executor
").WithFallback(ConfigurationFactory.Default());

private long _startTime;

public StartupWithChannelExecutorSpec() : base(Configuration)
{
_startTime = MonotonicClock.GetTicks();
}

private Props TestProps
{
get
{
Action<IActorDsl> actor = (c =>
{
c.ReceiveAny((o, context) => context.Sender.Tell(o));
c.OnPreStart = context =>
{
var log = context.GetLogger();
var cluster = Cluster.Get(context.System);
log.Debug("Started {0} {1}", cluster.SelfAddress, Thread.CurrentThread.Name);
};
});
return Props.Create(() => new Act(actor));
}
}

[Fact]
public void A_cluster_must_startup_with_channel_executor_dispatcher()
{
var totalStartupTime = TimeSpan.FromTicks(MonotonicClock.GetTicks() - _startTime).TotalMilliseconds;
Assert.True(totalStartupTime < (Sys.Settings.CreationTimeout - TimeSpan.FromSeconds(2)).TotalMilliseconds);
Sys.ActorOf(TestProps).Tell("hello");
Sys.ActorOf(TestProps).Tell("hello");
Sys.ActorOf(TestProps).Tell("hello");

var cluster = Cluster.Get(Sys);
totalStartupTime = TimeSpan.FromTicks(MonotonicClock.GetTicks() - _startTime).TotalMilliseconds;
Assert.True(totalStartupTime < (Sys.Settings.CreationTimeout - TimeSpan.FromSeconds(2)).TotalMilliseconds);

ExpectMsg("hello");
ExpectMsg("hello");
ExpectMsg("hello");
}
}
}
6 changes: 5 additions & 1 deletion src/core/Akka.Cluster/Cluster.cs
@@ -140,7 +140,11 @@ public Cluster(ActorSystemImpl system)
_readView = new ClusterReadView(this);

// force the underlying system to start
_clusterCore = GetClusterCoreRef().Result;
// and hard block the current thread
var clusterCoreTask = Task.Run(GetClusterCoreRef);
Member

Are there any flags we should consider adding here? For example, would it be better to use Task.Factory.StartNew with LongRunning and DenyChildAttach?
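
A minimal sketch of the two options raised in this question, under stated assumptions: `LookupAsync` is a hypothetical stand-in for an async lookup such as `GetClusterCoreRef`, and the class name is invented for illustration. It is not taken from the PR. It shows that `Task.Factory.StartNew` returns a nested task for an async delegate and therefore needs `Unwrap()`, whereas `Task.Run` unwraps on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StartNewSketch
{
    // Hypothetical stand-in for an async lookup such as GetClusterCoreRef.
    public static async Task<string> LookupAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        return "core-ref";
    }

    // Task.Run picks the Func<Task<TResult>> overload and unwraps the inner task.
    public static Task<string> ViaTaskRun() => Task.Run(() => LookupAsync());

    // StartNew yields Task<Task<string>> for an async delegate, so Unwrap() is needed.
    // LongRunning only affects the synchronous part that runs before the first await.
    public static Task<string> ViaStartNew() =>
        Task.Factory.StartNew(
                () => LookupAsync(),
                CancellationToken.None,
                TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
                TaskScheduler.Default)
            .Unwrap();
}
```

Both variants start the lookup away from the calling thread. For an async delegate, the `LongRunning` hint probably buys little, because the continuation after the first `await` resumes on the thread pool anyway.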

Contributor Author

_clusterCore = GetClusterCoreRef().Result was always blocking, and I think it still is.

I think what happened is that, after Ask was improved, the current thread gets used by the ActorCell dispatcher.
I don't know exactly how or where, but the child actors of cluster-daemon are using it.

`Task.Run(GetClusterCoreRef)` should force the Ask onto a different thread.
The only thing that matters is that the current thread does not leak.

Member

> I think what happened is that after the Ask got improved the current thread gets used for the actorcell dispatcher.

Interesting. Would there be other implications if this is true?

cc: @Aaronontheweb

Contributor Author

The actor system extension implementation and the blocking Cluster constructor are both anti-patterns,
and we need to rework them in the future.
#5447

Contributor Author

Maybe it's just the "ConfigureAwait(false)" in

return await _clusterDaemons.Ask<IActorRef>(new InternalClusterAction.GetClusterCoreRef(this), timeout).ConfigureAwait(false);

The Cluster extension is called by the ClusterActorRefProvider, and it is created during ActorSystem startup.
The thread is the same one that creates the ActorSystem, and with the normal ForkJoinExecutor the two never mix.
But the ChannelExecutor uses the regular .NET ThreadPool, so the awaiting thread can be used for an ActorCell.

var task = Task.Run(...);
task.Wait();

I hope that simply resolves the problem.

task.Result definitely blocked on .NET 4.5;
I don't know why or how the thread now gets reused.
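
The blocking pattern discussed in this thread can be sketched in isolation as follows. This is an illustrative helper, not the PR's code: `BlockingStartupSketch` and `BlockWithTimeout` are hypothetical names, and the factory delegate stands in for a call such as `GetClusterCoreRef`. The idea is to start the async work through `Task.Run`, so the calling thread only waits, and to bound that wait with a timeout.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingStartupSketch
{
    // Runs the async factory on the thread pool and blocks the caller
    // until it completes or the timeout elapses.
    public static T BlockWithTimeout<T>(Func<Task<T>> factory, TimeSpan timeout)
    {
        var task = Task.Run(factory);

        // Wait returns false on timeout; a faulted task surfaces as AggregateException.
        if (!task.Wait(timeout))
            throw new TimeoutException($"startup did not complete within {timeout}");

        // The task has already completed here, so Result does not block.
        return task.Result;
    }
}
```

A caller would pass something like `() => SomeAsyncLookup()` together with a timeout such as the system's creation timeout.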

if (!clusterCoreTask.Wait(System.Settings.CreationTimeout))
throw new TimeoutException("cluster startup");
_clusterCore = clusterCoreTask.Result;

system.RegisterOnTermination(Shutdown);

41 changes: 30 additions & 11 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
@@ -6,7 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
@@ -59,9 +58,9 @@ public sealed class DefaultDispatcherPrerequisites : IDispatcherPrerequisites
/// <param name="settings">TBD</param>
/// <param name="mailboxes">TBD</param>
public DefaultDispatcherPrerequisites(
EventStream eventStream,
IScheduler scheduler,
Settings settings,
EventStream eventStream,
IScheduler scheduler,
Settings settings,
Mailboxes mailboxes)
{
Mailboxes = mailboxes;
@@ -120,8 +119,26 @@ internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator
{
public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var cfg = config.GetConfig("channel-executor");
Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), cfg.GetString("priority", "normal"), true);
var priorityName = config.GetString("channel-executor.priority", "None") ?? "None";
Contributor

Unless you're accessing ActorSystem.Settings.Config directly, always assume that a Config object passed into a method can be null. Need to check for null config instance here.

Contributor

These changes don't have anything to do with making cluster startup block, do they? Can you remove them and move them to a new PR?

Contributor Author

This is required to use the channel-executor in the spec.

I added one spec that simply starts the cluster with the channel-executor and discovered these problems.

The spec itself will not trigger the original failure, because the testkit uses a SynchronizationContext
for the Akka system startup.

Contributor

Yes, but this PR can easily be broken into two PRs, one depending on the other.

Contributor

This should not be a problem anymore; it was fixed in #5568.
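
The null-config concern raised at the start of this thread can be illustrated without Akka types. The sketch below is an assumption-laden stand-in, not the PR's code. `SketchPriority` is a hypothetical enum with an illustrative member list, and the configuration section is modelled as a dictionary that may be null. Unlike the `Enum.Parse` call in the hunk, it uses `Enum.TryParse` and falls back to `None` for unrecognized names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for TaskSchedulerPriority; the member list is illustrative.
public enum SketchPriority { None, Low, Normal, High }

public static class NullSafeConfigSketch
{
    // 'config' models a settings section that may be null when no fallback is loaded.
    public static SketchPriority ReadPriority(IReadOnlyDictionary<string, string> config)
    {
        string name = null;

        // Null-conditional access: a null section or missing key leaves 'name' as null.
        config?.TryGetValue("channel-executor.priority", out name);

        // Case-insensitive parse that falls back to None instead of throwing.
        return Enum.TryParse(name ?? "None", true, out SketchPriority parsed)
            ? parsed
            : SketchPriority.None;
    }
}
```

Whether a silent fallback or a thrown exception is preferable for a misspelled priority is a design choice that the thread leaves open.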

Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), priorityName, true);
if (Priority == TaskSchedulerPriority.None)
{
var dispatcherName = config.Root.GetObject().GetKey("name").ToString();
switch (dispatcherName)
{
case "internal-dispatcher":
case "default-remote-dispatcher":
Priority = TaskSchedulerPriority.High;
Contributor

These default values should be baked into the default HOCON config, not into code. They have to be transparent to the user: a user should only need to read the config file to find the default value of a setting, not dig through the source code.

Contributor Author

They are the defaults in the config.
But the testkit does not load the default config.

break;
case "backoff-remote-dispatcher":
Priority = TaskSchedulerPriority.Low;
break;
default:
Priority = TaskSchedulerPriority.Normal;
break;
};
}

}

public TaskSchedulerPriority Priority { get; }
@@ -133,6 +150,8 @@ public override ExecutorService Produce(string id)
var scheduler = ChannelTaskScheduler.Get(Prerequisites.Settings.System).GetScheduler(Priority);
return new TaskSchedulerExecutor(id, scheduler);
}


}

/// <summary>
@@ -157,7 +176,7 @@ public override ExecutorService Produce(string id)
/// </summary>
/// <param name="config">TBD</param>
/// <param name="prerequisites">TBD</param>
public DefaultTaskSchedulerExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites)
public DefaultTaskSchedulerExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites)
: base(config, prerequisites)
{
}
@@ -225,13 +244,13 @@ private static DedicatedThreadPoolSettings ConfigureSettings(Config config)
{
var settings = new DedicatedThreadPoolSettings(
ThreadPoolConfig.ScaledPoolSize(
fje.GetInt("parallelism-min"),
1.0,
fje.GetInt("parallelism-min"),
1.0,
fje.GetInt("parallelism-max")),
name:config.GetString("id"));
name: config.GetString("id"));
return settings;
}

}
}

19 changes: 11 additions & 8 deletions src/core/Akka/Dispatch/ChannelSchedulerExtension.cs
@@ -62,14 +62,17 @@ public ChannelTaskScheduler(ExtendedActorSystem system)
//config channel-scheduler
var config = system.Settings.Config.GetConfig("akka.channel-scheduler");
_maximumConcurrencyLevel = ThreadPoolConfig.ScaledPoolSize(
config.GetInt("parallelism-min"),
config.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to
config.GetInt("parallelism-max"));
config?.GetInt("parallelism-min", 4) ?? 4,
Contributor

Same thing here; these should go in a different PR.

config?.GetDouble("parallelism-factor", 1.0D) ?? 1.0D, // the scalar-based factor to scale the threadpool size to
config?.GetInt("parallelism-max", 64) ?? 64);
_maximumConcurrencyLevel = Math.Max(_maximumConcurrencyLevel, 1);
_maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); //min 3 normal work in work-loop

_workInterval = config.GetInt("work-interval", _workInterval);
_workStep = config.GetInt("work-step", _workStep);

if (config != null)
{
_maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); //min 3 normal work in work-loop
_workInterval = config.GetInt("work-interval", _workInterval);
_workStep = config.GetInt("work-step", _workStep);
}

//create task schedulers
var channelOptions = new UnboundedChannelOptions()
@@ -276,7 +279,7 @@ private int DoWork(int workerId)
//the work loop
_threadPriority = TaskSchedulerPriority.Idle;
try
{
{
do
{
rounds++;