Skip to content

Commit

Permalink
merge: #9731
Browse files Browse the repository at this point in the history
9731: Preallocate segment files r=npepinpe a=npepinpe

## Description

This PR introduces segment file pre-allocation in the journal. This is on by default, but can be disabled via an experimental configuration option.

At the moment, the pre-allocation is done in a "dumb" fashion - we allocate a 4Kb blocks of zeroes, and write this until we've reached the expected file length. Note that this means there may be one extra block allocated on disk.

One thing to note, to verify this, we used [jnr-posix](https://github.com/jnr/jnr-posix). The reason behind this is we want to know the actual number of blocks on disk reserved for this file. `Files#size`, or `File#length`, return the reported file size, which is part of the file's metadata (on UNIX systems anyway). If you mmap a file with a size of 1Mb, write one byte, then flush it, the reported size will be 1Mb, but the actual size on disk will be a single block (on most modern UNIX systems anyway). By using [stat](https://linux.die.net/man/2/stat), we can get the actual file size in terms of 512-bytes allocated blocks, so we get a pretty accurate measurement of the actual disk space used by the file.

I would've like to capture this in a test utility, but since `test-util` depends on `util`, there wasn't an easy way to do this, so I just copied the method in two places. One possibility I thought of is moving the whole pre-allocation stuff in `journal`, since we only use it there. The only downside I can see there is about discovery and cohesion, but I'd like to hear your thoughts on this.

A follow-up PR will come which will optimize the pre-allocation by using the [posix_fallocate](https://man7.org/linux/man-pages/man3/posix_fallocate.3.html) on POSIX systems.

Finally, I opted for an experimental configuration option instead of a feature flag. My reasoning is that it isn't a "new" feature, but instead we want to option of disabling this (for performance reasons potentially). So it's more of an advanced option. But I'd also like to hear your thoughts here.

## Related issues

closes #6504
closes #8099
related to #7607  



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Jul 11, 2022
2 parents f0d3913 + 5167445 commit 419e1cb
Show file tree
Hide file tree
Showing 22 changed files with 597 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,19 @@ public Builder withPreferSnapshotReplicationThreshold(
return this;
}

/**
* Sets whether segment files are pre-allocated at creation. If true, segment files are
* pre-allocated to the maximum segment size (see {@link #withSegmentSize(long)}) at creation
* before any writes happen.
*
* @param preallocateSegmentFiles true to preallocate files, false otherwise
* @return this builder for chaining
*/
public Builder withPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
config.getStorageConfig().setPreallocateSegmentFiles(preallocateSegmentFiles);
return this;
}

@Override
public RaftPartitionGroup build() {
return new RaftPartitionGroup(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ public class RaftStorageConfig {
private static final long DEFAULT_FREE_DISK_SPACE = 1024L * 1024 * 1024;
private static final int DEFAULT_JOURNAL_INDEX_DENSITY = 100;

private static final boolean DEFAULT_PREALLOCATE_SEGMENT_FILES = true;

private String directory;
private long segmentSize = DEFAULT_MAX_SEGMENT_SIZE;
private boolean flushExplicitly = DEFAULT_FLUSH_EXPLICITLY;
private long freeDiskSpace = DEFAULT_FREE_DISK_SPACE;
private int journalIndexDensity = DEFAULT_JOURNAL_INDEX_DENSITY;
private boolean preallocateSegmentFiles = DEFAULT_PREALLOCATE_SEGMENT_FILES;

@Optional("SnapshotStoreFactory")
private ReceivableSnapshotStoreFactory persistedSnapshotStoreFactory;
Expand Down Expand Up @@ -152,4 +155,21 @@ public RaftStorageConfig setJournalIndexDensity(final int journalIndexDensity) {
this.journalIndexDensity = journalIndexDensity;
return this;
}

/**
* @return true to preallocate segment files, false otherwise
*/
public boolean isPreallocateSegmentFiles() {
return preallocateSegmentFiles;
}

/**
* Sets whether segment files are pre-allocated at creation. If true, segment files are
* pre-allocated to {@link #segmentSize} at creation before any writes happen.
*
* @param preallocateSegmentFiles true to preallocate files, false otherwise
*/
public void setPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
this.preallocateSegmentFiles = preallocateSegmentFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ private RaftStorage createRaftStorage() {
.withFreeDiskSpace(storageConfig.getFreeDiskSpace())
.withSnapshotStore(persistedSnapshotStore)
.withJournalIndexDensity(storageConfig.getJournalIndexDensity())
.withPreallocateSegmentFiles(storageConfig.isPreallocateSegmentFiles())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class RaftStorage {
private final boolean flushExplicitly;
private final ReceivableSnapshotStore persistedSnapshotStore;
private final int journalIndexDensity;
private final boolean preallocateSegmentFiles;

private RaftStorage(
final String prefix,
Expand All @@ -65,14 +66,16 @@ private RaftStorage(
final long freeDiskSpace,
final boolean flushExplicitly,
final ReceivableSnapshotStore persistedSnapshotStore,
final int journalIndexDensity) {
final int journalIndexDensity,
final boolean preallocateSegmentFiles) {
this.prefix = prefix;
this.directory = directory;
this.maxSegmentSize = maxSegmentSize;
this.freeDiskSpace = freeDiskSpace;
this.flushExplicitly = flushExplicitly;
this.persistedSnapshotStore = persistedSnapshotStore;
this.journalIndexDensity = journalIndexDensity;
this.preallocateSegmentFiles = preallocateSegmentFiles;

try {
FileUtil.ensureDirectoryExists(directory.toPath());
Expand Down Expand Up @@ -172,6 +175,7 @@ public RaftLog openLog() {
.withFlushExplicitly(flushExplicitly)
.withJournalIndexDensity(journalIndexDensity)
.withLastWrittenIndex(lastWrittenIndex)
.withPreallocateSegmentFiles(preallocateSegmentFiles)
.build();
}

Expand Down Expand Up @@ -218,6 +222,7 @@ public static final class Builder implements io.atomix.utils.Builder<RaftStorage
private static final long DEFAULT_FREE_DISK_SPACE = 1024L * 1024 * 1024;
private static final boolean DEFAULT_FLUSH_EXPLICITLY = true;
private static final int DEFAULT_JOURNAL_INDEX_DENSITY = 100;
private static final boolean DEFAULT_PREALLOCATE_SEGMENT_FILES = true;

private String prefix = DEFAULT_PREFIX;
private File directory = new File(DEFAULT_DIRECTORY);
Expand All @@ -226,6 +231,7 @@ public static final class Builder implements io.atomix.utils.Builder<RaftStorage
private boolean flushExplicitly = DEFAULT_FLUSH_EXPLICITLY;
private ReceivableSnapshotStore persistedSnapshotStore;
private int journalIndexDensity = DEFAULT_JOURNAL_INDEX_DENSITY;
private boolean preallocateSegmentFiles = DEFAULT_PREALLOCATE_SEGMENT_FILES;

private Builder() {}

Expand Down Expand Up @@ -316,6 +322,19 @@ public Builder withJournalIndexDensity(final int journalIndexDensity) {
return this;
}

/**
* Sets whether segment files are pre-allocated at creation. If true, segment files are
* pre-allocated to the maximum segment size (see {@link #withMaxSegmentSize(int)}}) at creation
* before any writes happen.
*
* @param preallocateSegmentFiles true to preallocate files, false otherwise
* @return this builder for chaining
*/
public Builder withPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
this.preallocateSegmentFiles = preallocateSegmentFiles;
return this;
}

/**
* Builds the {@link RaftStorage} object.
*
Expand All @@ -330,7 +349,8 @@ public RaftStorage build() {
freeDiskSpace,
flushExplicitly,
persistedSnapshotStore,
journalIndexDensity);
journalIndexDensity,
preallocateSegmentFiles);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ public RaftLogBuilder withLastWrittenIndex(final long lastWrittenIndex) {
return this;
}

/**
* Sets whether segment files are pre-allocated at creation. If true, segment files are
* pre-allocated to the maximum segment size (see {@link #withMaxSegmentSize(int)}}) at creation
* before any writes happen.
*
* @param preallocateSegmentFiles true to preallocate files, false otherwise
* @return this builder for chaining
*/
public RaftLogBuilder withPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
journalBuilder.withPreallocateSegmentFiles(preallocateSegmentFiles);
return this;
}

@Override
public RaftLog build() {
final Journal journal = journalBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ RaftPartitionGroup buildRaftPartitionGroup(
.withMaxQuorumResponseTimeout(experimentalCfg.getRaft().getMaxQuorumResponseTimeout())
.withMinStepDownFailureCount(experimentalCfg.getRaft().getMinStepDownFailureCount())
.withPreferSnapshotReplicationThreshold(
experimentalCfg.getRaft().getPreferSnapshotReplicationThreshold());
experimentalCfg.getRaft().getPreferSnapshotReplicationThreshold())
.withPreallocateSegmentFiles(experimentalCfg.getRaft().isPreallocateSegmentFiles());

final int maxMessageSize = (int) networkCfg.getMaxMessageSizeInBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ public final class ExperimentalRaftCfg implements ConfigurationEntry {
private static final Duration DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT = Duration.ofSeconds(0);
private static final int DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT = 3;
private static final int DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD = 100;
private static final boolean DEFAULT_PREALLOCATE_SEGMENT_FILES = true;

private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private Duration maxQuorumResponseTimeout = DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT;
private int minStepDownFailureCount = DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT;
private int preferSnapshotReplicationThreshold = DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD;

private boolean preallocateSegmentFiles = DEFAULT_PREALLOCATE_SEGMENT_FILES;

public Duration getRequestTimeout() {
return requestTimeout;
}
Expand Down Expand Up @@ -52,4 +55,12 @@ public int getPreferSnapshotReplicationThreshold() {
public void setPreferSnapshotReplicationThreshold(final int preferSnapshotReplicationThreshold) {
this.preferSnapshotReplicationThreshold = preferSnapshotReplicationThreshold;
}

public boolean isPreallocateSegmentFiles() {
return preallocateSegmentFiles;
}

public void setPreallocateSegmentFiles(final boolean preallocateSegmentFiles) {
this.preallocateSegmentFiles = preallocateSegmentFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import io.camunda.zeebe.snapshots.ReceivableSnapshotStoreFactory;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.util.unit.DataSize;

class RaftPartitionGroupFactoryTest {

final class RaftPartitionGroupFactoryTest {
private static final ReceivableSnapshotStoreFactory SNAPSHOT_STORE_FACTORY =
(directory, partitionId) -> mock(ReceivableSnapshotStore.class);

Expand Down Expand Up @@ -153,6 +154,19 @@ void shouldSetPreferSnapshotReplicationThreshold() {
assertThat(config.getPartitionConfig().getPreferSnapshotReplicationThreshold()).isEqualTo(1000);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void shouldSetSegmentFilesPreallocation(final boolean value) {
// given
brokerCfg.getExperimental().getRaft().setPreallocateSegmentFiles(value);

// when
final var config = buildRaftPartitionGroup();

// then
assertThat(config.getStorageConfig().isPreallocateSegmentFiles()).isEqualTo(value);
}

private RaftPartitionGroupConfig buildRaftPartitionGroup() {
final var partitionGroup = factory.buildRaftPartitionGroup(brokerCfg, SNAPSHOT_STORE_FACTORY);
return (RaftPartitionGroupConfig) partitionGroup.config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

public class ExperimentalCfgTest {
@Execution(ExecutionMode.CONCURRENT)
final class ExperimentalCfgTest {

public final Map<String, String> environment = new HashMap<>();
final Map<String, String> environment = new HashMap<>();

@Test
public void shouldSetRaftRequestTimeoutFromConfig() {
void shouldSetRaftRequestTimeoutFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -29,7 +32,7 @@ public void shouldSetRaftRequestTimeoutFromConfig() {
}

@Test
public void shouldSetRaftRequestTimeoutFromEnv() {
void shouldSetRaftRequestTimeoutFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.requestTimeout", "15s");

Expand All @@ -42,7 +45,7 @@ public void shouldSetRaftRequestTimeoutFromEnv() {
}

@Test
public void shouldSetRaftMaxQuorumResponseTimeoutFromConfig() {
void shouldSetRaftMaxQuorumResponseTimeoutFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -52,7 +55,7 @@ public void shouldSetRaftMaxQuorumResponseTimeoutFromConfig() {
}

@Test
public void shouldSetRaftMaxQuorumResponseTimeoutFromEnv() {
void shouldSetRaftMaxQuorumResponseTimeoutFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.maxQuorumResponseTimeout", "15s");

Expand All @@ -65,7 +68,7 @@ public void shouldSetRaftMaxQuorumResponseTimeoutFromEnv() {
}

@Test
public void shouldSetRaftMinStepDownFailureCountFromConfig() {
void shouldSetRaftMinStepDownFailureCountFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -75,7 +78,7 @@ public void shouldSetRaftMinStepDownFailureCountFromConfig() {
}

@Test
public void shouldSetRaftMinStepDownFailureCountFromEnv() {
void shouldSetRaftMinStepDownFailureCountFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.minStepDownFailureCount", "10");

Expand All @@ -88,7 +91,7 @@ public void shouldSetRaftMinStepDownFailureCountFromEnv() {
}

@Test
public void shouldSetPreferSnapshotReplicationThresholdFromConfig() {
void shouldSetPreferSnapshotReplicationThresholdFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();
Expand All @@ -98,7 +101,7 @@ public void shouldSetPreferSnapshotReplicationThresholdFromConfig() {
}

@Test
public void shouldSetPreferSnapshotReplicationThresholdFromEnv() {
void shouldSetPreferSnapshotReplicationThresholdFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.preferSnapshotReplicationThreshold", "10");

Expand All @@ -111,7 +114,7 @@ public void shouldSetPreferSnapshotReplicationThresholdFromEnv() {
}

@Test
public void shouldSetEnablePreconditionsFromConfig() {
void shouldSetEnablePreconditionsFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var consistencyChecks = cfg.getExperimental().getConsistencyChecks();
Expand All @@ -121,7 +124,7 @@ public void shouldSetEnablePreconditionsFromConfig() {
}

@Test
public void shouldSetEnablePreconditionsFromEnv() {
void shouldSetEnablePreconditionsFromEnv() {
// given
environment.put("zeebe.broker.experimental.consistencyChecks.enablePreconditions", "false");

Expand All @@ -134,7 +137,7 @@ public void shouldSetEnablePreconditionsFromEnv() {
}

@Test
public void shouldSetEnableForeignKeyChecksFromConfig() {
void shouldSetEnableForeignKeyChecksFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var consistencyChecks = cfg.getExperimental().getConsistencyChecks();
Expand All @@ -144,7 +147,7 @@ public void shouldSetEnableForeignKeyChecksFromConfig() {
}

@Test
public void shouldSetEnableForeignKeyChecksFromEnv() {
void shouldSetEnableForeignKeyChecksFromEnv() {
// given
environment.put("zeebe.broker.experimental.consistencyChecks.enableForeignKeyChecks", "false");

Expand All @@ -155,4 +158,27 @@ public void shouldSetEnableForeignKeyChecksFromEnv() {
// then
assertThat(consistencyChecks.isEnableForeignKeyChecks()).isFalse();
}

@Test
void shouldSetPreallocateSegmentFilesFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.preallocateSegmentFiles", "false");

// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raftCfg = cfg.getExperimental().getRaft();

// then
assertThat(raftCfg.isPreallocateSegmentFiles()).isFalse();
}

@Test
void shouldSetPreallocateSegmentFilesFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raftCfg = cfg.getExperimental().getRaft();

// then
assertThat(raftCfg.isPreallocateSegmentFiles()).isTrue();
}
}

0 comments on commit 419e1cb

Please sign in to comment.