From ec3b5d0bd808cc13eed8b5f85bed6a3377a2780f Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Wed, 16 Nov 2022 13:46:48 +0800 Subject: [PATCH] eager reading --- parquet/src/file/metadata.rs | 2 +- parquet/src/file/properties.rs | 21 +++++++++++++++++++++ parquet/src/file/reader.rs | 2 +- parquet/src/file/serialized_reader.rs | 27 +++++++++++++++++++-------- 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 895776a8a42..2ba50fa31a1 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -236,7 +236,7 @@ pub struct RowGroupMetaData { } impl RowGroupMetaData { - /// Returns builer for row group metadata. + /// Returns builder for row group metadata. pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder { RowGroupMetaDataBuilder::new(schema_descr) } diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 95ca6bbb1ec..cf5024e9099 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -685,6 +685,7 @@ pub type ReaderPropertiesPtr = Arc; /// Use [`ReaderPropertiesBuilder`] to assemble these properties. pub struct ReaderProperties { codec_options: CodecOptions, + read_bloom_filter: bool, } impl ReaderProperties { @@ -697,11 +698,17 @@ impl ReaderProperties { pub(crate) fn codec_options(&self) -> &CodecOptions { &self.codec_options } + + /// Returns whether to read bloom filter + pub(crate) fn read_bloom_filter(&self) -> bool { + self.read_bloom_filter + } } /// Reader properties builder. pub struct ReaderPropertiesBuilder { codec_options_builder: CodecOptionsBuilder, + read_bloom_filter: Option, } /// Reader properties builder. @@ -710,6 +717,7 @@ impl ReaderPropertiesBuilder { fn with_defaults() -> Self { Self { codec_options_builder: CodecOptionsBuilder::default(), + read_bloom_filter: None, } } @@ -717,6 +725,7 @@ impl ReaderPropertiesBuilder { pub fn build(self) -> ReaderProperties { ReaderProperties { codec_options: self.codec_options_builder.build(), + read_bloom_filter: self.read_bloom_filter.unwrap_or(true), } } @@ -734,6 +743,17 @@ impl ReaderPropertiesBuilder { .set_backward_compatible_lz4(value); self } + + /// Enable/disable reading bloom filter + /// + /// If reading bloom filter is enabled, bloom filter will be read from the file. + /// If reading bloom filter is disabled, bloom filter will not be read from the file. + /// + /// By default bloom filter is set to be read. + pub fn set_read_bloom_filter(mut self, value: bool) -> Self { + self.read_bloom_filter = Some(value); + self + } } #[cfg(test)] @@ -944,6 +964,7 @@ mod tests { .build(); assert_eq!(props.codec_options(), &codec_options); + assert_eq!(props.read_bloom_filter(), true); } #[test] diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs index cf072d7c7c5..4610ccc54fb 100644 --- a/parquet/src/file/reader.rs +++ b/parquet/src/file/reader.rs @@ -145,7 +145,7 @@ pub trait RowGroupReader: Send + Sync { } /// Get bloom filter for the `i`th column chunk, if present. - fn get_column_bloom_filter(&self, i: usize) -> Result>; + fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>; /// Get iterator of `Row`s from this row group. /// diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 5705677fe7e..423a8eea222 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -20,6 +20,7 @@ use std::collections::VecDeque; use std::io::Cursor; +use std::iter; use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc}; use crate::basic::{Encoding, Type}; @@ -328,7 +329,7 @@ impl FileReader for SerializedFileReader { f, row_group_metadata, props, - ))) + )?)) } fn get_row_iter(&self, projection: Option) -> Result { @@ -341,6 +342,7 @@ pub struct SerializedRowGroupReader<'a, R: ChunkReader> { chunk_reader: Arc, metadata: &'a RowGroupMetaData, props: ReaderPropertiesPtr, + bloom_filters: Vec>, } impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { @@ -349,12 +351,22 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { chunk_reader: Arc, metadata: &'a RowGroupMetaData, props: ReaderPropertiesPtr, - ) -> Self { - Self { - chunk_reader, + ) -> Result { + let bloom_filters = if props.read_bloom_filter() { + metadata + .columns() + .iter() + .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone())) + .collect::>>()? + } else { + iter::repeat(None).take(metadata.columns().len()).collect() + }; + Ok(Self { + chunk_reader: chunk_reader, metadata, props, - } + bloom_filters, + }) } } @@ -388,9 +400,8 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' } /// get bloom filter for the `i`th column - fn get_column_bloom_filter(&self, i: usize) -> Result> { - let col = self.metadata.column(i); - Sbbf::read_from_column_chunk(col, self.chunk_reader.clone()) + fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> { + self.bloom_filters[i].as_ref() } fn get_row_iter(&self, projection: Option) -> Result {