Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Add method to get the latest value from the BroadcastReceiver #282

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ public MutableDirectBuffer buffer()
* set for the next message to be consumed. If no transmission is available then false.
*/
public boolean receiveNext()
{
return doReceiveNext(false);
}

/**
* Similar to {@link #receiveNext()}, but will skip all messages in the transmission stream
* and only set for the latest messaged.
*
* @return true if transmission is available with {@link #offset()}, {@link #length()} and {@link #typeId()}
* set for the next latest message to be consumed. If no transmission is available then false.
*/
public boolean receiveLatest()
{
return doReceiveNext(true);
}


private boolean doReceiveNext(final boolean latest)
{
boolean isAvailable = false;
final AtomicBuffer buffer = this.buffer;
Expand All @@ -154,16 +172,23 @@ public boolean receiveNext()
if (tail > cursor)
{
final int capacity = this.capacity;
int recordOffset = (int)cursor & (capacity - 1);

if (!validate(cursor, buffer, capacity))
{
lappedCount.lazySet(lappedCount.get() + 1);
if (!latest)
{
cursor = buffer.getLongVolatile(latestCounterIndex);
}
}

if (latest)
{
cursor = buffer.getLongVolatile(latestCounterIndex);
recordOffset = (int)cursor & (capacity - 1);
}

int recordOffset = (int)cursor & (capacity - 1);

this.cursor = cursor;
nextRecord = cursor + align(buffer.getInt(lengthOffset(recordOffset)), RECORD_ALIGNMENT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,37 @@ void shouldReceiveTwoMessagesFromBuffer()
inOrder.verify(buffer).getLongVolatile(TAIL_INTENT_COUNTER_OFFSET);
}

@Test
void shouldReceiveLatestSecondMessagesFromBuffer()
{
final int length = 8;
final int recordLength = length + HEADER_LENGTH;
final int recordLengthAligned = align(recordLength, RECORD_ALIGNMENT);
final long tail = recordLengthAligned * 2L;
final long latestRecord = tail - recordLengthAligned;
final int recordOffsetTwo = (int)latestRecord;

when(buffer.getLongVolatile(TAIL_INTENT_COUNTER_OFFSET)).thenReturn(tail);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tail);

when(buffer.getInt(lengthOffset(recordOffsetTwo))).thenReturn(recordLength);
when(buffer.getInt(typeOffset(recordOffsetTwo))).thenReturn(MSG_TYPE_ID);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX + length)).thenReturn(2L * length);

assertTrue(broadcastReceiver.receiveLatest());
assertThat(broadcastReceiver.typeId(), is(MSG_TYPE_ID));
assertThat(broadcastReceiver.buffer(), is(buffer));
assertThat(broadcastReceiver.offset(), is(msgOffset(recordOffsetTwo)));
assertThat(broadcastReceiver.length(), is(length));

assertTrue(broadcastReceiver.validate());

final InOrder inOrder = inOrder(buffer);
inOrder.verify(buffer).getLongVolatile(TAIL_COUNTER_INDEX);
inOrder.verify(buffer).getLongVolatile(TAIL_INTENT_COUNTER_OFFSET);
inOrder.verify(buffer).getLongVolatile(TAIL_INTENT_COUNTER_OFFSET);
}

@Test
void shouldLateJoinTransmission()
{
Expand Down