From 2196b884b34a1003602c51e7a27f759a446545e8 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Sun, 4 Jul 2021 08:48:43 +0800 Subject: [PATCH] Make the compaction phase one loop timeout configurable (#11206) --- conf/broker.conf | 4 ++++ .../apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++ .../pulsar/compaction/TwoPhaseCompactor.java | 9 +++++++-- .../apache/pulsar/compaction/CompactorTest.java | 14 ++++++++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 94e4643a5ff7cb..ed53cfacefbcbe 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -450,6 +450,10 @@ brokerServiceCompactionMonitorIntervalInSeconds=60 # Using a value of 0, is disabling compression check. brokerServiceCompactionThresholdInBytes=0 +# Timeout for the compaction phase one loop. +# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed. +brokerServiceCompactionPhaseOneLoopTimeInSeconds=30 + # Whether to enable the delayed delivery for messages. # If disabled, messages will be immediately delivered and there will # be no tracking overhead. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a462f852479d91..306ef853aa5b38 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1883,6 +1883,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private long brokerServiceCompactionThresholdInBytes = 0; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Timeout for the compaction phase one loop, If the execution time of the compaction " + + "phase one loop exceeds this time, the compaction will not proceed." + ) + private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30; + @FieldContext( category = CATEGORY_SCHEMA, doc = "Enforce schema validation on following cases:\n\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 800182ef30ae43..0f0f9811361697 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -60,13 +60,14 @@ public class TwoPhaseCompactor extends Compactor { private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class); private static final int MAX_OUTSTANDING = 500; private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; - public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10); + private final Duration phaseOneLoopReadTimeout; public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk, ScheduledExecutorService scheduler) { super(conf, pulsar, bk, scheduler); + phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); } @Override @@ -116,7 +117,7 @@ private void phaseOneLoop(RawReader reader, } CompletableFuture future = reader.readNextAsync(); FutureUtil.addTimeoutHandling(future, - PHASE_ONE_LOOP_READ_TIMEOUT, scheduler, + phaseOneLoopReadTimeout, scheduler, () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)")); future.thenAcceptAsync(m -> { @@ -399,4 +400,8 @@ private static class PhaseOneResult { this.latestForKey = latestForKey; } } + + public long getPhaseOneLoopReadTimeoutInSeconds() { + return phaseOneLoopReadTimeout.getSeconds(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 57a2146c8efecd..0d1a95c2c0ca57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -39,15 +39,19 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -220,6 +224,16 @@ public void testCompactEmptyTopic() throws Exception { compactor.compact(topic).get(); } + @Test + public void testPhaseOneLoopTimeConfiguration() { + ServiceConfiguration configuration = new ServiceConfiguration(); + configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60); + TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class), + Mockito.mock(BookKeeper.class), compactionScheduler); + Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60); + + } + public ByteBuf extractPayload(RawMessage m) throws Exception { ByteBuf payloadAndMetadata = m.getHeadersAndPayload(); Commands.skipChecksumIfPresent(payloadAndMetadata);