parquet bloom filter part II: read sbbf bitset from row group reader, update API, and add cli demo (#3102)

* add feature flag

* add api

* fix reading with chunk reader

* refactor

* add a binary to demo

* add bin

* remove unused

* fix clippy

* adjust byte size

* update read method

* parquet-show-bloom-filter with bloom feature required

* remove extern crate

* get rid of loop read

* refactor to test

* rework api

* remove unused trait

* update help
jimexist committed Nov 16, 2022
1 parent fb167f6 commit f54178a
Showing 9 changed files with 235 additions and 32 deletions.
4 changes: 4 additions & 0 deletions parquet/Cargo.toml
@@ -113,6 +113,10 @@ required-features = ["cli"]
name = "parquet-fromcsv"
required-features = ["arrow", "cli"]

[[bin]]
name = "parquet-show-bloom-filter"
required-features = ["cli", "bloom"]

[[bench]]
name = "arrow_writer"
required-features = ["arrow"]
1 change: 1 addition & 0 deletions parquet/README.md
@@ -41,6 +41,7 @@ However, for historical reasons, this crate uses versions with major numbers greater
The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:

- `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
- `bloom` (default) - support for [split block bloom filters](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) when reading from / writing to parquet
- `async` - support `async` APIs for reading parquet
- `json` - support for reading / writing `json` data to / from parquet
- `brotli` (default) - support for parquet using `brotli` compression
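Since `bloom` is a default feature, typical users get it automatically; a downstream `Cargo.toml` that disables default features but keeps bloom filter support might look like this (a sketch, with an illustrative version number):

```
[dependencies]
# Sketch: the version is illustrative; re-enable `bloom` explicitly when
# default features are turned off.
parquet = { version = "27", default-features = false, features = ["bloom"] }
```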
2 changes: 0 additions & 2 deletions parquet/src/bin/parquet-read.rs
@@ -36,8 +36,6 @@
//! Note that `parquet-read` reads the full file schema; no projection or filtering is
//! applied.

extern crate parquet;

use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Row;
1 change: 0 additions & 1 deletion parquet/src/bin/parquet-rowcount.rs
@@ -36,7 +36,6 @@
//! Note that `parquet-rowcount` reads the full file schema; no projection or filtering is
//! applied.

extern crate parquet;
use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs::File, path::Path};
1 change: 0 additions & 1 deletion parquet/src/bin/parquet-schema.rs
@@ -36,7 +36,6 @@
//! Note that `verbose` is an optional boolean flag: when omitted, only the schema is
//! printed; when provided, full file metadata is printed.

extern crate parquet;
use clap::Parser;
use parquet::{
file::reader::{FileReader, SerializedFileReader},
110 changes: 110 additions & 0 deletions parquet/src/bin/parquet-show-bloom-filter.rs
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary file to read bloom filter data from a Parquet file.
//!
//! # Install
//!
//! `parquet-show-bloom-filter` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-show-bloom-filter` should be available:
//! ```
//! parquet-show-bloom-filter --file-name XYZ.parquet --column id --values a
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a
//! ```

use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs::File, path::Path};

#[derive(Debug, Parser)]
#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
struct Args {
#[clap(short, long, help("Path to the parquet file"))]
file_name: String,
#[clap(
short,
long,
help("Check the bloom filter indexes for the given column")
)]
column: String,
#[clap(
short,
long,
help("Check if the given values match bloom filter, the values will be evaluated as strings"),
required = true
)]
values: Vec<String>,
}

fn main() {
let args = Args::parse();
let file_name = args.file_name;
let path = Path::new(&file_name);
let file = File::open(path).expect("Unable to open file");

let file_reader =
SerializedFileReader::new(file).expect("Unable to open file as Parquet");
let metadata = file_reader.metadata();
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
println!("Row group #{}", ri);
println!("{}", "=".repeat(80));
if let Some((column_index, _)) = row_group
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string() == args.column)
{
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) = row_group_reader
.get_column_bloom_filter(column_index)
.expect("Failed to parse bloom filter")
{
args.values.iter().for_each(|value| {
println!(
"Value {} is {} in bloom filter",
value,
if sbbf.check(value.as_str()) {
"present"
} else {
"absent"
}
)
});
}
} else {
println!(
"No column named {} found, candidate columns are: {}",
args.column,
row_group
.columns()
.iter()
.map(|c| c.column_path().string())
.collect::<Vec<_>>()
.join(", ")
);
}
}
}
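As a usage illustration (not part of the diff), running the demo against a hypothetical `data.parquet` with a bloom filter on column `id` would print lines following the `println!` formats above, roughly:

```
$ parquet-show-bloom-filter --file-name data.parquet --column id --values a --values b
Row group #0
================================================================================
Value a is present in bloom filter
Value b is absent in bloom filter
```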
124 changes: 101 additions & 23 deletions parquet/src/bloom_filter/mod.rs
@@ -18,13 +18,16 @@
//! Bloom filter implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use twox_hash::XxHash64;

@@ -79,6 +82,37 @@ fn block_check(block: &Block, hash: u32) -> bool {
/// A split block Bloom filter
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Given an initial offset and a [ChunkReader], try to read out a bloom filter header and return
/// both the header and the offset just after it (where the bitset starts).
fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
offset: u64,
reader: Arc<R>,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let buffer = reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE)?;
let (header, length) = read_bloom_filter_header_and_length(buffer)?;
Ok((header, offset + length))
}

/// Given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
/// the length of the header.
#[inline]
fn read_bloom_filter_header_and_length(
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let total_length = buffer.len();
let mut buf_reader = buffer.reader();
let mut prot = TCompactInputProtocol::new(&mut buf_reader);
let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
ParquetError::General(format!("Could not read bloom filter header: {}", e))
})?;
Ok((
header,
(total_length - buf_reader.into_inner().remaining()) as u64,
))
}

impl Sbbf {
fn new(bitset: &[u8]) -> Self {
let data = bitset
@@ -94,17 +128,20 @@ impl Sbbf {
Self(data)
}

pub fn read_from_column_chunk<R: Read + Seek>(
pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
mut reader: &mut R,
) -> Result<Self, ParquetError> {
let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
ParquetError::General("Bloom filter offset is not set".to_string())
})? as u64;
reader.seek(SeekFrom::Start(offset))?;
// deserialize header
let mut prot = TCompactInputProtocol::new(&mut reader);
let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
reader: Arc<R>,
) -> Result<Option<Self>, ParquetError> {
let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
offset.try_into().map_err(|_| {
ParquetError::General("Bloom filter offset is invalid".to_string())
})?
} else {
return Ok(None);
};

let (header, bitset_offset) =
chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;

match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
@@ -125,11 +162,8 @@ impl Sbbf {
let length: usize = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
let mut buffer = vec![0_u8; length];
reader.read_exact(&mut buffer).map_err(|e| {
ParquetError::General(format!("Could not read bloom filter: {}", e))
})?;
Ok(Self::new(&buffer))
let bitset = reader.get_bytes(bitset_offset, length)?;
Ok(Some(Self::new(&bitset)))
}

#[inline]
@@ -139,17 +173,27 @@ impl Sbbf {
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
}

/// Insert an [AsBytes] value into the filter
pub fn insert<T: AsBytes>(&mut self, value: T) {
self.insert_hash(hash_as_bytes(value));
}

/// Insert a hash into the filter
pub fn insert(&mut self, hash: u64) {
fn insert_hash(&mut self, hash: u64) {
let block_index = self.hash_to_block_index(hash);
let block = &mut self.0[block_index];
block_insert(block, hash as u32);
}

/// Check if an [AsBytes] value is probably present or definitely absent in the filter
pub fn check<T: AsBytes>(&self, value: T) -> bool {
self.check_hash(hash_as_bytes(value))
}

/// Check if a hash is in the filter. May return true for a hash that was never
/// inserted (a "false positive"), but will never return false for a hash that
/// was inserted (no false negatives).
pub fn check(&self, hash: u64) -> bool {
fn check_hash(&self, hash: u64) -> bool {
let block_index = self.hash_to_block_index(hash);
let block = &self.0[block_index];
block_check(block, hash as u32)
@@ -159,19 +203,24 @@ impl Sbbf {
// per spec we use xxHash with seed=0
const SEED: u64 = 0;

pub fn hash_bytes<A: AsRef<[u8]>>(value: A) -> u64 {
#[inline]
fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
let mut hasher = XxHash64::with_seed(SEED);
hasher.write(value.as_ref());
hasher.write(value.as_bytes());
hasher.finish()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, SplitBlockAlgorithm, Uncompressed,
XxHash,
};

#[test]
fn test_hash_bytes() {
assert_eq!(hash_bytes(b""), 17241709254077376921);
assert_eq!(hash_as_bytes(""), 17241709254077376921);
}

#[test]
@@ -210,8 +259,37 @@ mod tests {
let sbbf = Sbbf::new(bitset);
for a in 0..10i64 {
let value = format!("a{}", a);
let hash = hash_bytes(value);
assert!(sbbf.check(hash));
assert!(sbbf.check(value.as_str()));
}
}

/// Test the assumption that the bloom filter header size does not exceed SBBF_HEADER_SIZE_ESTIMATE:
/// essentially we are testing that the struct is packed with 4 i32 fields, each taking 1-5 bytes
/// in thrift compact encoding, so altogether at most 20 bytes.
#[test]
fn test_bloom_filter_header_size_assumption() {
let buffer: &[u8; 16] =
&[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
let (
BloomFilterHeader {
algorithm,
compression,
hash,
num_bytes,
},
read_length,
) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
assert_eq!(read_length, 15);
assert_eq!(
algorithm,
BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
);
assert_eq!(
compression,
BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
);
assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
assert_eq!(num_bytes, 32_i32);
assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
}
}
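One detail worth a closer look: `hash_to_block_index` picks a block without a modulo, using a multiply-shift reduction on the top 32 bits of the hash. A standalone sketch of the same computation (the free function and `main` here are illustrative, not part of the crate):

```
fn block_index(hash: u64, num_blocks: usize) -> usize {
    // Take the high 32 bits of the hash, scale by the block count, and keep
    // the high 32 bits of the 64-bit product: a cheap, near-uniform map onto
    // [0, num_blocks) that avoids a division.
    (((hash >> 32).saturating_mul(num_blocks as u64)) >> 32) as usize
}

fn main() {
    let num_blocks = 32;
    for hash in [0u64, 0x1234_5678_9abc_def0, u64::MAX] {
        let i = block_index(hash, num_blocks);
        assert!(i < num_blocks);
        println!("hash {hash:#018x} -> block {i}");
    }
}
```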
6 changes: 6 additions & 0 deletions parquet/src/file/reader.rs
@@ -21,6 +21,8 @@
use bytes::Bytes;
use std::{boxed::Box, io::Read, sync::Arc};

#[cfg(feature = "bloom")]
use crate::bloom_filter::Sbbf;
use crate::column::page::PageIterator;
use crate::column::{page::PageReader, reader::ColumnReader};
use crate::errors::{ParquetError, Result};
@@ -143,6 +145,10 @@ pub trait RowGroupReader: Send + Sync {
Ok(col_reader)
}

#[cfg(feature = "bloom")]
/// Get bloom filter for the `i`th column chunk, if present.
fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;

/// Get iterator of `Row`s from this row group.
///
/// Projected schema can be a subset of or equal to the file schema, when it is None,
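Putting the new pieces together, the trait method can be exercised end to end; a minimal sketch (assuming the `bloom` feature is enabled and a hypothetical `data.parquet` was written with bloom filters; not part of the diff):

```
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open("data.parquet")?)?;
    let row_group = reader.get_row_group(0)?;
    // Returns Ok(None) when the column chunk metadata has no bloom filter offset.
    if let Some(sbbf) = row_group.get_column_bloom_filter(0)? {
        // `check` hashes the value with xxHash (seed = 0) per the spec:
        // `true` means "probably present", `false` means "definitely absent".
        println!("'abc' maybe present: {}", sbbf.check("abc"));
    }
    Ok(())
}
```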
