bloom filter part IV: adjust writer properties, bloom filter properties, and incorporate into column encoder #3165

Merged: 4 commits, Nov 24, 2022
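At a high level, the change lets bloom filter writing be switched on through the writer properties; the column value encoder then builds an `Sbbf` from those properties and fills it as values are written. A minimal sketch of the opt-in path, based only on the `set_bloom_filter_enabled` builder call exercised in this diff (the rest is the crate's existing `WriterProperties` API):

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Sketch only: opt in to bloom filter writing. Per this PR, the column
    // value encoder reads the resulting bloom filter properties and inserts
    // each written value into an Sbbf.
    let _props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build();
}
```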
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/mod.rs
@@ -102,7 +102,7 @@ pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> SchemaDescPtr;

/// Get the numer of rows in this collection
/// Get the number of rows in this collection
fn num_rows(&self) -> usize;

/// Returns an iterator over the column chunks for particular column
6 changes: 6 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -17,6 +17,7 @@

use crate::arrow::arrow_writer::levels::LevelInfo;
use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::page::PageWriter;
use crate::column::writer::encoder::{
ColumnValueEncoder, DataPageValues, DictionaryPage,
@@ -451,6 +452,11 @@ impl ColumnValueEncoder for ByteArrayEncoder {
}
}

fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
// TODO FIX ME need to handle bloom filter in arrow writer
None
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
where
Self: Sized,
9 changes: 7 additions & 2 deletions parquet/src/bin/parquet-fromcsv-help.txt
@@ -37,10 +37,10 @@ Options:
[possible values: lf, crlf, cr]

-e, --escape-char <ESCAPE_CHAR>
escape charactor
escape character

-q, --quote-char <QUOTE_CHAR>
quate charactor
quote character

-D, --double-quote <DOUBLE_QUOTE>
double quote
@@ -58,6 +58,11 @@ Options:
-m, --max-row-group-size <MAX_ROW_GROUP_SIZE>
max row group size

--enable-bloom-filter <ENABLE_BLOOM_FILTER>
whether to enable bloom filter writing

[possible values: true, false]

--help
display usage help

22 changes: 16 additions & 6 deletions parquet/src/bin/parquet-fromcsv.rs
@@ -57,11 +57,11 @@
//!
//! - `-i`, `--input-file` : Path to input CSV file
//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format`
//! - `-e`, `--escape` : Escape charactor for input file
//! - `-d`, `--delimiter : Field delimiter for CSV file, default depends `--input-format`
//! - `-e`, `--escape` : Escape character for input file
//! - `-h`, `--has-header` : Input has header
//! - `-r`, `--record-terminator` : Record terminator charactor for input. default is CRLF
//! - `-q`, `--quote-char` : Input quoting charactor
//! - `-r`, `--record-terminator` : Record terminator character for input. default is CRLF
//! - `-q`, `--quote-char` : Input quoting character
//!

use std::{
@@ -182,9 +182,9 @@ struct Args {
delimiter: Option<char>,
#[clap(value_enum, short, long, help("record terminator"))]
record_terminator: Option<RecordTerminator>,
#[clap(short, long, help("escape charactor"))]
#[clap(short, long, help("escape character"))]
escape_char: Option<char>,
#[clap(short, long, help("quate charactor"))]
#[clap(short, long, help("quote character"))]
quote_char: Option<char>,
#[clap(short('D'), long, help("double quote"))]
double_quote: Option<bool>,
@@ -197,6 +197,8 @@ struct Args {
writer_version: Option<WriterVersion>,
#[clap(short, long, help("max row group size"))]
max_row_group_size: Option<usize>,
#[clap(long, help("whether to enable bloom filter writing"))]
enable_bloom_filter: Option<bool>,

#[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
help: Option<bool>,
@@ -290,6 +292,10 @@ fn configure_writer_properties(args: &Args) -> WriterProperties {
properties_builder =
properties_builder.set_max_row_group_size(max_row_group_size);
}
if let Some(enable_bloom_filter) = args.enable_bloom_filter {
properties_builder =
properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
}
properties_builder.build()
}

@@ -548,6 +554,7 @@ mod tests {
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
@@ -582,6 +589,7 @@ mod tests {
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
@@ -636,6 +644,8 @@ mod tests {
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
// by default we shall test bloom filter writing
enable_bloom_filter: Some(true),
help: None,
};
convert_csv_to_parquet(&args).unwrap();
2 changes: 1 addition & 1 deletion parquet/src/bin/parquet-show-bloom-filter.rs
@@ -83,7 +83,7 @@ fn main() {
println!(
"Value {} is {} in bloom filter",
value,
if sbbf.check(value.as_str()) {
if sbbf.check(&value.as_str()) {
"present"
} else {
"absent"
43 changes: 25 additions & 18 deletions parquet/src/bloom_filter/mod.rs
@@ -16,7 +16,7 @@
// under the License.

//! Bloom filter implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md).

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
@@ -35,7 +35,7 @@ use thrift::protocol::{
};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
0x47b6137b_u32,
0x44974d91_u32,
@@ -83,7 +83,9 @@ fn block_check(block: &Block, hash: u32) -> bool {
true
}

/// A split block Bloom filter
/// A split block Bloom filter. The creation of this structure is based on the
/// [`crate::file::properties::BloomFilterProperties`] struct set via [`crate::file::properties::WriterProperties`] and
/// is thus hidden by default.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

@@ -118,8 +120,8 @@ fn read_bloom_filter_header_and_length(
))
}

const BITSET_MIN_LENGTH: usize = 32;
const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
@@ -141,15 +143,20 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
impl Sbbf {
/// Create a new [Sbbf] with given number of distinct values and false positive probability.
/// Will panic if `fpp` is greater than 1.0 or less than 0.0.
pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
assert!((0.0..-1.0).contains(&fpp), "invalid fpp: {}", fpp);
pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
if !(0.0..1.0).contains(&fpp) {
return Err(ParquetError::General(format!(
"False positive probability must be between 0.0 and 1.0, got {}",
fpp
)));
}
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
Self::new_with_num_of_bytes(num_bits / 8)
Ok(Self::new_with_num_of_bytes(num_bits / 8))
}

/// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
/// to the next power of two bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
/// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
let bitset = vec![0_u8; num_bytes];
Self::new(&bitset)
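For context on the sizing used by `new_with_ndv_fpp` above: the parquet-format spec's split-block bloom filter guidance gives roughly `-8 * ndv / ln(1 - fpp^(1/8))` bits, which is then rounded to a power-of-two byte count bounded by `BITSET_MIN_LENGTH`/`BITSET_MAX_LENGTH`. The helper bodies are not shown in this diff, so the following is an assumed sketch rather than the actual implementation:

```rust
// Assumed sketch of the spec formula; the real num_of_bits_from_ndv_fpp and
// optimal_num_of_bytes helpers may differ in rounding details.
fn approx_num_of_bits(ndv: u64, fpp: f64) -> usize {
    (-8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln()) as usize
}

fn approx_num_of_bytes(num_bytes: usize) -> usize {
    // clamp to [32 B, 128 MiB], then round up to a power of two
    num_bytes.clamp(32, 128 * 1024 * 1024).next_power_of_two()
}
```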
@@ -170,7 +177,7 @@ impl Sbbf {
}

/// Write the bloom filter data (header and then bitset) to the output
pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let header = self.header();
header.write_to_out_protocol(&mut protocol).map_err(|e| {
@@ -208,7 +215,7 @@ impl Sbbf {
}

/// Read a new bloom filter from the given offset in the given reader.
pub fn read_from_column_chunk<R: ChunkReader>(
pub(crate) fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
) -> Result<Option<Self>, ParquetError> {
@@ -254,7 +261,7 @@ impl Sbbf {
}

/// Insert an [AsBytes] value into the filter
pub fn insert<T: AsBytes>(&mut self, value: T) {
pub fn insert<T: AsBytes>(&mut self, value: &T) {
self.insert_hash(hash_as_bytes(value));
}

@@ -266,7 +273,7 @@ impl Sbbf {
}

/// Check if an [AsBytes] value is probably present or definitely absent in the filter
pub fn check<T: AsBytes>(&self, value: T) -> bool {
pub fn check<T: AsBytes>(&self, value: &T) -> bool {
self.check_hash(hash_as_bytes(value))
}

@@ -284,7 +291,7 @@ impl Sbbf {
const SEED: u64 = 0;

#[inline]
fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
let mut hasher = XxHash64::with_seed(SEED);
hasher.write(value.as_bytes());
hasher.finish()
@@ -324,8 +331,8 @@ mod tests {
fn test_sbbf_insert_and_check() {
let mut sbbf = Sbbf(vec![[0_u32; 8]; 1_000]);
for i in 0..1_000_000 {
sbbf.insert(i);
assert!(sbbf.check(i));
sbbf.insert(&i);
assert!(sbbf.check(&i));
}
}

@@ -339,7 +346,7 @@
let sbbf = Sbbf::new(bitset);
for a in 0..10i64 {
let value = format!("a{}", a);
assert!(sbbf.check(value.as_str()));
assert!(sbbf.check(&value.as_str()));
}
}

32 changes: 32 additions & 0 deletions parquet/src/column/writer/encoder.rs
@@ -16,6 +16,7 @@
// under the License.

use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max,
update_min,
@@ -24,6 +25,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::BloomFilterProperties;
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
use crate::util::memory::ByteBufferPtr;
@@ -115,6 +117,11 @@ pub trait ColumnValueEncoder {

/// Flush the next data page for this column chunk
fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;

/// Flushes the bloom filter if enabled and returns it, otherwise returns `None`. Subsequent
/// writes will *not* be tracked by the bloom filter, as it is taken by this call. This should
/// be called once, near the end of encoding.
fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
}
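The caller of `flush_bloom_filter` is not part of this diff; the intended pattern, per the doc comment, is a single drain at the end of a column chunk. A hedged sketch using only the trait methods shown here (the `finish_chunk` helper is hypothetical, and the module's `Result` and `Sbbf` imports are assumed):

```rust
// Hypothetical caller: flush the last data page, then take the bloom filter
// exactly once. A second call to flush_bloom_filter would return None.
fn finish_chunk<E: ColumnValueEncoder>(encoder: &mut E) -> Result<Option<Sbbf>> {
    let _last_page = encoder.flush_data_page()?;
    Ok(encoder.flush_bloom_filter())
}
```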

pub struct ColumnValueEncoderImpl<T: DataType> {
@@ -125,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
statistics_enabled: EnabledStatistics,
min_value: Option<T::T>,
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -136,6 +144,13 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
}
}

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut self.bloom_filter {
for value in slice {
bloom_filter.insert(value);
}
}

match &mut self.dict_encoder {
Some(encoder) => encoder.put(slice),
_ => self.encoder.put(slice),
@@ -161,6 +176,10 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
}
}

fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
self.bloom_filter.take()
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
let dict_supported = props.dictionary_enabled(descr.path())
&& has_dictionary_support(T::get_physical_type(), props);
@@ -175,12 +194,25 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {

let statistics_enabled = props.statistics_enabled(descr.path());

let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
let bloom_filter =
if let Some(BloomFilterProperties { ndv, fpp }) = bloom_filter_enabled {
Sbbf::new_with_ndv_fpp(ndv, fpp)
.map_err(|e| {
eprintln!("invalid bloom filter properties: {}", e);
})
.ok()
} else {
None
};

Ok(Self {
encoder,
dict_encoder,
descr: descr.clone(),
num_values: 0,
statistics_enabled,
bloom_filter,
min_value: None,
max_value: None,
})