Skip to content

Commit

Permalink
When delete a topic, delete the topic policy together. (#11316)
Browse files Browse the repository at this point in the history
(cherry picked from commit 00f8e57)
  • Loading branch information
horizonzy authored and codelipenghui committed Jul 23, 2021
1 parent 7d75d4b commit 25f552c
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,10 @@ public Optional<TopicPolicies> getTopicPolicies() {
return brokerService.getTopicPolicies(TopicName.get(topic));
}

public CompletableFuture<Void> deleteTopicPolicies() {
return brokerService.deleteTopicPolicies(TopicName.get(topic));
}

protected int getWaitingProducersCount() {
return waitingExclusiveProducers.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2668,13 +2668,21 @@ private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numParti
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("Failed to create partitioned topic {}", topicName, e);
topicFuture.completeExceptionally(new RestException(e));
return false;
topicFuture.completeExceptionally(new RestException(e));
return false;
}

return true;
}

public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
return CompletableFuture.completedFuture(null);
}
TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
}

public void setInterceptor(BrokerInterceptor interceptor) {
this.interceptor = interceptor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,22 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
CompletableFuture<Void> result = new CompletableFuture<>();
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
}

private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
TopicPolicies policies) {
createSystemTopicFactoryIfNeeded();

CompletableFuture<Void> result = new CompletableFuture<>();

SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());

Expand All @@ -84,7 +95,7 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
} else {
writer.writeAsync(
PulsarEvent.builder()
.actionType(ActionType.UPDATE)
.actionType(actionType)
.eventType(EventType.TOPIC_POLICY)
.topicPoliciesEvent(
TopicPoliciesEvent.builder()
Expand Down Expand Up @@ -302,10 +313,27 @@ private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
policiesCache.put(
TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic()),
event.getPolicies()
);
TopicName topicName =
TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
switch (msg.getValue().getActionType()) {
case INSERT:
TopicPolicies old = policiesCache.putIfAbsent(topicName, event.getPolicies());
if (old != null) {
log.warn("Policy insert failed, the topic: {}' policy already exist", topicName);
}
break;
case UPDATE:
policiesCache.put(topicName, event.getPolicies());
break;
case DELETE:
policiesCache.remove(topicName);
break;
case NONE:
break;
default:
log.warn("Unknown event action type: {}", msg.getValue().getActionType());
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public interface TopicPoliciesService {

TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();

/**
* Delete policies for a topic async.
*
* @param topicName topic name
*/
CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName);

/**
* Update policies for a topic async.
*
Expand Down Expand Up @@ -94,6 +101,11 @@ default void clean(TopicName topicName) {

class TopicPoliciesServiceDisabled implements TopicPoliciesService {

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled."));
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
if (deleteSchema) {
futures.add(deleteSchema().thenApply(schemaVersion -> null));
}
futures.add(deleteTopicPolicies());
FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
CompletableFuture<SchemaVersion> deleteSchemaFuture =
deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);

deleteSchemaFuture.whenComplete((v, ex) -> {
deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -2365,4 +2366,64 @@ public void testTopicRetentionPolicySetInManagedLedgerConfig() throws Exception
});
}

@Test
public void testPolicyIsDeleteTogetherManually() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();

Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());

int maxConsumersPerSubscription = 10;
admin.topics().setMaxConsumersPerSubscription(topic, maxConsumersPerSubscription);

Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isTrue());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNotNull());

admin.topics().delete(topic);

Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isFalse());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());
}

@Test
public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();

Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());

int maxConsumersPerSubscription = 10;
admin.topics().setMaxConsumersPerSubscription(topic, maxConsumersPerSubscription);

Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isTrue());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNotNull());

InactiveTopicPolicies inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 3, true);
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);

Thread.sleep(4_000L);

pulsar.getBrokerService().checkGC();

Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isFalse());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());
}

}

0 comments on commit 25f552c

Please sign in to comment.