Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TestKit ReceiveWhile did not insert last inspected message properly. #5092

16 changes: 15 additions & 1 deletion src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs
Expand Up @@ -134,8 +134,22 @@ public void ReceiveWhile_Predicate_should_not_consume_last_message_that_didnt_ma
TestActor.Tell("1");
TestActor.Tell("2");
TestActor.Tell(4711);
ReceiveWhile<object>(_ => _ is string);
TestActor.Tell("3");
TestActor.Tell("4");
TestActor.Tell("5");
TestActor.Tell(6);
TestActor.Tell("7");
TestActor.Tell("8");

var received = ReceiveWhile<object>(_ => _ is string);
received.ShouldOnlyContainInOrder("1", "2");

ExpectMsg(4711);

received = ReceiveWhile<object>(_ => _ is string);
received.ShouldOnlyContainInOrder("3", "4", "5");

ExpectMsg(6);
}

}
Expand Down
101 changes: 40 additions & 61 deletions src/core/Akka.TestKit/Internal/BlockingQueue.cs
Expand Up @@ -22,7 +22,7 @@ namespace Akka.TestKit.Internal
/// <typeparam name="T">The type of item to store.</typeparam>
public class BlockingQueue<T>
{
private readonly BlockingCollection<Positioned> _collection = new BlockingCollection<Positioned>();
private readonly BlockingCollection<Positioned> _collection = new BlockingCollection<Positioned>(new QueueWithAddFirst());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueueWithAddFirst is the backing IProducerConsumerCollection for the internal BlockingCollection that allows proper item insertion in the head of the queue.


/// <summary>
/// The number of items that are currently in the queue.
Expand All @@ -35,7 +35,8 @@ public class BlockingQueue<T>
/// <param name="item">The item to add to the queue.</param>
public void Enqueue(T item)
{
_collection.TryAdd(new Positioned(item));
if (!_collection.TryAdd(new Positioned(item)))
throw new InvalidOperationException("Failed to enqueue item into the queue.");
}

/// <summary>
Expand All @@ -44,7 +45,8 @@ public void Enqueue(T item)
/// <param name="item">The item to add to the queue.</param>
public void AddFirst(T item)
{
_collection.TryAdd(new Positioned(item, first:true));
if(!_collection.TryAdd(new Positioned(item, first:true)))
throw new InvalidOperationException("Failed to enqueue item into the head of the queue.");
}

/// <summary>
Expand All @@ -67,13 +69,12 @@ public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancel
/// <returns><c>true</c> if the item was removed; otherwise, <c>false</c>.</returns>
public bool TryTake(out T item)
{
Positioned p;
if(_collection.TryTake(out p))
if(_collection.TryTake(out var p))
{
item = p.Value;
return true;
}
item = default(T);
item = default;
return false;
}

Expand All @@ -87,13 +88,12 @@ public bool TryTake(out T item)
/// <returns><c>true</c> if the remove completed within the specified timeout; otherwise, <c>false</c>.</returns>
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
Positioned p;
if(_collection.TryTake(out p,millisecondsTimeout,cancellationToken))
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
{
item = p.Value;
return true;
}
item = default(T);
item = default;
return false;
}

Expand All @@ -118,12 +118,7 @@ public T Take(CancellationToken cancellationToken)
public List<T> ToList()
{
var positionArray = _collection.ToArray();
var items = new List<T>();
foreach (var positioned in positionArray)
{
items.Add(positioned.Value);
}
return items;
return positionArray.Select(positioned => positioned.Value).ToList();
}


Expand All @@ -145,59 +140,48 @@ public Positioned(T value, bool first = false)
private class QueueWithAddFirst : IProducerConsumerCollection<Positioned>
{
private readonly LinkedList<Positioned> _list = new LinkedList<Positioned>();
private readonly object _lock = new object();

public int Count { get { return _list.Count; } }

public bool TryAdd(Positioned item)
{
if(item.First)
public int Count {
get
{
lock(_lock)
lock (SyncRoot)
{
_list.AddFirst(item);
return _list.Count;
}
}
else
}

public bool TryAdd(Positioned item)
{
lock (SyncRoot)
{
lock(_lock)
{
if(item.First)
_list.AddFirst(item);
else
_list.AddLast(item);
}
return true;
}
return true;
}

public bool TryTake(out Positioned item)
{
var result = false;
if(_list.Count == 0)
lock(SyncRoot)
{
item = null;
}
else
{
lock(_lock)
if(_list.Count == 0)
{
if(_list.Count == 0)
{
item = null;
}
else
{
item = _list.First.Value;
_list.RemoveFirst();
result = true;
}
item = null;
return false;
}

item = _list.First.Value;
_list.RemoveFirst();
return true;
}
return result;
}


public void CopyTo(Positioned[] array, int index)
{
lock(_lock)
lock(SyncRoot)
{
_list.CopyTo(array, index);
}
Expand All @@ -206,43 +190,38 @@ public void CopyTo(Positioned[] array, int index)

public void CopyTo(Array array, int index)
{
lock(_lock)
lock(SyncRoot)
{
((ICollection)_list).CopyTo(array, index);
}
}

public Positioned[] ToArray()
{
Positioned[] array;
lock(_lock)
lock(SyncRoot)
{
array = _list.ToArray();
return _list.ToArray();
}
return array;
}


public IEnumerator<Positioned> GetEnumerator()
{
//We must create a copy
List<Positioned> copy;
lock(_lock)
lock(SyncRoot)
{
copy = new List<Positioned>(_list);
//We must create a copy
return new List<Positioned>(_list).GetEnumerator();
}
return copy.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public object SyncRoot { get; } = new object();

public object SyncRoot { get { throw new NotImplementedException(); } }

public bool IsSynchronized { get { return false; } }
public bool IsSynchronized => true;
}
}
}