diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 83f1bc70b52..529c1ca056d 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -34,7 +34,7 @@ use super::schema::{
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::RowGroupMetaDataPtr;
+use crate::file::metadata::RowGroupMetaData;
 use crate::file::properties::WriterProperties;
 use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
 use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -98,7 +98,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
         self.writer.flushed_row_groups()
     }
 
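The `ArrowWriter` change above only swaps the element type: `flushed_row_groups` now yields plain `RowGroupMetaData` instead of `Arc`-wrapped `RowGroupMetaDataPtr`. A minimal caller-side sketch (the helper name is illustrative, not part of the patch):

    use std::io::Write;

    use parquet::arrow::arrow_writer::ArrowWriter;

    // Iterate the flushed row groups; no Arc deref is needed any more.
    fn report_flushed<W: Write>(writer: &ArrowWriter<W>) {
        for rg in writer.flushed_row_groups() {
            println!("flushed: {} rows, {} bytes", rg.num_rows(), rg.total_byte_size());
        }
    }
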
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d80cafe0e0a..0ae69915030 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -29,6 +29,7 @@ use crate::encodings::{
     levels::{max_buffer_size, LevelEncoder},
 };
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::statistics::Statistics;
 use crate::file::{
     metadata::ColumnChunkMetaData,
@@ -198,6 +199,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
     _phantom: PhantomData<T>,
+    // column index and offset index
+    column_index_builder: ColumnIndexBuilder,
+    offset_index_builder: OffsetIndexBuilder,
 }
 
 impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +265,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             num_column_nulls: 0,
             column_distinct_count: None,
             _phantom: PhantomData,
+            column_index_builder: ColumnIndexBuilder::new(),
+            offset_index_builder: OffsetIndexBuilder::new(),
         }
     }
 
@@ -421,10 +427,19 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             self.write_dictionary_page()?;
         }
         self.flush_data_pages()?;
-        let metadata = self.write_column_metadata()?;
+        let mut metadata = self.write_column_metadata()?;
         self.dict_encoder = None;
         self.page_writer.close()?;
 
+        if self.column_index_builder.valid() {
+            // build the column and offset index
+            let column_index = self.column_index_builder.build_to_thrift();
+            let offset_index = self.offset_index_builder.build_to_thrift();
+            // attach the column and offset index to the column metadata
+            metadata.set_column_index(Some(column_index));
+            metadata.set_offset_index(Some(offset_index));
+        }
+
         Ok((self.total_bytes_written, self.total_rows_written, metadata))
     }
 
@@ -593,6 +608,42 @@
         Ok(())
     }
 
+    /// Updates the column index and offset index when adding a data page
+    fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
+        // update the column index
+        let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
+        // for a page that contains only null values, the spec requires writers to
+        // set the corresponding entries in min_values and max_values to a single zero byte
+        if null_page && self.column_index_builder.valid() {
+            self.column_index_builder.append(
+                null_page,
+                &[0; 1],
+                &[0; 1],
+                self.num_page_nulls as i64,
+            );
+        } else if self.column_index_builder.valid() {
+            // otherwise take min/max from the page statistics; if they are
+            // unavailable, drop the column/offset index for this column chunk
+            match &page_statistics {
+                None => {
+                    self.column_index_builder.to_invalid();
+                }
+                Some(stat) => {
+                    self.column_index_builder.append(
+                        null_page,
+                        stat.min_bytes(),
+                        stat.max_bytes(),
+                        self.num_page_nulls as i64,
+                    );
+                }
+            }
+        }
+
+        // update the offset index
+        self.offset_index_builder
+            .append_row_count(self.num_buffered_rows as i64);
+    }
+
     /// Adds data page.
     /// Data page is either buffered in case of dictionary encoding or written directly.
     fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +673,9 @@
             None
         };
 
+        // update column and offset index
+        self.update_column_offset_index(&page_statistics);
+
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
                 let mut buffer = vec![];
@@ -700,8 +754,7 @@
                     buf: ByteBufferPtr::new(buffer),
                     num_values: self.num_buffered_values,
                     encoding,
-                    num_nulls: self.num_buffered_values
-                        - self.num_buffered_encoded_values,
+                    num_nulls: self.num_page_nulls as u32,
                     num_rows: self.num_buffered_rows,
                     def_levels_byte_len: def_levels_byte_len as u32,
                     rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +883,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
         let page_spec = self.page_writer.write_page(page)?;
+        // update the offset index
+        // compressed_size = header_size + compressed_data_size
+        self.offset_index_builder.append_offset_and_size(
+            page_spec.offset as i64,
+            page_spec.compressed_size as i32,
+        );
         self.update_metrics_for_page(page_spec);
         Ok(())
     }
@@ -865,6 +924,7 @@
         let page_spec = self.page_writer.write_page(compressed_page)?;
         self.update_metrics_for_page(page_spec);
+        // The dictionary page does not contribute to the column/offset index.
         Ok(())
     }
 
@@ -1133,6 +1193,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
 #[cfg(test)]
 mod tests {
+    use parquet_format::BoundaryOrder;
     use rand::distributions::uniform::SampleUniform;
     use std::sync::Arc;
 
@@ -2068,6 +2129,80 @@
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the column index and offset index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata) = writer.close().unwrap();
+        let column_index = match metadata.column_index() {
+            None => {
+                panic!("Can't find the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match metadata.offset_index() {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert_eq!(&false, column_index.null_pages.get(idx).unwrap());
+            assert_eq!(
+                &0,
+                column_index.null_counts.as_ref().unwrap().get(idx).unwrap()
+            );
+        }
+
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::Int32(stats) = stats {
+                // first page is [1,2,3,4]
+                // second page is [-5,2,4,8]
+                assert_eq!(
+                    stats.min_bytes(),
+                    column_index.min_values.get(1).unwrap().as_slice()
+                );
+                assert_eq!(
+                    stats.max_bytes(),
+                    column_index.max_values.get(1).unwrap().as_slice()
+                );
+            } else {
+                panic!("expecting Statistics::Int32");
+            }
+        } else {
+            panic!("metadata missing statistics");
+        }
+
+        // page location
+        assert_eq!(
+            0,
+            offset_index.page_locations.get(0).unwrap().first_row_index
+        );
+        assert_eq!(
+            4,
+            offset_index.page_locations.get(1).unwrap().first_row_index
+        );
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
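The per-page bookkeeping above is easiest to see in isolation. A minimal sketch that drives the two builders the way `update_column_offset_index` and `write_data_page` do, for one populated page and one all-null page; the file offsets and compressed sizes are made-up illustrative numbers:

    use parquet::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};

    fn main() {
        let mut column_index = ColumnIndexBuilder::new();
        let mut offset_index = OffsetIndexBuilder::new();

        // page 1: values [1, 2, 3, 4] -> min/max taken from the page statistics
        column_index.append(false, &1i32.to_le_bytes(), &4i32.to_le_bytes(), 0);
        offset_index.append_row_count(4);
        offset_index.append_offset_and_size(4, 57); // offset, header + compressed data

        // page 2: all nulls -> min/max entries are a single zero byte
        column_index.append(true, &[0], &[0], 4);
        offset_index.append_row_count(4);
        offset_index.append_offset_and_size(61, 21);

        let column_index = column_index.build_to_thrift();
        let offset_index = offset_index.build_to_thrift();
        assert_eq!(column_index.null_pages, vec![false, true]);
        assert_eq!(offset_index.page_locations[1].first_row_index, 4);
    }
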
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index a3477dd7577..47dbcabb49f 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,7 +35,10 @@
 use std::sync::Arc;
 
-use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
+use parquet_format::{
+    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
+    RowGroup,
+};
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
 use crate::errors::{ParquetError, Result};
@@ -247,6 +250,11 @@
         &self.columns
     }
 
+    /// Returns a mutable slice of column chunk metadata
+    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
+        &mut self.columns
+    }
+
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> i64 {
         self.num_rows
@@ -386,6 +394,9 @@ pub struct ColumnChunkMetaData {
     offset_index_length: Option<i32>,
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
+    // column index and offset index
+    column_index: Option<ColumnIndex>,
+    offset_index: Option<OffsetIndex>,
 }
 
 /// Represents common operations for a column chunk.
@@ -519,6 +530,46 @@
         self.offset_index_length
    }
 
+    /// Returns the column index
+    pub fn column_index(&self) -> &Option<ColumnIndex> {
+        &self.column_index
+    }
+
+    /// Returns the offset index
+    pub fn offset_index(&self) -> &Option<OffsetIndex> {
+        &self.offset_index
+    }
+
+    /// Sets the column index for the column metadata
+    pub fn set_column_index(&mut self, column_index: Option<ColumnIndex>) {
+        self.column_index = column_index;
+    }
+
+    /// Sets the offset index for the column metadata
+    pub fn set_offset_index(&mut self, offset_index: Option<OffsetIndex>) {
+        self.offset_index = offset_index;
+    }
+
+    /// Sets the file offset of the ColumnChunk's OffsetIndex
+    pub fn set_offset_index_offset(&mut self, offset_index_offset: i64) {
+        self.offset_index_offset = Some(offset_index_offset);
+    }
+
+    /// Sets the size of the ColumnChunk's OffsetIndex, in bytes
+    pub fn set_offset_index_length(&mut self, offset_index_length: i32) {
+        self.offset_index_length = Some(offset_index_length);
+    }
+
+    /// Sets the file offset of the ColumnChunk's ColumnIndex
+    pub fn set_column_index_offset(&mut self, column_index_offset: i64) {
+        self.column_index_offset = Some(column_index_offset);
+    }
+
+    /// Sets the size of the ColumnChunk's ColumnIndex, in bytes
+    pub fn set_column_index_length(&mut self, column_index_length: i32) {
+        self.column_index_length = Some(column_index_length);
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
         if cc.meta_data.is_none() {
@@ -573,6 +624,8 @@
             offset_index_length,
             column_index_offset,
             column_index_length,
+            column_index: None,
+            offset_index: None,
         };
         Ok(result)
     }
@@ -785,10 +838,113 @@
             offset_index_length: self.offset_index_length,
             column_index_offset: self.column_index_offset,
             column_index_length: self.column_index_length,
+            column_index: None,
+            offset_index: None,
         })
     }
 }
 
+/// Builder for the column index
+pub struct ColumnIndexBuilder {
+    null_pages: Vec<bool>,
+    min_values: Vec<Vec<u8>>,
+    max_values: Vec<Vec<u8>>,
+    // TODO: calculate the boundary order across all pages in this column
+    boundary_order: BoundaryOrder,
+    null_counts: Vec<i64>,
+    // If an index entry can't be built for any page, the whole index
+    // for this column is dropped
+    valid: bool,
+}
+
+impl ColumnIndexBuilder {
+    pub fn new() -> Self {
+        ColumnIndexBuilder {
+            null_pages: Vec::new(),
+            min_values: Vec::new(),
+            max_values: Vec::new(),
+            boundary_order: BoundaryOrder::Unordered,
+            null_counts: Vec::new(),
+            valid: true,
+        }
+    }
+
+    pub fn append(
+        &mut self,
+        null_page: bool,
+        min_value: &[u8],
+        max_value: &[u8],
+        null_count: i64,
+    ) {
+        self.null_pages.push(null_page);
+        self.min_values.push(min_value.to_vec());
+        self.max_values.push(max_value.to_vec());
+        self.null_counts.push(null_count);
+    }
+
+    pub fn to_invalid(&mut self) {
+        self.valid = false;
+    }
+
+    pub fn valid(&self) -> bool {
+        self.valid
+    }
+
+    /// Builds and returns the thrift metadata of the column index
+    pub fn build_to_thrift(self) -> ColumnIndex {
+        ColumnIndex::new(
+            self.null_pages,
+            self.min_values,
+            self.max_values,
+            self.boundary_order,
+            self.null_counts,
+        )
+    }
+}
+
+/// Builder for the offset index
+pub struct OffsetIndexBuilder {
+    offset_array: Vec<i64>,
+    compressed_page_size_array: Vec<i32>,
+    first_row_index_array: Vec<i64>,
+    current_first_row_index: i64,
+}
+
+impl OffsetIndexBuilder {
+    pub fn new() -> Self {
+        OffsetIndexBuilder {
+            offset_array: Vec::new(),
+            compressed_page_size_array: Vec::new(),
+            first_row_index_array: Vec::new(),
+            current_first_row_index: 0,
+        }
+    }
+
+    pub fn append_row_count(&mut self, row_count: i64) {
+        let current_page_row_index = self.current_first_row_index;
+        self.first_row_index_array.push(current_page_row_index);
+        self.current_first_row_index += row_count;
+    }
+
+    pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
+        self.offset_array.push(offset);
+        self.compressed_page_size_array.push(compressed_page_size);
+    }
+
+    /// Builds and returns the thrift metadata of the offset index
+    pub fn build_to_thrift(self) -> OffsetIndex {
+        let locations = self
+            .offset_array
+            .iter()
+            .zip(self.compressed_page_size_array.iter())
+            .zip(self.first_row_index_array.iter())
+            .map(|((offset, size), row_index)| {
+                PageLocation::new(*offset, *size, *row_index)
+            })
+            .collect::<Vec<PageLocation>>();
+        OffsetIndex::new(locations)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
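The builders above produce plain `parquet_format` structs, which the file writer serializes with thrift's compact protocol. A round-trip sketch of that serialization, assuming the generated `parquet_format` 4.x API and the `thrift` crate's blanket transport impls for `Read`/`Write`:

    use parquet_format::{BoundaryOrder, ColumnIndex};
    use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};

    fn main() -> thrift::Result<()> {
        let index = ColumnIndex::new(
            vec![false, true],
            vec![1i32.to_le_bytes().to_vec(), vec![0]],
            vec![4i32.to_le_bytes().to_vec(), vec![0]],
            BoundaryOrder::Unordered,
            vec![0i64, 4],
        );

        // serialize the way write_column_indexes below does
        let mut buf = Vec::new();
        {
            let mut protocol = TCompactOutputProtocol::new(&mut buf);
            index.write_to_out_protocol(&mut protocol)?;
            protocol.flush()?;
        }

        // decode it back from the raw bytes
        let mut protocol = TCompactInputProtocol::new(buf.as_slice());
        let decoded = ColumnIndex::read_from_in_protocol(&mut protocol)?;
        assert_eq!(index, decoded);
        Ok(())
    }
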
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 0a8fc331e7e..1a3968dfc29 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -22,6 +22,7 @@ use std::{io::Write, sync::Arc};
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format as parquet;
+use parquet_format::{ColumnIndex, OffsetIndex};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 
 use crate::basic::PageType;
@@ -85,7 +86,7 @@ pub type OnCloseColumnChunk<'a> =
 /// Callback invoked on closing a row group, arguments are:
 ///
 /// - the row group metadata
-pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> Result<()> + 'a>;
+pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaData) -> Result<()> + 'a>;
 
 #[deprecated = "use std::io::Write"]
 pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
@@ -109,8 +110,10 @@ pub struct SerializedFileWriter<W: Write> {
     schema: TypePtr,
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    row_groups: Vec<RowGroupMetaDataPtr>,
+    row_groups: Vec<RowGroupMetaData>,
     row_group_index: usize,
+    column_indexes: Vec<Vec<ColumnIndex>>,
+    offset_indexes: Vec<Vec<OffsetIndex>>,
 }
 
 impl<W: Write> SerializedFileWriter<W> {
@@ -125,6 +128,8 @@
             props: properties,
             row_groups: vec![],
             row_group_index: 0,
+            column_indexes: Vec::new(),
+            offset_indexes: Vec::new(),
         })
     }
 
@@ -154,8 +159,8 @@
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
-        &self.row_groups
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
+        self.row_groups.as_slice()
     }
 
     /// Closes and finalises file writer, returning the file metadata.
@@ -177,10 +182,66 @@
         Ok(())
     }
 
+    /// Serialize all the offset indexes to the file
+    fn write_offset_indexes(&mut self) -> Result<()> {
+        // iterate over each row group,
+        // then over each column,
+        // and write its offset index to the file
+        for row_group in &mut self.row_groups {
+            for column_metadata in row_group.columns_mut() {
+                match column_metadata.offset_index() {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // record the offset and length of the offset index
+                        column_metadata.set_offset_index_offset(start_offset as i64);
+                        column_metadata
+                            .set_offset_index_length((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Serialize all the column indexes to the file
+    fn write_column_indexes(&mut self) -> Result<()> {
+        // iterate over each row group,
+        // then over each column,
+        // and write its column index to the file
+        for row_group in &mut self.row_groups {
+            for column_metadata in row_group.columns_mut() {
+                match column_metadata.column_index() {
+                    Some(column_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        column_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // record the offset and length of the column index
+                        column_metadata.set_column_index_offset(start_offset as i64);
+                        column_metadata
+                            .set_column_index_length((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
+        // Write the column indexes and offset indexes
+        self.write_column_indexes()?;
+        self.write_offset_indexes()?;
+
         let row_groups = self
             .row_groups
             .as_slice()
@@ -339,11 +400,11 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
             .build()?;
 
-        let metadata = Arc::new(row_group_metadata);
-        self.row_group_metadata = Some(metadata.clone());
+        let clone_row_group_metadata = row_group_metadata.clone();
+        self.row_group_metadata = Some(Arc::new(row_group_metadata));
 
         if let Some(on_close) = self.on_close.take() {
-            on_close(metadata)?
+            on_close(clone_row_group_metadata)?
         }
     }
 
@@ -531,7 +592,6 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
         Ok(spec)
     }
 
-
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         self.serialize_column_chunk(metadata.to_thrift())
     }
@@ -987,7 +1047,10 @@ mod tests {
     /// File write-read roundtrip.
     /// `data` consists of arrays of values for each row group.
-    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) {
+    fn test_file_roundtrip(
+        file: File,
+        data: Vec<Vec<i32>>,
+    ) -> parquet_format::FileMetaData {
         let schema = Arc::new(
             types::Type::group_type_builder("schema")
                 .with_fields(&mut vec![Arc::new(
@@ -1017,9 +1080,9 @@
             let last_group = row_group_writer.close().unwrap();
             let flushed = file_writer.flushed_row_groups();
             assert_eq!(flushed.len(), idx + 1);
-            assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
+            assert_eq!(&flushed[idx], last_group.as_ref());
         }
-        file_writer.close().unwrap();
+        let file_metadata = file_writer.close().unwrap();
 
         let reader = assert_send(SerializedFileReader::new(file).unwrap());
         assert_eq!(reader.num_row_groups(), data.len());
@@ -1036,6 +1099,7 @@
                 .collect::<Vec<i32>>();
             assert_eq!(res, *item);
         }
+        file_metadata
     }
 
     fn assert_send<T: Send>(t: T) -> T {
@@ -1116,4 +1180,19 @@
             assert_eq!(res, *item);
         }
     }
+
+    #[test]
+    fn test_column_offset_index_file() {
+        let file = tempfile::tempfile().unwrap();
+        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
+        file_metadata.row_groups.iter().for_each(|row_group| {
+            row_group.columns.iter().for_each(|column_chunk| {
+                assert_ne!(None, column_chunk.column_index_offset);
+                assert_ne!(None, column_chunk.column_index_length);
+
+                assert_ne!(None, column_chunk.offset_index_offset);
+                assert_ne!(None, column_chunk.offset_index_length);
+            })
+        });
+    }
 }
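Since the footer now carries `column_index_offset`/`column_index_length` (and their offset-index twins), a reader needs no new API to fetch an index. A sketch of decoding one back from a written file; the `read_column_index` helper is hypothetical, and `column_chunk` comes from a decoded `parquet_format::FileMetaData`, as in the test above:

    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};

    use parquet_format::{ColumnChunk, ColumnIndex};
    use thrift::protocol::TCompactInputProtocol;

    fn read_column_index(
        file: &mut File,
        column_chunk: &ColumnChunk,
    ) -> Result<Option<ColumnIndex>, Box<dyn std::error::Error>> {
        // files written before this change carry no page index
        let (offset, length) =
            match (column_chunk.column_index_offset, column_chunk.column_index_length) {
                (Some(offset), Some(length)) => (offset as u64, length as usize),
                _ => return Ok(None),
            };
        file.seek(SeekFrom::Start(offset))?;
        let mut buf = vec![0u8; length];
        file.read_exact(&mut buf)?;
        let mut protocol = TCompactInputProtocol::new(buf.as_slice());
        Ok(Some(ColumnIndex::read_from_in_protocol(&mut protocol)?))
    }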