Skip to content

Commit

Permalink
add feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Nov 16, 2022
1 parent f54178a commit 52bf18a
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 16 deletions.
Binary file not shown.
Empty file added bla.parquet/_SUCCESS
Empty file.
Binary file not shown.
1 change: 1 addition & 0 deletions parquet/Cargo.toml
Expand Up @@ -58,6 +58,7 @@ futures = { version = "0.3", default-features = false, features = ["std"], optio
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
hashbrown = { version = "0.13", default-features = false }
twox-hash = { version = "1.6", optional = true }
paste = "1.0"

[dev-dependencies]
base64 = { version = "0.13", default-features = false, features = ["std"] }
Expand Down
27 changes: 27 additions & 0 deletions parquet/src/bloom_filter/mod.rs
Expand Up @@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
SplitBlockAlgorithm, Uncompressed, XxHash,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use twox_hash::XxHash64;
Expand Down Expand Up @@ -80,6 +82,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
}

/// A split block Bloom filter
///
/// Thin wrapper over the filter's backing storage: a `Vec` of `Block`s,
/// serialized word-by-word in little-endian order by `write_bitset`.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
Expand Down Expand Up @@ -128,6 +131,30 @@ impl Sbbf {
Self(data)
}

/// Writes this filter's bit set to `writer`, one `u32` word at a time in
/// little-endian byte order.
///
/// Any I/O failure is surfaced as a [`ParquetError::General`] carrying the
/// underlying error text.
pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
    for block in &self.0 {
        for word in block {
            let bytes = word.to_le_bytes();
            if let Err(e) = writer.write_all(&bytes) {
                return Err(ParquetError::General(format!(
                    "Could not write bloom filter bit set: {}",
                    e
                )));
            }
        }
    }
    Ok(())
}

/// Builds the thrift [`BloomFilterHeader`] describing this filter:
/// uncompressed, split-block algorithm, xxHash hashing.
pub fn header(&self) -> BloomFilterHeader {
    // Each block consists of 8 u32 words of 4 bytes apiece.
    let num_bytes = self.0.len() as i32 * 4 * 8;
    BloomFilterHeader {
        num_bytes,
        algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
        hash: BloomFilterHash::XXHASH(XxHash {}),
        compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
    }
}

pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
Expand Down
15 changes: 15 additions & 0 deletions parquet/src/column/writer/mod.rs
Expand Up @@ -16,6 +16,9 @@
// under the License.

//! Contains column writer API.

#[cfg(feature = "bloom")]
use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};

