Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] Fix Deadlock by Consumer and Reader #6728

Merged
merged 1 commit into from
Apr 14, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -692,49 +692,51 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);

// Create a new non-durable cursor only for the first consumer that connects
Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
: (MessageIdImpl) MessageId.latest;

long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
if (ledgerId >= 0
&& msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
entryId = msgId.getEntryId() - 1;
}
synchronized (ledger) {
// Create a new non-durable cursor only for the first consumer that connects
Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
: (MessageIdImpl) MessageId.latest;

long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
if (ledgerId >= 0
&& msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
entryId = msgId.getEntryId() - 1;
}

Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
} catch (ManagedLedgerException e) {
subscriptionFuture.completeExceptionally(e);
}
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
} catch (ManagedLedgerException e) {
subscriptionFuture.completeExceptionally(e);
}

return new PersistentSubscription(this, subscriptionName, cursor, false);
});
return new PersistentSubscription(this, subscriptionName, cursor, false);
});

if (!subscriptionFuture.isDone()) {
if (startMessageRollbackDurationSec > 0) {
long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
subscription.resetCursor(timestamp).handle((s, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
startMessageRollbackDurationSec);
}
if (!subscriptionFuture.isDone()) {
if (startMessageRollbackDurationSec > 0) {
long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
subscription.resetCursor(timestamp).handle((s, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
startMessageRollbackDurationSec);
}
subscriptionFuture.complete(subscription);
return null;
});
} else {
subscriptionFuture.complete(subscription);
return null;
});
}
} else {
subscriptionFuture.complete(subscription);
// failed to initialize managed-cursor: clean up created subscription
subscriptions.remove(subscriptionName);
}
} else {
// failed to initialize managed-cursor: clean up created subscription
subscriptions.remove(subscriptionName);
}

return subscriptionFuture;
Expand Down