Skip to content

Commit

Permalink
Add multithread column family map option
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Feb 26, 2021
1 parent 0ead892 commit c51484c
Showing 1 changed file with 92 additions and 26 deletions.
118 changes: 92 additions & 26 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,42 @@ use std::str;
use std::sync::RwLock;
use std::time::Duration;

enum ColumnFamilyHandles {
MultiThread(RwLock<BTreeMap<String, ColumnFamily>>),
SingleThread(BTreeMap<String, ColumnFamily>),
}

impl ColumnFamilyHandles {
fn get<T>(&self, f: impl Fn(&BTreeMap<String, ColumnFamily>) -> T) -> T {
match self {
ColumnFamilyHandles::MultiThread(lock) => f(&lock.read().unwrap()),
ColumnFamilyHandles::SingleThread(map) => f(&map),
}
}

fn get_mut<T>(&mut self, mut f: impl FnMut(&mut BTreeMap<String, ColumnFamily>) -> T) -> T {
match self {
ColumnFamilyHandles::MultiThread(lock) => f(&mut lock.write().unwrap()),
ColumnFamilyHandles::SingleThread(ref mut map) => f(map),
}
}

fn get_mut_locked<T>(&self, mut f: impl FnMut(&mut BTreeMap<String, ColumnFamily>) -> T) -> T {
match self {
ColumnFamilyHandles::MultiThread(lock) => f(&mut lock.write().unwrap()),
ColumnFamilyHandles::SingleThread(_) => {
panic!("Not available on SingleThread implementation");
}
}
}
}

/// A RocksDB database.
///
/// See crate level documentation for a simple usage example.
pub struct DB {
pub(crate) inner: *mut ffi::rocksdb_t,
cfs: RwLock<BTreeMap<String, ColumnFamily>>,
cfs: ColumnFamilyHandles,
path: PathBuf,
}

Expand Down Expand Up @@ -106,7 +136,7 @@ impl DB {

Ok(DB {
inner: db,
cfs: RwLock::new(BTreeMap::new()),
cfs: ColumnFamilyHandles::SingleThread(BTreeMap::new()),
path: path.as_ref().to_path_buf(),
})
}
Expand Down Expand Up @@ -269,7 +299,7 @@ impl DB {

Ok(DB {
inner: db,
cfs: RwLock::new(cf_map),
cfs: ColumnFamilyHandles::SingleThread(cf_map),
path: path.as_ref().to_path_buf(),
})
}
Expand Down Expand Up @@ -661,43 +691,77 @@ impl DB {
Ok(convert_values(values, values_sizes))
}

pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
let cf_name = if let Ok(c) = CString::new(name.as_ref().as_bytes()) {
fn create_inner_cf_handle(
&self,
name: &str,
opts: &Options,
) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
let cf_name = if let Ok(c) = CString::new(name.as_bytes()) {
c
} else {
return Err(Error::new(
"Failed to convert path to CString when creating cf".to_owned(),
));
};
unsafe {
let inner = ffi_try!(ffi::rocksdb_create_column_family(
Ok(unsafe {
ffi_try!(ffi::rocksdb_create_column_family(
self.inner,
opts.inner,
cf_name.as_ptr(),
));
))
})
}

self.cfs
.write()
.unwrap()
.insert(name.as_ref().to_string(), ColumnFamily { inner });
};
pub fn create_cf_multithreaded<N: AsRef<str>>(
&self,
name: N,
opts: &Options,
) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
self.cfs.get_mut_locked(|cf_map| {
cf_map.insert(name.as_ref().to_string(), ColumnFamily { inner })
});
Ok(())
}

pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
if let Some(cf) = self.cfs.write().unwrap().remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.inner));
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
self.cfs
.get_mut(|cf_map| cf_map.insert(name.as_ref().to_string(), ColumnFamily { inner }));
Ok(())
}

pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
let inner = self.inner;
self.cfs.get_mut(|cf_map| {
if let Some(cf) = cf_map.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
})
}

pub fn drop_cf_multi_threaded(&mut self, name: &str) -> Result<(), Error> {
let inner = self.inner;
self.cfs.get_mut_locked(|cf_map| {
if let Some(cf) = cf_map.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(inner, cf.inner));
}
Ok(())
} else {
Err(Error::new(format!("Invalid column family: {}", name)))
}
})
}

/// Return the underlying column family handle.
pub fn cf_handle(&self, name: &str) -> Option<ColumnFamily> {
self.cfs.read().unwrap().get(name).cloned()
self.cfs.get(|cf_map| cf_map.get(name).cloned())
}

pub fn iterator<'a: 'b, 'b>(&'a self, mode: IteratorMode) -> DBIterator<'b> {
Expand Down Expand Up @@ -1479,10 +1543,12 @@ impl DB {
impl Drop for DB {
fn drop(&mut self) {
unsafe {
for cf in self.cfs.read().unwrap().values() {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
ffi::rocksdb_close(self.inner);
self.cfs.get(|cf_map| {
for cf in cf_map.values() {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
ffi::rocksdb_close(self.inner);
}
})
}
}
}
Expand Down

0 comments on commit c51484c

Please sign in to comment.