Skip to content

Commit

Permalink
Introduce a recovery strategy as a replacement for set_discard_if_cor…
Browse files Browse the repository at this point in the history
…rupted

The user can now choose between discarding corrupted data, moving it out
of the way (into another file) and starting with an empty database or
bubbling up the error.
  • Loading branch information
badboy committed Mar 24, 2023
1 parent 207d04f commit b858665
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 21 deletions.
13 changes: 13 additions & 0 deletions src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,16 @@ pub enum WriteFlags {
APPEND,
APPEND_DUP,
}

/// Strategy to use when corrupted data is detected while opening a database.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecoveryStrategy {
/// Bubble up the error on detecting a corrupted data file. The default.
Error,

/// Discard the corrupted data and start with an empty database.
Discard,

/// Move the corrupted data file to `$file.corrupt` and start with an empty database.
Rename,
}
4 changes: 3 additions & 1 deletion src/backend/impl_lmdb/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::{
DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, RoTransactionImpl,
RwTransactionImpl, StatImpl,
};
use crate::backend::common::RecoveryStrategy;
use crate::backend::traits::{
BackendEnvironment, BackendEnvironmentBuilder, BackendInfo, BackendIter, BackendRoCursor,
BackendRoCursorTransaction, BackendStat,
Expand Down Expand Up @@ -86,7 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_discard_if_corrupted(&mut self, _discard_if_corrupted: bool) -> &mut Self {
/// **UNIMPLEMENTED.** Will panic at runtime.
fn set_corruption_recovery_strategy(&mut self, _strategy: RecoveryStrategy) -> &mut Self {
// Unfortunately, when opening a database, LMDB doesn't handle all the ways it could have
// been corrupted. Prefer using the `SafeMode` backend if this is important.
unimplemented!();
Expand Down
49 changes: 34 additions & 15 deletions src/backend/impl_safe/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ use super::{
database::Database, DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl,
RoTransactionImpl, RwTransactionImpl, StatImpl,
};
use crate::backend::common::RecoveryStrategy;
use crate::backend::traits::{BackendEnvironment, BackendEnvironmentBuilder};

const DEFAULT_DB_FILENAME: &str = "data.safe.bin";
const DEFAULT_CORRUPT_DB_EXTENSION: &str = "bin.corrupt";

type DatabaseArena = Arena<Database>;
type DatabaseNameMap = HashMap<Option<String>, DatabaseImpl>;
Expand All @@ -38,7 +40,7 @@ pub struct EnvironmentBuilderImpl {
max_dbs: Option<usize>,
map_size: Option<usize>,
make_dir_if_needed: bool,
discard_if_corrupted: bool,
corruption_recovery_strategy: RecoveryStrategy,
}

impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
Expand All @@ -53,7 +55,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
max_dbs: None,
map_size: None,
make_dir_if_needed: false,
discard_if_corrupted: false,
corruption_recovery_strategy: RecoveryStrategy::Error,
}
}

Expand Down Expand Up @@ -85,8 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self {
self.discard_if_corrupted = discard_if_corrupted;
fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self {
self.corruption_recovery_strategy = strategy;
self
}

Expand All @@ -106,7 +108,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self.max_dbs,
self.map_size,
)?;
env.read_from_disk(self.discard_if_corrupted)?;
env.read_from_disk(self.corruption_recovery_strategy)?;
Ok(env)
}
}
Expand Down Expand Up @@ -152,16 +154,32 @@ impl EnvironmentImpl {
Ok(bincode::serialize(&data)?)
}

fn deserialize(
bytes: &[u8],
discard_if_corrupted: bool,
fn load(
path: &Path,
strategy: RecoveryStrategy,
) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let bytes = fs::read(&path)?;

match Self::deserialize(&bytes) {
Ok((arena, name_map)) => Ok((arena, name_map)),
Err(err) => match strategy {
RecoveryStrategy::Error => Err(err),
RecoveryStrategy::Discard => Ok((DatabaseArena::new(), HashMap::new())),
RecoveryStrategy::Rename => {
let corrupted_path = path.with_extension(DEFAULT_CORRUPT_DB_EXTENSION);
fs::rename(&path, &corrupted_path)?;

Ok((DatabaseArena::new(), HashMap::new()))
}
},
}
}

fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let mut arena = DatabaseArena::new();
let mut name_map = HashMap::new();
let data: HashMap<_, _> = match bincode::deserialize(bytes) {
Err(_) if discard_if_corrupted => Ok(HashMap::new()),
result => result,
}?;
let data: HashMap<_, _> = bincode::deserialize(bytes)?;

for (name, db) in data {
name_map.insert(name, DatabaseImpl(arena.alloc(db)));
}
Expand Down Expand Up @@ -199,15 +217,15 @@ impl EnvironmentImpl {
})
}

