diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 03419dee14d8b..4af64550fe53b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -81,6 +81,16 @@ enum IndividualDeletedEntries { */ Map getProperties(); + /** + * Add a property associated with the last stored position. + */ + boolean putProperty(String key, Long value); + + /** + * Remove a property associated with the last stored position. + */ + boolean removeProperty(String key); + /** * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index bc19373453c1e..da93df28fd65e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -285,6 +285,40 @@ public Map getProperties() { return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap(); } + @Override + public boolean putProperty(String key, Long value) { + if (lastMarkDeleteEntry != null) { + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + Map properties = last.properties; + Map newProperties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties); + newProperties.put(key, value); + + MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties, + last.callback, last.ctx); + newLastMarkDeleteEntry.callbackGroup = last.callbackGroup; + + return newLastMarkDeleteEntry; + }); + return true; + } + return false; + } + + @Override + public boolean removeProperty(String key) { + if (lastMarkDeleteEntry != null) { + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + Map properties = last.properties; + if (properties != null && properties.containsKey(key)) { + properties.remove(key); + } + return last; + }); + return true; + } + return false; + } + /** * Performs the initial recovery, reading the mark-deleted position from the ledger and then calling initialize to * have a new opened ledger. diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 549027827efdd..5ed8b293ff580 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -66,6 +66,16 @@ public Map getProperties() { return Collections.emptyMap(); } + @Override + public boolean putProperty(String key, Long value) { + return false; + } + + @Override + public boolean removeProperty(String key) { + return false; + } + @Override public boolean isDurable() { return true; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index c6eb60ccab21d..cdc57df30118a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -581,6 +581,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, case EXPIRE_MESSAGES: case PEEK_MESSAGES: case RESET_CURSOR: + case SET_REPLICATED_SUBSCRIPTION_STATUS: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription()); break; case TERMINATE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 5b1ac6b17d348..195bff48511e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4036,4 +4036,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author }); } } + + protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName, + boolean authoritative, boolean enabled) { + log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled, + topicName, subName); + + // Reject the request if the topic is not persistent + if (!topicName.isPersistent()) { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Cannot enable/disable replicated subscriptions on non-persistent topics")); + return; + } + + // Reject the request if the topic is not global + if (!topicName.isGlobal()) { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Cannot enable/disable replicated subscriptions on non-global topics")); + return; + } + + // Permission to consume this topic is required + try { + validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName); + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + + // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters + try { + validateGlobalNamespaceOwnership(namespaceName); + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, + enabled); + } else { + getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync( + topicNamePartition.toString(), subName, enabled)); + } catch (Exception e) { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", + clientAppId(), enabled, topicNamePartition, subName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse + .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found")); + return null; + } else if (t instanceof PreconditionFailedException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Cannot enable/disable replicated subscriptions on non-global topics")); + return null; + } else { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", + clientAppId(), enabled, topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, + enabled); + } + }).exceptionally(ex -> { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled, + topicName, subName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + } + + private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse, + String subName, boolean authoritative, boolean enabled) { + try { + // Redirect the request to the appropriate broker if this broker is not the owner of the topic + validateTopicOwnership(topicName, authoritative); + + Topic topic = getTopicReference(topicName); + if (topic == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } + + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + + if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { + if (!((PersistentSubscription) sub).setReplicated(enabled)) { + asyncResponse.resume( + new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties")); + return; + } + + ((PersistentTopic) topic).checkReplicatedSubscriptionControllerState(); + log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, + subName); + asyncResponse.resume(Response.noContent().build()); + } else { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Cannot enable/disable replicated subscriptions on non-persistent topics")); + } + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index babff0d006b70..fee8707edc637 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -805,4 +805,43 @@ public void getLastMessageId( asyncResponse.resume(new RestException(e)); } } + + @POST + @Path("/{tenant}/{cluster}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus") + @ApiOperation(value = "Enable or disable a replicated subscription on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or " + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on this topic"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void setReplicatedSubscriptionStatus( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the cluster", required = true) + @PathParam("cluster") String cluster, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Name of subscription", required = true) + @PathParam("subName") String encodedSubName, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Whether to enable replicated subscription", required = true) + boolean enabled) { + try { + validateTopicName(tenant, cluster, namespace, encodedTopic); + internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 018749af7f27d..ead3b79386c39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3244,5 +3244,42 @@ public void truncateTopic( } + @POST + @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus") + @ApiOperation(value = "Enable or disable a replicated subscription on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or " + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on this topic"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void setReplicatedSubscriptionStatus( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Name of subscription", required = true) + @PathParam("subName") String encodedSubName, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Whether to enable replicated subscription", required = true) + boolean enabled) { + try { + validateTopicName(tenant, namespace, encodedTopic); + internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 316ac1099e7fa..f6f390c346f06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; @@ -172,12 +173,25 @@ public boolean isReplicated() { return replicatedSubscriptionSnapshotCache != null; } - void setReplicated(boolean replicated) { - this.replicatedSubscriptionSnapshotCache = replicated - ? new ReplicatedSubscriptionSnapshotCache(subName, - topic.getBrokerService().pulsar().getConfiguration() - .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()) - : null; + public boolean setReplicated(boolean replicated) { + ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); + + if (!replicated || !config.isEnableReplicatedSubscriptions()) { + this.replicatedSubscriptionSnapshotCache = null; + } else if (this.replicatedSubscriptionSnapshotCache == null) { + this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, + config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); + } + + if (this.cursor != null) { + if (replicated) { + return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + } else { + return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); + } + } + + return false; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dbce58544ecfe..0e863494fccae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2724,7 +2724,7 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem }); } - private synchronized void checkReplicatedSubscriptionControllerState() { + public synchronized void checkReplicatedSubscriptionControllerState() { AtomicBoolean shouldBeEnabled = new AtomicBoolean(false); subscriptions.forEach((name, subscription) -> { if (subscription.isReplicated()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index a1c7276d35764..dd3cf029bda77 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -714,4 +714,86 @@ public void testOffloadWithNullMessageId() { } } + @Test + public void testSetReplicatedSubscriptionStatus() { + final String topicName = "topic-with-repl-sub"; + final String partitionName = topicName + "-partition-0"; + final String subName = "sub"; + + // 1) Return 404 if that the topic does not exist + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, + true); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(10000).times(1)).resume(errorCaptor.capture()); + Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode()); + + // 2) Return 404 if that the partitioned topic does not exist + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, + true, true); + errorCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(10000).times(1)).resume(errorCaptor.capture()); + Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode()); + + // 3) Create the partitioned topic + response = mock(AsyncResponse.class); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 4) Create a subscription + response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, topicName, subName, true, + (MessageIdImpl) MessageId.latest, false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 5) Enable replicated subscription on the partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, + true); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 6) Disable replicated subscription on the partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, + false); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 7) Enable replicated subscription on the partition + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, + true, true); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 8) Disable replicated subscription on the partition + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, + true, false); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 9) Delete the subscription + response = mock(AsyncResponse.class); + persistentTopics.deleteSubscription(response, testTenant, testNamespace, topicName, subName, false, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 10) Delete the partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true, false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 6cb7c4db9155d..87cc281ba5c57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -40,6 +41,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -230,7 +233,240 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception { assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2); } - void readMessages(Consumer consumer, Set messages, int maxMessages, boolean allowDuplicates) + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi1() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api1"; + final String subName = "sub"; + // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054 + // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue + final boolean allowDuplicates = true; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + @Cleanup + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r2 + createReplicatedSubscription(client2, topicName, subName, true); + + TopicStats stats = admin1.topics().getStats(topicName); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin1.topics().getStats(topicName); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); + stats = admin2.topics().getStats(topicName); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); + + // Disable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); + + // Unload topic in r1 + admin1.topics().unload(topicName); + Thread.sleep(1000); + stats = admin1.topics().getStats(topicName); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); + + // Make sure the replicated subscription is actually disabled + final int numMessages = 20; + final Set sentMessages = new LinkedHashSet<>(); + final Set receivedMessages = new LinkedHashSet<>(); + + Producer producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + sentMessages.clear(); + publishMessages(producer, 0, numMessages, sentMessages); + producer.close(); + + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer1, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer1.close(); + + Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer2, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer2.close(); + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + stats = admin1.topics().getStats(topicName); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); + + // Enable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + stats = admin2.topics().getStats(topicName); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); + + // Make sure the replicated subscription is actually enabled + sentMessages.clear(); + receivedMessages.clear(); + + producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + publishMessages(producer, 0, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates); + consumer1.close(); + + producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates); + consumer2.close(); + + assertEquals(receivedMessages, sentMessages); + assertTrue(numReceivedMessages1 < numMessages, + String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages)); + assertTrue(numReceivedMessages2 < numMessages, + String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages)); + } + + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi2() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api2"; + final String subName = "sub"; + // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054 + // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue + final boolean allowDuplicates = true; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + admin1.topics().createPartitionedTopic(topicName, 2); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + @Cleanup + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r2 + createReplicatedSubscription(client2, topicName, subName, true); + + PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); + } + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); + } + + // Disable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + partitionedStats = admin2.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); + } + + // Make sure the replicated subscription is actually disabled + final int numMessages = 20; + final Set sentMessages = new LinkedHashSet<>(); + final Set receivedMessages = new LinkedHashSet<>(); + + Producer producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + sentMessages.clear(); + publishMessages(producer, 0, numMessages, sentMessages); + producer.close(); + + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer1, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer1.close(); + + Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer2, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer2.close(); + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); + } + + // Enable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + partitionedStats = admin2.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); + } + + // Make sure the replicated subscription is actually enabled + sentMessages.clear(); + receivedMessages.clear(); + + producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + publishMessages(producer, 0, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates); + consumer1.close(); + + producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates); + consumer2.close(); + + assertEquals(receivedMessages, sentMessages); + assertTrue(numReceivedMessages1 < numMessages, + String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages)); + assertTrue(numReceivedMessages2 < numMessages, + String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages)); + } + + void publishMessages(Producer producer, int startIndex, int numMessages, Set sentMessages) + throws PulsarClientException { + for (int i = startIndex; i < startIndex + numMessages; i++) { + final String msg = "msg" + i; + producer.send(msg.getBytes(StandardCharsets.UTF_8)); + sentMessages.add(msg); + } + } + + int readMessages(Consumer consumer, Set messages, int maxMessages, boolean allowDuplicates) throws PulsarClientException { int count = 0; while (count < maxMessages || maxMessages == -1) { @@ -242,10 +478,12 @@ void readMessages(Consumer consumer, Set messages, int maxMessag assertFalse(messages.contains(body), "Duplicate message '" + body + "' detected."); } messages.add(body); + consumer.acknowledge(message); } else { break; } } + return count; } void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName, diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 66ac8c456ec3b..9f7dc5e0deba0 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -3301,4 +3301,24 @@ CompletableFuture> examineMessageAsync(String topic, String init * @return a future that can be used to track when the topic is truncated */ CompletableFuture truncateAsync(String topic); + + /** + * Enable or disable a replicated subscription on a topic. + * + * @param topic + * @param subName + * @param enabled + * @throws PulsarAdminException + */ + void setReplicatedSubscriptionStatus(String topic, String subName, boolean enabled) throws PulsarAdminException; + + /** + * Enable or disable a replicated subscription on a topic asynchronously. + * + * @param topic + * @param subName + * @param enabled + * @return + */ + CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, String subName, boolean enabled); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 10fb9ad73ddc4..877448d1e835c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -3494,7 +3494,29 @@ public CompletableFuture removeSubscribeRateAsync(String topic) { return asyncDeleteRequest(path); } + @Override + public void setReplicatedSubscriptionStatus(String topic, String subName, boolean enabled) + throws PulsarAdminException { + try { + setReplicatedSubscriptionStatusAsync(topic, subName, enabled).get(this.readTimeoutMs, + TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override + public CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, String subName, boolean enabled) { + TopicName topicName = validateTopic(topic); + String encodedSubName = Codec.encode(subName); + WebTarget path = topicPath(topicName, "subscription", encodedSubName, "replicatedSubscriptionStatus"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 0d19d9ae8d4e1..47e075f0418e7 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -939,6 +939,9 @@ public void topics() throws Exception { cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeSubscribeRate("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-replicated-subscription-status persistent://myprop/clust/ns1/ds1 -s sub1 -e")); + verify(mockTopics).setReplicatedSubscriptionStatus("persistent://myprop/clust/ns1/ds1", "sub1", true); + //cmd with option cannot be executed repeatedly. cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e")); @@ -1228,6 +1231,9 @@ public boolean matches(Long timestamp) { cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeMessageTTL("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-replicated-subscription-status persistent://myprop/clust/ns1/ds1 -s sub1 -d")); + verify(mockTopics).setReplicatedSubscriptionStatus("persistent://myprop/clust/ns1/ds1", "sub1", false); + //cmd with option cannot be executed repeatedly. cmdTopics = new CmdTopics(() -> admin); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 6eacf743fa77e..da42686ffc15a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -221,6 +221,8 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate()); jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate()); + jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus()); + initDeprecatedCommands(); } @@ -2309,6 +2311,31 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Enable or disable a replicated subscription on a topic") + private class SetReplicatedSubscriptionStatus extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-s", + "--subscription" }, description = "Subscription name to enable or disable replication", required = true) + private String subName; + + @Parameter(names = { "--enable", "-e" }, description = "Enable replication") + private boolean enable = false; + + @Parameter(names = { "--disable", "-d" }, description = "Disable replication") + private boolean disable = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + if (enable == disable) { + throw new ParameterException("Need to specify either --enable or --disable"); + } + getTopics().setReplicatedSubscriptionStatus(persistentTopic, subName, enable); + } + } + private Topics getTopics() { return getAdmin().topics(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java index a336dbda13e94..a694729e137ca 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java @@ -50,4 +50,6 @@ public enum TopicOperation { GET_STATS, GET_METADATA, + + SET_REPLICATED_SUBSCRIPTION_STATUS, }