Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Nov 7, 2022
1 parent 016d9e8 commit 9badf7e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
26 changes: 14 additions & 12 deletions analytic_engine/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ pub enum Error {
StorageFormatOptionsNotFound { backtrace: Backtrace },

#[snafu(display("Bloom filter options are not found.\nBacktrace\n:{}", backtrace))]
BloomFitlerNotFound { backtrace: Backtrace },
BloomFilterNotFound { backtrace: Backtrace },

#[snafu(display(
"Bloom filter should be 256 byte, current:{}.\nBacktrace\n:{}",
size,
backtrace
))]
InvalidBloomFilter { size: usize, backtrace: Backtrace },
InvalidBloomFilterSize { size: usize, backtrace: Backtrace },

#[snafu(display("Failed to convert time range, err:{}", source))]
ConvertTimeRange { source: common_types::time::Error },
Expand Down Expand Up @@ -448,21 +448,23 @@ impl BloomFilter {
/// Creates a `BloomFilter` that stores the given `filters` unchanged.
///
/// NOTE(review): the conversion code in this file iterates the outer `Vec`
/// as row-group filters and the inner `Vec` as per-column `Bloom`s, so
/// `filters[row_group][column]` looks like the intended layout — confirm
/// against callers.
pub fn new(filters: Vec<Vec<Bloom>>) -> Self {
Self { filters }
}
}

pub fn to_pb(&self) -> sst_pb::SstBloomFilter {
let filters = self
impl From<BloomFilter> for sst_pb::SstBloomFilter {
fn from(bloom_filter: BloomFilter) -> Self {
let row_group_filters = bloom_filter
.filters
.iter()
.map(|row_group_filter| {
let bloom_filter = row_group_filter
let column_filters = row_group_filter
.iter()
.map(|column_filter| column_filter.data().to_vec())
.collect::<Vec<_>>();
sst_pb::sst_bloom_filter::RowGroupFilter { bloom_filter }
sst_pb::sst_bloom_filter::RowGroupFilter { column_filters }
})
.collect::<Vec<_>>();

sst_pb::SstBloomFilter { filters }
sst_pb::SstBloomFilter { row_group_filters }
}
}

Expand All @@ -471,18 +473,18 @@ impl TryFrom<sst_pb::SstBloomFilter> for BloomFilter {

fn try_from(src: sst_pb::SstBloomFilter) -> Result<Self> {
let filters = src
.filters
.row_group_filters
.into_iter()
.map(|row_group_filter| {
row_group_filter
.bloom_filter
.column_filters
.into_iter()
.map(|encoded_bytes| {
let size = encoded_bytes.len();
let bs: [u8; 256] = encoded_bytes
.try_into()
.ok()
.with_context(|| InvalidBloomFilter { size })?;
.context(InvalidBloomFilterSize { size })?;

Ok(Bloom::from(bs))
})
Expand Down Expand Up @@ -529,7 +531,7 @@ impl From<SstMetaData> for sst_pb::SstMetaData {
size: src.size,
row_num: src.row_num,
storage_format_opts: Some(src.storage_format_opts.into()),
bloom_filter: Some(src.bloom_filter.to_pb()),
bloom_filter: Some(src.bloom_filter.into()),
}
}
}
Expand All @@ -551,7 +553,7 @@ impl TryFrom<sst_pb::SstMetaData> for SstMetaData {
.context(StorageFormatOptionsNotFound)?,
);
let bloom_filter = {
let pb_filter = src.bloom_filter.context(BloomFitlerNotFound)?;
let pb_filter = src.bloom_filter.context(BloomFilterNotFound)?;
BloomFilter::try_from(pb_filter)?
};

Expand Down
26 changes: 13 additions & 13 deletions analytic_engine/src/sst/parquet/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ struct RecordBytesReader {
compression: Compression,
meta_data: SstMetaData,
total_row_num: Arc<AtomicUsize>,
fetched_record_batch: Vec<Vec<RecordBatchWithKey>>,
// Record batch partitioned by given `num_rows_per_row_group`
// There may be more than one `RecordBatchWithKey` inside each partition
partitioned_record_batch: Vec<Vec<RecordBatchWithKey>>,
}

impl RecordBytesReader {
// Partition record batch stream into batch vector with given
// `num_rows_per_row_group`
async fn partition_record_batch(&mut self) -> Result<()> {
let mut fetched_row_num = 0;
let mut pending_record_batch: Option<Vec<RecordBatchWithKey>> = None;
let mut pending_record_batch: Vec<RecordBatchWithKey> = Default::default();

while let Some(record_batch) = self.record_stream.next().await {
let record_batch = record_batch.context(PollRecordBatch)?;
Expand All @@ -75,28 +77,26 @@ impl RecordBytesReader {
);

fetched_row_num += record_batch.num_rows();
pending_record_batch
.get_or_insert_with(Default::default)
.push(record_batch);
pending_record_batch.push(record_batch);

// reach batch limit, append to self and reset counter and pending batch
if fetched_row_num >= self.num_rows_per_row_group {
fetched_row_num = 0;
self.fetched_record_batch
.push(pending_record_batch.take().unwrap());
self.partitioned_record_batch
.push(std::mem::take(&mut pending_record_batch));
}
}

if let Some(remaining) = pending_record_batch.take() {
self.fetched_record_batch.push(remaining);
if !pending_record_batch.is_empty() {
self.partitioned_record_batch.push(pending_record_batch);
}

Ok(())
}

fn build_bloom_filter(&self) -> BloomFilter {
let filters = self
.fetched_record_batch
.partitioned_record_batch
.iter()
.map(|row_group_batch| {
let mut row_group_filters =
Expand All @@ -106,7 +106,7 @@ impl RecordBytesReader {
for (col_idx, column) in partial_batch.columns().iter().enumerate() {
for row in 0..column.num_rows() {
let datum = column.datum(row);
let bytes = datum.as_bytes();
let bytes = datum.to_bytes();
row_group_filters[col_idx].accrue(Input::Raw(&bytes));
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ impl RecordBytesReader {

// process record batch stream
let mut arrow_record_batch_vec = Vec::new();
for record_batches in self.fetched_record_batch {
for record_batches in self.partitioned_record_batch {
for batch in record_batches {
arrow_record_batch_vec.push(batch.into_record_batch().into_arrow_record_batch());
}
Expand Down Expand Up @@ -178,7 +178,7 @@ impl<'a> SstBuilder for ParquetSstBuilder<'a> {
total_row_num: total_row_num.clone(),
// TODO(xikai): should we avoid this clone?
meta_data: meta.to_owned(),
fetched_record_batch: Default::default(),
partitioned_record_batch: Default::default(),
};
let bytes = reader.read_all().await?;
self.storage
Expand Down
2 changes: 1 addition & 1 deletion common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ impl Datum {
}
}

pub fn as_bytes(&self) -> Vec<u8> {
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Datum::Double(v) => v.to_le_bytes().to_vec(),
Datum::Float(v) => v.to_le_bytes().to_vec(),
Expand Down
4 changes: 2 additions & 2 deletions proto/protos/sst.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import "analytic_common.proto";

message SstBloomFilter {
message RowGroupFilter {
repeated bytes bloom_filter = 1;
repeated bytes column_filters = 1;
};

repeated RowGroupFilter filters = 1;
repeated RowGroupFilter row_group_filters = 1;
}

message SstMetaData {
Expand Down

0 comments on commit 9badf7e

Please sign in to comment.