pub(crate) fn read_from_disk(&mut self, discard_if_corrupted: bool) -> Result<(), ErrorImpl> {
pub(crate) fn read_from_disk(&mut self, strategy: RecoveryStrategy) -> Result<(), ErrorImpl> {
let mut path = Cow::from(&self.path);
if fs::metadata(&path)?.is_dir() {
path.to_mut().push(DEFAULT_DB_FILENAME);
};
if fs::metadata(&path).is_err() {
return Ok(());
};
let (arena, name_map) = Self::deserialize(&fs::read(&path)?, discard_if_corrupted)?;
let (arena, name_map) = Self::load(&path, strategy)?;
self.dbs = RwLock::new(EnvironmentDbs { arena, name_map });
Ok(())
}
Expand Down Expand Up @@ -272,7 +290,8 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)?;
if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some() {
if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some()
{
return Err(ErrorImpl::DbsFull);
}
let parts = EnvironmentDbsRefMut::from(dbs.deref_mut());
Expand Down
5 changes: 3 additions & 2 deletions src/backend/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};

use crate::{
backend::common::{DatabaseFlags, EnvironmentFlags, WriteFlags},
backend::common::{DatabaseFlags, EnvironmentFlags, RecoveryStrategy, WriteFlags},
error::StoreError,
};

Expand Down Expand Up @@ -83,7 +83,8 @@ pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone {

fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self;

fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self;
/// Set the corruption recovery strategy. See [`RecoveryStrategy`] for details.
fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self;

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>;
}
Expand Down
2 changes: 1 addition & 1 deletion src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ lazy_static! {
/// is true by default, because it helps enforce the constraints guaranteed by
/// this manager. However, path canonicalization might crash in some fringe
/// circumstances, so the `no-canonicalize-path` feature offers the possibility of
/// disabling it. See: https://bugzilla.mozilla.org/show_bug.cgi?id=1531887
/// disabling it. See: <https://bugzilla.mozilla.org/show_bug.cgi?id=1531887>
///
/// When path canonicalization is disabled, you *must* ensure an RKV environment is
/// always created or retrieved with the same path.
Expand Down
98 changes: 96 additions & 2 deletions tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tempfile::Builder;
#[cfg(feature = "lmdb")]
use rkv::backend::{Lmdb, LmdbEnvironment};
use rkv::{
backend::{BackendEnvironmentBuilder, SafeMode, SafeModeEnvironment},
backend::{BackendEnvironmentBuilder, RecoveryStrategy, SafeMode, SafeModeEnvironment},
CloseOptions, Rkv, StoreOptions, Value,
};

Expand Down Expand Up @@ -247,7 +247,7 @@ fn test_safe_mode_corrupt_while_open_1() {

// But we can use a builder and pass `discard_if_corrupted` to deal with it.
let mut builder = Rkv::environment_builder::<SafeMode>();
builder.set_discard_if_corrupted(true);
builder.set_corruption_recovery_strategy(RecoveryStrategy::Discard);
manager
.get_or_create_from_builder(root.path(), builder, Rkv::from_builder::<SafeMode>)
.expect("created");
Expand Down Expand Up @@ -378,3 +378,97 @@ fn test_safe_mode_corrupt_while_open_2() {
Some(Value::Str("byé, yöu"))
);
}

/// Test how the manager can discard corrupted databases, while moving the corrupted one aside for
/// later inspection.
#[test]
fn test_safe_mode_corrupt_while_open_3() {
type Manager = rkv::Manager<SafeModeEnvironment>;

let root = Builder::new()
.prefix("test_safe_mode_corrupt_while_open_3")
.tempdir()
.expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

// Verify it was flushed to disk.
let mut safebin = root.path().to_path_buf();
safebin.push("data.safe.bin");

// Oops, corruption.
fs::write(&safebin, "bogus").expect("dbfile corrupted");
assert!(safebin.exists());

// Create environment.
let mut manager = Manager::singleton().write().unwrap();

// Recreating environment fails.
manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect_err("not created");
assert!(manager.get(root.path()).expect("success").is_none());

// But we can use a builder and pass `RecoveryStrategy::Rename` to deal with it.
let mut builder = Rkv::environment_builder::<SafeMode>();
builder.set_corruption_recovery_strategy(RecoveryStrategy::Rename);
manager
.get_or_create_from_builder(root.path(), builder, Rkv::from_builder::<SafeMode>)
.expect("created");
assert!(manager.get(root.path()).expect("success").is_some());

// Database file was moved out of the way.
assert!(!safebin.exists());

let mut corruptbin = root.path().to_path_buf();
corruptbin.push("data.safe.bin.corrupt");
assert!(corruptbin.exists());

let shared_env = manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect("created");
let env = shared_env.read().unwrap();

// Writing still works.
let store = env
.open_single("store", StoreOptions::create())
.expect("opened");

// Nothing to be read.
let reader = env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), None);

// We can write.
let mut writer = env.write().expect("writer");
store
.put(&mut writer, "foo", &Value::I64(5678))
.expect("wrote");
writer.commit().expect("committed");
env.sync(true).expect("synced");

// Database file exists.
assert!(safebin.exists());

// Close everything.
drop(env);
drop(shared_env);
manager
.try_close(root.path(), CloseOptions::default())
.expect("closed without deleting");
assert!(manager.get(root.path()).expect("success").is_none());

// Recreate environment.
let shared_env = manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect("created");
let env = shared_env.read().unwrap();

// Verify that the dbfile is not corrupted.
let store = env
.open_single("store", StoreOptions::default())
.expect("opened");
let reader = env.read().expect("reader");
assert_eq!(
store.get(&reader, "foo").expect("read"),
Some(Value::I64(5678))
);
}

0 comments on commit b858665

Please sign in to comment.