diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4daabc676af27..244c142bed0e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4152,7 +4152,12 @@ private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(Async } if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { - ((PersistentSubscription) sub).setReplicated(enabled); + if (!((PersistentSubscription) sub).setReplicated(enabled)) { + asyncResponse.resume( + new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties")); + return; + } + ((PersistentTopic) topic).checkReplicatedSubscriptionControllerState(); log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1edc273c46eb7..973139a0502ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -178,7 +178,7 @@ public boolean isReplicated() { return replicatedSubscriptionSnapshotCache != null; } - public void setReplicated(boolean replicated) { + public boolean setReplicated(boolean replicated) { ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -190,11 +190,13 @@ public void setReplicated(boolean replicated) { if (this.cursor != null) { if (replicated) { - this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } else { - this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); + return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); } } + + return false; } @Override