Remove parking_lot from dependencies #242

Closed · wants to merge 3 commits
springql-core/Cargo.toml (1 change: 0 additions & 1 deletion)
@@ -36,7 +36,6 @@ pest = "2.1"
pest_derive = "2.1"
reqwest = {version = "0.11", features = ["json", "blocking"], default-features = false}
once_cell = "1.8"
parking_lot = "0.12"
time = {version="0.3.9", features = ["formatting", "parsing", "macros"]}

socketcan = "1.7"
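
Dropping the crate means every lock call in the diff below moves from parking_lot's infallible API to std::sync's poisoning API, which is why `.unwrap()` appears throughout. A minimal std-only sketch of that difference (variable names are illustrative):

```rust
use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(0_i32);

    // parking_lot: `lock.write()` hands back the guard directly.
    // std::sync: `write()` returns a `LockResult`, so the guard is unwrapped;
    // `Err` only happens if another thread panicked while holding the lock
    // (poisoning), which this diff treats as fatal.
    {
        let mut guard = lock.write().unwrap();
        *guard += 1;
    }

    // Same for readers.
    println!("{}", *lock.read().unwrap());
}
```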
@@ -2,7 +2,7 @@

use anyhow::Context;

use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Lock object to barrier main jobs in worker threads.
///
@@ -15,7 +15,7 @@ pub struct MainJobLock(RwLock<MainJobLockToken>);

impl MainJobLock {
pub fn main_job_barrier(&self) -> MainJobBarrierGuard {
let write_lock = self.0.write();
let write_lock = self.0.write().unwrap();
MainJobBarrierGuard(write_lock)
}

@@ -25,6 +25,7 @@ impl MainJobLock {
pub fn try_main_job(&self) -> Result<MainJobLockGuard, anyhow::Error> {
self.0
.try_read()
.ok()
.map(MainJobLockGuard)
.context("write lock may be taken")
}
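
The added `.ok()` bridges another API difference: parking_lot's `try_read()` returns an `Option`, while std's returns a `TryLockResult`. A small sketch of the std behaviour that `try_main_job` now relies on (the match arms are illustrative):

```rust
use std::sync::{RwLock, TryLockError};

fn main() {
    let lock = RwLock::new(());

    // `try_read()` never blocks; it reports `WouldBlock` while a writer
    // (the main-job barrier in this file) holds the lock.
    match lock.try_read() {
        Ok(_guard) => println!("no barrier, main job may run"),
        Err(TryLockError::WouldBlock) => println!("barrier is up, skip this cycle"),
        Err(TryLockError::Poisoned(_)) => println!("a writer panicked"),
    }
}
```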
@@ -17,7 +17,7 @@ pub use task_metrics::TaskMetrics;

use std::collections::HashMap;

use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use crate::{
pipeline::PipelineVersion,
@@ -150,48 +150,59 @@ impl PerformanceMetrics {
) -> Vec<(&WindowQueueId, RwLockReadGuard<'_, WindowQueueMetrics>)> {
self.window_queues
.iter()
.map(|(id, q)| (id, q.read()))
.map(|(id, q)| (id, q.read().unwrap()))
.collect()
}

pub fn get_row_queues(&self) -> Vec<(&RowQueueId, RwLockReadGuard<'_, RowQueueMetrics>)> {
self.row_queues
.iter()
.map(|(id, q)| (id, q.read()))
.map(|(id, q)| (id, q.read().unwrap()))
.collect()
}

pub fn get_tasks(&self) -> Vec<(&TaskId, RwLockReadGuard<'_, TaskMetrics>)> {
self.tasks.iter().map(|(id, t)| (id, t.read())).collect()
self.tasks
.iter()
.map(|(id, t)| (id, t.read().unwrap()))
.collect()
}

fn get_task_read(&self, id: &TaskId) -> RwLockReadGuard<'_, TaskMetrics> {
self.tasks
.get(id)
.unwrap_or_else(|| panic!("task_id {} not found", id))
.read()
.unwrap()
}
fn get_window_queue_read(&self, id: &WindowQueueId) -> RwLockReadGuard<'_, WindowQueueMetrics> {
self.window_queues
.get(id)
.unwrap_or_else(|| panic!("queue_id {} not found", id))
.read()
.unwrap()
}
fn get_row_queue_read(&self, id: &RowQueueId) -> RwLockReadGuard<'_, RowQueueMetrics> {
self.row_queues
.get(id)
.unwrap_or_else(|| panic!("queue_id {} not found", id))
.read()
.unwrap()
}

fn get_task_write(&self, id: &TaskId) -> RwLockWriteGuard<'_, TaskMetrics> {
self.tasks
.get(id)
.unwrap_or_else(|| panic!("task id {} not found", id))
.write()
.unwrap()
}
fn get_row_queue_write(&self, id: &RowQueueId) -> RwLockWriteGuard<'_, RowQueueMetrics> {
self.row_queues.get(id).expect("queue_id not found").write()
self.row_queues
.get(id)
.expect("queue_id not found")
.write()
.unwrap()
}
fn get_window_queue_write(
&self,
@@ -201,5 +212,6 @@ impl PerformanceMetrics {
.get(id)
.expect("queue_id not found")
.write()
.unwrap()
}
}
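
The getters above keep their shape; only the guard type now comes from std and each `read()` is unwrapped. A reduced sketch of the snapshot pattern, with a plain `HashMap<String, RwLock<u64>>` standing in for the metrics maps (names are hypothetical):

```rust
use std::collections::HashMap;
use std::sync::{RwLock, RwLockReadGuard};

// Collects one read guard per entry; `unwrap()` only panics if a writer
// panicked while holding that entry's lock (poisoning).
fn snapshot(map: &HashMap<String, RwLock<u64>>) -> Vec<(&String, RwLockReadGuard<'_, u64>)> {
    map.iter()
        .map(|(id, metrics)| (id, metrics.read().unwrap()))
        .collect()
}

fn main() {
    let mut map = HashMap::new();
    map.insert("task-1".to_string(), RwLock::new(42));
    for (id, guard) in snapshot(&map) {
        println!("{id}: {}", *guard);
    }
}
```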
@@ -5,7 +5,7 @@ use std::{
sync::Arc,
};

use parking_lot::RwLock;
use std::sync::RwLock;

use crate::stream_engine::autonomous_executor::{
queue::row_queue::RowQueue, task_graph::RowQueueId,
@@ -18,15 +18,15 @@ pub struct RowQueueRepository {

impl RowQueueRepository {
pub fn get(&self, row_queue_id: &RowQueueId) -> Arc<RowQueue> {
let repo = self.repo.read();
let repo = self.repo.read().unwrap();
repo.get(row_queue_id)
.unwrap_or_else(|| panic!("row queue id {} is not in RowQueueRepository", row_queue_id))
.clone()
}

/// Removes all currently existing queues and creates new empty ones.
pub fn reset(&self, queue_ids: HashSet<RowQueueId>) {
let mut repo = self.repo.write();
let mut repo = self.repo.write().unwrap();
repo.clear();

queue_ids.into_iter().for_each(|queue_id| {
@@ -35,7 +35,7 @@ impl RowQueueRepository {
}

pub fn purge(&self) {
let mut repo = self.repo.write();
let mut repo = self.repo.write().unwrap();
repo.iter_mut().for_each(|(_, queue)| {
queue.purge();
});
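
`RowQueueRepository` follows the same recipe: one RwLock around the whole map, read guards for lookups, write guards for `reset` and `purge`. A self-contained sketch of that shape (the `Repo` type here is hypothetical):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Default)]
struct Repo(RwLock<HashMap<u32, Arc<String>>>);

impl Repo {
    fn get(&self, id: u32) -> Arc<String> {
        let repo = self.0.read().unwrap(); // shared access for lookups
        repo.get(&id).expect("id not registered").clone()
    }

    fn reset(&self, ids: Vec<u32>) {
        let mut repo = self.0.write().unwrap(); // exclusive access for mutation
        repo.clear();
        for id in ids {
            repo.insert(id, Arc::new(format!("queue-{id}")));
        }
    }
}

fn main() {
    let repo = Repo::default();
    repo.reset(vec![1, 2]);
    println!("{}", repo.get(1));
}
```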
@@ -5,7 +5,7 @@ use std::{
sync::Arc,
};

use parking_lot::RwLock;
use std::sync::RwLock;

use crate::stream_engine::autonomous_executor::{
queue::window_queue::WindowQueue, task_graph::WindowQueueId,
@@ -18,7 +18,7 @@ pub struct WindowQueueRepository {

impl WindowQueueRepository {
pub fn get(&self, window_queue_id: &WindowQueueId) -> Arc<WindowQueue> {
let repo = self.repo.read();
let repo = self.repo.read().unwrap();
repo.get(window_queue_id)
.unwrap_or_else(|| {
panic!(
@@ -31,7 +31,7 @@

/// Removes all currently existing queues and creates new empty ones.
pub fn reset(&self, queue_ids: HashSet<WindowQueueId>) {
let mut repo = self.repo.write();
let mut repo = self.repo.write().unwrap();
repo.clear();

queue_ids.into_iter().for_each(|queue_id| {
@@ -40,7 +40,7 @@ impl WindowQueueRepository {
}

pub fn purge(&self) {
let mut repo = self.repo.write();
let mut repo = self.repo.write().unwrap();
repo.iter_mut().for_each(|(_, queue)| {
queue.purge();
});
@@ -5,7 +5,7 @@ use std::{
sync::{Arc, Mutex},
};

use parking_lot::RwLock;
use std::sync::RwLock;

use crate::{
api::{error::Result, SpringSinkWriterConfig},
@@ -38,7 +38,7 @@
/// - `SpringError::ForeignIo` when:
/// - failed to start subtask.
pub fn register(&self, sink_writer: &SinkWriterModel) -> Result<()> {
let mut sinks = self.sinks.write();
let mut sinks = self.sinks.write().unwrap();

if sinks.get(sink_writer.name()).is_some() {
Ok(())
@@ -64,6 +64,7 @@ impl SinkWriterRepository {
pub fn get_sink_writer(&self, name: &SinkWriterName) -> Arc<Mutex<Box<dyn SinkWriter>>> {
self.sinks
.read()
.unwrap()
.get(name)
.unwrap_or_else(|| panic!("sink name ({}) not registered yet", name))
.clone()
@@ -5,7 +5,7 @@ use std::{
sync::{Arc, Mutex},
};

use parking_lot::RwLock;
use std::sync::RwLock;

use crate::{
api::{error::Result, SpringSourceReaderConfig},
@@ -38,7 +38,7 @@
/// - `SpringError::ForeignIo` when:
/// - failed to start subtask.
pub fn register(&self, source_reader: &SourceReaderModel) -> Result<()> {
let mut sources = self.sources.write();
let mut sources = self.sources.write().unwrap();

if sources.get(source_reader.name()).is_some() {
Ok(())
@@ -64,6 +64,7 @@ impl SourceReaderRepository {
pub fn get_source_reader(&self, name: &SourceReaderName) -> Arc<Mutex<Box<dyn SourceReader>>> {
self.sources
.read()
.unwrap()
.get(name)
.unwrap_or_else(|| panic!("source reader name ({}) not registered yet", name))
.clone()
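
Both the sink-writer and source-reader registries use two lock levels: an outer `RwLock` guards the registration map, and each entry carries its own `Mutex`, so the `Arc` cloned out of the getter can be locked long after the read guard is gone. A sketch of that layering (the `Registry` type and entry payload are hypothetical):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

struct Registry {
    entries: RwLock<HashMap<String, Arc<Mutex<Vec<u8>>>>>,
}

impl Registry {
    fn register(&self, name: &str) {
        let mut entries = self.entries.write().unwrap();
        entries
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(Vec::new())));
    }

    fn get(&self, name: &str) -> Arc<Mutex<Vec<u8>>> {
        self.entries
            .read()
            .unwrap() // read guard lives only for this expression
            .get(name)
            .unwrap_or_else(|| panic!("{name} not registered yet"))
            .clone()
    }
}

fn main() {
    let registry = Registry { entries: RwLock::new(HashMap::new()) };
    registry.register("in_can");
    registry.get("in_can").lock().unwrap().push(1); // inner Mutex, outer lock already released
}
```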
@@ -6,7 +6,7 @@ use anyhow::Context;
//
// `std::sync::RwLock` uses `pthread_rwlock_wrlock`, which might cause writer starvation without setting `PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP` attribute.
// `parking_lot::RwLock` avoids writer starvation.
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Task executor is responsible for queues' cleanup on pipeline update.
///
@@ -17,7 +17,7 @@ pub struct TaskExecutorLock(RwLock<TaskExecutorLockToken>);

impl TaskExecutorLock {
pub fn task_execution_barrier(&self) -> TaskExecutionBarrierGuard {
let write_lock = self.0.write();
let write_lock = self.0.write().unwrap();
TaskExecutionBarrierGuard(write_lock)
}

@@ -27,6 +27,7 @@ impl TaskExecutorLock {
pub fn try_task_execution(&self) -> Result<TaskExecutionLockGuard, anyhow::Error> {
self.0
.try_read()
.ok()
.map(TaskExecutionLockGuard)
.context("write lock may be taken")
}
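
The comment kept above still explains why parking_lot was chosen originally: pthread-based `std::sync::RwLock` can starve writers. The barrier pattern itself survives the switch, presumably because workers only ever `try_read()` and back off, so they never queue up in front of a pending writer. A minimal sketch of that barrier idiom (lock contents and assertions are illustrative):

```rust
use std::sync::RwLock;
use std::thread;

fn main() {
    let lock = RwLock::new(());

    {
        let _barrier = lock.write().unwrap(); // barrier raised (e.g. pipeline update)
        // A worker probing from another thread is refused immediately instead of blocking.
        thread::scope(|s| {
            s.spawn(|| assert!(lock.try_read().is_err()));
        });
    } // barrier dropped here

    assert!(lock.try_read().is_ok()); // workers may run again
}
```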
@@ -6,7 +6,7 @@ use std::{
time::Duration,
};

use parking_lot::{Mutex, MutexGuard};
use std::sync::{Mutex, MutexGuard};

use crate::{
api::SpringConfig,
@@ -117,26 +117,25 @@ impl WorkerSetupCoordinator {
fn sync<T, F: Fn(&T) -> bool>(&self, v: &Mutex<T>, is_sync: F) {
loop {
match v.try_lock() {
Some(v_) => {
Ok(v_) => {
if is_sync(&v_) {
break;
} else {
thread::sleep(Self::SYNC_SLEEP);
}
}
None => thread::sleep(Self::SYNC_SLEEP),
Err(_) => {}
}
thread::sleep(Self::SYNC_SLEEP)
}
}

fn ready_i64(&self, n: &Mutex<i64>) {
let mut n_ = n.lock();
let mut n_ = n.lock().unwrap();
assert!(*n_ > 0);
*n_ -= 1;
}

fn ready_bool(&self, b: &Mutex<bool>) {
let mut b_ = b.lock();
let mut b_ = b.lock().unwrap();
assert!(!*b_);
*b_ = true;
}
@@ -160,7 +159,7 @@ impl WorkerStopCoordinator {
}

fn join(&self) {
let mut live_worker_count = self.live_worker_count.lock();
let mut live_worker_count = self.live_worker_count.lock().unwrap();
*live_worker_count += 1;
}

@@ -170,6 +169,6 @@ impl WorkerStopCoordinator {
}

fn locked_count(&self) -> MutexGuard<u16> {
self.live_worker_count.lock()
self.live_worker_count.lock().unwrap()
}
}
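
`Mutex::try_lock` changes shape the same way: parking_lot returned an `Option`, std returns a `Result` whose `Err` is either `WouldBlock` or a poison error, which is why the `Some`/`None` arms above became `Ok`/`Err`. A small sketch of the polling-loop shape (the readiness condition is hypothetical):

```rust
use std::sync::{Mutex, TryLockError};
use std::thread;
use std::time::Duration;

fn wait_until_zero(counter: &Mutex<i64>) {
    loop {
        match counter.try_lock() {
            Ok(n) if *n == 0 => break,          // ready
            Ok(_) => {}                         // locked fine, but not ready yet
            Err(TryLockError::WouldBlock) => {} // another thread holds the lock
            Err(TryLockError::Poisoned(e)) => panic!("a worker panicked: {e}"),
        }
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let counter = Mutex::new(0);
    wait_until_zero(&counter);
    println!("all workers ready");
}
```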
@@ -151,13 +151,19 @@ pub trait WorkerThread {
let mut state = Self::LoopState::new(&thread_arg);

while stop_receiver.try_recv().is_err() {
if let Ok(_lock) = main_job_lock.try_main_job() {
let wait = if let Ok(_lock) = main_job_lock.try_main_job() {
if state.is_integral() {
state = Self::main_loop_cycle(state, &thread_arg, event_queue.as_ref());
false
} else {
log::debug!("[{}] main_loop(): state is not integral", Self::THREAD_NAME);
thread::sleep(Duration::from_millis(100));
true
}
} else {
false
};
if wait {
thread::sleep(Duration::from_millis(100));
}
state = Self::handle_events(state, &event_polls, &thread_arg, event_queue.clone());
}
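
The loop body is reshuffled so the sleep happens after the `if let` block, presumably to ensure the read guard from `try_main_job()` is dropped before the 100 ms back-off rather than being held across it. A simplified sketch of that shape (state handling reduced to a boolean):

```rust
use std::sync::RwLock;
use std::thread;
use std::time::Duration;

fn loop_cycle(main_job_lock: &RwLock<()>, state_is_integral: bool) {
    // Decide whether to back off while holding the read guard...
    let wait = if let Ok(_lock) = main_job_lock.try_read() {
        if state_is_integral {
            // ... run one main-loop cycle here ...
            false
        } else {
            true // state not ready yet
        }
    } else {
        false // a writer holds the barrier; fall through to event handling
    }; // `_lock` is dropped here
    // ...and only sleep once the lock has been released.
    if wait {
        thread::sleep(Duration::from_millis(100));
    }
}

fn main() {
    let lock = RwLock::new(());
    loop_cycle(&lock, true);
    loop_cycle(&lock, false);
}
```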