diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index a5d43bf54bf..fc7c8218ad0 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -113,6 +113,10 @@ required-features = ["cli"]
 name = "parquet-fromcsv"
 required-features = ["arrow", "cli"]
 
+[[bin]]
+name = "parquet-show-bloom-filter"
+required-features = ["cli", "bloom"]
+
 [[bench]]
 name = "arrow_writer"
 required-features = ["arrow"]
diff --git a/parquet/README.md b/parquet/README.md
index d904fc64e74..c9245b08211 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -41,6 +41,7 @@ However, for historical reasons, this crate uses versions with major numbers gre
 The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:
 
 - `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
+- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
 - `async` - support `async` APIs for reading parquet
 - `json` - support for reading / writing `json` data to / from parquet
 - `brotli` (default) - support for parquet using `brotli` compression
diff --git a/parquet/src/bin/parquet-read.rs b/parquet/src/bin/parquet-read.rs
index cf8009956e2..117f9ee0b17 100644
--- a/parquet/src/bin/parquet-read.rs
+++ b/parquet/src/bin/parquet-read.rs
@@ -36,8 +36,6 @@
 //! Note that `parquet-read` reads full file schema, no projection or filtering is
 //! applied.
 
-extern crate parquet;
-
 use clap::Parser;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use parquet::record::Row;
diff --git a/parquet/src/bin/parquet-rowcount.rs b/parquet/src/bin/parquet-rowcount.rs
index 491f582c510..5069d4b2543 100644
--- a/parquet/src/bin/parquet-rowcount.rs
+++ b/parquet/src/bin/parquet-rowcount.rs
@@ -36,7 +36,6 @@
 //! Note that `parquet-rowcount` reads full file schema, no projection or filtering is
 //! applied.
 
-extern crate parquet;
 use clap::Parser;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use std::{fs::File, path::Path};
diff --git a/parquet/src/bin/parquet-schema.rs b/parquet/src/bin/parquet-schema.rs
index cd8e7692203..ff7798a91cd 100644
--- a/parquet/src/bin/parquet-schema.rs
+++ b/parquet/src/bin/parquet-schema.rs
@@ -36,7 +36,6 @@
 //! Note that `verbose` is an optional boolean flag that allows to print schema only,
 //! when not provided or print full file metadata when provided.
 
-extern crate parquet;
 use clap::Parser;
 use parquet::{
     file::reader::{FileReader, SerializedFileReader},
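A quick orientation before the tool's source: the sketch below is roughly the read path the new binary wraps, using the `get_column_bloom_filter` API added further down in this diff. It is not part of the PR; the file name, column index, and probe value are placeholders, and the error reporting the real tool performs is elided.

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() -> parquet::errors::Result<()> {
    let reader = SerializedFileReader::new(File::open("data.parquet")?)?;
    for i in 0..reader.metadata().num_row_groups() {
        // `get_column_bloom_filter` is the trait method added in this diff;
        // it returns Ok(None) when the column chunk carries no bloom filter.
        if let Some(sbbf) = reader.get_row_group(i)?.get_column_bloom_filter(0)? {
            // `check` may report false positives, but never false negatives.
            println!("row group {}: may contain \"a\": {}", i, sbbf.check("a"));
        }
    }
    Ok(())
}
```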
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
new file mode 100644
index 00000000000..a4dbdbe67de
--- /dev/null
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary file to read bloom filter data from a Parquet file.
+//!
+//! # Install
+//!
+//! `parquet-show-bloom-filter` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this `parquet-show-bloom-filter` should be available:
+//! ```
+//! parquet-show-bloom-filter --file-name XYZ.parquet --column id --values a
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a
+//! ```
+
+use clap::Parser;
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use std::{fs::File, path::Path};
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
+struct Args {
+    #[clap(short, long, help("Path to the parquet file"))]
+    file_name: String,
+    #[clap(
+        short,
+        long,
+        help("Check the bloom filter indexes for the given column")
+    )]
+    column: String,
+    #[clap(
+        short,
+        long,
+        help("Check if the given values match bloom filter, the values will be evaluated as strings"),
+        required = true
+    )]
+    values: Vec<String>,
+}
+
+fn main() {
+    let args = Args::parse();
+    let file_name = args.file_name;
+    let path = Path::new(&file_name);
+    let file = File::open(path).expect("Unable to open file");
+
+    let file_reader =
+        SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+    let metadata = file_reader.metadata();
+    for (ri, row_group) in metadata.row_groups().iter().enumerate() {
+        println!("Row group #{}", ri);
+        println!("{}", "=".repeat(80));
+        if let Some((column_index, _)) = row_group
+            .columns()
+            .iter()
+            .enumerate()
+            .find(|(_, column)| column.column_path().string() == args.column)
+        {
+            let row_group_reader = file_reader
+                .get_row_group(ri)
+                .expect("Unable to read row group");
+            if let Some(sbbf) = row_group_reader
+                .get_column_bloom_filter(column_index)
+                .expect("Failed to parse bloom filter")
+            {
+                args.values.iter().for_each(|value| {
+                    println!(
+                        "Value {} is {} in bloom filter",
+                        value,
+                        if sbbf.check(value.as_str()) {
+                            "present"
+                        } else {
+                            "absent"
+                        }
+                    )
+                });
+            }
+        } else {
+            println!(
+                "No column named {} found, candidate columns are: {}",
+                args.column,
+                row_group
+                    .columns()
+                    .iter()
+                    .map(|c| c.column_path().string())
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            );
+        }
+    }
+}
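The module touched next implements the split block scheme from the linked spec: a filter is a sequence of 256-bit blocks (eight `u32` words each), and a 32-bit fragment of the hash sets exactly one bit per word, selected through eight fixed salt constants. As a refresher, here is a self-contained sketch of that block logic. The salt values and shift amounts come from the parquet-format spec; the function names merely mirror the private helpers (`mask`, `block_insert`, `block_check`) that appear as context in the hunks below.

```rust
/// One 256-bit block: eight 32-bit words, as in the parquet-format spec.
type Block = [u32; 8];

/// Salt constants from the split block bloom filter spec.
const SALT: [u32; 8] = [
    0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31,
];

/// Derive the per-block mask for a 32-bit hash fragment: each word gets
/// exactly one bit, chosen by the top five bits of `x * SALT[i]`.
fn mask(x: u32) -> Block {
    let mut result = [0u32; 8];
    for i in 0..8 {
        let y = x.wrapping_mul(SALT[i]);
        result[i] = 1 << (y >> 27);
    }
    result
}

fn block_insert(block: &mut Block, hash: u32) {
    let m = mask(hash);
    for i in 0..8 {
        block[i] |= m[i];
    }
}

fn block_check(block: &Block, hash: u32) -> bool {
    let m = mask(hash);
    (0..8).all(|i| block[i] & m[i] == m[i])
}

fn main() {
    let mut block = [0u32; 8];
    block_insert(&mut block, 0xdeadbeef);
    assert!(block_check(&block, 0xdeadbeef));
    // Almost certainly false: a different hash would have to collide on all 8 bits.
    println!("other value present? {}", block_check(&block, 0x12345678));
}
```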
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index adfd87307ac..4944a93f848 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -18,13 +18,16 @@
 //! Bloom filter implementation specific to Parquet, as described
 //! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
 
+use crate::data_type::AsBytes;
 use crate::errors::ParquetError;
 use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
 };
+use bytes::{Buf, Bytes};
 use std::hash::Hasher;
-use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
 
@@ -79,6 +82,37 @@ fn block_check(block: &Block, hash: u32) -> bool {
 /// A split block Bloom filter
 pub struct Sbbf(Vec<Block>);
 
+const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
+
+/// given an initial offset, and a [ChunkReader], try to read out a bloom filter header and return
+/// both the header and the offset after it (for bitset).
+fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
+    offset: u64,
+    reader: Arc<R>,
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+    let buffer = reader.get_bytes(offset as u64, SBBF_HEADER_SIZE_ESTIMATE)?;
+    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+    Ok((header, offset + length))
+}
+
+/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
+/// length of the header.
+#[inline]
+fn read_bloom_filter_header_and_length(
+    buffer: Bytes,
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+    let total_length = buffer.len();
+    let mut buf_reader = buffer.reader();
+    let mut prot = TCompactInputProtocol::new(&mut buf_reader);
+    let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
+        ParquetError::General(format!("Could not read bloom filter header: {}", e))
+    })?;
+    Ok((
+        header,
+        (total_length - buf_reader.into_inner().remaining()) as u64,
+    ))
+}
+
 impl Sbbf {
     fn new(bitset: &[u8]) -> Self {
         let data = bitset
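The 20-byte `SBBF_HEADER_SIZE_ESTIMATE` introduced above relies on how the thrift compact protocol lays out the header: as the new test at the end of this file puts it, the struct packs four i32 fields, and a zigzag ULEB128 varint for a 32-bit value takes at most ceil(32 / 7) = 5 bytes. A standalone sketch of that bound follows; the helper names are illustrative, not thrift's API.

```rust
/// Number of bytes a ULEB128 varint needs for a 32-bit payload.
fn uleb128_len(mut v: u32) -> usize {
    let mut len = 1;
    while v >= 0x80 {
        v >>= 7;
        len += 1;
    }
    len
}

/// Zigzag encoding, as used by the thrift compact protocol for i32.
fn zigzag(v: i32) -> u32 {
    ((v << 1) ^ (v >> 31)) as u32
}

fn main() {
    // The worst cases are the extremes of the i32 range: 5 bytes each.
    assert_eq!(uleb128_len(zigzag(i32::MIN)), 5);
    assert_eq!(uleb128_len(zigzag(i32::MAX)), 5);
    // Small non-negative values, the common case for header fields, fit in one byte.
    assert_eq!(uleb128_len(zigzag(32)), 1);
    // Four fields at five bytes each gives the 20-byte estimate used above.
    assert_eq!(4 * 5, 20);
}
```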
@@ -94,17 +128,20 @@ impl Sbbf {
         Self(data)
     }
 
-    pub fn read_from_column_chunk<R: Read + Seek>(
+    pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
-        mut reader: &mut R,
-    ) -> Result<Self, ParquetError> {
-        let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
-            ParquetError::General("Bloom filter offset is not set".to_string())
-        })? as u64;
-        reader.seek(SeekFrom::Start(offset))?;
-        // deserialize header
-        let mut prot = TCompactInputProtocol::new(&mut reader);
-        let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+        reader: Arc<R>,
+    ) -> Result<Option<Self>, ParquetError> {
+        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;
 
         match header.algorithm {
             BloomFilterAlgorithm::BLOCK(_) => {
@@ -125,11 +162,8 @@ impl Sbbf {
         let length: usize = header.num_bytes.try_into().map_err(|_| {
             ParquetError::General("Bloom filter length is invalid".to_string())
         })?;
-        let mut buffer = vec![0_u8; length];
-        reader.read_exact(&mut buffer).map_err(|e| {
-            ParquetError::General(format!("Could not read bloom filter: {}", e))
-        })?;
-        Ok(Self::new(&buffer))
+        let bitset = reader.get_bytes(bitset_offset, length)?;
+        Ok(Some(Self::new(&bitset)))
     }
 
     #[inline]
@@ -139,17 +173,27 @@ impl Sbbf {
         (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
     }
 
+    /// Insert an [AsBytes] value into the filter
+    pub fn insert<T: AsBytes>(&mut self, value: T) {
+        self.insert_hash(hash_as_bytes(value));
+    }
+
     /// Insert a hash into the filter
-    pub fn insert(&mut self, hash: u64) {
+    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        let block = &mut self.0[block_index];
        block_insert(block, hash as u32);
    }
 
+    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
+    pub fn check<T: AsBytes>(&self, value: T) -> bool {
+        self.check_hash(hash_as_bytes(value))
+    }
+
     /// Check if a hash is in the filter. May return
     /// true for values that was never inserted ("false positive")
     /// but will always return false if a hash has not been inserted.
-    pub fn check(&self, hash: u64) -> bool {
+    fn check_hash(&self, hash: u64) -> bool {
         let block_index = self.hash_to_block_index(hash);
         let block = &self.0[block_index];
         block_check(block, hash as u32)
     }
@@ -159,19 +203,24 @@ impl Sbbf {
 }
 
 // per spec we use xxHash with seed=0
 const SEED: u64 = 0;
 
-pub fn hash_bytes<A: AsRef<[u8]>>(value: A) -> u64 {
+#[inline]
+fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
     let mut hasher = XxHash64::with_seed(SEED);
-    hasher.write(value.as_ref());
+    hasher.write(value.as_bytes());
     hasher.finish()
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::format::{
+        BloomFilterAlgorithm, BloomFilterCompression, SplitBlockAlgorithm, Uncompressed,
+        XxHash,
+    };
 
     #[test]
     fn test_hash_bytes() {
-        assert_eq!(hash_bytes(b""), 17241709254077376921);
+        assert_eq!(hash_as_bytes(""), 17241709254077376921);
     }
 
     #[test]
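Hashing is the other half of the contract: per the spec, a value is hashed with xxHash64 at seed 0 over its raw bytes, which is exactly what `hash_as_bytes` does, and `test_hash_bytes` pins the empty-input digest. Below is a sketch of the same pipeline using the `twox_hash` dependency the module already imports, plus the multiply-shift block selection from `hash_to_block_index`; the 32-block filter size is an arbitrary example, not something this diff fixes.

```rust
use std::hash::Hasher;
use twox_hash::XxHash64;

/// xxHash64 with seed 0, as the bloom filter spec requires.
fn xxhash64_seed0(bytes: &[u8]) -> u64 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(bytes);
    hasher.finish()
}

fn main() {
    // Matches the constant asserted in `test_hash_bytes` above.
    assert_eq!(xxhash64_seed0(b""), 17241709254077376921);

    // The top 32 bits pick a block (multiply-shift, as in `hash_to_block_index`);
    // the low 32 bits drive the per-block mask.
    let hash = xxhash64_seed0(b"hello");
    let num_blocks = 32u64; // e.g. a 1 KiB filter: 32 blocks of 32 bytes
    let block_index = (((hash >> 32).saturating_mul(num_blocks)) >> 32) as usize;
    assert!(block_index < 32); // (x * 32) >> 32 < 32 whenever x < 2^32
    println!("block index for \"hello\": {}", block_index);
}
```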
@@ -210,8 +259,37 @@ mod tests {
         let sbbf = Sbbf::new(bitset);
         for a in 0..10i64 {
             let value = format!("a{}", a);
-            let hash = hash_bytes(value);
-            assert!(sbbf.check(hash));
+            assert!(sbbf.check(value.as_str()));
         }
     }
+
+    /// test the assumption that bloom filter header size should not exceed SBBF_HEADER_SIZE_ESTIMATE
+    /// essentially we are testing that the struct is packed with 4 i32 fields, each can be 1-5 bytes
+    /// so altogether it'll be 20 bytes at most.
+    #[test]
+    fn test_bloom_filter_header_size_assumption() {
+        let buffer: &[u8; 16] =
+            &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
+        let (
+            BloomFilterHeader {
+                algorithm,
+                compression,
+                hash,
+                num_bytes,
+            },
+            read_length,
+        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
+        assert_eq!(read_length, 15);
+        assert_eq!(
+            algorithm,
+            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
+        );
+        assert_eq!(
+            compression,
+            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
+        );
+        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
+        assert_eq!(num_bytes, 32_i32);
+        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
+    }
 }
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 70ff37a41e1..325944c2168 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -21,6 +21,8 @@
 use bytes::Bytes;
 use std::{boxed::Box, io::Read, sync::Arc};
 
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::column::page::PageIterator;
 use crate::column::{page::PageReader, reader::ColumnReader};
 use crate::errors::{ParquetError, Result};
@@ -143,6 +145,10 @@ pub trait RowGroupReader: Send + Sync {
         Ok(col_reader)
     }
 
+    #[cfg(feature = "bloom")]
+    /// Get bloom filter for the `i`th column chunk, if present.
+    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
+
     /// Get iterator of `Row`s from this row group.
     ///
     /// Projected schema can be a subset of or equal to the file schema, when it is None,
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ebe87aca6d5..cb39dd19487 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -22,11 +22,9 @@ use std::collections::VecDeque;
 use std::io::Cursor;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
 
-use crate::format::{PageHeader, PageLocation, PageType};
-use bytes::{Buf, Bytes};
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
-
 use crate::basic::{Encoding, Type};
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
@@ -38,11 +36,14 @@ use crate::file::{
     reader::*,
     statistics,
 };
+use crate::format::{PageHeader, PageLocation, PageType};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
 use crate::util::{io::TryClone, memory::ByteBufferPtr};
-// export `SliceableCursor` and `FileSource` publically so clients can
+use bytes::{Buf, Bytes};
+use thrift::protocol::{TCompactInputProtocol, TSerializable};
+// export `SliceableCursor` and `FileSource` publicly so clients can
 // re-use the logic in their own ParquetFileWriter wrappers
 pub use crate::util::io::FileSource;
 
@@ -387,6 +388,13 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
         )?))
     }
 
+    #[cfg(feature = "bloom")]
+    /// get bloom filter for the `i`th column
+    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
+        let col = self.metadata.column(i);
+        Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
+    }
+
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
         RowIter::from_row_group(projection, self)
     }
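Taken together, the new trait method supports the classic bloom filter use case: pruning row groups before decoding any pages. A hedged consumer-side sketch, not part of this PR: it assumes the file was written with bloom filters present (this diff only adds the read path) and that the `bloom` feature is enabled; the path, column index, and needle are placeholders.

```rust
use parquet::errors::Result;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

/// Return the indexes of row groups that may contain `value` in the column at
/// `column_index`. Row groups without a bloom filter are conservatively kept.
fn row_groups_maybe_containing(
    reader: &SerializedFileReader<File>,
    column_index: usize,
    value: &str,
) -> Result<Vec<usize>> {
    let mut candidates = Vec::new();
    for i in 0..reader.metadata().num_row_groups() {
        let row_group = reader.get_row_group(i)?;
        match row_group.get_column_bloom_filter(column_index)? {
            // "absent" is definitive; "present" may be a false positive.
            Some(sbbf) if !sbbf.check(value) => {}
            _ => candidates.push(i),
        }
    }
    Ok(candidates)
}

fn main() -> Result<()> {
    let reader = SerializedFileReader::new(File::open("data.parquet")?)?;
    let hits = row_groups_maybe_containing(&reader, 0, "needle")?;
    println!("row groups to scan: {:?}", hits);
    Ok(())
}
```

Keeping filterless row groups in the candidate list is the conservative choice here, since `Ok(None)` only says the writer did not produce a filter, not that the value is absent.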