Skip to content

Commit

Permalink
Merge pull request #8639 from ClSlaid/replace-rwlock-storages-service
Browse files Browse the repository at this point in the history
refactor(storage): replace RwLocks with DashMap
  • Loading branch information
mergify[bot] committed Nov 6, 2022
2 parents 83be7f4 + e715846 commit 460631a
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 66 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ byteorder = "1.4.3"
bytes = "1.2.1"
chrono = "0.4.22"
chrono-tz = "0.6.3"
dashmap = "5.4"
futures = "0.3.24"
futures-util = "0.3.24"
headers = "0.3.8"
Expand Down
55 changes: 28 additions & 27 deletions src/query/service/src/catalogs/default/table_memory_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::MetaId;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::storages::Table;

pub struct DbTables {
name_to_table: RwLock<HashMap<String, Arc<dyn Table>>>,
id_to_table: RwLock<HashMap<MetaId, Arc<dyn Table>>>,
name_to_table: DashMap<String, Arc<dyn Table>>,
id_to_table: DashMap<MetaId, Arc<dyn Table>>,
}

pub struct InMemoryMetas {
next_table_id: AtomicU64,
next_db_id: AtomicU64,
db_tables: RwLock<HashMap<String, DbTables>>,
db_tables: DashMap<String, DbTables>,
}

