Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[broker] Add REST API to enable or disable replicated subscriptions #10790

Merged
merged 12 commits into from Jun 25, 2021
Expand Up @@ -81,6 +81,16 @@ enum IndividualDeletedEntries {
*/
Map<String, Long> getProperties();

/**
* Add a property associated with the last stored position.
*/
boolean putProperty(String key, Long value);

/**
* Remove a property associated with the last stored position.
*/
boolean removeProperty(String key);

/**
* Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller.
*
Expand Down
Expand Up @@ -285,6 +285,40 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
Map<String, Long> newProperties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
newProperties.put(key, value);

MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties,
last.callback, last.ctx);
newLastMarkDeleteEntry.callbackGroup = last.callbackGroup;

return newLastMarkDeleteEntry;
});
return true;
}
return false;
massakam marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean removeProperty(String key) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
if (properties != null && properties.containsKey(key)) {
properties.remove(key);
}
return last;
});
return true;
}
return false;
}

/**
* Performs the initial recovery, reading the mark-deleted position from the ledger and then calling initialize to
* have a new opened ledger.
Expand Down
Expand Up @@ -66,6 +66,16 @@ public Map<String, Long> getProperties() {
return Collections.emptyMap();
}

@Override
public boolean putProperty(String key, Long value) {
return false;
}

@Override
public boolean removeProperty(String key) {
return false;
}

@Override
public boolean isDurable() {
return true;
Expand Down
Expand Up @@ -585,6 +585,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case EXPIRE_MESSAGES:
case PEEK_MESSAGES:
case RESET_CURSOR:
case SET_REPLICATED_SUBSCRIPTION_STATUS:
isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
break;
case TERMINATE:
Expand Down
Expand Up @@ -4040,4 +4040,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
});
}
}

protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
boolean authoritative, boolean enabled) {
log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName);

// Reject the request if the topic is not persistent
if (!topicName.isPersistent()) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-persistent topics"));
return;
}

// Reject the request if the topic is not global
if (!topicName.isGlobal()) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return;
}

// Permission to consume this topic is required
try {
validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}

// Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}

// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
315157973 marked this conversation as resolved.
Show resolved Hide resolved
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
enabled);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
topicNamePartition.toString(), subName, enabled));
} catch (Exception e) {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicNamePartition, subName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse
.resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return null;
} else {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
enabled);
}
}).exceptionally(ex -> {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, boolean authoritative, boolean enabled) {
try {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnership(topicName, authoritative);

Topic topic = getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}

Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}

if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
if (!((PersistentSubscription) sub).setReplicated(enabled)) {
asyncResponse.resume(
new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties"));
return;
}

((PersistentTopic) topic).checkReplicatedSubscriptionControllerState();
log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName,
subName);
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-persistent topics"));
}
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
}
Expand Up @@ -802,4 +802,43 @@ public void getLastMessageId(
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{cluster}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
@ApiOperation(value = "Enable or disable a replicated subscription on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on this topic"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void setReplicatedSubscriptionStatus(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the cluster", required = true)
@PathParam("cluster") String cluster,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Name of subscription", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Whether to enable replicated subscription", required = true)
boolean enabled) {
try {
validateTopicName(tenant, cluster, namespace, encodedTopic);
internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
}
Expand Up @@ -3130,5 +3130,42 @@ public void truncateTopic(

}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
@ApiOperation(value = "Enable or disable a replicated subscription on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on this topic"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void setReplicatedSubscriptionStatus(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Name of subscription", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Whether to enable replicated subscription", required = true)
boolean enabled) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Expand Up @@ -47,6 +47,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
Expand Down Expand Up @@ -177,12 +178,25 @@ public boolean isReplicated() {
return replicatedSubscriptionSnapshotCache != null;
}

void setReplicated(boolean replicated) {
this.replicatedSubscriptionSnapshotCache = replicated
? new ReplicatedSubscriptionSnapshotCache(subName,
topic.getBrokerService().pulsar().getConfiguration()
.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
: null;
public boolean setReplicated(boolean replicated) {
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
this.replicatedSubscriptionSnapshotCache = null;
} else if (this.replicatedSubscriptionSnapshotCache == null) {
this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
}

if (this.cursor != null) {
if (replicated) {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
} else {
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
}

return false;
}

@Override
Expand Down
Expand Up @@ -2702,7 +2702,7 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
});
}

private synchronized void checkReplicatedSubscriptionControllerState() {
public synchronized void checkReplicatedSubscriptionControllerState() {
AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
subscriptions.forEach((name, subscription) -> {
if (subscription.isReplicated()) {
Expand Down