Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): replace RwLocks with DashMap #8639

Merged
merged 6 commits into from
Nov 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ byteorder = "1.4.3"
bytes = "1.2.1"
chrono = "0.4.22"
chrono-tz = "0.6.3"
dashmap = "5.4"
futures = "0.3.24"
futures-util = "0.3.24"
headers = "0.3.8"
Expand Down
55 changes: 28 additions & 27 deletions src/query/service/src/catalogs/default/table_memory_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::MetaId;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::storages::Table;

pub struct DbTables {
name_to_table: RwLock<HashMap<String, Arc<dyn Table>>>,
id_to_table: RwLock<HashMap<MetaId, Arc<dyn Table>>>,
name_to_table: DashMap<String, Arc<dyn Table>>,
id_to_table: DashMap<MetaId, Arc<dyn Table>>,
}

pub struct InMemoryMetas {
next_table_id: AtomicU64,
next_db_id: AtomicU64,
db_tables: RwLock<HashMap<String, DbTables>>,
db_tables: DashMap<String, DbTables>,
}

impl InMemoryMetas {
pub fn create(next_db_id: u64, next_table_id: u64) -> Self {
InMemoryMetas {
next_table_id: AtomicU64::new(next_table_id),
next_db_id: AtomicU64::new(next_db_id),
db_tables: RwLock::new(HashMap::new()),
db_tables: DashMap::new(),
}
}

pub fn init_db(&self, db: &str) {
let mut dbs = self.db_tables.write();
dbs.insert(db.to_string(), DbTables {
name_to_table: RwLock::new(HashMap::new()),
id_to_table: RwLock::new(HashMap::new()),
self.db_tables.insert(db.to_string(), DbTables {
name_to_table: DashMap::new(),
id_to_table: DashMap::new(),
});
}

Expand All @@ -65,28 +63,22 @@ impl InMemoryMetas {
}

pub fn insert(&self, db: &str, tbl_ref: Arc<dyn Table>) {
if let Some(db_tables) = self.db_tables.write().get(db) {
if let Some(db_tables) = self.db_tables.get_mut(db) {
let name = tbl_ref.name().to_owned();
db_tables
.name_to_table
.write()
.insert(name, tbl_ref.clone());
db_tables
.id_to_table
.write()
.insert(tbl_ref.get_id(), tbl_ref);
db_tables.name_to_table.insert(name, tbl_ref.clone());
db_tables.id_to_table.insert(tbl_ref.get_id(), tbl_ref);
} else {
panic!("Logical Error: Need create database `{}` first", db)
}
}

pub fn get_by_name(&self, db: &str, name: &str) -> Result<Arc<dyn Table>> {
if let Some(db_tables) = self.db_tables.read().get(db) {
if let Some(db_tables) = self.db_tables.get(db) {
db_tables
.value()
.name_to_table
.read()
.get(name)
.cloned()
.map(|entry| entry.value().clone())
.ok_or_else(|| {
ErrorCode::UnknownTable(format!("`{}.{}` table is unknown", db, name))
})
Expand All @@ -99,17 +91,26 @@ impl InMemoryMetas {
}

pub fn get_by_id(&self, id: &MetaId) -> Option<Arc<dyn Table>> {
for (_db, db_tables) in self.db_tables.read().iter() {
if db_tables.id_to_table.read().contains_key(id) {
return db_tables.id_to_table.read().get(id).cloned();
for entry in self.db_tables.iter() {
let db_tables = entry.value();
if db_tables.id_to_table.contains_key(id) {
return db_tables
.id_to_table
.get(id)
.map(|entry| entry.value().clone());
}
}
None
}

pub fn get_all_tables(&self, db: &str) -> Result<Vec<Arc<dyn Table>>> {
if let Some(db_tables) = self.db_tables.read().get(db) {
Ok(db_tables.name_to_table.read().values().cloned().collect())
if let Some(db_tables) = self.db_tables.get(db) {
Ok(db_tables
.value()
.name_to_table
.iter()
.map(|entry| entry.value().clone())
.collect())
} else {
Err(ErrorCode::UnknownDatabase(format!(
"{} database is unknown",
Expand Down
14 changes: 5 additions & 9 deletions src/query/service/src/databases/database_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_config::Config;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::DatabaseInfo;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::databases::default::DefaultDatabase;
use crate::databases::share::ShareDatabase;
Expand All @@ -42,12 +41,12 @@ where

#[derive(Default)]
pub struct DatabaseFactory {
creators: RwLock<HashMap<String, Arc<dyn DatabaseCreator>>>,
creators: DashMap<String, Arc<dyn DatabaseCreator>>,
}

impl DatabaseFactory {
pub fn create(_: Config) -> Self {
let mut creators: HashMap<String, Arc<dyn DatabaseCreator>> = Default::default();
let creators: DashMap<String, Arc<dyn DatabaseCreator>> = DashMap::new();
creators.insert(
DefaultDatabase::NAME.to_string(),
Arc::new(DefaultDatabase::try_create),
Expand All @@ -57,9 +56,7 @@ impl DatabaseFactory {
Arc::new(ShareDatabase::try_create),
);

DatabaseFactory {
creators: RwLock::new(creators),
}
DatabaseFactory { creators }
}

pub fn get_database(
Expand All @@ -74,8 +71,7 @@ impl DatabaseFactory {
db_engine.to_uppercase()
};

let lock = self.creators.read();
let factory = lock.get(&engine).ok_or_else(|| {
let factory = self.creators.get(&engine).ok_or_else(|| {
ErrorCode::UnknownDatabaseEngine(format!("Unknown database engine {}", engine))
})?;

Expand Down
26 changes: 11 additions & 15 deletions src/query/service/src/servers/http/v1/query/expiring_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
// limitations under the License.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

use common_base::base::tokio::task;
use common_base::base::tokio::time::sleep;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::servers::http::v1::query::expirable::Expirable;
use crate::servers::http::v1::query::expirable::ExpiringState;
Expand Down Expand Up @@ -55,7 +54,7 @@ where V: Expirable
pub struct ExpiringMap<K, V>
where V: Expirable
{
map: Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>,
map: Arc<DashMap<K, MaybeExpiring<V>>>,
}

async fn run_check<T: Expirable>(e: &T, max_idle: Duration) -> bool {
Expand All @@ -76,11 +75,13 @@ async fn run_check<T: Expirable>(e: &T, max_idle: Duration) -> bool {
}

impl<K, V> Default for ExpiringMap<K, V>
where V: Expirable
where
K: Eq + Hash + Clone + Send + Sync + 'static,
V: Expirable,
{
fn default() -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::default())),
map: Arc::new(DashMap::new()),
}
}
}
Expand All @@ -95,7 +96,6 @@ where
K: Clone + Send + Sync + 'static,
V: Send + Sync + 'static,
{
let mut map = self.map.write();
let task = match max_idle_time {
Some(d) => {
let map_clone = self.map.clone();
Expand All @@ -111,16 +111,15 @@ where
None => None,
};
let i = MaybeExpiring { task, value: v };
map.insert(k, i);
self.map.insert(k, i);
}

pub fn get<Q: ?Sized>(&self, k: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let map = self.map.read();
map.get(k).map(|i| &i.value).cloned()
self.map.get(k).map(|i| i.value().value.clone())
}

pub fn remove<Q: ?Sized>(&mut self, k: &Q)
Expand All @@ -131,16 +130,13 @@ where
Self::remove_inner(&self.map, k)
}

fn remove_inner<Q: ?Sized>(map: &Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>, k: &Q)
fn remove_inner<Q: ?Sized>(map: &Arc<DashMap<K, MaybeExpiring<V>>>, k: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let checker = {
let mut map = map.write();
map.remove(k)
};
if let Some(mut checker) = checker {
let checker = { map.remove(k) };
if let Some((_, mut checker)) = checker {
checker.on_expire()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/factory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ common-storages-null = { path = "../null" }
common-storages-random = { path = "../random" }
common-storages-view = { path = "../view" }

parking_lot = "0.12.1"
dashmap = "5.4"
22 changes: 9 additions & 13 deletions src/query/storages/factory/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

pub use common_catalog::catalog::StorageDescription;
Expand All @@ -24,7 +23,7 @@ use common_storages_memory::MemoryTable;
use common_storages_null::NullTable;
use common_storages_random::RandomTable;
use common_storages_view::view_table::ViewTable;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::fuse::FuseTable;
use crate::Table;
Expand Down Expand Up @@ -64,12 +63,12 @@ pub struct Storage {

#[derive(Default)]
pub struct StorageFactory {
storages: RwLock<HashMap<String, Storage>>,
storages: DashMap<String, Storage>,
}

impl StorageFactory {
pub fn create(conf: Config) -> Self {
let mut creators: HashMap<String, Storage> = Default::default();
let creators: DashMap<String, Storage> = Default::default();

// Register memory table engine.
if conf.query.table_engine_memory_enabled {
Expand Down Expand Up @@ -103,15 +102,12 @@ impl StorageFactory {
descriptor: Arc::new(RandomTable::description),
});

StorageFactory {
storages: RwLock::new(creators),
}
StorageFactory { storages: creators }
}

pub fn get_table(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
let engine = table_info.engine().to_uppercase();
let lock = self.storages.read();
let factory = lock.get(&engine).ok_or_else(|| {
let factory = self.storages.get(&engine).ok_or_else(|| {
ErrorCode::UnknownTableEngine(format!("Unknown table engine {}", engine))
})?;

Expand All @@ -120,10 +116,10 @@ impl StorageFactory {
}

pub fn get_storage_descriptors(&self) -> Vec<StorageDescription> {
let lock = self.storages.read();
let mut descriptors = Vec::with_capacity(lock.len());
for value in lock.values() {
descriptors.push(value.descriptor.description())
let mut descriptors = Vec::with_capacity(self.storages.len());
let it = self.storages.iter();
for entry in it {
descriptors.push(entry.value().descriptor.description())
}
descriptors
}
Expand Down