From e2722612892dce8cc463c84c4db844123681bacc Mon Sep 17 00:00:00 2001 From: Andrea Corradi Date: Sat, 13 Feb 2021 00:09:21 +0100 Subject: [PATCH 1/6] Avoid checkpoint to outlive db --- src/checkpoint.rs | 15 ++++++++++----- tests/fail/checkpoint_outlive_db.rs | 8 ++++++++ tests/fail/checkpoint_outlive_db.stderr | 10 ++++++++++ tests/test_checkpoint.rs | 6 ++++++ 4 files changed, 34 insertions(+), 5 deletions(-) create mode 100644 tests/fail/checkpoint_outlive_db.rs create mode 100644 tests/fail/checkpoint_outlive_db.stderr diff --git a/src/checkpoint.rs b/src/checkpoint.rs index b1bd6678d..19f2313b9 100644 --- a/src/checkpoint.rs +++ b/src/checkpoint.rs @@ -19,6 +19,7 @@ use crate::{ffi, Error, DB}; use std::ffi::CString; +use std::marker::PhantomData; use std::path::Path; /// Undocumented parameter for `ffi::rocksdb_checkpoint_create` function. Zero by default. @@ -26,16 +27,17 @@ const LOG_SIZE_FOR_FLUSH: u64 = 0_u64; /// Database's checkpoint object. /// Used to create checkpoints of the specified DB from time to time. -pub struct Checkpoint { +pub struct Checkpoint<'db> { inner: *mut ffi::rocksdb_checkpoint_t, + _db: PhantomData<&'db ()>, } -impl Checkpoint { +impl<'db> Checkpoint<'db> { /// Creates new checkpoint object for specific DB. /// /// Does not actually produce checkpoints, call `.create_checkpoint()` method to produce /// a DB checkpoint. - pub fn new(db: &DB) -> Result { + pub fn new(db: &'db DB) -> Result, Error> { let checkpoint: *mut ffi::rocksdb_checkpoint_t; unsafe { checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner)) }; @@ -44,7 +46,10 @@ impl Checkpoint { return Err(Error::new("Could not create checkpoint object.".to_owned())); } - Ok(Checkpoint { inner: checkpoint }) + Ok(Checkpoint { + inner: checkpoint, + _db: PhantomData, + }) } /// Creates new physical DB checkpoint in directory specified by `path`. @@ -70,7 +75,7 @@ impl Checkpoint { } } -impl Drop for Checkpoint { +impl<'db> Drop for Checkpoint<'db> { fn drop(&mut self) { unsafe { ffi::rocksdb_checkpoint_object_destroy(self.inner); diff --git a/tests/fail/checkpoint_outlive_db.rs b/tests/fail/checkpoint_outlive_db.rs new file mode 100644 index 000000000..d8400e008 --- /dev/null +++ b/tests/fail/checkpoint_outlive_db.rs @@ -0,0 +1,8 @@ +use rocksdb::{DB, checkpoint::Checkpoint}; + +fn main() { + let _checkpoint = { + let db = DB::open_default("foo").unwrap(); + Checkpoint::new(&db) + }; +} diff --git a/tests/fail/checkpoint_outlive_db.stderr b/tests/fail/checkpoint_outlive_db.stderr new file mode 100644 index 000000000..7bb598c57 --- /dev/null +++ b/tests/fail/checkpoint_outlive_db.stderr @@ -0,0 +1,10 @@ +error[E0597]: `db` does not live long enough + --> $DIR/checkpoint_outlive_db.rs:6:25 + | +4 | let _checkpoint = { + | ----------- borrow later stored here +5 | let db = DB::open_default("foo").unwrap(); +6 | Checkpoint::new(&db) + | ^^^ borrowed value does not live long enough +7 | }; + | - `db` dropped here while still borrowed diff --git a/tests/test_checkpoint.rs b/tests/test_checkpoint.rs index 01e4dc5d0..a7668bb58 100644 --- a/tests/test_checkpoint.rs +++ b/tests/test_checkpoint.rs @@ -99,3 +99,9 @@ pub fn test_multi_checkpoints() { assert_eq!(cp.get(b"k5").unwrap().unwrap(), b"v5"); assert_eq!(cp.get(b"k6").unwrap().unwrap(), b"v6"); } + +#[test] +fn test_checkpoint_outlive_db() { + let t = trybuild::TestCases::new(); + t.compile_fail("tests/fail/checkpoint_outlive_db.rs"); +} From 02601b2f100d3dbab83fe14d8f0d822b9f6e2f77 Mon Sep 17 00:00:00 2001 From: Andrea Corradi Date: Sat, 13 Feb 2021 15:03:18 +0100 Subject: [PATCH 2/6] Add test: snapshot outlive db must fail --- tests/fail/snapshot_outlive_db.rs | 8 ++++++++ tests/fail/snapshot_outlive_db.stderr | 10 ++++++++++ tests/test_db.rs | 6 ++++++ 3 files changed, 24 insertions(+) create mode 100644 tests/fail/snapshot_outlive_db.rs create mode 100644 tests/fail/snapshot_outlive_db.stderr diff --git a/tests/fail/snapshot_outlive_db.rs b/tests/fail/snapshot_outlive_db.rs new file mode 100644 index 000000000..09141d90e --- /dev/null +++ b/tests/fail/snapshot_outlive_db.rs @@ -0,0 +1,8 @@ +use rocksdb::DB; + +fn main() { + let _snapshot = { + let db = DB::open_default("foo").unwrap(); + db.snapshot() + }; +} diff --git a/tests/fail/snapshot_outlive_db.stderr b/tests/fail/snapshot_outlive_db.stderr new file mode 100644 index 000000000..9720e6bf5 --- /dev/null +++ b/tests/fail/snapshot_outlive_db.stderr @@ -0,0 +1,10 @@ +error[E0597]: `db` does not live long enough + --> $DIR/snapshot_outlive_db.rs:6:9 + | +4 | let _snapshot = { + | --------- borrow later stored here +5 | let db = DB::open_default("foo").unwrap(); +6 | db.snapshot() + | ^^ borrowed value does not live long enough +7 | }; + | - `db` dropped here while still borrowed diff --git a/tests/test_db.rs b/tests/test_db.rs index 9c82c5e00..0826d93f6 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -946,3 +946,9 @@ fn multi_get_cf() { assert_eq!(values[2], b"v2"); } } + +#[test] +fn test_snapshot_outlive_db() { + let t = trybuild::TestCases::new(); + t.compile_fail("tests/fail/snapshot_outlive_db.rs"); +} From 04a6c8cb1060f3a8b207b7b118317e072e25a04b Mon Sep 17 00:00:00 2001 From: Andrea Corradi Date: Sun, 14 Feb 2021 20:20:47 +0100 Subject: [PATCH 3/6] Keep Cache and Env alive with Rc --- src/db.rs | 4 ++ src/db_options.rs | 128 ++++++++++++++++++++++++++++++++++------------ src/perf.rs | 2 +- 3 files changed, 101 insertions(+), 33 deletions(-) diff --git a/src/db.rs b/src/db.rs index 684d6b172..6e8e7d2ba 100644 --- a/src/db.rs +++ b/src/db.rs @@ -16,6 +16,7 @@ use crate::{ column_family::AsColumnFamilyRef, column_family::BoundColumnFamily, + db_options::OptionsMustOutliveDB, ffi, ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath}, ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode, @@ -102,6 +103,7 @@ pub struct DBWithThreadMode { pub(crate) inner: *mut ffi::rocksdb_t, cfs: T, // Column families are held differently depending on thread mode path: PathBuf, + _outlive: OptionsMustOutliveDB, } /// Minimal set of DB-related methods, intended to be generic over @@ -238,6 +240,7 @@ impl DBWithThreadMode { inner: db, cfs: T::new(BTreeMap::new()), path: path.as_ref().to_path_buf(), + _outlive: opts.outlive.clone(), }) } @@ -401,6 +404,7 @@ impl DBWithThreadMode { inner: db, path: path.as_ref().to_path_buf(), cfs: T::new(cf_map), + _outlive: opts.outlive.clone(), }) } diff --git a/src/db_options.rs b/src/db_options.rs index c99207e38..9252c800e 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -15,6 +15,7 @@ use std::ffi::{CStr, CString}; use std::mem; use std::path::Path; +use std::rc::Rc; use libc::{self, c_char, c_int, c_uchar, c_uint, c_void, size_t}; @@ -35,10 +36,20 @@ fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t { unsafe { ffi::rocksdb_cache_create_lru(capacity) } } -pub struct Cache { +pub(crate) struct CacheWrapper { pub(crate) inner: *mut ffi::rocksdb_cache_t, } +impl Drop for CacheWrapper { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_cache_destroy(self.inner); + } + } +} + +pub struct Cache(pub(crate) Rc); + impl Cache { /// Create a lru cache with capacity pub fn new_lru_cache(capacity: size_t) -> Result { @@ -46,33 +57,29 @@ impl Cache { if cache.is_null() { Err(Error::new("Could not create Cache".to_owned())) } else { - Ok(Cache { inner: cache }) + Ok(Cache(Rc::new(CacheWrapper { inner: cache }))) } } /// Returns the Cache memory usage pub fn get_usage(&self) -> usize { - unsafe { ffi::rocksdb_cache_get_usage(self.inner) } + unsafe { ffi::rocksdb_cache_get_usage(self.0.inner) } } /// Returns pinned memory usage pub fn get_pinned_usage(&self) -> usize { - unsafe { ffi::rocksdb_cache_get_pinned_usage(self.inner) } + unsafe { ffi::rocksdb_cache_get_pinned_usage(self.0.inner) } } /// Sets cache capacity pub fn set_capacity(&mut self, capacity: size_t) { unsafe { - ffi::rocksdb_cache_set_capacity(self.inner, capacity); + ffi::rocksdb_cache_set_capacity(self.0.inner, capacity); } } -} -impl Drop for Cache { - fn drop(&mut self) { - unsafe { - ffi::rocksdb_cache_destroy(self.inner); - } + fn clone(&self) -> Self { + Self(self.0.clone()) } } @@ -86,11 +93,11 @@ impl Drop for Cache { /// /// Note: currently, C API behinds C++ API for various settings. /// See also: `rocksdb/include/env.h` -pub struct Env { - pub(crate) inner: *mut ffi::rocksdb_env_t, +struct EnvWrapper { + inner: *mut ffi::rocksdb_env_t, } -impl Drop for Env { +impl Drop for EnvWrapper { fn drop(&mut self) { unsafe { ffi::rocksdb_env_destroy(self.inner); @@ -98,6 +105,8 @@ impl Drop for Env { } } +pub struct Env(Rc); + impl Env { /// Returns default env pub fn default() -> Result { @@ -105,7 +114,7 @@ impl Env { if env.is_null() { Err(Error::new("Could not create mem env".to_owned())) } else { - Ok(Env { inner: env }) + Ok(Env(Rc::new(EnvWrapper { inner: env }))) } } @@ -116,7 +125,7 @@ impl Env { if env.is_null() { Err(Error::new("Could not create mem env".to_owned())) } else { - Ok(Env { inner: env }) + Ok(Env(Rc::new(EnvWrapper { inner: env }))) } } @@ -126,7 +135,7 @@ impl Env { /// Default: 1 pub fn set_background_threads(&mut self, num_threads: c_int) { unsafe { - ffi::rocksdb_env_set_background_threads(self.inner, num_threads); + ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads); } } @@ -134,7 +143,7 @@ impl Env { /// prevent compactions from stalling memtable flushes. pub fn set_high_priority_background_threads(&mut self, n: c_int) { unsafe { - ffi::rocksdb_env_set_high_priority_background_threads(self.inner, n); + ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n); } } @@ -142,7 +151,7 @@ impl Env { /// prevent compactions from stalling memtable flushes. pub fn set_low_priority_background_threads(&mut self, n: c_int) { unsafe { - ffi::rocksdb_env_set_low_priority_background_threads(self.inner, n); + ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n); } } @@ -150,42 +159,81 @@ impl Env { /// prevent compactions from stalling memtable flushes. pub fn set_bottom_priority_background_threads(&mut self, n: c_int) { unsafe { - ffi::rocksdb_env_set_bottom_priority_background_threads(self.inner, n); + ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n); } } /// Wait for all threads started by StartThread to terminate. pub fn join_all_threads(&mut self) { unsafe { - ffi::rocksdb_env_join_all_threads(self.inner); + ffi::rocksdb_env_join_all_threads(self.0.inner); } } /// Lowering IO priority for threads from the specified pool. pub fn lower_thread_pool_io_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_thread_pool_io_priority(self.inner); + ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner); } } /// Lowering IO priority for high priority thread pool. pub fn lower_high_priority_thread_pool_io_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.inner); + ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner); } } /// Lowering CPU priority for threads from the specified pool. pub fn lower_thread_pool_cpu_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.inner); + ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner); } } /// Lowering CPU priority for high priority thread pool. pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) { unsafe { - ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.inner); + ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner); + } + } + + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +#[derive(Default)] +pub(crate) struct OptionsMustOutliveDB { + env: Option, + row_cache: Option, + block_based: Option, +} + +impl OptionsMustOutliveDB { + pub(crate) fn clone(&self) -> Self { + Self { + env: self.env.as_ref().map(Env::clone), + row_cache: self.row_cache.as_ref().map(Cache::clone), + block_based: self + .block_based + .as_ref() + .map(BlockBasedOptionsMustOutliveDB::clone), + } + } +} + +#[derive(Default)] +pub struct BlockBasedOptionsMustOutliveDB { + block_cache: Option, + block_cache_compressed: Option, +} + +impl BlockBasedOptionsMustOutliveDB { + fn clone(&self) -> Self { + Self { + block_cache: self.block_cache.as_ref().map(Cache::clone), + block_cache_compressed: self.block_cache_compressed.as_ref().map(Cache::clone), } } } @@ -226,6 +274,7 @@ impl Env { /// ``` pub struct Options { pub(crate) inner: *mut ffi::rocksdb_options_t, + pub(crate) outlive: OptionsMustOutliveDB, } /// Optionally disable WAL or sync for this write. @@ -284,6 +333,7 @@ pub struct FlushOptions { /// For configuring block-based file storage. pub struct BlockBasedOptions { pub(crate) inner: *mut ffi::rocksdb_block_based_table_options_t, + outlive: BlockBasedOptionsMustOutliveDB, } pub struct ReadOptions { @@ -351,7 +401,10 @@ impl Clone for Options { if inner.is_null() { panic!("Could not copy RocksDB options"); } - Self { inner } + Self { + inner, + outlive: self.outlive.clone(), + } } } @@ -467,8 +520,9 @@ impl BlockBasedOptions { /// By default, rocksdb will automatically create and use an 8MB internal cache. pub fn set_block_cache(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache.inner); + ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache.0.inner); } + self.outlive.block_cache = Some(cache.clone()); } /// Sets global cache for compressed blocks. Cache must outlive DB instance which uses it. @@ -476,8 +530,9 @@ impl BlockBasedOptions { /// By default, rocksdb will not use a compressed block cache. pub fn set_block_cache_compressed(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_block_based_options_set_block_cache_compressed(self.inner, cache.inner); + ffi::rocksdb_block_based_options_set_block_cache_compressed(self.inner, cache.0.inner); } + self.outlive.block_cache_compressed = Some(cache.clone()); } /// Disable block cache @@ -633,7 +688,10 @@ impl Default for BlockBasedOptions { if block_opts.is_null() { panic!("Could not create RocksDB block based options"); } - BlockBasedOptions { inner: block_opts } + BlockBasedOptions { + inner: block_opts, + outlive: BlockBasedOptionsMustOutliveDB::default(), + } } } @@ -819,8 +877,9 @@ impl Options { /// Default: Env::default() pub fn set_env(&mut self, env: &Env) { unsafe { - ffi::rocksdb_options_set_env(self.inner, env.inner); + ffi::rocksdb_options_set_env(self.inner, env.0.inner); } + self.outlive.env = Some(env.clone()); } /// Sets the compression algorithm that will be used for compressing blocks. @@ -2101,6 +2160,7 @@ impl Options { unsafe { ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner); } + self.outlive.block_based = Some(factory.outlive.clone()); } // This is a factory that provides TableFactory objects. @@ -2504,8 +2564,9 @@ impl Options { /// Not supported in ROCKSDB_LITE mode! pub fn set_row_cache(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_options_set_row_cache(self.inner, cache.inner); + ffi::rocksdb_options_set_row_cache(self.inner, cache.0.inner); } + self.outlive.row_cache = Some(cache.clone()); } /// Use to control write rate of flush and compaction. Flush has higher @@ -2695,7 +2756,10 @@ impl Default for Options { if opts.is_null() { panic!("Could not create RocksDB options"); } - Options { inner: opts } + Options { + inner: opts, + outlive: OptionsMustOutliveDB::default(), + } } } } diff --git a/src/perf.rs b/src/perf.rs index dee70be90..d099a8993 100644 --- a/src/perf.rs +++ b/src/perf.rs @@ -249,7 +249,7 @@ impl MemoryUsageBuilder { /// Add a cache to collect memory usage from it and add up in total stats fn add_cache(&mut self, cache: &Cache) { unsafe { - ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.inner); + ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.0.inner); } } From 56417867cc00b8c03cd0ac37c797dcff7b90ed6e Mon Sep 17 00:00:00 2001 From: Andrea Corradi Date: Sun, 4 Apr 2021 14:36:10 +0200 Subject: [PATCH 4/6] BlackBasedOptionsMustOutliveDB can be private --- src/db_options.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db_options.rs b/src/db_options.rs index 9252c800e..4913151bf 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -224,7 +224,7 @@ impl OptionsMustOutliveDB { } #[derive(Default)] -pub struct BlockBasedOptionsMustOutliveDB { +struct BlockBasedOptionsMustOutliveDB { block_cache: Option, block_cache_compressed: Option, } From f15b437095c48260f0fa9d6b1874a02d7fa447f1 Mon Sep 17 00:00:00 2001 From: Andrea Corradi Date: Sun, 4 Apr 2021 15:00:06 +0200 Subject: [PATCH 5/6] Store Options from ColumnFamily --- src/db.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/db.rs b/src/db.rs index 6e8e7d2ba..3684590ac 100644 --- a/src/db.rs +++ b/src/db.rs @@ -30,6 +30,7 @@ use std::collections::BTreeMap; use std::ffi::{CStr, CString}; use std::fmt; use std::fs; +use std::iter; use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; @@ -103,7 +104,7 @@ pub struct DBWithThreadMode { pub(crate) inner: *mut ffi::rocksdb_t, cfs: T, // Column families are held differently depending on thread mode path: PathBuf, - _outlive: OptionsMustOutliveDB, + _outlive: Vec, } /// Minimal set of DB-related methods, intended to be generic over @@ -240,7 +241,7 @@ impl DBWithThreadMode { inner: db, cfs: T::new(BTreeMap::new()), path: path.as_ref().to_path_buf(), - _outlive: opts.outlive.clone(), + _outlive: vec![opts.outlive.clone()], }) } @@ -333,6 +334,9 @@ impl DBWithThreadMode { I: IntoIterator, { let cfs: Vec<_> = cfs.into_iter().collect(); + let outlive = iter::once(opts.outlive.clone()) + .chain(cfs.iter().map(|cf| cf.options.outlive.clone())) + .collect(); let cpath = to_cpath(&path)?; @@ -404,7 +408,7 @@ impl DBWithThreadMode { inner: db, path: path.as_ref().to_path_buf(), cfs: T::new(cf_map), - _outlive: opts.outlive.clone(), + _outlive: outlive, }) } From b1733218d0c0454bc23b554277bc158a43d46489 Mon Sep 17 00:00:00 2001 From: Andrea Corradi Date: Sun, 4 Apr 2021 16:15:35 +0200 Subject: [PATCH 6/6] Wrap Env and Cache using Arc --- src/db_options.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/db_options.rs b/src/db_options.rs index 4913151bf..50e2d281e 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -15,7 +15,7 @@ use std::ffi::{CStr, CString}; use std::mem; use std::path::Path; -use std::rc::Rc; +use std::sync::Arc; use libc::{self, c_char, c_int, c_uchar, c_uint, c_void, size_t}; @@ -48,7 +48,7 @@ impl Drop for CacheWrapper { } } -pub struct Cache(pub(crate) Rc); +pub struct Cache(pub(crate) Arc); impl Cache { /// Create a lru cache with capacity @@ -57,7 +57,7 @@ impl Cache { if cache.is_null() { Err(Error::new("Could not create Cache".to_owned())) } else { - Ok(Cache(Rc::new(CacheWrapper { inner: cache }))) + Ok(Cache(Arc::new(CacheWrapper { inner: cache }))) } } @@ -93,6 +93,8 @@ impl Cache { /// /// Note: currently, C API behinds C++ API for various settings. /// See also: `rocksdb/include/env.h` +pub struct Env(Arc); + struct EnvWrapper { inner: *mut ffi::rocksdb_env_t, } @@ -105,8 +107,6 @@ impl Drop for EnvWrapper { } } -pub struct Env(Rc); - impl Env { /// Returns default env pub fn default() -> Result { @@ -114,7 +114,7 @@ impl Env { if env.is_null() { Err(Error::new("Could not create mem env".to_owned())) } else { - Ok(Env(Rc::new(EnvWrapper { inner: env }))) + Ok(Env(Arc::new(EnvWrapper { inner: env }))) } } @@ -125,7 +125,7 @@ impl Env { if env.is_null() { Err(Error::new("Could not create mem env".to_owned())) } else { - Ok(Env(Rc::new(EnvWrapper { inner: env }))) + Ok(Env(Arc::new(EnvWrapper { inner: env }))) } }