From d5458bbdcc67741fdbf814c1fb9b15ae35aebaff Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sun, 13 Nov 2022 13:13:05 +0000
Subject: [PATCH 01/17] add feature flag

---
 .github/workflows/arrow.yml | 2 --
 parquet/README.md           | 1 +
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 2e1c64ebe3a..3e62ed7757e 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -39,7 +39,6 @@ on:
       - .github/**

 jobs:
-
   # test the crate
   linux-test:
     name: Test
@@ -134,7 +133,6 @@ jobs:
       - name: Check compilation --features simd --all-targets
         run: cargo check -p arrow --features simd --all-targets
-
   # test the arrow crate builds against wasm32 in nightly rust
   wasm32-build:
     name: Build wasm32

diff --git a/parquet/README.md b/parquet/README.md
index d904fc64e74..c9245b08211 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -41,6 +41,7 @@ However, for historical reasons, this crate uses versions with major numbers gre
 The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:

 - `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
+- `bloom` (default) - support for [split block bloom filters](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) when reading from / writing to parquet
 - `async` - support `async` APIs for reading parquet
 - `json` - support for reading / writing `json` data to / from parquet
 - `brotli` (default) - support for parquet using `brotli` compression
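[Editor's aside: since `bloom` ships as a default feature after this patch, downstream users get it for free; it can also be selected explicitly when default features are off. A minimal sketch of a consumer's `Cargo.toml`; the version number is a placeholder, not part of this patch:]

```toml
[dependencies]
# `bloom` rides along with the defaults; list it explicitly only if you
# disable default features (version "27" here is illustrative).
parquet = { version = "27", default-features = false, features = ["bloom"] }
```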
From 2557f2c4a4547675a1d2ea92f7fe8415f5d8db4e Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sun, 13 Nov 2022 13:24:10 +0000
Subject: [PATCH 02/17] add api

---
 parquet/src/file/reader.rs            |  6 ++++++
 parquet/src/file/serialized_reader.rs | 17 ++++++++++++-----
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 70ff37a41e1..325944c2168 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -21,6 +21,8 @@
 use bytes::Bytes;
 use std::{boxed::Box, io::Read, sync::Arc};

+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::column::page::PageIterator;
 use crate::column::{page::PageReader, reader::ColumnReader};
 use crate::errors::{ParquetError, Result};
@@ -143,6 +145,10 @@ pub trait RowGroupReader: Send + Sync {
         Ok(col_reader)
     }

+    #[cfg(feature = "bloom")]
+    /// Get bloom filter for the `i`th column chunk, if present.
+    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
+
     /// Get iterator of `Row`s from this row group.
     ///
     /// Projected schema can be a subset of or equal to the file schema, when it is None,

diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ebe87aca6d5..c26409a687a 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -22,11 +22,9 @@ use std::collections::VecDeque;
 use std::io::Cursor;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};

-use crate::format::{PageHeader, PageLocation, PageType};
-use bytes::{Buf, Bytes};
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
-
 use crate::basic::{Encoding, Type};
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
@@ -38,11 +36,14 @@ use crate::file::{
     reader::*,
     statistics,
 };
+use crate::format::{PageHeader, PageLocation, PageType};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
 use crate::util::{io::TryClone, memory::ByteBufferPtr};
-// export `SliceableCursor` and `FileSource` publically so clients can
+use bytes::{Buf, Bytes};
+use thrift::protocol::{TCompactInputProtocol, TSerializable};
+// export `SliceableCursor` and `FileSource` publicly so clients can
 // re-use the logic in their own ParquetFileWriter wrappers
 pub use crate::util::io::FileSource;
@@ -387,6 +388,12 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
         )?))
     }

+    #[cfg(feature = "bloom")]
+    /// get bloom filter for the ith column
+    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
+        todo!()
+    }
+
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
         RowIter::from_row_group(projection, self)
     }
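[Editor's aside: the trait method above is still a `todo!()` stub; the next patch fills it in. Once it does, a caller could exercise the API roughly as sketched below. The file name and column index are hypothetical, and the `bloom` feature must be enabled:]

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "data.parquet" is a placeholder path, not part of the patch.
    let file = File::open("data.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    let row_group = reader.get_row_group(0)?;
    // Ok(None) simply means the column chunk carries no bloom filter.
    if let Some(sbbf) = row_group.get_column_bloom_filter(0)? {
        let _ = sbbf; // check candidate values against the filter here
    }
    Ok(())
}
```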
From 88cea80524aa2a9c08f0ae484cacead6ad2fa9bd Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sun, 13 Nov 2022 14:44:36 +0000
Subject: [PATCH 03/17] fix reading with chunk reader

---
 parquet/src/bloom_filter/mod.rs       | 58 ++++++++++++++++++++-------
 parquet/src/file/serialized_reader.rs |  5 ++-
 2 files changed, 46 insertions(+), 17 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index adfd87307ac..831108c7730 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -20,11 +20,14 @@
 use crate::errors::ParquetError;
 use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
 };
+use bytes::{Buf, BufMut, BytesMut};
 use std::hash::Hasher;
 use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
@@ -94,17 +97,45 @@ impl Sbbf {
         Self(data)
     }

-    pub fn read_from_column_chunk<R: Read + Seek>(
+    pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
-        mut reader: &mut R,
-    ) -> Result<Self, ParquetError> {
-        let offset = column_metadata.bloom_filter_offset().ok_or_else(|| {
-            ParquetError::General("Bloom filter offset is not set".to_string())
-        })? as u64;
-        reader.seek(SeekFrom::Start(offset))?;
-        // deserialize header
-        let mut prot = TCompactInputProtocol::new(&mut reader);
-        let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+        reader: Arc<R>,
+    ) -> Result<Option<Self>, ParquetError> {
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        // because we do not know in advance what the TCompactInputProtocol will read, we have to
+        // loop read until we can parse the header. Allocate at least 128 bytes to start with
+        let mut buffer = BytesMut::with_capacity(128);
+        let mut start = offset;
+        let header: BloomFilterHeader;
+        let bitset_offset: usize;
+        // this size should not be too large so as not to hit a short read too early (although unlikely),
+        // but also not too small, to ensure cache efficiency
+        let step_size = 32_usize;
+        loop {
+            let batch = reader.get_bytes(offset as u64, step_size)?;
+            buffer.put(batch);
+            // need to clone as we read from the very beginning of the buffer each time
+            let buffer = buffer.clone();
+            let mut buf_reader = buffer.reader();
+            // try to deserialize header
+            let mut prot = TCompactInputProtocol::new(&mut buf_reader);
+            if let Ok(h) = BloomFilterHeader::read_from_in_protocol(&mut prot) {
+                header = h;
+                let buffer = buf_reader.into_inner();
+                bitset_offset = start + step_size - buffer.remaining();
+                break;
+            } else {
+                // continue to try by reading another batch
+                start += step_size;
+            }
+        }

         match header.algorithm {
             BloomFilterAlgorithm::BLOCK(_) => {
@@ -125,11 +156,8 @@ impl Sbbf {
         let length: usize = header.num_bytes.try_into().map_err(|_| {
             ParquetError::General("Bloom filter length is invalid".to_string())
         })?;
-        let mut buffer = vec![0_u8; length];
-        reader.read_exact(&mut buffer).map_err(|e| {
-            ParquetError::General(format!("Could not read bloom filter: {}", e))
-        })?;
-        Ok(Self::new(&buffer))
+        let bitset = reader.get_bytes(bitset_offset as u64, length)?;
+        Ok(Some(Self::new(&bitset)))
     }

     #[inline]

diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index c26409a687a..cb39dd19487 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -389,9 +389,10 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
     }

     #[cfg(feature = "bloom")]
-    /// get bloom filter for the ith column
+    /// get bloom filter for the `i`th column
     fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
-        todo!()
+        let col = self.metadata.column(i);
+        Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
     }

     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
         RowIter::from_row_group(projection, self)
     }
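[Editor's aside: the `block_insert`/`block_check` helpers this code relies on sit outside the hunks above. For orientation, here is a self-contained sketch of the split block scheme as parquet-format's BloomFilter.md specifies it; the SALT constants come from that spec, not from this diff:]

```rust
type Block = [u32; 8];

// Per the parquet-format spec: eight fixed odd constants, one per 32-bit word.
const SALT: [u32; 8] = [
    0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31,
];

// One bit per word: multiply by the salt and keep the top five bits as a bit index.
fn block_mask(x: u32) -> Block {
    let mut mask = [0u32; 8];
    for i in 0..8 {
        mask[i] = 1 << (x.wrapping_mul(SALT[i]) >> 27);
    }
    mask
}

fn block_insert(block: &mut Block, hash: u32) {
    let mask = block_mask(hash);
    for i in 0..8 {
        block[i] |= mask[i];
    }
}

fn block_check(block: &Block, hash: u32) -> bool {
    // A value is "maybe present" only if all eight of its bits are set.
    let mask = block_mask(hash);
    (0..8).all(|i| block[i] & mask[i] != 0)
}

fn main() {
    let mut block = [0u32; 8];
    block_insert(&mut block, 0xcafe_f00d);
    assert!(block_check(&block, 0xcafe_f00d));
}
```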
From efd89916aeaff20eafbbe2c02f007f6336f7f278 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sun, 13 Nov 2022 15:00:04 +0000
Subject: [PATCH 04/17] refactor

---
 parquet/src/bloom_filter/mod.rs | 63 +++++++++++++++++++--------------
 1 file changed, 36 insertions(+), 27 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 831108c7730..12b0fe90851 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -82,6 +82,40 @@ fn block_check(block: &Block, hash: u32) -> bool {
 /// A split block Bloom filter
 pub struct Sbbf(Vec<Block>);

+// this size should not be too large so as not to hit a short read too early (although unlikely),
+// but also not too small, to ensure cache efficiency; this is essentially a "guess" of the header
+// size
+const STEP_SIZE: usize = 32;
+
+/// given an initial offset, and a chunk reader, try to read out a bloom filter header by trying
+/// one or more iterations, returns both the header and the offset after it (for bitset).
+fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
+    offset: usize,
+    reader: Arc<R>,
+) -> Result<(BloomFilterHeader, usize), ParquetError> {
+    // because we do not know in advance what the TCompactInputProtocol will read, we have to
+    // loop read until we can parse the header. Allocate at least 128 bytes to start with
+    let mut buffer = BytesMut::with_capacity(128);
+    let mut start = offset;
+    loop {
+        let batch = reader.get_bytes(offset as u64, STEP_SIZE)?;
+        buffer.put(batch);
+        // need to clone as we read from the very beginning of the buffer each time
+        let buffer = buffer.clone();
+        let mut buf_reader = buffer.reader();
+        // try to deserialize header
+        let mut prot = TCompactInputProtocol::new(&mut buf_reader);
+        if let Ok(h) = BloomFilterHeader::read_from_in_protocol(&mut prot) {
+            let buffer = buf_reader.into_inner();
+            let bitset_offset = start + STEP_SIZE - buffer.remaining();
+            return Ok((h, bitset_offset));
+        } else {
+            // continue to try by reading another batch
+            start += STEP_SIZE;
+        }
+    }
+}
+
 impl Sbbf {
     fn new(bitset: &[u8]) -> Self {
         let data = bitset
@@ -109,33 +143,8 @@ impl Sbbf {
             return Ok(None);
         };

-        // because we do not know in advance what the TCompactInputProtocol will read, we have to
-        // loop read until we can parse the header. Allocate at least 128 bytes to start with
-        let mut buffer = BytesMut::with_capacity(128);
-        let mut start = offset;
-        let header: BloomFilterHeader;
-        let bitset_offset: usize;
-        // this size should not be too large so as not to hit a short read too early (although unlikely),
-        // but also not too small, to ensure cache efficiency
-        let step_size = 32_usize;
-        loop {
-            let batch = reader.get_bytes(offset as u64, step_size)?;
-            buffer.put(batch);
-            // need to clone as we read from the very beginning of the buffer each time
-            let buffer = buffer.clone();
-            let mut buf_reader = buffer.reader();
-            // try to deserialize header
-            let mut prot = TCompactInputProtocol::new(&mut buf_reader);
-            if let Ok(h) = BloomFilterHeader::read_from_in_protocol(&mut prot) {
-                header = h;
-                let buffer = buf_reader.into_inner();
-                bitset_offset = start + step_size - buffer.remaining();
-                break;
-            } else {
-                // continue to try by reading another batch
-                start += step_size;
-            }
-        }
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;

         match header.algorithm {
             BloomFilterAlgorithm::BLOCK(_) => {
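[Editor's aside: one detail worth calling out while the module is being reshaped here. `hash_to_block_index`, visible as context later in this series, picks a block without a modulo by scaling the top 32 bits of the hash into `[0, num_blocks)`. A standalone illustration of that arithmetic, with a free function standing in for the method:]

```rust
fn hash_to_block_index(hash: u64, num_blocks: usize) -> usize {
    // Map the top 32 bits of `hash` into [0, num_blocks) without division;
    // mirrors the expression in Sbbf::hash_to_block_index.
    (((hash >> 32).saturating_mul(num_blocks as u64)) >> 32) as usize
}

fn main() {
    let n = 32;
    for hash in [0u64, u64::MAX, 0xdead_beef_cafe_f00d] {
        let idx = hash_to_block_index(hash, n);
        assert!(idx < n); // always lands in range
        println!("hash {hash:#x} -> block {idx}");
    }
}
```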
From 5f4deae63fe90f10d7951735e43f7fc74ed8668c Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Mon, 14 Nov 2022 20:19:14 +0800
Subject: [PATCH 05/17] add a binary to demo

---
 parquet/src/bin/parquet-show-bloom-filter.rs | 113 +++++++++++++++++++
 1 file changed, 113 insertions(+)
 create mode 100644 parquet/src/bin/parquet-show-bloom-filter.rs

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
new file mode 100644
index 00000000000..8b5b22f2adc
--- /dev/null
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary file to read bloom filter data from a Parquet file.
+//!
+//! # Install
+//!
+//! `parquet-show-bloom-filter` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this `parquet-show-bloom-filter` should be available:
+//! ```
+//! parquet-show-bloom-filter --file-name XYZ.parquet --column id --values a
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a
+//! ```
+
+extern crate parquet;
+use clap::Parser;
+use parquet::bloom_filter::hash_bytes;
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use std::iter;
+use std::{fs::File, path::Path};
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
+struct Args {
+    #[clap(short, long, help("Path to a parquet file, or - for stdin"))]
+    file_name: String,
+    #[clap(
+        short,
+        long,
+        help("Check the bloom filter indexes for the given column")
+    )]
+    column: String,
+    #[clap(
+        short,
+        long,
+        help("Check if the given values match bloom filter"),
+        required = true
+    )]
+    values: Vec<String>,
+}
+
+fn main() {
+    let args = Args::parse();
+    let file_name = args.file_name;
+    let path = Path::new(&file_name);
+    let file = File::open(path).expect("Unable to open file");
+
+    let file_reader =
+        SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+    let metadata = file_reader.metadata();
+    for (ri, row_group) in metadata.row_groups().iter().enumerate() {
+        println!("Row group #{}", ri);
+        println!("{}", iter::repeat("=").take(80).collect::<String>());
+        if let Some((column_index, _)) = row_group
+            .columns()
+            .iter()
+            .enumerate()
+            .find(|(_, column)| column.column_path().string() == args.column)
+        {
+            let row_group_reader = file_reader
+                .get_row_group(ri)
+                .expect("Unable to read row group");
+            if let Some(sbbf) = row_group_reader
+                .get_column_bloom_filter(column_index)
+                .expect("Failed to parse bloom filter")
+            {
+                args.values.iter().for_each(|value| {
+                    println!(
+                        "Value {} is {} in bloom filter",
+                        value,
+                        if sbbf.check(hash_bytes(value)) {
+                            "present"
+                        } else {
+                            "absent"
+                        }
+                    )
+                });
+            }
+        } else {
+            println!(
+                "No column named {} found, candidate columns are: {}",
+                args.column,
+                row_group
+                    .columns()
+                    .iter()
+                    .map(|c| c.column_path().string())
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            );
+        }
+    }
+}
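[Editor's aside: pieced together from the `println!` format strings above, a run against a hypothetical file whose `id` column carries a filter containing `a` but not `zzz` would print along these lines; the file and values are illustrative, not captured output:]

```
Row group #0
================================================================================
Value a is present in bloom filter
Value zzz is absent in bloom filter
```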
From c66d7a00a66cd489dabf9b272d55a96323415294 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Mon, 14 Nov 2022 20:21:37 +0800
Subject: [PATCH 06/17] add bin

---
 parquet/Cargo.toml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index a5d43bf54bf..50fdac5f67a 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -113,6 +113,10 @@ required-features = ["cli"]
 name = "parquet-fromcsv"
 required-features = ["arrow", "cli"]

+[[bin]]
+name = "parquet-show-bloom-filter"
+required-features = ["cli"]
+
 [[bench]]
 name = "arrow_writer"
 required-features = ["arrow"]

From 7a51342e856a3110fa0db3bd4afc45f44b10969b Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Mon, 14 Nov 2022 20:32:11 +0800
Subject: [PATCH 07/17] remove unused

---
 parquet/src/bloom_filter/mod.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 12b0fe90851..5f0e9fa84c5 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -26,7 +26,6 @@ use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
 };
 use bytes::{Buf, BufMut, BytesMut};
 use std::hash::Hasher;
-use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;

From fa3639ccad959ad9dd61f9942ca73835b2ffa63d Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Mon, 14 Nov 2022 21:08:43 +0800
Subject: [PATCH 08/17] fix clippy

---
 .github/workflows/arrow.yml                  | 2 ++
 parquet/src/bin/parquet-show-bloom-filter.rs | 3 +--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 3e62ed7757e..2e1c64ebe3a 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -39,6 +39,7 @@ on:
       - .github/**

 jobs:
+
   # test the crate
   linux-test:
     name: Test
@@ -133,6 +134,7 @@ jobs:
       - name: Check compilation --features simd --all-targets
         run: cargo check -p arrow --features simd --all-targets
+
   # test the arrow crate builds against wasm32 in nightly rust
   wasm32-build:
     name: Build wasm32

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 8b5b22f2adc..5f78249071c 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -37,7 +37,6 @@
 extern crate parquet;
 use clap::Parser;
 use parquet::bloom_filter::hash_bytes;
 use parquet::file::reader::{FileReader, SerializedFileReader};
-use std::iter;
 use std::{fs::File, path::Path};
@@ -71,7 +70,7 @@ fn main() {
     for (ri, row_group) in metadata.row_groups().iter().enumerate() {
         println!("Row group #{}", ri);
-        println!("{}", iter::repeat("=").take(80).collect::<String>());
+        println!("=".repeat(80));
         if let Some((column_index, _)) = row_group
             .columns()
             .iter()
From f8b7a27812256d41ddd5b16320ede76cf5e1d5a4 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Mon, 14 Nov 2022 21:12:12 +0800
Subject: [PATCH 09/17] adjust byte size

---
 parquet/src/bin/parquet-show-bloom-filter.rs |  2 +-
 parquet/src/bloom_filter/mod.rs              | 10 ++++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 5f78249071c..150629bd7fe 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -70,7 +70,7 @@ fn main() {
     for (ri, row_group) in metadata.row_groups().iter().enumerate() {
         println!("Row group #{}", ri);
-        println!("=".repeat(80));
+        println!("{}", "=".repeat(80));
         if let Some((column_index, _)) = row_group
             .columns()
             .iter()

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 5f0e9fa84c5..c75953b5c61 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -83,8 +83,8 @@ pub struct Sbbf(Vec<Block>);

 // this size should not be too large so as not to hit a short read too early (although unlikely),
 // but also not too small, to ensure cache efficiency; this is essentially a "guess" of the header
-// size
-const STEP_SIZE: usize = 32;
+// size. In the demo test the size is 15 bytes.
+const STEP_SIZE: usize = 16;

 /// given an initial offset, and a chunk reader, try to read out a bloom filter header by trying
 /// one or more iterations, returns both the header and the offset after it (for bitset).
@@ -107,6 +107,12 @@ fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
         if let Ok(h) = BloomFilterHeader::read_from_in_protocol(&mut prot) {
             let buffer = buf_reader.into_inner();
             let bitset_offset = start + STEP_SIZE - buffer.remaining();
+            println!(
+                "offset: {}, bitset_offset: {}, size: {}",
+                offset,
+                bitset_offset,
+                bitset_offset - offset
+            );
             return Ok((h, bitset_offset));
         } else {
             // continue to try by reading another batch
From 1bc73cd46d8c1e5e95b81f19b80f740722e0d34d Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Mon, 14 Nov 2022 22:02:35 +0800
Subject: [PATCH 10/17] update read method

---
 parquet/src/bloom_filter/mod.rs | 23 +++++------------------
 1 file changed, 5 insertions(+), 18 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index c75953b5c61..8d95db07940 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,7 +24,7 @@ use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
 };
-use bytes::{Buf, BufMut, BytesMut};
+use bytes::Buf;
 use std::hash::Hasher;
 use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
@@ -92,31 +92,18 @@ fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
     offset: usize,
     reader: Arc<R>,
 ) -> Result<(BloomFilterHeader, usize), ParquetError> {
-    // because we do not know in advance what the TCompactInputProtocol will read, we have to
-    // loop read until we can parse the header. Allocate at least 128 bytes to start with
-    let mut buffer = BytesMut::with_capacity(128);
-    let mut start = offset;
+    let mut length = STEP_SIZE;
     loop {
-        let batch = reader.get_bytes(offset as u64, STEP_SIZE)?;
-        buffer.put(batch);
-        // need to clone as we read from the very beginning of the buffer each time
-        let buffer = buffer.clone();
+        let buffer = reader.get_bytes(offset as u64, length)?;
         let mut buf_reader = buffer.reader();
-        // try to deserialize header
         let mut prot = TCompactInputProtocol::new(&mut buf_reader);
         if let Ok(h) = BloomFilterHeader::read_from_in_protocol(&mut prot) {
             let buffer = buf_reader.into_inner();
-            let bitset_offset = start + STEP_SIZE - buffer.remaining();
-            println!(
-                "offset: {}, bitset_offset: {}, size: {}",
-                offset,
-                bitset_offset,
-                bitset_offset - offset
-            );
+            let bitset_offset = offset + length - buffer.remaining();
             return Ok((h, bitset_offset));
         } else {
             // continue to try by reading another batch
-            start += STEP_SIZE;
+            length += STEP_SIZE;
         }
     }
 }

From f0041d363a20dff1bb65f566f9c958de2f733775 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Mon, 14 Nov 2022 22:03:50 +0800
Subject: [PATCH 11/17] parquet-show-bloom-filter with bloom feature required

---
 parquet/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 50fdac5f67a..fc7c8218ad0 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -115,7 +115,7 @@ required-features = ["arrow", "cli"]

 [[bin]]
 name = "parquet-show-bloom-filter"
-required-features = ["cli"]
+required-features = ["cli", "bloom"]

 [[bench]]
 name = "arrow_writer"
 required-features = ["arrow"]

From 3ec6e292c221eb8e88ddb35aa43925d624afe57c Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Tue, 15 Nov 2022 09:39:16 +0800
Subject: [PATCH 12/17] remove extern crate

---
 parquet/src/bin/parquet-read.rs              | 2 --
 parquet/src/bin/parquet-rowcount.rs          | 1 -
 parquet/src/bin/parquet-schema.rs            | 1 -
 parquet/src/bin/parquet-show-bloom-filter.rs | 1 -
 4 files changed, 5 deletions(-)

diff --git a/parquet/src/bin/parquet-read.rs b/parquet/src/bin/parquet-read.rs
index cf8009956e2..117f9ee0b17 100644
--- a/parquet/src/bin/parquet-read.rs
+++ b/parquet/src/bin/parquet-read.rs
@@ -36,8 +36,6 @@
 //! Note that `parquet-read` reads full file schema, no projection or filtering is
 //! applied.

-extern crate parquet;
-
 use clap::Parser;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use parquet::record::Row;

diff --git a/parquet/src/bin/parquet-rowcount.rs b/parquet/src/bin/parquet-rowcount.rs
index 491f582c510..5069d4b2543 100644
--- a/parquet/src/bin/parquet-rowcount.rs
+++ b/parquet/src/bin/parquet-rowcount.rs
@@ -36,7 +36,6 @@
 //! Note that `parquet-rowcount` reads full file schema, no projection or filtering is
 //! applied.

-extern crate parquet;
 use clap::Parser;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use std::{fs::File, path::Path};

diff --git a/parquet/src/bin/parquet-schema.rs b/parquet/src/bin/parquet-schema.rs
index cd8e7692203..ff7798a91cd 100644
--- a/parquet/src/bin/parquet-schema.rs
+++ b/parquet/src/bin/parquet-schema.rs
@@ -36,7 +36,6 @@
 //! Note that `verbose` is an optional boolean flag that allows to print schema only,
 //! when not provided or print full file metadata when provided.
-extern crate parquet;
 use clap::Parser;
 use parquet::{
     file::reader::{FileReader, SerializedFileReader},

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 150629bd7fe..888bdeca7c6 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -33,7 +33,6 @@
 //! cargo run --features=cli --bin parquet-show-bloom-filter -- --file-name XYZ.parquet --column id --values a
 //! ```

-extern crate parquet;
 use clap::Parser;
 use parquet::bloom_filter::hash_bytes;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use std::{fs::File, path::Path};

From e7a33b693b4878cc1e9e1c2230b3f2de99210c0c Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Tue, 15 Nov 2022 12:08:11 +0800
Subject: [PATCH 13/17] get rid of loop read

---
 parquet/src/bloom_filter/mod.rs | 36 ++++++++++++---------------------
 1 file changed, 14 insertions(+), 22 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 8d95db07940..2828f93fcc5 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -81,31 +81,23 @@ fn block_check(block: &Block, hash: u32) -> bool {
 /// A split block Bloom filter
 pub struct Sbbf(Vec<Block>);

-// this size should not be too large so as not to hit a short read too early (although unlikely),
-// but also not too small, to ensure cache efficiency; this is essentially a "guess" of the header
-// size. In the demo test the size is 15 bytes.
-const STEP_SIZE: usize = 16;
+const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

 /// given an initial offset, and a chunk reader, try to read out a bloom filter header by trying
 /// one or more iterations, returns both the header and the offset after it (for bitset).
 fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
-    offset: usize,
+    offset: u64,
     reader: Arc<R>,
-) -> Result<(BloomFilterHeader, usize), ParquetError> {
-    let mut length = STEP_SIZE;
-    loop {
-        let buffer = reader.get_bytes(offset as u64, length)?;
-        let mut buf_reader = buffer.reader();
-        let mut prot = TCompactInputProtocol::new(&mut buf_reader);
-        if let Ok(h) = BloomFilterHeader::read_from_in_protocol(&mut prot) {
-            let buffer = buf_reader.into_inner();
-            let bitset_offset = offset + length - buffer.remaining();
-            return Ok((h, bitset_offset));
-        } else {
-            // continue to try by reading another batch
-            length += STEP_SIZE;
-        }
-    }
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+    let buffer = reader.get_bytes(offset as u64, SBBF_HEADER_SIZE_ESTIMATE)?;
+    let mut buf_reader = buffer.reader();
+    let mut prot = TCompactInputProtocol::new(&mut buf_reader);
+    let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
+        ParquetError::General(format!("Could not read bloom filter header: {}", e))
+    })?;
+    let bitset_offset =
+        offset + (SBBF_HEADER_SIZE_ESTIMATE - buf_reader.into_inner().remaining()) as u64;
+    Ok((header, bitset_offset))
 }
@@ -119,7 +119,7 @@ impl Sbbf {
     pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: Arc<R>,
     ) -> Result<Option<Self>, ParquetError> {
-        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
             offset.try_into().map_err(|_| {
                 ParquetError::General("Bloom filter offset is invalid".to_string())
             })?
@@ -157,7 +149,7 @@ impl Sbbf {
         let length: usize = header.num_bytes.try_into().map_err(|_| {
             ParquetError::General("Bloom filter length is invalid".to_string())
         })?;
-        let bitset = reader.get_bytes(bitset_offset as u64, length)?;
+        let bitset = reader.get_bytes(bitset_offset, length)?;
         Ok(Some(Self::new(&bitset)))
     }

From bd2fb2fd57e542a9b20c275dd1ebe9dff793f6e9 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Tue, 15 Nov 2022 13:01:30 +0800
Subject: [PATCH 14/17] refactor to test

---
 parquet/src/bloom_filter/mod.rs | 58 +++++++++++++++++++++++++++++----
 1 file changed, 52 insertions(+), 6 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 2828f93fcc5..befec41eae1 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,7 +24,7 @@ use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
 };
-use bytes::Buf;
+use bytes::{Buf, Bytes};
 use std::hash::Hasher;
 use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
@@ -83,21 +83,33 @@ pub struct Sbbf(Vec<Block>);

 const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

-/// given an initial offset, and a chunk reader, try to read out a bloom filter header by trying
-/// one or more iterations, returns both the header and the offset after it (for bitset).
+/// given an initial offset, and a [ChunkReader], try to read out a bloom filter header and return
+/// both the header and the offset after it (for bitset).
 fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
     offset: u64,
     reader: Arc<R>,
 ) -> Result<(BloomFilterHeader, u64), ParquetError> {
     let buffer = reader.get_bytes(offset as u64, SBBF_HEADER_SIZE_ESTIMATE)?;
+    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+    Ok((header, offset + length))
+}
+
+/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
+/// length of the header.
+#[inline]
+fn read_bloom_filter_header_and_length(
+    buffer: Bytes,
+) -> Result<(BloomFilterHeader, u64), ParquetError> {
+    let total_length = buffer.len();
     let mut buf_reader = buffer.reader();
     let mut prot = TCompactInputProtocol::new(&mut buf_reader);
     let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
         ParquetError::General(format!("Could not read bloom filter header: {}", e))
     })?;
-    let bitset_offset =
-        offset + (SBBF_HEADER_SIZE_ESTIMATE - buf_reader.into_inner().remaining()) as u64;
-    Ok((header, bitset_offset))
+    Ok((
+        header,
+        (total_length - buf_reader.into_inner().remaining()) as u64,
+    ))
 }
@@ -189,6 +201,10 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::format::{
+        BloomFilterAlgorithm, BloomFilterCompression, SplitBlockAlgorithm, Uncompressed,
+        XxHash,
+    };
@@ -235,4 +251,34 @@ mod tests {
             assert!(sbbf.check(hash));
         }
     }
+
+    /// test the assumption that bloom filter header size should not exceed SBBF_HEADER_SIZE_ESTIMATE
+    /// essentially we are testing that the struct is packed with 4 i32 fields, each can be 1-5 bytes
+    /// so altogether it'll be 20 bytes at most.
+    #[test]
+    fn test_bloom_filter_header_size_assumption() {
+        let buffer: &[u8; 16] =
+            &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
+        let (
+            BloomFilterHeader {
+                algorithm,
+                compression,
+                hash,
+                num_bytes,
+            },
+            read_length,
+        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
+        assert_eq!(read_length, 15);
+        assert_eq!(
+            algorithm,
+            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
+        );
+        assert_eq!(
+            compression,
+            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
+        );
+        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
+        assert_eq!(num_bytes, 32_i32);
+        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
+    }
 }
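[Editor's aside: to make the 15-byte figure in the test above less magical, here is one reading of that buffer under thrift's compact protocol. The byte-by-byte annotation is my interpretation of the encoding, not text from the patch:]

```rust
fn main() {
    // Annotated copy of the buffer from test_bloom_filter_header_size_assumption.
    let _buffer: [u8; 16] = [
        21, // field 1 header: delta 1, type 5 (i32) -> num_bytes
        64, // unsigned varint of zigzag(32) -> num_bytes = 32
        28, // field 2 header: delta 1, type 12 (struct) -> algorithm union
        28, //   union variant header: field 1, BLOCK(SplitBlockAlgorithm)
        0,  //   empty SplitBlockAlgorithm struct: STOP
        0,  //   union STOP
        28, 28, 0, 0, // field 3: compression union -> UNCOMPRESSED, same shape
        28, 28, 0, 0, // field 4: hash union -> XXHASH, same shape
        0,  // outer struct STOP; 15 bytes consumed, matching read_length
        99, // trailing padding byte the parser never touches
    ];
}
```

With four i32-or-union fields each bounded by roughly a header byte plus a five-byte varint, 20 bytes is a comfortable upper estimate, which is what the test pins down.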
From a9480ad5575e480fc3c467d0939f93981ebb3ce5 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Tue, 15 Nov 2022 13:20:08 +0800
Subject: [PATCH 15/17] rework api

---
 parquet/src/bin/parquet-show-bloom-filter.rs |  4 ++--
 parquet/src/bloom_filter/mod.rs              | 25 ++++++++++++++------
 2 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 888bdeca7c6..af2c87da5e6 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,7 +34,7 @@
 //! ```

 use clap::Parser;
-use parquet::bloom_filter::hash_bytes;
+use parquet::data_type::AsBytes;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use std::{fs::File, path::Path};
@@ -87,7 +87,7 @@
                 println!(
                     "Value {} is {} in bloom filter",
                     value,
-                    if sbbf.check(hash_bytes(value)) {
+                    if sbbf.check(value.as_str()) {
                         "present"
                     } else {
                         "absent"

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index befec41eae1..4944a93f848 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -18,6 +18,7 @@
 //! Bloom filter implementation specific to Parquet, as described
 //! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)

+use crate::data_type::AsBytes;
 use crate::errors::ParquetError;
 use crate::file::metadata::ColumnChunkMetaData;
@@ -172,17 +173,27 @@ impl Sbbf {
         (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
     }

+    /// Insert an [AsBytes] value into the filter
+    pub fn insert<T: AsBytes>(&mut self, value: T) {
+        self.insert_hash(hash_as_bytes(value));
+    }
+
     /// Insert a hash into the filter
-    pub fn insert(&mut self, hash: u64) {
+    fn insert_hash(&mut self, hash: u64) {
         let block_index = self.hash_to_block_index(hash);
         let block = &mut self.0[block_index];
         block_insert(block, hash as u32);
     }

+    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
+    pub fn check<T: AsBytes>(&self, value: T) -> bool {
+        self.check_hash(hash_as_bytes(value))
+    }
+
     /// Check if a hash is in the filter. May return
     /// true for values that were never inserted ("false positive")
     /// but will always return false if a hash has not been inserted.
-    pub fn check(&self, hash: u64) -> bool {
+    fn check_hash(&self, hash: u64) -> bool {
         let block_index = self.hash_to_block_index(hash);
         let block = &self.0[block_index];
         block_check(block, hash as u32)
     }
 }
@@ -192,9 +203,10 @@
 // per spec we use xxHash with seed=0
 const SEED: u64 = 0;

-pub fn hash_bytes<A: AsRef<[u8]>>(value: A) -> u64 {
+#[inline]
+fn hash_as_bytes<A: AsBytes>(value: A) -> u64 {
     let mut hasher = XxHash64::with_seed(SEED);
-    hasher.write(value.as_ref());
+    hasher.write(value.as_bytes());
     hasher.finish()
 }
@@ -220,7 +232,7 @@ mod tests {

     #[test]
     fn test_hash_bytes() {
-        assert_eq!(hash_bytes(b""), 17241709254077376921);
+        assert_eq!(hash_as_bytes(""), 17241709254077376921);
     }
@@ -259,8 +271,7 @@ mod tests {
         let sbbf = Sbbf::new(bitset);
         for a in 0..10i64 {
             let value = format!("a{}", a);
-            let hash = hash_bytes(value);
-            assert!(sbbf.check(hash));
+            assert!(sbbf.check(value.as_str()));
         }
     }

From 86673694faddda285b5e82d83360ae6625cf70b4 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Tue, 15 Nov 2022 16:28:23 +0800
Subject: [PATCH 16/17] remove unused trait

---
 parquet/src/bin/parquet-show-bloom-filter.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index af2c87da5e6..2af4583a5aa 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,7 +34,6 @@
 //! ```

 use clap::Parser;
-use parquet::data_type::AsBytes;
 use parquet::file::reader::{FileReader, SerializedFileReader};
 use std::{fs::File, path::Path};

From 415c6fbb60e873dd2f3d650376a70ceb8b39410b Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Tue, 15 Nov 2022 16:29:58 +0800
Subject: [PATCH 17/17] update help

---
 parquet/src/bin/parquet-show-bloom-filter.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 2af4583a5aa..a4dbdbe67de 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -40,7 +40,7 @@ use std::{fs::File, path::Path};
 #[derive(Debug, Parser)]
 #[clap(author, version, about("Binary file to read bloom filter data from a Parquet file"), long_about = None)]
 struct Args {
-    #[clap(short, long, help("Path to a parquet file, or - for stdin"))]
+    #[clap(short, long, help("Path to the parquet file"))]
     file_name: String,
     #[clap(
         short,
@@ -51,7 +51,7 @@ struct Args {
     #[clap(
         short,
         long,
-        help("Check if the given values match bloom filter"),
+        help("Check if the given values match bloom filter, the values will be evaluated as strings"),
         required = true
     )]
     values: Vec<String>,
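[Editor's aside: as a closing cross-check of the seed-0 xxHash convention that patch 15's `hash_as_bytes` settles on, this standalone sketch reproduces the expected value asserted in `test_hash_bytes`, using the `twox-hash` crate the module already depends on:]

```rust
use std::hash::Hasher;
use twox_hash::XxHash64;

fn main() {
    // Per the parquet spec, bloom filter hashing is xxHash64 with seed 0.
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(b"");
    assert_eq!(hasher.finish(), 17241709254077376921);
}
```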