Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] avoid offload system topic #22497

Merged
merged 5 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,13 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
if (NamespaceService.isSystemServiceNamespace(namespace.toString())
|| SystemTopicNames.isSystemTopic(topicName)) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
/*
Avoid setting broker internal system topics using off-loader because some of them are the
preconditions of other topics. The slow replying log speed will cause a delay in all the topic
loading.(timeout)
*/
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
} else {
if (topicLevelOffloadPolicies != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -108,6 +111,9 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -1726,4 +1732,92 @@
fail("Unsubscribe failed");
}
}


@Test
public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException {
final String driver = "aws-s3";
final String region = "test-region";
final String bucket = "test-bucket";
final String role = "test-role";
final String roleSessionName = "test-role-session-name";
final String credentialId = "test-credential-id";
final String credentialSecret = "test-credential-secret";
final String endPoint = "test-endpoint";
final Integer maxBlockSizeInBytes = 5;
final Integer readBufferSizeInBytes = 2;
final Long offloadThresholdInBytes = 10L;
final Long offloadThresholdInSeconds = 1000L;
final Long offloadDeletionLagInMillis = 5L;

final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(
driver,
region,
bucket,
endPoint,
role,
roleSessionName,
credentialId,
credentialSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
offloadThresholdInSeconds,
offloadDeletionLagInMillis,
OffloadedReadPriority.TIERED_STORAGE_FIRST
);

var fakeOffloader = new LedgerOffloader() {
@Override
public String getOffloadDriverName() {
return driver;
}

@Override
public CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, Map<String, String> extraMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public OffloadPolicies getOffloadPolicies() {
return offloadPolicies;
}

@Override
public void close() {
}
};

final BrokerService brokerService = pulsar.getBrokerService();
final String namespace = "public/" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);

// Inject the cache to avoid real load off-loader jar
final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = pulsar.getLedgerOffloaderMap();
ledgerOffloaderMap.put(NamespaceName.get(namespace), fakeOffloader);

// (1) test normal topic
final String normalTopic = "persistent://" + namespace + "/" + UUID.randomUUID();
var managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(normalTopic)).join();

Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), fakeOffloader);

// (2) test system topic
for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE);
}
}
}

Check failure on line 1822 in pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Brokers - Broker Group 1

BrokerServiceTest.testOffloadConfShouldNotAppliedForSystemTopic

Tenant does not exist