diff --git a/src/core/Akka.Cluster.Tests/StartupWithChannelExecutorSpec.cs b/src/core/Akka.Cluster.Tests/StartupWithChannelExecutorSpec.cs new file mode 100644 index 00000000000..713e686fd53 --- /dev/null +++ b/src/core/Akka.Cluster.Tests/StartupWithChannelExecutorSpec.cs @@ -0,0 +1,74 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading; +using Akka.Actor; +using Akka.Actor.Dsl; +using Akka.Configuration; +using Akka.Event; +using Akka.TestKit; +using Akka.Util; +using Xunit; + +namespace Akka.Cluster.Tests +{ + public sealed class StartupWithChannelExecutorSpec : AkkaSpec + { + private static readonly Config Configuration = ConfigurationFactory.ParseString(@" + akka.actor.creation-timeout = 10s + akka.actor.provider = cluster + akka.actor.default-dispatcher.executor = channel-executor + akka.actor.internal-dispatcher.executor = channel-executor + akka.remote.default-remote-dispatcher.executor = channel-executor + akka.remote.backoff-remote-dispatcher.executor = channel-executor + ").WithFallback(ConfigurationFactory.Default()); + + private long _startTime; + + public StartupWithChannelExecutorSpec() : base(Configuration) + { + _startTime = MonotonicClock.GetTicks(); + } + + private Props TestProps + { + get + { + Action actor = (c => + { + c.ReceiveAny((o, context) => context.Sender.Tell(o)); + c.OnPreStart = context => + { + var log = context.GetLogger(); + var cluster = Cluster.Get(context.System); + log.Debug("Started {0} {1}", cluster.SelfAddress, Thread.CurrentThread.Name); + }; + }); + return Props.Create(() => new Act(actor)); + } + } + + [Fact] + public void A_cluster_must_startup_with_channel_executor_dispatcher() + { + var totalStartupTime = TimeSpan.FromTicks(MonotonicClock.GetTicks() - _startTime).TotalMilliseconds; + Assert.True(totalStartupTime < (Sys.Settings.CreationTimeout - TimeSpan.FromSeconds(2)).TotalMilliseconds); + Sys.ActorOf(TestProps).Tell("hello"); + Sys.ActorOf(TestProps).Tell("hello"); + Sys.ActorOf(TestProps).Tell("hello"); + + var cluster = Cluster.Get(Sys); + totalStartupTime = TimeSpan.FromTicks(MonotonicClock.GetTicks() - _startTime).TotalMilliseconds; + Assert.True(totalStartupTime < (Sys.Settings.CreationTimeout - TimeSpan.FromSeconds(2)).TotalMilliseconds); + + ExpectMsg("hello"); + ExpectMsg("hello"); + ExpectMsg("hello"); + } + } +} diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index c9c2d7dfbac..9183423b5d0 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -140,7 +140,11 @@ public Cluster(ActorSystemImpl system) _readView = new ClusterReadView(this); // force the underlying system to start - _clusterCore = GetClusterCoreRef().Result; + // and hard block the current thread + var clusterCoreTask = Task.Run(GetClusterCoreRef); + if (!clusterCoreTask.Wait(System.Settings.CreationTimeout)) + throw new TimeoutException("cluster startup"); + _clusterCore = clusterCoreTask.Result; system.RegisterOnTermination(Shutdown); diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index c91ef4a0025..4895aed3a22 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -144,6 +143,8 @@ public override ExecutorService Produce(string id) var scheduler = ChannelTaskScheduler.Get(Prerequisites.Settings.System).GetScheduler(Priority); return new TaskSchedulerExecutor(id, scheduler); } + + } /// diff --git a/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs b/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs index 855f95ff109..ccb16a87eda 100644 --- a/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs +++ b/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs @@ -62,14 +62,17 @@ public ChannelTaskScheduler(ExtendedActorSystem system) //config channel-scheduler var config = system.Settings.Config.GetConfig("akka.channel-scheduler"); _maximumConcurrencyLevel = ThreadPoolConfig.ScaledPoolSize( - config.GetInt("parallelism-min"), - config.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to - config.GetInt("parallelism-max")); + config?.GetInt("parallelism-min", 4) ?? 4, + config?.GetDouble("parallelism-factor", 1.0D) ?? 1.0D, // the scalar-based factor to scale the threadpool size to + config?.GetInt("parallelism-max", 64) ?? 64); _maximumConcurrencyLevel = Math.Max(_maximumConcurrencyLevel, 1); - _maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); //min 3 normal work in work-loop - - _workInterval = config.GetInt("work-interval", _workInterval); - _workStep = config.GetInt("work-step", _workStep); + + if (config != null) + { + _maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); //min 3 normal work in work-loop + _workInterval = config.GetInt("work-interval", _workInterval); + _workStep = config.GetInt("work-step", _workStep); + } //create task schedulers var channelOptions = new UnboundedChannelOptions() @@ -276,7 +279,7 @@ private int DoWork(int workerId) //the work loop _threadPriority = TaskSchedulerPriority.Idle; try - { + { do { rounds++;