Skip to content

Commit

Permalink
Fix CPU 100% when deleting namespace (#10337) (#10454)
Browse files Browse the repository at this point in the history
When deleting the namespace, the namespace Policies will be marked as deleted.
This will trigger topic's `onPoliciesUpdate`
However, in onPoliciesUpdate, the data of the Policies node on zk will be read, such as: `checkReplicationAndRetryOnFailure`
Due to the deletion of the namespace, the zk node may no longer exist at this time.
Failure to read data will trigger infinite retries.
https://github.com/apache/pulsar/blob/e970c2947aff9231202ab72bdbad047d85c55633/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1175-L1193

If there are many topics, there will be a short-term CPU spike

![image](https://user-images.githubusercontent.com/9758905/115834541-ebc32480-a447-11eb-887a-95c4a3d1adf1.png)
  • Loading branch information
315157973 committed May 3, 2021
1 parent a695aee commit 7bf14b5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
return closeFuture;
}

private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
@VisibleForTesting
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
Expand Down Expand Up @@ -2059,6 +2060,10 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
if (data.deleted) {
log.debug("Ignore the update because it has been deleted : {}", data);
return CompletableFuture.completedFuture(null);
}
isEncryptionRequired = data.encryption_required;

setSchemaCompatibilityStrategy(data);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.common.policies.data.Policies;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class PersistentTopicTest extends BrokerTestBase {

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testDeleteNamespaceInfiniteRetry() throws Exception {
//init namespace
final String myNamespace = "prop/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testDeleteNamespaceInfiniteRetry";
//init topic and policies
pulsarClient.newProducer().topic(topic).create().close();
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);

PersistentTopic persistentTopic =
spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get());

Policies policies = new Policies();
policies.deleted = true;
persistentTopic.onPoliciesUpdate(policies);
verify(persistentTopic, times(0)).checkReplicationAndRetryOnFailure();

policies.deleted = false;
persistentTopic.onPoliciesUpdate(policies);
verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure();
}
}

0 comments on commit 7bf14b5

Please sign in to comment.