Skip to content

Commit

Permalink
refactor: first step to refactor error (#1524)
Browse files Browse the repository at this point in the history
## Rationale
Part of #1513

## Detailed Changes
Replace snafu-based Error to thiserror-based for memtable module.

## Test Plan
CI

---------

Co-authored-by: chunshao.rcs <worcsrcsgg@163.com>
  • Loading branch information
jiacai2050 and chunshao90 committed Apr 26, 2024
1 parent e5c9923 commit 431ae51
Show file tree
Hide file tree
Showing 16 changed files with 200 additions and 219 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async-trait = "0.1.72"
atomic_enum = "0.2.0"
base64 = "0.13"
bytes = "1"
thiserror = "1"
bytes_ext = { path = "src/components/bytes_ext" }
catalog = { path = "src/catalog" }
catalog_impls = { path = "src/catalog_impls" }
Expand Down
2 changes: 2 additions & 0 deletions src/analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ wal-rocksdb = ["wal/wal-rocksdb"]

[dependencies]
# In alphabetical order
anyhow = { workspace = true }
arc-swap = "1.4.0"
arena = { workspace = true }
arrow = { workspace = true }
Expand Down Expand Up @@ -81,6 +82,7 @@ snafu = { workspace = true }
table_engine = { workspace = true }
table_kv = { workspace = true }
tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
trace_metric = { workspace = true }
Expand Down
23 changes: 23 additions & 0 deletions src/analytic_engine/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Global Error type for analytic engine.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ErrorKind {
KeyTooLarge,
Internal,
}
25 changes: 12 additions & 13 deletions src/analytic_engine/src/instance/wal_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ use crate::{
engine::{Error, ReplayWalWithCause, Result},
flush_compaction::{Flusher, TableFlushOptions},
serial_executor::TableOpSerialExecutor,
write::MemTableWriter,
write::{Error as WriteError, MemTableWriter},
},
payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder},
table::data::TableDataRef,
ErrorKind,
};

// Metrics of wal replayer
Expand Down Expand Up @@ -547,22 +548,20 @@ async fn replay_table_log_entries(
let index_in_writer =
IndexInWriterSchema::for_same_schema(row_group.schema().num_columns());
let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec);
let write_res = memtable_writer
.write(sequence, row_group, index_in_writer)
.box_err()
.context(ReplayWalWithCause {
msg: Some(format!(
"table_id:{}, table_name:{}, space_id:{}",
table_data.space_id, table_data.name, table_data.id
)),
});
let write_res = memtable_writer.write(sequence, row_group, index_in_writer);
if let Err(e) = write_res {
// TODO: find a better way to match this.
if e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) {
if matches!(e, WriteError::UpdateMemTableSequence { ref source } if source.kind() == ErrorKind::KeyTooLarge )
{
// ignore this error
warn!("Unable to insert memtable, err:{e}");
} else {
return Err(e);
return Err(Error::ReplayWalWithCause {
msg: Some(format!(
"table_id:{}, table_name:{}, space_id:{}",
table_data.space_id, table_data.name, table_data.id
)),
source: Box::new(e),
});
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
mod compaction;
mod context;
mod engine;
pub mod error;
mod instance;
mod manifest;
pub mod memtable;
Expand All @@ -39,6 +40,7 @@ pub mod table_meta_set_impl;
#[cfg(any(test, feature = "test"))]
pub mod tests;

use error::ErrorKind;
use manifest::details::Options as ManifestOptions;
use object_store::config::StorageOptions;
use serde::{Deserialize, Serialize};
Expand Down
42 changes: 17 additions & 25 deletions src/analytic_engine/src/memtable/columnar/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
time::Instant,
};

use anyhow::Context;
use arena::{Arena, BasicStats, MonoIncArena};
use bytes_ext::{ByteVec, Bytes};
use codec::{memcomparable::MemComparable, row, Encoder};
Expand All @@ -36,17 +37,14 @@ use common_types::{
schema::Schema,
SequenceNumber,
};
use generic_error::BoxError;
use logger::trace;
use macros::ensure;
use parquet::data_type::AsBytes;
use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist};
use snafu::{OptionExt, ResultExt};

use crate::memtable::{
key,
key::{KeySequence, SequenceCodec},
AppendRow, BuildRecordBatch, DecodeInternalKey, Internal, InternalNoCause, IterTimeout,
ProjectSchema, Result, ScanContext, ScanRequest,
key::{self, KeySequence, SequenceCodec},
Result, ScanContext, ScanRequest,
};

/// Iterator state
Expand Down Expand Up @@ -106,7 +104,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
let row_projector = request
.row_projector_builder
.build(&schema)
.context(ProjectSchema)?;
.context("ProjectSchema")?;
let mut columnar_iter = Self {
memtable,
row_num,
Expand Down Expand Up @@ -147,26 +145,18 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
let column_schema = self.memtable_schema.column(*idx);
let column = memtable
.get(&column_schema.id)
.with_context(|| InternalNoCause {
msg: format!("column not found, column:{}", column_schema.name),
})?;
.with_context(|| format!("column not found, column:{}", column_schema.name))?;
for (i, key) in key_vec.iter_mut().enumerate().take(self.row_num) {
let datum = column.get_datum(i);
encoder
.encode(key, &datum)
.box_err()
.context(Internal { msg: "encode key" })?;
encoder.encode(key, &datum).context("encode key")?;
}
}

// TODO: Persist the skiplist.
for (i, mut key) in key_vec.into_iter().enumerate() {
SequenceCodec
.encode(&mut key, &KeySequence::new(self.last_sequence, i as u32))
.box_err()
.context(Internal {
msg: "encode key sequence",
})?;
.context("encode key sequence")?;
self.skiplist.put(&key, (i as u32).to_le_bytes().as_slice());
}

