Skip to content

Commit

Permalink
Fix ChannelExecutor configuration backward compatibility problem (#5568)
Browse files Browse the repository at this point in the history
* Fix ChannelExecutor configuration backward compatibility problem

* Move channel scheduler configuration from `akka.actor.channel-scheduler` to `akka.channel-scheduler`
  • Loading branch information
Arkatufus committed Feb 7, 2022
1 parent d4cb079 commit 261910e
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 12 deletions.
67 changes: 67 additions & 0 deletions src/core/Akka.Tests/Dispatch/ChannelExecutorConfigurationSpec.cs
@@ -0,0 +1,67 @@
// //-----------------------------------------------------------------------
// // <copyright file="ChannelExecutorConfigurationSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.TestKit;
using Xunit;
using FluentAssertions;

namespace Akka.Tests.Dispatch
{
public class ChannelExecutorConfigurationSpec : AkkaSpec
{
[Fact]
public void ChannelExecutor_config_should_be_injected_when_it_doesnt_exist()
{
var config = ConfigurationFactory.ParseString(@"executor = channel-executor");
var configurator = new ChannelExecutorConfigurator(config, Sys.Dispatchers.Prerequisites);
configurator.Priority.Should().Be(TaskSchedulerPriority.Normal);
}

[Fact]
public void ChannelExecutor_default_should_be_overriden_by_config()
{
var config = ConfigurationFactory.ParseString(@"
executor = channel-executor
channel-executor.priority = high");
var configurator = new ChannelExecutorConfigurator(config, Sys.Dispatchers.Prerequisites);
configurator.Priority.Should().Be(TaskSchedulerPriority.High);
}

[Fact]
public void ChannelExecutorConfigurator_should_use_default_when_config_is_null()
{
var configurator = new ChannelExecutorConfigurator(null, Sys.Dispatchers.Prerequisites);
configurator.Priority.Should().Be(TaskSchedulerPriority.Normal);
}

// backward compatibility test
[Fact]
public async Task ChannelExecutor_instantiation_should_not_throw_when_config_doesnt_exist()
{
var config = ConfigurationFactory.ParseString(@"
akka.actor.default-dispatcher = {
executor = channel-executor
}");

// Throws NRE in 1.4.29-32
var sys = ActorSystem.Create("test", config);

// Check that all settings are correct
var dispatcher = sys.Dispatchers.Lookup("akka.actor.default-dispatcher");
dispatcher.Configurator.Config.GetString("executor").Should().Be("channel-executor");

var configurator = new ChannelExecutorConfigurator(dispatcher.Configurator.Config, Sys.Dispatchers.Prerequisites);
configurator.Priority.Should().Be(TaskSchedulerPriority.Normal);

await sys.Terminate();
}
}
}
18 changes: 9 additions & 9 deletions src/core/Akka/Configuration/Pigeon.conf
Expand Up @@ -269,15 +269,6 @@ akka {
}
}

channel-scheduler {
parallelism-min = 4 #same as for ForkJoinDispatcher
parallelism-factor = 1 #same as for ForkJoinDispatcher
parallelism-max = 64 #same as for ForkJoinDispatcher
work-max = 10 #max executed work items in sequence until priority loop
work-interval = 500 #time target of executed work items in ms
work-step = 2 #target work item count in interval / burst
}

#used for GUI applications
synchronized-dispatcher {
type = "SynchronizedDispatcher"
Expand Down Expand Up @@ -561,6 +552,15 @@ akka {
}
}

channel-scheduler {
parallelism-min = 4 #same as for ForkJoinDispatcher
parallelism-factor = 1 #same as for ForkJoinDispatcher
parallelism-max = 64 #same as for ForkJoinDispatcher
work-max = 10 #max executed work items in sequence until priority loop
work-interval = 500 #time target of executed work items in ms
work-step = 2 #target work item count in interval / burst
}

# Used to set the behavior of the scheduler.
# Changing the default values may change the system behavior drastically so make
# sure you know what you're doing! See the Scheduler section of the Akka
Expand Down
10 changes: 8 additions & 2 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
Expand Up @@ -118,10 +118,16 @@ protected ExecutorServiceConfigurator(Config config, IDispatcherPrerequisites pr

internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator
{
private static readonly Config PriorityDefault = ConfigurationFactory.ParseString(@"
executor = channel-executor
channel-executor.priority = normal");

public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var cfg = config.GetConfig("channel-executor");
Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), cfg.GetString("priority", "normal"), true);
config = config == null ? PriorityDefault : config.WithFallback(PriorityDefault);

var priority = config.GetString("channel-executor.priority", "normal");
Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), priority, true);
}

public TaskSchedulerPriority Priority { get; }
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Dispatch/ChannelSchedulerExtension.cs
Expand Up @@ -349,7 +349,7 @@ public void Dispose()
/// and to help execute queued works internaly.
/// It supports task-inlining only for task equal or above the own priority
/// </summary>
sealed class PriorityTaskScheduler : TaskScheduler, IDisposable
internal sealed class PriorityTaskScheduler : TaskScheduler, IDisposable
{
readonly Channel<Task> _channel;

Expand Down

0 comments on commit 261910e

Please sign in to comment.