Skip to content

Commit

Permalink
write out to bloom filter
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Nov 15, 2022
1 parent 63fa643 commit 9b8a0f5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 2 deletions.
1 change: 1 addition & 0 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
}

/// A split block Bloom filter
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
Expand Down
15 changes: 15 additions & 0 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// under the License.

//! Contains column writer API.

#[cfg(feature = "bloom")]
use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};

Expand Down Expand Up @@ -154,6 +157,9 @@ pub struct ColumnCloseResult {
pub rows_written: u64,
/// Metadata for this column chunk
pub metadata: ColumnChunkMetaData,
/// Optional bloom filter for this column
#[cfg(feature = "bloom")]
pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
/// Optional offset index, identifying page locations
Expand Down Expand Up @@ -209,6 +215,10 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,

// bloom filter
#[cfg(feature = "bloom")]
bloom_filter: Option<Sbbf>,

// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
Expand Down Expand Up @@ -260,6 +270,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_column_nulls: 0,
column_distinct_count: None,
},
// TODO!
#[cfg(feature = "bloom")]
bloom_filter: None,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
Expand Down Expand Up @@ -458,6 +471,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(ColumnCloseResult {
bytes_written: self.column_metrics.total_bytes_written,
rows_written: self.column_metrics.total_rows_written,
#[cfg(feature = "bloom")]
bloom_filter: self.bloom_filter,
metadata,
column_index,
offset_index,
Expand Down
45 changes: 43 additions & 2 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
//! Contains file writer API, and provides methods to write row groups and columns by
//! using row group writers and column writers respectively.

use std::{io::Write, sync::Arc};

#[cfg(feature = "bloom")]
use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
use std::{io::Write, sync::Arc};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};

use crate::basic::PageType;
Expand Down Expand Up @@ -116,6 +117,8 @@ pub struct SerializedFileWriter<W: Write> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
row_groups: Vec<RowGroupMetaDataPtr>,
#[cfg(feature = "bloom")]
bloom_filters: Vec<Vec<Option<Sbbf>>>,
column_indexes: Vec<Vec<Option<ColumnIndex>>>,
offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
row_group_index: usize,
Expand All @@ -132,6 +135,8 @@ impl<W: Write> SerializedFileWriter<W> {
descr: Arc::new(SchemaDescriptor::new(schema)),
props: properties,
row_groups: vec![],
#[cfg(feature = "bloom")]
bloom_filters: vec![],
column_indexes: Vec::new(),
offset_indexes: Vec::new(),
row_group_index: 0,
Expand Down Expand Up @@ -212,6 +217,32 @@ impl<W: Write> SerializedFileWriter<W> {
Ok(())
}

#[cfg(feature = "bloom")]
/// Serialize all the bloom filter to the file
fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
// iter row group
// iter each column
// write bloom filter to the file
for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
{
match &self.bloom_filters[row_group_idx][column_idx] {
Some(bloom_filter) => {
let start_offset = self.buf.bytes_written();
let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
bloom_filter.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
let end_offset = self.buf.bytes_written();
// set offset and index for bloom filter
column_metadata.bloom_filter_offset = Some(start_offset as i64);
}
None => {}
}
}
}
Ok(())
}

    /// Serialize all the column indexes to the file
fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
// iter row group
Expand Down Expand Up @@ -250,6 +281,8 @@ impl<W: Write> SerializedFileWriter<W> {
.map(|v| v.to_thrift())
.collect::<Vec<_>>();

#[cfg(feature = "bloom")]
self.write_bloom_filters(&mut row_groups)?;
// Write column indexes and offset indexes
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;
Expand Down Expand Up @@ -320,6 +353,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
column_index: usize,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
#[cfg(feature = "bloom")]
bloom_filters: Vec<Option<Sbbf>>,
column_indexes: Vec<Option<ColumnIndex>>,
offset_indexes: Vec<Option<OffsetIndex>>,
on_close: Option<OnCloseRowGroup<'a>>,
Expand Down Expand Up @@ -348,6 +383,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
column_index: 0,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns),
#[cfg(feature = "bloom")]
bloom_filters: Vec::with_capacity(num_columns),
column_indexes: Vec::with_capacity(num_columns),
offset_indexes: Vec::with_capacity(num_columns),
total_bytes_written: 0,
Expand Down Expand Up @@ -380,13 +417,17 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
let column_chunks = &mut self.column_chunks;
let column_indexes = &mut self.column_indexes;
let offset_indexes = &mut self.offset_indexes;
#[cfg(feature = "bloom")]
let bloom_filters = &mut self.bloom_filters;

let on_close = |r: ColumnCloseResult| {
// Update row group writer metrics
*total_bytes_written += r.bytes_written;
column_chunks.push(r.metadata);
column_indexes.push(r.column_index);
offset_indexes.push(r.offset_index);
#[cfg(feature = "bloom")]
bloom_filters.push(r.bloom_filter);

if let Some(rows) = *total_rows_written {
if rows != r.rows_written {
Expand Down

0 comments on commit 9b8a0f5

Please sign in to comment.