From 6534bf190d93d817a58fc290870b8496268a6dcc Mon Sep 17 00:00:00 2001 From: fdeantoni Date: Sun, 18 Apr 2021 16:57:07 +0800 Subject: [PATCH] Add DB::open_cf_with_ttl method. (#505) --- CHANGELOG.md | 1 + src/db.rs | 59 ++++++++++++++++++++++++++++++++++++++---------- tests/test_db.rs | 19 ++++++++++++++++ 3 files changed, 67 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b5c7a615..8f9dd96b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Bump `librocksdb-sys` up to 6.13.3 (aleksuss) * Add `multi_get`, `multi_get_opt`, `multi_get_cf` and `multi_get_cf_opt` `DB` methods (stanislav-tkach) * Bump `librocksdb-sys` up to 6.17.3 (ordian) +* Add `DB::open_cf_with_ttl` method (fdeantoni) ## 0.15.0 (2020-08-25) diff --git a/src/db.rs b/src/db.rs index 3684590ac..ae7f5956e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -231,18 +231,43 @@ impl DBWithThreadMode { path: P, ttl: Duration, ) -> Result { - let c_path = to_cpath(&path)?; - let db = Self::open_raw(opts, &c_path, &AccessType::WithTTL { ttl })?; - if db.is_null() { - return Err(Error::new("Could not initialize database.".to_owned())); - } + Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl) + } - Ok(Self { - inner: db, - cfs: T::new(BTreeMap::new()), - path: path.as_ref().to_path_buf(), - _outlive: vec![opts.outlive.clone()], - }) + /// Opens the database with a Time to Live compaction filter and column family names. + /// + /// Column families opened using this function will be created with default `Options`. + pub fn open_cf_with_ttl( + opts: &Options, + path: P, + cfs: I, + ttl: Duration, + ) -> Result + where + P: AsRef, + I: IntoIterator, + N: AsRef, + { + let cfs = cfs + .into_iter() + .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); + + Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl) + } + + /// Opens a database with the given database with a Time to Live compaction filter and + /// column family descriptors. + pub fn open_cf_descriptors_with_ttl( + opts: &Options, + path: P, + cfs: I, + ttl: Duration, + ) -> Result + where + P: AsRef, + I: IntoIterator, + { + Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl }) } /// Opens a database with the given database options and column family names. @@ -487,7 +512,17 @@ impl DBWithThreadMode { cfhandles.as_mut_ptr(), )) } - _ => return Err(Error::new("Unsupported access type".to_owned())), + AccessType::WithTTL { ttl } => { + ffi_try!(ffi::rocksdb_open_column_families_with_ttl( + opts.inner, + cpath.as_ptr(), + cfs_v.len() as c_int, + cfnames.as_ptr(), + cfopts.as_ptr(), + cfhandles.as_mut_ptr(), + &(ttl.as_secs() as c_int) as *const _, + )) + } } }; Ok(db) diff --git a/tests/test_db.rs b/tests/test_db.rs index 0826d93f6..1447ed092 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -529,6 +529,25 @@ fn test_open_with_ttl() { assert!(db.get(b"key1").unwrap().is_none()); } +#[test] +fn test_open_cf_with_ttl() { + let path = DBPath::new("_rust_rocksdb_test_open_cf_with_ttl"); + + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + let db = DB::open_cf_with_ttl(&opts, &path, &["test_cf"], Duration::from_secs(1)).unwrap(); + let cf = db.cf_handle("test_cf").unwrap(); + db.put_cf(cf, b"key1", b"value1").unwrap(); + + thread::sleep(Duration::from_secs(2)); + // Trigger a manual compaction, this will check the TTL filter + // in the database and drop all expired entries. + db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>); + + assert!(db.get_cf(cf, b"key1").unwrap().is_none()); +} + #[test] fn test_open_as_single_threaded() { let primary_path = DBPath::new("_rust_rocksdb_test_open_as_single_threaded");