Skip to content

Commit

Permalink
Remove usage of Long
Browse files Browse the repository at this point in the history
  • Loading branch information
Masahiro Sakamoto committed Jul 6, 2021
1 parent 1f49e47 commit 5a262ce
Showing 1 changed file with 19 additions and 10 deletions.
Expand Up @@ -722,7 +722,7 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List
positions.forEach(position -> {
// TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages
// on Key_Shared subscription, but it's difficult to get the sticky key here
if (addMessageToReplay(position.getLedgerId(), position.getEntryId(), null)) {
if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
redeliveryTracker.addIfAbsent(position);
}
});
Expand Down Expand Up @@ -868,21 +868,30 @@ public void cursorIsReset() {
}
}

protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {
Position markDeletePosition = cursor.getMarkDeletedPosition();
if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId()
|| (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) {
if (stickyKeyHash == null) {
redeliveryMessages.add(ledgerId, entryId);
} else {
redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
}
protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) {
if (checkIfMessageIsUnacked(ledgerId, entryId)) {
redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
return true;
} else {
return false;
}
}

protected boolean addMessageToReplay(long ledgerId, long entryId) {
if (checkIfMessageIsUnacked(ledgerId, entryId)) {
redeliveryMessages.add(ledgerId, entryId);
return true;
} else {
return false;
}
}

private boolean checkIfMessageIsUnacked(long ledgerId, long entryId) {
Position markDeletePosition = cursor.getMarkDeletedPosition();
return (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId()
|| (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId()));
}

@Override
public boolean checkAndUnblockIfStuck() {
if (cursor.checkAndUpdateReadPositionChanged()) {
Expand Down

0 comments on commit 5a262ce

Please sign in to comment.