Cluster startup hard block #5515

Open
wants to merge 6 commits into
base: dev
74 changes: 74 additions & 0 deletions src/core/Akka.Cluster.Tests/StartupWithChannelExecutorSpec.cs
@@ -0,0 +1,74 @@
//-----------------------------------------------------------------------
// <copyright file="StartupWithChannelExecutorSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.Util;
using Xunit;

namespace Akka.Cluster.Tests
{
public sealed class StartupWithChannelExecutorSpec : AkkaSpec
{
private static readonly Config Configuration = ConfigurationFactory.ParseString(@"
akka.actor.creation-timeout = 10s
akka.actor.provider = cluster
akka.actor.default-dispatcher.executor = channel-executor
akka.actor.internal-dispatcher.executor = channel-executor
akka.remote.default-remote-dispatcher.executor = channel-executor
akka.remote.backoff-remote-dispatcher.executor = channel-executor
").WithFallback(ConfigurationFactory.Default());

private long _startTime;

public StartupWithChannelExecutorSpec() : base(Configuration)
{
_startTime = MonotonicClock.GetTicks();
}

private Props TestProps
{
get
{
Action<IActorDsl> actor = (c =>
{
c.ReceiveAny((o, context) => context.Sender.Tell(o));
c.OnPreStart = context =>
{
var log = context.GetLogger();
var cluster = Cluster.Get(context.System);
log.Debug("Started {0} {1}", cluster.SelfAddress, Thread.CurrentThread.Name);
};
});
return Props.Create(() => new Act(actor));
}
}

[Fact]
public void A_cluster_must_startup_with_channel_executor_dispatcher()
{
var totalStartupTime = TimeSpan.FromTicks(MonotonicClock.GetTicks() - _startTime).TotalMilliseconds;
Assert.True(totalStartupTime < (Sys.Settings.CreationTimeout - TimeSpan.FromSeconds(2)).TotalMilliseconds);
Sys.ActorOf(TestProps).Tell("hello");
Sys.ActorOf(TestProps).Tell("hello");
Sys.ActorOf(TestProps).Tell("hello");

var cluster = Cluster.Get(Sys);
totalStartupTime = TimeSpan.FromTicks(MonotonicClock.GetTicks() - _startTime).TotalMilliseconds;
Assert.True(totalStartupTime < (Sys.Settings.CreationTimeout - TimeSpan.FromSeconds(2)).TotalMilliseconds);

ExpectMsg("hello");
ExpectMsg("hello");
ExpectMsg("hello");
}
}
}
6 changes: 5 additions & 1 deletion src/core/Akka.Cluster/Cluster.cs
@@ -140,7 +140,11 @@ public Cluster(ActorSystemImpl system)
_readView = new ClusterReadView(this);

// force the underlying system to start
_clusterCore = GetClusterCoreRef().Result;
// and hard block the current thread
var clusterCoreTask = Task.Run(GetClusterCoreRef);
Member

Are there any flags we should consider adding here? i.e. would it be better to do Task.Factory.StartNew with LongRunning and DenyChildAttach?

Contributor Author

_clusterCore = GetClusterCoreRef().Result was always blocking, and I think it still is.

I think what happened is that after the Ask got improved, the current thread gets used for the ActorCell dispatcher.
I don't know exactly how or where, but the child actors of cluster-daemon are using it.

Task.Run(GetClusterCoreRef) should force the ask onto a different thread.
The only thing that matters is that the current thread does not leak.

Member

"I think what happened is that after the Ask got improved the current thread gets used for the actorcell dispatcher."

Interesting. Would there be other implications if this is true?

cc: @Aaronontheweb

Contributor Author

The actor system extension implementation and the blocking cluster constructor are both anti-patterns,
and we need to rework them in the future.
#5447

Contributor Author

Maybe it's just the "ConfigureAwait(false)" in

return await _clusterDaemons.Ask<IActorRef>(new InternalClusterAction.GetClusterCoreRef(this), timeout).ConfigureAwait(false);

The Cluster extension is called by the ClusterActorRefProvider, which is created during ActorSystem startup.
That call runs on the thread that creates the ActorSystem, and with the normal ForkJoinExecutor the two never mix.
But with the ChannelExecutor, which uses the regular .NET ThreadPool, the awaiting thread can be used for an ActorCell.

var task = Task.Run(...);
task.Wait();

I hope that simply resolves the problem.

task.Result definitely blocked on .NET 4.5;
I don't know why or how the thread now gets reused.
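
A minimal sketch of the pattern discussed above, not the actual Akka.NET code: the asynchronous call is started on a thread-pool thread with Task.Run, and the calling thread blocks on it with a timeout. The names BlockingStartup, GetCoreRefAsync and WaitForCoreRef are hypothetical stand-ins for the cluster constructor and GetClusterCoreRef, and the 50 ms delay only simulates the Ask.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingStartup
{
    // Hypothetical stand-in for GetClusterCoreRef(): an async call that
    // completes on some thread-pool thread after a short delay.
    private static async Task<string> GetCoreRefAsync()
    {
        await Task.Delay(50).ConfigureAwait(false);
        return "cluster-core";
    }

    // Starts the async call via Task.Run so it does not begin on the
    // caller's thread, then hard-blocks the caller until it completes
    // or the timeout elapses.
    public static string WaitForCoreRef(TimeSpan timeout)
    {
        var task = Task.Run(() => GetCoreRefAsync());
        if (!task.Wait(timeout))
            throw new TimeoutException("cluster startup");
        return task.Result; // already completed, so this does not block
    }
}
```

Calling BlockingStartup.WaitForCoreRef(TimeSpan.FromSeconds(5)) returns once the simulated lookup finishes. With a timeout shorter than the delay it throws TimeoutException instead of waiting indefinitely.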

if (!clusterCoreTask.Wait(System.Settings.CreationTimeout))
throw new TimeoutException("cluster startup");
_clusterCore = clusterCoreTask.Result;

system.RegisterOnTermination(Shutdown);

3 changes: 2 additions & 1 deletion src/core/Akka/Dispatch/AbstractDispatcher.cs
@@ -6,7 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
@@ -144,6 +143,8 @@ public override ExecutorService Produce(string id)
var scheduler = ChannelTaskScheduler.Get(Prerequisites.Settings.System).GetScheduler(Priority);
return new TaskSchedulerExecutor(id, scheduler);
}


}

/// <summary>
19 changes: 11 additions & 8 deletions src/core/Akka/Dispatch/ChannelSchedulerExtension.cs
@@ -62,14 +62,17 @@ public ChannelTaskScheduler(ExtendedActorSystem system)
//config channel-scheduler
var config = system.Settings.Config.GetConfig("akka.channel-scheduler");
_maximumConcurrencyLevel = ThreadPoolConfig.ScaledPoolSize(
config.GetInt("parallelism-min"),
config.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to
config.GetInt("parallelism-max"));
config?.GetInt("parallelism-min", 4) ?? 4,
Contributor

Same thing here: these changes should go in a separate PR.

config?.GetDouble("parallelism-factor", 1.0D) ?? 1.0D, // the scalar-based factor to scale the threadpool size to
config?.GetInt("parallelism-max", 64) ?? 64);
_maximumConcurrencyLevel = Math.Max(_maximumConcurrencyLevel, 1);
_maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); //min 3 normal work in work-loop

_workInterval = config.GetInt("work-interval", _workInterval);
_workStep = config.GetInt("work-step", _workStep);

if (config != null)
{
_maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); //min 3 normal work in work-loop
_workInterval = config.GetInt("work-interval", _workInterval);
_workStep = config.GetInt("work-step", _workStep);
}

//create task schedulers
var channelOptions = new UnboundedChannelOptions()
@@ -276,7 +279,7 @@ private int DoWork(int workerId)
//the work loop
_threadPriority = TaskSchedulerPriority.Idle;
try
{
{
do
{
rounds++;