Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): replace RwLocks with DashMap #8639

Merged
merged 6 commits into from
Nov 6, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ byteorder = "1.4.3"
bytes = "1.2.1"
chrono = "0.4.22"
chrono-tz = "0.6.3"
dashmap = "5.4"
futures = "0.3.24"
futures-util = "0.3.24"
headers = "0.3.8"
Expand Down
55 changes: 28 additions & 27 deletions src/query/service/src/catalogs/default/table_memory_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::MetaId;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::storages::Table;

pub struct DbTables {
name_to_table: RwLock<HashMap<String, Arc<dyn Table>>>,
id_to_table: RwLock<HashMap<MetaId, Arc<dyn Table>>>,
name_to_table: DashMap<String, Arc<dyn Table>>,
id_to_table: DashMap<MetaId, Arc<dyn Table>>,
}

pub struct InMemoryMetas {
next_table_id: AtomicU64,
next_db_id: AtomicU64,
db_tables: RwLock<HashMap<String, DbTables>>,
db_tables: DashMap<String, DbTables>,
}

impl InMemoryMetas {
pub fn create(next_db_id: u64, next_table_id: u64) -> Self {
InMemoryMetas {
next_table_id: AtomicU64::new(next_table_id),
next_db_id: AtomicU64::new(next_db_id),
db_tables: RwLock::new(HashMap::new()),
db_tables: DashMap::new(),
}
}

pub fn init_db(&self, db: &str) {
let mut dbs = self.db_tables.write();
dbs.insert(db.to_string(), DbTables {
name_to_table: RwLock::new(HashMap::new()),
id_to_table: RwLock::new(HashMap::new()),
self.db_tables.insert(db.to_string(), DbTables {
name_to_table: DashMap::new(),
id_to_table: DashMap::new(),
});
}

Expand All @@ -65,28 +63,22 @@ impl InMemoryMetas {
}

pub fn insert(&self, db: &str, tbl_ref: Arc<dyn Table>) {
if let Some(db_tables) = self.db_tables.write().get(db) {
if let Some(db_tables) = self.db_tables.get_mut(db) {
let name = tbl_ref.name().to_owned();
db_tables
.name_to_table
.write()
.insert(name, tbl_ref.clone());
db_tables
.id_to_table
.write()
.insert(tbl_ref.get_id(), tbl_ref);
db_tables.name_to_table.insert(name, tbl_ref.clone());
db_tables.id_to_table.insert(tbl_ref.get_id(), tbl_ref);
} else {
panic!("Logical Error: Need create database `{}` first", db)
}
}

pub fn get_by_name(&self, db: &str, name: &str) -> Result<Arc<dyn Table>> {
if let Some(db_tables) = self.db_tables.read().get(db) {
if let Some(db_tables) = self.db_tables.get(db) {
db_tables
.value()
.name_to_table
.read()
.get(name)
.cloned()
.map(|entry| entry.value().clone())
.ok_or_else(|| {
ErrorCode::UnknownTable(format!("`{}.{}` table is unknown", db, name))
})
Expand All @@ -99,17 +91,26 @@ impl InMemoryMetas {
}

pub fn get_by_id(&self, id: &MetaId) -> Option<Arc<dyn Table>> {
for (_db, db_tables) in self.db_tables.read().iter() {
if db_tables.id_to_table.read().contains_key(id) {
return db_tables.id_to_table.read().get(id).cloned();
for entry in self.db_tables.iter() {
let db_tables = entry.value();
if db_tables.id_to_table.contains_key(id) {
return db_tables
.id_to_table
.get(id)
.map(|entry| entry.value().clone());
}
}
None
}

pub fn get_all_tables(&self, db: &str) -> Result<Vec<Arc<dyn Table>>> {
if let Some(db_tables) = self.db_tables.read().get(db) {
Ok(db_tables.name_to_table.read().values().cloned().collect())
if let Some(db_tables) = self.db_tables.get(db) {
Ok(db_tables
.value()
.name_to_table
.iter()
.map(|entry| entry.value().clone())
.collect())
} else {
Err(ErrorCode::UnknownDatabase(format!(
"{} database is unknown",
Expand Down
14 changes: 5 additions & 9 deletions src/query/service/src/databases/database_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_config::Config;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::DatabaseInfo;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::databases::default::DefaultDatabase;
use crate::databases::share::ShareDatabase;
Expand All @@ -42,12 +41,12 @@ where

#[derive(Default)]
pub struct DatabaseFactory {
creators: RwLock<HashMap<String, Arc<dyn DatabaseCreator>>>,
creators: DashMap<String, Arc<dyn DatabaseCreator>>,
}

impl DatabaseFactory {
pub fn create(_: Config) -> Self {
let mut creators: HashMap<String, Arc<dyn DatabaseCreator>> = Default::default();
let creators: DashMap<String, Arc<dyn DatabaseCreator>> = DashMap::new();
creators.insert(
DefaultDatabase::NAME.to_string(),
Arc::new(DefaultDatabase::try_create),
Expand All @@ -57,9 +56,7 @@ impl DatabaseFactory {
Arc::new(ShareDatabase::try_create),
);

DatabaseFactory {
creators: RwLock::new(creators),
}
DatabaseFactory { creators }
}

pub fn get_database(
Expand All @@ -74,8 +71,7 @@ impl DatabaseFactory {
db_engine.to_uppercase()
};

let lock = self.creators.read();
let factory = lock.get(&engine).ok_or_else(|| {
let factory = self.creators.get(&engine).ok_or_else(|| {
ErrorCode::UnknownDatabaseEngine(format!("Unknown database engine {}", engine))
})?;

Expand Down
26 changes: 11 additions & 15 deletions src/query/service/src/servers/http/v1/query/expiring_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
// limitations under the License.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

use common_base::base::tokio::task;
use common_base::base::tokio::time::sleep;
use parking_lot::RwLock;
use dashmap::DashMap;

use crate::servers::http::v1::query::expirable::Expirable;
use crate::servers::http::v1::query::expirable::ExpiringState;
Expand Down Expand Up @@ -55,7 +54,7 @@ where V: Expirable
pub struct ExpiringMap<K, V>
where V: Expirable
{
map: Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>,
map: Arc<DashMap<K, MaybeExpiring<V>>>,
}

async fn run_check<T: Expirable>(e: &T, max_idle: Duration) -> bool {
Expand All @@ -76,11 +75,13 @@ async fn run_check<T: Expirable>(e: &T, max_idle: Duration) -> bool {
}

impl<K, V> Default for ExpiringMap<K, V>
where V: Expirable
where
K: Eq + Hash + Clone + Send + Sync + 'static,
V: Expirable,
{
fn default() -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::default())),
map: Arc::new(DashMap::new()),
}
}
}
Expand All @@ -95,7 +96,6 @@ where
K: Clone + Send + Sync + 'static,
V: Send + Sync + 'static,
{
let mut map = self.map.write();
let task = match max_idle_time {
Some(d) => {
let map_clone = self.map.clone();
Expand All @@ -111,16 +111,15 @@ where
None => None,
};
let i = MaybeExpiring { task, value: v };
map.insert(k, i);
self.map.insert(k, i);
}

pub fn get<Q: ?Sized>(&self, k: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let map = self.map.read();
map.get(k).map(|i| &i.value).cloned()
self.map.get(k).map(|i| i.value().value.clone())
}

pub fn remove<Q: ?Sized>(&mut self, k: &Q)
Expand All @@ -131,16 +130,13 @@ where
Self::remove_inner(&self.map, k)
}

fn remove_inner<Q: ?Sized>(map: &Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>, k: &Q)
fn remove_inner<Q: ?Sized>(map: &Arc<DashMap<K, MaybeExpiring<V>>>, k: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let checker = {
let mut map = map.write();
map.remove(k)
};
if let Some(mut checker) = checker {
let checker = { map.remove(k) };
if let Some((_, mut checker)) = checker {
checker.on_expire()
}
}
Expand Down