/
SpscGrowableArrayQueue.java
171 lines (159 loc) · 6.97 KB
/
SpscGrowableArrayQueue.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jctools.queues;
import org.jctools.util.Pow2;
import org.jctools.util.RangeUtil;
import static org.jctools.queues.CircularArrayOffsetCalculator.allocate;
import static org.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset;
import static org.jctools.queues.LinkedArrayQueueUtil.length;
import static org.jctools.util.UnsafeRefArrayAccess.lvElement;
/**
* An SPSC array queue which starts at <i>initialCapacity</i> and grows to <i>maxCapacity</i> in linked chunks,
* doubling theirs size every time until the full blown backing array is used.
* The queue grows only when the current chunk is full and elements are not copied on
* resize, instead a link to the new chunk is stored in the old chunk for the consumer to follow.<br>
*
* @param <E>
*/
public class SpscGrowableArrayQueue<E> extends BaseSpscLinkedArrayQueue<E>
{
private final int maxQueueCapacity;
private long lookAheadStep;
public SpscGrowableArrayQueue(final int capacity)
{
this(Math.max(8, Pow2.roundToPowerOfTwo(capacity / 8)), capacity);
}
public SpscGrowableArrayQueue(final int chunkSize, final int capacity)
{
RangeUtil.checkGreaterThanOrEqual(capacity, 16, "capacity");
// minimal chunk size of eight makes sure minimal lookahead step is 2
RangeUtil.checkGreaterThanOrEqual(chunkSize, 8, "chunkSize");
maxQueueCapacity = Pow2.roundToPowerOfTwo(capacity);
int chunkCapacity = Pow2.roundToPowerOfTwo(chunkSize);
RangeUtil.checkLessThan(chunkCapacity, maxQueueCapacity, "chunkCapacity");
long mask = chunkCapacity - 1;
// need extra element to point at next array
E[] buffer = allocate(chunkCapacity + 1);
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
consumerMask = mask;
producerBufferLimit = mask - 1; // we know it's all empty to start with
adjustLookAheadStep(chunkCapacity);
}
@Override
final boolean offerColdPath(
final E[] buffer,
final long mask,
final long index,
final long offset,
final E v,
final Supplier<? extends E> s)
{
final long lookAheadStep = this.lookAheadStep;
// normal case, go around the buffer or resize if full (unless we hit max capacity)
if (lookAheadStep > 0)
{
long lookAheadElementOffset = calcElementOffset(index + lookAheadStep, mask);
// Try and look ahead a number of elements so we don't have to do this all the time
if (null == lvElement(buffer, lookAheadElementOffset))
{
producerBufferLimit = index + lookAheadStep - 1; // joy, there's plenty of room
writeToQueue(buffer, v == null ? s.get() : v, index, offset);
return true;
}
// we're at max capacity, can use up last element
final int maxCapacity = maxQueueCapacity;
if (mask + 1 == maxCapacity)
{
if (null == lvElement(buffer, offset))
{
writeToQueue(buffer, v == null ? s.get() : v, index, offset);
return true;
}
// we're full and can't grow
return false;
}
// not at max capacity, so must allow extra slot for next buffer pointer
if (null == lvElement(buffer, calcElementOffset(index + 1, mask)))
{ // buffer is not full
writeToQueue(buffer, v == null ? s.get() : v, index, offset);
}
else
{
// allocate new buffer of same length
final E[] newBuffer = allocate((int) (2 * (mask + 1) + 1));
producerBuffer = newBuffer;
producerMask = length(newBuffer) - 2;
final long offsetInNew = calcElementOffset(index, producerMask);
linkOldToNew(index, buffer, offset, newBuffer, offsetInNew, v == null ? s.get() : v);
int newCapacity = (int) (producerMask + 1);
if (newCapacity == maxCapacity)
{
long currConsumerIndex = lvConsumerIndex();
// use lookAheadStep to store the consumer distance from final buffer
this.lookAheadStep = -(index - currConsumerIndex);
producerBufferLimit = currConsumerIndex + maxCapacity;
}
else
{
producerBufferLimit = index + producerMask - 1;
adjustLookAheadStep(newCapacity);
}
}
return true;
}
// the step is negative (or zero) in the period between allocating the max sized buffer and the
// consumer starting on it
else
{
final long prevElementsInOtherBuffers = -lookAheadStep;
// until the consumer starts using the current buffer we need to check consumer index to
// verify size
long currConsumerIndex = lvConsumerIndex();
int size = (int) (index - currConsumerIndex);
int maxCapacity = (int) mask + 1; // we're on max capacity or we wouldn't be here
if (size == maxCapacity)
{
// consumer index has not changed since adjusting the lookAhead index, we're full
return false;
}
// if consumerIndex progressed enough so that current size indicates it is on same buffer
long firstIndexInCurrentBuffer = producerBufferLimit - maxCapacity + prevElementsInOtherBuffers;
if (currConsumerIndex >= firstIndexInCurrentBuffer)
{
// job done, we've now settled into our final state
adjustLookAheadStep(maxCapacity);
}
// consumer is still on some other buffer
else
{
// how many elements out of buffer?
this.lookAheadStep = (int) (currConsumerIndex - firstIndexInCurrentBuffer);
}
producerBufferLimit = currConsumerIndex + maxCapacity;
writeToQueue(buffer, v == null ? s.get() : v, index, offset);
return true;
}
}
private void adjustLookAheadStep(int capacity)
{
lookAheadStep = Math.min(capacity / 4, SpscArrayQueue.MAX_LOOK_AHEAD_STEP);
}
@Override
public int capacity()
{
return maxQueueCapacity;
}
}