Skip to content

Commit

Permalink
Return error response when updating cursor property fails
Browse files Browse the repository at this point in the history
  • Loading branch information
Masahiro Sakamoto committed Jun 9, 2021
1 parent 644a1fe commit de42532
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Expand Up @@ -4152,7 +4152,12 @@ private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(Async
}

if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
((PersistentSubscription) sub).setReplicated(enabled);
if (!((PersistentSubscription) sub).setReplicated(enabled)) {
asyncResponse.resume(
new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties"));
return;
}

((PersistentTopic) topic).checkReplicatedSubscriptionControllerState();
log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName,
subName);
Expand Down
Expand Up @@ -178,7 +178,7 @@ public boolean isReplicated() {
return replicatedSubscriptionSnapshotCache != null;
}

public void setReplicated(boolean replicated) {
public boolean setReplicated(boolean replicated) {
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
Expand All @@ -190,11 +190,13 @@ public void setReplicated(boolean replicated) {

if (this.cursor != null) {
if (replicated) {
this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
} else {
this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
}

return false;
}

@Override
Expand Down

0 comments on commit de42532

Please sign in to comment.