Expand Down Expand Up @@ -154,6 +157,9 @@ pub struct ColumnCloseResult {
pub rows_written: u64,
/// Metadata for this column chunk
pub metadata: ColumnChunkMetaData,
/// Optional bloom filter for this column
#[cfg(feature = "bloom")]
pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
/// Optional offset index, identifying page locations
Expand Down Expand Up @@ -209,6 +215,10 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,

// bloom filter
#[cfg(feature = "bloom")]
bloom_filter: Option<Sbbf>,

// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
Expand Down Expand Up @@ -260,6 +270,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_column_nulls: 0,
column_distinct_count: None,
},
// TODO!
#[cfg(feature = "bloom")]
bloom_filter: None,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
Expand Down Expand Up @@ -458,6 +471,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(ColumnCloseResult {
bytes_written: self.column_metrics.total_bytes_written,
rows_written: self.column_metrics.total_rows_written,
#[cfg(feature = "bloom")]
bloom_filter: self.bloom_filter,
metadata,
column_index,
offset_index,
Expand Down
114 changes: 100 additions & 14 deletions parquet/src/file/properties.rs
Expand Up @@ -64,6 +64,7 @@
//! .build();
//! ```

use paste::paste;
use std::{collections::HashMap, sync::Arc};

use crate::basic::{Compression, Encoding};
Expand All @@ -82,6 +83,9 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;
const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01;

/// Parquet writer version.
///
Expand Down Expand Up @@ -125,6 +129,26 @@ pub struct WriterProperties {
sorting_columns: Option<Vec<SortingColumn>>,
}

// Generates per-column resolution getters on `WriterProperties`: the
// column-specific value wins, then the default column properties, then
// (in the 3-argument form) a built-in default value.
macro_rules! def_col_property_getter {
    // Two-argument form: no built-in default, getter returns an `Option`.
    ($field:ident, $field_type:ty) => {
        /// Returns the column-specific value if set, otherwise the value from
        /// the default column properties.
        pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> {
            let per_column = self.column_properties.get(col).and_then(|c| c.$field());
            match per_column {
                Some(value) => Some(value),
                None => self.default_column_properties.$field(),
            }
        }
    };
    // Three-argument form: falls back to `$default_val`, getter returns a value.
    ($field:ident, $field_type:ty, $default_val:expr) => {
        /// Returns the column-specific value if set, otherwise the value from
        /// the default column properties, otherwise the built-in default.
        pub fn $field(&self, col: &ColumnPath) -> $field_type {
            let per_column = self.column_properties.get(col).and_then(|c| c.$field());
            match per_column {
                Some(value) => value,
                None => self
                    .default_column_properties
                    .$field()
                    .unwrap_or($default_val),
            }
        }
    };
}

impl WriterProperties {
/// Returns builder for writer properties with default values.
pub fn builder() -> WriterPropertiesBuilder {
Expand Down Expand Up @@ -255,6 +279,11 @@ impl WriterProperties {
.or_else(|| self.default_column_properties.max_statistics_size())
.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
}

def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);
def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP);
def_col_property_getter!(bloom_filter_ndv, u64);
def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES);
}

/// Writer properties builder.
Expand All @@ -272,6 +301,40 @@ pub struct WriterPropertiesBuilder {
sorting_columns: Option<Vec<SortingColumn>>,
}

// Generates `set_<field>(&mut self, value)` on `ColumnProperties`, storing
// `Some(value)` and returning `&mut Self` for chaining. `paste!` builds the
// `set_` method name from the field identifier.
macro_rules! def_opt_field_setter {
    ($field: ident, $type: ty) => {
        paste! {
            // Consistency fix: the sibling macros (def_opt_field_getter,
            // def_per_col_setter) emit a #[doc] attribute; this one did not,
            // leaving the generated setters undocumented in rustdoc.
            #[doc = "Sets " $field " for this column."]
            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
                self.$field = Some(value);
                self
            }
        }
    };
}

// Generates a getter on `ColumnProperties` that returns the raw
// `Option<...>` field value by copy. `paste!` is only needed here to splice
// the field name into the generated doc string; the method name itself is
// plain `$field`.
macro_rules! def_opt_field_getter {
    ($field: ident, $type: ty) => {
        paste! {
            #[doc = "Returns " $field " if set."]
            pub fn $field(&self) -> Option<$type> {
                self.$field
            }
        }
    };
}

// Generates a builder method `set_column_<field>(col, value)` on
// `WriterPropertiesBuilder`: stores `value` in the `ColumnProperties` entry
// for `col` (created on first use via `get_mut_props`) and returns `self`
// by value for builder chaining.
macro_rules! def_per_col_setter {
    ($field:ident, $field_type:ty) => {
        paste! {
            #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."]
            pub fn [<set_column_ $field>](mut self, col: ColumnPath, value: $field_type) -> Self {
                self.get_mut_props(col).[<set_ $field>](value);
                self
            }
        }
    }
}

