eager reading
jimexist committed Nov 16, 2022
1 parent 7ee3aa2 commit ec3b5d0
Showing 4 changed files with 42 additions and 10 deletions.
2 changes: 1 addition & 1 deletion parquet/src/file/metadata.rs
@@ -236,7 +236,7 @@ pub struct RowGroupMetaData {
}

impl RowGroupMetaData {
/// Returns builer for row group metadata.
/// Returns builder for row group metadata.
pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
RowGroupMetaDataBuilder::new(schema_descr)
}
21 changes: 21 additions & 0 deletions parquet/src/file/properties.rs
@@ -685,6 +685,7 @@ pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
/// Use [`ReaderPropertiesBuilder`] to assemble these properties.
pub struct ReaderProperties {
codec_options: CodecOptions,
read_bloom_filter: bool,
}

impl ReaderProperties {
@@ -697,11 +698,17 @@ impl ReaderProperties {
pub(crate) fn codec_options(&self) -> &CodecOptions {
&self.codec_options
}

/// Returns whether to read bloom filter
pub(crate) fn read_bloom_filter(&self) -> bool {
self.read_bloom_filter
}
}

/// Reader properties builder.
pub struct ReaderPropertiesBuilder {
codec_options_builder: CodecOptionsBuilder,
read_bloom_filter: Option<bool>,
}

/// Reader properties builder.
@@ -710,13 +717,15 @@ impl ReaderPropertiesBuilder {
fn with_defaults() -> Self {
Self {
codec_options_builder: CodecOptionsBuilder::default(),
read_bloom_filter: None,
}
}

/// Finalizes the configuration and returns immutable reader properties struct.
pub fn build(self) -> ReaderProperties {
ReaderProperties {
codec_options: self.codec_options_builder.build(),
read_bloom_filter: self.read_bloom_filter.unwrap_or(true),
}
}

@@ -734,6 +743,17 @@ impl ReaderPropertiesBuilder {
.set_backward_compatible_lz4(value);
self
}

/// Enable/disable reading bloom filters
///
/// If enabled, the bloom filter (when present) is read from the file as each
/// row group reader is created; if disabled, bloom filters are not read.
///
/// Reading bloom filters is enabled by default.
pub fn set_read_bloom_filter(mut self, value: bool) -> Self {
self.read_bloom_filter = Some(value);
self
}
}

#[cfg(test)]
@@ -944,6 +964,7 @@ mod tests {
.build();

assert_eq!(props.codec_options(), &codec_options);
assert_eq!(props.read_bloom_filter(), true);
}

#[test]
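A minimal usage sketch for the new setting, assuming the crate's usual `ReaderProperties::builder()` entry point (not shown in this diff); when the setter is omitted, `build()` falls back to `true`:

use parquet::file::properties::ReaderProperties;

fn main() {
    // Disable eager bloom filter reading; omitting the setter keeps the
    // default of true (the `unwrap_or(true)` in `build()` above).
    let props = ReaderProperties::builder()
        .set_read_bloom_filter(false)
        .build();
    // Readers typically take the properties behind the ReaderPropertiesPtr alias (an Arc).
    let _props_ptr = std::sync::Arc::new(props);
}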
2 changes: 1 addition & 1 deletion parquet/src/file/reader.rs
@@ -145,7 +145,7 @@ pub trait RowGroupReader: Send + Sync {
}

/// Get bloom filter for the `i`th column chunk, if present.
fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;

/// Get iterator of `Row`s from this row group.
///
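With this change a row group reader hands out a reference to an already-loaded `Sbbf` instead of re-reading it from the file, so the call is infallible. A small sketch of a caller (the helper name is illustrative):

use parquet::file::reader::RowGroupReader;

// Returns true if the `column`th column chunk of this row group carries a
// bloom filter that the reader has already loaded.
fn column_has_bloom_filter(row_group: &dyn RowGroupReader, column: usize) -> bool {
    row_group.get_column_bloom_filter(column).is_some()
}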
27 changes: 19 additions & 8 deletions parquet/src/file/serialized_reader.rs
@@ -20,6 +20,7 @@

use std::collections::VecDeque;
use std::io::Cursor;
use std::iter;
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};

use crate::basic::{Encoding, Type};
@@ -328,7 +329,7 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
f,
row_group_metadata,
props,
)))
)?))
}

fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
@@ -341,6 +342,7 @@ pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
props: ReaderPropertiesPtr,
bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
@@ -349,12 +351,22 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
props: ReaderPropertiesPtr,
) -> Self {
Self {
chunk_reader,
) -> Result<Self> {
let bloom_filters = if props.read_bloom_filter() {
metadata
.columns()
.iter()
.map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
.collect::<Result<Vec<_>>>()?
} else {
iter::repeat(None).take(metadata.columns().len()).collect()
};
Ok(Self {
chunk_reader: chunk_reader,
metadata,
props,
}
bloom_filters,
})
}
}

@@ -388,9 +400,8 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
}

/// get bloom filter for the `i`th column
fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
let col = self.metadata.column(i);
Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
self.bloom_filters[i].as_ref()
}

fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
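Taken together, `SerializedRowGroupReader::new_with_properties` now loads each column's bloom filter up front (when `read_bloom_filter` is enabled, which is the default), and `get_column_bloom_filter` becomes a simple lookup. A runnable sketch with an illustrative file path and column index:

use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    for i in 0..reader.metadata().num_row_groups() {
        // Bloom filters (if any) are read eagerly while constructing this row group reader.
        let row_group = reader.get_row_group(i)?;
        match row_group.get_column_bloom_filter(0) {
            Some(_sbbf) => println!("row group {i}: bloom filter loaded for column 0"),
            None => println!("row group {i}: no bloom filter for column 0"),
        }
    }
    Ok(())
}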
