Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove need for &mut self in create_cf and drop_cf #496

Closed
wants to merge 13 commits into from
190 changes: 167 additions & 23 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,20 @@ use std::path::PathBuf;
use std::ptr;
use std::slice;
use std::str;
use std::sync::{Arc, RwLock};
use std::time::Duration;

/// Storage for the column family handles owned by a `DB`.
///
/// The variant is chosen when the database is opened and determines whether
/// the handle map sits behind a lock or is a plain map.
enum ColumnFamilyHandles {
/// Handle map guarded by an `RwLock`, with each handle wrapped in an `Arc`.
MultiThread(RwLock<BTreeMap<String, Arc<ColumnFamily>>>),
/// Plain handle map with no internal locking.
SingleThread(BTreeMap<String, ColumnFamily>),
}

/// A RocksDB database.
///
/// See crate level documentation for a simple usage example.
pub struct DB {
pub(crate) inner: *mut ffi::rocksdb_t,
cfs: BTreeMap<String, ColumnFamily>,
cfs: ColumnFamilyHandles,
path: PathBuf,
}

Expand Down Expand Up @@ -105,7 +111,7 @@ impl DB {

Ok(DB {
inner: db,
cfs: BTreeMap::new(),
cfs: ColumnFamilyHandles::SingleThread(BTreeMap::new()),
path: path.as_ref().to_path_buf(),
})
}
Expand All @@ -123,7 +129,24 @@ impl DB {
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));

DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite, false)
}

/// Opens a read-write database with the given database options and column
/// family names, keeping the column family handles behind an internal lock.
///
/// Every column family named in `cfs` is opened with default `Options`.
pub fn open_cf_multi_threaded<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<DB, Error>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = N>,
    N: AsRef<str>,
{
    // Lazily turn each name into a descriptor carrying default options.
    let descriptors = cfs.into_iter().map(|cf_name| {
        let default_opts = Options::default();
        ColumnFamilyDescriptor::new(cf_name.as_ref(), default_opts)
    });

    DB::open_cf_descriptors_internal(opts, path, descriptors, &AccessType::ReadWrite, true)
}

/// Opens a database for read only with the given database options and column family names.
Expand All @@ -149,6 +172,7 @@ impl DB {
&AccessType::ReadOnly {
error_if_log_file_exist,
},
false,
)
}

Expand All @@ -175,6 +199,7 @@ impl DB {
&AccessType::Secondary {
secondary_path: secondary_path.as_ref(),
},
false,
)
}

Expand All @@ -184,7 +209,21 @@ impl DB {
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
DB::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite, false)
}

/// Opens a read-write database from the given database options and column
/// family descriptors, with internal locking for column families.
pub fn open_cf_descriptors_multi_threaded<P, I>(
    opts: &Options,
    path: P,
    cfs: I,
) -> Result<DB, Error>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
    // Name the two fixed arguments so the call site documents itself.
    let access_type = AccessType::ReadWrite;
    let is_multi_threaded = true;
    DB::open_cf_descriptors_internal(opts, path, cfs, &access_type, is_multi_threaded)
}

/// Internal implementation for opening RocksDB.
Expand All @@ -193,6 +232,7 @@ impl DB {
path: P,
cfs: I,
access_type: &AccessType,
is_multi_threaded: bool,
) -> Result<DB, Error>
where
P: AsRef<Path>,
Expand Down Expand Up @@ -266,9 +306,20 @@ impl DB {
return Err(Error::new("Could not initialize database.".to_owned()));
}

let cfs = if is_multi_threaded {
ColumnFamilyHandles::MultiThread(RwLock::new(
cf_map
.into_iter()
.map(|(name, cf)| (name, Arc::new(cf)))
.collect(),
))
} else {
ColumnFamilyHandles::SingleThread(cf_map)
};

Ok(DB {
inner: db,
cfs: cf_map,
cfs,
path: path.as_ref().to_path_buf(),
})
}
Expand Down Expand Up @@ -660,41 +711,124 @@ impl DB {
Ok(convert_values(values, values_sizes))
}

pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let cf_name = if let Ok(c) = CString::new(name.as_ref().as_bytes()) {
fn create_inner_cf_handle(
&self,
name: &str,
opts: &Options,
) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
let cf_name = if let Ok(c) = CString::new(name.as_bytes()) {
c
} else {
return Err(Error::new(
"Failed to convert path to CString when creating cf".to_owned(),
));
};
unsafe {
let inner = ffi_try!(ffi::rocksdb_create_column_family(
Ok(unsafe {
ffi_try!(ffi::rocksdb_create_column_family(
self.inner,
opts.inner,
cf_name.as_ptr(),
));
))
})
}

self.cfs
.insert(name.as_ref().to_string(), ColumnFamily { inner });
/// Creates a column family with the given name and options through a shared
/// reference, by taking the write lock on the internal column family map.
///
/// This avoids the need for `&mut self`, but is only available when the
/// database was opened in multi-threaded mode.
///
/// # Errors
///
/// Returns an error if the underlying column family handle could not be
/// created.
///
/// # Panics
///
/// Panics if the database was opened in single-threaded mode, or if the
/// internal lock is poisoned.
pub fn create_cf_multi_threaded<N: AsRef<str>>(
    &self,
    name: N,
    opts: &Options,
) -> Result<(), Error> {
    let name = name.as_ref();

    // Check the storage mode *before* touching the database: otherwise a
    // call on a single-threaded DB would create the column family, then
    // panic and leak the freshly created raw handle.
    let cf_map = match &self.cfs {
        ColumnFamilyHandles::MultiThread(cf_map) => cf_map,
        ColumnFamilyHandles::SingleThread(_) => {
            panic!("Not available on SingleThread implementation");
        }
    };

    let inner = self.create_inner_cf_handle(name, opts)?;
    cf_map
        .write()
        .unwrap()
        .insert(name.to_string(), Arc::new(ColumnFamily { inner }));
    Ok(())
}

/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
match &mut self.cfs {
ColumnFamilyHandles::MultiThread(_) => {
panic!("Not available on MultiThread implementation")
Copy link
Member

@aleksuss aleksuss Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my pickiness, but I have a feeling that it's not good to panic here. What about bounding the invocation of this with a trait? I'll try to illustrate the idea with code soon.

Copy link
Contributor

@ryoqun ryoqun Mar 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aleksuss Hi, again. I want this to move forward :) (cc: @carllin)

What about bounding the invocation of this with a trait? I'll try to illustrate the idea with code soon.

Maybe you were thinking of something like this?: ryoqun@1be386d

Sorry for the very rough code. But this eliminates a bunch of the various concerns that arose from this PR's review:

  • ColumnFamily is bounded in the case of MultiThread with zero runtime overhead (I even stopped using Arc)
  • ... and no derive(Clone) on ColumnFamily
  • will have no panic!s (if everything is correctly rewritten)
  • keeps api compatibility and no multithread methods.
  • no enum runtime variant branching (although this is cheap)
  • downside: a bit of internal code churn (I promise I'll work hard to fix it all)

If this direction is really the way to go from the maintainers' perspective, I'll finish this very quickly. :)

Copy link
Member

@aleksuss aleksuss Mar 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you were thinking of something like this?: ryoqun/rust-rocksdb@1be386d

Yes. That is exactly what I meant 👍🏻
Unfortunately, I don't have time right now to lay out my own vision for it. But your approach is very close to what I had in mind. Let's try to implement it 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aleksuss as promised, I created the cleaned-up PR: #506. Please have a look at your convenience. :)

}
ColumnFamilyHandles::SingleThread(cf_map) => {
cf_map.insert(name.as_ref().to_string(), ColumnFamily { inner });
}
}
Ok(())
}

/// Drops the column family with the given name by internally locking the inner column
/// family map. This avoids needing `&mut self` reference, but is only safe to use if
/// the database was opened with multi-threaded config
pub fn drop_cf_multi_threaded(&self, name: &str) -> Result<(), Error> {
let inner = self.inner;
match &self.cfs {
ColumnFamilyHandles::MultiThread(cf_map) => {
if let Some(cf) = cf_map.write().unwrap().remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
}
ColumnFamilyHandles::SingleThread(_) => {
panic!("Not available on SingleThread implementation")
}
}
}

