-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.hh
142 lines (135 loc) · 2.34 KB
/
queue.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#ifndef QUEUE_HH
#define QUEUE_HH
#include <mutex>
#include <condition_variable>
namespace amazon {
/*
 * Fixed-capacity FIFO ring buffer. No internal locking: callers that
 * share a Ring between threads must serialize access themselves.
 *
 * The requested size is rounded up to a power of two so that index
 * wrap-around can be done with a bit mask instead of a modulo.
 *
 * T must be default-constructible (the storage is allocated with
 * new T[]) and copy-assignable (elements are copied in and out).
 */
template<class T>
class Ring {
public:
    /*
     * size: minimum number of slots wanted.  Rounded up to the next
     * power of two; a request of 0 yields a capacity of 1.
     */
    Ring(unsigned size)
        : len(npw2(size)),
          mask(len - 1),
          begin(0),
          end(0),
          count(0),
          ring(new T[len])
    {
    }

    ~Ring()
    {
        delete[] ring;
    }

    /*
     * The buffer is owned through a raw pointer, so a member-wise copy
     * would make two objects delete[] the same array.  Forbid copying.
     */
    Ring(const Ring &) = delete;
    Ring &operator=(const Ring &) = delete;

    /*
     * Append a copy of val at the tail.
     * Returns false, storing nothing, when the ring is full.
     */
    bool push(const T &val)
    {
        if (count == len)
            return false;
        ring[end] = val;
        end = (end + 1) & mask;     /* wrap using the power-of-two mask */
        ++count;
        return true;
    }

    /*
     * Copy the oldest element into val and drop it from the ring.
     * Returns false, leaving val untouched, when the ring is empty.
     */
    bool pop(T &val)
    {
        if (count == 0)
            return false;
        val = ring[begin];
        begin = (begin + 1) & mask; /* wrap using the power-of-two mask */
        --count;
        return true;
    }

    /* Number of elements currently stored. */
    unsigned size() const
    {
        return count;
    }

private:
    unsigned len;    /* capacity, always a power of two */
    unsigned mask;   /* len - 1, used to wrap begin/end */
    unsigned begin;  /* index of the oldest element */
    unsigned end;    /* index of the next free slot */
    unsigned count;  /* elements currently stored */
    T *ring;         /* owned array of len slots */

    /*
     * next power of 2 from Hackers Delight, by Henry S. Warren
     *
     * Returns the smallest power of two >= n.  The textbook routine
     * maps 0 to 0, which would give an unusable zero-slot ring, so 0
     * is special-cased to 1.
     * NOTE(review): the shift sequence stops at 16, i.e. it assumes a
     * 32-bit unsigned; n above 2^31 still wraps to 0 on such targets.
     */
    static unsigned npw2(unsigned n)
    {
        if (n == 0)
            return 1;
        n--;
        n |= n >> 1;
        n |= n >> 2;
        n |= n >> 4;
        n |= n >> 8;
        n |= n >> 16;
        n++;
        return n;
    }
};
template<class T>
class Queue {
public:
Queue(unsigned len) : insert_waiters(0), remove_waiters(0)
{
ring = new Ring<T>(len);
}
~Queue() { delete ring; }
bool push(T &val)
{
std::unique_lock<std::mutex> lock(mtx);
if (! ring->push(val)) {
++insert_waiters;
do
insert_cond.wait(lock);
while (! ring->push(val));
--insert_waiters;
}
if (remove_waiters > 0)
remove_cond.notify_one();
return true; /* since we wait till we get some */
}
bool try_push(T &val)
{
std::unique_lock<std::mutex> lock(mtx);
if (ring->push(val)) {
if (remove_waiters > 0)
remove_cond.notify_one();
return true;
}
return false;
}
bool try_pop(T &val)
{
std::unique_lock<std::mutex> lock(mtx);
if (ring->pop(val)) {
if (insert_waiters > 0)
insert_cond.notify_one();
return true;
}
return false;
}
bool pop(T &val)
{
std::unique_lock<std::mutex> lock(mtx);
if (! ring->pop(val)) {
++remove_waiters;
do
remove_cond.wait(lock);
while (! ring->pop(val));
--remove_waiters;
}
if (insert_waiters > 0)
insert_cond.notify_one();
return true; /* since we wait till we get some */
}
unsigned size()
{
std::unique_lock<std::mutex> lock(mtx);
return ring->size();
}
private:
unsigned len;
mutable std::mutex mtx;
std::condition_variable insert_cond;
std::condition_variable remove_cond;
unsigned insert_waiters;
unsigned remove_waiters;
Ring<T> *ring;
};
}
#endif