Skip to content

Commit

Permalink
Add processing in sequence tests
Browse files Browse the repository at this point in the history
Add processing in sequence tests

#2021
  • Loading branch information
smcvb committed Aug 9, 2022
1 parent 65191b9 commit 183f11a
Showing 1 changed file with 82 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,7 +83,7 @@ void testEnqueueThrowsDeadLetterQueueOverflowExceptionWhenMaxSequencesIsReached(
testSubject.enqueue(generateId(), generateInitialLetter());
}

String oneSequenceToMany = generateId();
Object oneSequenceToMany = generateId();
DeadLetter<M> testLetter = generateInitialLetter();
assertThrows(DeadLetterQueueOverflowException.class, () -> testSubject.enqueue(oneSequenceToMany, testLetter));
}
Expand Down Expand Up @@ -377,7 +379,6 @@ void testProcessReturnsFalseIfThereAreNoLetters() {
assertFalse(taskInvoked.get());
}


@Test
void testProcessReturnsTrueAndEvictsTheLetter() {
AtomicReference<DeadLetter<? extends M>> resultLetter = new AtomicReference<>();
Expand Down Expand Up @@ -453,6 +454,44 @@ void testProcessInvokesProcessingTaskWithLastTouchedOrder() {
assertLetter(testThatLetter, resultLetter.get());
}

@SuppressWarnings("ConstantConditions")
@Test
void testProcessHandlesAllLettersInSequence() {
AtomicReference<Deque<DeadLetter<? extends M>>> resultLetters = new AtomicReference<>();
Function<DeadLetter<? extends M>, EnqueueDecision<M>> testTask = letter -> {
Deque<DeadLetter<? extends M>> sequence = resultLetters.get();
if (sequence == null) {
sequence = new LinkedList<>();
}
sequence.addLast(letter);
resultLetters.set(sequence);
return Decisions.evict();
};

Object testId = generateId();
DeadLetter<? extends M> firstTestLetter = generateInitialLetter();
testSubject.enqueue(testId, firstTestLetter);
// Move time to impose changes to enqueuedAt and lastTouched.
setAndGetTime(Instant.now());
DeadLetter<? extends M> secondTestLetter = generateFollowUpLetter();
testSubject.enqueueIfPresent(testId, () -> secondTestLetter);
// Move time to impose changes to enqueuedAt and lastTouched.
setAndGetTime(Instant.now());
DeadLetter<? extends M> thirdTestLetter = generateFollowUpLetter();
testSubject.enqueueIfPresent(testId, () -> thirdTestLetter);

// Add another letter in a different sequence that we do not expect to receive.
testSubject.enqueue(generateId(), generateInitialLetter());

boolean result = testSubject.process(testTask);
assertTrue(result);
Deque<DeadLetter<? extends M>> resultSequence = resultLetters.get();

assertLetter(firstTestLetter, resultSequence.pollFirst());
assertLetter(secondTestLetter, resultSequence.pollFirst());
assertLetter(thirdTestLetter, resultSequence.pollFirst());
}

/**
* A "claimed sequence" in this case means that a process task for a given "sequence identifier" is still processing
* the sequence. Furthermore, if it's the sole sequence, the processing task will not be invoked. This approach
Expand Down Expand Up @@ -596,6 +635,44 @@ void testProcessLetterPredicateReturnsFalseAndRequeuesTheLetter() {
assertTrue(testSubject.deadLetters().iterator().hasNext());
}

@SuppressWarnings("ConstantConditions")
@Test
void testProcessLetterPredicateHandlesAllLettersInSequence() {
AtomicReference<Deque<DeadLetter<? extends M>>> resultLetters = new AtomicReference<>();
Function<DeadLetter<? extends M>, EnqueueDecision<M>> testTask = letter -> {
Deque<DeadLetter<? extends M>> sequence = resultLetters.get();
if (sequence == null) {
sequence = new LinkedList<>();
}
sequence.addLast(letter);
resultLetters.set(sequence);
return Decisions.evict();
};

Object testId = generateId();
DeadLetter<? extends M> firstTestLetter = generateInitialLetter();
testSubject.enqueue(testId, firstTestLetter);
// Move time to impose changes to enqueuedAt and lastTouched.
setAndGetTime(Instant.now());
DeadLetter<? extends M> secondTestLetter = generateFollowUpLetter();
testSubject.enqueueIfPresent(testId, () -> secondTestLetter);
// Move time to impose changes to enqueuedAt and lastTouched.
setAndGetTime(Instant.now());
DeadLetter<? extends M> thirdTestLetter = generateFollowUpLetter();
testSubject.enqueueIfPresent(testId, () -> thirdTestLetter);

// Add another letter in a different sequence that we do not expect to receive.
testSubject.enqueue(generateId(), generateInitialLetter());

boolean result = testSubject.process(letter -> letter.equals(firstTestLetter), testTask);
assertTrue(result);
Deque<DeadLetter<? extends M>> resultSequence = resultLetters.get();

assertLetter(firstTestLetter, resultSequence.pollFirst());
assertLetter(secondTestLetter, resultSequence.pollFirst());
assertLetter(thirdTestLetter, resultSequence.pollFirst());
}

/**
* A "claimed sequence" in this case means that a process task for a given "sequence identifier" is still processing
* the sequence. Furthermore, if it's the sole sequence, the processing task will not be invoked. This approach
Expand Down Expand Up @@ -674,11 +751,11 @@ void testClearRemovesAllEntries() {
}

/**
* Generate a unique {@link String} based on {@link UUID#randomUUID()}.
* Generate a unique {@link Object} based on {@link UUID#randomUUID()}.
*
* @return A unique {@link String}, based on {@link UUID#randomUUID()}.
* @return A unique {@link Object}, based on {@link UUID#randomUUID()}.
*/
protected static String generateId() {
protected static Object generateId() {
return UUID.randomUUID().toString();
}

Expand Down

0 comments on commit 183f11a

Please sign in to comment.