Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix namespace bundle stuck in unloading status (#21445) #21567

Merged
merged 9 commits into from
Nov 16, 2023
Merged

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.MutablePair;
Expand All @@ -43,10 +42,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
Expand Down Expand Up @@ -314,7 +311,7 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
requireNonNull(namespace);
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
createSystemTopicClient(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
final CompletableFuture<Void> initFuture = readerCompletableFuture
Expand All @@ -340,20 +337,16 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
});
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
NamespaceName namespace) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
try {
createSystemTopicFactoryIfNeeded();
} catch (PulsarServerException e) {
result.completeExceptionally(e);
return result;
} catch (PulsarServerException ex) {
return FutureUtil.failedFuture(ex);
}
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
final SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, backoff, pulsarService.getExecutor(), result);
return result;
return systemTopicClient.newReaderAsync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
});
final String topicPoliciesServiceInitException
= "Topic creation encountered an exception by initialize topic policies service";

// Creating a producer and creating a Consumer may trigger automatic topic
// creation, let's try to create a Producer and a Consumer
Expand All @@ -146,7 +148,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
assertTrue(expected.getMessage().contains(String.format(msg, topic))
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
Expand All @@ -156,7 +159,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
assertTrue(expected.getMessage().contains(String.format(msg, topic))
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@

import com.google.common.collect.Sets;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -34,6 +38,9 @@ public class NamespaceUnloadingTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(true);
conf.setForceDeleteNamespaceAllowed(true);
conf.setTopicLoadTimeoutSeconds(Integer.MAX_VALUE);
super.baseSetup();
}

Expand Down Expand Up @@ -68,4 +75,26 @@ public void testUnloadPartiallyLoadedNamespace() throws Exception {
producer.close();
}

@Test
public void testUnloadWithTopicCreation() throws PulsarAdminException, PulsarClientException {
final String namespaceName = "prop/ns_unloading";
final String topicName = "persistent://prop/ns_unloading/with_topic_creation";
final int partitions = 5;
admin.namespaces().createNamespace(namespaceName, 1);
admin.topics().createPartitionedTopic(topicName, partitions);
@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topicName)
.create();

for (int i = 0; i < 100; i++) {
admin.namespaces().unloadNamespaceBundle(namespaceName, "0x00000000_0xffffffff");
}

for (int i = 0; i < partitions; i++) {
producer.send(i);
}
admin.namespaces().deleteNamespace(namespaceName, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {

// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName, true, null);
.loadOrCreatePersistentTopic(topicName, true, null, null);

try {
futureResult.get();
Expand Down Expand Up @@ -1140,7 +1140,7 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex
for (int i = 0; i < 10; i++) {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName + "_" + i, false, null);
.loadOrCreatePersistentTopic(topicName + "_" + i, false, null, null);
loadFutures.add(futureResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
Expand All @@ -42,11 +39,8 @@
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
Expand All @@ -55,7 +49,6 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
Expand Down Expand Up @@ -322,28 +315,6 @@ public void testGetPolicyTimeout() throws Exception {
assertTrue("actual:" + cost, cost >= 5000 - 1000);
}

@Test
public void testCreatSystemTopicClientWithRetry() throws Exception {
SystemTopicBasedTopicPoliciesService service =
spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());
Field field = SystemTopicBasedTopicPoliciesService.class
.getDeclaredField("namespaceEventsSystemTopicFactory");
field.setAccessible(true);
NamespaceEventsSystemTopicFactory factory = spy((NamespaceEventsSystemTopicFactory) field.get(service));
SystemTopicClient<PulsarEvent> client = mock(TopicPoliciesSystemTopicClient.class);
doReturn(client).when(factory).createTopicPoliciesSystemTopicClient(any());
field.set(service, factory);

SystemTopicClient.Reader<PulsarEvent> reader = mock(SystemTopicClient.Reader.class);
// Throw an exception first, create successfully after retrying
doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();

SystemTopicClient.Reader<PulsarEvent> reader1 = service.createSystemTopicClientWithRetry(null).get();

assertEquals(reader1, reader);
}

@Test
public void testGetTopicPoliciesWithRetry() throws Exception {
Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void testNonDurableSubscribe() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("non-durable");
admin.topics().createNonPartitionedTopic(topicName);

int numberOfMessages = 10;
@Cleanup("shutdownNow")
Expand Down Expand Up @@ -200,6 +201,7 @@ public void testEncryption() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("encryption");
admin.topics().createNonPartitionedTopic(topicName);
final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;

Expand Down