/// Drops the column family with the given name
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
if let Some(cf) = self.cfs.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.inner));
let inner = self.inner;
match &mut self.cfs {
ColumnFamilyHandles::MultiThread(_) => {
panic!("Not available on MultiThread implementation")
}
ColumnFamilyHandles::SingleThread(cf_map) => {
if let Some(cf) = cf_map.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
}

/// Return the underlying column family handle.
/// Returns a shared handle to the column family called `name`, or `None` if
/// no column family with that name is registered.
///
/// Only available when the database was opened in multi-threaded mode.
///
/// # Panics
///
/// Panics if the database was opened in single-threaded mode, or if the
/// internal lock is poisoned.
pub fn cf_handle_multi_threaded(&self, name: &str) -> Option<Arc<ColumnFamily>> {
    match &self.cfs {
        // A lookup never mutates the map, so the shared read lock is enough
        // and lets concurrent lookups proceed without serialising them.
        ColumnFamilyHandles::MultiThread(cf_map) => cf_map.read().unwrap().get(name).cloned(),
        ColumnFamilyHandles::SingleThread(_) => {
            panic!("Not available on SingleThread implementation")
        }
    }
}

/// Returns the underlying column family handle, only safe to use if the database
/// was not configured to be multithreaded
pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
self.cfs.get(name)
match &self.cfs {
ColumnFamilyHandles::MultiThread(_) => {
panic!("Not available on MultiThread implementation")
}
ColumnFamilyHandles::SingleThread(cf_map) => cf_map.get(name),
}
}

pub fn iterator<'a: 'b, 'b>(&'a self, mode: IteratorMode) -> DBIterator<'b> {
Expand Down Expand Up @@ -1426,7 +1560,7 @@ impl DB {
/// entirely in the range.
///
/// Note: L0 files are left regardless of whether they're in the range.
///
///
/// Snapshots before the delete might not see the data in the given range.
pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
let from = from.as_ref();
Expand Down Expand Up @@ -1476,9 +1610,19 @@ impl DB {
impl Drop for DB {
fn drop(&mut self) {
unsafe {
for cf in self.cfs.values() {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
match &self.cfs {
ColumnFamilyHandles::MultiThread(cf_map) => {
for cf in cf_map.read().unwrap().values() {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
}
ColumnFamilyHandles::SingleThread(cf_map) => {
for cf in cf_map.values() {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
}
}

ffi::rocksdb_close(self.inner);
}
}
Expand Down
76 changes: 57 additions & 19 deletions tests/test_column_family.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,7 @@ use pretty_assertions::assert_eq;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options, DB, DEFAULT_COLUMN_FAMILY_NAME};
use util::DBPath;

#[test]
fn test_column_family() {
let n = DBPath::new("_rust_rocksdb_cftest");

// should be able to create column families
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator_associative("test operator", test_provided_merge);
let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default();
match db.create_cf("cf1", &opts) {
Ok(()) => println!("cf1 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
}
}
}

fn test_cf_common(n: &DBPath) {
// should fail to open db without specifying same column families
{
let mut opts = Options::default();
Expand Down Expand Up @@ -77,6 +59,29 @@ fn test_column_family() {
// TODO should be able to iterate over a cf
{}
// should be able to drop a cf
}

#[test]
fn test_column_family() {
let n = DBPath::new("_rust_rocksdb_cftest");

// should be able to create column families
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator_associative("test operator", test_provided_merge);
let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default();
match db.create_cf("cf1", &opts) {
Ok(()) => println!("cf1 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
}
}
}

test_cf_common(&n);

{
let mut db = DB::open_cf(&Options::default(), &n, &["cf1"]).unwrap();
match db.drop_cf("cf1") {
Expand All @@ -86,6 +91,39 @@ fn test_column_family() {
}
}

/// Checks that a column family can be created and dropped through a shared
/// `&DB` when the database is opened in multi-threaded mode.
#[test]
fn test_multi_threaded_column_family() {
    // Use a name distinct from `test_column_family`: tests run in parallel by
    // default, so sharing a name risks both tests touching the same database.
    // NOTE(review): harmless if `DBPath` already yields unique directories.
    let n = DBPath::new("_rust_rocksdb_cftest_multi_threaded");

    // Should be able to create column families without a mutable reference
    // to the db
    {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.set_merge_operator_associative("test operator", test_provided_merge);
        let db = DB::open_cf_multi_threaded(&opts, &n, None::<&str>).unwrap();
        let opts = Options::default();
        match db.create_cf_multi_threaded("cf1", &opts) {
            Ok(()) => println!("cf1 created successfully"),
            Err(e) => {
                panic!("could not create column family: {}", e);
            }
        }
    }

    test_cf_common(&n);

    // Should be able to drop column families without a mutable reference
    // to the db
    {
        let db = DB::open_cf_multi_threaded(&Options::default(), &n, &["cf1"]).unwrap();
        match db.drop_cf_multi_threaded("cf1") {
            Ok(_) => println!("cf1 successfully dropped."),
            Err(e) => panic!("failed to drop column family: {}", e),
        }
    }
}

#[test]
fn test_can_open_db_with_results_of_list_cf() {
// Test scenario derived from GitHub issue #175 and 177
Expand Down