From 6587369b1e0a66dc2d3c1dedb17e8a56269d8339 Mon Sep 17 00:00:00 2001 From: mjpt777 Date: Wed, 11 Nov 2020 14:22:19 +0000 Subject: [PATCH] [Java] Add RingBuffer.controlledRead methods. Issue #227. --- .../concurrent/ControlledMessageHandler.java | 65 ++++++ .../ringbuffer/ManyToOneRingBuffer.java | 189 +++++++++++----- .../ringbuffer/OneToOneRingBuffer.java | 192 +++++++++++----- .../concurrent/ringbuffer/RingBuffer.java | 206 ++++++++++-------- .../ringbuffer/ManyToOneRingBufferTest.java | 125 ++++++++++- .../ringbuffer/OneToOneRingBufferTest.java | 133 ++++++++++- 6 files changed, 690 insertions(+), 220 deletions(-) create mode 100644 agrona/src/main/java/org/agrona/concurrent/ControlledMessageHandler.java diff --git a/agrona/src/main/java/org/agrona/concurrent/ControlledMessageHandler.java b/agrona/src/main/java/org/agrona/concurrent/ControlledMessageHandler.java new file mode 100644 index 000000000..a02420fe2 --- /dev/null +++ b/agrona/src/main/java/org/agrona/concurrent/ControlledMessageHandler.java @@ -0,0 +1,65 @@ +/* + * Copyright 2014-2020 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.agrona.concurrent; + +import org.agrona.MutableDirectBuffer; + +/** + * Callback interface for processing of messages that are read from a buffer. + */ +@FunctionalInterface +public interface ControlledMessageHandler +{ + /** + * Action to be taken on return from {@link #onMessage(int, MutableDirectBuffer, int, int)}. + */ + enum Action + { + /** + * Abort the current read operation and do not advance the position for this message. + */ + ABORT, + + /** + * Break from the current read operation and commit the position as of the end of the current message + * being handled. + */ + BREAK, + + /** + * Continue processing but commit the position as of the end of the current message so that + * flow control is applied to this point. + */ + COMMIT, + + /** + * Continue processing until limit or no messages with position commit at end of read as the in + * {@link #onMessage(int, MutableDirectBuffer, int, int)}. + */ + CONTINUE, + } + + /** + * Called for the processing of each message read from a buffer in turn. + * + * @param msgTypeId type of the encoded message. + * @param buffer containing the encoded message. + * @param index at which the encoded message begins. + * @param length in bytes of the encoded message. + * @return {@link Action} to be taken to control how the read progresses. + */ + Action onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length); +} diff --git a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java index 51a96c307..79cb2debd 100644 --- a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBuffer.java @@ -17,10 +17,10 @@ import org.agrona.DirectBuffer; import org.agrona.UnsafeAccess; -import org.agrona.concurrent.AtomicBuffer; -import org.agrona.concurrent.MessageHandler; +import org.agrona.concurrent.*; import static org.agrona.BitUtil.align; +import static org.agrona.concurrent.ControlledMessageHandler.Action.*; import static org.agrona.concurrent.ringbuffer.RecordDescriptor.*; import static org.agrona.concurrent.ringbuffer.RingBufferDescriptor.*; @@ -98,6 +98,55 @@ public boolean write(final int msgTypeId, final DirectBuffer srcBuffer, final in return true; } + /** + * {@inheritDoc} + */ + public int tryClaim(final int msgTypeId, final int length) + { + checkTypeId(msgTypeId); + checkMsgLength(length); + + final AtomicBuffer buffer = this.buffer; + final int recordLength = length + HEADER_LENGTH; + final int recordIndex = claimCapacity(buffer, recordLength); + + if (INSUFFICIENT_CAPACITY == recordIndex) + { + return recordIndex; + } + + buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + UnsafeAccess.UNSAFE.storeFence(); + buffer.putInt(typeOffset(recordIndex), msgTypeId); + + return encodedMsgOffset(recordIndex); + } + + /** + * {@inheritDoc} + */ + public void commit(final int index) + { + final int recordIndex = computeRecordIndex(index); + final AtomicBuffer buffer = this.buffer; + final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); + + buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + } + + /** + * {@inheritDoc} + */ + public void abort(final int index) + { + final int recordIndex = computeRecordIndex(index); + final AtomicBuffer buffer = this.buffer; + final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); + + buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID); + buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + } + /** * {@inheritDoc} */ @@ -141,13 +190,94 @@ public int read(final MessageHandler handler, final int messageCountLimit) continue; } - ++messagesRead; handler.onMessage(messageTypeId, buffer, recordIndex + HEADER_LENGTH, recordLength - HEADER_LENGTH); + ++messagesRead; + } + } + finally + { + if (bytesRead > 0) + { + buffer.setMemory(headIndex, bytesRead, (byte)0); + buffer.putLongOrdered(headPositionIndex, head + bytesRead); + } + } + + return messagesRead; + } + + /** + * {@inheritDoc} + */ + public int controlledRead(final ControlledMessageHandler handler) + { + return controlledRead(handler, Integer.MAX_VALUE); + } + + /** + * {@inheritDoc} + */ + public int controlledRead(final ControlledMessageHandler handler, final int messageCountLimit) + { + int messagesRead = 0; + + final AtomicBuffer buffer = this.buffer; + final int headPositionIndex = this.headPositionIndex; + long head = buffer.getLong(headPositionIndex); + + final int capacity = this.capacity; + int headIndex = (int)head & (capacity - 1); + final int maxBlockLength = capacity - headIndex; + int bytesRead = 0; + + try + { + while ((bytesRead < maxBlockLength) && (messagesRead < messageCountLimit)) + { + final int recordIndex = headIndex + bytesRead; + final int recordLength = buffer.getIntVolatile(lengthOffset(recordIndex)); + if (recordLength <= 0) + { + break; + } + + final int alignedLength = align(recordLength, ALIGNMENT); + bytesRead += alignedLength; + + final int messageTypeId = buffer.getInt(typeOffset(recordIndex)); + if (PADDING_MSG_TYPE_ID == messageTypeId) + { + continue; + } + + final ControlledMessageHandler.Action action = handler.onMessage( + messageTypeId, buffer, recordIndex + HEADER_LENGTH, recordLength - HEADER_LENGTH); + + if (ABORT == action) + { + bytesRead -= alignedLength; + break; + } + + ++messagesRead; + + if (BREAK == action) + { + break; + } + if (COMMIT == action) + { + buffer.setMemory(headIndex, bytesRead, (byte)0); + buffer.putLongOrdered(headPositionIndex, head + bytesRead); + headIndex += bytesRead; + head += bytesRead; + bytesRead = 0; + } } } finally { - if (bytesRead != 0) + if (bytesRead > 0) { buffer.setMemory(headIndex, bytesRead, (byte)0); buffer.putLongOrdered(headPositionIndex, head + bytesRead); @@ -302,55 +432,6 @@ else if (0 == length) return unblocked; } - /** - * {@inheritDoc} - */ - public int tryClaim(final int msgTypeId, final int length) - { - checkTypeId(msgTypeId); - checkMsgLength(length); - - final AtomicBuffer buffer = this.buffer; - final int recordLength = length + HEADER_LENGTH; - final int recordIndex = claimCapacity(buffer, recordLength); - - if (INSUFFICIENT_CAPACITY == recordIndex) - { - return recordIndex; - } - - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); - UnsafeAccess.UNSAFE.storeFence(); - buffer.putInt(typeOffset(recordIndex), msgTypeId); - - return encodedMsgOffset(recordIndex); - } - - /** - * {@inheritDoc} - */ - public void commit(final int index) - { - final int recordIndex = computeRecordIndex(index); - final AtomicBuffer buffer = this.buffer; - final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); - - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); - } - - /** - * {@inheritDoc} - */ - public void abort(final int index) - { - final int recordIndex = computeRecordIndex(index); - final AtomicBuffer buffer = this.buffer; - final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); - - buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); - } - private static boolean scanBackToConfirmStillZeroed(final AtomicBuffer buffer, final int from, final int limit) { int i = from - ALIGNMENT; @@ -378,7 +459,7 @@ private void checkMsgLength(final int length) else if (length > maxMsgLength) { throw new IllegalArgumentException( - "encoded message exceeds maxMsgLength of " + maxMsgLength + ", length=" + length); + "encoded message exceeds maxMsgLength=" + maxMsgLength + ", length=" + length); } } diff --git a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java index d6a538c78..f9476315a 100644 --- a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/OneToOneRingBuffer.java @@ -16,10 +16,10 @@ package org.agrona.concurrent.ringbuffer; import org.agrona.DirectBuffer; -import org.agrona.concurrent.AtomicBuffer; -import org.agrona.concurrent.MessageHandler; +import org.agrona.concurrent.*; import static org.agrona.BitUtil.align; +import static org.agrona.concurrent.ControlledMessageHandler.Action.*; import static org.agrona.concurrent.ringbuffer.RecordDescriptor.*; import static org.agrona.concurrent.ringbuffer.RingBufferDescriptor.*; @@ -97,6 +97,57 @@ public boolean write(final int msgTypeId, final DirectBuffer srcBuffer, final in return true; } + /** + * {@inheritDoc} + */ + public int tryClaim(final int msgTypeId, final int length) + { + checkTypeId(msgTypeId); + checkMsgLength(length); + + final AtomicBuffer buffer = this.buffer; + final int recordLength = length + HEADER_LENGTH; + final int recordIndex = claimCapacity(buffer, recordLength); + + if (INSUFFICIENT_CAPACITY == recordIndex) + { + return recordIndex; + } + + buffer.putInt(typeOffset(recordIndex), msgTypeId); + // Note: putInt is used to write negative length of the message since we are not yet publishing the message and + // hence the order of writes of type field and negative length does not matter. + // It is safe to do so, because the header was pre-zeroed during the capacity claim. + buffer.putInt(lengthOffset(recordIndex), -recordLength); + + return encodedMsgOffset(recordIndex); + } + + /** + * {@inheritDoc} + */ + public void commit(final int index) + { + final int recordIndex = computeRecordIndex(index); + final AtomicBuffer buffer = this.buffer; + final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); + + buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + } + + /** + * {@inheritDoc} + */ + public void abort(final int index) + { + final int recordIndex = computeRecordIndex(index); + final AtomicBuffer buffer = this.buffer; + final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); + + buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID); + buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); + } + /** * {@inheritDoc} */ @@ -141,13 +192,93 @@ public int read(final MessageHandler handler, final int messageCountLimit) continue; } - ++messagesRead; handler.onMessage(messageTypeId, buffer, recordIndex + HEADER_LENGTH, recordLength - HEADER_LENGTH); + ++messagesRead; + } + } + finally + { + if (bytesRead > 0) + { + buffer.putLongOrdered(headPositionIndex, head + bytesRead); + } + } + + return messagesRead; + } + + /** + * {@inheritDoc} + */ + public int controlledRead(final ControlledMessageHandler handler) + { + return controlledRead(handler, Integer.MAX_VALUE); + } + + /** + * {@inheritDoc} + */ + public int controlledRead(final ControlledMessageHandler handler, final int messageCountLimit) + { + int messagesRead = 0; + + final AtomicBuffer buffer = this.buffer; + final int headPositionIndex = this.headPositionIndex; + long head = buffer.getLong(headPositionIndex); + + int bytesRead = 0; + + final int capacity = this.capacity; + int headIndex = (int)head & (capacity - 1); + final int contiguousBlockLength = capacity - headIndex; + + try + { + while ((bytesRead < contiguousBlockLength) && (messagesRead < messageCountLimit)) + { + final int recordIndex = headIndex + bytesRead; + final int recordLength = buffer.getIntVolatile(lengthOffset(recordIndex)); + if (recordLength <= 0) + { + break; + } + + final int alignedLength = align(recordLength, ALIGNMENT); + bytesRead += alignedLength; + + final int messageTypeId = buffer.getInt(typeOffset(recordIndex)); + if (PADDING_MSG_TYPE_ID == messageTypeId) + { + continue; + } + + final ControlledMessageHandler.Action action = handler.onMessage( + messageTypeId, buffer, recordIndex + HEADER_LENGTH, recordLength - HEADER_LENGTH); + + if (ABORT == action) + { + bytesRead -= alignedLength; + break; + } + + ++messagesRead; + + if (BREAK == action) + { + break; + } + if (COMMIT == action) + { + buffer.putLongOrdered(headPositionIndex, head + bytesRead); + headIndex += bytesRead; + head += bytesRead; + bytesRead = 0; + } } } finally { - if (bytesRead != 0) + if (bytesRead > 0) { buffer.putLongOrdered(headPositionIndex, head + bytesRead); } @@ -253,57 +384,6 @@ public boolean unblock() return false; } - /** - * {@inheritDoc} - */ - public int tryClaim(final int msgTypeId, final int length) - { - checkTypeId(msgTypeId); - checkMsgLength(length); - - final AtomicBuffer buffer = this.buffer; - final int recordLength = length + HEADER_LENGTH; - final int recordIndex = claimCapacity(buffer, recordLength); - - if (INSUFFICIENT_CAPACITY == recordIndex) - { - return recordIndex; - } - - buffer.putInt(typeOffset(recordIndex), msgTypeId); - // Note: putInt is used to write negative length of the message since we are not yet publishing the message and - // hence the order of writes of type field and negative length does not matter. - // It is safe to do so, because the header was pre-zeroed during the capacity claim. - buffer.putInt(lengthOffset(recordIndex), -recordLength); - - return encodedMsgOffset(recordIndex); - } - - /** - * {@inheritDoc} - */ - public void commit(final int index) - { - final int recordIndex = computeRecordIndex(index); - final AtomicBuffer buffer = this.buffer; - final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); - - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); - } - - /** - * {@inheritDoc} - */ - public void abort(final int index) - { - final int recordIndex = computeRecordIndex(index); - final AtomicBuffer buffer = this.buffer; - final int recordLength = verifyClaimedSpaceNotReleased(buffer, recordIndex); - - buffer.putInt(typeOffset(recordIndex), PADDING_MSG_TYPE_ID); - buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength); - } - private void checkMsgLength(final int length) { if (length < 0) @@ -313,7 +393,7 @@ private void checkMsgLength(final int length) else if (length > maxMsgLength) { throw new IllegalArgumentException( - "encoded message exceeds maxMsgLength of " + maxMsgLength + ", length=" + length); + "encoded message exceeds maxMsgLength=" + maxMsgLength + ", length=" + length); } } diff --git a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/RingBuffer.java b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/RingBuffer.java index 000bbc1b6..28b929194 100644 --- a/agrona/src/main/java/org/agrona/concurrent/ringbuffer/RingBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/ringbuffer/RingBuffer.java @@ -16,8 +16,7 @@ package org.agrona.concurrent.ringbuffer; import org.agrona.DirectBuffer; -import org.agrona.concurrent.AtomicBuffer; -import org.agrona.concurrent.MessageHandler; +import org.agrona.concurrent.*; /** * Ring-buffer for the concurrent exchanging of binary encoded messages from producer(s) to consumer(s) @@ -54,6 +53,95 @@ public interface RingBuffer */ boolean write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length); + /** + * Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics. + * Once the message has been written then {@link #commit(int)} should be called thus making it available to be + * consumed. Alternatively a claim can be aborted using {@link #abort(int)} method. + *

