From 7116ed1099a3c0f5fc9acfa04a924375b758d965 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 6 Dec 2022 11:19:40 -0800
Subject: [PATCH 1/4] Set bloom filter on byte array

---
 parquet/src/arrow/arrow_writer/byte_array.rs | 19 ++++-
 parquet/src/arrow/arrow_writer/mod.rs        | 80 +++++++++++++++++++-
 parquet/src/data_type.rs                     |  6 ++
 3 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index d870ac54fe4..50ea57a9282 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -429,6 +429,7 @@ struct ByteArrayEncoder {
     dict_encoder: Option<DictEncoder>,
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
+    bloom_filter: Option<Sbbf>,
 }
 
 impl ColumnValueEncoder for ByteArrayEncoder {
@@ -453,8 +454,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
     }
 
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
-        // TODO FIX ME need to handle bloom filter in arrow writer
-        None
+        self.bloom_filter.take()
     }
 
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
@@ -467,11 +467,17 @@ impl ColumnValueEncoder for ByteArrayEncoder {
 
         let fallback = FallbackEncoder::new(descr, props)?;
 
+        let bloom_filter = props
+            .bloom_filter_properties(descr.path())
+            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
+            .transpose()?;
+
         Ok(Self {
             fallback,
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
+            bloom_filter,
         })
     }
 
@@ -543,7 +549,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
 fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
 where
     T: ArrayAccessor + Copy,
-    T::Item: Copy + Ord + AsRef<[u8]>,
+    T::Item: Copy + Ord + AsRef<[u8]> + AsBytes,
 {
     if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
         if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
@@ -555,6 +561,13 @@
         }
     }
 
+    // encode the values into bloom filter if enabled
+    if let Some(bloom_filter) = &mut encoder.bloom_filter {
+        for idx in 0..values.len() {
+            bloom_filter.insert(&values.value(idx));
+        }
+    }
+
     match &mut encoder.dict_encoder {
         Some(dict_encoder) => dict_encoder.encode(values, indices),
         None => encoder.fallback.encode(values, indices),
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index ecb59e93e2f..af71b04f01a 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -622,7 +622,8 @@ mod tests {
     use crate::basic::Encoding;
     use crate::file::metadata::ParquetMetaData;
     use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::WriterVersion;
+    use crate::file::properties::{ReaderProperties, WriterVersion};
+    use crate::file::serialized_reader::ReadOptionsBuilder;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
         statistics::Statistics,
@@ -1269,6 +1270,7 @@ mod tests {
             .set_dictionary_enabled(dictionary_size != 0)
             .set_dictionary_pagesize_limit(dictionary_size.max(1))
             .set_encoding(*encoding)
+            .set_bloom_filter_enabled(true)
             .build();
 
         files.push(roundtrip_opts(&expected_batch, props))
@@ -1279,14 +1281,14 @@ mod tests {
         files
     }
 
-    fn values_required<A, I>(iter: I)
+    fn values_required<A, I>(iter: I) -> Vec<File>
     where
         A: From<Vec<I::Item>> + Array + 'static,
         I: IntoIterator,
     {
         let raw_values: Vec<_> = iter.into_iter().collect();
         let values = Arc::new(A::from(raw_values));
-        one_column_roundtrip(values, false);
+        one_column_roundtrip(values, false)
     }
 
     fn values_optional<A, I>(iter: I)
@@ -1312,6 +1314,57 @@ mod tests {
         values_optional::<A, I>(iter);
     }
 
+    fn check_bloom_filter<T: AsBytes>(
+        files: Vec<File>,
+        file_column: String,
+        negative_values: Vec<T>,
+    ) {
+        files.into_iter().for_each(|file| {
+            let file_reader = SerializedFileReader::new_with_options(
+                file,
+                ReadOptionsBuilder::new()
+                    .with_reader_properties(
+                        ReaderProperties::builder()
+                            .set_read_bloom_filter(true)
+                            .build(),
+                    )
+                    .build(),
+            )
+            .expect("Unable to open file as Parquet");
+            let metadata = file_reader.metadata();
+            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
+                if let Some((column_index, _)) = row_group
+                    .columns()
+                    .iter()
+                    .enumerate()
+                    .find(|(_, column)| column.column_path().string() == file_column)
+                {
+                    let row_group_reader = file_reader
+                        .get_row_group(ri)
+                        .expect("Unable to read row group");
+                    if let Some(sbbf) =
+                        row_group_reader.get_column_bloom_filter(column_index)
+                    {
+                        negative_values.iter().for_each(|value| {
+                            assert!(
+                                !sbbf.check(value),
+                                "{}",
+                                format!(
+                                    "Value {:?} should not be in bloom filter",
+                                    value.as_bytes()
+                                )
+                            );
+                        });
+                    } else {
+                        panic!("No bloom filter for column named {} found", file_column);
+                    }
+                } else {
+                    panic!("No column named {} found", file_column);
+                }
+            }
+        });
+    }
+
     #[test]
     fn all_null_primitive_single_column() {
         let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
@@ -1528,6 +1581,27 @@ mod tests {
         values_required::<BinaryArray, _>(many_vecs_iter);
     }
 
+    #[test]
+    fn i32_column_bloom_filter() {
+        let positive_values: Vec<i32> = (0..SMALL_SIZE as i32).collect();
+        let files = values_required::<Int32Array, _>(positive_values);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+        );
+    }
+
+    #[test]
+    fn binary_column_bloom_filter() {
+        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
+        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
+        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
+
+        let files = values_required::<BinaryArray, _>(many_vecs_iter);
+        check_bloom_filter(files, "col".to_string(), vec![vec![(SMALL_SIZE + 1) as u8]]);
+    }
+
     #[test]
     fn large_binary_single_column() {
         let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 3e423a41562..4da7f4ad147 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -448,6 +448,12 @@ impl AsBytes for [u8] {
     }
 }
 
+impl AsBytes for &[u8] {
+    fn as_bytes(&self) -> &[u8] {
+        self
+    }
+}
+
 macro_rules! gen_as_bytes {
     ($source_ty:ident) => {
         impl AsBytes for $source_ty {
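With patch 1 in place, a writer only has to opt in through WriterProperties: ByteArrayEncoder::try_new picks up the per-column BloomFilterProperties (ndv and fpp) and fills an Sbbf as values are encoded. A minimal write-side sketch of that path; the file name, column name, and values are illustrative, and this assumes the arrow and parquet crates at the version this series targets:

    use std::fs::File;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, StringArray};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // One utf8 column; byte-array columns are what this patch covers.
        let col = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)])?;

        // Opting in is enough: bloom_filter_properties(descr.path()) then
        // returns Some(..) for each column, so ByteArrayEncoder builds an
        // Sbbf via new_with_ndv_fpp and inserts every value it encodes.
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        let file = File::create("strings.parquet")?;
        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }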
From bd92427e050aa59ec2b42d824679a5d70a0e9faf Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 6 Dec 2022 15:25:21 -0800
Subject: [PATCH 2/4] Check positive values

---
 parquet/src/arrow/arrow_writer/mod.rs | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index af71b04f01a..8a6158cdfbe 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1317,6 +1317,7 @@ mod tests {
     fn check_bloom_filter<T: AsBytes>(
         files: Vec<File>,
         file_column: String,
+        positive_values: Vec<T>,
         negative_values: Vec<T>,
     ) {
         files.into_iter().for_each(|file| {
@@ -1345,6 +1346,18 @@ mod tests {
                     if let Some(sbbf) =
                         row_group_reader.get_column_bloom_filter(column_index)
                     {
+                        if row_group.num_rows() >= positive_values.len() as i64 {
+                            positive_values.iter().for_each(|value| {
+                                assert!(
+                                    sbbf.check(value),
+                                    "{}",
+                                    format!(
+                                        "Value {:?} should be in bloom filter",
+                                        value.as_bytes()
+                                    )
+                                );
+                            });
+                        }
                         negative_values.iter().for_each(|value| {
                             assert!(
                                 !sbbf.check(value),
                                 "{}",
                                 format!(
                                     "Value {:?} should not be in bloom filter",
                                     value.as_bytes()
                                 )
                             );
                         });
@@ -1588,6 +1601,7 @@ mod tests {
         check_bloom_filter(
             files,
             "col".to_string(),
+            (0..SMALL_SIZE as i32).collect(),
             (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
         );
     }
@@ -1599,7 +1613,12 @@ mod tests {
         let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
 
         let files = values_required::<BinaryArray, _>(many_vecs_iter);
-        check_bloom_filter(files, "col".to_string(), vec![vec![(SMALL_SIZE + 1) as u8]]);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            many_vecs,
+            vec![vec![(SMALL_SIZE + 1) as u8]],
+        );
     }
 
     #[test]
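A split block bloom filter (Sbbf) has no false negatives: every value inserted during the write must answer true when checked against the filter of the row group that holds it. That is why patch 2 can assert check(value) for every written value, guarded by row_group.num_rows() >= positive_values.len(): once the values are spread across several row groups, no single row group's filter is expected to contain all of them. False positives remain possible at roughly the configured fpp, so the negative assertions rely on that probability being small for the handful of values probed.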
gen_as_bytes { ($source_ty:ident) => { impl AsBytes for $source_ty { From bd92427e050aa59ec2b42d824679a5d70a0e9faf Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 6 Dec 2022 15:25:21 -0800 Subject: [PATCH 2/4] Check positive values --- parquet/src/arrow/arrow_writer/mod.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index af71b04f01a..8a6158cdfbe 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1317,6 +1317,7 @@ mod tests { fn check_bloom_filter( files: Vec, file_column: String, + positive_values: Vec, negative_values: Vec, ) { files.into_iter().for_each(|file| { @@ -1345,6 +1346,18 @@ mod tests { if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) { + if row_group.num_rows() >= positive_values.len() as i64 { + positive_values.iter().for_each(|value| { + assert!( + sbbf.check(value), + "{}", + format!( + "Value {:?} should be in bloom filter", + value.as_bytes() + ) + ); + }); + } negative_values.iter().for_each(|value| { assert!( !sbbf.check(value), @@ -1588,6 +1601,7 @@ mod tests { check_bloom_filter( files, "col".to_string(), + (0..SMALL_SIZE as i32).collect(), (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), ); } @@ -1599,7 +1613,12 @@ mod tests { let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice()); let files = values_required::(many_vecs_iter); - check_bloom_filter(files, "col".to_string(), vec![vec![(SMALL_SIZE + 1) as u8]]); + check_bloom_filter( + files, + "col".to_string(), + many_vecs, + vec![vec![(SMALL_SIZE + 1) as u8]], + ); } #[test] From 888e05271adab7e6d1d73c326abd1466883f37d6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 8 Dec 2022 00:48:48 -0800 Subject: [PATCH 3/4] For review --- parquet/src/arrow/arrow_writer/byte_array.rs | 7 +- parquet/src/arrow/arrow_writer/mod.rs | 67 ++++++++++++-------- parquet/src/bloom_filter/mod.rs | 2 +- parquet/src/data_type.rs | 6 -- 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 50ea57a9282..c3a9f83d15f 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -549,7 +549,7 @@ impl ColumnValueEncoder for ByteArrayEncoder { fn encode(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder) where T: ArrayAccessor + Copy, - T::Item: Copy + Ord + AsRef<[u8]> + AsBytes, + T::Item: Copy + Ord + AsRef<[u8]>, { if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) { if encoder.min_value.as_ref().map_or(true, |m| m > &min) { @@ -563,8 +563,9 @@ where // encode the values into bloom filter if enabled if let Some(bloom_filter) = &mut encoder.bloom_filter { - for idx in 0..values.len() { - bloom_filter.insert(&values.value(idx)); + let valid = indices.iter().cloned(); + for idx in valid { + bloom_filter.insert(values.value(idx).as_ref()); } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 8a6158cdfbe..0ad8e72819d 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1291,7 +1291,7 @@ mod tests { one_column_roundtrip(values, false) } - fn values_optional(iter: I) + fn values_optional(iter: I) -> Vec where A: From>> + Array + 'static, I: IntoIterator, @@ -1302,7 +1302,7 @@ mod tests { .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) }) 
             .collect();
         let optional_values = Arc::new(A::from(optional_raw_values));
-        one_column_roundtrip(optional_values, true);
+        one_column_roundtrip(optional_values, true)
     }
 
     fn required_and_optional<A, I>(iter: I)
@@ -1320,54 +1320,36 @@ mod tests {
         positive_values: Vec<T>,
         negative_values: Vec<T>,
     ) {
-        files.into_iter().for_each(|file| {
+        files.into_iter().take(1).for_each(|file| {
             let file_reader = SerializedFileReader::new_with_options(
                 file,
                 ReadOptionsBuilder::new()
                     .with_reader_properties(
                         ReaderProperties::builder()
                             .set_read_bloom_filter(true)
                             .build(),
                     )
                     .build(),
             )
             .expect("Unable to open file as Parquet");
             let metadata = file_reader.metadata();
+
+            // Gets bloom filters from all row groups.
+            let mut bloom_filters: Vec<_> = vec![];
             for (ri, row_group) in metadata.row_groups().iter().enumerate() {
                 if let Some((column_index, _)) = row_group
                     .columns()
                     .iter()
                     .enumerate()
                     .find(|(_, column)| column.column_path().string() == file_column)
                 {
                     let row_group_reader = file_reader
                         .get_row_group(ri)
                         .expect("Unable to read row group");
                     if let Some(sbbf) =
                         row_group_reader.get_column_bloom_filter(column_index)
                     {
-                        if row_group.num_rows() >= positive_values.len() as i64 {
-                            positive_values.iter().for_each(|value| {
-                                assert!(
-                                    sbbf.check(value),
-                                    "{}",
-                                    format!(
-                                        "Value {:?} should be in bloom filter",
-                                        value.as_bytes()
-                                    )
-                                );
-                            });
-                        }
-                        negative_values.iter().for_each(|value| {
-                            assert!(
-                                !sbbf.check(value),
-                                "{}",
-                                format!(
-                                    "Value {:?} should not be in bloom filter",
-                                    value.as_bytes()
-                                )
-                            );
-                        });
+                        bloom_filters.push(sbbf.clone());
                     } else {
                         panic!("No bloom filter for column named {} found", file_column);
                     }
@@ -1375,6 +1357,24 @@ mod tests {
                 } else {
                     panic!("No column named {} found", file_column);
                 }
             }
+
+            positive_values.iter().for_each(|value| {
+                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
+                assert!(
+                    found.is_some(),
+                    "{}",
+                    format!("Value {:?} should be in bloom filter", value.as_bytes())
+                );
+            });
+
+            negative_values.iter().for_each(|value| {
+                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
+                assert!(
+                    found.is_none(),
+                    "{}",
+                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
+                );
+            });
         });
     }
 
@@ -1621,6 +1621,23 @@ mod tests {
         );
     }
 
+    #[test]
+    fn empty_string_null_column_bloom_filter() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
+        let raw_strs = raw_values.iter().map(|s| s.as_str());
+
+        let files = values_optional::<StringArray, _>(raw_strs);
+
+        let optional_raw_values: Vec<_> = raw_values
+            .iter()
+            .enumerate()
+            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
+            .flatten()
+            .collect();
+        // For null slots, empty string should not be in bloom filter.
+        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
+    }
+
     #[test]
     fn large_binary_single_column() {
         let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 15c38cf5915..5bb89bf3f4d 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -261,7 +261,7 @@ impl Sbbf {
     }
 
     /// Insert an [AsBytes] value into the filter
-    pub fn insert<T: AsBytes>(&mut self, value: &T) {
+    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
         self.insert_hash(hash_as_bytes(value));
     }
 
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 4da7f4ad147..3e423a41562 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -448,12 +448,6 @@ impl AsBytes for [u8] {
     }
 }
 
-impl AsBytes for &[u8] {
-    fn as_bytes(&self) -> &[u8] {
-        self
-    }
-}
-
 macro_rules! gen_as_bytes {
     ($source_ty:ident) => {
         impl AsBytes for $source_ty {
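Patch 3's reworked test also shows the read side: filters are only decoded when ReaderProperties opts in through set_read_bloom_filter, and only indices of valid (non-null) values are inserted while writing. The same reader API supports the intended production use, pruning row groups before scanning them. A sketch under the same version assumptions; the helper name and the &str needle are made up for illustration:

    use std::fs::File;

    use parquet::file::properties::ReaderProperties;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::file::serialized_reader::ReadOptionsBuilder;

    // Indices of row groups that may contain the needle in the given column.
    fn candidate_row_groups(file: File, column_index: usize, needle: &str) -> Vec<usize> {
        let options = ReadOptionsBuilder::new()
            .with_reader_properties(
                ReaderProperties::builder()
                    .set_read_bloom_filter(true)
                    .build(),
            )
            .build();
        let reader =
            SerializedFileReader::new_with_options(file, options).expect("open parquet");

        (0..reader.metadata().num_row_groups())
            .filter(|&i| {
                let row_group = reader.get_row_group(i).expect("read row group");
                match row_group.get_column_bloom_filter(column_index) {
                    // check() never gives a false negative: false means the
                    // value is definitely absent, so the row group is skipped;
                    // true only means "maybe present".
                    Some(sbbf) => sbbf.check(&needle),
                    // No filter written: the row group cannot be ruled out.
                    None => true,
                }
            })
            .collect()
    }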
gen_as_bytes { ($source_ty:ident) => { impl AsBytes for $source_ty { From 0ec862129617bf34b16adde50f9db62cdb8c343a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 8 Dec 2022 14:35:01 +0000 Subject: [PATCH 4/4] Clippy --- parquet/src/arrow/arrow_writer/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 0ad8e72819d..a609b992a39 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1631,8 +1631,7 @@ mod tests { let optional_raw_values: Vec<_> = raw_values .iter() .enumerate() - .map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) }) - .flatten() + .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) }) .collect(); // For null slots, empty string should not be in bloom filter. check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);