Skip to content

Commit

Permalink
Make the compaction phase one loop timeout configurable (apache#11206)
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui authored and ciaocloud committed Oct 16, 2021
1 parent 7499d45 commit 2196b88
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 2 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Expand Up @@ -450,6 +450,10 @@ brokerServiceCompactionMonitorIntervalInSeconds=60
# Using a value of 0, is disabling compression check.
brokerServiceCompactionThresholdInBytes=0

# Timeout for the compaction phase one loop.
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
Expand Down
Expand Up @@ -1883,6 +1883,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long brokerServiceCompactionThresholdInBytes = 0;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Timeout for the compaction phase one loop, If the execution time of the compaction " +
"phase one loop exceeds this time, the compaction will not proceed."
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
Expand Down
Expand Up @@ -60,13 +60,14 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10);
private final Duration phaseOneLoopReadTimeout;

public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
}

@Override
Expand Down Expand Up @@ -116,7 +117,7 @@ private void phaseOneLoop(RawReader reader,
}
CompletableFuture<RawMessage> future = reader.readNextAsync();
FutureUtil.addTimeoutHandling(future,
PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
phaseOneLoopReadTimeout, scheduler,
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));

future.thenAcceptAsync(m -> {
Expand Down Expand Up @@ -399,4 +400,8 @@ private static class PhaseOneResult {
this.latestForKey = latestForKey;
}
}

public long getPhaseOneLoopReadTimeoutInSeconds() {
return phaseOneLoopReadTimeout.getSeconds();
}
}
Expand Up @@ -39,15 +39,19 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -220,6 +224,16 @@ public void testCompactEmptyTopic() throws Exception {
compactor.compact(topic).get();
}

@Test
public void testPhaseOneLoopTimeConfiguration() {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);

}

public ByteBuf extractPayload(RawMessage m) throws Exception {
ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
Commands.skipChecksumIfPresent(payloadAndMetadata);
Expand Down

0 comments on commit 2196b88

Please sign in to comment.