+ * Claiming a space in the ring-buffer means that the consumer will not be able to consume past the claim until + * the claimed space is either committed or aborted. Producers will be able to write message even when outstanding + * claims exist. + *

+ * An example of using {@code tryClaim}: + *

+     * {@code
+     *     final RingBuffer ringBuffer = ...;
+     *
+     *     final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
+     *     if (index > 0)
+     *     {
+     *         try
+     *         {
+     *             final AtomicBuffer buffer = ringBuffer.buffer();
+     *             // Work with the buffer directly using the index
+     *             ...
+     *         }
+     *         finally
+     *         {
+     *             ringBuffer.commit(index); // commit message
+     *         }
+     *     }
+     * }
+     * 
+ *

+ * Ensure that claimed space is released even in case of an exception: + *

+     * {@code
+     *     final RingBuffer ringBuffer = ...;
+     *
+     *     final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
+     *     if (index > 0)
+     *     {
+     *         try
+     *         {
+     *             final AtomicBuffer buffer = ringBuffer.buffer();
+     *             // Work with the buffer directly using the index
+     *             ...
+     *             ringBuffer.commit(index); // commit message
+     *         }
+     *         catch (final Throwable t)
+     *         {
+     *             ringBuffer.abort(index); // allow consumer to proceed
+     *             ...
+     *         }
+     *     }
+     * }
+     * 
+ * + * @param msgTypeId type of the message encoding. Will be written into the header upon successful claim. + * @param length of the claim in bytes. A claim length cannot be greater than {@link #maxMsgLength()}. + * @return a non-zero index into the underlying ring-buffer at which encoded message begins, otherwise returns + * {@link #INSUFFICIENT_CAPACITY} indicating that there is not enough free space in the buffer. + * @throws IllegalArgumentException if the {@code msgTypeId} is less than {@code 1}. + * @throws IllegalArgumentException if the {@code length} is negative or is greater than {@link #maxMsgLength()}. + * @see #commit(int) + * @see #abort(int) + */ + int tryClaim(int msgTypeId, int length); + + /** + * Commit message that was written in the previously claimed space thus making it available to the consumer. + * + * @param index at which the encoded message begins, i.e. value returned from the {@link #tryClaim(int, int)} call. + * @throws IllegalArgumentException if the {@code index} is out of bounds. + * @throws IllegalStateException if this method is called after {@link #abort(int)} or was already invoked for + * the given {@code index}. + * @see #tryClaim(int, int) + */ + void commit(int index); + + /** + * Abort claim and allow consumer to proceed after the claimed length. Aborting turns unused space into padding, + * i.e. changes type of the message to {@link #PADDING_MSG_TYPE_ID}. + * + * @param index at which the encoded message begins, i.e. value returned from the {@link #tryClaim(int, int)} call. + * @throws IllegalArgumentException if the {@code index} is out of bounds. + * @throws IllegalStateException if this method is called after {@link #commit(int)} or was already invoked for + * the given {@code index}. + * @see #tryClaim(int, int) + */ + void abort(int index); + /** * Read as many messages as are available to the end of the ring buffer. *

@@ -79,6 +167,31 @@ public interface RingBuffer */ int read(MessageHandler handler, int messageCountLimit); + /** + * Read as many messages as are available to the end of the ring buffer with the handler able to control progress. + *

+ * If the ring buffer wraps or encounters a type of record, such a a padding record, then an implementation + * may choose to return and expect the caller to try again. The {@link #size()} method may be called to + * determine of a backlog of message bytes remains in the ring buffer. + * + * @param handler to be called for processing each message in turn which will return how to progress. + * @return the number of messages that have been processed. + */ + int controlledRead(ControlledMessageHandler handler); + + /** + * Read messages up to a limit of available to the end of the ring buffer with the handler able to control progress. + *

+ * If the ring buffer wraps or encounters a type of record, such a a padding record, then an implementation + * may choose to return and expect the caller to try again. The {@link #size()} method may be called to + * determine of a backlog of message bytes remains in the ring buffer. + * + * @param handler to be called for processing each message in turn which will return how to progress. + * @param messageCountLimit the number of messages will be read in a single invocation. + * @return the number of messages that have been processed. + */ + int controlledRead(ControlledMessageHandler handler, int messageCountLimit); + /** * The maximum message length in bytes supported by the underlying ring buffer. * @@ -153,93 +266,4 @@ public interface RingBuffer * @return true of an unblocking action was taken otherwise false. */ boolean unblock(); - - /** - * Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics. - * Once the message has been written then {@link #commit(int)} should be called thus making it available to be - * consumed. Alternatively a claim can be aborted using {@link #abort(int)} method. - *

- * Claiming a space in the ring-buffer means that the consumer will not be able to consume past the claim until - * the claimed space is either committed or aborted. Producers will be able to write message even when outstanding - * claims exist. - *

- * An example of using {@code tryClaim}: - *

-     * {@code
-     *     final RingBuffer ringBuffer = ...;
-     *
-     *     final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
-     *     if (index > 0)
-     *     {
-     *         try
-     *         {
-     *             final AtomicBuffer buffer = ringBuffer.buffer();
-     *             // Work with the buffer directly using the index
-     *             ...
-     *         }
-     *         finally
-     *         {
-     *             ringBuffer.commit(index); // commit message
-     *         }
-     *     }
-     * }
-     * 
- *

- * Ensure that claimed space is released even in case of an exception: - *

-     * {@code
-     *     final RingBuffer ringBuffer = ...;
-     *
-     *     final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
-     *     if (index > 0)
-     *     {
-     *         try
-     *         {
-     *             final AtomicBuffer buffer = ringBuffer.buffer();
-     *             // Work with the buffer directly using the index
-     *             ...
-     *             ringBuffer.commit(index); // commit message
-     *         }
-     *         catch (final Throwable t)
-     *         {
-     *             ringBuffer.abort(index); // allow consumer to proceed
-     *             ...
-     *         }
-     *     }
-     * }
-     * 
- * - * @param msgTypeId type of the message encoding. Will be written into the header upon successful claim. - * @param length of the claim in bytes. A claim length cannot be greater than {@link #maxMsgLength()}. - * @return a non-zero index into the underlying ring-buffer at which encoded message begins, otherwise returns - * {@link #INSUFFICIENT_CAPACITY} indicating that there is not enough free space in the buffer. - * @throws IllegalArgumentException if the {@code msgTypeId} is less than {@code 1}. - * @throws IllegalArgumentException if the {@code length} is negative or is greater than {@link #maxMsgLength()}. - * @see #commit(int) - * @see #abort(int) - */ - int tryClaim(int msgTypeId, int length); - - /** - * Commit message that was written in the previously claimed space thus making it available to the consumer. - * - * @param index at which the encoded message begins, i.e. value returned from the {@link #tryClaim(int, int)} call. - * @throws IllegalArgumentException if the {@code index} is out of bounds. - * @throws IllegalStateException if this method is called after {@link #abort(int)} or was already invoked for - * the given {@code index}. - * @see #tryClaim(int, int) - */ - void commit(int index); - - /** - * Abort claim and allow consumer to proceed after the claimed length. Aborting turns unused space into padding, - * i.e. changes type of the message to {@link #PADDING_MSG_TYPE_ID}. - * - * @param index at which the encoded message begins, i.e. value returned from the {@link #tryClaim(int, int)} call. - * @throws IllegalArgumentException if the {@code index} is out of bounds. - * @throws IllegalStateException if this method is called after {@link #commit(int)} or was already invoked for - * the given {@code index}. - * @see #tryClaim(int, int) - */ - void abort(int index); } diff --git a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferTest.java b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferTest.java index 2a8a7e107..8f81eb1e7 100644 --- a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferTest.java +++ b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferTest.java @@ -15,8 +15,9 @@ */ package org.agrona.concurrent.ringbuffer; -import org.agrona.concurrent.MessageHandler; -import org.agrona.concurrent.UnsafeBuffer; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.collections.MutableInteger; +import org.agrona.concurrent.*; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -621,7 +622,7 @@ public void commitPublishesMessageByInvertingTheLengthValue() @ParameterizedTest @ValueSource(ints = { -1, 0, 7, CAPACITY + 1 }) - public void abortThrowsIllegalArgumentExceptionIfOffsetIsInvalid(final int index) + public void abortThrowsIllegalArgumentExceptionIfIndexIsInvalid(final int index) { assertThrows(IllegalArgumentException.class, () -> ringBuffer.abort(index)); } @@ -655,14 +656,124 @@ public void abortMarksUnusedSpaceAsPadding() inOrder.verifyNoMoreInteractions(); } + @Test + public void shouldContinueOnControlledRead() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new ManyToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + assertEquals(MSG_TYPE_ID, msgTypeId); + assertEquals(HEADER_LENGTH, index); + assertEquals(srcLength, length); + assertEquals(msg, buffer.getStringAscii(index)); + + return ControlledMessageHandler.Action.CONTINUE; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler, 1); + assertEquals(1, messagesRead); + assertEquals(ringBuffer.producerPosition(), ringBuffer.consumerPosition()); + } + + @Test + public void shouldAbortOnControlledRead() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new ManyToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + assertEquals(MSG_TYPE_ID, msgTypeId); + assertEquals(HEADER_LENGTH, index); + assertEquals(srcLength, length); + assertEquals(msg, buffer.getStringAscii(index)); + + return ControlledMessageHandler.Action.ABORT; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler, 1); + assertEquals(0, messagesRead); + assertEquals(0, ringBuffer.consumerPosition()); + } + + @Test + public void shouldAbortOnControlledReadOfSecondMessage() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new ManyToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + final MutableInteger counter = new MutableInteger(); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + return counter.getAndIncrement() == 0 ? + ControlledMessageHandler.Action.CONTINUE : ControlledMessageHandler.Action.ABORT; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler); + assertEquals(2, counter.get()); + assertEquals(1, messagesRead); + assertNotEquals(ringBuffer.producerPosition(), ringBuffer.consumerPosition()); + } + + @Test + public void shouldCommitOnEachMessage() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new ManyToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + final MutableInteger counter = new MutableInteger(); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + if (0 == counter.getAndIncrement()) + { + assertEquals(0L, ringBuffer.consumerPosition()); + } + else + { + assertEquals(ringBuffer.producerPosition() / 2, ringBuffer.consumerPosition()); + } + + return ControlledMessageHandler.Action.COMMIT; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler); + assertEquals(2, counter.get()); + assertEquals(2, messagesRead); + assertEquals(ringBuffer.producerPosition(), ringBuffer.consumerPosition()); + } + private void testAlreadyCommitted(final IntConsumer action) { final int index = HEADER_LENGTH; final int recordIndex = index - HEADER_LENGTH; when(buffer.getInt(lengthOffset(recordIndex))).thenReturn(0); - final IllegalStateException exception = assertThrows(IllegalStateException.class, - () -> action.accept(index)); + final IllegalStateException exception = assertThrows( + IllegalStateException.class, () -> action.accept(index)); assertEquals("claimed space previously committed", exception.getMessage()); } @@ -673,8 +784,8 @@ private void testAlreadyAborted(final IntConsumer action) when(buffer.getInt(lengthOffset(recordIndex))).thenReturn(10); when(buffer.getInt(typeOffset(recordIndex))).thenReturn(PADDING_MSG_TYPE_ID); - final IllegalStateException exception = assertThrows(IllegalStateException.class, - () -> action.accept(index)); + final IllegalStateException exception = assertThrows( + IllegalStateException.class, () -> action.accept(index)); assertEquals("claimed space previously aborted", exception.getMessage()); } } diff --git a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferTest.java b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferTest.java index 5b3baacb4..aef278057 100644 --- a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferTest.java +++ b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferTest.java @@ -15,8 +15,9 @@ */ package org.agrona.concurrent.ringbuffer; -import org.agrona.concurrent.MessageHandler; -import org.agrona.concurrent.UnsafeBuffer; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.collections.MutableInteger; +import org.agrona.concurrent.*; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -403,12 +404,11 @@ public void tryClaimReturnsOffsetAtWhichMessageBodyCanBeWritten() @Test public void tryClaimReturnsInsufficientCapacityIfThereIsNotEnoughSpaceInTheBuffer() { - final int msgTypeId = MSG_TYPE_ID; final int length = 10; when(buffer.getLong(HEAD_COUNTER_CACHE_INDEX)).thenReturn(10L); when(buffer.getLong(TAIL_COUNTER_INDEX)).thenReturn((long)CAPACITY); - final int index = ringBuffer.tryClaim(msgTypeId, length); + final int index = ringBuffer.tryClaim(MSG_TYPE_ID, length); assertEquals(INSUFFICIENT_CAPACITY, index); @@ -422,13 +422,12 @@ public void tryClaimReturnsInsufficientCapacityIfThereIsNotEnoughSpaceInTheBuffe @Test public void tryClaimReturnsInsufficientCapacityIfThereIsNotEnoughSpaceInTheBufferAfterWrap() { - final int msgTypeId = MSG_TYPE_ID; final int length = 100; when(buffer.getLong(HEAD_COUNTER_CACHE_INDEX)).thenReturn(22L); when(buffer.getLong(TAIL_COUNTER_INDEX)).thenReturn(CAPACITY * 2L - 10); when(buffer.getLongVolatile(HEAD_COUNTER_INDEX)).thenReturn(CAPACITY + 111L, 3L); - final int index = ringBuffer.tryClaim(msgTypeId, length); + final int index = ringBuffer.tryClaim(MSG_TYPE_ID, length); assertEquals(INSUFFICIENT_CAPACITY, index); @@ -443,7 +442,7 @@ public void tryClaimReturnsInsufficientCapacityIfThereIsNotEnoughSpaceInTheBuffe @ParameterizedTest @ValueSource(ints = { -1, 0, 7, CAPACITY + 1 }) - public void commitThrowsIllegalArgumentExceptionIfOffsetIsInvalid(final int index) + public void commitThrowsIllegalArgumentExceptionIfIndexIsInvalid(final int index) { assertThrows(IllegalArgumentException.class, () -> ringBuffer.commit(index)); } @@ -477,7 +476,7 @@ public void commitPublishesMessageByInvertingTheLengthValue() @ParameterizedTest @ValueSource(ints = { -1, 0, 7, CAPACITY + 1 }) - public void abortThrowsIllegalArgumentExceptionIfOffsetIsInvalid(final int index) + public void abortThrowsIllegalArgumentExceptionIfIndexIsInvalid(final int index) { assertThrows(IllegalArgumentException.class, () -> ringBuffer.abort(index)); } @@ -510,14 +509,124 @@ public void abortMarksUnusedSpaceAsPadding() inOrder.verifyNoMoreInteractions(); } + @Test + public void shouldContinueOnControlledRead() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new OneToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + assertEquals(MSG_TYPE_ID, msgTypeId); + assertEquals(HEADER_LENGTH, index); + assertEquals(srcLength, length); + assertEquals(msg, buffer.getStringAscii(index)); + + return ControlledMessageHandler.Action.CONTINUE; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler, 1); + assertEquals(1, messagesRead); + assertEquals(ringBuffer.producerPosition(), ringBuffer.consumerPosition()); + } + + @Test + public void shouldAbortOnControlledRead() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new OneToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + assertEquals(MSG_TYPE_ID, msgTypeId); + assertEquals(HEADER_LENGTH, index); + assertEquals(srcLength, length); + assertEquals(msg, buffer.getStringAscii(index)); + + return ControlledMessageHandler.Action.ABORT; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler, 1); + assertEquals(0, messagesRead); + assertEquals(0, ringBuffer.consumerPosition()); + } + + @Test + public void shouldAbortOnControlledReadOfSecondMessage() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new OneToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + final MutableInteger counter = new MutableInteger(); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + return counter.getAndIncrement() == 0 ? + ControlledMessageHandler.Action.CONTINUE : ControlledMessageHandler.Action.ABORT; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler); + assertEquals(2, counter.get()); + assertEquals(1, messagesRead); + assertNotEquals(ringBuffer.producerPosition(), ringBuffer.consumerPosition()); + } + + @Test + public void shouldCommitOnEachMessage() + { + final String msg = "Hello World"; + final ExpandableArrayBuffer srcBuffer = new ExpandableArrayBuffer(); + final int srcLength = srcBuffer.putStringAscii(0, msg); + final RingBuffer ringBuffer = new OneToOneRingBuffer(new UnsafeBuffer(new byte[1024])); + final MutableInteger counter = new MutableInteger(); + + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, 0, srcLength)); + + final ControlledMessageHandler controlledMessageHandler = + (msgTypeId, buffer, index, length) -> + { + if (0 == counter.getAndIncrement()) + { + assertEquals(0L, ringBuffer.consumerPosition()); + } + else + { + assertEquals(ringBuffer.producerPosition() / 2, ringBuffer.consumerPosition()); + } + + return ControlledMessageHandler.Action.COMMIT; + }; + + final int messagesRead = ringBuffer.controlledRead(controlledMessageHandler); + assertEquals(2, counter.get()); + assertEquals(2, messagesRead); + assertEquals(ringBuffer.producerPosition(), ringBuffer.consumerPosition()); + } + private void testAlreadyCommitted(final IntConsumer action) { final int index = HEADER_LENGTH; final int recordIndex = index - HEADER_LENGTH; when(buffer.getInt(lengthOffset(recordIndex))).thenReturn(0); - final IllegalStateException exception = assertThrows(IllegalStateException.class, - () -> action.accept(index)); + final IllegalStateException exception = assertThrows( + IllegalStateException.class, () -> action.accept(index)); assertEquals("claimed space previously committed", exception.getMessage()); } @@ -528,8 +637,8 @@ private void testAlreadyAborted(final IntConsumer action) when(buffer.getInt(lengthOffset(recordIndex))).thenReturn(10); when(buffer.getInt(typeOffset(recordIndex))).thenReturn(PADDING_MSG_TYPE_ID); - final IllegalStateException exception = assertThrows(IllegalStateException.class, - () -> action.accept(index)); + final IllegalStateException exception = assertThrows( + IllegalStateException.class, () -> action.accept(index)); assertEquals("claimed space previously aborted", exception.getMessage()); } }