diff --git a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs index ed00c3c20b3..a44c7da5606 100644 --- a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs +++ b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs @@ -134,8 +134,22 @@ public void ReceiveWhile_Predicate_should_not_consume_last_message_that_didnt_ma TestActor.Tell("1"); TestActor.Tell("2"); TestActor.Tell(4711); - ReceiveWhile(_ => _ is string); + TestActor.Tell("3"); + TestActor.Tell("4"); + TestActor.Tell("5"); + TestActor.Tell(6); + TestActor.Tell("7"); + TestActor.Tell("8"); + + var received = ReceiveWhile(_ => _ is string); + received.ShouldOnlyContainInOrder("1", "2"); + ExpectMsg(4711); + + received = ReceiveWhile(_ => _ is string); + received.ShouldOnlyContainInOrder("3", "4", "5"); + + ExpectMsg(6); } } diff --git a/src/core/Akka.TestKit/Internal/BlockingQueue.cs b/src/core/Akka.TestKit/Internal/BlockingQueue.cs index 2eac4a113b2..63ba34d6bf1 100644 --- a/src/core/Akka.TestKit/Internal/BlockingQueue.cs +++ b/src/core/Akka.TestKit/Internal/BlockingQueue.cs @@ -22,7 +22,7 @@ namespace Akka.TestKit.Internal /// The type of item to store. public class BlockingQueue { - private readonly BlockingCollection _collection = new BlockingCollection(); + private readonly BlockingCollection _collection = new BlockingCollection(new QueueWithAddFirst()); /// /// The number of items that are currently in the queue. @@ -35,7 +35,8 @@ public class BlockingQueue /// The item to add to the queue. public void Enqueue(T item) { - _collection.TryAdd(new Positioned(item)); + if (!_collection.TryAdd(new Positioned(item))) + throw new InvalidOperationException("Failed to enqueue item into the queue."); } /// @@ -44,7 +45,8 @@ public void Enqueue(T item) /// The item to add to the queue. public void AddFirst(T item) { - _collection.TryAdd(new Positioned(item, first:true)); + if(!_collection.TryAdd(new Positioned(item, first:true))) + throw new InvalidOperationException("Failed to enqueue item into the head of the queue."); } /// @@ -67,13 +69,12 @@ public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancel /// true if the item was removed; otherwise, false. public bool TryTake(out T item) { - Positioned p; - if(_collection.TryTake(out p)) + if(_collection.TryTake(out var p)) { item = p.Value; return true; } - item = default(T); + item = default; return false; } @@ -87,13 +88,12 @@ public bool TryTake(out T item) /// true if the remove completed within the specified timeout; otherwise, false. public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken) { - Positioned p; - if(_collection.TryTake(out p,millisecondsTimeout,cancellationToken)) + if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken)) { item = p.Value; return true; } - item = default(T); + item = default; return false; } @@ -118,12 +118,7 @@ public T Take(CancellationToken cancellationToken) public List ToList() { var positionArray = _collection.ToArray(); - var items = new List(); - foreach (var positioned in positionArray) - { - items.Add(positioned.Value); - } - return items; + return positionArray.Select(positioned => positioned.Value).ToList(); } @@ -145,59 +140,48 @@ public Positioned(T value, bool first = false) private class QueueWithAddFirst : IProducerConsumerCollection { private readonly LinkedList _list = new LinkedList(); - private readonly object _lock = new object(); - - public int Count { get { return _list.Count; } } - public bool TryAdd(Positioned item) - { - if(item.First) + public int Count { + get { - lock(_lock) + lock (SyncRoot) { - _list.AddFirst(item); + return _list.Count; } } - else + } + + public bool TryAdd(Positioned item) + { + lock (SyncRoot) { - lock(_lock) - { + if(item.First) + _list.AddFirst(item); + else _list.AddLast(item); - } + return true; } - return true; } public bool TryTake(out Positioned item) { - var result = false; - if(_list.Count == 0) + lock(SyncRoot) { - item = null; - } - else - { - lock(_lock) + if(_list.Count == 0) { - if(_list.Count == 0) - { - item = null; - } - else - { - item = _list.First.Value; - _list.RemoveFirst(); - result = true; - } + item = null; + return false; } + + item = _list.First.Value; + _list.RemoveFirst(); + return true; } - return result; } - public void CopyTo(Positioned[] array, int index) { - lock(_lock) + lock(SyncRoot) { _list.CopyTo(array, index); } @@ -206,7 +190,7 @@ public void CopyTo(Positioned[] array, int index) public void CopyTo(Array array, int index) { - lock(_lock) + lock(SyncRoot) { ((ICollection)_list).CopyTo(array, index); } @@ -214,24 +198,20 @@ public void CopyTo(Array array, int index) public Positioned[] ToArray() { - Positioned[] array; - lock(_lock) + lock(SyncRoot) { - array = _list.ToArray(); + return _list.ToArray(); } - return array; } public IEnumerator GetEnumerator() { - //We must create a copy - List copy; - lock(_lock) + lock(SyncRoot) { - copy = new List(_list); + //We must create a copy + return new List(_list).GetEnumerator(); } - return copy.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() @@ -239,10 +219,9 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } + public object SyncRoot { get; } = new object(); - public object SyncRoot { get { throw new NotImplementedException(); } } - - public bool IsSynchronized { get { return false; } } + public bool IsSynchronized => true; } } }