Skip to content

Commit

Permalink
feat(mito): make use of options in RegionCreate/OpenRequest (#2436)
Browse files Browse the repository at this point in the history
* refactor: move RegionOptions to options mod

* refactor: define compaction strategy in region/options.rs

* feat: use duration for time window

* refactor: rename CompactionStrategy to CompactionOptions

* feat: use serde to parse options

* feat: parse options

* feat: set options on creation/opening

* test: test create/open with options

* chore: remove todo

* feat: get compaction ttl and options from RegionOptions

* style: fix clippy

* chore: Remove unused engine_options

* style: fix clippy

* chore: remove todo
  • Loading branch information
evenyag committed Sep 19, 2023
1 parent 1fb2d95 commit 7b606ed
Show file tree
Hide file tree
Showing 26 changed files with 396 additions and 67 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
created_on: chrono::DateTime::default(),
primary_key_indices: vec![],
next_column_id: columns as u32 + 1,
engine_options: Default::default(),
value_indices: vec![],
options: Default::default(),
region_numbers: (1..=100).collect(),
Expand Down
1 change: 0 additions & 1 deletion src/common/meta/src/key/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ mod tests {
created_on: chrono::DateTime::default(),
primary_key_indices: vec![0, 1],
next_column_id: 3,
engine_options: Default::default(),
value_indices: vec![2, 3],
options: Default::default(),
region_numbers: vec![1],
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ pub mod mock {

#[cfg(test)]
pub mod test_data {
use std::collections::HashMap;
use std::sync::Arc;

use chrono::DateTime;
Expand Down Expand Up @@ -178,7 +177,6 @@ pub mod test_data {
engine: MITO2_ENGINE.to_string(),
next_column_id: 3,
region_numbers: vec![1, 2, 3],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
Expand Down
3 changes: 0 additions & 3 deletions src/meta-srv/src/table_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ pub(crate) async fn fetch_tables(

#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;

use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::TableMetadataManagerRef;
Expand Down Expand Up @@ -103,7 +101,6 @@ pub(crate) mod tests {
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ prost.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
serde_with = "3"
smallvec.workspace = true
snafu.workspace = true
store-api = { workspace = true }
Expand Down
19 changes: 7 additions & 12 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ mod twcs;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
Expand All @@ -42,7 +42,6 @@ use crate::sst::file_purger::FilePurgerRef;
pub struct CompactionRequest {
pub(crate) current_version: VersionRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) ttl: Option<Duration>,
pub(crate) compaction_time_window: Option<i64>,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
Expand All @@ -64,13 +63,13 @@ impl CompactionRequest {
}
}

/// Builds compaction picker according to [CompactionStrategy].
pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> CompactionPickerRef {
/// Builds compaction picker according to [CompactionOptions].
pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef {
match strategy {
CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
}
}
Expand Down Expand Up @@ -175,9 +174,7 @@ impl CompactionScheduler {
///
/// If the region has nothing to compact, it removes the region from the status map.
fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> {
// TODO(hl): build picker according to region options.
let picker =
compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default()));
let picker = compaction_options_to_picker(&request.current_version.options.compaction);
let region_id = request.region_id();
debug!(
"Pick compaction strategy {:?} for region: {}",
Expand Down Expand Up @@ -309,8 +306,6 @@ impl CompactionStatus {
let mut req = CompactionRequest {
current_version,
access_layer: self.access_layer.clone(),
// TODO(hl): get TTL info from region metadata
ttl: None,
// TODO(hl): get persisted region compaction time window
compaction_time_window: None,
request_sender: request_sender.clone(),
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ impl Picker for TwcsPicker {
let CompactionRequest {
current_version,
access_layer,
ttl,
compaction_time_window,
request_sender,
waiters,
Expand All @@ -131,6 +130,7 @@ impl Picker for TwcsPicker {
let region_id = region_metadata.region_id;

let levels = current_version.ssts.levels();
let ttl = current_version.options.ttl;
let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis());
if !expired_ssts.is_empty() {
info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
Expand Down Expand Up @@ -376,7 +376,6 @@ impl CompactionTask for TwcsCompactionTask {
notify,
})
.await;
// TODO(hl): handle reschedule
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use serde::{Deserialize, Serialize};
const DEFAULT_NUM_WORKERS: usize = 1;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
/// Default region write buffer size.
pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);

/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
24 changes: 24 additions & 0 deletions src/mito2/src/engine/create_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
Expand Down Expand Up @@ -77,3 +79,25 @@ async fn test_engine_create_existing_region() {
"unexpected err: {err}"
);
}

#[tokio::test]
async fn test_engine_create_with_options() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("ttl", "10d")
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

assert!(engine.is_region_exists(region_id));
let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 10),
region.version().options.ttl.unwrap()
);
}
44 changes: 43 additions & 1 deletion src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionOpenRequest, RegionPutRequest, RegionRequest};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
Expand Down Expand Up @@ -125,3 +128,42 @@ async fn test_engine_open_readonly() {
engine.set_writable(region_id, true).unwrap();
put_rows(&engine, region_id, rows).await;
}

#[tokio::test]
async fn test_engine_region_open_with_options() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

// Open the region again with options.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
}),
)
.await
.unwrap();

let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 4),
region.version().options.ttl.unwrap()
);
}
7 changes: 7 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ pub enum Error {
region_id: RegionId,
location: Location,
},

#[snafu(display("Invalid options, source: {}", source))]
JsonOptions {
source: serde_json::Error,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -522,6 +528,7 @@ impl ErrorExt for Error {
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionReadonly { .. } => StatusCode::RegionReadonly,
JsonOptions { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub mod memtable;
mod metrics;
#[allow(dead_code)]
pub mod read;
mod region;
pub mod region;
mod region_write_ctx;
#[allow(dead_code)]
pub mod request;
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Mito region.

pub(crate) mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::HashMap;
Expand Down

0 comments on commit 7b606ed

Please sign in to comment.