Expand Down Expand Up @@ -203,9 +193,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
if !rows.is_empty() {
if let Some(deadline) = self.deadline {
let now = Instant::now();
if now >= deadline {
return IterTimeout { now, deadline }.fail();
}
ensure!(
now < deadline,
"iter timeout, now:{now:?}, deadline:{deadline:?}"
);
}

let fetched_schema = self.row_projector.fetched_schema().clone();
Expand All @@ -219,10 +210,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
self.batch_size,
);
for row in rows.into_iter() {
builder.append_row(row).context(AppendRow)?;
builder.append_row(row).context("AppendRow")?;
}

let batch = builder.build().context(BuildRecordBatch)?;
let batch = builder.build().context("BuildRecordBatch")?;
trace!("column iterator send one batch:{:?}", batch);
Ok(Some(batch))
} else {
Expand All @@ -245,7 +236,8 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
while self.iter.valid() {
// Fetch current entry
let key = self.iter.key();
let (user_key, _) = key::user_key_from_internal_key(key).context(DecodeInternalKey)?;
let (user_key, _) =
key::user_key_from_internal_key(key).context("DecodeInternalKey")?;

// Check user key is still in range
if self.is_after_end_bound(user_key) {
Expand All @@ -262,7 +254,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
// be set as last_internal_key so maybe we can just
// unwrap it?
let (last_user_key, _) = key::user_key_from_internal_key(last_internal_key)
.context(DecodeInternalKey)?;
.context("DecodeInternalKey")?;
user_key == last_user_key
}
// This is the first user key
Expand Down
40 changes: 11 additions & 29 deletions src/analytic_engine/src/memtable/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,21 @@ use std::{
},
};

use anyhow::Context;
use arena::MonoIncArena;
use bytes_ext::Bytes;
use common_types::{
column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema,
time::TimeRange, SequenceNumber,
};
use generic_error::BoxError;
use logger::debug;
use macros::ensure;
use skiplist::{BytewiseComparator, Skiplist};
use snafu::{ensure, OptionExt, ResultExt};

use crate::memtable::{
columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence,
reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, Internal, InternalNoCause,
InvalidPutSequence, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext,
ScanRequest,
reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable, Metrics as MemtableMetrics,
PutContext, Result, ScanContext, ScanRequest,
};

pub mod factory;
Expand Down Expand Up @@ -108,16 +107,11 @@ impl MemTable for ColumnarMemTable {
} else {
// TODO: impl append() one row in column, avoid memory expansion.
let column = Column::with_capacity(1, column_schema.data_type)
.box_err()
.context(Internal {
msg: "new column failed",
})?;
.context("new column failed")?;
columns.insert(column_schema.id, column);
columns
.get_mut(&column_schema.id)
.context(InternalNoCause {
msg: "get column failed",
})?
.context("get column failed")?
};

if let Some(writer_index) = ctx.index_in_writer.column_index_in_writer(i) {
Expand All @@ -127,10 +121,7 @@ impl MemTable for ColumnarMemTable {
} else {
column
.append_datum_ref(&row[writer_index])
.box_err()
.context(Internal {
msg: "append datum failed",
})?
.context("append datum failed")?
}
} else {
column.append_nulls(1);
Expand All @@ -140,9 +131,7 @@ impl MemTable for ColumnarMemTable {
let mut memtable = self.memtable.write().unwrap();
for (k, v) in columns {
if let Some(column) = memtable.get_mut(&k) {
column.append_column(v).box_err().context(Internal {
msg: "append column",
})?;
column.append_column(v).context("append column")?;
} else {
memtable.insert(k, v);
};
Expand Down Expand Up @@ -174,18 +163,14 @@ impl MemTable for ColumnarMemTable {
.schema
.columns()
.get(self.schema.timestamp_index())
.context(InternalNoCause {
msg: "timestamp column is missing",
})?;
.context("timestamp column is missing")?;

let num_rows = self
.memtable
.read()
.unwrap()
.get(&timestamp_column.id)
.context(InternalNoCause {
msg: "get timestamp column failed",
})?
.context("get timestamp column failed")?
.len();
let (reverse, batch_size) = (request.reverse, ctx.batch_size);
let arena = MonoIncArena::with_collector(
Expand Down Expand Up @@ -219,10 +204,7 @@ impl MemTable for ColumnarMemTable {
let last = self.last_sequence();
ensure!(
sequence >= last,
InvalidPutSequence {
given: sequence,
last
}
"invalid sequence, given:{sequence}, last:{last}"
);

self.last_sequence.store(sequence, Ordering::Relaxed);
Expand Down
51 changes: 51 additions & 0 deletions src/analytic_engine/src/memtable/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use thiserror::Error;

use crate::ErrorKind;

#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(#[from] InnerError);

impl From<anyhow::Error> for Error {
fn from(source: anyhow::Error) -> Self {
Self(InnerError::Other { source })
}
}

impl Error {
pub fn kind(&self) -> ErrorKind {
match self.0 {
InnerError::KeyTooLarge { .. } => ErrorKind::KeyTooLarge,
InnerError::Other { .. } => ErrorKind::Internal,
}
}
}

#[derive(Error, Debug)]
pub(crate) enum InnerError {
#[error("too large key, max:{max}, current:{current}")]
KeyTooLarge { current: usize, max: usize },

#[error(transparent)]
Other {
#[from]
source: anyhow::Error,
},
}

0 comments on commit 431ae51

Please sign in to comment.