bloom filter part IV: adjust writer properties, bloom filter properties, and incorporate into column encoder (#3165)

* rework bloom filter

1. update number of properties
2. push down hashing to encoder level
3. add more docs

* move bloom filter

* update prompt

* remove unused updates
jimexist committed Nov 24, 2022
1 parent 8ba7842 commit 5640a5b
Showing 9 changed files with 284 additions and 151 deletions.
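
As a quick orientation before the diff: after this change, bloom filter writing is driven by writer properties and handled inside the column encoder. Below is a minimal sketch (not part of this commit) of how a user of the `parquet` crate might opt in, assuming only the `set_bloom_filter_enabled` builder method that this PR exercises in `parquet-fromcsv`; any per-column tuning is outside this sketch.

```rust
use parquet::file::properties::WriterProperties;

// Sketch only: enable split block bloom filters via the writer properties;
// the column encoder (see encoder.rs below) picks this up and builds an
// Sbbf per column chunk.
fn bloom_filter_writer_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build()
}
```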
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/mod.rs
@@ -102,7 +102,7 @@ pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> SchemaDescPtr;

/// Get the numer of rows in this collection
/// Get the number of rows in this collection
fn num_rows(&self) -> usize;

/// Returns an iterator over the column chunks for particular column
6 changes: 6 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -17,6 +17,7 @@

use crate::arrow::arrow_writer::levels::LevelInfo;
use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::page::PageWriter;
use crate::column::writer::encoder::{
ColumnValueEncoder, DataPageValues, DictionaryPage,
@@ -451,6 +452,11 @@ impl ColumnValueEncoder for ByteArrayEncoder {
}
}

fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
// TODO FIX ME need to handle bloom filter in arrow writer
None
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
where
Self: Sized,
9 changes: 7 additions & 2 deletions parquet/src/bin/parquet-fromcsv-help.txt
@@ -37,10 +37,10 @@ Options:
[possible values: lf, crlf, cr]

-e, --escape-char <ESCAPE_CHAR>
escape charactor
escape character

-q, --quote-char <QUOTE_CHAR>
quate charactor
quote character

-D, --double-quote <DOUBLE_QUOTE>
double quote
@@ -58,6 +58,11 @@ Options:
-m, --max-row-group-size <MAX_ROW_GROUP_SIZE>
max row group size

--enable-bloom-filter <ENABLE_BLOOM_FILTER>
whether to enable bloom filter writing

[possible values: true, false]

--help
display usage help

22 changes: 16 additions & 6 deletions parquet/src/bin/parquet-fromcsv.rs
@@ -57,11 +57,11 @@
//!
//! - `-i`, `--input-file` : Path to input CSV file
//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format`
//! - `-e`, `--escape` : Escape charactor for input file
//! - `-d`, `--delimiter : Field delimiter for CSV file, default depends `--input-format`
//! - `-e`, `--escape` : Escape character for input file
//! - `-h`, `--has-header` : Input has header
//! - `-r`, `--record-terminator` : Record terminator charactor for input. default is CRLF
//! - `-q`, `--quote-char` : Input quoting charactor
//! - `-r`, `--record-terminator` : Record terminator character for input. default is CRLF
//! - `-q`, `--quote-char` : Input quoting character
//!

use std::{
@@ -182,9 +182,9 @@ struct Args {
delimiter: Option<char>,
#[clap(value_enum, short, long, help("record terminator"))]
record_terminator: Option<RecordTerminator>,
#[clap(short, long, help("escape charactor"))]
#[clap(short, long, help("escape character"))]
escape_char: Option<char>,
#[clap(short, long, help("quate charactor"))]
#[clap(short, long, help("quote character"))]
quote_char: Option<char>,
#[clap(short('D'), long, help("double quote"))]
double_quote: Option<bool>,
@@ -197,6 +197,8 @@ struct Args {
writer_version: Option<WriterVersion>,
#[clap(short, long, help("max row group size"))]
max_row_group_size: Option<usize>,
#[clap(long, help("whether to enable bloom filter writing"))]
enable_bloom_filter: Option<bool>,

#[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
help: Option<bool>,
@@ -290,6 +292,10 @@ fn configure_writer_properties(args: &Args) -> WriterProperties {
properties_builder =
properties_builder.set_max_row_group_size(max_row_group_size);
}
if let Some(enable_bloom_filter) = args.enable_bloom_filter {
properties_builder =
properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
}
properties_builder.build()
}

@@ -548,6 +554,7 @@ mod tests {
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
@@ -582,6 +589,7 @@ mod tests {
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
@@ -636,6 +644,8 @@ mod tests {
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
// by default we shall test bloom filter writing
enable_bloom_filter: Some(true),
help: None,
};
convert_csv_to_parquet(&args).unwrap();
2 changes: 1 addition & 1 deletion parquet/src/bin/parquet-show-bloom-filter.rs
@@ -83,7 +83,7 @@ fn main() {
println!(
"Value {} is {} in bloom filter",
value,
if sbbf.check(value.as_str()) {
if sbbf.check(&value.as_str()) {
"present"
} else {
"absent"
43 changes: 25 additions & 18 deletions parquet/src/bloom_filter/mod.rs
@@ -16,7 +16,7 @@
// under the License.

//! Bloom filter implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md).

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
@@ -35,7 +35,7 @@ use thrift::protocol::{
};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
0x47b6137b_u32,
0x44974d91_u32,
@@ -83,7 +83,9 @@ fn block_check(block: &Block, hash: u32) -> bool {
true
}

/// A split block Bloom filter
/// A split block Bloom filter. The creation of this structure is based on the
/// [`crate::file::properties::BloomFilterProperties`] struct set via [`crate::file::properties::WriterProperties`] and
/// is thus hidden by default.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

@@ -118,8 +120,8 @@ fn read_bloom_filter_header_and_length(
))
}

const BITSET_MIN_LENGTH: usize = 32;
const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
@@ -141,15 +143,20 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
impl Sbbf {
/// Create a new [Sbbf] with given number of distinct values and false positive probability.
/// Will panic if `fpp` is greater than 1.0 or less than 0.0.
pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
assert!((0.0..-1.0).contains(&fpp), "invalid fpp: {}", fpp);
pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
if !(0.0..1.0).contains(&fpp) {
return Err(ParquetError::General(format!(
"False positive probability must be between 0.0 and 1.0, got {}",
fpp
)));
}
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
Self::new_with_num_of_bytes(num_bits / 8)
Ok(Self::new_with_num_of_bytes(num_bits / 8))
}

/// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
/// to the next power of two bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
/// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
let bitset = vec![0_u8; num_bytes];
Self::new(&bitset)
@@ -170,7 +177,7 @@ impl Sbbf {
}

/// Write the bloom filter data (header and then bitset) to the output
pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let header = self.header();
header.write_to_out_protocol(&mut protocol).map_err(|e| {
@@ -208,7 +215,7 @@ impl Sbbf {
}

/// Read a new bloom filter from the given offset in the given reader.
pub fn read_from_column_chunk<R: ChunkReader>(
pub(crate) fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
) -> Result<Option<Self>, ParquetError> {
@@ -254,7 +261,7 @@ impl Sbbf {
}

/// Insert an [AsBytes] value into the filter
pub fn insert<T: AsBytes>(&mut self, value: T) {
pub fn insert<T: AsBytes>(&mut self, value: &T) {
self.insert_hash(hash_as_bytes(value));
}

Expand All @@ -266,7 +273,7 @@ impl Sbbf {
}

/// Check if an [AsBytes] value is probably present or definitely absent in the filter
pub fn check<T: AsBytes>(&self, value: T) -> bool {
pub fn check<T: AsBytes>(&self, value: &T) -> bool {
self.check_hash(hash_as_bytes(value))
}

Expand All @@ -284,7 +291,7 @@ impl Sbbf {
const SEED: u64 = 0;

#[inline]
fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
let mut hasher = XxHash64::with_seed(SEED);
hasher.write(value.as_bytes());
hasher.finish()
@@ -324,8 +331,8 @@ mod tests {
fn test_sbbf_insert_and_check() {
let mut sbbf = Sbbf(vec![[0_u32; 8]; 1_000]);
for i in 0..1_000_000 {
sbbf.insert(i);
assert!(sbbf.check(i));
sbbf.insert(&i);
assert!(sbbf.check(&i));
}
}

Expand All @@ -339,7 +346,7 @@ mod tests {
let sbbf = Sbbf::new(bitset);
for a in 0..10i64 {
let value = format!("a{}", a);
assert!(sbbf.check(value.as_str()));
assert!(sbbf.check(&value.as_str()));
}
}

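The sizing helpers (`optimal_num_of_bytes`, `num_of_bits_from_ndv_fpp`) are only partially visible in the hunks above. For reference, here is a sketch of the sizing rule from the parquet-format bloom filter spec that these helpers implement; the exact rounding and clamping order in the crate may differ from what is shown here.

```rust
/// Sketch of the spec formula: bits = -8 * ndv / ln(1 - fpp^(1/8)).
/// This mirrors num_of_bits_from_ndv_fpp but is not a copy of the crate's code.
fn approx_num_of_bits(ndv: u64, fpp: f64) -> usize {
    (-8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln()).ceil() as usize
}

/// Sketch of optimal_num_of_bytes: round the bitset size up to a power of
/// two and clamp it to [BITSET_MIN_LENGTH, BITSET_MAX_LENGTH] (32 B .. 128 MiB).
fn approx_num_of_bytes(num_bits: usize) -> usize {
    (num_bits / 8)
        .next_power_of_two()
        .clamp(32, 128 * 1024 * 1024)
}
```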
32 changes: 32 additions & 0 deletions parquet/src/column/writer/encoder.rs
@@ -16,6 +16,7 @@
// under the License.

use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max,
update_min,
@@ -24,6 +25,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::BloomFilterProperties;
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
use crate::util::memory::ByteBufferPtr;
@@ -115,6 +117,11 @@ pub trait ColumnValueEncoder {

/// Flush the next data page for this column chunk
fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;

/// Flushes the bloom filter if enabled and returns it, otherwise returns `None`. Subsequent writes
/// will *not* be tracked by the bloom filter, as it has been taken out of the encoder. This should
/// be called once near the end of encoding.
fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
}

pub struct ColumnValueEncoderImpl<T: DataType> {
@@ -125,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
statistics_enabled: EnabledStatistics,
min_value: Option<T::T>,
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -136,6 +144,13 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
}
}

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut self.bloom_filter {
for value in slice {
bloom_filter.insert(value);
}
}

match &mut self.dict_encoder {
Some(encoder) => encoder.put(slice),
_ => self.encoder.put(slice),
@@ -161,6 +176,10 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
}
}

fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
self.bloom_filter.take()
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
let dict_supported = props.dictionary_enabled(descr.path())
&& has_dictionary_support(T::get_physical_type(), props);
@@ -175,12 +194,25 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {

let statistics_enabled = props.statistics_enabled(descr.path());

let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
let bloom_filter =
if let Some(BloomFilterProperties { ndv, fpp }) = bloom_filter_enabled {
Sbbf::new_with_ndv_fpp(ndv, fpp)
.map_err(|e| {
eprintln!("invalid bloom filter properties: {}", e);
})
.ok()
} else {
None
};

Ok(Self {
encoder,
dict_encoder,
descr: descr.clone(),
num_values: 0,
statistics_enabled,
bloom_filter,
min_value: None,
max_value: None,
})
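With the trait extended, the remaining wiring (not part of this diff) is for the column writer to take the filter from the encoder once the column chunk is finished and serialize it with `Sbbf::write`. A hypothetical caller-side sketch follows; only `flush_bloom_filter` and `Sbbf::write` come from this PR, the surrounding function and parameter names are illustrative.

```rust
// Hypothetical sketch of the caller side inside the parquet crate:
// near the end of a column chunk, take the filter from the encoder and
// write the thrift BloomFilterHeader followed by the raw bitset.
fn finish_bloom_filter<E, W>(encoder: &mut E, sink: &mut W) -> Result<(), ParquetError>
where
    E: ColumnValueEncoder,
    W: std::io::Write,
{
    if let Some(bloom_filter) = encoder.flush_bloom_filter() {
        bloom_filter.write(&mut *sink)?;
    }
    Ok(())
}
```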
