Skip to content

Commit

Permalink
Add synchronized blocks around the deadLetters collection
Browse files Browse the repository at this point in the history
To further protect the InMemorySequencedDeadLetterQueue against
concurrent changes on the dead-letters collection, we should add some
synchronization around it when accessing. Main point of attention here
are the evict and requeue operations which might remove a sequence or
bring it back. Due to this, contains and enqueue should also synchronize
The process operation does not receive any synchronized-blocks as it
works through the takenSequences set.

#2021
  • Loading branch information
smcvb committed Aug 8, 2022
1 parent dd1228d commit 3d8fdab
Showing 1 changed file with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ public void enqueue(@Nonnull Object sequenceIdentifier,
}
}

deadLetters.computeIfAbsent(toIdentifier(sequenceIdentifier), id -> new ConcurrentLinkedDeque<>())
.addLast(letter);
synchronized (deadLetters) {
deadLetters.computeIfAbsent(toIdentifier(sequenceIdentifier), id -> new ConcurrentLinkedDeque<>())
.addLast(letter);
}
}

@Override
Expand All @@ -132,14 +134,15 @@ public void evict(DeadLetter<? extends M> letter) {
.findFirst();

if (optionalSequence.isPresent()) {
String sequenceId = optionalSequence.get().getKey();
Deque<DeadLetter<? extends M>> sequence = optionalSequence.get().getValue();
if (sequence.isEmpty()) {
logger.trace("Sequence [{}] is empty and will be removed.", sequenceId);
deadLetters.remove(sequenceId);
}
if (logger.isTraceEnabled()) {
logger.trace("Evicted letter [{}] for sequence [{}].", letter, sequenceId);
synchronized (deadLetters) {
String sequenceId = optionalSequence.get().getKey();
if (deadLetters.get(sequenceId).isEmpty()) {
logger.trace("Sequence [{}] is empty and will be removed.", sequenceId);
deadLetters.remove(sequenceId);
}
if (logger.isTraceEnabled()) {
logger.trace("Evicted letter [{}] for sequence [{}].", letter, sequenceId);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Cannot evict [{}] as it could not be found in this queue.", letter);
Expand All @@ -158,11 +161,13 @@ public void requeue(
.findFirst();

if (optionalSequence.isPresent()) {
String sequenceId = optionalSequence.get().getKey();
Deque<DeadLetter<? extends M>> sequence = optionalSequence.get().getValue();
sequence.addFirst(letterUpdater.apply(letter.markTouched()));
if (logger.isTraceEnabled()) {
logger.trace("Requeued letter [{}] for sequence [{}].", letter, sequenceId);
synchronized (deadLetters) {
String sequenceId = optionalSequence.get().getKey();
deadLetters.get(sequenceId)
.addFirst(letterUpdater.apply(letter.markTouched()));
if (logger.isTraceEnabled()) {
logger.trace("Requeued letter [{}] for sequence [{}].", letter, sequenceId);
}
}
} else {
throw new NoSuchDeadLetterException(
Expand All @@ -186,7 +191,9 @@ public Iterable<DeadLetter<? extends M>> deadLetterSequence(@Nonnull Object sequ
}

private boolean contains(String identifier) {
return deadLetters.containsKey(identifier);
synchronized (deadLetters) {
return deadLetters.containsKey(identifier);
}
}

@Override
Expand Down

0 comments on commit 3d8fdab

Please sign in to comment.