Skip to content

Commit

Permalink
merge: #10683
Browse files Browse the repository at this point in the history
10683: Ensure raft storage lock file is updated atomically r=deepthidevaki a=deepthidevaki

## Description

Previously, creating the lock file and writing its contents were not done atomically. Moreover, the contents of the file were not flushed immediately. Because of this, if the pod restarts, there is a chance that the lock file exists but is empty. As a result, a new lock cannot be acquired and the partition startup fails.

To fix this, we first write the contents to a temporary file with the "SYNC" option and then move that file atomically to the actual lock file.

Existing tests are refactored. No new test is added to verify this, as it is difficult to simulate crashes while acquiring the lock.

## Related issues

closes #10681 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and deepthidevaki committed Oct 13, 2022
2 parents 3524c51 + 8694c15 commit fe37687
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/**
Expand Down Expand Up @@ -110,17 +112,30 @@ public String prefix() {
* @return indicates whether the lock was successfully acquired
*/
public boolean lock(final String id) {
final File file = new File(directory, String.format(".%s.lock", prefix));
final File lockFile = new File(directory, String.format(".%s.lock", prefix));
final File tempLockFile = new File(directory, String.format(".%s.lock.tmp", id));
try {
if (file.createNewFile()) {
Files.writeString(file.toPath(), id, StandardOpenOption.WRITE);
return true;
} else {
final String lock = Files.readString(file.toPath());
return lock != null && lock.equals(id);
if (!lockFile.exists()) {
// Create and update the file atomically
Files.writeString(
tempLockFile.toPath(),
id,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE,
StandardOpenOption.SYNC);

// If two nodes try to acquire the lock, the move will fail with FileAlreadyExistsException
FileUtil.moveDurably(
tempLockFile.toPath(), lockFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
// Read the lock file again to ensure that its contents match the local id
final String lock = Files.readString(lockFile.toPath());
return lock != null && lock.equals(id);
} catch (final FileAlreadyExistsException e) {
return false;
} catch (final IOException e) {
throw new StorageException("Failed to acquire storage lock");
throw new StorageException("Failed to acquire storage lock", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,14 @@
*/
package io.atomix.raft.storage;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.util.FileUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/** Raft storage test. */
Expand All @@ -38,14 +32,14 @@ public class RaftStorageTest {
private static final Path PATH = Paths.get("target/test-logs/");

@Test
public void testDefaultConfiguration() throws Exception {
public void testDefaultConfiguration() {
final RaftStorage storage = RaftStorage.builder().build();
assertEquals("atomix", storage.prefix());
assertEquals(new File(System.getProperty("user.dir")), storage.directory());
assertThat(storage.prefix()).isEqualTo("atomix");
assertThat(storage.directory()).isEqualTo(new File(System.getProperty("user.dir")));
}

@Test
public void testCustomConfiguration() throws Exception {
public void testCustomConfiguration() {
final RaftStorage storage =
RaftStorage.builder()
.withPrefix("foo")
Expand All @@ -54,49 +48,54 @@ public void testCustomConfiguration() throws Exception {
.withFreeDiskSpace(100)
.withFlushExplicitly(false)
.build();
assertEquals("foo", storage.prefix());
assertEquals(new File(PATH.toFile(), "foo"), storage.directory());
assertThat(storage.prefix()).isEqualTo("foo");
assertThat(storage.directory()).isEqualTo(new File(PATH.toFile(), "foo"));
}

@Test
public void testStorageLock() throws Exception {
public void canAcquireLockOnEmptyDirectory() {
// given empty directory in PATH

// when
final RaftStorage storage1 =
RaftStorage.builder().withDirectory(PATH.toFile()).withPrefix("test").build();

assertTrue(storage1.lock("a"));
// then
assertThat(storage1.lock("a")).isTrue();
}

@Test
public void cannotLockAlreadyLockedDirectory() {
// given
final RaftStorage storage1 =
RaftStorage.builder().withDirectory(PATH.toFile()).withPrefix("test").build();
storage1.lock("a");

// when
final RaftStorage storage2 =
RaftStorage.builder().withDirectory(PATH.toFile()).withPrefix("test").build();

assertFalse(storage2.lock("b"));
// then
assertThat(storage2.lock("b")).isFalse();
}

@Test
public void canAcquireLockOnDirectoryLockedBySameNode() {
// given
final RaftStorage storage1 =
RaftStorage.builder().withDirectory(PATH.toFile()).withPrefix("test").build();
storage1.lock("a");

// when
final RaftStorage storage3 =
RaftStorage.builder().withDirectory(PATH.toFile()).withPrefix("test").build();

assertTrue(storage3.lock("a"));
// then
assertThat(storage3.lock("a")).isTrue();
}

@Before
@After
public void cleanupStorage() throws IOException {
if (Files.exists(PATH)) {
Files.walkFileTree(
PATH,
new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs)
throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(final Path dir, final IOException exc)
throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
FileUtil.deleteFolderIfExists(PATH);
}
}

0 comments on commit fe37687

Please sign in to comment.