Skip to content

Commit

Permalink
merge: #9778
Browse files Browse the repository at this point in the history
9778: [Backports stable/1.3] Preallocate segment files r=npepinpe a=npepinpe

## Description

Backports #9731 to 1.3.x.

## Related issues

closes #6504
closes #8099



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Jul 13, 2022
2 parents d746125 + 3890f64 commit a5ecfb7
Show file tree
Hide file tree
Showing 19 changed files with 525 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,19 @@ public Builder withPreferSnapshotReplicationThreshold(
return this;
}

/**
 * Configures whether segment files are pre-allocated when they are created. When enabled, a new
 * segment file is pre-allocated to the maximum segment size (see {@link #withSegmentSize(long)})
 * at creation, before anything is written to it.
 *
 * @param preallocateSegmentFiles {@code true} to pre-allocate segment files, {@code false}
 *     otherwise
 * @return this builder, to allow call chaining
 */
public Builder withPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
  // the flag is stored on the storage section of the partition group configuration
  final var storageConfig = config.getStorageConfig();
  storageConfig.setPreallocateSegmentFiles(preallocateSegmentFiles);
  return this;
}

@Override
public RaftPartitionGroup build() {
return new RaftPartitionGroup(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ public class RaftStorageConfig {
private static final long DEFAULT_FREE_DISK_SPACE = 1024L * 1024 * 1024;
private static final int DEFAULT_JOURNAL_INDEX_DENSITY = 100;

private static final boolean DEFAULT_PREALLOCATE_SEGMENT_FILES = true;

private String directory;
private long segmentSize = DEFAULT_MAX_SEGMENT_SIZE;
private boolean flushExplicitly = DEFAULT_FLUSH_EXPLICITLY;
private long freeDiskSpace = DEFAULT_FREE_DISK_SPACE;
private int journalIndexDensity = DEFAULT_JOURNAL_INDEX_DENSITY;
private boolean preallocateSegmentFiles = DEFAULT_PREALLOCATE_SEGMENT_FILES;

@Optional("SnapshotStoreFactory")
private ReceivableSnapshotStoreFactory persistedSnapshotStoreFactory;
Expand Down Expand Up @@ -152,4 +155,19 @@ public RaftStorageConfig setJournalIndexDensity(final int journalIndexDensity) {
this.journalIndexDensity = journalIndexDensity;
return this;
}

/**
 * Indicates whether segment files are pre-allocated at creation.
 *
 * @return true to preallocate segment files, false otherwise
 */
public boolean isPreallocateSegmentFiles() {
  return this.preallocateSegmentFiles;
}

/**
 * Sets whether segment files are pre-allocated at creation. If true, segment files are
 * pre-allocated to {@link #segmentSize} at creation before any writes happen.
 *
 * @param preallocateSegmentFiles true to preallocate files, false otherwise
 * @return this configuration for chaining, consistent with {@link #setJournalIndexDensity(int)}
 */
public RaftStorageConfig setPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
  this.preallocateSegmentFiles = preallocateSegmentFiles;
  return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ private RaftStorage createRaftStorage() {
.withFreeDiskSpace(storageConfig.getFreeDiskSpace())
.withSnapshotStore(persistedSnapshotStore)
.withJournalIndexDensity(storageConfig.getJournalIndexDensity())
.withPreallocateSegmentFiles(storageConfig.isPreallocateSegmentFiles())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class RaftStorage {
private final boolean flushExplicitly;
private final ReceivableSnapshotStore persistedSnapshotStore;
private final int journalIndexDensity;
private final boolean preallocateSegmentFiles;

private RaftStorage(
final String prefix,
Expand All @@ -65,14 +66,16 @@ private RaftStorage(
final long freeDiskSpace,
final boolean flushExplicitly,
final ReceivableSnapshotStore persistedSnapshotStore,
final int journalIndexDensity) {
final int journalIndexDensity,
final boolean preallocateSegmentFiles) {
this.prefix = prefix;
this.directory = directory;
this.maxSegmentSize = maxSegmentSize;
this.freeDiskSpace = freeDiskSpace;
this.flushExplicitly = flushExplicitly;
this.persistedSnapshotStore = persistedSnapshotStore;
this.journalIndexDensity = journalIndexDensity;
this.preallocateSegmentFiles = preallocateSegmentFiles;

try {
FileUtil.ensureDirectoryExists(directory.toPath());
Expand Down Expand Up @@ -172,6 +175,7 @@ public RaftLog openLog() {
.withFlushExplicitly(flushExplicitly)
.withJournalIndexDensity(journalIndexDensity)
.withLastWrittenIndex(lastWrittenIndex)
.withPreallocateSegmentFiles(preallocateSegmentFiles)
.build();
}

Expand Down Expand Up @@ -218,6 +222,7 @@ public static final class Builder implements io.atomix.utils.Builder<RaftStorage
private static final long DEFAULT_FREE_DISK_SPACE = 1024L * 1024 * 1024;
private static final boolean DEFAULT_FLUSH_EXPLICITLY = true;
private static final int DEFAULT_JOURNAL_INDEX_DENSITY = 100;
private static final boolean DEFAULT_PREALLOCATE_SEGMENT_FILES = true;

private String prefix = DEFAULT_PREFIX;
private File directory = new File(DEFAULT_DIRECTORY);
Expand All @@ -226,6 +231,7 @@ public static final class Builder implements io.atomix.utils.Builder<RaftStorage
private boolean flushExplicitly = DEFAULT_FLUSH_EXPLICITLY;
private ReceivableSnapshotStore persistedSnapshotStore;
private int journalIndexDensity = DEFAULT_JOURNAL_INDEX_DENSITY;
private boolean preallocateSegmentFiles = DEFAULT_PREALLOCATE_SEGMENT_FILES;

private Builder() {}

Expand Down Expand Up @@ -316,6 +322,19 @@ public Builder withJournalIndexDensity(final int journalIndexDensity) {
return this;
}

/**
* Sets whether segment files are pre-allocated at creation. If true, segment files are
* pre-allocated to the maximum segment size (see {@link #withMaxSegmentSize(int)}) at creation
* before any writes happen.
*
* @param preallocateSegmentFiles true to preallocate files, false otherwise
* @return this builder for chaining
*/
public Builder withPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
this.preallocateSegmentFiles = preallocateSegmentFiles;
return this;
}

/**
* Builds the {@link RaftStorage} object.
*
Expand All @@ -330,7 +349,8 @@ public RaftStorage build() {
freeDiskSpace,
flushExplicitly,
persistedSnapshotStore,
journalIndexDensity);
journalIndexDensity,
preallocateSegmentFiles);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ public RaftLogBuilder withLastWrittenIndex(final long lastWrittenIndex) {
return this;
}

/**
* Sets whether segment files are pre-allocated at creation. If true, segment files are
* pre-allocated to the maximum segment size (see {@link #withMaxSegmentSize(int)}) at creation
* before any writes happen. The flag is forwarded as-is to the underlying journal builder.
*
* @param preallocateSegmentFiles true to preallocate files, false otherwise
* @return this builder for chaining
*/
public RaftLogBuilder withPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
journalBuilder.withPreallocateSegmentFiles(preallocateSegmentFiles);
return this;
}

@Override
public RaftLog build() {
final Journal journal = journalBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ RaftPartitionGroup buildRaftPartitionGroup(
.withMaxQuorumResponseTimeout(experimentalCfg.getRaft().getMaxQuorumResponseTimeout())
.withMinStepDownFailureCount(experimentalCfg.getRaft().getMinStepDownFailureCount())
.withPreferSnapshotReplicationThreshold(
experimentalCfg.getRaft().getPreferSnapshotReplicationThreshold());
experimentalCfg.getRaft().getPreferSnapshotReplicationThreshold())
.withPreallocateSegmentFiles(experimentalCfg.getRaft().isPreallocateSegmentFiles());

final int maxMessageSize = (int) networkCfg.getMaxMessageSizeInBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ public final class ExperimentalRaftCfg implements ConfigurationEntry {
private static final Duration DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT = Duration.ofSeconds(0);
private static final int DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT = 3;
private static final int DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD = 100;
private static final boolean DEFAULT_PREALLOCATE_SEGMENT_FILES = true;

private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private Duration maxQuorumResponseTimeout = DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT;
private int minStepDownFailureCount = DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT;
private int preferSnapshotReplicationThreshold = DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD;

private boolean preallocateSegmentFiles = DEFAULT_PREALLOCATE_SEGMENT_FILES;

/**
 * Returns the configured Raft request timeout.
 *
 * @return the request timeout; {@code DEFAULT_REQUEST_TIMEOUT} unless overridden
 */
public Duration getRequestTimeout() {
  return this.requestTimeout;
}
Expand Down Expand Up @@ -52,4 +55,12 @@ public int getPreferSnapshotReplicationThreshold() {
/**
* Sets the threshold stored in {@code preferSnapshotReplicationThreshold}.
*
* @param preferSnapshotReplicationThreshold the new threshold value
*/
public void setPreferSnapshotReplicationThreshold(final int preferSnapshotReplicationThreshold) {
this.preferSnapshotReplicationThreshold = preferSnapshotReplicationThreshold;
}

/**
 * Indicates whether segment files should be pre-allocated.
 *
 * @return the configured flag; {@code true} unless overridden
 */
public boolean isPreallocateSegmentFiles() {
  return this.preallocateSegmentFiles;
}

/**
* Sets whether segment files should be pre-allocated.
*
* @param preallocateSegmentFiles true to preallocate segment files, false otherwise
*/
public void setPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
this.preallocateSegmentFiles = preallocateSegmentFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import io.camunda.zeebe.snapshots.ReceivableSnapshotStoreFactory;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.util.unit.DataSize;

class RaftPartitionGroupFactoryTest {

final class RaftPartitionGroupFactoryTest {
private static final ReceivableSnapshotStoreFactory SNAPSHOT_STORE_FACTORY =
(directory, partitionId) -> mock(ReceivableSnapshotStore.class);

Expand Down Expand Up @@ -153,6 +154,19 @@ void shouldSetPreferSnapshotReplicationThreshold() {
assertThat(config.getPartitionConfig().getPreferSnapshotReplicationThreshold()).isEqualTo(1000);
}

/** Verifies that the experimental Raft flag is propagated to the storage config, for both values. */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void shouldSetSegmentFilesPreallocation(final boolean value) {
  // given
  final var raftCfg = brokerCfg.getExperimental().getRaft();
  raftCfg.setPreallocateSegmentFiles(value);

  // when
  final var storageConfig = buildRaftPartitionGroup().getStorageConfig();

  // then
  assertThat(storageConfig.isPreallocateSegmentFiles()).isEqualTo(value);
}

private RaftPartitionGroupConfig buildRaftPartitionGroup() {
final var partitionGroup = factory.buildRaftPartitionGroup(brokerCfg, SNAPSHOT_STORE_FACTORY);
return (RaftPartitionGroupConfig) partitionGroup.config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

public class ExperimentalCfgTest {
@Execution(ExecutionMode.CONCURRENT)
final class ExperimentalCfgTest {

public final Map<String, String> environment = new HashMap<>();
final Map<String, String> environment = new HashMap<>();

@Test
public void shouldSetRaftRequestTimeoutFromConfig() {
void shouldSetRaftRequestTimeoutFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -29,7 +32,7 @@ public void shouldSetRaftRequestTimeoutFromConfig() {
}

@Test
public void shouldSetRaftRequestTimeoutFromEnv() {
void shouldSetRaftRequestTimeoutFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.requestTimeout", "15s");

Expand All @@ -42,7 +45,7 @@ public void shouldSetRaftRequestTimeoutFromEnv() {
}

@Test
public void shouldSetRaftMaxQuorumResponseTimeoutFromConfig() {
void shouldSetRaftMaxQuorumResponseTimeoutFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -52,7 +55,7 @@ public void shouldSetRaftMaxQuorumResponseTimeoutFromConfig() {
}

@Test
public void shouldSetRaftMaxQuorumResponseTimeoutFromEnv() {
void shouldSetRaftMaxQuorumResponseTimeoutFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.maxQuorumResponseTimeout", "15s");

Expand All @@ -65,7 +68,7 @@ public void shouldSetRaftMaxQuorumResponseTimeoutFromEnv() {
}

@Test
public void shouldSetRaftMinStepDownFailureCountFromConfig() {
void shouldSetRaftMinStepDownFailureCountFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -75,7 +78,7 @@ public void shouldSetRaftMinStepDownFailureCountFromConfig() {
}

@Test
public void shouldSetRaftMinStepDownFailureCountFromEnv() {
void shouldSetRaftMinStepDownFailureCountFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.minStepDownFailureCount", "10");

Expand All @@ -88,7 +91,7 @@ public void shouldSetRaftMinStepDownFailureCountFromEnv() {
}

@Test
public void shouldSetPreferSnapshotReplicationThresholdFromConfig() {
void shouldSetPreferSnapshotReplicationThresholdFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -98,7 +101,7 @@ public void shouldSetPreferSnapshotReplicationThresholdFromConfig() {
}

@Test
public void shouldSetPreferSnapshotReplicationThresholdFromEnv() {
void shouldSetPreferSnapshotReplicationThresholdFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.preferSnapshotReplicationThreshold", "10");

Expand All @@ -109,4 +112,27 @@ public void shouldSetPreferSnapshotReplicationThresholdFromEnv() {
// then
assertThat(raft.getPreferSnapshotReplicationThreshold()).isEqualTo(10);
}

/** Verifies that the pre-allocation flag can be overridden through the environment. */
@Test
void shouldSetPreallocateSegmentFilesFromEnv() {
  // given
  environment.put("zeebe.broker.experimental.raft.preallocateSegmentFiles", "false");

  // when
  final boolean preallocate =
      TestConfigReader.readConfig("experimental-cfg", environment)
          .getExperimental()
          .getRaft()
          .isPreallocateSegmentFiles();

  // then
  assertThat(preallocate).isFalse();
}

/** Verifies the pre-allocation flag as read from the {@code experimental-cfg} test config. */
@Test
void shouldSetPreallocateSegmentFilesFromConfig() {
  // when
  final var experimentalCfg =
      TestConfigReader.readConfig("experimental-cfg", environment).getExperimental();
  final boolean preallocate = experimentalCfg.getRaft().isPreallocateSegmentFiles();

  // then
  // NOTE(review): the expected value equals ExperimentalRaftCfg's default (true), so this
  // presumably cannot tell "read from file" apart from "fell back to default" - confirm what
  // experimental-cfg actually sets.
  assertThat(preallocate).isTrue();
}
}
19 changes: 15 additions & 4 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,13 @@
# raft:
# Sets the timeout for all requests sent by raft leaders and followers.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_REQUESTTIMEOUT
# requestTimeout = 5s
# requestTimeout: 5s

# If the leader is not able to reach the quorum, the leader may step down.
# This is triggered after a number of requests, to a quorum of followers, has failed, and the number of failures
# reached minStepDownFailureCount. The maxQuorumResponseTimeout also influences when the leader steps down.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_MINSTEPDOWNFAILURECOUNT
# minStepDownFailureCount = 3
# minStepDownFailureCount: 3

# If the leader is not able to reach the quorum, the leader may step down.
# This is triggered if the leader is not able to reach the quorum of the followers for maxQuorumResponseTimeout.
Expand All @@ -618,13 +618,24 @@
# When the timeout is lower, there might be false positives, and the leader might step down too quickly.
# When this value is 0, it will use a default value of electionTimeout * 2.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_MAXQUORUMRESPONSETIMEOUT
# maxQuorumResponseTimeout = 0ms
# maxQuorumResponseTimeout: 0ms

# Threshold used by the leader to decide between replicating a snapshot or records.
# The unit is number of records by which the follower may lag behind before the leader
# prefers replicating snapshots instead of records.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_PREFERSNAPSHOTREPLICATIONTHRESHOLD.
# preferSnapshotReplicationThreshold = 100
# preferSnapshotReplicationThreshold: 100

# Defines whether segment files are pre-allocated to their full size on creation or not. If
# true, when a new segment is created on demand, disk space will be reserved for its full
# maximum size. This helps avoid potential out of disk space errors which can be fatal when
# using memory mapped files, especially when running on network storage. In the best cases,
# it will also allocate contiguous blocks, giving a small performance boost.
#
# You may want to turn this off if your system does not support efficient file allocation
# via system calls, or if you notice an I/O penalty when creating segments.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_PREALLOCATESEGMENTFILES.
# preallocateSegmentFiles: true

# Allows configuring the properties of RocksDB, which is used for state management.
# rocksdb:
Expand Down

0 comments on commit a5ecfb7

Please sign in to comment.