From 02371d2be6b9fd276ea3423d51122c47935b7714 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Sun, 17 Jul 2022 16:32:14 -0400
Subject: [PATCH] Generify parquet write path (#1764) (#2045)

* Generify parquet write path (#1764)

* More docs

* Lint

* Fix doc

* Review feedback

* Fix doc
---
 parquet/src/basic.rs                          |   2 +-
 parquet/src/column/writer/encoder.rs          | 250 +++++++
 .../src/column/{writer.rs => writer/mod.rs}   | 667 ++++++------------
 parquet/src/data_type.rs                      |  90 +--
 parquet/src/file/statistics.rs                | 110 ++-
 5 files changed, 584 insertions(+), 535 deletions(-)
 create mode 100644 parquet/src/column/writer/encoder.rs
 rename parquet/src/column/{writer.rs => writer/mod.rs} (81%)

diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 59a0fe07b7d..2d8073e75fe 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -212,7 +212,7 @@ pub enum Repetition {
 /// Encodings supported by Parquet.
 /// Not all encodings are valid for all types. These enums are also used to specify the
 /// encoding of definition and repetition levels.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
 pub enum Encoding {
     /// Default byte encoding.
     /// - BOOLEAN - 1 bit per value, 0 is false; 1 is true.
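Why this one-line derive matters: the generified writer below collects the encodings it has actually used in a `BTreeSet<Encoding>`, which requires `Ord`. A standalone sketch (not part of the patch) of the resulting deterministic ordering — the same ordering the updated tests in this patch assert:

use std::collections::BTreeSet;
use parquet::basic::Encoding;

fn main() {
    // With Ord derived, Encoding can live in a BTreeSet, so the encodings
    // recorded in column chunk metadata iterate in a stable order
    // regardless of insertion order.
    let mut encodings = BTreeSet::new();
    encodings.insert(Encoding::RLE_DICTIONARY);
    encodings.insert(Encoding::RLE);
    encodings.insert(Encoding::PLAIN);

    let ordered: Vec<_> = encodings.into_iter().collect();
    // Derived Ord follows declaration order: PLAIN < RLE < RLE_DICTIONARY
    assert_eq!(
        ordered,
        vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
    );
}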
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
new file mode 100644
index 00000000000..bc31aedf4e5
--- /dev/null
+++ b/parquet/src/column/writer/encoder.rs
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Encoding;
+use crate::column::writer::{
+    compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max,
+    update_min,
+};
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::DataType;
+use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
+use crate::util::memory::ByteBufferPtr;
+
+/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
+pub trait ColumnValues {
+    /// The underlying value type
+    type T: ParquetValueType;
+
+    /// The number of values in this collection
+    fn len(&self) -> usize;
+
+    /// Returns the min and max values in this collection, skipping any NaN values
+    ///
+    /// Returns `None` if no values found
+    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&Self::T, &Self::T)>;
+}
+
+/// The encoded data for a dictionary page
+pub struct DictionaryPage {
+    pub buf: ByteBufferPtr,
+    pub num_values: usize,
+    pub is_sorted: bool,
+}
+
+/// The encoded values for a data page, with optional statistics
+pub struct DataPageValues<T> {
+    pub buf: ByteBufferPtr,
+    pub num_values: usize,
+    pub encoding: Encoding,
+    pub min_value: Option<T>,
+    pub max_value: Option<T>,
+}
+
+/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
+/// [`super::GenericColumnWriter`]
+pub trait ColumnValueEncoder {
+    /// The underlying value type of [`Self::Values`]
+    ///
+    /// Note: this avoids needing to fully qualify `<Self::Values as ColumnValues>::T`
+    type T: ParquetValueType;
+
+    /// The values encoded by this encoder
+    type Values: ColumnValues<T = Self::T> + ?Sized;
+
+    /// Create a new [`ColumnValueEncoder`]
+    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
+    where
+        Self: Sized;
+
+    /// Write the corresponding values to this [`ColumnValueEncoder`]
+    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
+
+    /// Returns the number of buffered values
+    fn num_values(&self) -> usize;
+
+    /// Returns true if this encoder has a dictionary page
+    fn has_dictionary(&self) -> bool;
+
+    /// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary
+    fn estimated_dict_page_size(&self) -> Option<usize>;
+
+    /// Returns an estimate of the data page size in bytes
+    fn estimated_data_page_size(&self) -> usize;
+
+    /// Flush the dictionary page for this column chunk if any. Any subsequent calls to
+    /// [`Self::write`] will not be dictionary encoded
+    ///
+    /// Note: [`Self::flush_data_page`] must be called first, as this will error if there
+    /// are any pending page values
+    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;
+
+    /// Flush the next data page for this column chunk
+    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;
+}
+
+pub struct ColumnValueEncoderImpl<T: DataType> {
+    encoder: Box<dyn Encoder<T>>,
+    dict_encoder: Option<DictEncoder<T>>,
+    descr: ColumnDescPtr,
+    num_values: usize,
+    min_value: Option<T::T>,
+    max_value: Option<T::T>,
+}
+
+impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
+    type T = T::T;
+
+    type Values = [T::T];
+
+    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
+        let dict_supported = props.dictionary_enabled(descr.path())
+            && has_dictionary_support(T::get_physical_type(), props);
+        let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
+
+        // Set either main encoder or fallback encoder.
+        let encoder = get_encoder(
+            descr.clone(),
+            props
+                .encoding(descr.path())
+                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
+        )?;
+
+        Ok(Self {
+            encoder,
+            dict_encoder,
+            descr: descr.clone(),
+            num_values: 0,
+            min_value: None,
+            max_value: None,
+        })
+    }
+
+    fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
+        self.num_values += len;
+
+        let slice = values.get(offset..offset + len).ok_or_else(|| {
+            general_err!(
+                "Expected to write {} values, but have only {}",
+                len,
+                values.len() - offset
+            )
+        })?;
+
+        if let Some((min, max)) = slice.min_max(&self.descr) {
+            update_min(&self.descr, min, &mut self.min_value);
+            update_max(&self.descr, max, &mut self.max_value);
+        }
+
+        match &mut self.dict_encoder {
+            Some(encoder) => encoder.put(slice),
+            _ => self.encoder.put(slice),
+        }
+    }
+
+    fn num_values(&self) -> usize {
+        self.num_values
+    }
+
+    fn has_dictionary(&self) -> bool {
+        self.dict_encoder.is_some()
+    }
+
+    fn estimated_dict_page_size(&self) -> Option<usize> {
+        Some(self.dict_encoder.as_ref()?.dict_encoded_size())
+    }
+
+    fn estimated_data_page_size(&self) -> usize {
+        match &self.dict_encoder {
+            Some(encoder) => encoder.estimated_data_encoded_size(),
+            _ => self.encoder.estimated_data_encoded_size(),
+        }
+    }
+
+    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
+        match self.dict_encoder.take() {
+            Some(encoder) => {
+                if self.num_values != 0 {
+                    return Err(general_err!(
+                        "Must flush data pages before flushing dictionary"
+                    ));
+                }
+
+                let buf = encoder.write_dict()?;
+
+                Ok(Some(DictionaryPage {
+                    buf,
+                    num_values: encoder.num_entries(),
+                    is_sorted: encoder.is_sorted(),
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
+        let (buf, encoding) = match &mut self.dict_encoder {
+            Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
+            _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
+        };
+
+        Ok(DataPageValues {
+            buf,
+            encoding,
+            num_values: std::mem::take(&mut self.num_values),
+            min_value: self.min_value.take(),
+            max_value: self.max_value.take(),
+        })
+    }
+}
+
+impl<T: ParquetValueType> ColumnValues for [T] {
+    type T = T;
+
+    fn len(&self) -> usize {
+        self.len()
+    }
+
+    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&T, &T)> {
+        let mut iter = self.iter();
+
+        let first = loop {
+            let next = iter.next()?;
+            if !is_nan(next) {
+                break next;
+            }
+        };
+
+        let mut min = first;
+        let mut max = first;
+        for val in iter {
+            if is_nan(val) {
+                continue;
+            }
+            if compare_greater(descr, min, val) {
+                min = val;
+            }
+            if compare_greater(descr, val, max) {
+                max = val;
+            }
+        }
+        Some((min, max))
+    }
+}
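For orientation before the writer changes below: the public write path is untouched by this refactor — `ColumnWriterImpl` becomes a type alias over `GenericColumnWriter`, so existing callers compile unchanged. A minimal sketch of that public path, assuming the crate's public `SerializedFileWriter` API as of this release (the file name `example.parquet` is illustrative):

use std::{fs::File, sync::Arc};

use parquet::data_type::Int32Type;
use parquet::file::{properties::WriterProperties, writer::SerializedFileWriter};
use parquet::schema::parser::parse_message_type;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(parse_message_type(
        "message schema { REQUIRED INT32 id; }",
    )?);
    let props = Arc::new(WriterProperties::builder().build());
    let file = File::create("example.parquet")?;

    let mut writer = SerializedFileWriter::new(file, schema, props)?;
    let mut row_group = writer.next_row_group()?;

    // Each column is written in batches; min/max statistics and dictionary
    // fallback are handled internally by the column writer and its encoder.
    while let Some(mut col_writer) = row_group.next_column()? {
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3, 4], None, None)?;
        col_writer.close()?;
    }
    row_group.close()?;
    writer.close()?;
    Ok(())
}

Internally, each `write_batch` call now funnels through `GenericColumnWriter` into a `ColumnValueEncoder` (above), which owns value encoding, dictionary fallback, and page-level min/max tracking.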
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer/mod.rs
similarity index 81%
rename from parquet/src/column/writer.rs
rename to parquet/src/column/writer/mod.rs
index 1fc5207f6b4..ff6c098980a 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -17,18 +17,17 @@
 //! Contains column writer API.
 
 use parquet_format::{ColumnIndex, OffsetIndex};
-use std::{collections::VecDeque, convert::TryFrom, marker::PhantomData};
+use std::collections::{BTreeSet, VecDeque};
 
 use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
+use crate::column::writer::encoder::{
+    ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues,
+};
 use crate::compression::{create_codec, Codec};
 use crate::data_type::private::ParquetValueType;
-use crate::data_type::AsBytes;
 use crate::data_type::*;
-use crate::encodings::{
-    encoding::{get_encoder, DictEncoder, Encoder},
-    levels::{max_buffer_size, LevelEncoder},
-};
+use crate::encodings::levels::{max_buffer_size, LevelEncoder};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::properties::EnabledStatistics;
@@ -38,9 +37,10 @@ use crate::file::{
     properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
 };
 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
-use crate::util::bit_util::FromBytes;
 use crate::util::memory::ByteBufferPtr;
 
+pub(crate) mod encoder;
+
 /// Column writer for a Parquet type.
 pub enum ColumnWriter<'a> {
     BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
@@ -58,26 +58,6 @@ pub enum Level {
     Column,
 }
 
-macro_rules! gen_stats_section {
-    ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{
-        let min = $min.as_ref().and_then(|v| {
-            Some(read_num_bytes!(
-                $physical_ty,
-                v.as_bytes().len(),
-                &v.as_bytes()
-            ))
-        });
-        let max = $max.as_ref().and_then(|v| {
-            Some(read_num_bytes!(
-                $physical_ty,
-                v.as_bytes().len(),
-                &v.as_bytes()
-            ))
-        });
-        Statistics::$stat_fn(min, max, $distinct, $nulls, false)
-    }};
-}
-
 /// Gets a specific column writer corresponding to column descriptor `descr`.
 pub fn get_column_writer<'a>(
     descr: ColumnDescPtr,
@@ -174,26 +154,27 @@ type ColumnCloseResult = (
 );
 
 /// Typed column writer for a primitive column.
-pub struct ColumnWriterImpl<'a, T: DataType> {
+pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
+
+pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     // Column writer properties
     descr: ColumnDescPtr,
     props: WriterPropertiesPtr,
     statistics_enabled: EnabledStatistics,
 
     page_writer: Box<dyn PageWriter + 'a>,
-    has_dictionary: bool,
-    dict_encoder: Option<DictEncoder<T>>,
-    encoder: Box<dyn Encoder<T>>,
     codec: Compression,
     compressor: Option<Box<dyn Codec>>,
+    encoder: E,
 
+    // Metrics per page
+    /// The number of values including nulls in the in-progress data page
     num_buffered_values: u32,
-    num_buffered_encoded_values: u32,
+    /// The number of rows in the in-progress data page
     num_buffered_rows: u32,
-    min_page_value: Option<T::T>,
-    max_page_value: Option<T::T>,
+    /// The number of nulls in the in-progress data page
     num_page_nulls: u64,
-    page_distinct_count: Option<u64>,
+
     // Metrics per column writer
     total_bytes_written: u64,
     total_rows_written: u64,
@@ -202,21 +183,26 @@
     total_num_values: u64,
     dictionary_page_offset: Option<u64>,
     data_page_offset: Option<u64>,
-    min_column_value: Option<T::T>,
-    max_column_value: Option<T::T>,
+    min_column_value: Option<E::T>,
+    max_column_value: Option<E::T>,
     num_column_nulls: u64,
     column_distinct_count: Option<u64>,
+
+    /// The order of encodings within the generated metadata does not impact its meaning,
+    /// but we use a BTreeSet so that the output is deterministic
+    encodings: BTreeSet<Encoding>,
+
     // Reused buffers
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
-    _phantom: PhantomData<T>,
+
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
 }
 
-impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
+impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     pub fn new(
         descr: ColumnDescPtr,
         props: WriterPropertiesPtr,
@@ -224,43 +210,25 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     ) -> Self {
         let codec = props.compression(descr.path());
         let compressor = create_codec(codec).unwrap();
-
-        // Optionally set dictionary encoder.
-        let dict_encoder = if props.dictionary_enabled(descr.path())
-            && has_dictionary_support(T::get_physical_type(), &props)
-        {
-            Some(DictEncoder::new(descr.clone()))
-        } else {
-            None
-        };
-
-        // Whether or not this column writer has a dictionary encoding.
-        let has_dictionary = dict_encoder.is_some();
-
-        // Set either main encoder or fallback encoder.
-        let fallback_encoder = get_encoder(
-            descr.clone(),
-            props
-                .encoding(descr.path())
-                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)),
-        )
-        .unwrap();
+        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
 
         let statistics_enabled = props.statistics_enabled(descr.path());
 
+        let mut encodings = BTreeSet::new();
+        // Used for level information
+        encodings.insert(Encoding::RLE);
+
         Self {
             descr,
             props,
             statistics_enabled,
             page_writer,
-            has_dictionary,
-            dict_encoder,
-            encoder: fallback_encoder,
             codec,
             compressor,
+            encoder,
             num_buffered_values: 0,
-            num_buffered_encoded_values: 0,
             num_buffered_rows: 0,
+            num_page_nulls: 0,
             total_bytes_written: 0,
             total_rows_written: 0,
             total_uncompressed_size: 0,
@@ -271,27 +239,23 @@
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
-            min_page_value: None,
-            max_page_value: None,
-            num_page_nulls: 0,
-            page_distinct_count: None,
             min_column_value: None,
             max_column_value: None,
             num_column_nulls: 0,
             column_distinct_count: None,
-            _phantom: PhantomData,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
+            encodings,
         }
     }
 
     fn write_batch_internal(
        &mut self,
-        values: &[T::T],
+        values: &E::Values,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
-        min: Option<&T::T>,
-        max: Option<&T::T>,
+        min: Option<&E::T>,
+        max: Option<&E::T>,
         distinct_count: Option<u64>,
     ) -> Result<usize> {
         // We check for DataPage limits only after we have inserted the values. If a user
@@ -304,18 +268,14 @@
         // TODO: find out why we don't account for size of levels when we estimate page
         // size.
 
-        // Find out the minimal length to prevent index out of bound errors.
-        let mut min_len = values.len();
-        if let Some(levels) = def_levels {
-            min_len = min_len.min(levels.len());
-        }
-        if let Some(levels) = rep_levels {
-            min_len = min_len.min(levels.len());
-        }
+        let num_levels = match def_levels {
+            Some(def_levels) => def_levels.len(),
+            None => values.len(),
+        };
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
-        let num_batches = min_len / write_batch_size;
+        let num_batches = num_levels / write_batch_size;
 
         // If only computing chunk-level statistics compute them here, page-level statistics
         // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
         if self.statistics_enabled == EnabledStatistics::Chunk {
             match (min, max) {
                 (Some(min), Some(max)) => {
-                    Self::update_min(&self.descr, min, &mut self.min_column_value);
-                    Self::update_max(&self.descr, max, &mut self.max_column_value);
+                    update_min(&self.descr, min, &mut self.min_column_value);
+                    update_max(&self.descr, max, &mut self.max_column_value);
                 }
                 (None, Some(_)) | (Some(_), None) => {
                     panic!("min/max should be both set or both None")
                 }
                 (None, None) => {
-                    for val in values {
-                        Self::update_min(&self.descr, val, &mut self.min_column_value);
-                        Self::update_max(&self.descr, val, &mut self.max_column_value);
+                    if let Some((min, max)) = values.min_max(&self.descr) {
+                        update_min(&self.descr, min, &mut self.min_column_value);
+                        update_max(&self.descr, max, &mut self.max_column_value);
                     }
                 }
             };
         }
 
         // We can only set the distinct count if there are no other writes
-        if self.num_buffered_values == 0 && self.num_page_nulls == 0 {
+        if self.encoder.num_values() == 0 {
             self.column_distinct_count = distinct_count;
         } else {
             self.column_distinct_count = None;
@@ -349,7 +309,9 @@
         let mut levels_offset = 0;
         for _ in 0..num_batches {
             values_offset += self.write_mini_batch(
-                &values[values_offset..values_offset + write_batch_size],
+                values,
+                values_offset,
+                write_batch_size,
                 def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
                 rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
             )?;
             levels_offset += write_batch_size;
         }
 
         values_offset += self.write_mini_batch(
-            &values[values_offset..],
+            values,
+            values_offset,
+            num_levels - levels_offset,
             def_levels.map(|lv| &lv[levels_offset..]),
             rep_levels.map(|lv| &lv[levels_offset..]),
         )?;
@@ -380,7 +344,7 @@
     /// non-nullable and/or non-repeated.
     pub fn write_batch(
         &mut self,
-        values: &[T::T],
+        values: &E::Values,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) -> Result<usize> {
@@ -396,11 +360,11 @@
     /// computed page statistics
     pub fn write_batch_with_statistics(
         &mut self,
-        values: &[T::T],
+        values: &E::Values,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
-        min: Option<&T::T>,
-        max: Option<&T::T>,
+        min: Option<&E::T>,
+        max: Option<&E::T>,
         distinct_count: Option<u64>,
     ) -> Result<usize> {
         self.write_batch_internal(
@@ -428,12 +392,14 @@
     /// Finalises writes and closes the column writer.
     /// Returns total bytes written, total rows written and column chunk metadata.
     pub fn close(mut self) -> Result<ColumnCloseResult> {
-        if self.dict_encoder.is_some() {
+        if self.num_buffered_values > 0 {
+            self.add_data_page()?;
+        }
+        if self.encoder.has_dictionary() {
             self.write_dictionary_page()?;
         }
         self.flush_data_pages()?;
         let metadata = self.write_column_metadata()?;
-        self.dict_encoder = None;
         self.page_writer.close()?;
 
         let (column_index, offset_index) = if self.column_index_builder.valid() {
@@ -459,12 +425,12 @@
     /// page size.
     fn write_mini_batch(
         &mut self,
-        values: &[T::T],
+        values: &E::Values,
+        values_offset: usize,
+        num_levels: usize,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) -> Result<usize> {
-        let mut values_to_write = 0;
-
         // Check if number of definition levels is the same as number of repetition
         // levels.
         if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
@@ -478,7 +444,7 @@
         }
 
         // Process definition levels and determine how many values to write.
-        let num_values = if self.descr.max_def_level() > 0 {
+        let values_to_write = if self.descr.max_def_level() > 0 {
             let levels = def_levels.ok_or_else(|| {
                 general_err!(
                     "Definition levels are required, because max definition level = {}",
@@ -486,6 +452,7 @@
                 )
             })?;
 
+            let mut values_to_write = 0;
             for &level in levels {
                 if level == self.descr.max_def_level() {
                     values_to_write += 1;
@@ -494,11 +461,10 @@
                 }
             }
 
-            self.write_definition_levels(levels);
-            u32::try_from(levels.len()).unwrap()
+            self.def_levels_sink.extend_from_slice(levels);
+            values_to_write
         } else {
-            values_to_write = values.len();
-            u32::try_from(values_to_write).unwrap()
+            num_levels
         };
 
         // Process repetition levels and determine how many rows we are about to process.
@@ -516,32 +482,15 @@
                 self.num_buffered_rows += (level == 0) as u32
             }
 
-            self.write_repetition_levels(levels);
+            self.rep_levels_sink.extend_from_slice(levels);
         } else {
             // Each value is exactly one row.
             // Equals to the number of values, we count nulls as well.
-            self.num_buffered_rows += num_values;
+            self.num_buffered_rows += num_levels as u32;
         }
 
-        // Check that we have enough values to write.
-        let values_to_write = values.get(0..values_to_write).ok_or_else(|| {
-            general_err!(
-                "Expected to write {} values, but have only {}",
-                values_to_write,
-                values.len()
-            )
-        })?;
-
-        if self.statistics_enabled == EnabledStatistics::Page {
-            for val in values_to_write {
-                self.update_page_min_max(val);
-            }
-        }
-
-        self.write_values(values_to_write)?;
-
-        self.num_buffered_values += num_values;
-        self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap();
+        self.encoder.write(values, values_offset, values_to_write)?;
+        self.num_buffered_values += num_levels as u32;
 
         if self.should_add_data_page() {
             self.add_data_page()?;
@@ -551,25 +500,7 @@
             self.dict_fallback()?;
         }
 
-        Ok(values_to_write.len())
-    }
-
-    #[inline]
-    fn write_definition_levels(&mut self, def_levels: &[i16]) {
-        self.def_levels_sink.extend_from_slice(def_levels);
-    }
-
-    #[inline]
-    fn write_repetition_levels(&mut self, rep_levels: &[i16]) {
-        self.rep_levels_sink.extend_from_slice(rep_levels);
-    }
-
-    #[inline]
-    fn write_values(&mut self, values: &[T::T]) -> Result<()> {
-        match self.dict_encoder {
-            Some(ref mut encoder) => encoder.put(values),
-            None => self.encoder.put(values),
-        }
+        Ok(values_to_write)
     }
 
     /// Returns true if we need to fall back to non-dictionary encoding.
     ///
     /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
@@ -578,10 +509,8 @@
     /// size.
#[inline] fn should_dict_fallback(&self) -> bool { - match self.dict_encoder { - Some(ref encoder) => { - encoder.dict_encoded_size() >= self.props.dictionary_pagesize_limit() - } + match self.encoder.estimated_dict_page_size() { + Some(size) => size >= self.props.dictionary_pagesize_limit(), None => false, } } @@ -593,28 +522,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { // // In such a scenario the dictionary decoder may return an estimated encoded // size in excess of the page size limit, even when there are no buffered values - if self.num_buffered_values == 0 { + if self.encoder.num_values() == 0 { return false; } - match self.dict_encoder { - Some(ref encoder) => { - encoder.estimated_data_encoded_size() >= self.props.data_pagesize_limit() - } - None => { - self.encoder.estimated_data_encoded_size() - >= self.props.data_pagesize_limit() - } - } + self.encoder.estimated_data_page_size() >= self.props.data_pagesize_limit() } /// Performs dictionary fallback. /// Prepares and writes dictionary and all data pages into page writer. fn dict_fallback(&mut self) -> Result<()> { // At this point we know that we need to fall back. + if self.num_buffered_values > 0 { + self.add_data_page()?; + } self.write_dictionary_page()?; self.flush_data_pages()?; - self.dict_encoder = None; Ok(()) } @@ -658,28 +581,24 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { /// Data page is either buffered in case of dictionary encoding or written directly. fn add_data_page(&mut self) -> Result<()> { // Extract encoded values - let value_bytes = match self.dict_encoder { - Some(ref mut encoder) => encoder.write_indices()?, - None => self.encoder.flush_buffer()?, - }; - - // Select encoding based on current encoder and writer version (v1 or v2). - let encoding = if self.dict_encoder.is_some() { - self.props.dictionary_data_page_encoding() - } else { - self.encoder.encoding() - }; + let values_data = self.encoder.flush_data_page()?; let max_def_level = self.descr.max_def_level(); let max_rep_level = self.descr.max_rep_level(); self.num_column_nulls += self.num_page_nulls; - let has_min_max = self.min_page_value.is_some() && self.max_page_value.is_some(); - let page_statistics = match self.statistics_enabled { - EnabledStatistics::Page if has_min_max => { - self.update_column_min_max(); - Some(self.make_page_statistics()) + let page_statistics = match (values_data.min_value, values_data.max_value) { + (Some(min), Some(max)) => { + update_min(&self.descr, &min, &mut self.min_column_value); + update_max(&self.descr, &max, &mut self.max_column_value); + Some(Statistics::new( + Some(min), + Some(max), + None, + self.num_page_nulls, + false, + )) } _ => None, }; @@ -711,11 +630,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { ); } - buffer.extend_from_slice(value_bytes.data()); + buffer.extend_from_slice(values_data.buf.data()); let uncompressed_size = buffer.len(); if let Some(ref mut cmpr) = self.compressor { - let mut compressed_buf = Vec::with_capacity(value_bytes.data().len()); + let mut compressed_buf = Vec::with_capacity(uncompressed_size); cmpr.compress(&buffer[..], &mut compressed_buf)?; buffer = compressed_buf; } @@ -723,7 +642,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { let data_page = Page::DataPage { buf: ByteBufferPtr::new(buffer), num_values: self.num_buffered_values, - encoding, + encoding: values_data.encoding, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, statistics: page_statistics, @@ -751,20 +670,20 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { } 
let uncompressed_size = - rep_levels_byte_len + def_levels_byte_len + value_bytes.len(); + rep_levels_byte_len + def_levels_byte_len + values_data.buf.len(); // Data Page v2 compresses values only. match self.compressor { Some(ref mut cmpr) => { - cmpr.compress(value_bytes.data(), &mut buffer)?; + cmpr.compress(values_data.buf.data(), &mut buffer)?; } - None => buffer.extend_from_slice(value_bytes.data()), + None => buffer.extend_from_slice(values_data.buf.data()), } let data_page = Page::DataPageV2 { buf: ByteBufferPtr::new(buffer), num_values: self.num_buffered_values, - encoding, + encoding: values_data.encoding, num_nulls: self.num_page_nulls as u32, num_rows: self.num_buffered_rows, def_levels_byte_len: def_levels_byte_len as u32, @@ -778,7 +697,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { }; // Check if we need to buffer data page or flush it to the sink directly. - if self.dict_encoder.is_some() { + if self.encoder.has_dictionary() { self.data_pages.push_back(compressed_page); } else { self.write_data_page(compressed_page)?; @@ -791,12 +710,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.rep_levels_sink.clear(); self.def_levels_sink.clear(); self.num_buffered_values = 0; - self.num_buffered_encoded_values = 0; self.num_buffered_rows = 0; - self.min_page_value = None; - self.max_page_value = None; self.num_page_nulls = 0; - self.page_distinct_count = None; Ok(()) } @@ -826,30 +741,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { // If data page offset is not set, then no pages have been written let data_page_offset = self.data_page_offset.unwrap_or(0) as i64; - let file_offset; - let mut encodings = Vec::new(); - - if self.has_dictionary { - assert!(dict_page_offset.is_some(), "Dictionary offset is not set"); - file_offset = dict_page_offset.unwrap() + total_compressed_size; - // NOTE: This should be in sync with writing dictionary pages. - encodings.push(self.props.dictionary_page_encoding()); - encodings.push(self.props.dictionary_data_page_encoding()); - // Fallback to alternative encoding, add it to the list. - if self.dict_encoder.is_none() { - encodings.push(self.encoder.encoding()); - } - } else { - file_offset = data_page_offset + total_compressed_size; - encodings.push(self.encoder.encoding()); - } - // We use only RLE level encoding for data page v1 and data page v2. - encodings.push(Encoding::RLE); + let file_offset = match dict_page_offset { + Some(dict_offset) => dict_offset + total_compressed_size, + None => data_page_offset + total_compressed_size, + }; + + let statistics = Statistics::new( + self.min_column_value.clone(), + self.max_column_value.clone(), + self.column_distinct_count, + self.num_column_nulls, + false, + ); - let statistics = self.make_column_statistics(); let metadata = ColumnChunkMetaData::builder(self.descr.clone()) .set_compression(self.codec) - .set_encodings(encodings) + .set_encodings(self.encodings.iter().cloned().collect()) .set_file_offset(file_offset) .set_total_compressed_size(total_compressed_size) .set_total_uncompressed_size(total_uncompressed_size) @@ -891,6 +798,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { /// Writes compressed data page into underlying sink and updates global metrics. 
    #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
+        self.encodings.insert(page.encoding());
         let page_spec = self.page_writer.write_page(page)?;
         // update offset index
         // compressed_size = header_size + compressed_data_size
@@ -906,31 +814,29 @@
     #[inline]
     fn write_dictionary_page(&mut self) -> Result<()> {
         let compressed_page = {
-            let encoder = self
-                .dict_encoder
-                .as_ref()
+            let mut page = self
+                .encoder
+                .flush_dict_page()?
                 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
 
-            let is_sorted = encoder.is_sorted();
-            let num_values = encoder.num_entries();
-            let mut values_buf = encoder.write_dict()?;
-            let uncompressed_size = values_buf.len();
+            let uncompressed_size = page.buf.len();
 
             if let Some(ref mut cmpr) = self.compressor {
                 let mut output_buf = Vec::with_capacity(uncompressed_size);
-                cmpr.compress(values_buf.data(), &mut output_buf)?;
-                values_buf = ByteBufferPtr::new(output_buf);
+                cmpr.compress(page.buf.data(), &mut output_buf)?;
+                page.buf = ByteBufferPtr::new(output_buf);
             }
 
             let dict_page = Page::DictionaryPage {
-                buf: values_buf,
-                num_values: num_values as u32,
+                buf: page.buf,
+                num_values: page.num_values as u32,
                 encoding: self.props.dictionary_page_encoding(),
-                is_sorted,
+                is_sorted: page.is_sorted,
             };
             CompressedPage::new(dict_page, uncompressed_size)
         };
+
+        self.encodings.insert(compressed_page.encoding());
         let page_spec = self.page_writer.write_page(compressed_page)?;
         self.update_metrics_for_page(page_spec);
         // For the dictionary page, don't need to update column/offset index.
@@ -967,149 +873,89 @@
     fn get_page_writer_ref(&self) -> &dyn PageWriter {
         self.page_writer.as_ref()
     }
+}
 
-    fn make_column_statistics(&self) -> Statistics {
-        self.make_typed_statistics(Level::Column)
-    }
-
-    fn make_page_statistics(&self) -> Statistics {
-        self.make_typed_statistics(Level::Page)
-    }
+fn update_min<T: ParquetValueType>(
+    descr: &ColumnDescriptor,
+    val: &T,
+    min: &mut Option<T>,
+) {
+    update_stat::<T, _>(val, min, |cur| compare_greater(descr, cur, val))
+}
 
-    pub fn make_typed_statistics(&self, level: Level) -> Statistics {
-        let (min, max, distinct, nulls) = match level {
-            Level::Page => (
-                self.min_page_value.as_ref(),
-                self.max_page_value.as_ref(),
-                self.page_distinct_count,
-                self.num_page_nulls,
-            ),
-            Level::Column => (
-                self.min_column_value.as_ref(),
-                self.max_column_value.as_ref(),
-                self.column_distinct_count,
-                self.num_column_nulls,
-            ),
-        };
-        match self.descr.physical_type() {
-            Type::INT32 => gen_stats_section!(i32, int32, min, max, distinct, nulls),
-            Type::BOOLEAN => gen_stats_section!(bool, boolean, min, max, distinct, nulls),
-            Type::INT64 => gen_stats_section!(i64, int64, min, max, distinct, nulls),
-            Type::INT96 => gen_stats_section!(Int96, int96, min, max, distinct, nulls),
-            Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, nulls),
-            Type::DOUBLE => gen_stats_section!(f64, double, min, max, distinct, nulls),
-            Type::BYTE_ARRAY => {
-                let min = min.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec()));
-                let max = max.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec()));
-                Statistics::byte_array(min, max, distinct, nulls, false)
-            }
-            Type::FIXED_LEN_BYTE_ARRAY => {
-                let min = min
-                    .as_ref()
-                    .map(|v| ByteArray::from(v.as_bytes().to_vec()))
-                    .map(|ba| {
-                        let ba: FixedLenByteArray = ba.into();
-                        ba
-                    });
-                let max = max
-                    .as_ref()
-                    .map(|v| ByteArray::from(v.as_bytes().to_vec()))
-                    .map(|ba| {
-                        let ba: FixedLenByteArray = ba.into();
-                        ba
-                    });
-                Statistics::fixed_len_byte_array(min, max, distinct, nulls, false)
-            }
-        }
-    }
+fn update_max<T: ParquetValueType>(
+    descr: &ColumnDescriptor,
+    val: &T,
+    max: &mut Option<T>,
+) {
+    update_stat::<T, _>(val, max, |cur| compare_greater(descr, val, cur))
+}
 
-    fn update_page_min_max(&mut self, val: &T::T) {
-        Self::update_min(&self.descr, val, &mut self.min_page_value);
-        Self::update_max(&self.descr, val, &mut self.max_page_value);
+#[inline]
+#[allow(clippy::eq_op)]
+fn is_nan<T: ParquetValueType>(val: &T) -> bool {
+    match T::PHYSICAL_TYPE {
+        Type::FLOAT | Type::DOUBLE => val != val,
+        _ => false,
     }
+}
 
-    fn update_column_min_max(&mut self) {
-        let min = self.min_page_value.as_ref().unwrap();
-        Self::update_min(&self.descr, min, &mut self.min_column_value);
-
-        let max = self.max_page_value.as_ref().unwrap();
-        Self::update_max(&self.descr, max, &mut self.max_column_value);
-    }
+/// Perform a conditional update of `cur`, skipping any NaN values
+///
+/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
+/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
 
-    fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option<T::T>) {
-        Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val))
+fn update_stat<T: ParquetValueType, F>(val: &T, cur: &mut Option<T>, should_update: F)
+where
+    F: Fn(&T) -> bool,
+{
+    if is_nan(val) {
+        return;
     }
 
-    fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option<T::T>) {
-        Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur))
+    if cur.as_ref().map_or(true, should_update) {
+        *cur = Some(val.clone());
     }
+}
 
-    /// Perform a conditional update of `cur`, skipping any NaN values
-    ///
-    /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
-    /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
-    #[allow(clippy::eq_op)]
-    fn update_stat<F>(val: &T::T, cur: &mut Option<T::T>, should_update: F)
-    where
-        F: Fn(&T::T) -> bool,
-    {
-        if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() {
-            // Skip NaN values
-            if val != val {
-                return;
-            }
-        }
-
-        if cur.as_ref().map_or(true, should_update) {
-            *cur = Some(val.clone());
+/// Evaluate `a > b` according to underlying logical type.
+fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
+    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
+        if !is_signed {
+            // need to compare unsigned
+            return a.as_u64().unwrap() > b.as_u64().unwrap();
         }
     }
 
-    /// Evaluate `a > b` according to underlying logical type.
-    fn compare_greater(descr: &ColumnDescriptor, a: &T::T, b: &T::T) -> bool {
-        if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
-            if !is_signed {
-                // need to compare unsigned
-                return a.as_u64().unwrap() > b.as_u64().unwrap();
-            }
+    match descr.converted_type() {
+        ConvertedType::UINT_8
+        | ConvertedType::UINT_16
+        | ConvertedType::UINT_32
+        | ConvertedType::UINT_64 => {
+            return a.as_u64().unwrap() > b.as_u64().unwrap();
         }
+        _ => {}
+    };
 
-        match descr.converted_type() {
-            ConvertedType::UINT_8
-            | ConvertedType::UINT_16
-            | ConvertedType::UINT_32
-            | ConvertedType::UINT_64 => {
-                return a.as_u64().unwrap() > b.as_u64().unwrap();
+    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
+        match T::PHYSICAL_TYPE {
+            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
             }
             _ => {}
         };
+    }
-        if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
-            match T::get_physical_type() {
-                Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
-                    return compare_greater_byte_array_decimals(
-                        a.as_bytes(),
-                        b.as_bytes(),
-                    );
-                }
-                _ => {}
-            };
-        }
 
-        if descr.converted_type() == ConvertedType::DECIMAL {
-            match T::get_physical_type() {
-                Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
-                    return compare_greater_byte_array_decimals(
-                        a.as_bytes(),
-                        b.as_bytes(),
-                    );
-                }
-                _ => {}
-            };
+    if descr.converted_type() == ConvertedType::DECIMAL {
+        match T::PHYSICAL_TYPE {
+            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
+                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
+            }
+            _ => {}
         };
+    };
 
-        a > b
-    }
+    a > b
 }
 
 // ----------------------------------------------------------------------
@@ -1280,16 +1126,16 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(expected = "Dictionary offset is already set")]
     fn test_column_writer_write_only_one_dictionary_page() {
         let page_writer = get_test_page_writer();
         let props = Arc::new(WriterProperties::builder().build());
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
         writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
         // First page should be correctly written.
-        let res = writer.write_dictionary_page();
-        assert!(res.is_ok());
+        writer.add_data_page().unwrap();
         writer.write_dictionary_page().unwrap();
+        let err = writer.write_dictionary_page().unwrap_err().to_string();
+        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
     }
 
     #[test]
@@ -1302,14 +1148,8 @@
         );
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
         writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
-        let res = writer.write_dictionary_page();
-        assert!(res.is_err());
-        if let Err(err) = res {
-            assert_eq!(
-                format!("{}", err),
-                "Parquet error: Dictionary encoder is not set"
-            );
-        }
+        let err = writer.write_dictionary_page().unwrap_err().to_string();
+        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
     }
 
     #[test]
@@ -1356,14 +1196,14 @@
             true,
             &[true, false],
             None,
-            &[Encoding::RLE, Encoding::RLE],
+            &[Encoding::RLE],
         );
         check_encoding_write_support::<BoolType>(
             WriterVersion::PARQUET_2_0,
             false,
             &[true, false],
             None,
-            &[Encoding::RLE, Encoding::RLE],
+            &[Encoding::RLE],
         );
     }
 
@@ -1374,7 +1214,7 @@
            true,
            &[1, 2],
            Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int32Type>(
             WriterVersion::PARQUET_1_0,
@@ -1388,14 +1228,14 @@
             true,
             &[1, 2],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int32Type>(
             WriterVersion::PARQUET_2_0,
             false,
             &[1, 2],
             None,
-            &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
         );
     }
 
@@ -1406,7 +1246,7 @@
             true,
             &[1, 2],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int64Type>(
             WriterVersion::PARQUET_1_0,
@@ -1420,14 +1260,14 @@
             true,
             &[1, 2],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int64Type>(
             WriterVersion::PARQUET_2_0,
             false,
             &[1, 2],
             None,
-            &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
         );
     }
 
@@ -1438,7 +1278,7 @@ mod tests {
             true,
             &[Int96::from(vec![1, 2, 3])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int96Type>(
             WriterVersion::PARQUET_1_0,
@@ -1452,7 +1292,7 @@
             true,
             &[Int96::from(vec![1, 2, 3])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<Int96Type>(
             WriterVersion::PARQUET_2_0,
@@ -1470,7 +1310,7 @@
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<FloatType>(
             WriterVersion::PARQUET_1_0,
@@ -1484,7 +1324,7 @@
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<FloatType>(
             WriterVersion::PARQUET_2_0,
@@ -1502,7 +1342,7 @@
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<DoubleType>(
             WriterVersion::PARQUET_1_0,
@@ -1516,7 +1356,7 @@
             true,
             &[1.0, 2.0],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<DoubleType>(
             WriterVersion::PARQUET_2_0,
@@ -1534,7 +1374,7 @@
             true,
             &[ByteArray::from(vec![1u8])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<ByteArrayType>(
             WriterVersion::PARQUET_1_0,
@@ -1548,14 +1388,14 @@
             true,
             &[ByteArray::from(vec![1u8])],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<ByteArrayType>(
             WriterVersion::PARQUET_2_0,
             false,
             &[ByteArray::from(vec![1u8])],
             None,
-            &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
         );
     }
 
@@ -1580,14 +1420,14 @@
             true,
             &[ByteArray::from(vec![1u8]).into()],
             Some(0),
-            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
+            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
         check_encoding_write_support::<FixedLenByteArrayType>(
             WriterVersion::PARQUET_2_0,
             false,
             &[ByteArray::from(vec![1u8]).into()],
             None,
-            &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE],
+            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
         );
     }
 
@@ -1603,7 +1443,7 @@
         assert_eq!(rows_written, 4);
         assert_eq!(
             metadata.encodings(),
-            &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
         );
         assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
         assert_eq!(metadata.compressed_size(), 20);
@@ -1728,7 +1568,7 @@
         assert_eq!(rows_written, 4);
         assert_eq!(
             metadata.encodings(),
-            &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE]
+            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
         assert_eq!(metadata.num_values(), 8); // dictionary + value indexes
         assert_eq!(metadata.compressed_size(), 20);
@@ -1808,40 +1648,19 @@
     #[test]
     fn test_column_writer_non_nullable_values_roundtrip() {
         let props = WriterProperties::builder().build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            0,
-            0,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
     }
 
     #[test]
     fn test_column_writer_nullable_non_repeated_values_roundtrip() {
         let props = WriterProperties::builder().build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            0,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
     }
 
     #[test]
     fn test_column_writer_nullable_repeated_values_roundtrip() {
         let props = WriterProperties::builder().build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1850,14 +1669,7 @@
             .set_dictionary_pagesize_limit(32)
             .set_data_pagesize_limit(32)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1865,14 +1677,7 @@
         for i in &[1usize, 2, 5, 10, 11, 1023] {
             let props = WriterProperties::builder().set_write_batch_size(*i).build();
 
-            column_roundtrip_random::<Int32Type>(
-                props,
-                1024,
-                std::i32::MIN,
-                std::i32::MAX,
-                10,
-                10,
-            );
+            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
         }
     }
 
@@ -1882,14 +1687,7 @@
             .set_writer_version(WriterVersion::PARQUET_1_0)
             .set_dictionary_enabled(false)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1898,14 +1696,7 @@
             .set_writer_version(WriterVersion::PARQUET_2_0)
             .set_dictionary_enabled(false)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            1024,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1914,14 +1705,7 @@
             .set_writer_version(WriterVersion::PARQUET_1_0)
             .set_compression(Compression::SNAPPY)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            2048,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
@@ -1930,14 +1714,7 @@
             .set_writer_version(WriterVersion::PARQUET_2_0)
             .set_compression(Compression::SNAPPY)
             .build();
-        column_roundtrip_random::<Int32Type>(
-            props,
-            2048,
-            std::i32::MIN,
-            std::i32::MAX,
-            10,
-            10,
-        );
+        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
     }
 
     #[test]
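The chunk statistics in the writer above funnel through `update_min`/`update_stat`, which skip NaN via `is_nan` so float statistics are never poisoned. A self-contained sketch (plain Rust, not patch code; the function name `min_max_skip_nan` is hypothetical) that mirrors that behaviour:

// Mirrors the NaN-skipping min/max logic in `update_stat`/`is_nan`.
fn min_max_skip_nan(values: &[f64]) -> Option<(f64, f64)> {
    let mut result: Option<(f64, f64)> = None;
    for &v in values {
        if v.is_nan() {
            continue; // NaN never becomes a min or max
        }
        result = match result {
            None => Some((v, v)),
            Some((min, max)) => Some((min.min(v), max.max(v))),
        };
    }
    result
}

fn main() {
    let values = [f64::NAN, 3.0, f64::NAN, 1.0, 2.0];
    assert_eq!(min_max_skip_nan(&values), Some((1.0, 3.0)));
    // All-NaN input yields no statistics at all, matching min_max() returning None.
    assert_eq!(min_max_skip_nan(&[f64::NAN]), None);
}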
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 7b6fb04a74b..1d0b5b231c6 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -568,6 +568,7 @@ pub(crate) mod private {
     use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
     use crate::util::memory::ByteBufferPtr;
 
+    use crate::basic::Type;
     use byteorder::ByteOrder;
     use std::convert::TryInto;
 
@@ -581,18 +582,21 @@ pub(crate) mod private {
     /// crate, and thus hint to the type system (and end user) traits are public for the contract
     /// and not for extension.
     pub trait ParquetValueType:
-        std::cmp::PartialEq
+        PartialEq
         + std::fmt::Debug
         + std::fmt::Display
-        + std::default::Default
-        + std::clone::Clone
+        + Default
+        + Clone
         + super::AsBytes
         + super::FromBytes
-        + super::SliceAsBytes
+        + SliceAsBytes
         + PartialOrd
         + Send
         + crate::encodings::decoding::private::GetDecoder
+        + crate::file::statistics::private::MakeStatistics
     {
+        const PHYSICAL_TYPE: Type;
+
         /// Encode the value directly from a higher level encoder
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -646,6 +650,8 @@
     }
 
     impl ParquetValueType for bool {
+        const PHYSICAL_TYPE: Type = Type::BOOLEAN;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -730,8 +736,10 @@
     }
 
     macro_rules! impl_from_raw {
-        ($ty: ty, $self: ident => $as_i64: block) => {
+        ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
             impl ParquetValueType for $ty {
+                const PHYSICAL_TYPE: Type = $physical_ty;
+
                 #[inline]
                 fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
                     let raw = unsafe {
@@ -809,12 +817,14 @@
         }
     }
 
-    impl_from_raw!(i32, self => { Ok(*self as i64) });
-    impl_from_raw!(i64, self => { Ok(*self) });
-    impl_from_raw!(f32, self => { Err(general_err!("Type cannot be converted to i64")) });
-    impl_from_raw!(f64, self => { Err(general_err!("Type cannot be converted to i64")) });
+    impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
+    impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
+    impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
+    impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
 
     impl ParquetValueType for super::Int96 {
+        const PHYSICAL_TYPE: Type = Type::INT96;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -925,6 +935,8 @@
     }
 
     impl ParquetValueType for super::ByteArray {
+        const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -1016,6 +1028,8 @@
     }
 
     impl ParquetValueType for super::FixedLenByteArray {
+        const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
+
         #[inline]
         fn encode<W: std::io::Write>(
             values: &[Self],
@@ -1113,7 +1127,9 @@
     type T: private::ParquetValueType;
 
     /// Returns Parquet physical type.
-    fn get_physical_type() -> Type;
+    fn get_physical_type() -> Type {
+        <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
+    }
 
     /// Returns size in bytes for Rust representation of the physical type.
     fn get_type_size() -> usize;
@@ -1156,17 +1172,13 @@
 }
 
 macro_rules! make_type {
-    ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
+    ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
         #[derive(Clone)]
         pub struct $name {}
 
         impl DataType for $name {
             type T = $native_ty;
 
-            fn get_physical_type() -> Type {
-                $physical_ty
-            }
-
             fn get_type_size() -> usize {
                 $size
             }
@@ -1212,57 +1224,20 @@
 
 // Generate struct definitions for all physical types
 
-make_type!(
-    BoolType,
-    Type::BOOLEAN,
-    BoolColumnReader,
-    BoolColumnWriter,
-    bool,
-    1
-);
-make_type!(
-    Int32Type,
-    Type::INT32,
-    Int32ColumnReader,
-    Int32ColumnWriter,
-    i32,
-    4
-);
-make_type!(
-    Int64Type,
-    Type::INT64,
-    Int64ColumnReader,
-    Int64ColumnWriter,
-    i64,
-    8
-);
+make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
+make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
+make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
 make_type!(
     Int96Type,
-    Type::INT96,
     Int96ColumnReader,
     Int96ColumnWriter,
     Int96,
     mem::size_of::<Int96>()
 );
-make_type!(
-    FloatType,
-    Type::FLOAT,
-    FloatColumnReader,
-    FloatColumnWriter,
-    f32,
-    4
-);
-make_type!(
-    DoubleType,
-    Type::DOUBLE,
-    DoubleColumnReader,
-    DoubleColumnWriter,
-    f64,
-    8
-);
+make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
+make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
 make_type!(
     ByteArrayType,
-    Type::BYTE_ARRAY,
     ByteArrayColumnReader,
     ByteArrayColumnWriter,
     ByteArray,
@@ -1270,7 +1245,6 @@ make_type!(
 make_type!(
     FixedLenByteArrayType,
-    Type::FIXED_LEN_BYTE_ARRAY,
     FixedLenByteArrayColumnReader,
     FixedLenByteArrayColumnWriter,
     FixedLenByteArray,
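With `PHYSICAL_TYPE` now an associated const on `ParquetValueType`, `DataType::get_physical_type` gains a default body and `make_type!` drops its `$physical_ty` argument. A small sketch of how this is observable through the public API (behaviour is unchanged):

use parquet::basic::Type;
use parquet::data_type::{DataType, DoubleType, Int32Type};

fn main() {
    // get_physical_type() is now a default method driven by the
    // PHYSICAL_TYPE const on the underlying value type.
    assert_eq!(Int32Type::get_physical_type(), Type::INT32);
    assert_eq!(DoubleType::get_physical_type(), Type::DOUBLE);
}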
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index 40db3c1017f..5d1a01df8a6 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -37,15 +37,48 @@
 //! }
 //! ```
 
-use std::{cmp, fmt};
+use std::fmt;
 
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format::Statistics as TStatistics;
 
 use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::util::bit_util::from_ne_slice;
 
+pub(crate) mod private {
+    use super::*;
+
+    pub trait MakeStatistics {
+        fn make_statistics(statistics: ValueStatistics<Self>) -> Statistics
+        where
+            Self: Sized;
+    }
+
+    macro_rules! gen_make_statistics {
+        ($value_ty:ty, $stat:ident) => {
+            impl MakeStatistics for $value_ty {
+                fn make_statistics(statistics: ValueStatistics<Self>) -> Statistics
+                where
+                    Self: Sized,
+                {
+                    Statistics::$stat(statistics)
+                }
+            }
+        };
+    }
+
+    gen_make_statistics!(bool, Boolean);
+    gen_make_statistics!(i32, Int32);
+    gen_make_statistics!(i64, Int64);
+    gen_make_statistics!(Int96, Int96);
+    gen_make_statistics!(f32, Float);
+    gen_make_statistics!(f64, Double);
+    gen_make_statistics!(ByteArray, ByteArray);
+    gen_make_statistics!(FixedLenByteArray, FixedLenByteArray);
+}
+
 // Macro to generate methods create Statistics.
 macro_rules! statistics_new_func {
     ($func:ident, $vtype:ty, $stat:ident) => {
@@ -56,7 +89,7 @@
             nulls: u64,
             is_deprecated: bool,
         ) -> Self {
-            Statistics::$stat(TypedStatistics::new(
+            Statistics::$stat(ValueStatistics::new(
                 min,
                 max,
                 distinct,
@@ -234,17 +267,39 @@
 /// Statistics for a column chunk and data page.
 #[derive(Debug, Clone, PartialEq)]
 pub enum Statistics {
-    Boolean(TypedStatistics<BoolType>),
-    Int32(TypedStatistics<Int32Type>),
-    Int64(TypedStatistics<Int64Type>),
-    Int96(TypedStatistics<Int96Type>),
-    Float(TypedStatistics<FloatType>),
-    Double(TypedStatistics<DoubleType>),
-    ByteArray(TypedStatistics<ByteArrayType>),
-    FixedLenByteArray(TypedStatistics<FixedLenByteArrayType>),
+    Boolean(ValueStatistics<bool>),
+    Int32(ValueStatistics<i32>),
+    Int64(ValueStatistics<i64>),
+    Int96(ValueStatistics<Int96>),
+    Float(ValueStatistics<f32>),
+    Double(ValueStatistics<f64>),
+    ByteArray(ValueStatistics<ByteArray>),
+    FixedLenByteArray(ValueStatistics<FixedLenByteArray>),
+}
+
+impl<T: ParquetValueType> From<ValueStatistics<T>> for Statistics {
+    fn from(t: ValueStatistics<T>) -> Self {
+        T::make_statistics(t)
+    }
 }
 
 impl Statistics {
+    pub fn new<T: ParquetValueType>(
+        min: Option<T>,
+        max: Option<T>,
+        distinct_count: Option<u64>,
+        null_count: u64,
+        is_deprecated: bool,
+    ) -> Self {
+        Self::from(ValueStatistics::new(
+            min,
+            max,
+            distinct_count,
+            null_count,
+            is_deprecated,
+        ))
+    }
+
     statistics_new_func![boolean, Option<bool>, Boolean];
 
     statistics_new_func![int32, Option<i32>, Int32];
@@ -341,21 +396,24 @@
 }
 
 /// Typed implementation for [`Statistics`].
-#[derive(Clone)]
-pub struct TypedStatistics<T: DataType> {
-    min: Option<T::T>,
-    max: Option<T::T>,
+pub type TypedStatistics<T> = ValueStatistics<<T as DataType>::T>;
+
+/// Statistics for a particular `ParquetValueType`
+#[derive(Clone, Eq, PartialEq)]
+pub struct ValueStatistics<T> {
+    min: Option<T>,
+    max: Option<T>,
     // Distinct count could be omitted in some cases
     distinct_count: Option<u64>,
     null_count: u64,
     is_min_max_deprecated: bool,
 }
 
-impl<T: DataType> TypedStatistics<T> {
+impl<T: ParquetValueType> ValueStatistics<T> {
     /// Creates new typed statistics.
     pub fn new(
-        min: Option<T::T>,
-        max: Option<T::T>,
+        min: Option<T>,
+        max: Option<T>,
         distinct_count: Option<u64>,
         null_count: u64,
         is_min_max_deprecated: bool,
@@ -373,7 +431,7 @@
     ///
     /// Panics if min value is not set, e.g. all values are `null`.
     /// Use `has_min_max_set` method to check that.
-    pub fn min(&self) -> &T::T {
+    pub fn min(&self) -> &T {
         self.min.as_ref().unwrap()
     }
 
@@ -381,7 +439,7 @@
     ///
     /// Panics if max value is not set, e.g. all values are `null`.
     /// Use `has_min_max_set` method to check that.
-    pub fn max(&self) -> &T::T {
+    pub fn max(&self) -> &T {
         self.max.as_ref().unwrap()
     }
 
@@ -423,7 +481,7 @@
     }
 }
 
-impl<T: DataType> fmt::Display for TypedStatistics<T> {
+impl<T: ParquetValueType> fmt::Display for ValueStatistics<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "{{")?;
         write!(f, "min: ")?;
@@ -447,7 +505,7 @@
     }
 }
 
-impl<T: DataType> fmt::Debug for TypedStatistics<T> {
+impl<T: ParquetValueType> fmt::Debug for ValueStatistics<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(
             f,
@@ -462,16 +520,6 @@
         )
     }
 }
 
-impl<T: DataType> cmp::PartialEq for TypedStatistics<T> {
-    fn eq(&self, other: &TypedStatistics<T>) -> bool {
-        self.min == other.min
-            && self.max == other.max
-            && self.distinct_count == other.distinct_count
-            && self.null_count == other.null_count
-            && self.is_min_max_deprecated == other.is_min_max_deprecated
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
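To see the reworked statistics types from the outside: the typed constructors now build a `ValueStatistics<T>` and wrap it in the matching enum variant via `MakeStatistics`, while the generic `Statistics::new` shown above is the entry point the writer uses internally. A small usage sketch against the public API:

use parquet::file::statistics::Statistics;

fn main() {
    // The typed constructor builds a ValueStatistics<i32> under the hood
    // and wraps it in the Statistics::Int32 variant.
    let stats = Statistics::int32(Some(1), Some(10), None, 0, false);

    assert!(stats.has_min_max_set());
    if let Statistics::Int32(typed) = &stats {
        assert_eq!(*typed.min(), 1);
        assert_eq!(*typed.max(), 10);
    }
}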