From 3ebbfcfc4b0a5b9f421fc05c4340c5bf9074b595 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 12 May 2021 13:02:39 -0500 Subject: [PATCH 01/50] Update RELEASE_NOTES.md (#5009) --- RELEASE_NOTES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 71ee73ab0cc..063410b369f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +#### 1.4.21 May 12 2021 #### +**Placeholder for nightlies** + #### 1.4.20 May 12 2021 #### **Maintenance Release for Akka.NET 1.4** From 0ef7f07e12426ea68876747f235498961376b184 Mon Sep 17 00:00:00 2001 From: Brah McDude <77924970+brah-mcdude@users.noreply.github.com> Date: Fri, 14 May 2021 16:43:18 +0300 Subject: [PATCH 02/50] Merge pull request #5011 from brah-mcdude/dev Update TestScheduler.cs --- src/core/Akka.TestKit/TestScheduler.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.TestKit/TestScheduler.cs b/src/core/Akka.TestKit/TestScheduler.cs index 8a224a8a80f..4315cc815a1 100644 --- a/src/core/Akka.TestKit/TestScheduler.cs +++ b/src/core/Akka.TestKit/TestScheduler.cs @@ -88,10 +88,14 @@ public void AdvanceTo(DateTimeOffset when) { var scheduledTime = _now.Add(initialDelay ?? delay).UtcTicks; - if (!_scheduledWork.TryGetValue(scheduledTime, out var tickItems)) + ConcurrentQueue tickItems; + while (!_scheduledWork.TryGetValue(scheduledTime, out tickItems)) { tickItems = new ConcurrentQueue(); - _scheduledWork.TryAdd(scheduledTime, tickItems); + if (_scheduledWork.TryAdd(scheduledTime, tickItems)) + { + break; + } } var type = message == null ? ScheduledItem.ScheduledItemType.Action : ScheduledItem.ScheduledItemType.Message; From bc5a2e4be88b7724ede417c769f79741ccb7cb62 Mon Sep 17 00:00:00 2001 From: Martin Date: Fri, 14 May 2021 16:00:43 +0200 Subject: [PATCH 03/50] Fix SonarQube's "IEnumerable LINQs should be simplified" / Code Smell (#5013) --- .../cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs index 696682bb1b5..65d2fb84758 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs @@ -167,7 +167,7 @@ protected override bool Receive(object message) return true; case ActionRequest ar: - var r = requests.Where(i => i.Item2.Equals(ar.Request)).FirstOrDefault(); + var r = requests.FirstOrDefault(i => i.Item2.Equals(ar.Request)); if (r.Item1 != null) { _log.Info("Actioning request {0} to {1}", r.Item2, ar.Result); From 594eb7c859ebf6d18ef8c5d7b8c80c1cbbdae9a0 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 15 May 2021 03:56:56 +0700 Subject: [PATCH 04/50] [RACY] LoggerSpec TestOutputLogger Make sure that event receive order does not matter (#5015) * Make sure that event receive order does not matter * Remove repeat attribute --- src/core/Akka.Tests/Loggers/LoggerSpec.cs | 33 +++++++++++++++++------ 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/src/core/Akka.Tests/Loggers/LoggerSpec.cs b/src/core/Akka.Tests/Loggers/LoggerSpec.cs index 17ba5c31a1b..72681e225a6 100644 --- a/src/core/Akka.Tests/Loggers/LoggerSpec.cs +++ b/src/core/Akka.Tests/Loggers/LoggerSpec.cs @@ -1,10 +1,12 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using Akka.Actor; using Akka.Configuration; using Akka.Event; using Akka.TestKit; +using Akka.Tests.Shared.Internals; using Xunit; using Xunit.Abstractions; using FluentAssertions; @@ -26,25 +28,40 @@ public LoggerSpec(ITestOutputHelper output) : base(Config, output) [Fact] public void TestOutputLogger_WithBadFormattingMustNotThrow() { + var events = new List(); + // Need to wait until TestOutputLogger initializes Thread.Sleep(200); Sys.EventStream.Subscribe(TestActor, typeof(LogEvent)); Sys.Log.Error(new FakeException("BOOM"), Case.t, Case.p); - ExpectMsg().Cause.Should().BeOfType(); - ExpectMsg().Cause.Should().BeOfType(); + events.Add(ExpectMsg()); + events.Add(ExpectMsg()); + + events.All(e => e is Error).Should().BeTrue(); + events.Select(e => e.Cause).Any(c => c is FakeException).Should().BeTrue(); + events.Select(e => e.Cause).Any(c => c is AggregateException).Should().BeTrue(); + events.Clear(); Sys.Log.Warning(Case.t, Case.p); - ExpectMsg(); - ExpectMsg().Cause.Should().BeOfType(); + events.Add(ExpectMsg()); + events.Add(ExpectMsg()); + events.Any(e => e is Warning).Should().BeTrue(); + events.First(e => e is Error).Cause.Should().BeOfType(); + events.Clear(); Sys.Log.Info(Case.t, Case.p); - ExpectMsg(); - ExpectMsg().Cause.Should().BeOfType(); + events.Add(ExpectMsg()); + events.Add(ExpectMsg()); + events.Any(e => e is Info).Should().BeTrue(); + events.First(e => e is Error).Cause.Should().BeOfType(); + events.Clear(); Sys.Log.Debug(Case.t, Case.p); - ExpectMsg(); - ExpectMsg().Cause.Should().BeOfType(); + events.Add(ExpectMsg()); + events.Add(ExpectMsg()); + events.Any(e => e is Debug).Should().BeTrue(); + events.First(e => e is Error).Cause.Should().BeOfType(); } [Fact] From e51d46b8bb4e76b35cee44dd63266ed129649d3a Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 16 May 2021 01:54:12 +0700 Subject: [PATCH 05/50] Enable skipped batching SQLite specs, these runs just fine now. (#5017) --- .../Batching/BatchingSqliteCurrentEventsByTagSpec.cs | 5 ----- .../Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs | 5 ----- .../Query/SqliteCurrentEventsByTagSpec.cs | 5 ----- 3 files changed, 15 deletions(-) diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs index 42a2ddd9b0a..c6026f341e1 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentEventsByTagSpec.cs @@ -43,10 +43,5 @@ public BatchingSqliteCurrentEventsByTagSpec(ITestOutputHelper output) : base(Con { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } - - [Fact(Skip = "Test work good, but raises an exception")] - public override void ReadJournal_query_CurrentEventsByTag_should_complete_when_no_events() - { - } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs index e57b74a6253..74bdf202a4a 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Batching/BatchingSqliteCurrentPersistenceIdsSpec.cs @@ -38,10 +38,5 @@ public BatchingSqliteCurrentPersistenceIdsSpec(ITestOutputHelper output) : base( { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } - - [Fact(Skip = "Not implemented, due to bugs on NetCore")] - public override void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete() - { - } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs index eeccb160702..50eaac2ae30 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs @@ -44,10 +44,5 @@ public SqliteCurrentEventsByTagSpec(ITestOutputHelper output) : base(Config(Coun { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } - - [Fact(Skip = "Test work good, but raises an exception")] - public override void ReadJournal_query_CurrentEventsByTag_should_complete_when_no_events() - { - } } } From 1875c320dc4317e22dfde18c264150cd136f2975 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 18 May 2021 05:14:07 +0700 Subject: [PATCH 06/50] Enable skipped specs, they're working now (#5018) --- src/core/Akka.Persistence.Tests/PersistentActorSpec.cs | 6 +++--- .../Akka.Persistence.Tests/PersistentActorSpecAsyncAwait.cs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/core/Akka.Persistence.Tests/PersistentActorSpec.cs b/src/core/Akka.Persistence.Tests/PersistentActorSpec.cs index 74cdbbae343..d6a044c6fcf 100644 --- a/src/core/Akka.Persistence.Tests/PersistentActorSpec.cs +++ b/src/core/Akka.Persistence.Tests/PersistentActorSpec.cs @@ -135,7 +135,7 @@ public void PersistentActor_should_allow_behavior_changes_in_event_handler_as_la ExpectMsgInOrder("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32"); } - [Fact(Skip = "Need https://github.com/akkadotnet/akka.net/pull/3668 merged")] + [Fact] public void PersistentActor_should_support_snapshotting() { var pref = ActorOf(Props.Create(() => new SnapshottingPersistentActor(Name, TestActor))); @@ -152,7 +152,7 @@ public void PersistentActor_should_support_snapshotting() ExpectMsgInOrder("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"); } - [Fact(Skip = "Need https://github.com/akkadotnet/akka.net/pull/3668 merged")] + [Fact] public void PersistentActor_should_support_Context_Become_during_recovery() { var pref = ActorOf(Props.Create(() => new SnapshottingPersistentActor(Name, TestActor))); @@ -433,7 +433,7 @@ public void PersistentActor_should_invoke_deferred_handlers_preserving_the_origi ExpectNoMsg(TimeSpan.FromMilliseconds(100)); } - [Fact(Skip = "Need https://github.com/akkadotnet/akka.net/pull/3668 merged")] + [Fact] public void PersistentActor_should_receive_RecoveryFinished_if_it_is_handled_after_all_events_have_been_replayed() { var pref = ActorOf(Props.Create(() => new SnapshottingPersistentActor(Name, TestActor))); diff --git a/src/core/Akka.Persistence.Tests/PersistentActorSpecAsyncAwait.cs b/src/core/Akka.Persistence.Tests/PersistentActorSpecAsyncAwait.cs index c68b8c02b7a..24c321e5543 100644 --- a/src/core/Akka.Persistence.Tests/PersistentActorSpecAsyncAwait.cs +++ b/src/core/Akka.Persistence.Tests/PersistentActorSpecAsyncAwait.cs @@ -134,7 +134,7 @@ public void PersistentActor_should_allow_behavior_changes_in_event_handler_as_la ExpectMsgInOrder("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32"); } - [Fact(Skip = "Need https://github.com/akkadotnet/akka.net/pull/3668 merged")] + [Fact] public void PersistentActor_should_support_snapshotting() { var pref = ActorOf(Props.Create(() => new SnapshottingPersistentActor(Name, TestActor))); @@ -151,7 +151,7 @@ public void PersistentActor_should_support_snapshotting() ExpectMsgInOrder("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"); } - [Fact(Skip = "Need https://github.com/akkadotnet/akka.net/pull/3668 merged")] + [Fact] public void PersistentActor_should_support_Context_Become_during_recovery() { var pref = ActorOf(Props.Create(() => new SnapshottingPersistentActor(Name, TestActor))); @@ -432,7 +432,7 @@ public void PersistentActor_should_invoke_deferred_handlers_preserving_the_origi ExpectNoMsg(TimeSpan.FromMilliseconds(100)); } - [Fact(Skip = "Need https://github.com/akkadotnet/akka.net/pull/3668 merged")] + [Fact] public void PersistentActor_should_receive_RecoveryFinished_if_it_is_handled_after_all_events_have_been_replayed() { var pref = ActorOf(Props.Create(() => new SnapshottingPersistentActor(Name, TestActor))); From 3f6182c6145ab7fd5b896005504f0a9097e5f851 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 18 May 2021 22:20:04 +0700 Subject: [PATCH 07/50] Fix AkkaProtocolStressTest spec (#5020) Co-authored-by: Aaron Stannard --- .../Transport/AkkaProtocolStressTest.cs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs b/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs index 1a48a618a79..ace0cd656d1 100644 --- a/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs +++ b/src/core/Akka.Remote.Tests/Transport/AkkaProtocolStressTest.cs @@ -16,6 +16,7 @@ using Akka.TestKit.TestEvent; using Akka.Util.Internal; using Xunit; +using Xunit.Abstractions; namespace Akka.Remote.Tests.Transport { @@ -163,14 +164,15 @@ private IActorRef Here get { Sys.ActorSelection(RootB / "user" / "echo").Tell(new Identify(null), TestActor); - return ExpectMsg(TimeSpan.FromSeconds(300)).Subject; + var subject = ExpectMsg(TimeSpan.FromSeconds(3)).Subject; + return subject; } } #endregion - public AkkaProtocolStressTest() : base(AkkaProtocolStressTestConfig) + public AkkaProtocolStressTest(ITestOutputHelper output) : base(AkkaProtocolStressTestConfig, output) { systemB = ActorSystem.Create("systemB", Sys.Settings.Config); remote = systemB.ActorOf(Props.Create(), "echo"); @@ -178,7 +180,7 @@ public AkkaProtocolStressTest() : base(AkkaProtocolStressTestConfig) #region Tests - [Fact(Skip = "Extremely racy")] + [Fact] public void AkkaProtocolTransport_must_guarantee_at_most_once_delivery_and_message_ordering_despite_packet_loss() { //todo mute both systems for deadletters for any type of message @@ -190,7 +192,12 @@ public void AkkaProtocolTransport_must_guarantee_at_most_once_delivery_and_messa new FailureInjectorTransportAdapter.Drop(0.1, 0.1))); AwaitCondition(() => mc.IsCompleted && mc.Result, TimeSpan.FromSeconds(3)); - var here = Here; + IActorRef here = null; + AwaitCondition(() => + { + here = Here; + return here != null && !here.Equals(ActorRefs.Nobody); + }, TimeSpan.FromSeconds(3)); var tester = Sys.ActorOf(Props.Create(() => new SequenceVerifier(here, TestActor))); tester.Tell("start"); From 0869e498e1a9e880946138865e13def80fdc7399 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 19 May 2021 01:03:44 +0700 Subject: [PATCH 08/50] Increase timeout value to avoid timeout failure (#5021) Co-authored-by: Aaron Stannard --- src/core/Akka.Streams.Tests/Dsl/KeepAliveConcatSpec.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/KeepAliveConcatSpec.cs b/src/core/Akka.Streams.Tests/Dsl/KeepAliveConcatSpec.cs index c6a32b2c88d..f653c5f5dd9 100644 --- a/src/core/Akka.Streams.Tests/Dsl/KeepAliveConcatSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/KeepAliveConcatSpec.cs @@ -13,11 +13,16 @@ using Akka.Streams.TestKit.Tests; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace Akka.Streams.Tests.Dsl { public class KeepAliveConcatSpec : Akka.TestKit.Xunit2.TestKit { + public KeepAliveConcatSpec(ITestOutputHelper output) + : base(output: output) + { } + private readonly Source, NotUsed> _sampleSource = Source.From(Enumerable.Range(1, 10).Grouped(3)); private IEnumerable> Expand(IEnumerable lst) @@ -52,7 +57,7 @@ public void KeepAliveConcat_should_emit_elements_periodically_after_silent_perio .Grouped(1000) .RunWith(Sink.First>>(), Sys.Materializer()); - t.AwaitResult() + t.AwaitResult(TimeSpan.FromSeconds(6)) .SelectMany(x => x) .Should().BeEquivalentTo(Enumerable.Range(1, 10), o => o.WithStrictOrdering()); } From d0afa72ca0131d0f463641e971fecc30969e8148 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 20 May 2021 07:43:26 -0500 Subject: [PATCH 09/50] Bump BenchmarkDotNet from 0.12.1 to 0.13.0 (#5024) Bumps [BenchmarkDotNet](https://github.com/dotnet/BenchmarkDotNet) from 0.12.1 to 0.13.0. - [Release notes](https://github.com/dotnet/BenchmarkDotNet/releases) - [Commits](https://github.com/dotnet/BenchmarkDotNet/compare/v0.12.1...v0.13.0) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj | 2 +- .../SerializationBenchmarks/SerializationBenchmarks.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj b/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj index 36bfa3fcd9a..8b2cee65503 100644 --- a/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj +++ b/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj @@ -7,7 +7,7 @@ - + diff --git a/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj b/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj index 0f7a1838ad3..dbf48b2cf2b 100644 --- a/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj +++ b/src/benchmark/SerializationBenchmarks/SerializationBenchmarks.csproj @@ -6,7 +6,7 @@ - + From b5f6552a719cb75686686af69e8744f77bf47e3f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 20 May 2021 16:27:09 +0000 Subject: [PATCH 10/50] Bump Google.Protobuf from 3.16.0 to 3.17.0 (#5012) Bumps [Google.Protobuf](https://github.com/protocolbuffers/protobuf) from 3.16.0 to 3.17.0. - [Release notes](https://github.com/protocolbuffers/protobuf/releases) - [Changelog](https://github.com/protocolbuffers/protobuf/blob/master/generate_changelog.py) - [Commits](https://github.com/protocolbuffers/protobuf/compare/v3.16.0...v3.17.0) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- src/common.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common.props b/src/common.props index 809e3c7a451..d630508f928 100644 --- a/src/common.props +++ b/src/common.props @@ -14,7 +14,7 @@ 0.10.1 13.0.1 2.0.1 - 3.16.0 + 3.17.0 netcoreapp3.1 net5.0 net471 From aa8a3deee525b87b1d643d0d2babe412da868d05 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 21 May 2021 13:51:41 -0500 Subject: [PATCH 11/50] adding ORSet benchmarks (#4990) * adding ORSet benchmarks Working on https://github.com/akkadotnet/akka.net/issues/4956 * added VersionVectorBenchmark Using this to help gauge ORSet / other CRDT merge performance * moved all `VersionVector` comparisons to use `ValueTuple` instead of `KeyValuePair` * temporarily lowered ORSetBenchmark parameters * added VersionVector merge benchmarks and cleaned up code * working on more benchmarking for ORSet --- src/Akka.sln | 19 +- .../Akka.Benchmarks/DData/ORSetBenchmarks.cs | 129 +++++++++++++ .../DData/VersionVectorBenchmark.cs | 174 ++++++++++++++++++ .../Akka.DistributedData.Tests/ORSetSpec.cs | 24 ++- .../cluster/Akka.DistributedData/ORSet.cs | 47 ++--- .../Akka.DistributedData/VersionVector.cs | 95 ++++++---- .../DDataStressTest/DDataStressTest.csproj | 12 ++ .../Cluster/DData/DDataStressTest/Program.cs | 91 +++++++++ 8 files changed, 521 insertions(+), 70 deletions(-) create mode 100644 src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs create mode 100644 src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs create mode 100644 src/examples/Cluster/DData/DDataStressTest/DDataStressTest.csproj create mode 100644 src/examples/Cluster/DData/DDataStressTest/Program.cs diff --git a/src/Akka.sln b/src/Akka.sln index c52324212ab..2c21386bd39 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -240,9 +240,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.DependencyInjection.Te EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AspNetCore", "AspNetCore", "{162F5991-EA57-4221-9B70-F9B6FEC18036}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Akka.AspNetCore", "examples\AspNetCore\Samples.Akka.AspNetCore\Samples.Akka.AspNetCore.csproj", "{D62F4AD6-318F-4ECC-B875-83FA9933A81B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Akka.AspNetCore", "examples\AspNetCore\Samples.Akka.AspNetCore\Samples.Akka.AspNetCore.csproj", "{D62F4AD6-318F-4ECC-B875-83FA9933A81B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SerializationBenchmarks", "benchmark\SerializationBenchmarks\SerializationBenchmarks.csproj", "{2E4B9584-42CC-4D17-B719-9F462B16C94D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SerializationBenchmarks", "benchmark\SerializationBenchmarks\SerializationBenchmarks.csproj", "{2E4B9584-42CC-4D17-B719-9F462B16C94D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DDataStressTest", "examples\Cluster\DData\DDataStressTest\DDataStressTest.csproj", "{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -1123,6 +1125,18 @@ Global {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x64.Build.0 = Release|Any CPU {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x86.ActiveCfg = Release|Any CPU {2E4B9584-42CC-4D17-B719-9F462B16C94D}.Release|x86.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|Any CPU.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x64.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x64.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x86.ActiveCfg = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Debug|x86.Build.0 = Debug|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|Any CPU.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|Any CPU.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x64.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x64.Build.0 = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.ActiveCfg = Release|Any CPU + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1230,6 +1244,7 @@ Global {162F5991-EA57-4221-9B70-F9B6FEC18036} = {D3AF8295-AEB5-4324-AA82-FCC0014AC310} {D62F4AD6-318F-4ECC-B875-83FA9933A81B} = {162F5991-EA57-4221-9B70-F9B6FEC18036} {2E4B9584-42CC-4D17-B719-9F462B16C94D} = {73108242-625A-4D7B-AA09-63375DBAE464} + {44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50} = {C50E1A9E-820C-4E75-AE39-6F96A99AC4A7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164} diff --git a/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs new file mode 100644 index 00000000000..9e0c2c0e971 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.DistributedData; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.DData +{ + [Config(typeof(MicroBenchmarkConfig))] + public class ORSetBenchmarks + { + [Params(25)] + public int NumElements; + + [Params(10)] + public int NumNodes; + + [Params(100)] + public int Iterations; + + private UniqueAddress[] _nodes; + private string[] _elements; + + private readonly string _user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}"; + private readonly string _user2 = "{\"username\":\"sonny\",\"password\":\"rollins\"}"; + private readonly string _user3 = "{\"username\":\"charlie\",\"password\":\"parker\"}"; + private readonly string _user4 = "{\"username\":\"charles\",\"password\":\"mingus\"}"; + + // has data from all nodes + private ORSet _c1 = ORSet.Empty; + + // has additional items from all nodes + private ORSet _c2 = ORSet.Empty; + + // has removed items from all nodes + private ORSet _c3 = ORSet.Empty; + + [GlobalSetup] + public void Setup() + { + var newNodes = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumNodes)){ + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + _nodes = newNodes.ToArray(); + + var newElements = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumElements)){ + newElements.Add(i.ToString()); + } + _elements = newElements.ToArray(); + + _c1 = ORSet.Empty; + foreach(var node in _nodes){ + _c1 = _c1.Add(node, _elements[0]); + } + + // add some data that _c2 doesn't have + _c2 = _c1; + foreach(var node in _nodes.Skip(NumNodes/2)){ + _c2 = _c2.Add(node, _elements[1]); + } + + _c3 = _c1; + foreach(var node in _nodes.Take(NumNodes/2)){ + _c3 = _c3.Remove(node, _elements[0]); + } + } + + [Benchmark] + public void Should_add_node_to_ORSet() + { + for (var i = 0; i < Iterations; i++) + { + var init = ORSet.Empty; + foreach (var node in _nodes) + { + init = init.Add(node, _elements[0]); + } + } + + } + + [Benchmark] + public void Should_add_elements_for_Same_node() + { + for (var i = 0; i < Iterations; i++) + { + var init = ORSet.Empty; + foreach (var element in _elements) + { + init = init.Add(_nodes[0], element); + } + } + } + + [Benchmark] + public void Should_merge_in_new_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c2); + } + + } + + [Benchmark] + public void Should_merge_in_removed_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c3); + } + + } + + [Benchmark] + public void Should_merge_in_add_and_removed_Elements_from_other_nodes(){ + for (var i = 0; i < Iterations; i++) + { + var c4 = _c1.Merge(_c2).Merge(_c3); + } + } + } +} diff --git a/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs b/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs new file mode 100644 index 00000000000..dd8fea98b24 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/VersionVectorBenchmark.cs @@ -0,0 +1,174 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Text; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.DistributedData; +using BenchmarkDotNet.Attributes; +using FluentAssertions; +using static Akka.DistributedData.VersionVector; + +namespace Akka.Benchmarks.DData +{ + [Config(typeof(MicroBenchmarkConfig))] + public class VersionVectorBenchmarks + { + [Params(100)] + public int ClockSize; + + [Params(1000)] + public int Iterations; + + internal (VersionVector clock, ImmutableSortedSet nodes) CreateVectorClockOfSize(int size) + { + UniqueAddress GenerateUniqueAddress(int nodeCount){ + return new UniqueAddress(new Akka.Actor.Address("akka.tcp", "ClusterSys", "localhost", nodeCount), nodeCount); + } + + return Enumerable.Range(1, size) + .Aggregate((VersionVector.Empty, ImmutableSortedSet.Empty), + (tuple, i) => + { + var (vc, nodes) = tuple; + var node = GenerateUniqueAddress(i); + return (vc.Increment(node), nodes.Add(node)); + }); + } + + internal VersionVector CopyVectorClock(VersionVector vc) + { + var versions = ImmutableDictionary.Empty; + var enumerator = vc.VersionEnumerator; + while(enumerator.MoveNext()){ + var nodePair = enumerator.Current; + versions = versions.SetItem(nodePair.Key, nodePair.Value); + } + + return VersionVector.Create(versions); + } + + private UniqueAddress _firstNode; + private UniqueAddress _lastNode; + private UniqueAddress _middleNode; + private ImmutableSortedSet _nodes; + private VersionVector _vcBefore; + private VersionVector _vcBaseLast; + private VersionVector _vcAfterLast; + private VersionVector _vcConcurrentLast; + private VersionVector _vcBaseMiddle; + private VersionVector _vcAfterMiddle; + private VersionVector _vcConcurrentMiddle; + + [GlobalSetup] + public void Setup() + { + var (vcBefore, nodes) = CreateVectorClockOfSize(ClockSize); + _vcBefore = vcBefore; + _nodes = nodes; + + _firstNode = nodes.First(); + _lastNode = nodes.Last(); + _middleNode = nodes[ClockSize / 2]; + + _vcBaseLast = vcBefore.Increment(_lastNode); + _vcAfterLast = _vcBaseLast.Increment(_firstNode); + _vcConcurrentLast = _vcBaseLast.Increment(_lastNode); + _vcBaseMiddle = _vcBefore.Increment(_middleNode); + _vcAfterMiddle = _vcBaseMiddle.Increment(_firstNode); + _vcConcurrentMiddle = _vcBaseMiddle.Increment(_middleNode); + } + + private void CheckThunkFor(VersionVector vc1, VersionVector vc2, Action thunk, int times) + { + var vcc1 = CopyVectorClock(vc1); + var vcc2 = CopyVectorClock(vc2); + for (var i = 0; i < times; i++) + { + thunk(vcc1, vcc2); + } + } + + private void CompareTo(VersionVector vc1, VersionVector vc2, Ordering ordering) + { + vc1.Compare(vc2).Should().Be(ordering); + } + + private void NotEqual(VersionVector vc1, VersionVector vc2) + { + (vc1 == vc2).Should().BeFalse(); + } + + private void Merge(VersionVector vc1, VersionVector vc2) + { + vc1.Merge(vc2); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_same() + { + CheckThunkFor(_vcBaseLast, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Same), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Before_last() + { + CheckThunkFor(_vcBefore, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Before), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_After_last() + { + CheckThunkFor(_vcAfterLast, _vcBaseLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.After), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Concurrent_last() + { + CheckThunkFor(_vcAfterLast, _vcConcurrentLast, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Concurrent), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Before_middle() + { + CheckThunkFor(_vcBefore, _vcBaseMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Before), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_After_middle() + { + CheckThunkFor(_vcAfterMiddle, _vcBaseMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.After), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_Concurrent_middle() + { + CheckThunkFor(_vcAfterMiddle, _vcConcurrentMiddle, (clock, vectorClock) => CompareTo(clock, vectorClock, Ordering.Concurrent), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_Before_Middle() + { + CheckThunkFor(_vcBefore, _vcBaseMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_After_Middle() + { + CheckThunkFor(_vcAfterMiddle, _vcBaseMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VectorClock_comparisons_should_compare_notEquals_Concurrent_Middle() + { + CheckThunkFor(_vcAfterMiddle, _vcConcurrentMiddle, (clock, vectorClock) => NotEqual(clock, vectorClock), Iterations); + } + + [Benchmark] + public void VersionVector_merge_Multi_Multi() + { + CheckThunkFor(_vcBefore, _vcAfterLast, (vector, versionVector) => Merge(vector, versionVector), Iterations); + } + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs index 97a87a20b00..883ebe461ac 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs @@ -57,16 +57,20 @@ public ORSetSpec(ITestOutputHelper output) [Fact] public void ORSet_must_be_able_to_add_element() { - var c1 = ORSet.Empty; - var c2 = c1.Add(_node1, _user1); - var c3 = c2.Add(_node1, _user2); - var c4 = c3.Add(_node1, _user4); - var c5 = c4.Add(_node1, _user3); - - Assert.Contains(_user1, c5.Elements); - Assert.Contains(_user2, c5.Elements); - Assert.Contains(_user3, c5.Elements); - Assert.Contains(_user4, c5.Elements); + for (var i = 0; i < 100; i++) + { + var c1 = ORSet.Empty; + var c2 = c1.Add(_node1, _user1); + var c3 = c2.Add(_node1, _user2); + var c4 = c3.Add(_node1, _user4); + var c5 = c4.Add(_node1, _user3); + + Assert.Contains(_user1, c5.Elements); + Assert.Contains(_user2, c5.Elements); + Assert.Contains(_user3, c5.Elements); + Assert.Contains(_user4, c5.Elements); + } + } [Fact] diff --git a/src/contrib/cluster/Akka.DistributedData/ORSet.cs b/src/contrib/cluster/Akka.DistributedData/ORSet.cs index 150b29c32ac..95e50398df1 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORSet.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORSet.cs @@ -518,35 +518,38 @@ public AddDeltaOperation(ORSet underlying) public override ORSet Underlying { get; } public override IReplicatedData Merge(IReplicatedData other) { - if (other is AddDeltaOperation) - { - var u = ((AddDeltaOperation)other).Underlying; - // Note that we only merge deltas originating from the same node - return new AddDeltaOperation(new ORSet( - ConcatElementsMap(u.ElementsMap), - Underlying.VersionVector.Merge(u.VersionVector))); - } - else if (other is AtomicDeltaOperation) - { - return new DeltaGroup(ImmutableArray.Create(this, other)); - } - else if (other is DeltaGroup) + switch (other) { - var vector = ((DeltaGroup)other).Operations; - return new DeltaGroup(vector.Add(this)); + case AddDeltaOperation operation: + { + var u = operation.Underlying; + // Note that we only merge deltas originating from the same node + return new AddDeltaOperation(new ORSet( + ConcatElementsMap(u.ElementsMap), + Underlying.VersionVector.Merge(u.VersionVector))); + } + case AtomicDeltaOperation _: + return new DeltaGroup(ImmutableArray.Create(this, other)); + case DeltaGroup dg: + { + var vector = dg.Operations; + return new DeltaGroup(vector.Add(this)); + } + default: + throw new ArgumentException($"Unknown delta operation of type {other.GetType()}", nameof(other)); } - else throw new ArgumentException($"Unknown delta operation of type {other.GetType()}", nameof(other)); } private ImmutableDictionary ConcatElementsMap( ImmutableDictionary thatMap) { - var u = Underlying.ElementsMap.ToBuilder(); - foreach (var entry in thatMap) - { - u[entry.Key] = entry.Value; - } - return u.ToImmutable(); + //var u = Underlying.ElementsMap.ToBuilder(); + //foreach (var entry in thatMap) + //{ + // u[entry.Key] = entry.Value; + //} + //return u.ToImmutable(); + return Underlying.ElementsMap.SetItems(thatMap); } } diff --git a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs index d4d367a2283..d83daec7f38 100644 --- a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs +++ b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs @@ -52,13 +52,19 @@ public static VersionVector Create(ImmutableDictionary vers /// /// Marker to signal that we have reached the end of a version vector. /// - private static readonly KeyValuePair EndMarker = new KeyValuePair(null, long.MinValue); + private static readonly (UniqueAddress addr, long version) EndMarker = (null, long.MinValue); public abstract bool IsEmpty { get; } public abstract int Count { get; } public abstract IEnumerator> VersionEnumerator { get; } + + internal abstract IEnumerable<(UniqueAddress addr, long version)> InternalVersions { get; } + + internal IEnumerator<(UniqueAddress addr, long version)> InternalVersionEnumerator => + InternalVersions.GetEnumerator(); + public static readonly VersionVector Empty = new MultiVersionVector(ImmutableDictionary.Empty); /// @@ -156,15 +162,15 @@ private Ordering CompareOnlyTo(VersionVector other, Ordering order) { if (ReferenceEquals(this, other)) return Ordering.Same; - return Compare(VersionEnumerator, other.VersionEnumerator, + return Compare(InternalVersionEnumerator, other.InternalVersionEnumerator, order == Ordering.Concurrent ? Ordering.FullOrder : order); } - private T NextOrElse(IEnumerator enumerator, T defaultValue) => + private static T NextOrElse(IEnumerator enumerator, T defaultValue) => enumerator.MoveNext() ? enumerator.Current : defaultValue; - private Ordering Compare(IEnumerator> i1, - IEnumerator> i2, Ordering requestedOrder) + private Ordering Compare(IEnumerator<(UniqueAddress addr, long version)> i1, + IEnumerator<(UniqueAddress addr, long version)> i2, Ordering requestedOrder) { var nt1 = NextOrElse(i1, EndMarker); var nt2 = NextOrElse(i2, EndMarker); @@ -177,15 +183,15 @@ private Ordering CompareOnlyTo(VersionVector other, Ordering order) else if (Equals(nt2, EndMarker)) return currentOrder == Ordering.Before ? Ordering.Concurrent : Ordering.After; else { - var nc = nt1.Key.CompareTo(nt2.Key); + var nc = nt1.addr.CompareTo(nt2.addr); if (nc == 0) { - if (nt1.Value < nt2.Value) + if (nt1.version < nt2.version) { if (currentOrder == Ordering.After) return Ordering.Concurrent; currentOrder = Ordering.Before; } - else if (nt1.Value > nt2.Value) + else if (nt1.version > nt2.version) { if (currentOrder == Ordering.Before) return Ordering.Concurrent; currentOrder = Ordering.After; @@ -258,6 +264,15 @@ public SingleVersionVector(UniqueAddress node, long version) public override bool IsEmpty => false; public override int Count => 1; public override IEnumerator> VersionEnumerator => new Enumerator(Node, Version); + + internal override IEnumerable<(UniqueAddress addr, long version)> InternalVersions + { + get + { + yield return (Node, Version); + } + } + public override VersionVector Increment(UniqueAddress node) { var v = Counter.GetAndIncrement(); @@ -274,23 +289,23 @@ public override VersionVector Increment(UniqueAddress node) public override VersionVector Merge(VersionVector other) { - if (other is MultiVersionVector vector1) + switch (other) { - var v2 = vector1.Versions.GetValueOrDefault(Node, 0L); - var mergedVersions = v2 >= Version ? vector1.Versions : vector1.Versions.SetItem(Node, Version); - return new MultiVersionVector(mergedVersions); - } - else if (other is SingleVersionVector vector) - { - if (Node == vector.Node) + case MultiVersionVector vector1: { - return Version >= vector.Version ? this : new SingleVersionVector(vector.Node, vector.Version); + var v2 = vector1.Versions.GetValueOrDefault(Node, 0L); + var mergedVersions = v2 >= Version ? vector1.Versions : vector1.Versions.SetItem(Node, Version); + return new MultiVersionVector(mergedVersions); } - else return new MultiVersionVector( - new KeyValuePair(Node, Version), - new KeyValuePair(vector.Node, vector.Version)); + case SingleVersionVector vector when Node == vector.Node: + return Version >= vector.Version ? this : new SingleVersionVector(vector.Node, vector.Version); + case SingleVersionVector vector: + return new MultiVersionVector( + new KeyValuePair(Node, Version), + new KeyValuePair(vector.Node, vector.Version)); + default: + throw new NotSupportedException("SingleVersionVector doesn't support merge with provided version vector"); } - else throw new NotSupportedException("SingleVersionVector doesn't support merge with provided version vector"); } public override ImmutableHashSet ModifiedByNodes => ImmutableHashSet.Create(Node); @@ -337,6 +352,10 @@ public MultiVersionVector(ImmutableDictionary nodeVersions) public override bool IsEmpty => Versions.IsEmpty; public override int Count => Versions.Count; public override IEnumerator> VersionEnumerator => Versions.GetEnumerator(); + + internal override IEnumerable<(UniqueAddress addr, long version)> InternalVersions => + Versions.Select(x => (x.Key, x.Value)); + public override VersionVector Increment(UniqueAddress node) => new MultiVersionVector(Versions.SetItem(node, Counter.GetAndIncrement())); @@ -346,25 +365,29 @@ public MultiVersionVector(ImmutableDictionary nodeVersions) public override VersionVector Merge(VersionVector other) { - if (other is MultiVersionVector vector1) + switch (other) { - var merged = vector1.Versions.ToBuilder(); - foreach (var pair in Versions) + case MultiVersionVector vector1: { - var mergedCurrentTime = merged.GetValueOrDefault(pair.Key, 0L); - if (pair.Value >= mergedCurrentTime) - merged.AddOrSet(pair.Key, pair.Value); - } + var merged = vector1.Versions.ToBuilder(); + foreach (var pair in Versions) + { + var mergedCurrentTime = merged.GetValueOrDefault(pair.Key, 0L); + if (pair.Value >= mergedCurrentTime) + merged[pair.Key] = pair.Value; + } - return new MultiVersionVector(merged.ToImmutable()); - } - else if (other is SingleVersionVector vector) - { - var v1 = Versions.GetValueOrDefault(vector.Node, 0L); - var merged = v1 >= vector.Version ? Versions : Versions.SetItem(vector.Node, vector.Version); - return new MultiVersionVector(merged); + return new MultiVersionVector(merged.ToImmutable()); + } + case SingleVersionVector vector: + { + var v1 = Versions.GetValueOrDefault(vector.Node, 0L); + var merged = v1 >= vector.Version ? Versions : Versions.SetItem(vector.Node, vector.Version); + return new MultiVersionVector(merged); + } + default: + throw new NotSupportedException("MultiVersionVector doesn't support merge with provided version vector"); } - else throw new NotSupportedException("MultiVersionVector doesn't support merge with provided version vector"); } public override ImmutableHashSet ModifiedByNodes => Versions.Keys.ToImmutableHashSet(); diff --git a/src/examples/Cluster/DData/DDataStressTest/DDataStressTest.csproj b/src/examples/Cluster/DData/DDataStressTest/DDataStressTest.csproj new file mode 100644 index 00000000000..ee9b9a3f54c --- /dev/null +++ b/src/examples/Cluster/DData/DDataStressTest/DDataStressTest.csproj @@ -0,0 +1,12 @@ + + + + Exe + netcoreapp3.1 + + + + + + + diff --git a/src/examples/Cluster/DData/DDataStressTest/Program.cs b/src/examples/Cluster/DData/DDataStressTest/Program.cs new file mode 100644 index 00000000000..9d366410436 --- /dev/null +++ b/src/examples/Cluster/DData/DDataStressTest/Program.cs @@ -0,0 +1,91 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster; +using Akka.DistributedData; + +namespace DDataStressTest +{ + class Program + { + public const int NumElements = 25; + + public const int NumNodes = 10; + + public const int Iterations = 10000; + + private const string _user1 = "{\"username\":\"john\",\"password\":\"coltrane\"}"; + private const string _user2 = "{\"username\":\"sonny\",\"password\":\"rollins\"}"; + private const string _user3 = "{\"username\":\"charlie\",\"password\":\"parker\"}"; + private const string _user4 = "{\"username\":\"charles\",\"password\":\"mingus\"}"; + + + static async Task Main(string[] args) + { + UniqueAddress[] _nodes; + + string[] _elements; + + + // has data from all nodes + ORSet _c1 = ORSet.Empty; + + // has additional items from all nodes + ORSet _c2 = ORSet.Empty; + + // has removed items from all nodes + ORSet _c3 = ORSet.Empty; + + var newNodes = new List(NumNodes); + foreach (var i in Enumerable.Range(0, NumNodes)) + { + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + _nodes = newNodes.ToArray(); + + var newElements = new List(NumNodes); + foreach (var i in Enumerable.Range(0, NumElements)) + { + newElements.Add(i.ToString()); + } + _elements = newElements.ToArray(); + + _c1 = ORSet.Empty; + foreach (var node in _nodes) + { + _c1 = _c1.Add(node, _elements[0]); + } + + // add some data that _c2 doesn't have + _c2 = _c1; + foreach (var node in _nodes.Skip(NumNodes / 2)) + { + _c2 = _c2.Add(node, _elements[1]); + } + + _c3 = _c1; + foreach (var node in _nodes.Take(NumNodes / 2)) + { + _c3 = _c3.Remove(node, _elements[0]); + } + + var init = ORSet.Empty; + + foreach (var element in _elements) + { + foreach (var node in _nodes) + { + init = init.Add(node, element); + } + } + + _c1.Merge(init).Merge(_c2).Merge(_c3); + + await Task.Delay(5000); + } + } +} From 5294e9a3589e7be69cecb81d9a66f0c4da9f6fcb Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 21 May 2021 17:24:13 -0500 Subject: [PATCH 12/50] added `FastHash` benchmark (#5029) --- .../Remoting/FastHashBenchmarks.cs | 27 +++++++++++++++++++ .../CoreAPISpec.ApproveRemote.approved.txt | 1 + .../Akka.Remote/Properties/AssemblyInfo.cs | 1 + 3 files changed, 29 insertions(+) create mode 100644 src/benchmark/Akka.Benchmarks/Remoting/FastHashBenchmarks.cs diff --git a/src/benchmark/Akka.Benchmarks/Remoting/FastHashBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Remoting/FastHashBenchmarks.cs new file mode 100644 index 00000000000..87d3155de53 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Remoting/FastHashBenchmarks.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Akka.Benchmarks.Configurations; +using Akka.Remote.Serialization; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Remoting +{ + [Config(typeof(MicroBenchmarkConfig))] + public class FastHashBenchmarks + { + public const string HashKey1 = "hash1"; + + [Benchmark] + public int FastHash_OfString() + { + return FastHash.OfString(HashKey1); + } + + [Benchmark] + public int FastHash_OfStringUnsafe() + { + return FastHash.OfStringFast(HashKey1); + } + } +} diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt index 766d4f50243..c03de639b12 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt @@ -1,4 +1,5 @@ [assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Metrics")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")] diff --git a/src/core/Akka.Remote/Properties/AssemblyInfo.cs b/src/core/Akka.Remote/Properties/AssemblyInfo.cs index 5327178cc1e..5cb60a08b60 100644 --- a/src/core/Akka.Remote/Properties/AssemblyInfo.cs +++ b/src/core/Akka.Remote/Properties/AssemblyInfo.cs @@ -34,3 +34,4 @@ [assembly: InternalsVisibleTo("Akka.Cluster.Tools")] [assembly: InternalsVisibleTo("Akka.Cluster.Sharding")] [assembly: InternalsVisibleTo("Akka.Cluster.Metrics")] +[assembly: InternalsVisibleTo("Akka.Benchmarks")] \ No newline at end of file From 61d08b7040488f3cba4ba3ec969417339349e17b Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 22 May 2021 19:35:49 +0200 Subject: [PATCH 13/50] Fix a potential race condition in FileSubscriber (#5032) --- .../Implementation/IO/FileSubscriber.cs | 73 +++++++++---------- 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs index dfcc731ffd5..c5c80a65e71 100644 --- a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs +++ b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs @@ -13,7 +13,6 @@ using Akka.IO; using Akka.Streams.Actors; using Akka.Streams.IO; -using Akka.Util; namespace Akka.Streams.Implementation.IO { @@ -35,20 +34,20 @@ internal class FileSubscriber : ActorSubscriber /// TBD /// TBD public static Props Props( - FileInfo f, - TaskCompletionSource completionPromise, - int bufferSize, - long startPosition, + FileInfo f, + TaskCompletionSource completionPromise, + int bufferSize, + long startPosition, FileMode fileMode, bool autoFlush = false, object flushCommand = null) { - if(bufferSize <= 0) + if (bufferSize <= 0) throw new ArgumentException($"bufferSize must be > 0 (was {bufferSize})", nameof(bufferSize)); - if(startPosition < 0) + if (startPosition < 0) throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition)); - return Actor.Props.Create(()=> new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand)) + return Actor.Props.Create(() => new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand)) .WithDeploy(Deploy.Local); } @@ -74,12 +73,12 @@ internal class FileSubscriber : ActorSubscriber /// /// public FileSubscriber( - FileInfo f, - TaskCompletionSource completionPromise, - int bufferSize, - long startPosition, + FileInfo f, + TaskCompletionSource completionPromise, + int bufferSize, + long startPosition, FileMode fileMode, - bool autoFlush, + bool autoFlush, object flushCommand) { _f = f; @@ -111,7 +110,7 @@ protected override void PreStart() } catch (Exception ex) { - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); + CloseAndComplete(IOResult.Failed(_bytesWritten, ex)); Cancel(); } } @@ -128,32 +127,23 @@ protected override bool Receive(object message) case OnNext next: try { - var byteString = (ByteString) next.Element; + var byteString = (ByteString)next.Element; var bytes = byteString.ToArray(); - try - { - _chan.Write(bytes, 0, bytes.Length); - _bytesWritten += bytes.Length; - if (_autoFlush) - _chan.Flush(true); - } - catch (Exception ex) - { - _log.Error(ex, $"Tearing down FileSink({_f.FullName}) due to write error."); - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); - Context.Stop(Self); - } + _chan.Write(bytes, 0, bytes.Length); + _bytesWritten += bytes.Length; + if (_autoFlush) + _chan.Flush(true); } catch (Exception ex) { - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); + CloseAndComplete(IOResult.Failed(_bytesWritten, ex)); Cancel(); } return true; case OnError error: - _log.Error(error.Cause, $"Tearing down FileSink({_f.FullName}) due to upstream error"); - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, error.Cause)); + _log.Error(error.Cause, "Tearing down FileSink({0}) due to upstream error", _f.FullName); + CloseAndComplete(IOResult.Failed(_bytesWritten, error.Cause)); Context.Stop(Self); return true; @@ -164,8 +154,8 @@ protected override bool Receive(object message) } catch (Exception ex) { - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); - } + CloseAndComplete(IOResult.Failed(_bytesWritten, ex)); + } Context.Stop(Self); return true; @@ -176,8 +166,8 @@ protected override bool Receive(object message) } catch (Exception ex) { - _log.Error(ex, $"Tearing down FileSink({_f.FullName}). File flush failed."); - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); + _log.Error(ex, "Tearing down FileSink({0}). File flush failed.", _f.FullName); + CloseAndComplete(IOResult.Failed(_bytesWritten, ex)); Context.Stop(Self); } return true; @@ -190,18 +180,25 @@ protected override bool Receive(object message) /// TBD /// protected override void PostStop() + { + CloseAndComplete(IOResult.Success(_bytesWritten)); + base.PostStop(); + } + + private void CloseAndComplete(IOResult result) { try { + // close the channel/file before completing the promise, allowing the + // file to be deleted, which would not work (on some systems) if the + // file is still open for writing _chan?.Dispose(); + _completionPromise.TrySetResult(result); } catch (Exception ex) { _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); } - - _completionPromise.TrySetResult(IOResult.Success(_bytesWritten)); - base.PostStop(); } } } From 99be70b642501e8abdd9d2864ba3b19df5ad48cc Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Mon, 24 May 2021 16:17:34 +0200 Subject: [PATCH 14/50] Consolidate intercept (#5033) * Removed a bunch of obsolete and unused methods * Consolidate Intercept methods, and add the new AssertThrows * Fixed a few tests that where dealing with the AggregateExceptions themselves --- .../Transport/AkkaProtocolSpec.cs | 6 +- .../Akka.Streams.Tests/Dsl/FlowAskSpec.cs | 6 +- .../Akka.Tests.Shared.Internals/AkkaSpec.cs | 98 +++++++++++++------ .../Actor/CoordinatedShutdownSpec.cs | 13 +-- 4 files changed, 73 insertions(+), 50 deletions(-) diff --git a/src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs b/src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs index 4a87a588930..1996822dd1b 100644 --- a/src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs +++ b/src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs @@ -423,11 +423,7 @@ public void ProtocolStateActor_must_give_up_outbound_after_connection_timeout() Watch(stateActor); - // inner exception will be a TimeoutException - Intercept(() => - { - statusPromise.Task.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue(); - }); + Intercept(() => statusPromise.Task.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue()); ExpectTerminated(stateActor); } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowAskSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowAskSpec.cs index 7a5999c128a..c9309ec143e 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowAskSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowAskSpec.cs @@ -266,13 +266,11 @@ public void Flow_with_ask_must_produce_future_elements_in_order() .Ask(r, _timeout, 4) .RunWith(Sink.Ignore(), _materializer); - Intercept(() => + Intercept(() => { r.Tell(PoisonPill.Instance); done.Wait(RemainingOrDefault); - }) - .Flatten() - .InnerException.Should().BeOfType(); + }); }, _materializer); diff --git a/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs b/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs index 9b6a2c94e7f..e82ca53eca6 100644 --- a/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs +++ b/src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs @@ -113,7 +113,7 @@ private static string GetCallerName() } public static Config AkkaSpecConfig { get { return _akkaSpecConfig; } } - + protected T ExpectMsgPf(TimeSpan? timeout, string hint, Func function) { MessageEnvelope envelope; @@ -136,30 +136,88 @@ protected T ExpectMsgPf(string hint, Func pf) return pf.Invoke(t); } - + /// + /// Intercept and return an exception that's expected to be thrown by the passed function value. The thrown + /// exception must be an instance of the type specified by the type parameter of this method. This method + /// invokes the passed function. If the function throws an exception that's an instance of the specified type, + /// this method returns that exception. Else, whether the passed function returns normally or completes abruptly + /// with a different exception, this method throws . + /// + /// Also note that the difference between this method and is that this method + /// returns the expected exception, so it lets you perform further assertions on that exception. By contrast, + /// the indicates to the reader of the code that nothing further is expected + /// about the thrown exception other than its type. The recommended usage is to use + /// by default, intercept only when you need to inspect the caught exception further. + /// + /// + /// The action that should throw the expected exception + /// The intercepted exception, if it is of the expected type + /// If the passed action does not complete abruptly with an exception that's an instance of the specified type. protected T Intercept(Action actionThatThrows) where T : Exception { - return Assert.Throws(() => actionThatThrows()); - } + try + { + actionThatThrows(); + } + catch (Exception ex) + { + var exception = ex is AggregateException aggregateException + ? aggregateException.Flatten().InnerExceptions[0] + : ex; + + var exceptionType = typeof(T); + return exceptionType == exception.GetType() + ? (T)exception + : throw new ThrowsException(exceptionType, exception); + } - protected void Intercept(Action actionThatThrows) + throw new ThrowsException(typeof(T)); + } + + /// + /// Ensure that an expected exception is thrown by the passed function value. The thrown exception must be an + /// instance of the type specified by the type parameter of this method. This method invokes the passed + /// function. If the function throws an exception that's an instance of the specified type, this method returns + /// void. Else, whether the passed function returns normally or completes abruptly with a different + /// exception, this method throws . + /// + /// Also note that the difference between this method and is that this method + /// does not return the expected exception, so it does not let you perform further assertions on that exception. + /// It also indicates to the reader of the code that nothing further is expected about the thrown exception + /// other than its type. The recommended usage is to use by default, + /// only when you need to inspect the caught exception further. + /// + /// + /// The action that should throw the expected exception + /// If the passed action does not complete abruptly with an exception that's an instance of the specified type. + protected void AssertThrows(Action actionThatThrows) where T : Exception { try { actionThatThrows(); } - catch(Exception) + catch (Exception ex) { - return; + var exception = ex is AggregateException aggregateException + ? aggregateException.Flatten().InnerExceptions[0] + : ex; + + var exceptionType = typeof(T); + if (exceptionType == exception.GetType()) + return; + + throw new ThrowsException(exceptionType, exception); } - throw new ThrowsException(typeof(Exception)); + + throw new ThrowsException(typeof(T)); } - - protected async Task InterceptAsync(Func asyncActionThatThrows) + + [Obsolete("Use AssertThrows instead.")] + protected void Intercept(Action actionThatThrows) { try { - await asyncActionThatThrows(); + actionThatThrows(); } catch(Exception) { @@ -168,24 +226,6 @@ protected async Task InterceptAsync(Func asyncActionThatThrows) throw new ThrowsException(typeof(Exception)); } - [Obsolete("Use Intercept instead. This member will be removed.")] - protected void intercept(Action actionThatThrows) where T : Exception - { - Assert.Throws(() => actionThatThrows()); - } - - [Obsolete("Use ExpectMsgPf instead. This member will be removed.")] - protected T expectMsgPF(string hint, Func pf) - { - return ExpectMsgPf(hint, pf); - } - - [Obsolete("Use ExpectMsgPf instead. This member will be removed.")] - protected T expectMsgPF(TimeSpan duration, string hint, Func pf) - { - return ExpectMsgPf(duration, hint, pf); - } - protected void MuteDeadLetters(params Type[] messageClasses) { if (!Sys.Log.IsDebugEnabled) diff --git a/src/core/Akka.Tests/Actor/CoordinatedShutdownSpec.cs b/src/core/Akka.Tests/Actor/CoordinatedShutdownSpec.cs index ac352e677b3..6fe34ceae13 100644 --- a/src/core/Akka.Tests/Actor/CoordinatedShutdownSpec.cs +++ b/src/core/Akka.Tests/Actor/CoordinatedShutdownSpec.cs @@ -337,18 +337,7 @@ public void CoordinatedShutdown_must_abort_if_recover_is_off() var result = co.Run(CoordinatedShutdown.UnknownReason.Instance); ExpectMsg("B"); - Intercept(() => - { - if (result.Wait(RemainingOrDefault)) - { - result.Exception?.Flatten().InnerException.Should().BeOfType(); - } - else - { - throw new Exception("CoordinatedShutdown task did not complete"); - } - }); - + Intercept(() => result.Wait(RemainingOrDefault)); ExpectNoMsg(TimeSpan.FromMilliseconds(200)); // C not run } From 6f5b003dce9d3484903632631ad16a95cda5ad3d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 24 May 2021 15:04:39 -0500 Subject: [PATCH 15/50] remove `ActorPath.ToString` call from `ResolveActorRefWithLocalAddress` (#5034) * remove `ActorPath.ToString` call from `ResolveActorRefWithLocalAddress` * remove comment --- src/core/Akka.Remote/RemoteActorRefProvider.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/core/Akka.Remote/RemoteActorRefProvider.cs b/src/core/Akka.Remote/RemoteActorRefProvider.cs index e24674c6e7f..317df8d5f3f 100644 --- a/src/core/Akka.Remote/RemoteActorRefProvider.cs +++ b/src/core/Akka.Remote/RemoteActorRefProvider.cs @@ -489,8 +489,7 @@ public IInternalActorRef ResolveActorRefWithLocalAddress(string path, Address lo //the actor's local address was already included in the ActorPath if (HasAddress(actorPath.Address)) { - // HACK: needed to make ActorSelections work - if (actorPath.ToStringWithoutAddress().Equals("/")) + if (actorPath is RootActorPath) return RootGuardian; return _local.ResolveActorRef(RootGuardian, actorPath.ElementsWithUid); } From 3f8fa299f9cf7f8750734a023f307d2e376ff9c6 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 25 May 2021 04:12:24 +0700 Subject: [PATCH 16/50] Improve MurmurHash string hash memory footprint (#5028) * Reduce memory footprint * Fix enumerator, off by 1 error --- src/Akka.sln | 2 +- src/contrib/cluster/Akka.DistributedData/ORSet.cs | 6 +++--- src/core/Akka/Actor/Address.cs | 2 +- src/core/Akka/Util/MurmurHash.cs | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Akka.sln b/src/Akka.sln index 2c21386bd39..52dbe1dacd7 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -244,7 +244,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Akka.AspNetCore", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SerializationBenchmarks", "benchmark\SerializationBenchmarks\SerializationBenchmarks.csproj", "{2E4B9584-42CC-4D17-B719-9F462B16C94D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DDataStressTest", "examples\Cluster\DData\DDataStressTest\DDataStressTest.csproj", "{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DDataStressTest", "examples\Cluster\DData\DDataStressTest\DDataStressTest.csproj", "{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/src/contrib/cluster/Akka.DistributedData/ORSet.cs b/src/contrib/cluster/Akka.DistributedData/ORSet.cs index 95e50398df1..7ebbd63bc7f 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORSet.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORSet.cs @@ -712,9 +712,9 @@ private ORSet MergeRemoveDelta(RemoveDeltaOperation delta) { while (deleteDots.MoveNext()) { - var curr = deleteDots.Current; - deleteDotNodes.Add(curr.Key); - deleteDotsAreGreater &= (thisDot != null && (thisDot.VersionAt(curr.Key) <= curr.Value)); + var current = deleteDots.Current; + deleteDotNodes.Add(current.Key); + deleteDotsAreGreater &= (thisDot != null && (thisDot.VersionAt(current.Key) <= current.Value)); } } diff --git a/src/core/Akka/Actor/Address.cs b/src/core/Akka/Actor/Address.cs index 9e13a9374d2..9e638d01428 100644 --- a/src/core/Akka/Actor/Address.cs +++ b/src/core/Akka/Actor/Address.cs @@ -40,7 +40,7 @@ public int Compare(Address x, Address y) if (result != 0) return result; result = string.CompareOrdinal(x.System, y.System); if (result != 0) return result; - result = string.CompareOrdinal(x.Host ?? "", y.Host ?? ""); + result = string.CompareOrdinal(x.Host ?? string.Empty, y.Host ?? string.Empty); if (result != 0) return result; result = (x.Port ?? 0).CompareTo(y.Port ?? 0); return result; diff --git a/src/core/Akka/Util/MurmurHash.cs b/src/core/Akka/Util/MurmurHash.cs index 511e8f5c4cd..f9aa8df2af2 100644 --- a/src/core/Akka/Util/MurmurHash.cs +++ b/src/core/Akka/Util/MurmurHash.cs @@ -202,20 +202,20 @@ public static int StringHash(string s) { unchecked { - var sChar = s.ToCharArray(); + var span = s.AsSpan(); var h = StartHash((uint)s.Length * StringSeed); var c = HiddenMagicA; var k = HiddenMagicB; var j = 0; while (j + 1 < s.Length) { - var i = (uint)((sChar[j] << 16) + sChar[j + 1]); + var i = (uint)((span[j] << 16) + span[j + 1]); h = ExtendHash(h, i, c, k); c = NextMagicA(c); k = NextMagicB(k); j += 2; } - if (j < s.Length) h = ExtendHash(h, sChar[j], c, k); + if (j < s.Length) h = ExtendHash(h, span[j], c, k); return (int)FinalizeHash(h); } } From 431b69eeed610c804aa3aa463e966d7d4d6424ea Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 25 May 2021 11:21:29 +0700 Subject: [PATCH 17/50] Turn on DData related ClusterSharding MNTR specs (#4926) * Merge LMDB config into DistributedData * Remove DistributedData config from ClusterSharding config, only values that overrides/adds to the config are retained, config is then composed together in code. * Turn on DData related specs in ClusterSharding MNTR specs. * Fix Multinode spec ClusterShardingSpec * Fix DData remember-entities feature * Revert "Fix Multinode spec ClusterShardingSpec" This reverts commit 80897e5bcc17be7176e424b6607fb4f62482318a. * Fix spec * Fix how and when LMDB disposes its resources * Harden remember-entities spec, spec must work when remember-entities are both set and unset Co-authored-by: Aaron Stannard --- .../ClusterShardingRememberEntitiesSpec.cs | 81 +++++++--- .../ClusterShardingSpec.cs | 19 +-- .../Akka.Cluster.Sharding/ClusterSharding.cs | 3 +- .../ClusterShardingGuardian.cs | 4 +- .../Akka.Cluster.Sharding/DDataShard.cs | 32 +++- .../DDataShardCoordinator.cs | 2 +- .../Akka.Cluster.Sharding/reference.conf | 101 +----------- .../LmdbDurableStore.cs | 145 +++++++++--------- 8 files changed, 180 insertions(+), 207 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs index 77ad9462c0b..ebaa63fdeb3 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs @@ -13,6 +13,7 @@ using Akka.Cluster.TestKit; using Akka.Configuration; using Akka.Remote.TestKit; +using Akka.TestKit; using Akka.Util; using FluentAssertions; @@ -21,13 +22,15 @@ namespace Akka.Cluster.Sharding.Tests public abstract class ClusterShardingRememberEntitiesSpecConfig : MultiNodeConfig { public string Mode { get; } + public bool RememberEntities { get; } public RoleName First { get; } public RoleName Second { get; } public RoleName Third { get; } - protected ClusterShardingRememberEntitiesSpecConfig(string mode) + protected ClusterShardingRememberEntitiesSpecConfig(string mode, bool rememberEntities) { Mode = mode; + RememberEntities = rememberEntities; First = Role("first"); Second = Role("second"); Third = Role("third"); @@ -78,24 +81,43 @@ class = ""Akka.Cluster.Sharding.Tests.MemoryJournalShared, Akka.Cluster.Sharding } public class PersistentClusterShardingRememberEntitiesSpecConfig : ClusterShardingRememberEntitiesSpecConfig { - public PersistentClusterShardingRememberEntitiesSpecConfig() : base("persistence") { } + public PersistentClusterShardingRememberEntitiesSpecConfig(bool rememberEntities) : base("persistence", rememberEntities) { } } public class DDataClusterShardingRememberEntitiesSpecConfig : ClusterShardingRememberEntitiesSpecConfig { - public DDataClusterShardingRememberEntitiesSpecConfig() : base("ddata") { } + public DDataClusterShardingRememberEntitiesSpecConfig(bool rememberEntities) : base("ddata", rememberEntities) { } } - public class PersistentClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec + public abstract class PersistentClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec { - public PersistentClusterShardingRememberEntitiesSpec() : this(new PersistentClusterShardingRememberEntitiesSpecConfig()) { } - protected PersistentClusterShardingRememberEntitiesSpec(PersistentClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(PersistentClusterShardingRememberEntitiesSpec)) { } + protected PersistentClusterShardingRememberEntitiesSpec(bool rememberEntities) : this(new PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities)) { } + private PersistentClusterShardingRememberEntitiesSpec(PersistentClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(PersistentClusterShardingRememberEntitiesSpec)) { } } - // DData has no support for remember-entities at this point - public class DDataClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec + public abstract class DDataClusterShardingRememberEntitiesSpec : ClusterShardingRememberEntitiesSpec { - public DDataClusterShardingRememberEntitiesSpec() : this(new DDataClusterShardingRememberEntitiesSpecConfig()) { } - protected DDataClusterShardingRememberEntitiesSpec(DDataClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(PersistentClusterShardingRememberEntitiesSpec)) { } + protected DDataClusterShardingRememberEntitiesSpec(bool rememberEntities) : this(new DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities)) { } + private DDataClusterShardingRememberEntitiesSpec(DDataClusterShardingRememberEntitiesSpecConfig config) : base(config, typeof(DDataClusterShardingRememberEntitiesSpec)) { } + } + + public class PersistentClusterShardingRememberEntitiesDefaultSpec : PersistentClusterShardingRememberEntitiesSpec + { + public PersistentClusterShardingRememberEntitiesDefaultSpec() : base(false) { } + } + + public class PersistentClusterShardingRememberEntitiesEnabledSpec : PersistentClusterShardingRememberEntitiesSpec + { + public PersistentClusterShardingRememberEntitiesEnabledSpec() : base(true) { } + } + + public class DDataClusterShardingRememberEntitiesDefaultSpec : DDataClusterShardingRememberEntitiesSpec + { + public DDataClusterShardingRememberEntitiesDefaultSpec() : base(false) { } + } + + public class DDataClusterShardingRememberEntitiesEnabledSpec : DDataClusterShardingRememberEntitiesSpec + { + public DDataClusterShardingRememberEntitiesEnabledSpec() : base(true) { } } public abstract class ClusterShardingRememberEntitiesSpec : MultiNodeClusterSpec @@ -141,7 +163,7 @@ protected override bool Receive(object message) return null; }; - private Lazy _region; + private readonly Lazy _region; private readonly ClusterShardingRememberEntitiesSpecConfig _config; private readonly List _storageLocations; @@ -191,23 +213,31 @@ private void Join(RoleName from, RoleName to) private void StartSharding(ActorSystem sys, IActorRef probe) { - var allocationStrategy = ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 1, relativeLimit: 0.1); ClusterSharding.Get(sys).Start( typeName: "Entity", entityProps: Props.Create(() => new TestEntity(probe)), - settings: ClusterShardingSettings.Create(Sys).WithRememberEntities(true), + settings: ClusterShardingSettings.Create(Sys).WithRememberEntities(_config.RememberEntities), extractEntityId: extractEntityId, extractShardId: extractShardId); } + private Started ExpectEntityRestarted(ActorSystem sys, int evt, TestProbe probe, TestProbe entityProbe, TimeSpan? remaining = null) + { + if (!_config.RememberEntities) + { + probe.Send(ClusterSharding.Get(sys).ShardRegion("Entity"), evt); + probe.ExpectMsg(1); + } + + return entityProbe.ExpectMsg(remaining ?? TimeSpan.FromSeconds(30)); + } + [MultiNodeFact] public void Cluster_sharding_with_remember_entities_specs() { if (!IsDDataMode) Cluster_sharding_with_remember_entities_should_setup_shared_journal(); Cluster_sharding_with_remember_entities_should_start_remembered_entities_when_coordinator_fail_over(); - - // https://github.com/akkadotnet/akka.net/issues/4262 - need to resolve this and then we can remove if statement - if (!IsDDataMode) Cluster_sharding_with_remember_entities_should_start_remembered_entities_in_new_cluster(); + Cluster_sharding_with_remember_entities_should_start_remembered_entities_in_new_cluster(); } public void Cluster_sharding_with_remember_entities_should_setup_shared_journal() @@ -245,19 +275,23 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti { Within(TimeSpan.FromSeconds(30), () => { + var probe = CreateTestProbe(); + var entityProbe = CreateTestProbe(); + Join(_config.Second, _config.Second); RunOn(() => { - StartSharding(Sys, TestActor); - _region.Value.Tell(1); - ExpectMsg(); + StartSharding(Sys, entityProbe.Ref); + probe.Send(_region.Value, 1); + probe.ExpectMsg(1); + entityProbe.ExpectMsg(); }, _config.Second); EnterBarrier("second-started"); Join(_config.Third, _config.Second); RunOn(() => { - StartSharding(Sys, TestActor); + StartSharding(Sys, entityProbe.Ref); }, _config.Third); RunOn(() => @@ -289,7 +323,7 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti RunOn(() => { - ExpectMsg(Remaining); + ExpectEntityRestarted(Sys, 1, probe, entityProbe, Remaining); }, _config.Third); EnterBarrier("after-2"); @@ -313,6 +347,7 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti // no nodes left of the original cluster, start a new cluster var sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + var entityProbe2 = CreateTestProbe(sys2); var probe2 = CreateTestProbe(sys2); if (!IsDDataMode) @@ -327,8 +362,8 @@ public void Cluster_sharding_with_remember_entities_should_start_remembered_enti } Cluster.Get(sys2).Join(Cluster.Get(sys2).SelfAddress); - StartSharding(sys2, probe2.Ref); - probe2.ExpectMsg(TimeSpan.FromSeconds(20)); + StartSharding(sys2, entityProbe2.Ref); + ExpectEntityRestarted(sys2, 1, probe2, entityProbe2); Shutdown(sys2); }, _config.Third); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs index 7a5d58c9346..d6d48dbfc7e 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs @@ -83,7 +83,7 @@ protected ClusterShardingSpecConfig(string mode, string entityRecoveryStrategy = }} distributed-data.durable.lmdb {{ dir = ""target/ClusterShardingSpec/sharding-ddata"" - map-size = 10000000 + map-size = 10 MiB }} }} akka.testconductor.barrier-timeout = 70s @@ -491,6 +491,10 @@ private Props CoordinatorProps(string typeName, bool rebalanceEntities, bool rem .WithFallback(Sys.Settings.Config.GetConfig("akka.cluster.sharding")); var settings = ClusterShardingSettings.Create(config, Sys.Settings.Config.GetConfig("akka.cluster.singleton")) .WithRememberEntities(rememberEntities); + var majorityMinCap = Sys.Settings.Config.GetInt("akka.cluster.sharding.distributed-data.majority-min-cap"); + if (IsDDataMode) + return DDataShardCoordinator.Props(typeName, settings, allocationStrategy, ReplicatorRef, + majorityMinCap, rememberEntities); return PersistentShardCoordinator.Props(typeName, settings, allocationStrategy); } @@ -540,14 +544,11 @@ public void ClusterSharding_specs() ClusterSharding_should_be_easy_API_for_starting(); - if (!IsDDataMode) - { - PersistentClusterShards_should_recover_entities_upon_restart(); - PersistentClusterShards_should_permanently_stop_entities_which_passivate(); - PersistentClusterShards_should_restart_entities_which_stop_without_passivation(); - PersistentClusterShards_should_be_migrated_to_new_regions_upon_region_failure(); - PersistentClusterShards_should_ensure_rebalance_restarts_shards(); - } + PersistentClusterShards_should_recover_entities_upon_restart(); + PersistentClusterShards_should_permanently_stop_entities_which_passivate(); + PersistentClusterShards_should_restart_entities_which_stop_without_passivation(); + PersistentClusterShards_should_be_migrated_to_new_regions_upon_region_failure(); + PersistentClusterShards_should_ensure_rebalance_restarts_shards(); } public void ClusterSharding_should_setup_shared_journal() diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index e6240fe3a29..c961f9d354c 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -277,7 +277,8 @@ public ClusterSharding(ExtendedActorSystem system) /// TBD public static Config DefaultConfig() { - return ConfigurationFactory.FromResource("Akka.Cluster.Sharding.reference.conf"); + return ConfigurationFactory.FromResource("Akka.Cluster.Sharding.reference.conf") + .WithFallback(DistributedData.DistributedData.DefaultConfig()); } /// diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs index 5931d4aecc7..b28aca1089d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs @@ -251,7 +251,9 @@ public ClusterShardingGuardian() private ReplicatorSettings GetReplicatorSettings(ClusterShardingSettings shardingSettings) { - var configuredSettings = ReplicatorSettings.Create(Context.System.Settings.Config.GetConfig("akka.cluster.sharding.distributed-data")); + var config = Context.System.Settings.Config.GetConfig("akka.cluster.sharding.distributed-data") + .WithFallback(Context.System.Settings.Config.GetConfig("akka.cluster.distributed-data")); + var configuredSettings = ReplicatorSettings.Create(config); var settingsWithRoles = configuredSettings.WithRole(shardingSettings.Role); if (shardingSettings.RememberEntities) return settingsWithRoles; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs index 68940789db1..d161b485bb6 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs @@ -127,7 +127,37 @@ internal sealed class DDataShard : ActorBase, IShard, IWithUnboundedStash, IWith } } - public void EntityTerminated(IActorRef tref) => this.BaseEntityTerminated(tref); + public void EntityTerminated(IActorRef tref) + { + if (!IdByRef.TryGetValue(tref, out var id)) return; + IdByRef = IdByRef.Remove(tref); + RefById = RefById.Remove(id); + + if (PassivateIdleTask != null) + { + LastMessageTimestamp = LastMessageTimestamp.Remove(id); + } + + if (MessageBuffers.TryGetValue(id, out var buffer) && buffer.Count != 0) + { + //Note; because we're not persisting the EntityStopped, we don't need + // to persist the EntityStarted either. + Log.Debug("Starting entity [{0}] again, there are buffered messages for it", id); + this.SendMessageBuffer(new Shard.EntityStarted(id)); + } + else + { + if (!Passivating.Contains(tref)) + { + Log.Debug("Entity [{0}] stopped without passivating, will restart after backoff", id); + Context.System.Scheduler.ScheduleTellOnce(Settings.TuningParameters.EntityRestartBackoff, Self, new Shard.RestartEntity(id), ActorRefs.NoSender); + } + else + ProcessChange(new Shard.EntityStopped(id), this.PassivateCompleted); + } + + Passivating = Passivating.Remove(tref); + } public void DeliverTo(string id, object message, object payload, IActorRef sender) { diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs index d7da03d62fd..b7f14097ca6 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs @@ -345,7 +345,7 @@ private void SendCoordinatorStateUpdate(PersistentShardCoordinator.IDomainEvent { var s = CurrentState.Updated(e); _replicator.Tell(Dsl.Update(_coordinatorStateKey, - new LWWRegister(Cluster.SelfUniqueAddress, PersistentShardCoordinator.State.Empty), + new LWWRegister(Cluster.SelfUniqueAddress, PersistentShardCoordinator.State.Empty.WithRememberEntities(Settings.RememberEntities)), _writeConsistency, e, reg => reg.WithValue(Cluster.SelfUniqueAddress, s))); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index f94ea77704b..9f3e31d0a19 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -184,110 +184,13 @@ akka.cluster.sharding { distributed-data { # minCap parameter to MajorityWrite and MajorityRead consistency level. majority-min-cap = 5 - #durable.keys = ["shard-*"] + + durable.keys = ["shard-*"] # When using many entities with "remember entities" the Gossip message # can become to large if including to many in same message. Limit to # the same number as the number of ORSet per shard. max-delta-elements = 5 - - # Actor name of the Replicator actor, /system/ddataReplicator - name = ddataReplicator - - # Replicas are running on members tagged with this role. - # All members are used if undefined or empty. - role = "" - - # How often the Replicator should send out gossip information - gossip-interval = 2 s - - # How often the subscribers will be notified of changes, if any - notify-subscribers-interval = 500 ms - - # The id of the dispatcher to use for Replicator actors. If not specified - # default dispatcher is used. - # If specified you need to define the settings of the actual dispatcher. - use-dispatcher = "" - - # How often the Replicator checks for pruning of data associated with - # removed cluster nodes. - pruning-interval = 30 s - - # How long time it takes (worst case) to spread the data to all other replica nodes. - # This is used when initiating and completing the pruning process of data associated - # with removed cluster nodes. The time measurement is stopped when any replica is - # unreachable, so it should be configured to worst case in a healthy cluster. - max-pruning-dissemination = 60 s - - # Serialized Write and Read messages are cached when they are sent to - # several nodes. If no further activity they are removed from the cache - # after this duration. - serializer-cache-time-to-live = 10s - - delta-crdt { - - # Some complex deltas grow in size for each update and above this - # threshold such deltas are discarded and sent as full state instead. - max-delta-size = 200 - } - - durable { - # List of keys that are durable. Prefix matching is supported by using * at the - # end of a key. - keys = [] - - # The markers of that pruning has been performed for a removed node are kept for this - # time and thereafter removed. If and old data entry that was never pruned is - # injected and merged with existing data after this time the value will not be correct. - # This would be possible if replica with durable data didn't participate in the pruning - # (e.g. it was shutdown) and later started after this time. A durable replica should not - # be stopped for longer time than this duration and if it is joining again after this - # duration its data should first be manually removed (from the lmdb directory). - # It should be in the magnitude of days. Note that there is a corresponding setting - # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'. - pruning-marker-time-to-live = 10 d - - # Fully qualified class name of the durable store actor. It must be a subclass - # of akka.actor.Actor and handle the protocol defined in - # akka.cluster.ddata.DurableStore. The class must have a constructor with - # com.typesafe.config.Config parameter. - store-actor-class = "" - - use-dispatcher = akka.cluster.distributed-data.durable.pinned-store - - pinned-store { - executor = thread-pool-executor - type = PinnedDispatcher - } - } - - lmdb { - # Directory of LMDB file. There are two options: - # 1. A relative or absolute path to a directory that ends with 'ddata' - # the full name of the directory will contain name of the ActorSystem - # and its remote port. - # 2. Otherwise the path is used as is, as a relative or absolute path to - # a directory. - # - # When running in production you may want to configure this to a specific - # path (alt 2), since the default directory contains the remote port of the - # actor system to make the name unique. If using a dynamically assigned - # port (0) it will be different each time and the previously stored data - # will not be loaded. - dir = "ddata" - - # Size in bytes of the memory mapped file. - map-size = 104857600 # 100MiB - - # Accumulate changes before storing improves performance with the - # risk of losing the last writes if the JVM crashes. - # The interval is by default set to 'off' to write each update immediately. - # Enabling write behind by specifying a duration, e.g. 200ms, is especially - # efficient when performing many writes to the same key, because it is only - # the last value for each key that will be serialized and stored. - # write-behind-interval = 200 ms - write-behind-interval = off - } } } # //#sharding-ext-config diff --git a/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs b/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs index 694812d3426..41647744cad 100644 --- a/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs +++ b/src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs @@ -43,7 +43,7 @@ public sealed class LmdbDurableStore : ReceiveActor public const string DatabaseName = "ddata"; - private sealed class WriteBehind + private sealed class WriteBehind : IDeadLetterSuppression { public static readonly WriteBehind Instance = new WriteBehind(); private WriteBehind() { } @@ -60,49 +60,6 @@ private sealed class WriteBehind private readonly Dictionary _pending = new Dictionary(); private readonly ILoggingAdapter _log; - private (LightningEnvironment env, LightningDatabase db, bool initialized) _lmdb; - // Lazy init - private (LightningEnvironment env, LightningDatabase db, bool initialized) Lmdb - { - get - { - if (_lmdb.initialized) - return _lmdb; - - var t0 = Stopwatch.StartNew(); - _log.Info($"Using durable data in LMDB directory [{_dir}]"); - - if (!Directory.Exists(_dir)) - Directory.CreateDirectory(_dir); - - var mapSize = _config.GetByteSize("map-size", 100 * 1024 * 1024); - var env = new LightningEnvironment(_dir) - { - MapSize = mapSize.Value, - MaxDatabases = 1 - }; - env.Open(EnvironmentOpenFlags.NoLock); - - using (var tx = env.BeginTransaction()) - { - var db = tx.OpenDatabase(DatabaseName, new DatabaseConfiguration - { - Flags = DatabaseOpenFlags.Create - }); - tx.Commit(); - - t0.Stop(); - if (_log.IsDebugEnabled) - _log.Debug($"Init of LMDB in directory [{_dir}] took [{t0.ElapsedMilliseconds} ms]"); - - _lmdb = (env, db, true); - return _lmdb; - } - } - } - - public bool IsDbInitialized => _lmdb.initialized; - public LmdbDurableStore(Config config) { _config = config.GetConfig("lmdb"); @@ -142,13 +99,48 @@ protected override void PostStop() { base.PostStop(); DoWriteBehind(); + } + + private LightningEnvironment GetLightningEnvironment() + { + _log.Info($"Using durable data in LMDB directory [{_dir}]"); + + var mapSize = _config.GetByteSize("map-size"); + LightningEnvironment env; - if(IsDbInitialized) + if (!Directory.Exists(_dir)) { - var (env, db, _) = Lmdb; - try { db?.Dispose(); } catch { } - try { env?.Dispose(); } catch { } + var t0 = Stopwatch.StartNew(); + Directory.CreateDirectory(_dir); + + env = new LightningEnvironment(_dir) + { + MapSize = mapSize ?? 100 * 1024 * 1024, + MaxDatabases = 1 + }; + env.Open(EnvironmentOpenFlags.NoLock); + + using (var tx = env.BeginTransaction()) + using (tx.OpenDatabase(DatabaseName, new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create })) + { + tx.Commit(); + } + + t0.Stop(); + if (_log.IsDebugEnabled) + _log.Debug($"Init of LMDB in directory [{_dir}] took [{t0.ElapsedMilliseconds} ms]"); } + else + { + env = new LightningEnvironment(_dir) + { + MapSize = mapSize ?? 100 * 1024 * 1024, + MaxDatabases = 1 + }; + env.Open(EnvironmentOpenFlags.NoLock); + } + + return env; } private void Active() @@ -157,13 +149,24 @@ private void Active() { try { - var l = Lmdb; // init if (_writeBehindInterval == TimeSpan.Zero) { - using (var tx = l.env.BeginTransaction()) + using (var env = GetLightningEnvironment()) + using(var tx = env.BeginTransaction()) + using (var db = tx.OpenDatabase(DatabaseName)) { - DbPut(tx, store.Key, store.Data); - tx.Commit(); + try + { + var byteKey = Encoding.UTF8.GetBytes(store.Key); + var byteValue = _serializer.ToBinary(store.Data); + tx.Put(db, byteKey, byteValue); + tx.Commit(); + } + catch (Exception) + { + tx.Abort(); + throw; + } } } else @@ -179,6 +182,7 @@ private void Active() { _log.Error(cause, "Failed to store [{0}]:{1}", store.Key, cause); store.Reply?.ReplyTo.Tell(store.Reply.FailureMessage); + throw; } }); @@ -197,20 +201,21 @@ private void Init() return; } - var (environment, db, _) = Lmdb; var t0 = Stopwatch.StartNew(); - using (var tx = environment.BeginTransaction(TransactionBeginFlags.ReadOnly)) - using (var cursor = tx.CreateCursor(db)) + using (var env = GetLightningEnvironment()) + using (var tx = env.BeginTransaction(TransactionBeginFlags.ReadOnly)) + using(var db = tx.OpenDatabase(DatabaseName)) + using(var cursor = tx.CreateCursor(db)) { try { var data = cursor.AsEnumerable().Select((x, i) => { - var (key, value) = x; - return new KeyValuePair( - Encoding.UTF8.GetString(key.CopyToNewArray()), - (DurableDataEnvelope)_serializer.FromBinary(value.CopyToNewArray(), _manifest)); - }).ToImmutableDictionary(); + var (key, value) = x; + return new KeyValuePair( + Encoding.UTF8.GetString(key.CopyToNewArray()), + (DurableDataEnvelope)_serializer.FromBinary(value.CopyToNewArray(), _manifest)); + }).ToImmutableDictionary(); if (data.Count > 0) { @@ -235,28 +240,22 @@ private void Init() }); } - private void DbPut(LightningTransaction tx, string key, DurableDataEnvelope data) - { - var byteKey = Encoding.UTF8.GetBytes(key); - var byteValue = _serializer.ToBinary(data); - - var l = Lmdb; - tx.Put(l.db, byteKey, byteValue); - } - private void DoWriteBehind() { if (_pending.Count > 0) { - var (env, _, _) = Lmdb; var t0 = Stopwatch.StartNew(); - using (var tx = env.BeginTransaction()) + using (var env = GetLightningEnvironment()) + using(var tx = env.BeginTransaction()) + using (var db = tx.OpenDatabase(DatabaseName)) { try { foreach (var entry in _pending) { - DbPut(tx, entry.Key, entry.Value); + var byteKey = Encoding.UTF8.GetBytes(entry.Key); + var byteValue = _serializer.ToBinary(entry.Value); + tx.Put(db, byteKey, byteValue); } tx.Commit(); @@ -270,9 +269,11 @@ private void DoWriteBehind() { _log.Error(cause, "failed to store [{0}]", string.Join(", ", _pending.Keys)); tx.Abort(); + throw; } finally { + t0.Stop(); _pending.Clear(); } } From b888b40314f7b44cd2463658c9289ef0b2f5fc1f Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Tue, 25 May 2021 17:32:20 +0200 Subject: [PATCH 18/50] A couple of fixes affecting the FileSubscriber (#5035) * Fail materialized Task of IO stages when stream fails * Fail FileSubscriber's Task if it can't open the file --- .../CoreAPISpec.ApproveStreams.approved.txt | 5 ++ .../Akka.Streams.Tests/IO/FileSinkSpec.cs | 31 ++++++++++++ .../IO/OutputStreamSinkSpec.cs | 47 +++++++++++++++++++ src/core/Akka.Streams/IO/IOResult.cs | 23 +++++++++ .../Implementation/IO/FileSubscriber.cs | 15 ++++-- .../IO/OutputStreamSubscriber.cs | 30 ++++++------ 6 files changed, 130 insertions(+), 21 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index af1c1fda081..6ae64ac5823 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -2564,6 +2564,11 @@ namespace Akka.Streams.Extra } namespace Akka.Streams.IO { + public sealed class AbruptIOTerminationException : System.Exception + { + public AbruptIOTerminationException(Akka.Streams.IO.IOResult ioResult, System.Exception cause) { } + public Akka.Streams.IO.IOResult IoResult { get; } + } public struct IOResult { public readonly long Count; diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 6b38d1cdcbc..f1c1f06c0ca 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -329,6 +329,37 @@ public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sin }); } + [Fact] + public void SynchronousFileSink_should_complete_materialized_task_with_an_exception_when_upstream_fails() + { + TargetFile(f => + { + var completion = Source.From(_testByteStrings) + .Select(bytes => + { + if (bytes.Contains(Convert.ToByte('b'))) throw new TestException("bees!"); + return bytes; + }) + .RunWith(FileIO.ToFile(f), _materializer); + + var ex = Intercept(() => completion.Wait(TimeSpan.FromSeconds(3))); + ex.IoResult.Count.ShouldBe(1001); + CheckFileContent(f, string.Join("", _testLines.TakeWhile(s => !s.Contains('b')))); + }, _materializer); + } + + [Fact] + public void SynchronousFileSink_should_complete_with_failure_when_file_cannot_be_open() + { + TargetFile(f => + { + var completion = Source.Single(ByteString.FromString("42")) + .RunWith(FileIO.ToFile(new FileInfo("I-hope-this-file-doesnt-exist.txt"), FileMode.Open), _materializer); + + AssertThrows(completion.Wait); + }, _materializer); + } + [Fact] public void SynchronousFileSink_should_write_each_element_if_auto_flush_is_set() { diff --git a/src/core/Akka.Streams.Tests/IO/OutputStreamSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/OutputStreamSinkSpec.cs index 11703552a7c..8c00bb4dce8 100644 --- a/src/core/Akka.Streams.Tests/IO/OutputStreamSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/OutputStreamSinkSpec.cs @@ -11,6 +11,7 @@ using Akka.Actor; using Akka.IO; using Akka.Streams.Dsl; +using Akka.Streams.IO; using Akka.Streams.TestKit.Tests; using Akka.TestKit; using Xunit; @@ -152,6 +153,40 @@ protected override void Dispose(bool disposing) public override long Length { get; } public override long Position { get; set; } } + + private sealed class OutputStream : Stream + { + public override void Flush() + { + throw new NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + } + + public override bool CanRead { get; } + public override bool CanSeek { get; } + public override bool CanWrite => true; + public override long Length { get; } + public override long Position { get; set; } + } + #endregion private readonly ActorMaterializer _materializer; @@ -199,6 +234,18 @@ public void OutputStreamSink_must_close_underlying_stream_when_error_received() }, _materializer); } + [Fact] + public void OutputStreamSink_must_complete_materialized_value_with_the_error() + { + this.AssertAllStagesStopped(() => + { + var completion = Source.Failed(new Exception("Boom!")) + .RunWith(StreamConverters.FromOutputStream(() => new OutputStream()), _materializer); + + AssertThrows(completion.Wait); + }, _materializer); + } + [Fact] public void OutputStreamSink_must_close_underlying_stream_when_completion_received() { diff --git a/src/core/Akka.Streams/IO/IOResult.cs b/src/core/Akka.Streams/IO/IOResult.cs index 4d4dcd766d8..cc22e83e266 100644 --- a/src/core/Akka.Streams/IO/IOResult.cs +++ b/src/core/Akka.Streams/IO/IOResult.cs @@ -71,4 +71,27 @@ public Exception Error public static IOResult Failed(long count, Exception reason) => new IOResult(count, Result.Failure(reason)); } + + /// + /// This exception signals that a stream has been completed by an onError signal while there was still IO operations in progress. + /// + public sealed class AbruptIOTerminationException : Exception + { + /// + /// The number of bytes read/written up until the error + /// + public IOResult IoResult { get; } + + /// + /// Initializes a new instance of the class with the result of the IO operation + /// until the error and a reference to the inner exception that is the cause of this exception. + /// + /// The result of the IO operation until the error + /// The exception that is the cause of the current exception + public AbruptIOTerminationException(IOResult ioResult, Exception cause) + : base("Stream terminated without completing IO operation.", cause) + { + IoResult = ioResult; + } + } } diff --git a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs index c5c80a65e71..ea7f713f8b5 100644 --- a/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs +++ b/src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs @@ -13,6 +13,7 @@ using Akka.IO; using Akka.Streams.Actors; using Akka.Streams.IO; +using Akka.Util; namespace Akka.Streams.Implementation.IO { @@ -110,7 +111,7 @@ protected override void PreStart() } catch (Exception ex) { - CloseAndComplete(IOResult.Failed(_bytesWritten, ex)); + CloseAndComplete(new Try(ex)); Cancel(); } } @@ -143,7 +144,7 @@ protected override bool Receive(object message) case OnError error: _log.Error(error.Cause, "Tearing down FileSink({0}) due to upstream error", _f.FullName); - CloseAndComplete(IOResult.Failed(_bytesWritten, error.Cause)); + CloseAndComplete(new Try(new AbruptIOTerminationException(IOResult.Success(_bytesWritten), error.Cause))); Context.Stop(Self); return true; @@ -185,7 +186,7 @@ protected override void PostStop() base.PostStop(); } - private void CloseAndComplete(IOResult result) + private void CloseAndComplete(Try result) { try { @@ -193,11 +194,15 @@ private void CloseAndComplete(IOResult result) // file to be deleted, which would not work (on some systems) if the // file is still open for writing _chan?.Dispose(); - _completionPromise.TrySetResult(result); + + if (result.IsSuccess) + _completionPromise.SetResult(result.Success.Value); + else + _completionPromise.SetException(result.Failure.Value); } catch (Exception ex) { - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); + _completionPromise.TrySetException(ex); } } } diff --git a/src/core/Akka.Streams/Implementation/IO/OutputStreamSubscriber.cs b/src/core/Akka.Streams/Implementation/IO/OutputStreamSubscriber.cs index 10f3fc884fb..0be685828fb 100644 --- a/src/core/Akka.Streams/Implementation/IO/OutputStreamSubscriber.cs +++ b/src/core/Akka.Streams/Implementation/IO/OutputStreamSubscriber.cs @@ -13,7 +13,6 @@ using Akka.IO; using Akka.Streams.Actors; using Akka.Streams.IO; -using Akka.Util; namespace Akka.Streams.Implementation.IO { @@ -75,12 +74,12 @@ public OutputStreamSubscriber(Stream outputStream, TaskCompletionSourceTBD protected override bool Receive(object message) { - return message.Match() - .With(next => - { + switch (message) + { + case OnNext next: try { - var bytes = next.Element as ByteString; + var bytes = (ByteString)next.Element; //blocking write _outputStream.Write(bytes.ToArray(), 0, bytes.Count); _bytesWritten += bytes.Count; @@ -92,20 +91,19 @@ protected override bool Receive(object message) _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex)); Cancel(); } - }) - .With(error => - { - _log.Error(error.Cause, - $"Tearing down OutputStreamSink due to upstream error, wrote bytes: {_bytesWritten}"); - _completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, error.Cause)); + return true; + case OnError error: + _log.Error(error.Cause, "Tearing down OutputStreamSink due to upstream error, wrote bytes: {0}", _bytesWritten); + _completionPromise.TrySetException(new AbruptIOTerminationException(IOResult.Success(_bytesWritten), error.Cause)); Context.Stop(Self); - }) - .With(() => - { + return true; + case OnComplete _: Context.Stop(Self); _outputStream.Flush(); - }) - .WasHandled; + return true; + } + + return false; } /// From 3dc2e35e001f38e41f551e7371d28c0505e76660 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 25 May 2021 20:20:17 +0000 Subject: [PATCH 19/50] Bump Google.Protobuf from 3.17.0 to 3.17.1 (#5036) Bumps [Google.Protobuf](https://github.com/protocolbuffers/protobuf) from 3.17.0 to 3.17.1. - [Release notes](https://github.com/protocolbuffers/protobuf/releases) - [Changelog](https://github.com/protocolbuffers/protobuf/blob/master/generate_changelog.py) - [Commits](https://github.com/protocolbuffers/protobuf/compare/v3.17.0...v3.17.1) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- src/common.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common.props b/src/common.props index d630508f928..4c6720b7681 100644 --- a/src/common.props +++ b/src/common.props @@ -14,7 +14,7 @@ 0.10.1 13.0.1 2.0.1 - 3.17.0 + 3.17.1 netcoreapp3.1 net5.0 net471 From 99afc0ecb4e2e9accb861b67d3832f65012fb5a2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 26 May 2021 08:46:18 -0500 Subject: [PATCH 20/50] Bump Google.Protobuf from 3.17.1 to 3.17.2 (#5040) Bumps [Google.Protobuf](https://github.com/protocolbuffers/protobuf) from 3.17.1 to 3.17.2. - [Release notes](https://github.com/protocolbuffers/protobuf/releases) - [Changelog](https://github.com/protocolbuffers/protobuf/blob/master/generate_changelog.py) - [Commits](https://github.com/protocolbuffers/protobuf/commits) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- src/common.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common.props b/src/common.props index 4c6720b7681..b00522f1e73 100644 --- a/src/common.props +++ b/src/common.props @@ -14,7 +14,7 @@ 0.10.1 13.0.1 2.0.1 - 3.17.1 + 3.17.2 netcoreapp3.1 net5.0 net471 From c663bc230946644b9d91999d9dbbccf8ebc699f5 Mon Sep 17 00:00:00 2001 From: Sam Ember Date: Wed, 26 May 2021 22:48:06 +0100 Subject: [PATCH 21/50] Abstraction of ServiceProvider, Improving Akka.DependencyInjection (#4814) * Abstraction of ServiceProvider * introduced non-breaking Akka.DependencyInjection API changes * fixed unit tests / Props bug * fixed up DelegateInjectionSpecs * Added type checking for `Props(Type type, params object[] args)` * fixed non-generic `Props()` method Co-authored-by: Aaron Stannard --- ...ctorServiceProviderPropsWithScopesSpecs.cs | 24 +++- .../DelegateInjectionSpecs.cs | 12 +- .../ServiceProviderSetupSpecs.cs | 22 +-- .../DependencyResolver.cs | 125 ++++++++++++++++++ .../DependencyResolverSetup.cs | 84 ++++++++++++ .../IDependencyResolver.cs | 61 +++++++++ .../ServiceProvider.cs | 27 +++- .../ServiceProviderDependencyResolver.cs | 62 +++++++++ .../ServiceProviderScope.cs | 33 +++++ .../ServiceProviderSetup.cs | 40 ------ .../Actors/AkkaService.cs | 5 +- 11 files changed, 428 insertions(+), 67 deletions(-) create mode 100644 src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolver.cs create mode 100644 src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolverSetup.cs create mode 100644 src/contrib/dependencyinjection/Akka.DependencyInjection/IDependencyResolver.cs create mode 100644 src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs create mode 100644 src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderScope.cs delete mode 100644 src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderSetup.cs diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs index ef5d2fe0c39..4c2c84c5f54 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ActorServiceProviderPropsWithScopesSpecs.cs @@ -21,7 +21,7 @@ namespace Akka.DependencyInjection.Tests public class ActorServiceProviderPropsWithScopesSpecs : AkkaSpec, IClassFixture { - public ActorServiceProviderPropsWithScopesSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(ServiceProviderSetup.Create(fixture.Provider) + public ActorServiceProviderPropsWithScopesSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(DependencyResolverSetup.Create(fixture.Provider) .And(BootstrapSetup.Create().WithConfig(TestKitBase.DefaultConfig)), output) { @@ -30,7 +30,7 @@ public ActorServiceProviderPropsWithScopesSpecs(AkkaDiFixture fixture, ITestOutp [Fact(DisplayName = "DI: actors who receive an IServiceScope through Props should dispose of their dependencies upon termination")] public void ActorsWithScopedDependenciesShouldDisposeUponStop() { - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var props = spExtension.Props(); // create a scoped actor using the props from Akka.DependencyInjection @@ -60,11 +60,23 @@ public void ActorsWithScopedDependenciesShouldDisposeUponStop() deps2.Dependencies.All(x => x.Disposed).Should().BeFalse(); } + [Fact(DisplayName = "DI: should be able to start actors with untyped Props")] + public void ShouldStartActorWithUntypedProps() + { + var spExtension = DependencyResolver.For(Sys); + var props = spExtension.Props(typeof(ScopedActor)); + + // create a scoped actor using the props from Akka.DependencyInjection + var scoped1 = Sys.ActorOf(props, "scoped1"); + scoped1.Tell(new FetchDependencies()); + var deps1 = ExpectMsg(); + } + [Fact(DisplayName = "DI: actors who receive an IServiceScope through Props should dispose of their dependencies and recreate upon restart")] public void ActorsWithScopedDependenciesShouldDisposeAndRecreateUponRestart() { - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var props = spExtension.Props(); // create a scoped actor using the props from Akka.DependencyInjection @@ -95,7 +107,7 @@ public void ActorsWithScopedDependenciesShouldDisposeAndRecreateUponRestart() "DI: actors who receive a mix of dependencies via IServiceScope should dispose ONLY of their scoped dependencies and recreate upon restart")] public void ActorsWithMixedDependenciesShouldDisposeAndRecreateScopedUponRestart() { - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var props = spExtension.Props(); // create a scoped actor using the props from Akka.DependencyInjection @@ -134,7 +146,7 @@ public void ActorsWithMixedDependenciesShouldDisposeAndRecreateScopedUponRestart public void ActorsWithNonDiDependenciesShouldStart() { // - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var arg1 = "foo"; var arg2 = "bar"; var props = spExtension.Props(arg1, arg2); @@ -182,7 +194,7 @@ public void ActorsWithNonDiDependenciesShouldStart() public void ServiceProvider_Props_should_support_copying() { // - var spExtension = ServiceProvider.For(Sys); + var spExtension = DependencyResolver.For(Sys); var arg1 = "foo"; var arg2 = "bar"; var props = spExtension.Props(arg1, arg2).WithRouter(new RoundRobinPool(10).WithSupervisorStrategy(new OneForOneStrategy( diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs index bc0a64388ad..1e2342644d6 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/DelegateInjectionSpecs.cs @@ -1,4 +1,10 @@ -using System; +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -73,7 +79,7 @@ public async Task DI_should_be_able_to_retrieve_singleton_using_delegate_from_in internal class ParentActor : UntypedActor { public static Props Props(ActorSystem system) => - ServiceProvider.For(system).Props(); + DependencyResolver.For(system).Props(); private readonly IActorRef _echoActor; @@ -114,7 +120,7 @@ public AkkaService(IServiceProvider serviceProvider) public Task StartAsync(CancellationToken cancellationToken) { - var setup = ServiceProviderSetup.Create(_serviceProvider) + var setup = DependencyResolverSetup.Create(_serviceProvider) .And(BootstrapSetup.Create().WithConfig(TestKitBase.DefaultConfig)); ActorSystem = ActorSystem.Create("TestSystem", setup); diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs index e14e8e8ee4b..d44085f69f6 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection.Tests/ServiceProviderSetupSpecs.cs @@ -18,7 +18,7 @@ namespace Akka.DependencyInjection.Tests { public class ServiceProviderSetupSpecs : AkkaSpec, IClassFixture { - public ServiceProviderSetupSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(ServiceProviderSetup.Create(fixture.Provider) + public ServiceProviderSetupSpecs(AkkaDiFixture fixture, ITestOutputHelper output) : base(DependencyResolverSetup.Create(fixture.Provider) .And(BootstrapSetup.Create().WithConfig(TestKitBase.DefaultConfig)), output) { @@ -27,29 +27,29 @@ public ServiceProviderSetupSpecs(AkkaDiFixture fixture, ITestOutputHelper output [Fact(DisplayName = "DI: Should access Microsoft.Extensions.DependencyInjection.IServiceProvider from ServiceProvider ActorSystem extension")] public void ShouldAccessServiceProviderFromActorSystemExtension() { - var sp = ServiceProvider.For(Sys); - var dep = sp.Provider.GetService(); + var sp = DependencyResolver.For(Sys); + var dep = sp.Resolver.GetService(); dep.Should().BeOfType(); - var dep2 = sp.Provider.GetService(); + var dep2 = sp.Resolver.GetService(); dep2.Should().NotBe(dep); // the two transient instances should be different // scoped services should be the same - var scoped1 = sp.Provider.GetService(); - var scoped2 = sp.Provider.GetService(); + var scoped1 = sp.Resolver.GetService(); + var scoped2 = sp.Resolver.GetService(); scoped1.Should().Be(scoped2); // create a new scope - using (var newScope = sp.Provider.CreateScope()) + using (var newScope = sp.Resolver.CreateScope()) { - var scoped3 = newScope.ServiceProvider.GetService(); + var scoped3 = newScope.Resolver.GetService(); scoped1.Should().NotBe(scoped3); } // singleton services should be the same - var singleton1 = sp.Provider.GetService(); - var singleton2 = sp.Provider.GetService(); + var singleton1 = sp.Resolver.GetService(); + var singleton2 = sp.Resolver.GetService(); singleton1.Should().Be(singleton2); } @@ -67,7 +67,7 @@ public void ShouldAccessServiceProviderFromActorSystemExtension() { Action getSp = () => { - var sp = ServiceProvider.For(Sys); + var sp = DependencyResolver.For(Sys); }; getSp.Should().Throw(); diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolver.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolver.cs new file mode 100644 index 00000000000..4917a1b8ed5 --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolver.cs @@ -0,0 +1,125 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.Event; +using Microsoft.Extensions.DependencyInjection; + +namespace Akka.DependencyInjection +{ + /// + /// Provides users with immediate access to the bound to + /// this , if any. + /// + public sealed class DependencyResolver : IExtension + { + public DependencyResolver(IDependencyResolver resolver) + { + Resolver = resolver; + } + + /// + /// The globally scoped . + /// + /// + /// Per https://docs.microsoft.com/en-us/dotnet/core/extensions/dependency-injection-guidelines - please use + /// the appropriate for your actors and the dependencies they consume. DI is typically + /// not used for long-lived, stateful objects such as actors. + /// + /// Therefore, injecting transient dependencies via constructors is a bad idea in most cases. You'd be far better off + /// creating a local "request scope" each time your actor processes a message that depends on a transient dependency, + /// such as a database connection, and disposing that scope once the operation is complete. + /// + /// Actors are not MVC Controllers. Actors can live forever, have the ability to restart, and are often stateful. + /// Be mindful of this as you use this feature or bad things will happen. Akka.NET does not magically manage scopes + /// for you. + /// + public IDependencyResolver Resolver { get; } + + public static DependencyResolver For(ActorSystem actorSystem) + { + return actorSystem.WithExtension(); + } + + /// + /// Uses a delegate to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + public Props Props(params object[] args) where T : ActorBase + { + return Resolver.Props(args); + } + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// A new instance which uses DI internally. + public Props Props() where T : ActorBase + { + return Resolver.Props(); + } + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// A new instance which uses DI internally. + public Props Props(Type type) + { + return Resolver.Props(type); + } + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + public Props Props(Type type, params object[] args) + { + return Resolver.Props(type, args); + } + } + + /// + /// INTERNAL API + /// + public sealed class DependencyResolverExtension : ExtensionIdProvider + { + public override DependencyResolver CreateExtension(ExtendedActorSystem system) + { + var setup = system.Settings.Setup.Get(); + if (setup.HasValue) return new DependencyResolver(setup.Value.DependencyResolver); + + var exception = new ConfigurationException("Unable to find [DependencyResolverSetup] included in ActorSystem settings." + + " Please specify one before attempting to use dependency injection inside Akka.NET."); + system.EventStream.Publish(new Error(exception, "Akka.DependencyInjection", typeof(DependencyResolverExtension), exception.Message)); + throw exception; + } + } +} diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolverSetup.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolverSetup.cs new file mode 100644 index 00000000000..3df580095a3 --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/DependencyResolverSetup.cs @@ -0,0 +1,84 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Actor.Setup; + +namespace Akka.DependencyInjection +{ + /// + /// Used to help bootstrap an with dependency injection (DI) + /// support via a reference. + /// + /// The will be used to access previously registered services + /// in the creation of actors and other pieces of infrastructure inside Akka.NET. + /// + /// The constructor is internal. Please use to create a new instance. + /// + [Obsolete("Used DependencyResolverSetup instead.")] + public class ServiceProviderSetup : Setup + { + internal ServiceProviderSetup(IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + } + + public IServiceProvider ServiceProvider { get; } + + public static ServiceProviderSetup Create(IServiceProvider provider) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + + return new ServiceProviderSetup(provider); + } + } + + /// + /// Used to help bootstrap an with dependency injection (DI) + /// support via a reference. + /// + /// The will be used to access previously registered services + /// in the creation of actors and other pieces of infrastructure inside Akka.NET. + /// + /// The constructor is internal. Please use to create a new instance. + /// + public class DependencyResolverSetup : Setup + { + public IDependencyResolver DependencyResolver { get; } + + internal DependencyResolverSetup(IDependencyResolver dependencyResolver) + { + DependencyResolver = dependencyResolver; + } + + /// + /// Creates a new instance of DependencyResolverSetup, passing in + /// here creates an that resolves dependencies from the specified + /// + public static DependencyResolverSetup Create(IServiceProvider provider) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + + return new DependencyResolverSetup(new ServiceProviderDependencyResolver(provider)); + } + + /// + /// Creates a new instance of DependencyResolverSetup, an implementation of + /// can be passed in here to resolve services from test or alternative DI frameworks. + /// + public static DependencyResolverSetup Create(IDependencyResolver provider) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + + return new DependencyResolverSetup(provider); + } + } +} diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/IDependencyResolver.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/IDependencyResolver.cs new file mode 100644 index 00000000000..d221f6480db --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/IDependencyResolver.cs @@ -0,0 +1,61 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Actor; + +namespace Akka.DependencyInjection +{ + /// + /// Interface abstraction for working with DI providers + /// in Akka.NET without being bound to any specific implementation. + /// + /// + /// See for a reference implementation. + /// + public interface IDependencyResolver + { + IResolverScope CreateScope(); + object GetService(); + object GetService(Type type); + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + Props Props(Type type, params object[] args); + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// A new instance which uses DI internally. + Props Props(Type type); + + /// + /// Used to dynamically instantiate an actor where some of the constructor arguments are populated via dependency injection + /// and others are not. + /// + /// + /// YOU ARE RESPONSIBLE FOR MANAGING THE LIFECYCLE OF YOUR OWN DEPENDENCIES. AKKA.NET WILL NOT ATTEMPT TO DO IT FOR YOU. + /// + /// The type of actor to instantiate. + /// Optional. Any constructor arguments that will be passed into the actor's constructor directly without being resolved by DI first. + /// A new instance which uses DI internally. + Props Props(params object[] args) where T : ActorBase; + } +} \ No newline at end of file diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs index 883730d0b27..aa652707737 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs @@ -17,6 +17,10 @@ namespace Akka.DependencyInjection /// Provides users with immediate access to the bound to /// this , if any. /// + /// + /// [OBSOLETE] Switch to the instead. + /// + [Obsolete("Replaced by the Akka.DependencyInjection.DependencyResolver in Akka.NET v1.4.20. Please switch to that.")] public sealed class ServiceProvider : IExtension { public ServiceProvider(IServiceProvider provider) @@ -66,6 +70,7 @@ public static ServiceProvider For(ActorSystem actorSystem) /// /// INTERNAL API /// + [Obsolete("Use the DependencyResolverExtensions instead.")] public sealed class ServiceProviderExtension : ExtensionIdProvider { public override ServiceProvider CreateExtension(ExtendedActorSystem system) @@ -88,17 +93,16 @@ public override ServiceProvider CreateExtension(ExtendedActorSystem system) /// /// Used to create actors via the . /// - /// the actor type - internal sealed class ServiceProviderActorProducer : IIndirectActorProducer where TActor:ActorBase + internal class ServiceProviderActorProducer : IIndirectActorProducer { private readonly IServiceProvider _provider; private readonly object[] _args; - public ServiceProviderActorProducer(IServiceProvider provider, object[] args) + public ServiceProviderActorProducer(IServiceProvider provider, Type actorType, object[] args) { _provider = provider; _args = args; - ActorType = typeof(TActor); + ActorType = actorType; } public ActorBase Produce() @@ -113,4 +117,19 @@ public void Release(ActorBase actor) // no-op } } + + /// + /// INTERNAL API + /// + /// Used to create actors via the . + /// + /// the actor type + internal class ServiceProviderActorProducer : ServiceProviderActorProducer where TActor:ActorBase + { + + public ServiceProviderActorProducer(IServiceProvider provider, object[] args) + : base(provider, typeof(TActor), args) + { + } + } } diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs new file mode 100644 index 00000000000..8fdab7c3ada --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs @@ -0,0 +1,62 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Microsoft.Extensions.DependencyInjection; + +namespace Akka.DependencyInjection +{ + /// + /// INTERNAL API. + /// + /// implementation backed by + /// + public class ServiceProviderDependencyResolver : IDependencyResolver + { + public IServiceProvider ServiceProvider { get; } + + public ServiceProviderDependencyResolver(IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + } + + public IResolverScope CreateScope() + { + return new ServiceProviderScope(ServiceProvider.CreateScope()); + } + + public object GetService() + { + return ServiceProvider.GetService(); + } + + public object GetService(Type type) + { + return ServiceProvider.GetService(type); + } + + public Props Props(Type type, params object[] args) + { + if(typeof(ActorBase).IsAssignableFrom(type)) + return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(ServiceProvider, type, args)); + throw new ArgumentException(nameof(type), $"[{type}] does not implement Akka.Actor.ActorBase."); + } + + public Props Props(Type type) + { + return Props(type, Array.Empty()); + } + + public Props Props(params object[] args) where T : ActorBase + { + return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(ServiceProvider, args)); + } + } + + +} diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderScope.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderScope.cs new file mode 100644 index 00000000000..ce3f1911dcc --- /dev/null +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderScope.cs @@ -0,0 +1,33 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Microsoft.Extensions.DependencyInjection; + +namespace Akka.DependencyInjection +{ + public interface IResolverScope : IDisposable + { + IDependencyResolver Resolver { get; } + } + + public class ServiceProviderScope : IResolverScope + { + private readonly IServiceScope _scope; + public IDependencyResolver Resolver { get; } + public ServiceProviderScope(IServiceScope scope) + { + _scope = scope; + Resolver = new ServiceProviderDependencyResolver(scope.ServiceProvider); + } + + public void Dispose() + { + _scope?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderSetup.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderSetup.cs deleted file mode 100644 index 835d1ad98f3..00000000000 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderSetup.cs +++ /dev/null @@ -1,40 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2021 Lightbend Inc. -// Copyright (C) 2013-2021 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using Akka.Actor; -using Akka.Actor.Setup; - -namespace Akka.DependencyInjection -{ - /// - /// Used to help bootstrap an with dependency injection (DI) - /// support via a reference. - /// - /// The will be used to access previously registered services - /// in the creation of actors and other pieces of infrastructure inside Akka.NET. - /// - /// The constructor is internal. Please use to create a new instance. - /// - public class ServiceProviderSetup : Setup - { - internal ServiceProviderSetup(IServiceProvider serviceProvider) - { - ServiceProvider = serviceProvider; - } - - public IServiceProvider ServiceProvider { get; } - - public static ServiceProviderSetup Create(IServiceProvider provider) - { - if (provider == null) - throw new ArgumentNullException(nameof(provider)); - - return new ServiceProviderSetup(provider); - } - } -} diff --git a/src/examples/AspNetCore/Samples.Akka.AspNetCore/Actors/AkkaService.cs b/src/examples/AspNetCore/Samples.Akka.AspNetCore/Actors/AkkaService.cs index 98bb914016e..efb40de587d 100644 --- a/src/examples/AspNetCore/Samples.Akka.AspNetCore/Actors/AkkaService.cs +++ b/src/examples/AspNetCore/Samples.Akka.AspNetCore/Actors/AkkaService.cs @@ -18,7 +18,6 @@ using Microsoft.Extensions.Hosting; using Samples.Akka.AspNetCore.Messages; using Samples.Akka.AspNetCore.Services; -using ServiceProvider = Akka.DependencyInjection.ServiceProvider; namespace Samples.Akka.AspNetCore.Actors { @@ -41,14 +40,14 @@ public async Task StartAsync(CancellationToken cancellationToken) { var hocon = ConfigurationFactory.ParseString(await File.ReadAllTextAsync("app.conf", cancellationToken)); var bootstrap = BootstrapSetup.Create().WithConfig(hocon); - var di = ServiceProviderSetup.Create(_sp); + var di = DependencyResolverSetup.Create(_sp); var actorSystemSetup = bootstrap.And(di); _actorSystem = ActorSystem.Create("AspNetDemo", actorSystemSetup); // // // props created via IServiceProvider dependency injection - var hasherProps = ServiceProvider.For(_actorSystem).Props(); + var hasherProps = DependencyResolver.For(_actorSystem).Props(); RouterActor = _actorSystem.ActorOf(hasherProps.WithRouter(FromConfig.Instance), "hasher"); // From 410066f425898e8dbda27b973e0008b131344152 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 28 May 2021 03:46:08 +0700 Subject: [PATCH 22/50] [MNTR] fix test dll loading error (#5044) * Fix assembly loading * Fix regex for passing test * Try to start a full build * Regex \w includes underscore * Revert back regex to \w * Update NotUsed.cs Co-authored-by: Aaron Stannard --- src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs | 2 +- src/core/Akka.MultiNodeTestRunner/Discovery.cs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs b/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs index 79288d13e87..a60084df8a3 100644 --- a/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs +++ b/src/core/Akka.MultiNodeTestRunner.Shared/Sinks/MessageSink.cs @@ -89,7 +89,7 @@ public enum MultiNodeTestRunnerMessageType }; private const string NodePassStatusRegexString = - @"\[(\w){4}(?[0-9]{1,2})(?:\w+)?\]\[(?