From ae5427399be173a5dad7bb5c72266147ae5d9250 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 14 Jun 2021 22:49:04 +0700 Subject: [PATCH 1/5] Add bug reproduction spec --- .../TestKitBaseTests/ReceiveTests.cs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs index ed00c3c20b3..a44c7da5606 100644 --- a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs +++ b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs @@ -134,8 +134,22 @@ public void ReceiveWhile_Predicate_should_not_consume_last_message_that_didnt_ma TestActor.Tell("1"); TestActor.Tell("2"); TestActor.Tell(4711); - ReceiveWhile(_ => _ is string); + TestActor.Tell("3"); + TestActor.Tell("4"); + TestActor.Tell("5"); + TestActor.Tell(6); + TestActor.Tell("7"); + TestActor.Tell("8"); + + var received = ReceiveWhile(_ => _ is string); + received.ShouldOnlyContainInOrder("1", "2"); + ExpectMsg(4711); + + received = ReceiveWhile(_ => _ is string); + received.ShouldOnlyContainInOrder("3", "4", "5"); + + ExpectMsg(6); } } From 3a345ad08a2bb40ad48d12e083b6dae608892fc8 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 14 Jun 2021 23:16:32 +0700 Subject: [PATCH 2/5] Code cleanup, make sure locks are used properly --- .../Akka.TestKit/Internal/BlockingQueue.cs | 99 ++++++++----------- 1 file changed, 39 insertions(+), 60 deletions(-) diff --git a/src/core/Akka.TestKit/Internal/BlockingQueue.cs b/src/core/Akka.TestKit/Internal/BlockingQueue.cs index 2eac4a113b2..5abfa099a59 100644 --- a/src/core/Akka.TestKit/Internal/BlockingQueue.cs +++ b/src/core/Akka.TestKit/Internal/BlockingQueue.cs @@ -35,7 +35,8 @@ public class BlockingQueue /// The item to add to the queue. public void Enqueue(T item) { - _collection.TryAdd(new Positioned(item)); + if (!_collection.TryAdd(new Positioned(item))) + throw new InvalidOperationException("Failed to enqueue item into the queue."); } /// @@ -44,7 +45,8 @@ public void Enqueue(T item) /// The item to add to the queue. public void AddFirst(T item) { - _collection.TryAdd(new Positioned(item, first:true)); + if(!_collection.TryAdd(new Positioned(item, first:true))) + throw new InvalidOperationException("Failed to enqueue item into the head of the queue."); } /// @@ -67,13 +69,12 @@ public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancel /// true if the item was removed; otherwise, false. public bool TryTake(out T item) { - Positioned p; - if(_collection.TryTake(out p)) + if(_collection.TryTake(out var p)) { item = p.Value; return true; } - item = default(T); + item = default; return false; } @@ -87,13 +88,12 @@ public bool TryTake(out T item) /// true if the remove completed within the specified timeout; otherwise, false. public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken) { - Positioned p; - if(_collection.TryTake(out p,millisecondsTimeout,cancellationToken)) + if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken)) { item = p.Value; return true; } - item = default(T); + item = default; return false; } @@ -118,12 +118,7 @@ public T Take(CancellationToken cancellationToken) public List ToList() { var positionArray = _collection.ToArray(); - var items = new List(); - foreach (var positioned in positionArray) - { - items.Add(positioned.Value); - } - return items; + return positionArray.Select(positioned => positioned.Value).ToList(); } @@ -145,59 +140,48 @@ public Positioned(T value, bool first = false) private class QueueWithAddFirst : IProducerConsumerCollection { private readonly LinkedList _list = new LinkedList(); - private readonly object _lock = new object(); - - public int Count { get { return _list.Count; } } - public bool TryAdd(Positioned item) - { - if(item.First) + public int Count { + get { - lock(_lock) + lock (SyncRoot) { - _list.AddFirst(item); + return _list.Count; } } - else + } + + public bool TryAdd(Positioned item) + { + lock (SyncRoot) { - lock(_lock) - { + if(item.First) + _list.AddFirst(item); + else _list.AddLast(item); - } + return true; } - return true; } public bool TryTake(out Positioned item) { - var result = false; - if(_list.Count == 0) + lock(SyncRoot) { - item = null; - } - else - { - lock(_lock) + if(_list.Count == 0) { - if(_list.Count == 0) - { - item = null; - } - else - { - item = _list.First.Value; - _list.RemoveFirst(); - result = true; - } + item = null; + return false; } + + item = _list.First.Value; + _list.RemoveFirst(); + return true; } - return result; } - public void CopyTo(Positioned[] array, int index) { - lock(_lock) + lock(SyncRoot) { _list.CopyTo(array, index); } @@ -206,7 +190,7 @@ public void CopyTo(Positioned[] array, int index) public void CopyTo(Array array, int index) { - lock(_lock) + lock(SyncRoot) { ((ICollection)_list).CopyTo(array, index); } @@ -214,24 +198,20 @@ public void CopyTo(Array array, int index) public Positioned[] ToArray() { - Positioned[] array; - lock(_lock) + lock(SyncRoot) { - array = _list.ToArray(); + return _list.ToArray(); } - return array; } public IEnumerator GetEnumerator() { - //We must create a copy - List copy; - lock(_lock) + lock(SyncRoot) { - copy = new List(_list); + //We must create a copy + return new List(_list).GetEnumerator(); } - return copy.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() @@ -239,10 +219,9 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } + public object SyncRoot { get; } = new object(); - public object SyncRoot { get { throw new NotImplementedException(); } } - - public bool IsSynchronized { get { return false; } } + public bool IsSynchronized => true; } } } From 4c605395c24a5ef8d4501818cf092673f1998339 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 14 Jun 2021 23:17:08 +0700 Subject: [PATCH 3/5] Make sure BlockingQueue inserts items properly --- src/core/Akka.TestKit/Internal/BlockingQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit/Internal/BlockingQueue.cs b/src/core/Akka.TestKit/Internal/BlockingQueue.cs index 5abfa099a59..63ba34d6bf1 100644 --- a/src/core/Akka.TestKit/Internal/BlockingQueue.cs +++ b/src/core/Akka.TestKit/Internal/BlockingQueue.cs @@ -22,7 +22,7 @@ namespace Akka.TestKit.Internal /// The type of item to store. public class BlockingQueue { - private readonly BlockingCollection _collection = new BlockingCollection(); + private readonly BlockingCollection _collection = new BlockingCollection(new QueueWithAddFirst()); /// /// The number of items that are currently in the queue. From 09ece291d65ae1caebebbcee690e4fb2886a68ef Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 15 Jun 2021 01:25:14 +0700 Subject: [PATCH 4/5] Code and spec cleanup, make sure that code is readable and works as intended --- .../TestKitBaseTests/ReceiveTests.cs | 29 +++--- src/core/Akka.TestKit/TestKitBase_Receive.cs | 97 +++++++++++-------- 2 files changed, 73 insertions(+), 53 deletions(-) diff --git a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs index a44c7da5606..ed84e0f0776 100644 --- a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs +++ b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs @@ -9,6 +9,7 @@ using Akka.Actor; using Akka.TestKit; using Xunit; +using Xunit.Sdk; namespace Akka.Testkit.Tests.TestKitBaseTests { @@ -28,7 +29,7 @@ public void ReceiveN_should_receive_correct_number_of_messages() [Fact] public void ReceiveN_should_timeout_if_no_messages() { - Intercept(() => ReceiveN(3, TimeSpan.FromMilliseconds(10))); + AssertThrows(() => ReceiveN(3, TimeSpan.FromMilliseconds(10))); } [Fact] @@ -36,7 +37,7 @@ public void ReceiveN_should_timeout_if_to_few_messages() { TestActor.Tell("1"); TestActor.Tell("2"); - Intercept(() => ReceiveN(3, TimeSpan.FromMilliseconds(100))); + AssertThrows(() => ReceiveN(3, TimeSpan.FromMilliseconds(100))); } @@ -53,7 +54,7 @@ public void FishForMessage_should_return_matched_message() [Fact] public void FishForMessage_should_timeout_if_no_messages() { - Intercept(() => FishForMessage(_=>false, TimeSpan.FromMilliseconds(10))); + AssertThrows(() => FishForMessage(_=>false, TimeSpan.FromMilliseconds(10))); } [Fact] @@ -61,7 +62,7 @@ public void FishForMessage_should_timeout_if_to_few_messages() { TestActor.Tell("1"); TestActor.Tell("2"); - Intercept(() => FishForMessage(_ => false, TimeSpan.FromMilliseconds(100))); + AssertThrows(() => FishForMessage(_ => false, TimeSpan.FromMilliseconds(100))); } [Fact] @@ -87,8 +88,18 @@ public void ReceiveWhile_Filter_should_not_consume_last_message_that_didnt_match TestActor.Tell("1"); TestActor.Tell("2"); TestActor.Tell(4711); - ReceiveWhile(_ => _ is string ? _ : null); + TestActor.Tell("3"); + TestActor.Tell("4"); + TestActor.Tell("56"); + TestActor.Tell("7"); + + ReceiveWhile(_ => _ is string ? _ : null) + .ShouldOnlyContainInOrder("1", "2"); ExpectMsg(4711); + + ReceiveWhile(_ => _ is string s && s.Length == 1 ? s : null) + .ShouldOnlyContainInOrder("3", "4"); + ExpectMsg("56"); } [Fact] @@ -141,14 +152,10 @@ public void ReceiveWhile_Predicate_should_not_consume_last_message_that_didnt_ma TestActor.Tell("7"); TestActor.Tell("8"); - var received = ReceiveWhile(_ => _ is string); - received.ShouldOnlyContainInOrder("1", "2"); - + ReceiveWhile(_ => _ is string).ShouldOnlyContainInOrder("1", "2"); ExpectMsg(4711); - received = ReceiveWhile(_ => _ is string); - received.ShouldOnlyContainInOrder("3", "4", "5"); - + ReceiveWhile(_ => _ is string).ShouldOnlyContainInOrder("3", "4", "5"); ExpectMsg(6); } diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index b003168f891..f490b47e5a0 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -229,36 +229,46 @@ private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, /// TBD public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) where T : class { - var maxValue = RemainingOrDilated(max); var start = Now; + var maxValue = RemainingOrDilated(max); var stop = start + maxValue; ConditionalLog("Trying to receive {0}messages of type {1} while filter returns non-nulls during {2}", msgs == int.MaxValue ? "" : msgs + " ", typeof(T), maxValue); - var count = 0; + var acc = new List(); var idleValue = idle.GetValueOrDefault(Timeout.InfiniteTimeSpan); MessageEnvelope msg = NullMessageEnvelope.Instance; - while (count < msgs) + while (acc.Count < msgs) { - MessageEnvelope envelope; - if (!TryReceiveOne(out envelope, (stop - Now).Min(idleValue))) - { - _testState.LastMessage = msg; + if (!TryReceiveOne(out var envelope, (stop - Now).Min(idleValue))) break; - } - var message = envelope.Message; - var result = filter(message); - if (result == null) + + var shouldStop = false; + switch (envelope) { - _testState.Queue.AddFirst(envelope); //Put the message back in the queue - _testState.LastMessage = msg; - break; + case NullMessageEnvelope _: + shouldStop = true; + break; + + case RealMessageEnvelope m when filter(m.Message) != null: + msg = _testState.LastMessage; + acc.Add(filter(m.Message)); + break; + + case RealMessageEnvelope _: + _testState.Queue.AddFirst(envelope); //Put the message back in the queue + shouldStop = true; + break; + + case var unexpected: + throw new Exception($"Unexpected {unexpected}"); } - msg = envelope; - acc.Add(result); - count++; + + if (shouldStop) + break; } - ConditionalLog("Received {0} messages with filter during {1}", count, Now - start); + ConditionalLog("Received {0} messages with filter during {1}", acc.Count, Now - start); + _testState.LastMessage = msg; _testState.LastWasNoMsg = true; return acc; } @@ -290,47 +300,50 @@ private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, var stop = start + maxValue; ConditionalLog("Trying to receive {0}messages of type {1} while predicate returns true during {2}. Messages of other types will {3}", msgs == int.MaxValue ? "" : msgs + " ", typeof(T), maxValue, shouldIgnoreOtherMessageTypes ? "be ignored" : "cause this to stop"); - var count = 0; var acc = new List(); var idleValue = idle.GetValueOrDefault(Timeout.InfiniteTimeSpan); MessageEnvelope msg = NullMessageEnvelope.Instance; - while (count < msgs) + while (acc.Count < msgs) { - MessageEnvelope envelope; - if (!TryReceiveOne(out envelope, (stop - Now).Min(idleValue))) - { - _testState.LastMessage = msg; + if (!TryReceiveOne(out var envelope, (stop - Now).Min(idleValue))) break; - } - var message = envelope.Message; - var typedMessage = message as T; + var shouldStop = false; - if (typedMessage != null) + switch (envelope) { - if (shouldIgnore(typedMessage)) - { - acc.Add(typedMessage); - count++; - } - else - { + case NullMessageEnvelope _: shouldStop = true; - } - } - else - { - shouldStop = !shouldIgnoreOtherMessageTypes; + break; + + case RealMessageEnvelope m when m.Message is T typedMessage: + if (shouldIgnore(typedMessage)) + { + msg = _testState.LastMessage; + acc.Add(typedMessage); + break; + } + shouldStop = true; + break; + + case RealMessageEnvelope _: + shouldStop = !shouldIgnoreOtherMessageTypes; + break; + + case var unexpected: + throw new Exception($"Unexpected {unexpected}"); } + if (shouldStop) { _testState.Queue.AddFirst(envelope); //Put the message back in the queue - _testState.LastMessage = msg; break; } msg = envelope; } - ConditionalLog("Received {0} messages with filter during {1}", count, Now - start); + ConditionalLog("Received {0} messages with filter during {1}", acc.Count, Now - start); + + _testState.LastMessage = msg; _testState.LastWasNoMsg = true; return acc; } From b07db6f7f3d6d7d64611d7b1693cd39680911861 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 15 Jun 2021 02:13:22 +0700 Subject: [PATCH 5/5] Revert "Code and spec cleanup, make sure that code is readable and works as intended" This reverts commit 09ece291d65ae1caebebbcee690e4fb2886a68ef. --- .../TestKitBaseTests/ReceiveTests.cs | 29 +++--- src/core/Akka.TestKit/TestKitBase_Receive.cs | 97 ++++++++----------- 2 files changed, 53 insertions(+), 73 deletions(-) diff --git a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs index ed84e0f0776..a44c7da5606 100644 --- a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs +++ b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs @@ -9,7 +9,6 @@ using Akka.Actor; using Akka.TestKit; using Xunit; -using Xunit.Sdk; namespace Akka.Testkit.Tests.TestKitBaseTests { @@ -29,7 +28,7 @@ public void ReceiveN_should_receive_correct_number_of_messages() [Fact] public void ReceiveN_should_timeout_if_no_messages() { - AssertThrows(() => ReceiveN(3, TimeSpan.FromMilliseconds(10))); + Intercept(() => ReceiveN(3, TimeSpan.FromMilliseconds(10))); } [Fact] @@ -37,7 +36,7 @@ public void ReceiveN_should_timeout_if_to_few_messages() { TestActor.Tell("1"); TestActor.Tell("2"); - AssertThrows(() => ReceiveN(3, TimeSpan.FromMilliseconds(100))); + Intercept(() => ReceiveN(3, TimeSpan.FromMilliseconds(100))); } @@ -54,7 +53,7 @@ public void FishForMessage_should_return_matched_message() [Fact] public void FishForMessage_should_timeout_if_no_messages() { - AssertThrows(() => FishForMessage(_=>false, TimeSpan.FromMilliseconds(10))); + Intercept(() => FishForMessage(_=>false, TimeSpan.FromMilliseconds(10))); } [Fact] @@ -62,7 +61,7 @@ public void FishForMessage_should_timeout_if_to_few_messages() { TestActor.Tell("1"); TestActor.Tell("2"); - AssertThrows(() => FishForMessage(_ => false, TimeSpan.FromMilliseconds(100))); + Intercept(() => FishForMessage(_ => false, TimeSpan.FromMilliseconds(100))); } [Fact] @@ -88,18 +87,8 @@ public void ReceiveWhile_Filter_should_not_consume_last_message_that_didnt_match TestActor.Tell("1"); TestActor.Tell("2"); TestActor.Tell(4711); - TestActor.Tell("3"); - TestActor.Tell("4"); - TestActor.Tell("56"); - TestActor.Tell("7"); - - ReceiveWhile(_ => _ is string ? _ : null) - .ShouldOnlyContainInOrder("1", "2"); + ReceiveWhile(_ => _ is string ? _ : null); ExpectMsg(4711); - - ReceiveWhile(_ => _ is string s && s.Length == 1 ? s : null) - .ShouldOnlyContainInOrder("3", "4"); - ExpectMsg("56"); } [Fact] @@ -152,10 +141,14 @@ public void ReceiveWhile_Predicate_should_not_consume_last_message_that_didnt_ma TestActor.Tell("7"); TestActor.Tell("8"); - ReceiveWhile(_ => _ is string).ShouldOnlyContainInOrder("1", "2"); + var received = ReceiveWhile(_ => _ is string); + received.ShouldOnlyContainInOrder("1", "2"); + ExpectMsg(4711); - ReceiveWhile(_ => _ is string).ShouldOnlyContainInOrder("3", "4", "5"); + received = ReceiveWhile(_ => _ is string); + received.ShouldOnlyContainInOrder("3", "4", "5"); + ExpectMsg(6); } diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index f490b47e5a0..b003168f891 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -229,46 +229,36 @@ private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, /// TBD public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) where T : class { - var start = Now; var maxValue = RemainingOrDilated(max); + var start = Now; var stop = start + maxValue; ConditionalLog("Trying to receive {0}messages of type {1} while filter returns non-nulls during {2}", msgs == int.MaxValue ? "" : msgs + " ", typeof(T), maxValue); - + var count = 0; var acc = new List(); var idleValue = idle.GetValueOrDefault(Timeout.InfiniteTimeSpan); MessageEnvelope msg = NullMessageEnvelope.Instance; - while (acc.Count < msgs) + while (count < msgs) { - if (!TryReceiveOne(out var envelope, (stop - Now).Min(idleValue))) - break; - - var shouldStop = false; - switch (envelope) + MessageEnvelope envelope; + if (!TryReceiveOne(out envelope, (stop - Now).Min(idleValue))) { - case NullMessageEnvelope _: - shouldStop = true; - break; - - case RealMessageEnvelope m when filter(m.Message) != null: - msg = _testState.LastMessage; - acc.Add(filter(m.Message)); - break; - - case RealMessageEnvelope _: - _testState.Queue.AddFirst(envelope); //Put the message back in the queue - shouldStop = true; - break; - - case var unexpected: - throw new Exception($"Unexpected {unexpected}"); + _testState.LastMessage = msg; + break; } - - if (shouldStop) + var message = envelope.Message; + var result = filter(message); + if (result == null) + { + _testState.Queue.AddFirst(envelope); //Put the message back in the queue + _testState.LastMessage = msg; break; + } + msg = envelope; + acc.Add(result); + count++; } + ConditionalLog("Received {0} messages with filter during {1}", count, Now - start); - ConditionalLog("Received {0} messages with filter during {1}", acc.Count, Now - start); - _testState.LastMessage = msg; _testState.LastWasNoMsg = true; return acc; } @@ -300,50 +290,47 @@ private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, var stop = start + maxValue; ConditionalLog("Trying to receive {0}messages of type {1} while predicate returns true during {2}. Messages of other types will {3}", msgs == int.MaxValue ? "" : msgs + " ", typeof(T), maxValue, shouldIgnoreOtherMessageTypes ? "be ignored" : "cause this to stop"); + var count = 0; var acc = new List(); var idleValue = idle.GetValueOrDefault(Timeout.InfiniteTimeSpan); MessageEnvelope msg = NullMessageEnvelope.Instance; - while (acc.Count < msgs) + while (count < msgs) { - if (!TryReceiveOne(out var envelope, (stop - Now).Min(idleValue))) + MessageEnvelope envelope; + if (!TryReceiveOne(out envelope, (stop - Now).Min(idleValue))) + { + _testState.LastMessage = msg; break; - + } + var message = envelope.Message; + var typedMessage = message as T; var shouldStop = false; - switch (envelope) + if (typedMessage != null) { - case NullMessageEnvelope _: - shouldStop = true; - break; - - case RealMessageEnvelope m when m.Message is T typedMessage: - if (shouldIgnore(typedMessage)) - { - msg = _testState.LastMessage; - acc.Add(typedMessage); - break; - } + if (shouldIgnore(typedMessage)) + { + acc.Add(typedMessage); + count++; + } + else + { shouldStop = true; - break; - - case RealMessageEnvelope _: - shouldStop = !shouldIgnoreOtherMessageTypes; - break; - - case var unexpected: - throw new Exception($"Unexpected {unexpected}"); + } + } + else + { + shouldStop = !shouldIgnoreOtherMessageTypes; } - if (shouldStop) { _testState.Queue.AddFirst(envelope); //Put the message back in the queue + _testState.LastMessage = msg; break; } msg = envelope; } + ConditionalLog("Received {0} messages with filter during {1}", count, Now - start); - ConditionalLog("Received {0} messages with filter during {1}", acc.Count, Now - start); - - _testState.LastMessage = msg; _testState.LastWasNoMsg = true; return acc; }