From e29b0a84438f8ca57bac0b7fe2cb1f027606d121 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 27 May 2021 15:28:04 +0900 Subject: [PATCH 01/12] Add REST API to enable or disable replicated subscriptions --- .../PulsarAuthorizationProvider.java | 1 + .../admin/impl/PersistentTopicsBase.java | 122 ++++++++++++++++++ .../broker/admin/v1/PersistentTopics.java | 39 ++++++ .../broker/admin/v2/PersistentTopics.java | 37 ++++++ .../persistent/PersistentSubscription.java | 29 ++++- .../service/persistent/PersistentTopic.java | 2 +- .../broker/admin/PersistentTopicsTest.java | 82 ++++++++++++ .../service/ReplicatorSubscriptionTest.java | 91 +++++++++++++ .../apache/pulsar/client/admin/Topics.java | 20 +++ .../client/admin/internal/TopicsImpl.java | 22 ++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 6 + .../apache/pulsar/admin/cli/CmdTopics.java | 27 ++++ .../common/policies/data/TopicOperation.java | 2 + 13 files changed, 473 insertions(+), 7 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index d5029713918db..df9546097b0cd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -585,6 +585,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, case EXPIRE_MESSAGES: case PEEK_MESSAGES: case RESET_CURSOR: + case SET_REPLICATED_SUBSCRIPTION_STATUS: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription()); break; case TERMINATE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 240bef95b0727..aaf366f23e312 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4040,4 +4040,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author }); } } + + protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName, + boolean authoritative, Boolean enabled) { + log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled, + topicName, subName); + + if (enabled == null) { + asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required")); + return; + } + + // Reject the request if the topic is not persistent + if (!topicName.isPersistent()) { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Cannot enable/disable replicated subscriptions on non-persistent topics")); + return; + } + + // Reject the request if the topic is not global + if (!topicName.isGlobal()) { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Cannot enable/disable replicated subscriptions on non-global topics")); + return; + } + + // Permission to consume this topic is required + try { + validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName); + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, + enabled); + } else { + getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync( + topicNamePartition.toString(), subName, enabled)); + } catch (Exception e) { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", + clientAppId(), enabled, topicNamePartition, subName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse + .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found")); + return null; + } else if (t instanceof PreconditionFailedException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Cannot enable/disable replicated subscriptions on non-global topics")); + return null; + } else { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", + clientAppId(), enabled, topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, + enabled); + } + }).exceptionally(ex -> { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled, + topicName, subName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + } + + private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse, + String subName, boolean authoritative, Boolean enabled) { + try { + // Redirect the request to the appropriate broker if this broker is not the owner of the topic + validateTopicOwnership(topicName, authoritative); + + Topic topic = getTopicReference(topicName); + if (topic == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } + + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + + if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { + ((PersistentSubscription) sub).setReplicated(enabled); + ((PersistentTopic) topic).checkReplicatedSubscriptionControllerState(); + log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, + subName); + asyncResponse.resume(Response.noContent().build()); + } else { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Cannot enable/disable replicated subscriptions on non-persistent topics")); + } + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 46708dd451cde..8515f4d34551f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -802,4 +802,43 @@ public void getLastMessageId( asyncResponse.resume(new RestException(e)); } } + + @POST + @Path("/{tenant}/{cluster}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus") + @ApiOperation(value = "Enable or disable a replicated subscription on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or " + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on this topic"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void setReplicatedSubscriptionStatus( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the cluster", required = true) + @PathParam("cluster") String cluster, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Name of subscription", required = true) + @PathParam("subName") String encodedSubName, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Whether to enable replicated subscription", required = true) + Boolean enabled) { + try { + validateTopicName(tenant, cluster, namespace, encodedTopic); + internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 97727575ddffc..7ab33794c7a2b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3130,5 +3130,42 @@ public void truncateTopic( } + @POST + @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus") + @ApiOperation(value = "Enable or disable a replicated subscription on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or " + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on this topic"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void setReplicatedSubscriptionStatus( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Name of subscription", required = true) + @PathParam("subName") String encodedSubName, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Whether to enable replicated subscription", required = true) + Boolean enabled) { + try { + validateTopicName(tenant, namespace, encodedTopic); + internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 159f39a438b40..a7d7b910d9fb9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; @@ -177,12 +178,28 @@ public boolean isReplicated() { return replicatedSubscriptionSnapshotCache != null; } - void setReplicated(boolean replicated) { - this.replicatedSubscriptionSnapshotCache = replicated - ? new ReplicatedSubscriptionSnapshotCache(subName, - topic.getBrokerService().pulsar().getConfiguration() - .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()) - : null; + public void setReplicated(boolean replicated) { + ServiceConfiguration config = topic.getBrokerService().pulsar().getConfiguration(); + + if (!config.isEnableReplicatedSubscriptions() || !replicated) { + this.replicatedSubscriptionSnapshotCache = null; + } else if (this.replicatedSubscriptionSnapshotCache == null) { + this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, + config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); + } + + if (this.cursor != null) { + Map properties = this.cursor.getProperties(); + try { + if (replicated) { + properties.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + } else { + properties.remove(REPLICATED_SUBSCRIPTION_PROPERTY); + } + } catch (UnsupportedOperationException e) { + // ManagedCursorImpl#lastMarkDeleteEntry has not been initialized yet + } + } } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c893f8f315af6..ab325e9669222 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2702,7 +2702,7 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem }); } - private synchronized void checkReplicatedSubscriptionControllerState() { + public synchronized void checkReplicatedSubscriptionControllerState() { AtomicBoolean shouldBeEnabled = new AtomicBoolean(false); subscriptions.forEach((name, subscription) -> { if (subscription.isReplicated()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 39a99d2784d69..a3fe507c391f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -711,4 +711,86 @@ public void testOffloadWithNullMessageId() { } } + @Test + public void testSetReplicatedSubscriptionStatus() { + final String topicName = "topic-with-repl-sub"; + final String partitionName = topicName + "-partition-0"; + final String subName = "sub"; + + // 1) Return 404 if that the topic does not exist + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, + true); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode()); + + // 2) Return 404 if that the partitioned topic does not exist + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, + true, true); + errorCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode()); + + // 3) Create the partitioned topic + response = mock(AsyncResponse.class); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 4) Create a subscription + response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, topicName, subName, true, + (MessageIdImpl) MessageId.latest, false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 5) Enable replicated subscription on the partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, + true); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 6) Disable replicated subscription on the partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, + false); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 7) Enable replicated subscription on the partition + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, + true, true); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 8) Disable replicated subscription on the partition + response = mock(AsyncResponse.class); + persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, + true, false); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 9) Delete the subscription + response = mock(AsyncResponse.class); + persistentTopics.deleteSubscription(response, testTenant, testNamespace, topicName, subName, false, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 10) Delete the partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true, false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 6cb7c4db9155d..7ea251a71096d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -40,6 +41,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -230,6 +233,94 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception { assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2); } + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi1() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api1"; + final String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + @Cleanup + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r2 + createReplicatedSubscription(client2, topicName, subName, true); + + TopicStats stats = admin1.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin1.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + stats = admin2.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Disable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Unload topic in r1 + admin1.topics().unload(topicName); + stats = admin1.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + stats = admin1.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi2() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api2"; + final String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + admin1.topics().createPartitionedTopic(topicName, 2); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertTrue(stats.subscriptions.get(subName).isReplicated); + } + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertTrue(stats.subscriptions.get(subName).isReplicated); + } + } + void readMessages(Consumer consumer, Set messages, int maxMessages, boolean allowDuplicates) throws PulsarClientException { int count = 0; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 66ac8c456ec3b..9f7dc5e0deba0 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -3301,4 +3301,24 @@ CompletableFuture> examineMessageAsync(String topic, String init * @return a future that can be used to track when the topic is truncated */ CompletableFuture truncateAsync(String topic); + + /** + * Enable or disable a replicated subscription on a topic. + * + * @param topic + * @param subName + * @param enabled + * @throws PulsarAdminException + */ + void setReplicatedSubscriptionStatus(String topic, String subName, boolean enabled) throws PulsarAdminException; + + /** + * Enable or disable a replicated subscription on a topic asynchronously. + * + * @param topic + * @param subName + * @param enabled + * @return + */ + CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, String subName, boolean enabled); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index c90ad15513af5..0678afbe34e9e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -3481,7 +3481,29 @@ public CompletableFuture removeSubscribeRateAsync(String topic) { return asyncDeleteRequest(path); } + @Override + public void setReplicatedSubscriptionStatus(String topic, String subName, boolean enabled) + throws PulsarAdminException { + try { + setReplicatedSubscriptionStatusAsync(topic, subName, enabled).get(this.readTimeoutMs, + TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override + public CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, String subName, boolean enabled) { + TopicName topicName = validateTopic(topic); + String encodedSubName = Codec.encode(subName); + WebTarget path = topicPath(topicName, "subscription", encodedSubName, "replicatedSubscriptionStatus"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 0d19d9ae8d4e1..47e075f0418e7 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -939,6 +939,9 @@ public void topics() throws Exception { cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeSubscribeRate("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-replicated-subscription-status persistent://myprop/clust/ns1/ds1 -s sub1 -e")); + verify(mockTopics).setReplicatedSubscriptionStatus("persistent://myprop/clust/ns1/ds1", "sub1", true); + //cmd with option cannot be executed repeatedly. cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e")); @@ -1228,6 +1231,9 @@ public boolean matches(Long timestamp) { cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeMessageTTL("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-replicated-subscription-status persistent://myprop/clust/ns1/ds1 -s sub1 -d")); + verify(mockTopics).setReplicatedSubscriptionStatus("persistent://myprop/clust/ns1/ds1", "sub1", false); + //cmd with option cannot be executed repeatedly. cmdTopics = new CmdTopics(() -> admin); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 64fe0178a5f5b..d3e4c2550c6fb 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -221,6 +221,8 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate()); jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate()); + jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus()); + initDeprecatedCommands(); } @@ -2298,6 +2300,31 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Enable or disable a replicated subscription on a topic") + private class SetReplicatedSubscriptionStatus extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-s", + "--subscription" }, description = "Subscription name to enable or disable replication", required = true) + private String subName; + + @Parameter(names = { "--enable", "-e" }, description = "Enable replication") + private boolean enable = false; + + @Parameter(names = { "--disable", "-d" }, description = "Disable replication") + private boolean disable = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + if (enable == disable) { + throw new ParameterException("Need to specify either --enable or --disable"); + } + getTopics().setReplicatedSubscriptionStatus(persistentTopic, subName, enable); + } + } + private Topics getTopics() { return getAdmin().topics(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java index a336dbda13e94..a694729e137ca 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java @@ -50,4 +50,6 @@ public enum TopicOperation { GET_STATS, GET_METADATA, + + SET_REPLICATED_SUBSCRIPTION_STATUS, } From 2db1eee08fa0b170f08ac02f2bc76b10c7be3221 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 3 Jun 2021 13:57:16 +0900 Subject: [PATCH 02/12] Increase timeout values for testing --- .../broker/admin/PersistentTopicsTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index a3fe507c391f8..c01b8fe33ae82 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -722,7 +722,7 @@ public void testSetReplicatedSubscriptionStatus() { persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, true); ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class); - verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(errorCaptor.capture()); Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); @@ -731,7 +731,7 @@ public void testSetReplicatedSubscriptionStatus() { persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, true, true); errorCaptor = ArgumentCaptor.forClass(RestException.class); - verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(errorCaptor.capture()); Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); @@ -739,7 +739,7 @@ public void testSetReplicatedSubscriptionStatus() { response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 4) Create a subscription @@ -747,49 +747,49 @@ public void testSetReplicatedSubscriptionStatus() { persistentTopics.createSubscription(response, testTenant, testNamespace, topicName, subName, true, (MessageIdImpl) MessageId.latest, false); responseCaptor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 5) Enable replicated subscription on the partitioned topic response = mock(AsyncResponse.class); persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, true); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 6) Disable replicated subscription on the partitioned topic response = mock(AsyncResponse.class); persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, topicName, subName, true, false); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 7) Enable replicated subscription on the partition response = mock(AsyncResponse.class); persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, true, true); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 8) Disable replicated subscription on the partition response = mock(AsyncResponse.class); persistentTopics.setReplicatedSubscriptionStatus(response, testTenant, testNamespace, partitionName, subName, true, false); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 9) Delete the subscription response = mock(AsyncResponse.class); persistentTopics.deleteSubscription(response, testTenant, testNamespace, topicName, subName, false, true); responseCaptor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 10) Delete the partitioned topic response = mock(AsyncResponse.class); persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true, false); responseCaptor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); } From bb69be9e5c0640441b318dd06e7f035477b91165 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 3 Jun 2021 14:39:41 +0900 Subject: [PATCH 03/12] Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index aaf366f23e312..3fe450577176b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4073,6 +4073,14 @@ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncRespon return; } + // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters + try { + validateGlobalNamespaceOwnership(namespaceName); + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, From 7739e705684116755bf836f64dd212886d43f674 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 3 Jun 2021 15:58:58 +0900 Subject: [PATCH 04/12] Change the argument type from Boolean to boolean --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++------- .../apache/pulsar/broker/admin/v1/PersistentTopics.java | 2 +- .../apache/pulsar/broker/admin/v2/PersistentTopics.java | 2 +- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3fe450577176b..4daabc676af27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4042,15 +4042,10 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author } protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName, - boolean authoritative, Boolean enabled) { + boolean authoritative, boolean enabled) { log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, subName); - if (enabled == null) { - asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required")); - return; - } - // Reject the request if the topic is not persistent if (!topicName.isPersistent()) { asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, @@ -4139,7 +4134,7 @@ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncRespon } private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse, - String subName, boolean authoritative, Boolean enabled) { + String subName, boolean authoritative, boolean enabled) { try { // Redirect the request to the appropriate broker if this broker is not the owner of the topic validateTopicOwnership(topicName, authoritative); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 8515f4d34551f..bda43e77d03bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -831,7 +831,7 @@ public void setReplicatedSubscriptionStatus( @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Whether to enable replicated subscription", required = true) - Boolean enabled) { + boolean enabled) { try { validateTopicName(tenant, cluster, namespace, encodedTopic); internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 7ab33794c7a2b..2d27c0fed57fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3156,7 +3156,7 @@ public void setReplicatedSubscriptionStatus( @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Whether to enable replicated subscription", required = true) - Boolean enabled) { + boolean enabled) { try { validateTopicName(tenant, namespace, encodedTopic); internalSetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative, enabled); From 4eb1fe2923350e8de74dfd4bc1f2d4c2921fc8b8 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Fri, 4 Jun 2021 14:41:35 +0900 Subject: [PATCH 05/12] Modify code to pass tests --- .../broker/service/persistent/PersistentSubscription.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a7d7b910d9fb9..2e78803b5b451 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -179,9 +179,9 @@ public boolean isReplicated() { } public void setReplicated(boolean replicated) { - ServiceConfiguration config = topic.getBrokerService().pulsar().getConfiguration(); + ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); - if (!config.isEnableReplicatedSubscriptions() || !replicated) { + if (!replicated || !config.isEnableReplicatedSubscriptions()) { this.replicatedSubscriptionSnapshotCache = null; } else if (this.replicatedSubscriptionSnapshotCache == null) { this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, From b878caff54b31736212d996fc956215a3cd38170 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Fri, 4 Jun 2021 16:47:51 +0900 Subject: [PATCH 06/12] Add methods for adding and removing properties to ManagedCursor --- .../bookkeeper/mledger/ManagedCursor.java | 14 ++++++++++ .../mledger/impl/ManagedCursorImpl.java | 26 +++++++++++++++++++ .../impl/ManagedCursorContainerTest.java | 10 +++++++ .../persistent/PersistentSubscription.java | 13 +++------- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 03419dee14d8b..b234d33e24ee4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -81,6 +81,20 @@ enum IndividualDeletedEntries { */ Map getProperties(); + /** + * If the last stored position exists and its properties map is mutable, add a property to it. + * + * @return true if the property was added successfully, false otherwise + */ + boolean putPropertyIfPossible(String key, Long value); + + /** + * If the last stored position exists and its properties map is mutable, remove a property from it. + * + * @return true if the property was removed successfully, false otherwise + */ + boolean removePropertyIfPossible(String key); + /** * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1c120a5e68c9c..af5f2cef1d7fa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -285,6 +285,32 @@ public Map getProperties() { return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap(); } + @Override + public boolean putPropertyIfPossible(String key, Long value) { + if (lastMarkDeleteEntry != null) { + try { + lastMarkDeleteEntry.properties.put(key, value); + return true; + } catch (UnsupportedOperationException e) { + // lastMarkDeleteEntry.properties can be Collections.emptyMap(), i.e. an immutable object + } + } + return false; + } + + @Override + public boolean removePropertyIfPossible(String key) { + if (lastMarkDeleteEntry != null) { + try { + lastMarkDeleteEntry.properties.remove(key); + return true; + } catch (UnsupportedOperationException e) { + // lastMarkDeleteEntry.properties can be Collections.emptyMap(), i.e. an immutable object + } + } + return false; + } + /** * Performs the initial recovery, reading the mark-deleted position from the ledger and then calling initialize to * have a new opened ledger. diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 549027827efdd..cce36ab37362c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -66,6 +66,16 @@ public Map getProperties() { return Collections.emptyMap(); } + @Override + public boolean putPropertyIfPossible(String key, Long value) { + return false; + } + + @Override + public boolean removePropertyIfPossible(String key) { + return false; + } + @Override public boolean isDurable() { return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 2e78803b5b451..ec86491d18061 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -189,15 +189,10 @@ public void setReplicated(boolean replicated) { } if (this.cursor != null) { - Map properties = this.cursor.getProperties(); - try { - if (replicated) { - properties.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); - } else { - properties.remove(REPLICATED_SUBSCRIPTION_PROPERTY); - } - } catch (UnsupportedOperationException e) { - // ManagedCursorImpl#lastMarkDeleteEntry has not been initialized yet + if (replicated) { + this.cursor.putPropertyIfPossible(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + } else { + this.cursor.removePropertyIfPossible(REPLICATED_SUBSCRIPTION_PROPERTY); } } } From b5ee9dba9a22a84756d401d52d723b18fcf42cd4 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Sat, 5 Jun 2021 00:22:46 +0900 Subject: [PATCH 07/12] Test that replicated subscription is actually working --- .../service/ReplicatorSubscriptionTest.java | 151 +++++++++++++++++- 1 file changed, 149 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 7ea251a71096d..68428991b495b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -238,6 +238,9 @@ public void testReplicatedSubscriptionRestApi1() throws Exception { final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); final String topicName = "persistent://" + namespace + "/topic-rest-api1"; final String subName = "sub"; + // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054 + // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue + final boolean allowDuplicates = true; admin1.namespaces().createNamespace(namespace); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); @@ -273,15 +276,71 @@ public void testReplicatedSubscriptionRestApi1() throws Exception { // Unload topic in r1 admin1.topics().unload(topicName); + Thread.sleep(1000); stats = admin1.topics().getStats(topicName); assertFalse(stats.subscriptions.get(subName).isReplicated); + // Make sure the replicated subscription is actually disabled + final int numMessages = 20; + final Set sentMessages = new LinkedHashSet<>(); + final Set receivedMessages = new LinkedHashSet<>(); + + Producer producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + sentMessages.clear(); + publishMessages(producer, 0, numMessages, sentMessages); + producer.close(); + + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer1, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer1.close(); + + Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer2, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer2.close(); + // Enable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); stats = admin1.topics().getStats(topicName); assertTrue(stats.subscriptions.get(subName).isReplicated); stats = admin2.topics().getStats(topicName); assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Enable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + stats = admin2.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Make sure the replicated subscription is actually enabled + sentMessages.clear(); + receivedMessages.clear(); + + producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + publishMessages(producer, 0, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates); + consumer1.close(); + + producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates); + consumer2.close(); + + assertEquals(receivedMessages, sentMessages); + assertTrue(numReceivedMessages1 < numMessages, + String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages)); + assertTrue(numReceivedMessages2 < numMessages, + String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages)); } @Test(timeOut = 30000) @@ -289,9 +348,12 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); final String topicName = "persistent://" + namespace + "/topic-rest-api2"; final String subName = "sub"; + // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054 + // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue + final boolean allowDuplicates = true; admin1.namespaces().createNamespace(namespace); - admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); admin1.topics().createPartitionedTopic(topicName, 2); @Cleanup @@ -301,6 +363,13 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { // Create subscription in r1 createReplicatedSubscription(client1, topicName, subName, true); + @Cleanup + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r2 + createReplicatedSubscription(client2, topicName, subName, true); + PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true); for (TopicStats stats : partitionedStats.partitions.values()) { assertTrue(stats.subscriptions.get(subName).isReplicated); @@ -313,15 +382,91 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { assertFalse(stats.subscriptions.get(subName).isReplicated); } + // Disable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + partitionedStats = admin2.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + // Make sure the replicated subscription is actually disabled + final int numMessages = 20; + final Set sentMessages = new LinkedHashSet<>(); + final Set receivedMessages = new LinkedHashSet<>(); + + Producer producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + sentMessages.clear(); + publishMessages(producer, 0, numMessages, sentMessages); + producer.close(); + + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer1, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer1.close(); + + Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer2, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer2.close(); + // Enable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); partitionedStats = admin1.topics().getPartitionedStats(topicName, true); for (TopicStats stats : partitionedStats.partitions.values()) { assertTrue(stats.subscriptions.get(subName).isReplicated); } + + // Enable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + partitionedStats = admin2.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertTrue(stats.subscriptions.get(subName).isReplicated); + } + + // Make sure the replicated subscription is actually enabled + sentMessages.clear(); + receivedMessages.clear(); + + producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + publishMessages(producer, 0, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates); + consumer1.close(); + + producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates); + consumer2.close(); + + assertEquals(receivedMessages, sentMessages); + assertTrue(numReceivedMessages1 < numMessages, + String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages)); + assertTrue(numReceivedMessages2 < numMessages, + String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages)); + } + + void publishMessages(Producer producer, int startIndex, int numMessages, Set sentMessages) + throws PulsarClientException { + for (int i = startIndex; i < startIndex + numMessages; i++) { + final String msg = "msg" + i; + producer.send(msg.getBytes(StandardCharsets.UTF_8)); + sentMessages.add(msg); + } } - void readMessages(Consumer consumer, Set messages, int maxMessages, boolean allowDuplicates) + int readMessages(Consumer consumer, Set messages, int maxMessages, boolean allowDuplicates) throws PulsarClientException { int count = 0; while (count < maxMessages || maxMessages == -1) { @@ -333,10 +478,12 @@ void readMessages(Consumer consumer, Set messages, int maxMessag assertFalse(messages.contains(body), "Duplicate message '" + body + "' detected."); } messages.add(body); + consumer.acknowledge(message); } else { break; } } + return count; } void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName, From 97990b7afc83640185869bf5e2e12efdde130a71 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Wed, 9 Jun 2021 14:19:36 +0900 Subject: [PATCH 08/12] Fix compilation error --- .../service/ReplicatorSubscriptionTest.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 68428991b495b..87cc281ba5c57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -260,25 +260,25 @@ public void testReplicatedSubscriptionRestApi1() throws Exception { createReplicatedSubscription(client2, topicName, subName, true); TopicStats stats = admin1.topics().getStats(topicName); - assertTrue(stats.subscriptions.get(subName).isReplicated); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); // Disable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); stats = admin1.topics().getStats(topicName); - assertFalse(stats.subscriptions.get(subName).isReplicated); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); stats = admin2.topics().getStats(topicName); - assertTrue(stats.subscriptions.get(subName).isReplicated); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); // Disable replicated subscription in r2 admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); stats = admin2.topics().getStats(topicName); - assertFalse(stats.subscriptions.get(subName).isReplicated); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); // Unload topic in r1 admin1.topics().unload(topicName); Thread.sleep(1000); stats = admin1.topics().getStats(topicName); - assertFalse(stats.subscriptions.get(subName).isReplicated); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); // Make sure the replicated subscription is actually disabled final int numMessages = 20; @@ -305,14 +305,14 @@ public void testReplicatedSubscriptionRestApi1() throws Exception { // Enable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); stats = admin1.topics().getStats(topicName); - assertTrue(stats.subscriptions.get(subName).isReplicated); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); stats = admin2.topics().getStats(topicName); - assertFalse(stats.subscriptions.get(subName).isReplicated); + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); // Enable replicated subscription in r2 admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); stats = admin2.topics().getStats(topicName); - assertTrue(stats.subscriptions.get(subName).isReplicated); + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); // Make sure the replicated subscription is actually enabled sentMessages.clear(); @@ -371,22 +371,22 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { createReplicatedSubscription(client2, topicName, subName, true); PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true); - for (TopicStats stats : partitionedStats.partitions.values()) { - assertTrue(stats.subscriptions.get(subName).isReplicated); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); } // Disable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); partitionedStats = admin1.topics().getPartitionedStats(topicName, true); - for (TopicStats stats : partitionedStats.partitions.values()) { - assertFalse(stats.subscriptions.get(subName).isReplicated); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); } // Disable replicated subscription in r2 admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); partitionedStats = admin2.topics().getPartitionedStats(topicName, true); - for (TopicStats stats : partitionedStats.partitions.values()) { - assertFalse(stats.subscriptions.get(subName).isReplicated); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertFalse(stats.getSubscriptions().get(subName).isReplicated()); } // Make sure the replicated subscription is actually disabled @@ -415,15 +415,15 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { // Enable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); partitionedStats = admin1.topics().getPartitionedStats(topicName, true); - for (TopicStats stats : partitionedStats.partitions.values()) { - assertTrue(stats.subscriptions.get(subName).isReplicated); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); } // Enable replicated subscription in r2 admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); partitionedStats = admin2.topics().getPartitionedStats(topicName, true); - for (TopicStats stats : partitionedStats.partitions.values()) { - assertTrue(stats.subscriptions.get(subName).isReplicated); + for (TopicStats stats : partitionedStats.getPartitions().values()) { + assertTrue(stats.getSubscriptions().get(subName).isReplicated()); } // Make sure the replicated subscription is actually enabled From a8b6f1f1d622b757e49afa760788dee126da32a4 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Wed, 9 Jun 2021 14:20:43 +0900 Subject: [PATCH 09/12] Fix methods for adding and removing property of ManagedCursor --- .../bookkeeper/mledger/ManagedCursor.java | 12 +++------ .../mledger/impl/ManagedCursorImpl.java | 26 +++++++++++-------- .../impl/ManagedCursorContainerTest.java | 4 +-- .../persistent/PersistentSubscription.java | 4 +-- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index b234d33e24ee4..4af64550fe53b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -82,18 +82,14 @@ enum IndividualDeletedEntries { Map getProperties(); /** - * If the last stored position exists and its properties map is mutable, add a property to it. - * - * @return true if the property was added successfully, false otherwise + * Add a property associated with the last stored position. */ - boolean putPropertyIfPossible(String key, Long value); + boolean putProperty(String key, Long value); /** - * If the last stored position exists and its properties map is mutable, remove a property from it. - * - * @return true if the property was removed successfully, false otherwise + * Remove a property associated with the last stored position. */ - boolean removePropertyIfPossible(String key); + boolean removeProperty(String key); /** * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index af5f2cef1d7fa..30a0e710091b3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -286,26 +286,30 @@ public Map getProperties() { } @Override - public boolean putPropertyIfPossible(String key, Long value) { + public boolean putProperty(String key, Long value) { if (lastMarkDeleteEntry != null) { - try { - lastMarkDeleteEntry.properties.put(key, value); - return true; - } catch (UnsupportedOperationException e) { - // lastMarkDeleteEntry.properties can be Collections.emptyMap(), i.e. an immutable object + MarkDeleteEntry currentLastMarkDeleteEntry = lastMarkDeleteEntry; + Map properties = currentLastMarkDeleteEntry.properties; + if (properties == null || properties.isEmpty()) { + Map newProperties = Maps.newHashMap(); + newProperties.put(key, value); + lastMarkDeleteEntry = new MarkDeleteEntry(currentLastMarkDeleteEntry.newPosition, newProperties, + currentLastMarkDeleteEntry.callback, currentLastMarkDeleteEntry.ctx); + } else { + properties.put(key, value); } + return true; } return false; } @Override - public boolean removePropertyIfPossible(String key) { + public boolean removeProperty(String key) { if (lastMarkDeleteEntry != null) { - try { - lastMarkDeleteEntry.properties.remove(key); + Map properties = lastMarkDeleteEntry.properties; + if (properties != null && properties.containsKey(key)) { + properties.remove(key); return true; - } catch (UnsupportedOperationException e) { - // lastMarkDeleteEntry.properties can be Collections.emptyMap(), i.e. an immutable object } } return false; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index cce36ab37362c..5ed8b293ff580 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -67,12 +67,12 @@ public Map getProperties() { } @Override - public boolean putPropertyIfPossible(String key, Long value) { + public boolean putProperty(String key, Long value) { return false; } @Override - public boolean removePropertyIfPossible(String key) { + public boolean removeProperty(String key) { return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index ec86491d18061..1edc273c46eb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -190,9 +190,9 @@ public void setReplicated(boolean replicated) { if (this.cursor != null) { if (replicated) { - this.cursor.putPropertyIfPossible(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } else { - this.cursor.removePropertyIfPossible(REPLICATED_SUBSCRIPTION_PROPERTY); + this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); } } } From 644a1fe98d15b65c6e62d91337da5434c118f9a5 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Wed, 9 Jun 2021 14:48:19 +0900 Subject: [PATCH 10/12] Inherit callbackGroup --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 30a0e710091b3..8a127651cc4a1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -295,6 +295,7 @@ public boolean putProperty(String key, Long value) { newProperties.put(key, value); lastMarkDeleteEntry = new MarkDeleteEntry(currentLastMarkDeleteEntry.newPosition, newProperties, currentLastMarkDeleteEntry.callback, currentLastMarkDeleteEntry.ctx); + lastMarkDeleteEntry.callbackGroup = currentLastMarkDeleteEntry.callbackGroup; } else { properties.put(key, value); } From de425322022ebcf2795dce14a7f935c8613bacad Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 10 Jun 2021 00:13:30 +0900 Subject: [PATCH 11/12] Return error response when updating cursor property fails --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 7 ++++++- .../broker/service/persistent/PersistentSubscription.java | 8 +++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4daabc676af27..244c142bed0e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4152,7 +4152,12 @@ private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(Async } if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { - ((PersistentSubscription) sub).setReplicated(enabled); + if (!((PersistentSubscription) sub).setReplicated(enabled)) { + asyncResponse.resume( + new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties")); + return; + } + ((PersistentTopic) topic).checkReplicatedSubscriptionControllerState(); log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1edc273c46eb7..973139a0502ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -178,7 +178,7 @@ public boolean isReplicated() { return replicatedSubscriptionSnapshotCache != null; } - public void setReplicated(boolean replicated) { + public boolean setReplicated(boolean replicated) { ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -190,11 +190,13 @@ public void setReplicated(boolean replicated) { if (this.cursor != null) { if (replicated) { - this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } else { - this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); + return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); } } + + return false; } @Override From 02487e19e7e9a90f25d752fc4f9b2a4818202bd1 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 10 Jun 2021 00:16:50 +0900 Subject: [PATCH 12/12] Update lastMarkDeleteEntry using AtomicReferenceFieldUpdater --- .../mledger/impl/ManagedCursorImpl.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8a127651cc4a1..bd0b5e124b26d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -288,17 +288,17 @@ public Map getProperties() { @Override public boolean putProperty(String key, Long value) { if (lastMarkDeleteEntry != null) { - MarkDeleteEntry currentLastMarkDeleteEntry = lastMarkDeleteEntry; - Map properties = currentLastMarkDeleteEntry.properties; - if (properties == null || properties.isEmpty()) { - Map newProperties = Maps.newHashMap(); + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + Map properties = last.properties; + Map newProperties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties); newProperties.put(key, value); - lastMarkDeleteEntry = new MarkDeleteEntry(currentLastMarkDeleteEntry.newPosition, newProperties, - currentLastMarkDeleteEntry.callback, currentLastMarkDeleteEntry.ctx); - lastMarkDeleteEntry.callbackGroup = currentLastMarkDeleteEntry.callbackGroup; - } else { - properties.put(key, value); - } + + MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties, + last.callback, last.ctx); + newLastMarkDeleteEntry.callbackGroup = last.callbackGroup; + + return newLastMarkDeleteEntry; + }); return true; } return false; @@ -307,11 +307,14 @@ public boolean putProperty(String key, Long value) { @Override public boolean removeProperty(String key) { if (lastMarkDeleteEntry != null) { - Map properties = lastMarkDeleteEntry.properties; - if (properties != null && properties.containsKey(key)) { - properties.remove(key); - return true; - } + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + Map properties = last.properties; + if (properties != null && properties.containsKey(key)) { + properties.remove(key); + } + return last; + }); + return true; } return false; }