impl WriterPropertiesBuilder {
/// Returns default state of the builder.
fn with_defaults() -> Self {
Expand All @@ -284,7 +347,7 @@ impl WriterPropertiesBuilder {
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
key_value_metadata: None,
default_column_properties: ColumnProperties::new(),
default_column_properties: Default::default(),
column_properties: HashMap::new(),
sorting_columns: None,
}
Expand Down Expand Up @@ -439,7 +502,7 @@ impl WriterPropertiesBuilder {
fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
self.column_properties
.entry(col)
.or_insert_with(ColumnProperties::new)
.or_insert_with(Default::default)
}

/// Sets encoding for a column.
Expand Down Expand Up @@ -492,6 +555,11 @@ impl WriterPropertiesBuilder {
self.get_mut_props(col).set_max_statistics_size(value);
self
}

def_per_col_setter!(bloom_filter_enabled, bool);
def_per_col_setter!(bloom_filter_fpp, f64);
def_per_col_setter!(bloom_filter_max_bytes, u32);
def_per_col_setter!(bloom_filter_ndv, u64);
}

/// Controls the level of statistics to be computed by the writer
Expand All @@ -515,27 +583,24 @@ impl Default for EnabledStatistics {
///
/// If a field is `None`, it means that no specific value has been set for this column,
/// so some subsequent or default value must be used.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Default, PartialEq)]
struct ColumnProperties {
encoding: Option<Encoding>,
codec: Option<Compression>,
dictionary_enabled: Option<bool>,
statistics_enabled: Option<EnabledStatistics>,
max_statistics_size: Option<usize>,
/// bloom filter enabled
bloom_filter_enabled: Option<bool>,
/// bloom filter expected number of distinct values
bloom_filter_ndv: Option<u64>,
/// bloom filter false positive probability
bloom_filter_fpp: Option<f64>,
/// bloom filter max number of bytes
bloom_filter_max_bytes: Option<u32>,
}

impl ColumnProperties {
/// Initialise column properties with default values.
///
/// Fix: the struct literal listed only the original five fields and omitted
/// the four bloom-filter options declared on `ColumnProperties`
/// (`bloom_filter_enabled`, `bloom_filter_ndv`, `bloom_filter_fpp`,
/// `bloom_filter_max_bytes`), which is a compile error. All overrides start
/// unset (`None`), matching the derived `Default`.
fn new() -> Self {
    Self {
        encoding: None,
        codec: None,
        dictionary_enabled: None,
        statistics_enabled: None,
        max_statistics_size: None,
        bloom_filter_enabled: None,
        bloom_filter_ndv: None,
        bloom_filter_fpp: None,
        bloom_filter_max_bytes: None,
    }
}

/// Sets encoding for this column.
///
/// If dictionary is not enabled, this is treated as a primary encoding for a column.
Expand Down Expand Up @@ -572,6 +637,11 @@ impl ColumnProperties {
self.max_statistics_size = Some(value);
}

def_opt_field_setter!(bloom_filter_enabled, bool);
def_opt_field_setter!(bloom_filter_fpp, f64);
def_opt_field_setter!(bloom_filter_max_bytes, u32);
def_opt_field_setter!(bloom_filter_ndv, u64);

/// Returns optional encoding for this column.
fn encoding(&self) -> Option<Encoding> {
self.encoding
Expand Down Expand Up @@ -599,6 +669,11 @@ impl ColumnProperties {
fn max_statistics_size(&self) -> Option<usize> {
self.max_statistics_size
}

def_opt_field_getter!(bloom_filter_enabled, bool);
def_opt_field_getter!(bloom_filter_fpp, f64);
def_opt_field_getter!(bloom_filter_max_bytes, u32);
def_opt_field_getter!(bloom_filter_ndv, u64);
}

/// Reference counted reader properties.
Expand Down Expand Up @@ -701,6 +776,13 @@ mod tests {
props.max_statistics_size(&ColumnPath::from("col")),
DEFAULT_MAX_STATISTICS_SIZE
);
assert_eq!(props.bloom_filter_enabled(&ColumnPath::from("col")), false);
assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
assert_eq!(
props.bloom_filter_max_bytes(&ColumnPath::from("col")),
1024 * 1024
);
}

#[test]
Expand Down Expand Up @@ -784,6 +866,10 @@ mod tests {
EnabledStatistics::Chunk,
)
.set_column_max_statistics_size(ColumnPath::from("col"), 123)
.set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
.set_column_bloom_filter_ndv(ColumnPath::from("col"), 100)
.set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
.set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000)
.build();

assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
Expand Down

0 comments on commit 52bf18a

Please sign in to comment.