diff --git a/jctools-core/src/main/java/org/jctools/queues/SpscGrowableArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/SpscGrowableArrayQueue.java index 478c10f7..8be9b580 100644 --- a/jctools-core/src/main/java/org/jctools/queues/SpscGrowableArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/SpscGrowableArrayQueue.java @@ -114,7 +114,7 @@ final boolean offerColdPath( long currConsumerIndex = lvConsumerIndex(); // use lookAheadStep to store the consumer distance from final buffer this.lookAheadStep = -(index - currConsumerIndex); - producerBufferLimit = currConsumerIndex + maxCapacity - 1; + producerBufferLimit = currConsumerIndex + maxCapacity; } else { diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/SpscGrowableAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/SpscGrowableAtomicArrayQueue.java index 89a8052e..323d045e 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/SpscGrowableAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/SpscGrowableAtomicArrayQueue.java @@ -106,7 +106,7 @@ final boolean offerColdPath(final AtomicReferenceArray buffer, final long mas long currConsumerIndex = lvConsumerIndex(); // use lookAheadStep to store the consumer distance from final buffer this.lookAheadStep = -(index - currConsumerIndex); - producerBufferLimit = currConsumerIndex + maxCapacity - 1; + producerBufferLimit = currConsumerIndex + maxCapacity; } else { producerBufferLimit = index + producerMask - 1; adjustLookAheadStep(newCapacity); diff --git a/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestSpscGrowable.java b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestSpscGrowable.java index 053a9b87..2bf62382 100644 --- a/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestSpscGrowable.java +++ b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestSpscGrowable.java @@ -2,6 +2,8 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; +import org.junit.Assert; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -9,6 +11,8 @@ import java.util.Collection; import java.util.Queue; +import static org.hamcrest.Matchers.is; + @RunWith(Parameterized.class) public class QueueSanityTestSpscGrowable extends QueueSanityTest { @@ -27,4 +31,30 @@ public static Collection parameters() return list; } + @Test + public void testSizeNeverExceedCapacity() + { + final SpscGrowableArrayQueue q = new SpscGrowableArrayQueue<>(8, 16); + final Integer v = 0; + final int capacity = q.capacity(); + for (int i = 0; i < capacity; i++) + { + Assert.assertTrue(q.offer(v)); + } + Assert.assertFalse(q.offer(v)); + Assert.assertThat(q.size(), is(capacity)); + for (int i = 0; i < 6; i++) + { + Assert.assertEquals(v, q.poll()); + } + //the consumer is left in the chunk previous the last and biggest one + Assert.assertThat(q.size(), is(capacity - 6)); + for (int i = 0; i < 6; i++) + { + q.offer(v); + } + Assert.assertThat(q.size(), is(capacity)); + Assert.assertFalse(q.offer(v)); + } + }