diff --git a/ISSUE_TEMPLATE.md b/ISSUE_TEMPLATE.md deleted file mode 100644 index 2dce55c6e53..00000000000 --- a/ISSUE_TEMPLATE.md +++ /dev/null @@ -1,6 +0,0 @@ -When creating a new issue, please make sure the following information is part of your issue description. (if applicable). Thank You! - -- Which Akka.Net version you are using -- On which platform you are using Akka.Net -- A list of steps to reproduce the issue. Or an gist or github repo which can be easily used to reproduce your case. - diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 71ee73ab0cc..ab2fe10b5b1 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,93 @@ +#### 1.4.21 June 16 2021 #### +**Maintenance Release for Akka.NET 1.4** + +Akka.NET v1.4.21 is a significant release that includes major performance improvements, bug fixes, and a major update to the [Akka.DependencyInjection NuGet package](https://getakka.net/articles/actors/dependency-injection.html). + +**Performance Improvements** +Akka.NET v1.4.21 includes some major performance fixes and improvements: + +* [`Ask` is now ~10% faster](https://github.com/akkadotnet/akka.net/pull/5051) +* [`MurmurHash` is 33% faster and allocates 0 memory](https://github.com/akkadotnet/akka.net/pull/5028) - used _heavily_ in DData, Cluster Sharding, and Consistent Hash Routers +* `ActorPath.Parse` went from 1672 ns/op to 527 ns/op - a 68% improvement in throughput and a 50% reduction in memory. See [#5039](https://github.com/akkadotnet/akka.net/pull/5039) and [#5068](https://github.com/akkadotnet/akka.net/pull/5068). +* [Akka.Remote: remove `ActorPath.ToString` call from `ResolveActorRefWithLocalAddress`](https://github.com/akkadotnet/akka.net/pull/5034) +* **Important**: [Revert `ThreadPool.SetMinThreads(0,0)`](https://github.com/akkadotnet/akka.net/pull/5059) - based on the input from users on "[Akka.NET v1.4.19: ChannelExecutor performance data](https://github.com/akkadotnet/akka.net/discussions/4983)" + +Our observed performance numbers for Akka.Remote show a significant increase in performance for v1.4.21 over v1.4.20: + +*Before* + +``` +PS> dotnet run -c Release --framework netcoreapp3.1 +OSVersion: Microsoft Windows NT 6.2.9200.0 +ProcessorCount: 16 +ClockSpeed: 0 MHZ +Actor Count: 32 +Messages sent/received per client: 200000 (2e5) +Is Server GC: True +Thread count: 109 + +Num clients, Total [msg], Msgs/sec, Total [ms] + 1, 200000, 113379, 1764.56 + 5, 1000000, 186429, 5364.05 + 10, 2000000, 185340, 10791.11 + 15, 3000000, 183218, 16374.06 + 20, 4000000, 179824, 22244.63 + 25, 5000000, 182716, 27365.89 + 30, 6000000, 182039, 32960.61 +``` + +*After* + +``` +PS> dotnet run -c Release --framework netcoreapp3.1 +OSVersion: Microsoft Windows NT 6.2.9200.0 +ProcessorCount: 16 +ClockSpeed: 0 MHZ +Actor Count: 32 +Messages sent/received per client: 200000 (2e5) +Is Server GC: True +Thread count: 111 + +Num clients, Total [msg], Msgs/sec, Total [ms] + 1, 200000, 109770, 1822.14 + 5, 1000000, 192902, 5184.79 + 10, 2000000, 191663, 10435.53 + 15, 3000000, 191339, 15679.11 + 20, 4000000, 192725, 20755.78 + 25, 5000000, 189754, 26350.14 + 30, 6000000, 189772, 31617.20 +``` + +> N.B. these after numbers don't benefit from the performance benefits we observed in v1.4.20 when we invoked `ThreadPool.SetMinThreads(0,0)`, which makes them even more impressive. + +**Akka.DependencyInjection Updates** +We had one major issue we implemented in v1.4.21 for Akka.DependencyInjection: [Abstraction of `ServiceProvider`, Improving Akka.DependencyInjection ](https://github.com/akkadotnet/akka.net/pull/4814) + +What this change did was: + +* Deprecate the `Akka.DependencyInjection.ServiceProvider` class in favor of the `Akka.DependencyInjection.DependencyResolver` class - to avoid namespace collision with Microsoft.Extensions.DependencyInjection.ServiceProvider; +* Deprecates the `Akka.DependencyInjection.ServiceProviderSetup` class in favor of the `Akka.DependencyInjection.DependencyResolverSetup` class for consistency reasons; +* `Akka.DependencyInjection.DependencyResolver` now takes an input of type [`IDependencyResolver`](https://getakka.net/api/Akka.DependencyInjection.IDependencyResolver.html), which allows users to abstract away the `IServiceProvider` and mock / replace it during unit testing; and +* Added some non-generic `Props` methods for dynamically spawning actors via DI. + +All of these changes are backwards-compatible with v1.4.20 and earlier - and the deprecation warnings will appear in your code when you upgrade. If you run into any [issues upgrading to Akka.DependencyInjection v1.4.21 please reply on this thread](https://github.com/akkadotnet/akka.net/discussions/5070)! + +**Other Changes and Fixes** + +* [Akka.Streams: A couple of fixes affecting the `FileSubscriber`](https://github.com/akkadotnet/akka.net/pull/5035) +* [Akka.DistributedData: memory leak when recovering events from LMDB data store](https://github.com/akkadotnet/akka.net/issues/5022) +* [Akka.DistributedData: port `VectorClock` performance optimizations to `VersionVector` and similar types](https://github.com/akkadotnet/akka.net/issues/4956) + +To see the [full set of fixes in Akka.NET v1.4.21, please see the milestone on Github](https://github.com/akkadotnet/akka.net/milestone/51). + +| COMMITS | LOC+ | LOC- | AUTHOR | +| --- | --- | --- | --- | +| 5 | 34 | 24 | Aaron Stannard | +| 4 | 196 | 77 | Gregorius Soedharmo | +| 3 | 3 | 3 | dependabot[bot] | +| 1 | 2 | 2 | Wessel Kranenborg | +| 1 | 1 | 1 | Martijn Schoemaker | + #### 1.4.20 May 12 2021 #### **Maintenance Release for Akka.NET 1.4** diff --git a/docs/articles/actors/dispatchers.md b/docs/articles/actors/dispatchers.md index 47bf76e5b8e..682af25188f 100644 --- a/docs/articles/actors/dispatchers.md +++ b/docs/articles/actors/dispatchers.md @@ -171,7 +171,7 @@ In Akka.NET v1.4.19 we will be introducing an opt-in feature, the `ChannelExecut During its initial development and benchmarks, we observed the following: -1. The `ChannelExecutor` tremendously reduced idle CPU and max busy CPU even during peak message throughput, primarily as a result of dynamically shrinking the total `ThreadPool` to only the necessary size. This resolves one of the largest complaints large users of Akka.NET have today. However, **in order for this setting to be effective `ThreadPool.SetMin(0,0)` must also be set**. We are considering doing this inside the `ActorSystem.Create` method, those settings don't work for you you can easily override them by simply calling `ThreadPool.SetMin(yourValue, yourValue)` again after `ActorSystem.Create` has exited. +1. The `ChannelExecutor` tremendously reduced idle CPU and max busy CPU even during peak message throughput, primarily as a result of dynamically shrinking the total `ThreadPool` to only the necessary size. This resolves one of the largest complaints large users of Akka.NET have today. 2. The `ChannelExecutor` actually beat the `ForkJoinDispatcher` and others on performance even in environments like Docker and bare metal on Windows. > [!NOTE] @@ -220,9 +220,6 @@ akka.remote.backoff-remote-dispatcher { This will enable the `ChannelExecutor` to run everywhere and all Akka.NET loads, with the exception of anything you manually allocate onto a `ForkJoinDispatcher` or `PinnedDispatcher`, will be managed by the `ThreadPool`. -> [!IMPORTANT] -> As of Akka.NET v1.4.19, we call `ThreadPool.SetMinThreads(0,0)` inside the `ActorSystem.Create` method as we've found that the default `ThreadPool` minimum values have a negative impact on performance. However, if this causes undesireable side effects for you inside your application you can always override those settings by calling `ThreadPool.SetMinThreads(yourValue, yourValue)` again after you've created your `ActorSystem`. - #### Common Dispatcher Configuration The following configuration keys are available for any dispatcher configuration: diff --git a/docs/articles/actors/receive-actor-api.md b/docs/articles/actors/receive-actor-api.md index 553d4194f30..273d41da26b 100644 --- a/docs/articles/actors/receive-actor-api.md +++ b/docs/articles/actors/receive-actor-api.md @@ -405,10 +405,10 @@ For more information on Tasks, check out the [MSDN documentation](https://msdn.m > When using task callbacks inside actors, you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. ### Forward message -You can forward a message from one actor to another. This means that the original sender address/reference is maintained even though the message is going through a 'mediator'. This can be useful when writing actors that work as routers, load-balancers, replicators etc. You need to pass along your context variable as well. +You can forward a message from one actor to another. This means that the original sender address/reference is maintained even though the message is going through a 'mediator'. This can be useful when writing actors that work as routers, load-balancers, replicators etc. ```csharp -target.Forward(result, Context); +target.Forward(result); ``` ## Receive messages diff --git a/src/Akka.sln b/src/Akka.sln index c52324212ab..52dbe1dacd7 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -240,9 +240,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.DependencyInjection.Te EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AspNetCore", "AspNetCore", "{162F5991-EA57-4221-9B70-F9B6FEC18036}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Akka.AspNetCore", "examples\AspNetCore\Samples.Akka.AspNetCore\Samples.Akka.AspNetCore.csproj", "{D62F4AD6-318F-4ECC-B875-83FA9933A81B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Akka.AspNetCore", "examples\AspNetCore\Samples.Akka.AspNetCore\Samples.Akka.AspNetCore.csproj", "{D62F4AD6-318F-4ECC-B875-83FA9933A81B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SerializationBenchmarks", "benchmark\SerializationBenchmarks\SerializationBenchmarks.csproj", "{2E4B9584-42CC-4D17-B719-9F462B16C94D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SerializationBenchmarks", "benchmark\SerializationBenchmarks\SerializationBenchmarks.csproj", "{2E4B9584-42CC-4D17-B719-9F462B16C94D}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DDataStressTest", "examples\Cluster\DData\DDataStressTest\DDataStressTest.csproj", "{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -1123,6 +1125,18 @@ Global {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x64.Build.0 = Release|Any CPU {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x86.ActiveCfg = Release|Any CPU {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x86.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|Any CPU.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x64.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x64.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x86.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x86.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|Any CPU.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|Any CPU.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x64.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x64.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1230,6 +1244,7 @@ Global {162F5991-EA57-4221-9B70-F9B6FEC18036} = {D3AF8295-AEB5-4324-AA82-FCC0014AC310} {D62F4AD6-318F-4ECC-B875-83FA9933A81B} = {162F5991-EA57-4221-9B70-F9B6FEC18036} {2E4B9584-42CC-4D17-B719-9F462B16C94D} = {73108242-625A-4D7B-AA09-63375DBAE464} + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50} = {C50E1A9E-820C-4E75-AE39-6F96A99AC4A7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164} diff --git a/src/benchmark/Akka.Benchmarks/Actor/ActorMemoryFootprintBenchmark.cs b/src/benchmark/Akka.Benchmarks/Actor/ActorMemoryFootprintBenchmark.cs new file mode 100644 index 00000000000..9cd1afd52ad --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Actor/ActorMemoryFootprintBenchmark.cs @@ -0,0 +1,57 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Engines; + +namespace Akka.Benchmarks.Actor +{ + [Config(typeof(MicroBenchmarkConfig))] + [SimpleJob(RunStrategy.Monitoring, targetCount: 25, warmupCount: 5)] + public class ActorMemoryFootprintBenchmark + { + public ActorSystem Sys; + public Props Props; + + [Params(10_000)] + public int SpawnCount { get; set; } + + [GlobalSetup] + public void Setup() + { + Sys = ActorSystem.Create("Bench"); + Props = Props.Create(() => new TempActor()); + } + + private class TempActor : UntypedActor + { + protected override void OnReceive(object message) + { + + } + } + + [Benchmark] + public void SpawnActor() + { + for(var i = 0; i < SpawnCount; i++) + Sys.ActorOf(Props); + } + + [GlobalCleanup] + public async Task Cleanup() + { + await Sys.Terminate(); + } + } +} diff --git a/src/benchmark/Akka.Benchmarks/Actor/ActorPathBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Actor/ActorPathBenchmarks.cs index 02c72881c98..b9885c1290c 100644 --- a/src/benchmark/Akka.Benchmarks/Actor/ActorPathBenchmarks.cs +++ b/src/benchmark/Akka.Benchmarks/Actor/ActorPathBenchmarks.cs @@ -16,12 +16,16 @@ public class ActorPathBenchmarks { private ActorPath x; private ActorPath y; + private ActorPath _childPath; + private Address _sysAdr = new Address("akka.tcp", "system", "127.0.0.1", 1337); + private Address _otherAdr = new Address("akka.tcp", "system", "127.0.0.1", 1338); [GlobalSetup] public void Setup() { - x = new RootActorPath(new Address("akka.tcp", "system", "127.0.0.1", 1337), "user"); - y = new RootActorPath(new Address("akka.tcp", "system", "127.0.0.1", 1337), "system"); + x = new RootActorPath(_sysAdr, "user"); + y = new RootActorPath(_sysAdr, "system"); + _childPath = x / "parent" / "child"; } [Benchmark] @@ -45,7 +49,19 @@ public bool ActorPath_Equals() [Benchmark] public string ActorPath_ToString() { - return x.ToString(); + return _childPath.ToString(); + } + + [Benchmark] + public string ActorPath_ToSerializationFormat() + { + return _childPath.ToSerializationFormat(); + } + + [Benchmark] + public string ActorPath_ToSerializationFormatWithAddress() + { + return _childPath.ToSerializationFormatWithAddress(_otherAdr); } } } diff --git a/src/benchmark/Akka.Benchmarks/Actor/NameAndUidBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Actor/NameAndUidBenchmarks.cs new file mode 100644 index 00000000000..c616f5527b4 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Actor/NameAndUidBenchmarks.cs @@ -0,0 +1,34 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Text; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Actor +{ + [Config(typeof(MicroBenchmarkConfig))] + public class NameAndUidBenchmarks + { + public const string ActorPath = "foo#11241311"; + + [Benchmark] + public NameAndUid ActorCell_SplitNameAndUid() + { + return ActorCell.SplitNameAndUid(ActorPath); + } + + [Benchmark] + public (string name, int uid) ActorCell_GetNameAndUid() + { + return ActorCell.GetNameAndUid(ActorPath); + } + } +} diff --git a/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj b/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj index 36bfa3fcd9a..8b2cee65503 100644 --- a/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj +++ b/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj @@ -7,7 +7,7 @@ - + diff --git a/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs new file mode 100644 index 00000000000..9e0c2c0e971 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.DistributedData; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.DData +{ + [Config(typeof(MicroBenchmarkConfig))] + public class ORSetBenchmarks + { + [Params(25)] + public int NumElements; + + [Params(10)] + public int NumNodes; + + [Params(100)] + public int Iterations; + + private UniqueAddress[] _nodes; + private string[] _elements; + + private readonly string _user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}"; + private readonly string _user2 = "{\"username\":\"sonny\",\"password\":\"rollins\"}"; + private readonly string _user3 = "{\"username\":\"charlie\",\"password\":\"parker\"}"; + private readonly string _user4 = "{\"username\":\"charles\",\"password\":\"mingus\"}"; + + // has data from all nodes + private ORSet _c1 = ORSet.Empty; + + // has additional items from all nodes + private ORSet _c2 = ORSet.Empty; + + // has removed items from all nodes + private ORSet _c3 = ORSet.Empty; + + [GlobalSetup] + public void Setup() + { + var newNodes = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumNodes)){ + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + _nodes = newNodes.ToArray(); + + var newElements = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumElements)){ + newElements.Add(i.ToString()); + } + _elements = newElements.ToArray(); + + _c1 = ORSet.Empty; + foreach(var node in _nodes){ + _c1 = _c1.Add(node, _elements[0]); + } + + // add some data that _c2 doesn't have + _c2 = _c1; + foreach(var node in _nodes.Skip(NumNodes/2)){ + _c2 = _c2.Add(node, _elements[1]); + } + + _c3 = _c1; + foreach(var node in _nodes.Take(NumNodes/2)){ + _c3 = _c3.Remove(node, _elements[0]); + } + } + + [Benchmark] + public void Should_add_node_to_ORSet() + { + for (var i = 0; i < Iterations; i++) + { + var init = ORSet.Empty; + foreach (var node in _nodes) + { + init = init.Add(node, _elements[0]); + } + } + + } + + [Benchmark] + public void Should_add_elements_for_Same_node() + { + for (var i = 0; i < Iterations; i++) + { + var init = ORSet.Empty; + foreach (var element in _elements) + { + init = init.Add(_nodes[0], element); + } + } + } + + [Benchmark] + public void Should_merge_in_new_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c2); + } + + } + + [Benchmark] + public void Should_merge_in_removed_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c3); + } + + } + + [Benchmark] + public void Should_merge_in_add_and_removed_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c2).Merge(_c3); + } + } + } +} diff --git a/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs b/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs new file mode 100644 index 00000000000..dd8fea98b24 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs @@ -0,0 +1,174 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Text; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.DistributedData; +using BenchmarkDotNet.Attributes; +using FluentAssertions; +using static Akka.DistributedData.VersionVector; + +namespace Akka.Benchmarks.DData +{ + [Config(typeof(MicroBenchmarkConfig))] + public class VersionVectorBenchmarks + { + [Params(100)] + public int ClockSize; + + [Params(1000)] + public int Iterations; + + internal (VersionVector clock, ImmutableSortedSet nodes) CreateVectorClockOfSize(int size) + { + UniqueAddress GenerateUniqueAddress(int nodeCount){ + return new UniqueAddress(new Akka.Actor.Address("akka.tcp", "ClusterSys", "localhost", nodeCount), nodeCount); + } + + return Enumerable.Range(1, size) + .Aggregate((VersionVector.Empty, ImmutableSortedSet.Empty), + (tuple, i) => + { + var (vc, nodes) = tuple; + var node = GenerateUniqueAddress(i); + return (vc.Increment(node), nodes.Add(node)); + }); + } + + internal VersionVector CopyVectorClock(VersionVector vc) + { + var versions = ImmutableDictionary.Empty; + var enumerator = vc.VersionEnumerator; + while(enumerator.MoveNext()){ + var nodePair = enumerator.Current; + versions = versions.SetItem(nodePair.Key, nodePair.Value); + } + + return VersionVector.Create(versions); + } + + private UniqueAddress _firstNode; + private UniqueAddress _lastNode; + private UniqueAddress _middleNode; + private ImmutableSortedSet _nodes; + private VersionVector _vcBefore; + private VersionVector _vcBaseLast; + private VersionVector _vcAfterLast; + private VersionVector _vcConcurrentLast; + private VersionVector _vcBaseMiddle; + private VersionVector _vcAfterMiddle; + private VersionVector _vcConcurrentMiddle; + + [GlobalSetup] + public void Setup() + { + var (vcBefore, nodes) = CreateVectorClockOfSize(ClockSize); + _vcBefore = vcBefore; + _nodes = nodes; + + _firstNode = nodes.First(); + _lastNode = nodes.Last(); + _middleNode = nodes[ClockSize / 2]; + + _vcBaseLast = vcBefore.Increment(_lastNode); + _vcAfterLast = _vcBaseLast.Increment(_firstNode); + _vcConcurrentLast = _vcBaseLast.Increment(_lastNode); + _vcBaseMiddle = _vcBefore.Increment(_middleNode); + _vcAfterMiddle = _vcBaseMiddle.Increment(_firstNode); + _vcConcurrentMiddle = _vcBaseMiddle.Increment(_middleNode); + } + + private void CheckThunkFor(VersionVector vc1, VersionVector vc2, Action thunk, int times) + { + var vcc1 = CopyVectorClock(vc1); + var vcc2 = CopyVectorClock(vc2); + for (var i = 0; i < times; i++) + { + thunk(vcc1, vcc2); + } + } + + private void CompareTo(VersionVector vc1, VersionVector vc2, Ordering ordering) + { + vc1.Compare(vc2).Should().Be(ordering); + } + + private void NotEqual(VersionVector vc1, VersionVector vc2) + { + (vc1 == vc2).Should().BeFalse(); + } + + private void Merge(VersionVector vc1, VersionVector vc2) + { + vc1.Merge(vc2); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_same() + { + CheckThunkFor(_vcBaseLast, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Same), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Before_last() + { + CheckThunkFor(_vcBefore, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Before), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_After_last() + { + CheckThunkFor(_vcAfterLast, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.After), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Concurrent_last() + { + CheckThunkFor(_vcAfterLast, _vcConcurrentLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Concurrent), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Before_middle() + { + CheckThunkFor(_vcBefore, _vcBaseMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Before), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_After_middle() + { + CheckThunkFor(_vcAfterMiddle, _vcBaseMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.After), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Concurrent_middle() + { + CheckThunkFor(_vcAfterMiddle, _vcConcurrentMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Concurrent), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_Before_Middle() + { + CheckThunkFor(_vcBefore, _vcBaseMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_After_Middle() + { + CheckThunkFor(_vcAfterMiddle, _vcBaseMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_Concurrent_Middle() + { + CheckThunkFor(_vcAfterMiddle, _vcConcurrentMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VersionVector_merge_Multi_Multi() + { + CheckThunkFor(_vcBefore, _vcAfterLast, (vector, versionVector) => Merge(vector, versionVector), Iterations); + } + } +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/Remoting/FastHashBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Remoting/FastHashBenchmarks.cs new file mode 100644 index 00000000000..87d3155de53 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Remoting/FastHashBenchmarks.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Akka.Benchmarks.Configurations; +using Akka.Remote.Serialization; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Remoting +{ + [Config(typeof(MicroBenchmarkConfig))] + public class FastHashBenchmarks + { + public const string HashKey1 = "hash1"; + + [Benchmark] + public int FastHash_OfString() + { + return FastHash.OfString(HashKey1); + } + + [Benchmark] + public int FastHash_OfStringUnsafe() + { + return FastHash.OfStringFast(HashKey1); + } + } +} diff --git a/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj b/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj index 0f7a1838ad3..dbf48b2cf2b 100644 --- a/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj +++ b/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj @@ -6,7 +6,7 @@ - + diff --git a/src/common.props b/src/common.props index 809e3c7a451..3d759a2c138 100644 --- a/src/common.props +++ b/src/common.props @@ -10,11 +10,11 @@ 2.4.1 - 16.9.4 + 16.10.0 0.10.1 13.0.1 2.0.1 - 3.16.0 + 3.17.2 netcoreapp3.1 net5.0 net471 diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs index 77ad9462c0b..ebaa63fdeb3 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs @@ -13,6 +13,7 @@ using Akka.Cluster.TestKit; using Akka.Configuration; using Akka.Remote.TestKit; +using Akka.TestKit; using Akka.Util; using FluentAssertions; @@ -21,13 +22,15 @@ namespace Akka.Cluster.Sharding.Tests public abstract class ClusterShardingRememberEntitiesSpecConfig : MultiNodeConfig { public string Mode { get; } + public bool RememberEntities { get; } public RoleName First { get; } public RoleName Second { get; } public RoleName Third { get; } - protected ClusterShardingRememberEntitiesSpecConfig(string mode) + protected ClusterShardingRememberEntitiesSpecConfig(string mode, bool rememberEntities) { Mode = mode; + RememberEntities = rememberEntities; First = Role("first"); Second = Role("second"); Third = Role("third"); @@ -78,24 +81,43 @@ class = ""Akka.Cluster.Sharding.Tests.MemoryJournalShared, Akka.Cluster.Sharding } public class PersistentClusterShardingRememberEntitiesSpecConfig : ClusterShardingRememberEntitiesSpecConfig { - public PersistentClusterShardingRememberEntitiesSpecConfig() : base("persistence") { } + public PersistentClusterShardingRememberEntitiesSpecConfig(bool rememberEntities) : base("persistence", rememberEntities) { } } public class DDataClusterShardingRememberEntitiesSpecConfig : ClusterShardingRememberEntitiesSpecConfig { - public DDataClusterShardingRememberEntitiesSpecConfig() : base("ddata") { } + public DDataClusterShardingRememberEntitiesSpecConfig(bool rememberEntities) : base("ddata", rememberEntities) { } } - public class PersistentClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec + public abstract class PersistentClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec { - public PersistentClusterShardingRememberEntitiesSpec() : this(new PersistentClusterShardingRememberEntitiesSpecConfig()) { } - protected PersistentClusterShardingRememberEntitiesSpec(PersistentClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(PersistentClusterShardingRememberEntitiesSpec)) { } + protected PersistentClusterShardingRememberEntitiesSpec(bool rememberEntities) : this(new PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities)) { } + private PersistentClusterShardingRememberEntitiesSpec(PersistentClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(PersistentClusterShardingRememberEntitiesSpec)) { } } - // DData has no support for remember-entities at this point - public class DDataClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec + public abstract class DDataClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec { - public DDataClusterShardingRememberEntitiesSpec() : this(new DDataClusterShardingRememberEntitiesSpecConfig()) { } - protected DDataClusterShardingRememberEntitiesSpec(DDataClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(PersistentClusterShardingRememberEntitiesSpec)) { } + protected DDataClusterShardingRememberEntitiesSpec(bool rememberEntities) : this(new DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities)) { } + private DDataClusterShardingRememberEntitiesSpec(DDataClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(DDataClusterShardingRememberEntitiesSpec)) { } + } + + public class PersistentClusterShardingRememberEntitiesDefaultSpec : PersistentClusterShardingRememberEntitiesSpec + { + public PersistentClusterShardingRememberEntitiesDefaultSpec() : base(false) { } + } + + public class PersistentClusterShardingRememberEntitiesEnabledSpec : PersistentClusterShardingRememberEntitiesSpec + { + public PersistentClusterShardingRememberEntitiesEnabledSpec() : base(true) { } + } + + public class DDataClusterShardingRememberEntitiesDefaultSpec : DDataClusterShardingRememberEntitiesSpec + { + public DDataClusterShardingRememberEntitiesDefaultSpec() : base(false) { } + } + + public class DDataClusterShardingRememberEntitiesEnabledSpec : DDataClusterShardingRememberEntitiesSpec + { + public DDataClusterShardingRememberEntitiesEnabledSpec() : base(true) { } } public abstract class ClusterShardingRememberEntitiesSpec : MultiNodeClusterSpec @@ -141,7 +163,7 @@ protected override bool Receive(object message) return null; }; - private Lazy _region; + private readonly Lazy _region; private readonly ClusterShardingRememberEntitiesSpecConfig _config; private readonly List _storageLocations; @@ -191,23 +213,31 @@ private void Join(RoleName from, RoleName to) private void StartSharding(ActorSystem sys, IActorRef probe) { - var allocationStrategy = ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 1, relativeLimit: 0.1); ClusterSharding.Get(sys).Start( typeName: "Entity", entityProps: Props.Create(() => new TestEntity(probe)), - settings: ClusterShardingSettings.Create(Sys).WithRememberEntities(true), + settings: ClusterShardingSettings.Create(Sys).WithRememberEntities(_config.RememberEntities), extractEntityId: extractEntityId, extractShardId: extractShardId); } + private Started ExpectEntityRestarted(ActorSystem sys, int evt, TestProbe probe, TestProbe entityProbe, TimeSpan? remaining = null) + { + if (!_config.RememberEntities) + { + probe.Send(ClusterSharding.Get(sys).ShardRegion("Entity"), evt); + probe.ExpectMsg(1); + } + + return entityProbe.ExpectMsg(remaining ?? TimeSpan.FromSeconds(30)); + } + [MultiNodeFact] public void Cluster_sharding_with_remember_entities_specs() { if (!IsDDataMode) Cluster_sharding_with_remember_entities_should_setup_shared_journal(); Cluster_sharding_with_remember_entities_should_start_remembered_entities_when_coordinator_fail_over(); - - // https://github.com/akkadotnet/akka.net/issues/4262 - need to resolve this and then we can remove if statement - if (!IsDDataMode) Cluster_sharding_with_remember_entities_should_start_remembered_entities_in_new_cluster(); + Cluster_sharding_with_remember_entities_should_start_remembered_entities_in_new_cluster(); } public void Cluster_sharding_with_remember_entities_should_setup_shared_journal() @@ -245,19 +275,23 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti { Within(TimeSpan.FromSeconds(30), () => { + var probe = CreateTestProbe(); + var entityProbe = CreateTestProbe(); + Join(_config.Second, _config.Second); RunOn(() => { - StartSharding(Sys, TestActor); - _region.Value.Tell(1); - ExpectMsg(); + StartSharding(Sys, entityProbe.Ref); + probe.Send(_region.Value, 1); + probe.ExpectMsg(1); + entityProbe.ExpectMsg(); }, _config.Second); EnterBarrier("second-started"); Join(_config.Third, _config.Second); RunOn(() => { - StartSharding(Sys, TestActor); + StartSharding(Sys, entityProbe.Ref); }, _config.Third); RunOn(() => @@ -289,7 +323,7 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti RunOn(() => { - ExpectMsg(Remaining); + ExpectEntityRestarted(Sys, 1, probe, entityProbe, Remaining); }, _config.Third); EnterBarrier("after-2"); @@ -313,6 +347,7 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti // no nodes left of the original cluster, start a new cluster var sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + var entityProbe2 = CreateTestProbe(sys2); var probe2 = CreateTestProbe(sys2); if (!IsDDataMode) @@ -327,8 +362,8 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti } Cluster.Get(sys2).Join(Cluster.Get(sys2).SelfAddress); - StartSharding(sys2, probe2.Ref); - probe2.ExpectMsg(TimeSpan.FromSeconds(20)); + StartSharding(sys2, entityProbe2.Ref); + ExpectEntityRestarted(sys2, 1, probe2, entityProbe2); Shutdown(sys2); }, _config.Third); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs index 7a5d58c9346..d6d48dbfc7e 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs @@ -83,7 +83,7 @@ protected ClusterShardingSpecConfig(string mode, string entityRecoveryStrategy = }} distributed-data.durable.lmdb {{ dir = ""target/ClusterShardingSpec/sharding-ddata"" - map-size = 10000000 + map-size = 10 MiB }} }} akka.testconductor.barrier-timeout = 70s @@ -491,6 +491,10 @@ private Props CoordinatorProps(string typeName, bool rebalanceEntities, bool rem .WithFallback(Sys.Settings.Config.GetConfig("akka.cluster.sharding")); var settings = ClusterShardingSettings.Create(config, Sys.Settings.Config.GetConfig("akka.cluster.singleton")) .WithRememberEntities(rememberEntities); + var majorityMinCap = Sys.Settings.Config.GetInt("akka.cluster.sharding.distributed-data.majority-min-cap"); + if (IsDDataMode) + return DDataShardCoordinator.Props(typeName, settings, allocationStrategy, ReplicatorRef, + majorityMinCap, rememberEntities); return PersistentShardCoordinator.Props(typeName, settings, allocationStrategy); } @@ -540,14 +544,11 @@ public void ClusterSharding_specs() ClusterSharding_should_be_easy_API_for_starting(); - if (!IsDDataMode) - { - PersistentClusterShards_should_recover_entities_upon_restart(); - PersistentClusterShards_should_permanently_stop_entities_which_passivate(); - PersistentClusterShards_should_restart_entities_which_stop_without_passivation(); - PersistentClusterShards_should_be_migrated_to_new_regions_upon_region_failure(); - PersistentClusterShards_should_ensure_rebalance_restarts_shards(); - } + PersistentClusterShards_should_recover_entities_upon_restart(); + PersistentClusterShards_should_permanently_stop_entities_which_passivate(); + PersistentClusterShards_should_restart_entities_which_stop_without_passivation(); + PersistentClusterShards_should_be_migrated_to_new_regions_upon_region_failure(); + PersistentClusterShards_should_ensure_rebalance_restarts_shards(); } public void ClusterSharding_should_setup_shared_journal() diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index e6240fe3a29..c961f9d354c 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -277,7 +277,8 @@ public ClusterSharding(ExtendedActorSystem system) /// TBD public static Config DefaultConfig() { - return ConfigurationFactory.FromResource("Akka.Cluster.Sharding.reference.conf"); + return ConfigurationFactory.FromResource("Akka.Cluster.Sharding.reference.conf") + .WithFallback(DistributedData.DistributedData.DefaultConfig()); } /// diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs index 5931d4aecc7..b28aca1089d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs @@ -251,7 +251,9 @@ public ClusterShardingGuardian() private ReplicatorSettings GetReplicatorSettings(ClusterShardingSettings shardingSettings) { - var configuredSettings = ReplicatorSettings.Create(Context.System.Settings.Config.GetConfig("akka.cluster.sharding.distributed-data")); + var config = Context.System.Settings.Config.GetConfig("akka.cluster.sharding.distributed-data") + .WithFallback(Context.System.Settings.Config.GetConfig("akka.cluster.distributed-data")); + var configuredSettings = ReplicatorSettings.Create(config); var settingsWithRoles = configuredSettings.WithRole(shardingSettings.Role); if (shardingSettings.RememberEntities) return settingsWithRoles; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs index 68940789db1..d161b485bb6 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs @@ -127,7 +127,37 @@ internal sealed class DDataShard : ActorBase, IShard, IWithUnboundedStash, IWith } } - public void EntityTerminated(IActorRef tref) => this.BaseEntityTerminated(tref); + public void EntityTerminated(IActorRef tref) + { + if (!IdByRef.TryGetValue(tref, out var id)) return; + IdByRef = IdByRef.Remove(tref); + RefById = RefById.Remove(id); + + if (PassivateIdleTask != null) + { + LastMessageTimestamp = LastMessageTimestamp.Remove(id); + } + + if (MessageBuffers.TryGetValue(id, out var buffer) && buffer.Count != 0) + { + //Note; because we're not persisting the EntityStopped, we don't need + // to persist the EntityStarted either. + Log.Debug("Starting entity [{0}] again, there are buffered messages for it", id); + this.SendMessageBuffer(new Shard.EntityStarted(id)); + } + else + { + if (!Passivating.Contains(tref)) + { + Log.Debug("Entity [{0}] stopped without passivating, will restart after backoff", id); + Context.System.Scheduler.ScheduleTellOnce(Settings.TuningParameters.EntityRestartBackoff, Self, new Shard.RestartEntity(id), ActorRefs.NoSender); + } + else + ProcessChange(new Shard.EntityStopped(id), this.PassivateCompleted); + } + + Passivating = Passivating.Remove(tref); + } public void DeliverTo(string id, object message, object payload, IActorRef sender) { diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs index d7da03d62fd..b7f14097ca6 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs @@ -345,7 +345,7 @@ private void SendCoordinatorStateUpdate(PersistentShardCoordinator.IDomainEvent { var s = CurrentState.Updated(e); _replicator.Tell(Dsl.Update(_coordinatorStateKey, - new LWWRegister(Cluster.SelfUniqueAddress, PersistentShardCoordinator.State.Empty), + new LWWRegister(Cluster.SelfUniqueAddress, PersistentShardCoordinator.State.Empty.WithRememberEntities(Settings.RememberEntities)), _writeConsistency, e, reg => reg.WithValue(Cluster.SelfUniqueAddress, s))); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index f94ea77704b..9f3e31d0a19 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -184,110 +184,13 @@ akka.cluster.sharding { distributed-data { # minCap parameter to MajorityWrite and MajorityRead consistency level. majority-min-cap = 5 - #durable.keys = ["shard-*"] + + durable.keys = ["shard-*"] # When using many entities with "remember entities" the Gossip message # can become to large if including to many in same message. Limit to # the same number as the number of ORSet per shard. max-delta-elements = 5 - - # Actor name of the Replicator actor, /system/ddataReplicator - name = ddataReplicator - - # Replicas are running on members tagged with this role. - # All members are used if undefined or empty. - role = "" - - # How often the Replicator should send out gossip information - gossip-interval = 2 s - - # How often the subscribers will be notified of changes, if any - notify-subscribers-interval = 500 ms - - # The id of the dispatcher to use for Replicator actors. If not specified - # default dispatcher is used. - # If specified you need to define the settings of the actual dispatcher. - use-dispatcher = "" - - # How often the Replicator checks for pruning of data associated with - # removed cluster nodes. - pruning-interval = 30 s - - # How long time it takes (worst case) to spread the data to all other replica nodes. - # This is used when initiating and completing the pruning process of data associated - # with removed cluster nodes. The time measurement is stopped when any replica is - # unreachable, so it should be configured to worst case in a healthy cluster. - max-pruning-dissemination = 60 s - - # Serialized Write and Read messages are cached when they are sent to - # several nodes. If no further activity they are removed from the cache - # after this duration. - serializer-cache-time-to-live = 10s - - delta-crdt { - - # Some complex deltas grow in size for each update and above this - # threshold such deltas are discarded and sent as full state instead. - max-delta-size = 200 - } - - durable { - # List of keys that are durable. Prefix matching is supported by using * at the - # end of a key. - keys = [] - - # The markers of that pruning has been performed for a removed node are kept for this - # time and thereafter removed. If and old data entry that was never pruned is - # injected and merged with existing data after this time the value will not be correct. - # This would be possible if replica with durable data didn't participate in the pruning - # (e.g. it was shutdown) and later started after this time. A durable replica should not - # be stopped for longer time than this duration and if it is joining again after this - # duration its data should first be manually removed (from the lmdb directory). - # It should be in the magnitude of days. Note that there is a corresponding setting - # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'. - pruning-marker-time-to-live = 10 d - - # Fully qualified class name of the durable store actor. It must be a subclass - # of akka.actor.Actor and handle the protocol defined in - # akka.cluster.ddata.DurableStore. The class must have a constructor with - # com.typesafe.config.Config parameter. - store-actor-class = "" - - use-dispatcher = akka.cluster.distributed-data.durable.pinned-store - - pinned-store { - executor = thread-pool-executor - type = PinnedDispatcher - } - } - - lmdb { - # Directory of LMDB file. There are two options: - # 1. A relative or absolute path to a directory that ends with 'ddata' - # the full name of the directory will contain name of the ActorSystem - # and its remote port. - # 2. Otherwise the path is used as is, as a relative or absolute path to - # a directory. - # - # When running in production you may want to configure this to a specific - # path (alt 2), since the default directory contains the remote port of the - # actor system to make the name unique. If using a dynamically assigned - # port (0) it will be different each time and the previously stored data - # will not be loaded. - dir = "ddata" - - # Size in bytes of the memory mapped file. - map-size = 104857600 # 100MiB - - # Accumulate changes before storing improves performance with the - # risk of losing the last writes if the JVM crashes. - # The interval is by default set to 'off' to write each update immediately. - # Enabling write behind by specifying a duration, e.g. 200ms, is especially - # efficient when performing many writes to the same key, because it is only - # the last value for each key that will be serialized and stored. - # write-behind-interval = 200 ms - write-behind-interval = off - } } } # //#sharding-ext-config diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs index 696682bb1b5..65d2fb84758 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs @@ -167,7 +167,7 @@ protected override bool Receive(object message) return true; case ActionRequest ar: - var r = requests.Where(i => i.Item2.Equals(ar.Request)).FirstOrDefault(); + var r = requests.FirstOrDefault(i => i.Item2.Equals(ar.Request)); if (r.Item1 != null) { _log.Info("Actioning request {0} to {1}", r.Item2, ar.Result); diff --git a/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs b/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs index 694812d3426..bded7d3b430 100644 --- a/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs +++ b/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs @@ -43,7 +43,7 @@ public sealed class LmdbDurableStore : ReceiveActor public const string DatabaseName = "ddata"; - private sealed class WriteBehind + private sealed class WriteBehind : IDeadLetterSuppression { public static readonly WriteBehind Instance = new WriteBehind(); private WriteBehind() { } @@ -53,56 +53,15 @@ private sealed class WriteBehind private readonly Akka.Serialization.Serialization _serialization; private readonly SerializerWithStringManifest _serializer; private readonly string _manifest; + private readonly long _mapSize; private readonly TimeSpan _writeBehindInterval; private readonly string _dir; + private bool _dirExists; private readonly Dictionary _pending = new Dictionary(); private readonly ILoggingAdapter _log; - private (LightningEnvironment env, LightningDatabase db, bool initialized) _lmdb; - // Lazy init - private (LightningEnvironment env, LightningDatabase db, bool initialized) Lmdb - { - get - { - if (_lmdb.initialized) - return _lmdb; - - var t0 = Stopwatch.StartNew(); - _log.Info($"Using durable data in LMDB directory [{_dir}]"); - - if (!Directory.Exists(_dir)) - Directory.CreateDirectory(_dir); - - var mapSize = _config.GetByteSize("map-size", 100 * 1024 * 1024); - var env = new LightningEnvironment(_dir) - { - MapSize = mapSize.Value, - MaxDatabases = 1 - }; - env.Open(EnvironmentOpenFlags.NoLock); - - using (var tx = env.BeginTransaction()) - { - var db = tx.OpenDatabase(DatabaseName, new DatabaseConfiguration - { - Flags = DatabaseOpenFlags.Create - }); - tx.Commit(); - - t0.Stop(); - if (_log.IsDebugEnabled) - _log.Debug($"Init of LMDB in directory [{_dir}] took [{t0.ElapsedMilliseconds} ms]"); - - _lmdb = (env, db, true); - return _lmdb; - } - } - } - - public bool IsDbInitialized => _lmdb.initialized; - public LmdbDurableStore(Config config) { _config = config.GetConfig("lmdb"); @@ -123,11 +82,16 @@ public LmdbDurableStore(Config config) TimeSpan.Zero : _config.GetTimeSpan("write-behind-interval"); + _mapSize = _config.GetByteSize("map-size") ?? 100 * 1024 * 1024; + var path = _config.GetString("dir"); _dir = path.EndsWith(DatabaseName) ? Path.GetFullPath($"{path}-{Context.System.Name}-{Self.Path.Parent.Name}-{Cluster.Cluster.Get(Context.System).SelfAddress.Port}") : Path.GetFullPath(path); + _dirExists = Directory.Exists(_dir); + + _log.Info($"Using durable data in LMDB directory [{_dir}]"); Init(); } @@ -142,13 +106,46 @@ protected override void PostStop() { base.PostStop(); DoWriteBehind(); + } + + private LightningEnvironment GetLightningEnvironment() + { + LightningEnvironment env; + + if (!_dirExists) + { + var t0 = Stopwatch.StartNew(); + Directory.CreateDirectory(_dir); + _dirExists = true; + + env = new LightningEnvironment(_dir) + { + MapSize = _mapSize, + MaxDatabases = 1 + }; + env.Open(EnvironmentOpenFlags.NoLock); + + using (var tx = env.BeginTransaction()) + using (tx.OpenDatabase(DatabaseName, new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create })) + { + tx.Commit(); + } - if(IsDbInitialized) + t0.Stop(); + if (_log.IsDebugEnabled) + _log.Debug($"Init of LMDB in directory [{_dir}] took [{t0.ElapsedMilliseconds} ms]"); + } + else { - var (env, db, _) = Lmdb; - try { db?.Dispose(); } catch { } - try { env?.Dispose(); } catch { } + env = new LightningEnvironment(_dir) + { + MapSize = _mapSize, + MaxDatabases = 1 + }; + env.Open(EnvironmentOpenFlags.NoLock); } + + return env; } private void Active() @@ -157,13 +154,24 @@ private void Active() { try { - var l = Lmdb; // init if (_writeBehindInterval == TimeSpan.Zero) { - using (var tx = l.env.BeginTransaction()) + using (var env = GetLightningEnvironment()) + using(var tx = env.BeginTransaction()) + using (var db = tx.OpenDatabase(DatabaseName)) { - DbPut(tx, store.Key, store.Data); - tx.Commit(); + try + { + var byteKey = Encoding.UTF8.GetBytes(store.Key); + var byteValue = _serializer.ToBinary(store.Data); + tx.Put(db, byteKey, byteValue); + tx.Commit(); + } + catch (Exception) + { + tx.Abort(); + throw; + } } } else @@ -179,6 +187,7 @@ private void Active() { _log.Error(cause, "Failed to store [{0}]:{1}", store.Key, cause); store.Reply?.ReplyTo.Tell(store.Reply.FailureMessage); + throw; } }); @@ -197,20 +206,21 @@ private void Init() return; } - var (environment, db, _) = Lmdb; var t0 = Stopwatch.StartNew(); - using (var tx = environment.BeginTransaction(TransactionBeginFlags.ReadOnly)) - using (var cursor = tx.CreateCursor(db)) + using (var env = GetLightningEnvironment()) + using (var tx = env.BeginTransaction(TransactionBeginFlags.ReadOnly)) + using(var db = tx.OpenDatabase(DatabaseName)) + using(var cursor = tx.CreateCursor(db)) { try { var data = cursor.AsEnumerable().Select((x, i) => { - var (key, value) = x; - return new KeyValuePair( - Encoding.UTF8.GetString(key.CopyToNewArray()), - (DurableDataEnvelope)_serializer.FromBinary(value.CopyToNewArray(), _manifest)); - }).ToImmutableDictionary(); + var (key, value) = x; + return new KeyValuePair( + Encoding.UTF8.GetString(key.CopyToNewArray()), + (DurableDataEnvelope)_serializer.FromBinary(value.CopyToNewArray(), _manifest)); + }).ToImmutableDictionary(); if (data.Count > 0) { @@ -235,28 +245,22 @@ private void Init() }); } - private void DbPut(LightningTransaction tx, string key, DurableDataEnvelope data) - { - var byteKey = Encoding.UTF8.GetBytes(key); - var byteValue = _serializer.ToBinary(data); - - var l = Lmdb; - tx.Put(l.db, byteKey, byteValue); - } - private void DoWriteBehind() { if (_pending.Count > 0) { - var (env, _, _) = Lmdb; var t0 = Stopwatch.StartNew(); - using (var tx = env.BeginTransaction()) + using (var env = GetLightningEnvironment()) + using(var tx = env.BeginTransaction()) + using (var db = tx.OpenDatabase(DatabaseName)) { try { foreach (var entry in _pending) { - DbPut(tx, entry.Key, entry.Value); + var byteKey = Encoding.UTF8.GetBytes(entry.Key); + var byteValue = _serializer.ToBinary(entry.Value); + tx.Put(db, byteKey, byteValue); } tx.Commit(); @@ -270,9 +274,11 @@ private void DoWriteBehind() { _log.Error(cause, "failed to store [{0}]", string.Join(", ", _pending.Keys)); tx.Abort(); + throw; } finally { + t0.Stop(); _pending.Clear(); } } diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs index 97a87a20b00..883ebe461ac 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs @@ -57,16 +57,20 @@ public ORSetSpec(ITestOutputHelper output) [Fact] public void ORSet_must_be_able_to_add_element() { - var c1 = ORSet.Empty; - var c2 = c1.Add(_node1, _user1); - var c3 = c2.Add(_node1, _user2); - var c4 = c3.Add(_node1, _user4); - var c5 = c4.Add(_node1, _user3); - - Assert.Contains(_user1, c5.Elements); - Assert.Contains(_user2, c5.Elements); - Assert.Contains(_user3, c5.Elements); - Assert.Contains(_user4, c5.Elements); + for (var i = 0; i < 100; i++) + { + var c1 = ORSet.Empty; + var c2 = c1.Add(_node1, _user1); + var c3 = c2.Add(_node1, _user2); + var c4 = c3.Add(_node1, _user4); + var c5 = c4.Add(_node1, _user3); + + Assert.Contains(_user1, c5.Elements); + Assert.Contains(_user2, c5.Elements); + Assert.Contains(_user3, c5.Elements); + Assert.Contains(_user4, c5.Elements); + } + } [Fact] diff --git a/src/contrib/cluster/Akka.DistributedData/ORSet.cs b/src/contrib/cluster/Akka.DistributedData/ORSet.cs index 150b29c32ac..7ebbd63bc7f 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORSet.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORSet.cs @@ -518,35 +518,38 @@ public AddDeltaOperation(ORSet underlying) public override ORSet Underlying { get; } public override IReplicatedData Merge(IReplicatedData other) { - if (other is AddDeltaOperation) - { - var u = ((AddDeltaOperation)other).Underlying; - // Note that we only merge deltas originating from the same node - return new AddDeltaOperation(new ORSet( - ConcatElementsMap(u.ElementsMap), - Underlying.VersionVector.Merge(u.VersionVector))); - } - else if (other is AtomicDeltaOperation) - { - return new DeltaGroup(ImmutableArray.Create(this, other)); - } - else if (other is DeltaGroup) + switch (other) { - var vector = ((DeltaGroup)other).Operations; - return new DeltaGroup(vector.Add(this)); + case AddDeltaOperation operation: + { + var u = operation.Underlying; + // Note that we only merge deltas originating from the same node + return new AddDeltaOperation(new ORSet( + ConcatElementsMap(u.ElementsMap), + Underlying.VersionVector.Merge(u.VersionVector))); + } + case AtomicDeltaOperation _: + return new DeltaGroup(ImmutableArray.Create(this, other)); + case DeltaGroup dg: + { + var vector = dg.Operations; + return new DeltaGroup(vector.Add(this)); + } + default: + throw new ArgumentException($"Unknown delta operation of type {other.GetType()}", nameof(other)); } - else throw new ArgumentException($"Unknown delta operation of type {other.GetType()}", nameof(other)); } private ImmutableDictionary ConcatElementsMap( ImmutableDictionary thatMap) { - var u = Underlying.ElementsMap.ToBuilder(); - foreach (var entry in thatMap) - { - u[entry.Key] = entry.Value; - } - return u.ToImmutable(); + //var u = Underlying.ElementsMap.ToBuilder(); + //foreach (var entry in thatMap) + //{ + // u[entry.Key] = entry.Value; + //} + //return u.ToImmutable(); + return Underlying.ElementsMap.SetItems(thatMap); } } @@ -709,9 +712,9 @@ private ORSet MergeRemoveDelta(RemoveDeltaOperation delta) { while (deleteDots.MoveNext()) { - var curr = deleteDots.Current; - deleteDotNodes.Add(curr.Key); - deleteDotsAreGreater &= (thisDot != null && (thisDot.VersionAt(curr.Key) <= curr.Value)); + var current = deleteDots.Current; + deleteDotNodes.Add(current.Key); + deleteDotsAreGreater &= (thisDot != null && (thisDot.VersionAt(current.Key) <= current.Value)); } } diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs new file mode 100644 index 00000000000..c1e52f2e1d7 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Akka.DistributedData.Serialization.Proto.Msg; + +namespace Akka.DistributedData.Serialization +{ + internal class OtherMessageComparer : IComparer + { + public static OtherMessageComparer Instance { get; } = new OtherMessageComparer(); + + private OtherMessageComparer() + {} + + public int Compare(OtherMessage a, OtherMessage b) + { + if (a == null || b == null) + throw new Exception("Both messages must not be null"); + if (ReferenceEquals(a, b)) return 0; + + var aByteString = a.EnclosedMessage.Span; + var bByteString = b.EnclosedMessage.Span; + var aSize = aByteString.Length; + var bSize = bByteString.Length; + if (aSize < bSize) return -1; + if (aSize > bSize) return 1; + + for (var i = 0; i < aSize; i++) + { + var aByte = aByteString[i]; + var bByte = bByteString[i]; + if (aByte < bByte) return -1; + if (aByte > bByte) return 1; + } + + return 0; + } + } +} diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs index 1afae920d81..42a894d81f2 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs @@ -251,15 +251,6 @@ private static Type GetTypeFromDescriptor(TypeDescriptor t) } #region ORSet - - private static Proto.Msg.ORSet ORSetToProto(ORSet set) - { - var p = new Proto.Msg.ORSet(); - p.Vvector = SerializationSupport.VersionVectorToProto(set.VersionVector); - p.Dots.Add(set.ElementsMap.Values.Select(SerializationSupport.VersionVectorToProto)); - p.TypeInfo = new TypeDescriptor(); - return p; - } private IORSet ORSetFromBinary(byte[] bytes) { return FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); @@ -267,41 +258,70 @@ private IORSet ORSetFromBinary(byte[] bytes) private Proto.Msg.ORSet ToProto(IORSet orset) { + var b = new Proto.Msg.ORSet + { + TypeInfo = new TypeDescriptor() + }; + switch (orset) { case ORSet ints: { - var p = ORSetToProto(ints); - p.TypeInfo.Type = ValType.Int; - p.IntElements.Add(ints.Elements); - return p; + b.Vvector = SerializationSupport.VersionVectorToProto(ints.VersionVector); + b.TypeInfo.Type = ValType.Int; + var intElements = new List(ints.ElementsMap.Keys); + intElements.Sort(); + foreach (var val in intElements) + { + b.IntElements.Add(val); + b.Dots.Add(SerializationSupport.VersionVectorToProto(ints.ElementsMap[val])); + } + return b; } case ORSet longs: { - var p = ORSetToProto(longs); - p.TypeInfo.Type = ValType.Long; - p.LongElements.Add(longs.Elements); - return p; + b.Vvector = SerializationSupport.VersionVectorToProto(longs.VersionVector); + b.TypeInfo.Type = ValType.Long; + var longElements = new List(longs.ElementsMap.Keys); + longElements.Sort(); + foreach (var val in longElements) + { + b.LongElements.Add(val); + b.Dots.Add(SerializationSupport.VersionVectorToProto(longs.ElementsMap[val])); + } + return b; } case ORSet strings: { - var p = ORSetToProto(strings); - p.TypeInfo.Type = ValType.String; - p.StringElements.Add(strings.Elements); - return p; + b.Vvector = SerializationSupport.VersionVectorToProto(strings.VersionVector); + b.TypeInfo.Type = ValType.String; + var stringElements = new List(strings.ElementsMap.Keys); + stringElements.Sort(); + foreach (var val in stringElements) + { + b.StringElements.Add(val); + b.Dots.Add(SerializationSupport.VersionVectorToProto(strings.ElementsMap[val])); + } + return b; } case ORSet refs: { - var p = ORSetToProto(refs); - p.TypeInfo.Type = ValType.ActorRef; - p.ActorRefElements.Add(refs.Select(Akka.Serialization.Serialization.SerializedActorPath)); - return p; + b.Vvector = SerializationSupport.VersionVectorToProto(refs.VersionVector); + b.TypeInfo.Type = ValType.ActorRef; + var actorRefElements = new List(refs.ElementsMap.Keys); + actorRefElements.Sort(); + foreach (var val in actorRefElements) + { + b.ActorRefElements.Add(Akka.Serialization.Serialization.SerializedActorPath(val)); + b.Dots.Add(SerializationSupport.VersionVectorToProto(refs.ElementsMap[val])); + } + return b; } default: // unknown type { // runtime type - enter horrible dynamic serialization stuff var makeProto = ORSetUnknownMaker.MakeGenericMethod(orset.SetType); - return (Proto.Msg.ORSet)makeProto.Invoke(this, new object[] { orset }); + return (Proto.Msg.ORSet)makeProto.Invoke(this, new object[] { orset, b }); } } } @@ -368,14 +388,29 @@ private static ORSet ToGenericORSet(ImmutableDictionary /// Called when we're serializing none of the standard object types with ORSet /// - private Proto.Msg.ORSet ORSetUnknownToProto(IORSet o) + private Proto.Msg.ORSet ORSetUnknownToProto(IORSet o, Proto.Msg.ORSet b) { var orset = (ORSet)o; - var p = ORSetToProto(orset); - p.TypeInfo.Type = ValType.Other; - p.TypeInfo.TypeName = typeof(T).TypeQualifiedName(); - p.OtherElements.Add(orset.Elements.Select(x => _ser.OtherMessageToProto(x))); - return p; + b.Vvector = SerializationSupport.VersionVectorToProto(orset.VersionVector); + b.TypeInfo.Type = ValType.Other; + b.TypeInfo.TypeName = typeof(T).TypeQualifiedName(); + + var otherElements = new List(); + var otherElementsDict = new Dictionary(); + foreach (var kvp in orset.ElementsMap) + { + var otherElement = _ser.OtherMessageToProto(kvp.Key); + otherElements.Add(otherElement); + otherElementsDict[otherElement] = SerializationSupport.VersionVectorToProto(kvp.Value); + } + otherElements.Sort(OtherMessageComparer.Instance); + + foreach (var val in otherElements) + { + b.OtherElements.Add(val); + b.Dots.Add(otherElementsDict[val]); + } + return b; } private ORSet.IAddDeltaOperation ORAddDeltaOperationFromBinary(byte[] bytes) @@ -494,9 +529,14 @@ private Proto.Msg.GSet GSetToProto(GSet gset) private Proto.Msg.GSet GSetToProtoUnknown(IGSet g) { var gset = (GSet)g; - var p = new Proto.Msg.GSet(); - p.TypeInfo = GetTypeDescriptor(typeof(T)); - p.OtherElements.Add(gset.Select(x => _ser.OtherMessageToProto(x))); + var otherElements = new List(gset.Select(x => _ser.OtherMessageToProto(x))); + otherElements.Sort(OtherMessageComparer.Instance); + + var p = new Proto.Msg.GSet + { + TypeInfo = GetTypeDescriptor(typeof(T)) + }; + p.OtherElements.Add(otherElements); return p; } @@ -510,25 +550,33 @@ private Proto.Msg.GSet ToProto(IGSet gset) case GSet ints: { var p = GSetToProto(ints); - p.IntElements.Add(ints.Elements); + var intElements = new List(ints.Elements); + intElements.Sort(); + p.IntElements.Add(intElements); return p; } case GSet longs: { var p = GSetToProto(longs); - p.LongElements.Add(longs.Elements); + var longElements = new List(longs.Elements); + longElements.Sort(); + p.LongElements.Add(longElements); return p; } case GSet strings: { var p = GSetToProto(strings); - p.StringElements.Add(strings.Elements); + var stringElements = new List(strings.Elements); + stringElements.Sort(); + p.StringElements.Add(stringElements); return p; } case GSet refs: { var p = GSetToProto(refs); - p.ActorRefElements.Add(refs.Select(Akka.Serialization.Serialization.SerializedActorPath)); + var refElements = new List(refs.Elements); + refElements.Sort(); + p.ActorRefElements.Add(refElements.Select(Akka.Serialization.Serialization.SerializedActorPath)); return p; } default: // unknown type diff --git a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs index d4d367a2283..d83daec7f38 100644 --- a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs +++ b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs @@ -52,13 +52,19 @@ public static VersionVector Create(ImmutableDictionary vers /// /// Marker to signal that we have reached the end of a version vector. /// - private static readonly KeyValuePair EndMarker = new KeyValuePair(null, long.MinValue); + private static readonly (UniqueAddress addr, long version) EndMarker = (null, long.MinValue); public abstract bool IsEmpty { get; } public abstract int Count { get; } public abstract IEnumerator> VersionEnumerator { get; } + + internal abstract IEnumerable<(UniqueAddress addr, long version)> InternalVersions { get; } + + internal IEnumerator<(UniqueAddress addr, long version)> InternalVersionEnumerator => + InternalVersions.GetEnumerator(); + public static readonly VersionVector Empty = new MultiVersionVector(ImmutableDictionary.Empty); /// @@ -156,15 +162,15 @@ private Ordering CompareOnlyTo(VersionVector other, Ordering order) { if (ReferenceEquals(this, other)) return Ordering.Same; - return Compare(VersionEnumerator, other.VersionEnumerator, + return Compare(InternalVersionEnumerator, other.InternalVersionEnumerator, order == Ordering.Concurrent ? Ordering.FullOrder : order); } - private T NextOrElse(IEnumerator enumerator, T defaultValue) => + private static T NextOrElse(IEnumerator enumerator, T defaultValue) => enumerator.MoveNext() ? enumerator.Current : defaultValue; - private Ordering Compare(IEnumerator> i1, - IEnumerator> i2, Ordering requestedOrder) + private Ordering Compare(IEnumerator<(UniqueAddress addr, long version)> i1, + IEnumerator<(UniqueAddress addr, long version)> i2, Ordering requestedOrder) { var nt1 = NextOrElse(i1, EndMarker); var nt2 = NextOrElse(i2, EndMarker); @@ -177,15 +183,15 @@ private Ordering CompareOnlyTo(VersionVector other, Ordering order) else if (Equals(nt2, EndMarker)) return currentOrder == Ordering.Before ? Ordering.Concurrent : Ordering.After; else { - var nc = nt1.Key.CompareTo(nt2.Key); + var nc = nt1.addr.CompareTo(nt2.addr); if (nc == 0) { - if (nt1.Value < nt2.Value) + if (nt1.version < nt2.version) { if (currentOrder == Ordering.After) return Ordering.Concurrent; currentOrder = Ordering.Before; } - else if (nt1.Value > nt2.Value) + else if (nt1.version > nt2.version) { if (currentOrder == Ordering.Before) return Ordering.Concurrent; currentOrder = Ordering.After; @@ -258,6 +264,15 @@ public SingleVersionVector(UniqueAddress node, long version) public override bool IsEmpty => false; public override int Count => 1; public override IEnumerator> VersionEnumerator => new Enumerator(Node, Version); + + internal override IEnumerable<(UniqueAddress addr, long version)> InternalVersions + { + get + { + yield return (Node, Version); + } + } + public override VersionVector Increment(UniqueAddress node) { var v = Counter.GetAndIncrement(); @@ -274,23 +289,23 @@ public override VersionVector Increment(UniqueAddress node) public override VersionVector Merge(VersionVector other) { - if (other is MultiVersionVector vector1) + switch (other) { - var v2 = vector1.Versions.GetValueOrDefault(Node, 0L); - var mergedVersions = v2 >= Version ? vector1.Versions : vector1.Versions.SetItem(Node, Version); - return new MultiVersionVector(mergedVersions); - } - else if (other is SingleVersionVector vector) - { - if (Node == vector.Node) + case MultiVersionVector vector1: { - return Version >= vector.Version ? this : new SingleVersionVector(vector.Node, vector.Version); + var v2 = vector1.Versions.GetValueOrDefault(Node, 0L); + var mergedVersions = v2 >= Version ? vector1.Versions : vector1.Versions.SetItem(Node, Version); + return new MultiVersionVector(mergedVersions); } - else return new MultiVersionVector( - new KeyValuePair(Node, Version), - new KeyValuePair(vector.Node, vector.Version)); + case SingleVersionVector vector when Node == vector.Node: + return Version >= vector.Version ? this : new SingleVersionVector(vector.Node, vector.Version); + case SingleVersionVector vector: + return new MultiVersionVector( + new KeyValuePair(Node, Version), + new KeyValuePair(vector.Node, vector.Version)); + default: + throw new NotSupportedException("SingleVersionVector doesn't support merge with provided version vector"); } - else throw new NotSupportedException("SingleVersionVector doesn't support merge with provided version vector"); } public override ImmutableHashSet ModifiedByNodes => ImmutableHashSet.Create(Node); @@ -337,6 +352,10 @@ public MultiVersionVector(ImmutableDictionary nodeVersions) public override bool IsEmpty => Versions.IsEmpty; public override int Count => Versions.Count; public override IEnumerator> VersionEnumerator => Versions.GetEnumerator(); + + internal override IEnumerable<(UniqueAddress addr, long version)> InternalVersions => + Versions.Select(x => (x.Key, x.Value)); + public override VersionVector Increment(UniqueAddress node) => new MultiVersionVector(Versions.SetItem(node, Counter.GetAndIncrement())); @@ -346,25 +365,29 @@ public MultiVersionVector(ImmutableDictionary nodeVersions) public override VersionVector Merge(VersionVector other) { - if (other is MultiVersionVector vector1) + switch (other) { - var merged = vector1.Versions.ToBuilder(); - foreach (var pair in Versions) + case MultiVersionVector vector1: { - var mergedCurrentTime = merged.GetValueOrDefault(pair.Key, 0L); - if (pair.Value >= mergedCurrentTime) - merged.AddOrSet(pair.Key, pair.Value); - } + var merged = vector1.Versions.ToBuilder(); + foreach (var pair in Versions) + { + var mergedCurrentTime = merged.GetValueOrDefault(pair.Key, 0L); + if (pair.Value >= mergedCurrentTime) + merged[pair.Key] = pair.Value; + } - return new MultiVersionVector(merged.ToImmutable()); - } - else if (other is SingleVersionVector vector) - { - var v1 = Versions.GetValueOrDefault(vector.Node, 0L); - var merged = v1 >= vector.Version ? Versions : Versions.SetItem(vector.Node, vector.Version); - return new MultiVersionVector(merged); + return new MultiVersionVector(merged.ToImmutable()); + } + case SingleVersionVector vector: + { + var v1 = Versions.GetValueOrDefault(vector.Node, 0L); + var merged = v1 >= vector.Version ? Versions : Versions.SetItem(vector.Node, vector.Version); + return new MultiVersionVector(merged); + } + default: + throw new NotSupportedException("MultiVersionVector doesn't support merge with provided version vector"); } - else throw new NotSupportedException("MultiVersionVector doesn't support merge with provided version vector"); } public override ImmutableHashSet ModifiedByNodes => Versions.Keys.ToImmutableHashSet(); diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs index ef5d2fe0c39..4c2c84c5f54 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs @@ -21,7 +21,7 @@ namespace Akka.DependencyInjection.Tests public class ActorServiceProviderPropsWithScopesSpecs : AkkaSpec, IClassFixture { - public ActorServiceProviderPropsWithScopesSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(ServiceProviderSetup.Create(fixture.Provider) + public ActorServiceProviderPropsWithScopesSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(DependencyResolverSetup.Create(fixture.Provider) .And(BootstrapSetup.Create().WithConfig(TestKitBase.DefaultConfig)), output) { @@ -30,7 +30,7 @@ public ActorServiceProviderPropsWithScopesSpecs(AkkaDiFixture fixture, ITestOutp [Fact(DisplayName = "DI: actors who receive an IServiceScope through Props should dispose of their dependencies upon termination")] public void ActorsWithScopedDependenciesShouldDisposeUponStop() { - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var props = spExtension.Props(); // create a scoped actor using the props from Akka.DependencyInjection @@ -60,11 +60,23 @@ public void ActorsWithScopedDependenciesShouldDisposeUponStop() deps2.Dependencies.All(x => x.Disposed).Should().BeFalse(); } + [Fact(DisplayName = "DI: should be able to start actors with untyped Props")] + public void ShouldStartActorWithUntypedProps() + { + var spExtension = DependencyResolver.For(Sys); + var props = spExtension.Props(typeof(ScopedActor)); + + // create a scoped actor using the props from Akka.DependencyInjection + var scoped1 = Sys.ActorOf(props, "scoped1"); + scoped1.Tell(new FetchDependencies()); + var deps1 = ExpectMsg(); + } + [Fact(DisplayName = "DI: actors who receive an IServiceScope through Props should dispose of their dependencies and recreate upon restart")] public void ActorsWithScopedDependenciesShouldDisposeAndRecreateUponRestart() { - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var props = spExtension.Props(); // create a scoped actor using the props from Akka.DependencyInjection @@ -95,7 +107,7 @@ public void ActorsWithScopedDependenciesShouldDisposeAndRecreateUponRestart() "DI: actors who receive a mix of dependencies via IServiceScope should dispose ONLY of their scoped dependencies and recreate upon restart")] public void ActorsWithMixedDependenciesShouldDisposeAndRecreateScopedUponRestart() { - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var props = spExtension.Props(); // create a scoped actor using the props from Akka.DependencyInjection @@ -134,7 +146,7 @@ public void ActorsWithMixedDependenciesShouldDisposeAndRecreateScopedUponRestart public void ActorsWithNonDiDependenciesShouldStart() { // - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var arg1 = "foo"; var arg2 = "bar"; var props = spExtension.Props(arg1, arg2); @@ -182,7 +194,7 @@ public void ActorsWithNonDiDependenciesShouldStart() public void ServiceProvider_Props_should_support_copying() { // - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var arg1 = "foo"; var arg2 = "bar"; var props = spExtension.Props(arg1, arg2).WithRouter(new RoundRobinPool(10).WithSupervisorStrategy(new OneForOneStrategy( diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs index bc0a64388ad..1e2342644d6 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs @@ -1,4 +1,10 @@ -using System; +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -73,7 +79,7 @@ public async Task DI_should_be_able_to_retrieve_singleton_using_delegate_from_in internal class ParentActor : UntypedActor { public static Props Props(ActorSystem system) => - ServiceProvider.For(system).Props(); + DependencyResolver.For(system).Props(); private readonly IActorRef _echoActor; @@ -114,7 +120,7 @@ public AkkaService(IServiceProvider serviceProvider) public Task StartAsync(CancellationToken cancellationToken) { - var setup = ServiceProviderSetup.Create(_serviceProvider) + var setup = DependencyResolverSetup.Create(_serviceProvider) .And(BootstrapSetup.Create().WithConfig(TestKitBase.DefaultConfig)); ActorSystem = ActorSystem.Create("TestSystem", setup); diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs index e14e8e8ee4b..d44085f69f6 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs @@ -18,7 +18,7 @@ namespace Akka.DependencyInjection.Tests { public class ServiceProviderSetupSpecs : AkkaSpec, IClassFixture { - public ServiceProviderSetupSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(ServiceProviderSetup.Create(fixture.Provider) + public ServiceProviderSetupSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(DependencyResolverSetup.Create(fixture.Provider) .And(BootstrapSetup.Create().WithConfig(TestKitBase.DefaultConfig)), output) { @@ -27,29 +27,29 @@ public ServiceProviderSetupSpecs(AkkaDiFixture fixture, ITestOutputHelper output [Fact(DisplayName = "DI: Should access Microsoft.Extensions.DependencyInjection.IServiceProvider from ServiceProvider ActorSystem extension")] public void ShouldAccessServiceProviderFromActorSystemExtension() { - var sp = ServiceProvider.For(Sys); - var dep = sp.Provider.GetService(); + var sp = DependencyResolver.For(Sys); + var dep = sp.Resolver.GetService(); dep.Should().BeOfType(); - var dep2 = sp.Provider.GetService(); + var dep2 = sp.Resolver.GetService(); dep2.Should().NotBe(dep); // the two transient instances should be different // scoped services should be the same - var scoped1 = sp.Provider.GetService(); - var scoped2 = sp.Provider.GetService(); + var scoped1 = sp.Resolver.GetService(); + var scoped2 = sp.Resolver.GetService(); scoped1.Should().Be(scoped2); // create a new scope - using (var newScope = sp.Provider.CreateScope()) + using (var newScope = sp.Resolver.CreateScope()) { - var scoped3 = newScope.ServiceProvider.GetService(); + var scoped3 = newScope.Resolver.GetService(); scoped1.Should().NotBe(scoped3); } // singleton services should be the same - var singleton1 = sp.Provider.GetService(); - var singleton2 = sp.Provider.GetService(); + var singleton1 = sp.Resolver.GetService(); + var singleton2 = sp.Resolver.GetService(); singleton1.Should().Be(singleton2); } @@ -67,7 +67,7 @@ public void ShouldAccessServiceProviderFromActorSystemExtension() { Action getSp = () => { - var sp = ServiceProvider.For(Sys); + var sp = DependencyResolver.For(Sys); }; getSp.Should().Throw(); diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolver.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolver.cs new file mode 100644 index 00000000000..4917a1b8ed5 --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolver.cs @@ -0,0 +1,125 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.Event; +using Microsoft.Extensions.DependencyInjection; + +namespace Akka.DependencyInjection +{ + /// + /// Provides users with immediate access to the bound to + /// this , if any. + /// + public sealed class DependencyResolver : IExtension + { + public DependencyResolver(IDependencyResolver resolver) + { + Resolver = resolver; + } + + /// + /// The globally scoped . + /// + /// + /// Per https://docs.microsoft.com/en-us/dotnet/core/extensions/dependency-injection-guidelines - please use + /// the appropriate for your actors and the dependencies they consume. DI is typically + /// not used for long-lived, stateful objects such as actors. + /// + /// Therefore, injecting transient dependencies via constructors is a bad idea in most cases. You'd be far better off + /// creating a local "request scope" each time your actor processes a message that depends on a transient dependency, + /// such as a database connection, and disposing that scope once the operation is complete. + /// + /// Actors are not MVC Controllers. Actors can live forever, have the ability to restart, and are often stateful. + /// Be mindful of this as you use this feature or bad things will happen. Akka.NET does not magically manage scopes + /// for you. + /// + public IDependencyResolver Resolver { get; } + + public static DependencyResolver For(ActorSystem actorSystem) + { + return actorSystem.WithExtension(); + } + + /// + /// Uses a delegate to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + public Props Props(params object[] args) where T : ActorBase + { + return Resolver.Props(args); + } + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// A new instance which uses DI internally. + public Props Props() where T : ActorBase + { + return Resolver.Props(); + } + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// A new instance which uses DI internally. + public Props Props(Type type) + { + return Resolver.Props(type); + } + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + public Props Props(Type type, params object[] args) + { + return Resolver.Props(type, args); + } + } + + /// + /// INTERNAL API + /// + public sealed class DependencyResolverExtension : ExtensionIdProvider + { + public override DependencyResolver CreateExtension(ExtendedActorSystem system) + { + var setup = system.Settings.Setup.Get(); + if (setup.HasValue) return new DependencyResolver(setup.Value.DependencyResolver); + + var exception = new ConfigurationException("Unable to find [DependencyResolverSetup] included in ActorSystem settings." + + " Please specify one before attempting to use dependency injection inside Akka.NET."); + system.EventStream.Publish(new Error(exception, "Akka.DependencyInjection", typeof(DependencyResolverExtension), exception.Message)); + throw exception; + } + } +} diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolverSetup.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolverSetup.cs new file mode 100644 index 00000000000..3df580095a3 --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolverSetup.cs @@ -0,0 +1,84 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Actor.Setup; + +namespace Akka.DependencyInjection +{ + /// + /// Used to help bootstrap an with dependency injection (DI) + /// support via a reference. + /// + /// The will be used to access previously registered services + /// in the creation of actors and other pieces of infrastructure inside Akka.NET. + /// + /// The constructor is internal. Please use to create a new instance. + /// + [Obsolete("Used DependencyResolverSetup instead.")] + public class ServiceProviderSetup : Setup + { + internal ServiceProviderSetup(IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + } + + public IServiceProvider ServiceProvider { get; } + + public static ServiceProviderSetup Create(IServiceProvider provider) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + + return new ServiceProviderSetup(provider); + } + } + + /// + /// Used to help bootstrap an with dependency injection (DI) + /// support via a reference. + /// + /// The will be used to access previously registered services + /// in the creation of actors and other pieces of infrastructure inside Akka.NET. + /// + /// The constructor is internal. Please use to create a new instance. + /// + public class DependencyResolverSetup : Setup + { + public IDependencyResolver DependencyResolver { get; } + + internal DependencyResolverSetup(IDependencyResolver dependencyResolver) + { + DependencyResolver = dependencyResolver; + } + + /// + /// Creates a new instance of DependencyResolverSetup, passing in + /// here creates an that resolves dependencies from the specified + /// + public static DependencyResolverSetup Create(IServiceProvider provider) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + + return new DependencyResolverSetup(new ServiceProviderDependencyResolver(provider)); + } + + /// + /// Creates a new instance of DependencyResolverSetup, an implementation of + /// can be passed in here to resolve services from test or alternative DI frameworks. + /// + public static DependencyResolverSetup Create(IDependencyResolver provider) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + + return new DependencyResolverSetup(provider); + } + } +} diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/IDependencyResolver.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/IDependencyResolver.cs new file mode 100644 index 00000000000..ea0a63c51d9 --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/IDependencyResolver.cs @@ -0,0 +1,61 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Actor; + +namespace Akka.DependencyInjection +{ + /// + /// Interface abstraction for working with DI providers + /// in Akka.NET without being bound to any specific implementation. + /// + /// + /// See for a reference implementation. + /// + public interface IDependencyResolver + { + IResolverScope CreateScope(); + T GetService(); + object GetService(Type type); + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + Props Props(Type type, params object[] args); + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// A new instance which uses DI internally. + Props Props(Type type); + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + Props Props(params object[] args) where T : ActorBase; + } +} \ No newline at end of file diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs index 883730d0b27..e990debf9a9 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs @@ -17,6 +17,10 @@ namespace Akka.DependencyInjection /// Provides users with immediate access to the bound to /// this , if any. /// + /// + /// [OBSOLETE] Switch to the instead. + /// + [Obsolete("Replaced by the Akka.DependencyInjection.DependencyResolver in Akka.NET v1.4.21. Please switch to that.")] public sealed class ServiceProvider : IExtension { public ServiceProvider(IServiceProvider provider) @@ -66,6 +70,7 @@ public static ServiceProvider For(ActorSystem actorSystem) /// /// INTERNAL API /// + [Obsolete("Use the DependencyResolverExtensions instead.")] public sealed class ServiceProviderExtension : ExtensionIdProvider { public override ServiceProvider CreateExtension(ExtendedActorSystem system) @@ -88,17 +93,16 @@ public override ServiceProvider CreateExtension(ExtendedActorSystem system) /// /// Used to create actors via the . /// - /// the actor type - internal sealed class ServiceProviderActorProducer : IIndirectActorProducer where TActor:ActorBase + internal class ServiceProviderActorProducer : IIndirectActorProducer { private readonly IServiceProvider _provider; private readonly object[] _args; - public ServiceProviderActorProducer(IServiceProvider provider, object[] args) + public ServiceProviderActorProducer(IServiceProvider provider, Type actorType, object[] args) { _provider = provider; _args = args; - ActorType = typeof(TActor); + ActorType = actorType; } public ActorBase Produce() @@ -113,4 +117,19 @@ public void Release(ActorBase actor) // no-op } } + + /// + /// INTERNAL API + /// + /// Used to create actors via the . + /// + /// the actor type + internal class ServiceProviderActorProducer : ServiceProviderActorProducer where TActor:ActorBase + { + + public ServiceProviderActorProducer(IServiceProvider provider, object[] args) + : base(provider, typeof(TActor), args) + { + } + } } diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs new file mode 100644 index 00000000000..68f10aff02b --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs @@ -0,0 +1,62 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Microsoft.Extensions.DependencyInjection; + +namespace Akka.DependencyInjection +{ + /// + /// INTERNAL API. + /// + /// implementation backed by + /// + public class ServiceProviderDependencyResolver : IDependencyResolver + { + public IServiceProvider ServiceProvider { get; } + + public ServiceProviderDependencyResolver(IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + } + + public IResolverScope CreateScope() + { + return new ServiceProviderScope(ServiceProvider.CreateScope()); + } + + public T GetService() + { + return ServiceProvider.GetService(); + } + + public object GetService(Type type) + { + return ServiceProvider.GetService(type); + } + + public Props Props(Type type, params object[] args) + { + if(typeof(ActorBase).IsAssignableFrom(type)) + return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(ServiceProvider, type, args)); + throw new ArgumentException(nameof(type), $"[{type}] does not implement Akka.Actor.ActorBase."); + } + + public Props Props(Type type) + { + return Props(type, Array.Empty()); + } + + public Props Props(params object[] args) where T : ActorBase + { + return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(ServiceProvider, args)); + } + } + + +} diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderScope.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderScope.cs new file mode 100644 index 00000000000..ce3f1911dcc --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderScope.cs @@ -0,0 +1,33 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Microsoft.Extensions.DependencyInjection; + +namespace Akka.DependencyInjection +{ + public interface IResolverScope : IDisposable + { + IDependencyResolver Resolver { get; } + } + + public class ServiceProviderScope : IResolverScope + { + private readonly IServiceScope _scope; + public IDependencyResolver Resolver { get; } + public ServiceProviderScope(IServiceScope scope) + { + _scope = scope; + Resolver = new ServiceProviderDependencyResolver(scope.ServiceProvider); + } + + public void Dispose() + { + _scope?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderSetup.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderSetup.cs deleted file mode 100644 index 835d1ad98f3..00000000000 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderSetup.cs +++ /dev/null @@ -1,40 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2021 Lightbend Inc. -// Copyright (C) 2013-2021 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using Akka.Actor; -using Akka.Actor.Setup; - -namespace Akka.DependencyInjection -{ - /// - /// Used to help bootstrap an with dependency injection (DI) - /// support via a reference. - /// - /// The will be used to access previously registered services - /// in the creation of actors and other pieces of infrastructure inside Akka.NET. - /// - /// The constructor is internal. Please use to create a new instance. - /// - public class ServiceProviderSetup : Setup - { - internal ServiceProviderSetup(IServiceProvider serviceProvider) - { - ServiceProvider = serviceProvider; - } - - public IServiceProvider ServiceProvider { get; } - - public static ServiceProviderSetup Create(IServiceProvider provider) - { - if (provider == null) - throw new ArgumentNullException(nameof(provider)); - - return new ServiceProviderSetup(provider); - } - } -} diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs index 36aeb315709..97d1dfcdf7e 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs @@ -13,6 +13,7 @@ using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; using System.Runtime.Serialization; using System.Text; using System.Threading.Tasks; @@ -800,8 +801,15 @@ private void FailChunkExecution(ChunkExecutionFailure message) replayAll.ReplyTo.Tell(new EventReplayFailure(cause)); break; + case SelectCurrentPersistenceIds select: + // SqlJournal handled this failure case by using the default PipeTo failure + // handler which sends a Status.Failure message back to the sender. + select.ReplyTo.Tell(new Status.Failure(cause)); + break; + default: - throw new Exception($"Unknown persistence journal request type [{request.GetType()}]"); + Log.Error(cause, $"Batching failure not reported to original sender. Unknown batched persistence journal request type [{request.GetType()}]."); + break; } } @@ -1096,10 +1104,12 @@ protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPers command.Parameters.Clear(); AddParameter(command, "@Ordering", DbType.Int64, message.Offset); - var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) + using (var reader = await command.ExecuteReaderAsync()) { - result.Add(reader.GetString(0)); + while (await reader.ReadAsync()) + { + result.Add(reader.GetString(0)); + } } message.ReplyTo.Tell(new CurrentPersistenceIds(result, highestOrderingNumber)); diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs index 4020aaf9405..2285f2e3267 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs @@ -684,7 +684,7 @@ public virtual async Task InsertBatchAsync(DbConnection connection, Cancellation var evt = entry.Key; var tags = entry.Value; - WriteEvent(command, evt.WithTimestamp(DateTime.UtcNow.Ticks), tags); + WriteEvent(command, evt.WithTimestamp(TimestampProvider.GenerateTimestamp(evt)), tags); await command.ExecuteScalarAsync(cancellationToken); command.Parameters.Clear(); } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs index 42a2ddd9b0a..c6026f341e1 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs @@ -43,10 +43,5 @@ public BatchingSqliteCurrentEventsByTagSpec(ITestOutputHelper output) : base(Con { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } - - [Fact(Skip = "Test work good, but raises an exception")] - public override void ReadJournal_query_CurrentEventsByTag_should_complete_when_no_events() - { - } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs index e57b74a6253..74bdf202a4a 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs @@ -38,10 +38,5 @@ public BatchingSqliteCurrentPersistenceIdsSpec(ITestOutputHelper output) : base( { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } - - [Fact(Skip = "Not implemented, due to bugs on NetCore")] - public override void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete() - { - } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs index eeccb160702..50eaac2ae30 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs @@ -44,10 +44,5 @@ public SqliteCurrentEventsByTagSpec(ITestOutputHelper output) : base(Config(Coun { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } - - [Fact(Skip = "Test work good, but raises an exception")] - public override void ReadJournal_query_CurrentEventsByTag_should_complete_when_no_events() - { - } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj b/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj index 40be4c4326d..105913319d6 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj +++ b/src/contrib/persistence/Akka.Persistence.Sqlite/Akka.Persistence.Sqlite.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 8aee29b1c71..1a37c9471d7 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -110,11 +110,12 @@ namespace Akka.Actor public void Resume(System.Exception causedByFailure) { } public virtual void SendMessage(Akka.Actor.Envelope message) { } public virtual void SendMessage(Akka.Actor.IActorRef sender, object message) { } - public void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage systemMessage) { } + public virtual void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage systemMessage) { } protected void SetActorFields(Akka.Actor.ActorBase actor) { } protected bool SetChildrenTerminationReason(Akka.Actor.Internal.SuspendReason reason) { } public void SetReceiveTimeout(System.Nullable timeout = null) { } protected void SetTerminated() { } + [System.ObsoleteAttribute("Not used. Will be removed in Akka.NET v1.5.")] public static Akka.Actor.NameAndUid SplitNameAndUid(string name) { } public virtual void Start() { } protected void Stash(Akka.Dispatch.SysMsg.SystemMessage msg) { } @@ -1325,6 +1326,7 @@ namespace Akka.Actor public override void Suspend() { } protected override void TellInternal(object message, Akka.Actor.IActorRef sender) { } } + [System.ObsoleteAttribute("Not used. Will be removed in Akka.NET v1.5.")] public class NameAndUid { public NameAndUid(string name, int uid) { } @@ -4808,6 +4810,7 @@ namespace Akka.Util { public const string Base64Chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~"; public static string Base64Encode(this long value) { } + [System.ObsoleteAttribute("Do not use. Pass in prefix as a string instead.")] public static System.Text.StringBuilder Base64Encode(this long value, System.Text.StringBuilder sb) { } public static string Base64Encode(this string s) { } } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt index 766d4f50243..c03de639b12 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt @@ -1,4 +1,5 @@ [assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Metrics")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")] diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index af1c1fda081..9238071f9bb 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1913,7 +1913,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source UnfoldAsync(TState state, System.Func>>> unfoldAsync) { } public static Akka.Streams.Dsl.Source UnfoldInfinite(TState state, System.Func> unfold) { } public static Akka.Streams.Dsl.Source UnfoldResource(System.Func create, System.Func> read, System.Action close) { } - public static Akka.Streams.Dsl.Source UnfoldResourceAsync(System.Func> create, System.Func>> read, System.Func close) { } + public static Akka.Streams.Dsl.Source UnfoldResourceAsync(System.Func> create, System.Func>> read, System.Func> close) { } public static Akka.Streams.Dsl.Source, Akka.NotUsed> ZipN(System.Collections.Generic.IEnumerable> sources) { } public static Akka.Streams.Dsl.Source ZipWithN(System.Func, TOut2> zipper, System.Collections.Generic.IEnumerable> sources) { } } @@ -2564,6 +2564,11 @@ namespace Akka.Streams.Extra } namespace Akka.Streams.IO { + public sealed class AbruptIOTerminationException : System.Exception + { + public AbruptIOTerminationException(Akka.Streams.IO.IOResult ioResult, System.Exception cause) { } + public Akka.Streams.IO.IOResult IoResult { get; } + } public struct IOResult { public readonly long Count; diff --git a/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs b/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs index ec7fd773968..5a7359a6dc2 100644 --- a/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs +++ b/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs @@ -12,6 +12,7 @@ using System.Linq; using System.Text.RegularExpressions; using Akka.Actor; +using Akka.Actor.Setup; using Akka.Cluster.Tests.MultiNode; using Akka.Configuration; using Akka.Dispatch.SysMsg; @@ -155,6 +156,29 @@ protected MultiNodeClusterSpec(MultiNodeConfig config, Type type) _roleNameComparer = new RoleNameComparer(this); } + protected MultiNodeClusterSpec( + RoleName myself, + ActorSystem system, + ImmutableList roles, + Func> deployments) + : base(myself, system, roles, deployments) + { + _assertions = new XunitAssertions(); + _roleNameComparer = new RoleNameComparer(this); + } + + protected MultiNodeClusterSpec( + RoleName myself, + ActorSystemSetup setup, + ImmutableList roles, + Func> deployments) + : base(myself, setup, roles, deployments) + { + _assertions = new XunitAssertions(); + _roleNameComparer = new RoleNameComparer(this); + } + + protected override int InitialParticipantsValueFactory { get { return Roles.Count; } diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index 4bf85662238..611b4c8e036 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -374,6 +374,7 @@ private void OnAcquireLease() { Log.Debug("SBR trying to acquire lease"); //implicit val ec: ExecutionContext = internalDispatcher + Strategy.Lease?.Acquire().ContinueWith(r => { if (r.IsFaulted) @@ -385,7 +386,7 @@ private void OnAcquireLease() public Receive WaitingForLease(IDecision decision) { - bool Receive(object message) + bool ReceiveLease(object message) { switch (message) { @@ -426,7 +427,7 @@ bool Receive(object message) } Stash.UnstashAll(); - Context.Become(Receive); + Context.Become(ReceiveLease); return true; case ReleaseLeaseResult lr: @@ -441,7 +442,7 @@ bool Receive(object message) } } - return Receive; + return ReceiveLease; } private void OnReleaseLeaseResult(bool released) diff --git a/src/core/Akka.Coordination/LeaseProvider.cs b/src/core/Akka.Coordination/LeaseProvider.cs index 526c0945917..5f3df3b0fb1 100644 --- a/src/core/Akka.Coordination/LeaseProvider.cs +++ b/src/core/Akka.Coordination/LeaseProvider.cs @@ -83,7 +83,7 @@ public static LeaseProvider Get(ActorSystem system) } private readonly ExtendedActorSystem _system; - private readonly ConcurrentDictionary leases = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _leases = new ConcurrentDictionary(); private ILoggingAdapter _log; @@ -122,7 +122,7 @@ public Lease GetLease(string leaseName, string configPath, string ownerName) { var leaseKey = new LeaseKey(leaseName, configPath, ownerName); - return leases.GetOrAdd(leaseKey, lk => + return _leases.GetOrAdd(leaseKey, lk => { var leaseConfig = _system.Settings.Config .GetConfig(configPath) @@ -151,7 +151,7 @@ public Lease GetLease(string leaseName, string configPath, string ownerName) Log.Error( ex, "Invalid lease configuration for leaseName [{0}], configPath [{1}] lease-class [{2}]. " + - "The class must implement scaladsl.Lease or javadsl.Lease and have constructor with LeaseSettings parameter and " + + "The class must implement Akka.Coordination.Lease and have constructor with LeaseSettings parameter and " + "optionally ActorSystem parameter.", settings.LeaseName, configPath, diff --git a/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs b/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs index 79288d13e87..a60084df8a3 100644 --- a/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs +++ b/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs @@ -89,7 +89,7 @@ public enum MultiNodeTestRunnerMessageType }; private const string NodePassStatusRegexString = - @"\[(\w){4}(?[0-9]{1,2})(?:\w+)?\]\[(?