Skip to content

A thread-safe generic first in-first out (FIFO) collection with support for priority queuing.

License

Notifications You must be signed in to change notification settings

noamyogev84/ConcurrentPriorityQueue

Repository files navigation

ConcurrentPriorityQueue Build Workflow Nuget

A thread-safe generic first in first out (FIFO) collection with support for priority queuing.

Nuget: https://www.nuget.org/packages/ConcurrentPriorityQueue

Features

  1. Thread-Safe.
  2. Manages items according to a First in first out policy and priority on top of that.
  3. Implements IProducerConsumerCollection<T> interface.
  4. Extends to a BlockingCollection<T>.
  5. Supports multi-frameworks, includes net48 netstandard2.0 net6.0 net8.0

Examples:

Items in the collection must implement the generic interface IHavePriority<T> where T: implements IEquatable<T>, IComparable<T> and also overrides Object.GetHashCode():

// Simplest implementation of IHavePriority<T>
public class SomeClass : IHavePriority<int> {
    int Priority {get; set;}
}

Simple flow for creating a Priority-By-Integer queue and adding an item:

// Create a new prioritized item.
var itemWithPriority = new SomeClass { Priority = 0 };

// Initialize an unbounded priority-by-integer queue.
var priorityQueue = new ConcurrentPriorityQueue<IHavePriority<int>, int>();

// Enqueue item and handle result.
Result result = priorityQueue.Enqueue(itemWithPriority);

Use the ConcurrentPriorityByIntegerQueue implementation to simplify the above example:

var priorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>();

Consume items by priority/first-in-first-out policy, using Dequeue() and Peek():

// Lower value -> Higher priority.
var item1 = new SomeClass { Priority = 1 };
var item2 = new SomeClass { Priority = 0 };
var item3 = new SomeClass { Priority = 0 };

priorityQueue.Enqueue(item1);
priorityQueue.Enqueue(item2);
priorityQueue.Enqueue(item3);

var result = priority.Dequeue(); // item2
var result = priority.Dequeue(); // item3
var result = priority.Dequeue(); // item1

Iterating over the collection will yield items according to their priority and position (FIFO):

var item1 = new SomeClass { Priority = 1 };
var item2 = new SomeClass { Priority = 0 };
var item3 = new SomeClass { Priority = 0 };

priorityQueue.Enqueue(item1);
priorityQueue.Enqueue(item2);
priorityQueue.Enqueue(item3);

foreach(var item in priorityQueue) {
    // Iteration 1 -> item2
    // Iteration 2 -> item3
    // Iteration 3 -> item1
}

ConcurrentPriorityQueue supports Generic Priorities.

Implement your own Business Priority object and configure the queue to handle it:

// TimeToProcess class implements IEquatable<T>, IComparable<T> and overrides Object.GetHashCode().
public class TimeToProcess : IEquatable<TimeToProcess>, IComparable<TimeToProcess> {
    public decimal TimeInMilliseconds { get; set;}

    public int CompareTo(TimeToProcess other) =>
        TimeInMilliseconds.CompareTo(other.TimeInMilliseconds);

    public bool Equals(TimeToProcess other) =>
        TimeInMilliseconds.Equals(other.TimeInMilliseconds);

    public override int GetHashCode() => TimeInMilliseconds.GetHashCode();
}
// BusinessPriorityItem implements IHavePriority<T>
public class BusinessPriorityItem : IHavePriority<TimeToProcess> {
    TimeToProcess Priority {get; set;}
}
// Create a new prioritized item.
var item = new BusinessPriorityItem { Priority = new TimeToProcess { TimeInMilliseconds = 0.25M } };

// Initialize an unbounded priority-by-TimeToProcess queue.
var priorityQueue = new ConcurrentPriorityQueue<IHavePriority<TimeToProcess>, TimeToProcess>();

// Enqueue item and handle result.
Result result = priorityQueue.Enqueue(item);

ConcurrentPriorityQueue<T> can be bounded to a fixed amount of priorities:

// Create a bounded ConcurrentPriorityQueue to support a fixed amount of priorities.
var maxAmountOfPriorities = 2;
var priorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>(maxAmountOfPriorities);

Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 0}); // result.OK
Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 1}); // result.OK
Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 2}); // result.Fail -> Queue supports [0, 1]
Result result = PriorityQueue.Enqueue(new SomeClass {Priority = 0}); // result.OK

ConcurrentPriorityQueue<T> can be extended to a BlockingCollection<T> using the ToBlockingCollection<T> extension method:

var blockingPriorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>()
                                .ToBlockingCollection();

foreach(var item in blockingPriorityQueue.GetConsumingEnumerable()) {
    // Do something...
    // Blocks until signaled on completion.
}

Additional information and resources:

Releases

No releases published

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages