Set bloom filter on byte array #3284

Merged: 4 commits, Dec 8, 2022
18 changes: 16 additions & 2 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -429,6 +429,7 @@ struct ByteArrayEncoder {
dict_encoder: Option<DictEncoder>,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
@@ -453,8 +454,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
}

fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
-        // TODO FIX ME need to handle bloom filter in arrow writer
-        None
+        self.bloom_filter.take()
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
@@ -467,11 +467,17 @@ impl ColumnValueEncoder for ByteArrayEncoder {

let fallback = FallbackEncoder::new(descr, props)?;

let bloom_filter = props
.bloom_filter_properties(descr.path())
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

Ok(Self {
fallback,
dict_encoder: dictionary,
min_value: None,
max_value: None,
bloom_filter,
})
}

@@ -555,6 +561,14 @@ where
}
}

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut encoder.bloom_filter {
let valid = indices.iter().cloned();
for idx in valid {
bloom_filter.insert(values.value(idx).as_ref());
}
}

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode(values, indices),
None => encoder.fallback.encode(values, indices),
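
With the encoder changes in place, bloom filters are opt-in through WriterProperties. A minimal usage sketch, not part of this diff: it assumes the arrow and parquet crates of this release, uses set_bloom_filter_enabled exactly as the tests below do, and the file path and column name are arbitrary.

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn write_with_bloom_filter() -> Result<(), Box<dyn std::error::Error>> {
    // Opt in to bloom filters for all columns; ndv and fpp fall back to defaults.
    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build();

    let col: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let batch = RecordBatch::try_from_iter(vec![("col", col)])?;

    let file = std::fs::File::create("bloom.parquet")?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
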
119 changes: 114 additions & 5 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -622,7 +622,8 @@ mod tests {
use crate::basic::Encoding;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::WriterVersion;
+    use crate::file::properties::{ReaderProperties, WriterVersion};
+    use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
@@ -1269,6 +1270,7 @@ mod tests {
.set_dictionary_enabled(dictionary_size != 0)
.set_dictionary_pagesize_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(true)
.build();

files.push(roundtrip_opts(&expected_batch, props))
@@ -1279,17 +1281,17 @@
files
}

-    fn values_required<A, I>(iter: I)
+    fn values_required<A, I>(iter: I) -> Vec<File>
where
A: From<Vec<I::Item>> + Array + 'static,
I: IntoIterator,
{
let raw_values: Vec<_> = iter.into_iter().collect();
let values = Arc::new(A::from(raw_values));
-        one_column_roundtrip(values, false);
+        one_column_roundtrip(values, false)
}

-    fn values_optional<A, I>(iter: I)
+    fn values_optional<A, I>(iter: I) -> Vec<File>
where
A: From<Vec<Option<I::Item>>> + Array + 'static,
I: IntoIterator,
@@ -1300,7 +1302,7 @@
.map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
.collect();
let optional_values = Arc::new(A::from(optional_raw_values));
-        one_column_roundtrip(optional_values, true);
+        one_column_roundtrip(optional_values, true)
}

fn required_and_optional<A, I>(iter: I)
@@ -1312,6 +1314,70 @@
values_optional::<A, I>(iter);
}

fn check_bloom_filter<T: AsBytes>(
files: Vec<File>,
file_column: String,
positive_values: Vec<T>,
negative_values: Vec<T>,
) {
files.into_iter().take(1).for_each(|file| {
let file_reader = SerializedFileReader::new_with_options(
file,
ReadOptionsBuilder::new()
.with_reader_properties(
ReaderProperties::builder()
.set_read_bloom_filter(true)
.build(),
)
.build(),
)
.expect("Unable to open file as Parquet");
let metadata = file_reader.metadata();

// Gets bloom filters from all row groups.
let mut bloom_filters: Vec<_> = vec![];
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
if let Some((column_index, _)) = row_group
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string() == file_column)
{
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) =
row_group_reader.get_column_bloom_filter(column_index)
{
bloom_filters.push(sbbf.clone());
} else {
panic!("No bloom filter for column named {} found", file_column);
}
} else {
panic!("No column named {} found", file_column);
}
}

positive_values.iter().for_each(|value| {
let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
assert!(
found.is_some(),
"{}",
format!("Value {:?} should be in bloom filter", value.as_bytes())
);
});

negative_values.iter().for_each(|value| {
let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
assert!(
found.is_none(),
"{}",
format!("Value {:?} should not be in bloom filter", value.as_bytes())
);
});
});
}
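
The helper above also shows the read path a query engine would use for row-group pruning. A hedged sketch built from the same calls (get_row_group, get_column_bloom_filter); might_contain is a hypothetical helper, not part of this PR, and it passes &needle so that T = &str, matching the check calls above:

    use parquet::file::reader::FileReader;

    // Hypothetical: a bloom filter can prove a value absent, never present.
    fn might_contain(reader: &dyn FileReader, row_group: usize, column: usize, needle: &str) -> bool {
        let rg = reader
            .get_row_group(row_group)
            .expect("Unable to read row group");
        match rg.get_column_bloom_filter(column) {
            Some(sbbf) => sbbf.check(&needle), // false here means definitely absent
            None => true,                      // no filter written, cannot prune
        }
    }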

#[test]
fn all_null_primitive_single_column() {
let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
@@ -1528,6 +1594,49 @@ mod tests {
values_required::<BinaryArray, _>(many_vecs_iter);
}

#[test]
fn i32_column_bloom_filter() {
let positive_values: Vec<i32> = (0..SMALL_SIZE as i32).collect();
let files = values_required::<Int32Array, _>(positive_values);
check_bloom_filter(
files,
"col".to_string(),
(0..SMALL_SIZE as i32).collect(),
(SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
);
}

#[test]
fn binary_column_bloom_filter() {
[Contributor] Perhaps we could get a test of nulls and empty strings?

[Member Author] Added one test for that.

let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

let files = values_required::<BinaryArray, _>(many_vecs_iter);
check_bloom_filter(
files,
"col".to_string(),
many_vecs,
vec![vec![(SMALL_SIZE + 1) as u8]],
);
}

#[test]
fn empty_string_null_column_bloom_filter() {
let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
let raw_strs = raw_values.iter().map(|s| s.as_str());

let files = values_optional::<StringArray, _>(raw_strs);

let optional_raw_values: Vec<_> = raw_values
.iter()
.enumerate()
.filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
.collect();
// Null slots are never inserted, so the empty string must not appear in the filter.
check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
}

#[test]
fn large_binary_single_column() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2 changes: 1 addition & 1 deletion parquet/src/bloom_filter/mod.rs
@@ -261,7 +261,7 @@ impl Sbbf {
}

/// Insert an [AsBytes] value into the filter
-    pub fn insert<T: AsBytes>(&mut self, value: &T) {
+    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
self.insert_hash(hash_as_bytes(value));
}

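
The ?Sized relaxation is what lets the arrow writer pass the bare &[u8] from values.value(idx).as_ref() straight into insert. A standalone sketch of the bound, assuming the str and [u8] impls of AsBytes in parquet::data_type; as_bytes_of is a hypothetical helper, not part of this PR:

use parquet::data_type::AsBytes;

// Without `?Sized`, `T` is implicitly `Sized`, so `T = str` and `T = [u8]`
// are rejected even though both implement `AsBytes`.
fn as_bytes_of<T: AsBytes + ?Sized>(value: &T) -> &[u8] {
    value.as_bytes()
}

fn main() {
    assert_eq!(as_bytes_of("hello"), b"hello"); // T = str (unsized)
    assert_eq!(as_bytes_of(&b"raw"[..]), b"raw"); // T = [u8] (unsized)
}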