Skip to content

Commit

Permalink
[broker] Add REST API to enable or disable replicated subscriptions (#…
Browse files Browse the repository at this point in the history
…10790)

### Motivation

Currently, once a replicated subscription is created on a topic, there is no way for users to disable it. I think it is useful to have a REST API that allows users to enable or disable replicated subscriptions.

### Modifications

Added the following REST API endpoints:
```
/admin/persistent/{tenant}/{cluster}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus
/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus
```
We can enable/disable a replicated subscription by posting true or false to these endpoints.
  • Loading branch information
Masahiro Sakamoto committed Jun 25, 2021
1 parent 296ba76 commit 1d3e92e
Show file tree
Hide file tree
Showing 16 changed files with 680 additions and 8 deletions.
Expand Up @@ -81,6 +81,16 @@ enum IndividualDeletedEntries {
*/
Map<String, Long> getProperties();

/**
* Add a property associated with the last stored position.
*/
boolean putProperty(String key, Long value);

/**
* Remove a property associated with the last stored position.
*/
boolean removeProperty(String key);

/**
* Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller.
*
Expand Down
Expand Up @@ -285,6 +285,40 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
Map<String, Long> newProperties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
newProperties.put(key, value);

MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties,
last.callback, last.ctx);
newLastMarkDeleteEntry.callbackGroup = last.callbackGroup;

return newLastMarkDeleteEntry;
});
return true;
}
return false;
}

@Override
public boolean removeProperty(String key) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
if (properties != null && properties.containsKey(key)) {
properties.remove(key);
}
return last;
});
return true;
}
return false;
}

/**
* Performs the initial recovery, reading the mark-deleted position from the ledger and then calling initialize to
* have a new opened ledger.
Expand Down
Expand Up @@ -66,6 +66,16 @@ public Map<String, Long> getProperties() {
return Collections.emptyMap();
}

@Override
public boolean putProperty(String key, Long value) {
return false;
}

@Override
public boolean removeProperty(String key) {
return false;
}

@Override
public boolean isDurable() {
return true;
Expand Down
Expand Up @@ -581,6 +581,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case EXPIRE_MESSAGES:
case PEEK_MESSAGES:
case RESET_CURSOR:
case SET_REPLICATED_SUBSCRIPTION_STATUS:
isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
break;
case TERMINATE:
Expand Down
Expand Up @@ -4036,4 +4036,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
});
}
}

protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
boolean authoritative, boolean enabled) {
log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName);

// Reject the request if the topic is not persistent
if (!topicName.isPersistent()) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-persistent topics"));
return;
}

// Reject the request if the topic is not global
if (!topicName.isGlobal()) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return;
}

// Permission to consume this topic is required
try {
validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}

// Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}

// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
enabled);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
topicNamePartition.toString(), subName, enabled));
} catch (Exception e) {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicNamePartition, subName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse
.resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return null;
} else {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
enabled);
}
}).exceptionally(ex -> {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, boolean authoritative, boolean enabled) {
try {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnership(topicName, authoritative);

Topic topic = getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}

Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}

if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
if (!((PersistentSubscription) sub).setReplicated(enabled)) {
asyncResponse.resume(
new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties"));
return;
}

((PersistentTopic) topic).checkReplicatedSubscriptionControllerState();
log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName,
subName);
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-persistent topics"));
}
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
}
Expand Up @@ -805,4 +805,43 @@ public void getLastMessageId(
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{cluster}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
@ApiOperation(value = "Enable or disable a replicated subscription on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on this topic"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void setReplicatedSubscriptionStatus(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the cluster", required = true)
@PathParam("cluster") String cluster,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Name of subscription", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Whether to enable replicated subscription", required = true)
boolean enabled) {
try {
validateTopicName(tenant, cluster, namespace, encodedTopic);
internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
}
Expand Up @@ -3244,5 +3244,42 @@ public void truncateTopic(

}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
@ApiOperation(value = "Enable or disable a replicated subscription on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on this topic"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void setReplicatedSubscriptionStatus(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Name of subscription", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Whether to enable replicated subscription", required = true)
boolean enabled) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Expand Up @@ -47,6 +47,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
Expand Down Expand Up @@ -172,12 +173,25 @@ public boolean isReplicated() {
return replicatedSubscriptionSnapshotCache != null;
}

void setReplicated(boolean replicated) {
this.replicatedSubscriptionSnapshotCache = replicated
? new ReplicatedSubscriptionSnapshotCache(subName,
topic.getBrokerService().pulsar().getConfiguration()
.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
: null;
public boolean setReplicated(boolean replicated) {
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
this.replicatedSubscriptionSnapshotCache = null;
} else if (this.replicatedSubscriptionSnapshotCache == null) {
this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
}

if (this.cursor != null) {
if (replicated) {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
} else {
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
}

return false;
}

@Override
Expand Down
Expand Up @@ -2724,7 +2724,7 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
});
}

private synchronized void checkReplicatedSubscriptionControllerState() {
public synchronized void checkReplicatedSubscriptionControllerState() {
AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
subscriptions.forEach((name, subscription) -> {
if (subscription.isReplicated()) {
Expand Down

0 comments on commit 1d3e92e

Please sign in to comment.