From de425322022ebcf2795dce14a7f935c8613bacad Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 10 Jun 2021 00:13:30 +0900 Subject: [PATCH] Return error response when updating cursor property fails --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 7 ++++++- .../broker/service/persistent/PersistentSubscription.java | 8 +++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4daabc676af27..244c142bed0e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4152,7 +4152,12 @@ private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(Async } if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { - ((PersistentSubscription) sub).setReplicated(enabled); + if (!((PersistentSubscription) sub).setReplicated(enabled)) { + asyncResponse.resume( + new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties")); + return; + } + ((PersistentTopic) topic).checkReplicatedSubscriptionControllerState(); log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1edc273c46eb7..973139a0502ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -178,7 +178,7 @@ public boolean isReplicated() { return replicatedSubscriptionSnapshotCache != null; } - public void setReplicated(boolean replicated) { + public boolean setReplicated(boolean replicated) { ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -190,11 +190,13 @@ public void setReplicated(boolean replicated) { if (this.cursor != null) { if (replicated) { - this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } else { - this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); + return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); } } + + return false; } @Override