parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo #3102

Merged 17 commits on Nov 16, 2022
4 changes: 4 additions & 0 deletions parquet/Cargo.toml
@@ -113,6 +113,10 @@ required-features = ["cli"]
name = "parquet-fromcsv"
required-features = ["arrow", "cli"]

[[bin]]
name = "parquet-show-bloom-filter"
required-features = ["cli", "bloom"]

[[bench]]
name = "arrow_writer"
required-features = ["arrow"]
1 change: 1 addition & 0 deletions parquet/README.md
@@ -41,6 +41,7 @@ However, for historical reasons, this crate uses versions with major numbers gre
The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:

- `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
- `async` - support `async` APIs for reading parquet
- `json` - support for reading / writing `json` data to / from parquet
- `brotli` (default) - support for parquet using `brotli` compression
2 changes: 0 additions & 2 deletions parquet/src/bin/parquet-read.rs
@@ -36,8 +36,6 @@
//! Note that `parquet-read` reads the full file schema; no projection or filtering is
//! applied.

extern crate parquet;

use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Row;
1 change: 0 additions & 1 deletion parquet/src/bin/parquet-rowcount.rs
@@ -36,7 +36,6 @@
//! Note that `parquet-rowcount` reads the full file schema; no projection or filtering is
//! applied.

extern crate parquet;
use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs::File, path::Path};
1 change: 0 additions & 1 deletion parquet/src/bin/parquet-schema.rs
@@ -36,7 +36,6 @@
//! Note that `verbose` is an optional boolean flag: when omitted, only the schema is printed;
//! when provided, full file metadata is printed.

extern crate parquet;
use clap::Parser;
use parquet::{
file::reader::{FileReader, SerializedFileReader},
110 changes: 110 additions & 0 deletions parquet/src/bin/parquet-show-bloom-filter.rs
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary file to read bloom filter data from a Parquet file.
//!
//! # Install
//!
//! `parquet-show-bloom-filter` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-show-bloom-filter` should be available:
//! ```
//! parquet-show-bloom-filter --file-name XYZ.parquet --column id --values a
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a
//! ```

use clap::Parser;

Review comment (Contributor): Maybe we could add the ability to dump bloom filters to https://github.com/apache/arrow-rs/blob/master/parquet/src/bin/parquet-schema.rs rather than make a new executable. I don't feel strongly however.

use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs::File, path::Path};

#[derive(Debug, Parser)]
#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
struct Args {
#[clap(short, long, help("Path to the parquet file"))]
file_name: String,
#[clap(
short,
long,
help("Check the bloom filter indexes for the given column")
)]
column: String,
#[clap(
short,
long,
help("Check if the given values match bloom filter, the values will be evaluated as strings"),
required = true
)]
values: Vec<String>,
}

fn main() {
let args = Args::parse();
let file_name = args.file_name;
let path = Path::new(&file_name);
let file = File::open(path).expect("Unable to open file");

let file_reader =
SerializedFileReader::new(file).expect("Unable to open file as Parquet");
let metadata = file_reader.metadata();
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
println!("Row group #{}", ri);
println!("{}", "=".repeat(80));
if let Some((column_index, _)) = row_group
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string() == args.column)
{
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) = row_group_reader
.get_column_bloom_filter(column_index)
.expect("Failed to parse bloom filter")
{
args.values.iter().for_each(|value| {
println!(
"Value {} is {} in bloom filter",
value,
if sbbf.check(value.as_str()) {
"present"
} else {
"absent"
}
)
});
}
} else {
println!(
"No column named {} found, candidate columns are: {}",
args.column,
row_group
.columns()
.iter()
.map(|c| c.column_path().string())
.collect::<Vec<_>>()
.join(", ")
);
}
}
}
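
The CLI above exercises the new reader API end to end; the same check can be done from library code. Below is a minimal sketch that mirrors the CLI logic. It assumes the `bloom` feature is enabled, and the file path, column name, and value are placeholders.

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() {
    // Placeholders: point these at a real file, column, and value.
    let (path, column, value) = ("data.parquet", "id", "a");

    let file = File::open(path).expect("Unable to open file");
    let reader =
        SerializedFileReader::new(file).expect("Unable to open file as Parquet");

    for i in 0..reader.metadata().num_row_groups() {
        let row_group = reader.get_row_group(i).expect("Unable to read row group");
        // Locate the requested column in this row group's metadata.
        if let Some(column_index) = row_group
            .metadata()
            .columns()
            .iter()
            .position(|c| c.column_path().string() == column)
        {
            // `None` means the writer stored no bloom filter for this column chunk.
            if let Some(sbbf) = row_group
                .get_column_bloom_filter(column_index)
                .expect("Failed to parse bloom filter")
            {
                // `check` can report false positives but never false negatives.
                let verdict = if sbbf.check(value) {
                    "may be present"
                } else {
                    "is definitely absent"
                };
                println!("Row group #{}: value {} {}", i, value, verdict);
            }
        }
    }
}
```
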
124 changes: 101 additions & 23 deletions parquet/src/bloom_filter/mod.rs
@@ -18,13 +18,16 @@
//! Bloom filter implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use twox_hash::XxHash64;

@@ -79,6 +82,37 @@ fn block_check(block: &Block, hash: u32) -> bool {
/// A split block Bloom filter
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Given an initial offset and a [ChunkReader], try to read out a bloom filter header and return
/// both the header and the offset just after it (where the bitset starts).
fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
offset: u64,
reader: Arc<R>,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let buffer = reader.get_bytes(offset as u64, SBBF_HEADER_SIZE_ESTIMATE)?;
let (header, length) = read_bloom_filter_header_and_length(buffer)?;
Ok((header, offset + length))
}

/// Given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
/// the length of the header.
#[inline]
fn read_bloom_filter_header_and_length(
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let total_length = buffer.len();
let mut buf_reader = buffer.reader();
let mut prot = TCompactInputProtocol::new(&mut buf_reader);
let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
ParquetError::General(format!("Could not read bloom filter header: {}", e))
})?;
Ok((
header,
(total_length - buf_reader.into_inner().remaining()) as u64,
))
}

impl Sbbf {
fn new(bitset: &[u8]) -> Self {
let data = bitset
@@ -94,17 +128,20 @@ impl Sbbf {
Self(data)
}

pub fn read_from_column_chunk<R: Read + Seek>(
pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
mut reader: &mut R,
) -> Result<Self, ParquetError> {
let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
ParquetError::General("Bloom filter offset is not set".to_string())
})? as u64;
reader.seek(SeekFrom::Start(offset))?;
// deserialize header
let mut prot = TCompactInputProtocol::new(&mut reader);
let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
reader: Arc<R>,
) -> Result<Option<Self>, ParquetError> {
let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
offset.try_into().map_err(|_| {
ParquetError::General("Bloom filter offset is invalid".to_string())
})?
} else {
return Ok(None);
};

let (header, bitset_offset) =
chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;

match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
Expand All @@ -125,11 +162,8 @@ impl Sbbf {
let length: usize = header.num_bytes.try_into().map_err(|_| {

Review comment (Contributor): It is unfortunate that we potentially need more than one read to get a bloom filter (read the header first, then read the bitset, whose length is only known from the header). I think @tustvold noted this as a limitation in the parquet file format itself (the file metadata only has the bloom filter starting offset, but not its length). Perhaps the reader abstraction can hide most/all of this nonsense from us.

Review comment (Member Author): You can take a look at https://github.com/apache/arrow-rs/pull/3119/files#diff-3b307348aabe465890fa39973e9fda0243bd2344cb7cb9cdf02ac2d39521d7caR232-R236, which should show how it works - similar to writing column offsets and indices.

ParquetError::General("Bloom filter length is invalid".to_string())
})?;
let mut buffer = vec![0_u8; length];
reader.read_exact(&mut buffer).map_err(|e| {
ParquetError::General(format!("Could not read bloom filter: {}", e))
})?;
Ok(Self::new(&buffer))
let bitset = reader.get_bytes(bitset_offset, length)?;
Ok(Some(Self::new(&bitset)))
}

#[inline]
@@ -139,17 +173,27 @@ impl Sbbf {
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
}

/// Insert an [AsBytes] value into the filter
pub fn insert<T: AsBytes>(&mut self, value: T) {
self.insert_hash(hash_as_bytes(value));
}

/// Insert a hash into the filter
pub fn insert(&mut self, hash: u64) {
fn insert_hash(&mut self, hash: u64) {
let block_index = self.hash_to_block_index(hash);
let block = &mut self.0[block_index];
block_insert(block, hash as u32);
}

/// Check if an [AsBytes] value is probably present or definitely absent in the filter
pub fn check<T: AsBytes>(&self, value: T) -> bool {
self.check_hash(hash_as_bytes(value))
}

/// Check if a hash is in the filter. May return
/// true for a hash that was never inserted ("false positive"),
/// but will never return false for a hash that has been inserted (no false negatives).
pub fn check(&self, hash: u64) -> bool {
fn check_hash(&self, hash: u64) -> bool {
let block_index = self.hash_to_block_index(hash);
let block = &self.0[block_index];
block_check(block, hash as u32)
@@ -159,19 +203,24 @@ impl Sbbf {
// per spec we use xxHash with seed=0
const SEED: u64 = 0;

pub fn hash_bytes<A: AsRef<[u8]>>(value: A) -> u64 {
#[inline]
fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
let mut hasher = XxHash64::with_seed(SEED);
hasher.write(value.as_ref());
hasher.write(value.as_bytes());
hasher.finish()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, SplitBlockAlgorithm, Uncompressed,
XxHash,
};

#[test]
fn test_hash_bytes() {
assert_eq!(hash_bytes(b""), 17241709254077376921);
assert_eq!(hash_as_bytes(""), 17241709254077376921);
}

#[test]
@@ -210,8 +259,37 @@ mod tests {
let sbbf = Sbbf::new(bitset);
for a in 0..10i64 {
let value = format!("a{}", a);
let hash = hash_bytes(value);
assert!(sbbf.check(hash));
assert!(sbbf.check(value.as_str()));
}
}

/// Test the assumption that the bloom filter header size does not exceed SBBF_HEADER_SIZE_ESTIMATE:
/// essentially, the header is packed with 4 i32 fields, each taking 1-5 bytes,
/// so altogether it is at most 20 bytes.
#[test]
fn test_bloom_filter_header_size_assumption() {
let buffer: &[u8; 16] =
&[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
let (
BloomFilterHeader {
algorithm,
compression,
hash,
num_bytes,
},
read_length,
) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
assert_eq!(read_length, 15);
assert_eq!(
algorithm,
BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
);
assert_eq!(
compression,
BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
);
assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
assert_eq!(num_bytes, 32_i32);
assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
}
}
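
As an aside on the hashing path exercised by these tests: values are hashed with xxHash64 (seed 0) via `hash_as_bytes`, and the upper 32 bits of the hash select a block without a modulo. The standalone sketch below mirrors that logic outside the module (it assumes only the `twox_hash` crate; the helper names are illustrative, not part of the crate API).

```rust
use std::hash::Hasher;
use twox_hash::XxHash64;

/// Hash raw bytes the way the filter does: xxHash64 with seed 0, per the spec.
fn hash_value(value: &[u8]) -> u64 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(value);
    hasher.finish()
}

/// Map a 64-bit hash to a block index: multiply the upper 32 bits by the
/// number of blocks and keep the upper 32 bits of the product (the
/// multiply-shift trick used by `hash_to_block_index`, which avoids a modulo).
fn block_index(hash: u64, num_blocks: usize) -> usize {
    (((hash >> 32).saturating_mul(num_blocks as u64)) >> 32) as usize
}

fn main() {
    // Matches the constant asserted in `test_hash_bytes` above.
    let hash = hash_value(b"");
    assert_eq!(hash, 17241709254077376921);

    // A 32-byte bitset holds exactly one 32-byte block, so every hash maps to
    // block 0; the lower 32 bits of the hash then select bits within the block.
    println!("block index = {}", block_index(hash, 1));
}
```
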
6 changes: 6 additions & 0 deletions parquet/src/file/reader.rs
@@ -21,6 +21,8 @@
use bytes::Bytes;
use std::{boxed::Box, io::Read, sync::Arc};

#[cfg(feature = "bloom")]
use crate::bloom_filter::Sbbf;
use crate::column::page::PageIterator;
use crate::column::{page::PageReader, reader::ColumnReader};
use crate::errors::{ParquetError, Result};
@@ -143,6 +145,10 @@ pub trait RowGroupReader: Send + Sync {
Ok(col_reader)
}

#[cfg(feature = "bloom")]

Review comment (Contributor): What do you think about handling this in the same eager way that we handle page indexes, namely adding an option to ReadOptions to enable reading bloom filters, and reading this data in SerializedFileReader?

Review comment (Member Author, @jimexist, Nov 16, 2022): @tustvold are you also suggesting dropping the feature gate altogether and enabling it by default? I added the feature gate trying to reduce binary size, but if the feature is very likely to be used there's no need for this gate any more.

Review comment (Contributor, @tustvold, Nov 16, 2022): I'm suggesting that rather than providing a lazy API to read the bloom filter on demand, we provide an API to make SerializedFileReader load bloom filters as part of ParquetMetadata if the corresponding feature and ReadOption is enabled, similar to how we handle the page index. This is necessary to be able to support object stores, and is generally a good idea to avoid lots of small IO reads.

Review comment (Member Author, @jimexist, Nov 16, 2022): Thanks for the suggestion, I agree with this direction; however I have follow-up work coming up, so I'd like to merge this as is and quickly follow up after it is merged. Do you think that works?

/// Get bloom filter for the `i`th column chunk, if present.
fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;

/// Get iterator of `Row`s from this row group.
///
/// Projected schema can be a subset of or equal to the file schema, when it is None,