Skip to content

Commit

Permalink
TestKit ReceiveWhile did not insert last inspected message properly. (#…
Browse files Browse the repository at this point in the history
…5092)

* Add bug reproduction spec

* Code cleanup, make sure locks are used properly

* Make sure BlockingQueue inserts items properly

* Code and spec cleanup, make sure that code is readable and works as intended

* Revert "Code and spec cleanup, make sure that code is readable and works as intended"

This reverts commit 09ece29.

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jun 15, 2021
1 parent c485d9e commit 456d795
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 62 deletions.
16 changes: 15 additions & 1 deletion src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs
Expand Up @@ -134,8 +134,22 @@ public void ReceiveWhile_Predicate_should_not_consume_last_message_that_didnt_ma
TestActor.Tell("1");
TestActor.Tell("2");
TestActor.Tell(4711);
ReceiveWhile<object>(_ => _ is string);
TestActor.Tell("3");
TestActor.Tell("4");
TestActor.Tell("5");
TestActor.Tell(6);
TestActor.Tell("7");
TestActor.Tell("8");

var received = ReceiveWhile<object>(_ => _ is string);
received.ShouldOnlyContainInOrder("1", "2");

ExpectMsg(4711);

received = ReceiveWhile<object>(_ => _ is string);
received.ShouldOnlyContainInOrder("3", "4", "5");

ExpectMsg(6);
}

}
Expand Down
101 changes: 40 additions & 61 deletions src/core/Akka.TestKit/Internal/BlockingQueue.cs
Expand Up @@ -22,7 +22,7 @@ namespace Akka.TestKit.Internal
/// <typeparam name="T">The type of item to store.</typeparam>
public class BlockingQueue<T>
{
private readonly BlockingCollection<Positioned> _collection = new BlockingCollection<Positioned>();
private readonly BlockingCollection<Positioned> _collection = new BlockingCollection<Positioned>(new QueueWithAddFirst());

/// <summary>
/// The number of items that are currently in the queue.
Expand All @@ -35,7 +35,8 @@ public class BlockingQueue<T>
/// <param name="item">The item to add to the queue.</param>
public void Enqueue(T item)
{
_collection.TryAdd(new Positioned(item));
if (!_collection.TryAdd(new Positioned(item)))
throw new InvalidOperationException("Failed to enqueue item into the queue.");
}

/// <summary>
Expand All @@ -44,7 +45,8 @@ public void Enqueue(T item)
/// <param name="item">The item to add to the queue.</param>
public void AddFirst(T item)
{
_collection.TryAdd(new Positioned(item, first:true));
if(!_collection.TryAdd(new Positioned(item, first:true)))
throw new InvalidOperationException("Failed to enqueue item into the head of the queue.");
}

/// <summary>
Expand All @@ -67,13 +69,12 @@ public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancel
/// <returns><c>true</c> if the item was removed; otherwise, <c>false</c>.</returns>
public bool TryTake(out T item)
{
Positioned p;
if(_collection.TryTake(out p))
if(_collection.TryTake(out var p))
{
item = p.Value;
return true;
}
item = default(T);
item = default;
return false;
}

Expand All @@ -87,13 +88,12 @@ public bool TryTake(out T item)
/// <returns><c>true</c> if the remove completed within the specified timeout; otherwise, <c>false</c>.</returns>
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
Positioned p;
if(_collection.TryTake(out p,millisecondsTimeout,cancellationToken))
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
{
item = p.Value;
return true;
}
item = default(T);
item = default;
return false;
}

Expand All @@ -118,12 +118,7 @@ public T Take(CancellationToken cancellationToken)
public List<T> ToList()
{
var positionArray = _collection.ToArray();
var items = new List<T>();
foreach (var positioned in positionArray)
{
items.Add(positioned.Value);
}
return items;
return positionArray.Select(positioned => positioned.Value).ToList();
}


Expand All @@ -145,59 +140,48 @@ public Positioned(T value, bool first = false)
private class QueueWithAddFirst : IProducerConsumerCollection<Positioned>
{
private readonly LinkedList<Positioned> _list = new LinkedList<Positioned>();
private readonly object _lock = new object();

public int Count { get { return _list.Count; } }

public bool TryAdd(Positioned item)
{
if(item.First)
public int Count {
get
{
lock(_lock)
lock (SyncRoot)
{
_list.AddFirst(item);
return _list.Count;
}
}
else
}

public bool TryAdd(Positioned item)
{
lock (SyncRoot)
{
lock(_lock)
{
if(item.First)
_list.AddFirst(item);
else
_list.AddLast(item);
}
return true;
}
return true;
}

public bool TryTake(out Positioned item)
{
var result = false;
if(_list.Count == 0)
lock(SyncRoot)
{
item = null;
}
else
{
lock(_lock)
if(_list.Count == 0)
{
if(_list.Count == 0)
{
item = null;
}
else
{
item = _list.First.Value;
_list.RemoveFirst();
result = true;
}
item = null;
return false;
}

item = _list.First.Value;
_list.RemoveFirst();
return true;
}
return result;
}


public void CopyTo(Positioned[] array, int index)
{
lock(_lock)
lock(SyncRoot)
{
_list.CopyTo(array, index);
}
Expand All @@ -206,43 +190,38 @@ public void CopyTo(Positioned[] array, int index)

public void CopyTo(Array array, int index)
{
lock(_lock)
lock(SyncRoot)
{
((ICollection)_list).CopyTo(array, index);
}
}

public Positioned[] ToArray()
{
Positioned[] array;
lock(_lock)
lock(SyncRoot)
{
array = _list.ToArray();
return _list.ToArray();
}
return array;
}


public IEnumerator<Positioned> GetEnumerator()
{
//We must create a copy
List<Positioned> copy;
lock(_lock)
lock(SyncRoot)
{
copy = new List<Positioned>(_list);
//We must create a copy
return new List<Positioned>(_list).GetEnumerator();
}
return copy.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public object SyncRoot { get; } = new object();

public object SyncRoot { get { throw new NotImplementedException(); } }

public bool IsSynchronized { get { return false; } }
public bool IsSynchronized => true;
}
}
}

0 comments on commit 456d795

Please sign in to comment.