impl InMemoryMetas {
pub fn create(next_db_id: u64, next_table_id: u64) -> Self {
InMemoryMetas {
next_table_id: AtomicU64::new(next_table_id),
next_db_id: AtomicU64::new(next_db_id),
db_tables: RwLock::new(HashMap::new()),
db_tables: DashMap::new(),
}
}

pub fn init_db(&self, db: &str) {
let mut dbs = self.db_tables.write();
dbs.insert(db.to_string(), DbTables {
name_to_table: RwLock::new(HashMap::new()),
id_to_table: RwLock::new(HashMap::new()),
self.db_tables.insert(db.to_string(), DbTables {
name_to_table: DashMap::new(),
id_to_table: DashMap::new(),
});
}

Expand All @@ -65,28 +63,22 @@ impl InMemoryMetas {
}

pub fn insert(&self, db: &str, tbl_ref: Arc<dyn Table>) {
if let Some(db_tables) = self.db_tables.write().get(db) {
if let Some(db_tables) = self.db_tables.get_mut(db) {
let name = tbl_ref.name().to_owned();
db_tables
.name_to_table
.write()
.insert(name, tbl_ref.clone());
db_tables
.id_to_table
.write()
.insert(tbl_ref.get_id(), tbl_ref);
db_tables.name_to_table.insert(name, tbl_ref.clone());
db_tables.id_to_table.insert(tbl_ref.get_id(), tbl_ref);
} else {
panic!("Logical Error: Need create database `{}` first", db)
}
}

pub fn get_by_name(&self, db: &str, name: &str) -> Result<Arc<dyn Table>> {
if let Some(db_tables) = self.db_tables.read().get(db) {
if let Some(db_tables) = self.db_tables.get(db) {
db_tables
.value()
.name_to_table
.read()
.get(name)
.cloned()
.map(|entry| entry.value().clone())
.ok_or_else(|| {
ErrorCode::UnknownTable(format!("`{}.{}` table is unknown", db, name))
})
Expand All @@ -99,17 +91,26 @@ impl InMemoryMetas {
}

pub fn get_by_id(&self, id: &MetaId) -> Option<Arc<dyn Table>> {
for (_db, db_tables) in self.db_tables.read().iter() {
if db_tables.id_to_table.read().contains_key(id) {
return db_tables.id_to_table.read().get(id).cloned();
for entry in self.db_tables.iter() {
let db_tables = entry.value();
if db_tables.id_to_table.contains_key(id) {
return db_tables
.id_to_table
.get(id)
.map(|entry| entry.value().clone());
}
}
None
}

pub fn get_all_tables(&self, db: &str) -> Result<Vec<Arc<dyn Table>>> {
if let Some(db_tables) = self.db_tables.read().get(db) {
Ok(db_tables.name_to_table.read().values().cloned().collect())
if let Some(db_tables) = self.db_tables.get(db) {
Ok(db_tables
.value()
.name_to_table
.iter()
.map(|entry| entry.value().clone())
.collect())
} else {
Err(ErrorCode::UnknownDatabase(format!(
"{} database is unknown",
Expand Down
14 changes: 5 additions & 9 deletions src/query/service/src/databases/database_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_config::Config;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::DatabaseInfo;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::databases::default::DefaultDatabase;
use crate::databases::share::ShareDatabase;
Expand All @@ -42,12 +41,12 @@ where

#[derive(Default)]
pub struct DatabaseFactory {
creators: RwLock<HashMap<String, Arc<dyn DatabaseCreator>>>,
creators: DashMap<String, Arc<dyn DatabaseCreator>>,
}

impl DatabaseFactory {
pub fn create(_: Config) -> Self {
let mut creators: HashMap<String, Arc<dyn DatabaseCreator>> = Default::default();
let creators: DashMap<String, Arc<dyn DatabaseCreator>> = DashMap::new();
creators.insert(
DefaultDatabase::NAME.to_string(),
Arc::new(DefaultDatabase::try_create),
Expand All @@ -57,9 +56,7 @@ impl DatabaseFactory {
Arc::new(ShareDatabase::try_create),
);

DatabaseFactory {
creators: RwLock::new(creators),
}
DatabaseFactory { creators }
}

pub fn get_database(
Expand All @@ -74,8 +71,7 @@ impl DatabaseFactory {
db_engine.to_uppercase()
};

let lock = self.creators.read();
let factory = lock.get(&engine).ok_or_else(|| {
let factory = self.creators.get(&engine).ok_or_else(|| {
ErrorCode::UnknownDatabaseEngine(format!("Unknown database engine {}", engine))
})?;

Expand Down
26 changes: 11 additions & 15 deletions src/query/service/src/servers/http/v1/query/expiring_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
// limitations under the License.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

use common_base::base::tokio::task;
use common_base::base::tokio::time::sleep;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::servers::http::v1::query::expirable::Expirable;
use crate::servers::http::v1::query::expirable::ExpiringState;
Expand Down Expand Up @@ -55,7 +54,7 @@ where V: Expirable
pub struct ExpiringMap<K, V>
where V: Expirable
{
map: Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>,
map: Arc<DashMap<K, MaybeExpiring<V>>>,
}

async fn run_check<T: Expirable>(e: &T, max_idle: Duration) -> bool {
Expand All @@ -76,11 +75,13 @@ async fn run_check<T: Expirable>(e: &T, max_idle: Duration) -> bool {
}

impl<K, V> Default for ExpiringMap<K, V>
where V: Expirable
where
K: Eq + Hash + Clone + Send + Sync + 'static,
V: Expirable,
{
fn default() -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::default())),
map: Arc::new(DashMap::new()),
}
}
}
Expand All @@ -95,7 +96,6 @@ where
K: Clone + Send + Sync + 'static,
V: Send + Sync + 'static,
{
let mut map = self.map.write();
let task = match max_idle_time {
Some(d) => {
let map_clone = self.map.clone();
Expand All @@ -111,16 +111,15 @@ where
None => None,
};
let i = MaybeExpiring { task, value: v };
map.insert(k, i);
self.map.insert(k, i);
}

pub fn get<Q: ?Sized>(&self, k: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let map = self.map.read();
map.get(k).map(|i| &i.value).cloned()
self.map.get(k).map(|i| i.value().value.clone())
}

pub fn remove<Q: ?Sized>(&mut self, k: &Q)
Expand All @@ -131,16 +130,13 @@ where
Self::remove_inner(&self.map, k)
}

fn remove_inner<Q: ?Sized>(map: &Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>, k: &Q)
fn remove_inner<Q: ?Sized>(map: &Arc<DashMap<K, MaybeExpiring<V>>>, k: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let checker = {
let mut map = map.write();
map.remove(k)
};
if let Some(mut checker) = checker {
let checker = { map.remove(k) };
if let Some((_, mut checker)) = checker {
checker.on_expire()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/factory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ common-storages-null = { path = "../null" }
common-storages-random = { path = "../random" }
common-storages-view = { path = "../view" }

parking_lot = "0.12.1"
dashmap = "5.4"
22 changes: 9 additions & 13 deletions src/query/storages/factory/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

pub use common_catalog::catalog::StorageDescription;
Expand All @@ -24,7 +23,7 @@ use common_storages_memory::MemoryTable;
use common_storages_null::NullTable;
use common_storages_random::RandomTable;
use common_storages_view::view_table::ViewTable;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::fuse::FuseTable;
use crate::Table;
Expand Down Expand Up @@ -64,12 +63,12 @@ pub struct Storage {

#[derive(Default)]
pub struct StorageFactory {
storages: RwLock<HashMap<String, Storage>>,
storages: DashMap<String, Storage>,
}

impl StorageFactory {
pub fn create(conf: Config) -> Self {
let mut creators: HashMap<String, Storage> = Default::default();
let creators: DashMap<String, Storage> = Default::default();

// Register memory table engine.
if conf.query.table_engine_memory_enabled {
Expand Down Expand Up @@ -103,15 +102,12 @@ impl StorageFactory {
descriptor: Arc::new(RandomTable::description),
});

StorageFactory {
storages: RwLock::new(creators),
}
StorageFactory { storages: creators }
}

pub fn get_table(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
let engine = table_info.engine().to_uppercase();
let lock = self.storages.read();
let factory = lock.get(&engine).ok_or_else(|| {
let factory = self.storages.get(&engine).ok_or_else(|| {
ErrorCode::UnknownTableEngine(format!("Unknown table engine {}", engine))
})?;

Expand All @@ -120,10 +116,10 @@ impl StorageFactory {
}

pub fn get_storage_descriptors(&self) -> Vec<StorageDescription> {
let lock = self.storages.read();
let mut descriptors = Vec::with_capacity(lock.len());
for value in lock.values() {
descriptors.push(value.descriptor.description())
let mut descriptors = Vec::with_capacity(self.storages.len());
let it = self.storages.iter();
for entry in it {
descriptors.push(entry.value().descriptor.description())
}
descriptors
}
Expand Down

1 comment on commit 460631a

@vercel
Copy link

@vercel vercel bot commented on 460631a Nov 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.rs
databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.