Skip to content

Commit

Permalink
Add new ChannelTaskScheduler Extension (#5403)
Browse files Browse the repository at this point in the history
* add new ChannelTaskScheduler extension and set as the new 'channel-executor'

* rallback dependancy version and add comments

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Zetanova and Aaronontheweb committed Dec 3, 2021
1 parent 7ec9417 commit ccb4670
Show file tree
Hide file tree
Showing 6 changed files with 526 additions and 19 deletions.
28 changes: 28 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Expand Up @@ -2509,6 +2509,22 @@ namespace Akka.Dispatch
public System.TimeSpan PushTimeout { get; }
public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { }
}
public sealed class ChannelTaskScheduler : Akka.Actor.IExtension, System.IDisposable
{
public ChannelTaskScheduler(Akka.Actor.ExtendedActorSystem system) { }
public System.Threading.Tasks.TaskScheduler High { get; }
public System.Threading.Tasks.TaskScheduler Idle { get; }
public System.Threading.Tasks.TaskScheduler Low { get; }
public System.Threading.Tasks.TaskScheduler Normal { get; }
public void Dispose() { }
public static Akka.Dispatch.ChannelTaskScheduler Get(Akka.Actor.ActorSystem system) { }
public System.Threading.Tasks.TaskScheduler GetScheduler(Akka.Dispatch.TaskSchedulerPriority priority) { }
}
public sealed class ChannelTaskSchedulerProvider : Akka.Actor.ExtensionIdProvider<Akka.Dispatch.ChannelTaskScheduler>
{
public ChannelTaskSchedulerProvider() { }
public override Akka.Dispatch.ChannelTaskScheduler CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
}
public sealed class CurrentSynchronizationContextDispatcher : Akka.Dispatch.Dispatcher
{
public CurrentSynchronizationContextDispatcher(Akka.Dispatch.MessageDispatcherConfigurator configurator, string id, int throughput, System.Nullable<long> throughputDeadlineTime, Akka.Dispatch.ExecutorServiceFactory executorServiceFactory, System.TimeSpan shutdownTimeout) { }
Expand Down Expand Up @@ -2677,6 +2693,18 @@ namespace Akka.Dispatch
public RejectedExecutionException(string message = null, System.Exception inner = null) { }
protected RejectedExecutionException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public enum TaskSchedulerPriority
{
None = 0,
Idle = 4,
Background = 4,
Low = 5,
BelowNormal = 6,
Normal = 8,
AboveNormal = 10,
High = 13,
Realtime = 24,
}
public class ThreadPoolConfig
{
public ThreadPoolConfig(Akka.Configuration.Config config) { }
Expand Down
23 changes: 12 additions & 11 deletions src/core/Akka.Remote/Configuration/Remote.conf
Expand Up @@ -586,20 +586,21 @@ akka {

default-remote-dispatcher {
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
channel-executor.priority = "high"
}

backoff-remote-dispatcher {
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
channel-executor.priority = "low"
}
}

}
}
1 change: 1 addition & 0 deletions src/core/Akka/Akka.csproj
Expand Up @@ -19,6 +19,7 @@
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="System.Reflection.Emit" Version="4.7.0" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="System.Threading.Channels" Version="5.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == '$(NetStandardLibVersion)'">
Expand Down
10 changes: 10 additions & 0 deletions src/core/Akka/Configuration/Pigeon.conf
Expand Up @@ -269,6 +269,15 @@ akka {
}
}

channel-scheduler {
parallelism-min = 4 #same as for ForkJoinDispatcher
parallelism-factor = 1 #same as for ForkJoinDispatcher
parallelism-max = 64 #same as for ForkJoinDispatcher
work-max = 10 #max executed work items in sequence until priority loop
work-interval = 500 #time target of executed work items in ms
work-step = 2 #target work item count in interval / burst
}

#used for GUI applications
synchronized-dispatcher {
type = "SynchronizedDispatcher"
Expand Down Expand Up @@ -377,6 +386,7 @@ akka {
parallelism-factor = 1.0
parallelism-max = 64
}
channel-executor.priority = "high"
}

default-blocking-io-dispatcher {
Expand Down
15 changes: 7 additions & 8 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
Expand Up @@ -120,19 +120,18 @@ internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator
{
public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var fje = config.GetConfig("fork-join-executor");
MaxParallelism = ThreadPoolConfig.ScaledPoolSize(
fje.GetInt("parallelism-min"),
fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to
fje.GetInt("parallelism-max"));
var cfg = config.GetConfig("channel-executor");
Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), cfg.GetString("priority", "normal"), true);
}

public int MaxParallelism {get;}
public TaskSchedulerPriority Priority { get; }

public override ExecutorService Produce(string id)
{
Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[id]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]"));
return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism));
Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[{id}]", typeof(TaskSchedulerExecutor), $"Launched Dispatcher [{id}] with Priority[{Priority}]"));

var scheduler = ChannelTaskScheduler.Get(Prerequisites.Settings.System).GetScheduler(Priority);
return new TaskSchedulerExecutor(id, scheduler);
}
}

Expand Down

0 comments on commit ccb4670

Please sign in to comment.