From 735f4ed09499d9d1b26d167ab98b30a8aabb35a8 Mon Sep 17 00:00:00 2001
From: liukun4515
Date: Tue, 21 Jun 2022 10:31:42 +0800
Subject: [PATCH] add column index and offset index

---
 parquet/src/arrow/arrow_writer/mod.rs |   4 +-
 parquet/src/column/writer.rs          |  67 ++++++++++-
 parquet/src/file/metadata.rs          | 159 +++++++++++++++++++++++++-
 parquet/src/file/serialized_reader.rs |   1 +
 parquet/src/file/writer.rs            |  80 +++++++++++--
 5 files changed, 296 insertions(+), 15 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 83f1bc70b52..529c1ca056d 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -34,7 +34,7 @@ use super::schema::{
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::RowGroupMetaDataPtr;
+use crate::file::metadata::RowGroupMetaData;
 use crate::file::properties::WriterProperties;
 use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
 use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -98,7 +98,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
         self.writer.flushed_row_groups()
     }
 
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d80cafe0e0a..25b886b37e0 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -29,6 +29,7 @@ use crate::encodings::{
     levels::{max_buffer_size, LevelEncoder},
 };
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::statistics::Statistics;
 use crate::file::{
     metadata::ColumnChunkMetaData,
@@ -198,6 +199,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
     _phantom: PhantomData<T>,
+    // column index and offset index
+    column_index_builder: ColumnIndexBuilder,
+    offset_index_builder: OffsetIndexBuilder,
 }
 
 impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +265,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             num_column_nulls: 0,
             column_distinct_count: None,
             _phantom: PhantomData,
+            column_index_builder: ColumnIndexBuilder::new(),
+            offset_index_builder: OffsetIndexBuilder::new(),
         }
     }
 
@@ -421,10 +427,19 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         if self.dict_encoder.is_some() {
             self.write_dictionary_page()?;
         }
         self.flush_data_pages()?;
-        let metadata = self.write_column_metadata()?;
+        let mut metadata = self.write_column_metadata()?;
         self.dict_encoder = None;
         self.page_writer.close()?;
 
+        if self.column_index_builder.valid() {
+            // build the column and offset index
+            let column_index = self.column_index_builder.build_to_thrift();
+            let offset_index = self.offset_index_builder.build_to_thrift();
+            // attach the column and offset index to the column chunk metadata
+            metadata.set_column_index(Some(column_index));
+            metadata.set_offset_index(Some(offset_index));
+        }
+
         Ok((self.total_bytes_written, self.total_rows_written, metadata))
     }
 
@@ -593,6 +608,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         Ok(())
     }
 
+    /// Updates the column index and offset index when adding a data page
+    fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
+        // update the column index
+        let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
+        // for a page that contains only null values, the writer has to set the
+        // corresponding entries in min_values and max_values to byte[0]
+        if null_page && self.column_index_builder.valid() {
+            self.column_index_builder.append(
+                null_page,
+                &[0; 1],
+                &[0; 1],
+                self.num_page_nulls as i64,
+            );
+        } else if self.column_index_builder.valid() {
+            // from the page statistics;
+            // if none are available, ignore the column/offset index for this column chunk
+            match &page_statistics {
+                None => {
+                    self.column_index_builder.to_invalid();
+                }
+                Some(stat) => {
+                    self.column_index_builder.append(
+                        null_page,
+                        stat.min_bytes(),
+                        stat.max_bytes(),
+                        self.num_page_nulls as i64,
+                    );
+                }
+            }
+        }
+
+        // update the offset index
+        self.offset_index_builder
+            .append_row_count(self.num_buffered_rows as i64);
+    }
+
     /// Adds data page.
     /// Data page is either buffered in case of dictionary encoding or written directly.
     fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +673,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             None
         };
 
+        // update the column and offset index
+        self.update_column_offset_index(&page_statistics);
+
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
                 let mut buffer = vec![];
@@ -700,8 +754,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             buf: ByteBufferPtr::new(buffer),
             num_values: self.num_buffered_values,
             encoding,
-            num_nulls: self.num_buffered_values
-                - self.num_buffered_encoded_values,
+            // TODO: why not use the null_page_count?
+            num_nulls: self.num_page_nulls as u32,
             num_rows: self.num_buffered_rows,
             def_levels_byte_len: def_levels_byte_len as u32,
             rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +884,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
         let page_spec = self.page_writer.write_page(page)?;
+        // update the offset index
+        // compressed_size = header_size + compressed_data_size
+        self.offset_index_builder.append_offset_and_size(
+            page_spec.offset as i64,
+            page_spec.compressed_size as i32,
+        );
         self.update_metrics_for_page(page_spec);
         Ok(())
     }
 
@@ -865,6 +925,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         let page_spec = self.page_writer.write_page(compressed_page)?;
         self.update_metrics_for_page(page_spec);
+        // the dictionary page does not need to update the column/offset index
         Ok(())
     }
 
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index a3477dd7577..fdaecceb900 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,7 +35,10 @@ use std::sync::Arc;
 
-use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
+use parquet_format::{
+    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
+    RowGroup,
+};
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
 use crate::errors::{ParquetError, Result};
@@ -52,6 +55,7 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
+    // TODO: one Index currently covers just one page; a column chunk should hold a list of page indexes
    page_indexes: Option<Vec<Index>>,
     offset_indexes: Option<Vec<Vec<PageLocation>>>,
 }
@@ -247,6 +251,11 @@ impl RowGroupMetaData {
         &self.columns
     }
 
+    /// Returns a mutable slice of column chunk metadata
+    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
+        &mut self.columns
+    }
+
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> i64 {
         self.num_rows
     }
 
@@ -386,6 +395,9 @@ pub struct ColumnChunkMetaData {
     offset_index_length: Option<i32>,
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
+    // column index and offset index
+    column_index: Option<ColumnIndex>,
+    offset_index: Option<OffsetIndex>,
 }
 
 /// Represents common operations for a column chunk.
@@ -519,6 +531,46 @@ impl ColumnChunkMetaData {
         self.offset_index_length
     }
 
+    /// Returns the column index
+    pub fn column_index(&self) -> &Option<ColumnIndex> {
+        &self.column_index
+    }
+
+    /// Returns the offset index
+    pub fn offset_index(&self) -> &Option<OffsetIndex> {
+        &self.offset_index
+    }
+
+    /// Sets the column index for the column metadata
+    pub fn set_column_index(&mut self, column_index: Option<ColumnIndex>) {
+        self.column_index = column_index;
+    }
+
+    /// Sets the offset index for the column metadata
+    pub fn set_offset_index(&mut self, offset_index: Option<OffsetIndex>) {
+        self.offset_index = offset_index;
+    }
+
+    /// Sets the file offset of the ColumnChunk's OffsetIndex
+    pub fn set_offset_index_offset(&mut self, offset_index_offset: i64) {
+        self.offset_index_offset = Some(offset_index_offset);
+    }
+
+    /// Sets the size of the ColumnChunk's OffsetIndex, in bytes
+    pub fn set_offset_index_length(&mut self, offset_index_length: i32) {
+        self.offset_index_length = Some(offset_index_length);
+    }
+
+    /// Sets the file offset of the ColumnChunk's ColumnIndex
+    pub fn set_column_index_offset(&mut self, column_index_offset: i64) {
+        self.column_index_offset = Some(column_index_offset);
+    }
+
+    /// Sets the size of the ColumnChunk's ColumnIndex, in bytes
+    pub fn set_column_index_length(&mut self, column_index_length: i32) {
+        self.column_index_length = Some(column_index_length);
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
         if cc.meta_data.is_none() {
@@ -573,6 +625,8 @@ impl ColumnChunkMetaData {
             offset_index_length,
             column_index_offset,
             column_index_length,
+            column_index: None,
+            offset_index: None,
         };
         Ok(result)
     }
@@ -785,10 +839,113 @@ impl ColumnChunkMetaDataBuilder {
             offset_index_length: self.offset_index_length,
             column_index_offset: self.column_index_offset,
             column_index_length: self.column_index_length,
+            column_index: None,
+            offset_index: None,
         })
     }
 }
 
+/// Builder for column index
+pub struct ColumnIndexBuilder {
+    null_pages: Vec<bool>,
+    min_values: Vec<Vec<u8>>,
+    max_values: Vec<Vec<u8>>,
+    // TODO: calculate the boundary order across all pages in this column
+    boundary_order: BoundaryOrder,
+    null_counts: Vec<i64>,
+    // if the index cannot be built for any one page, the whole index for this
+    // column chunk must be ignored
+    valid: bool,
+}
+
+impl ColumnIndexBuilder {
+    pub fn new() -> Self {
+        ColumnIndexBuilder {
+            null_pages: Vec::new(),
+            min_values: Vec::new(),
+            max_values: Vec::new(),
+            boundary_order: BoundaryOrder::Unordered,
+            null_counts: Vec::new(),
+            valid: true,
+        }
+    }
+
+    pub fn append(
+        &mut self,
+        null_page: bool,
+        min_value: &[u8],
+        max_value: &[u8],
+        null_count: i64,
+    ) {
+        self.null_pages.push(null_page);
+        self.min_values.push(min_value.to_vec());
+        self.max_values.push(max_value.to_vec());
+        self.null_counts.push(null_count);
+    }
+
+    pub fn to_invalid(&mut self) {
+        self.valid = false;
+    }
+
+    pub fn valid(&self) -> bool {
+        self.valid
+    }
+
+    /// Builds the thrift metadata of the column index
+    pub fn build_to_thrift(self) -> ColumnIndex {
+        ColumnIndex::new(
+            self.null_pages,
+            self.min_values,
+            self.max_values,
+            self.boundary_order,
+            self.null_counts,
+        )
+    }
+}
+
+/// Builder for offset index
+pub struct OffsetIndexBuilder {
+    offset_array: Vec<i64>,
+    compressed_page_size_array: Vec<i32>,
+    first_row_index_array: Vec<i64>,
+    current_first_row_index: i64,
+}
+
+impl OffsetIndexBuilder {
+    pub fn new() -> Self {
+        OffsetIndexBuilder {
+            offset_array: Vec::new(),
+            compressed_page_size_array: Vec::new(),
+            first_row_index_array: Vec::new(),
+            current_first_row_index: 0,
+        }
+    }
+
+    pub fn append_row_count(&mut self, row_count: i64) {
+        let current_page_row_index = self.current_first_row_index;
+        self.first_row_index_array.push(current_page_row_index);
+        self.current_first_row_index += row_count;
+    }
+
+    pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
+        self.offset_array.push(offset);
+        self.compressed_page_size_array.push(compressed_page_size);
+    }
+
+    /// Builds the thrift metadata of the offset index
+    pub fn build_to_thrift(self) -> OffsetIndex {
+        let locations = self
+            .offset_array
+            .iter()
+            .zip(self.compressed_page_size_array.iter())
+            .zip(self.first_row_index_array.iter())
+            .map(|((offset, size), row_index)| {
+                PageLocation::new(*offset, *size, *row_index)
+            })
+            .collect::<Vec<PageLocation>>();
+        OffsetIndex::new(locations)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6ff73e041e8..cadbe6d9971 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -250,6 +250,7 @@ impl<R: ChunkReader> SerializedFileReader<R> {
         if options.enable_page_index {
             //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
             //support multi after create multi-RG test data.
+            // TODO: only the first row group is read here; this is incorrect for multi-row-group files
             let cols = metadata.row_group(0);
             let columns_indexes =
                 index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 0a8fc331e7e..8c928197706 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -22,6 +22,7 @@ use std::{io::Write, sync::Arc};
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format as parquet;
+use parquet_format::{ColumnIndex, OffsetIndex};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 
 use crate::basic::PageType;
@@ -85,7 +86,7 @@ pub type OnCloseColumnChunk<'a> =
 /// Callback invoked on closing a row group, arguments are:
 ///
 /// - the row group metadata
-pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> Result<()> + 'a>;
+pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaData) -> Result<()> + 'a>;
 
 #[deprecated = "use std::io::Write"]
 pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
@@ -109,8 +110,10 @@ pub struct SerializedFileWriter<W: Write> {
     schema: TypePtr,
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    row_groups: Vec<RowGroupMetaDataPtr>,
+    row_groups: Vec<RowGroupMetaData>,
     row_group_index: usize,
+    column_indexes: Vec<Option<ColumnIndex>>,
+    offset_indexes: Vec<Option<OffsetIndex>>,
 }
 
 impl<W: Write> SerializedFileWriter<W> {
@@ -125,6 +128,8 @@ impl<W: Write> SerializedFileWriter<W> {
             props: properties,
             row_groups: vec![],
             row_group_index: 0,
+            column_indexes: Vec::new(),
+            offset_indexes: Vec::new(),
         })
     }
 
@@ -154,8 +159,8 @@ impl<W: Write> SerializedFileWriter<W> {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
-        &self.row_groups
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
+        self.row_groups.as_slice()
     }
 
     /// Closes and finalises file writer, returning the file metadata.
@@ -177,6 +182,58 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the offset indexes to the file
+    fn write_offset_indexes(&mut self) -> Result<()> {
+        // iterate through every row group and every column,
+        // writing each offset index to the file
+        for row_group in &mut self.row_groups {
+            for column_metadata in row_group.columns_mut() {
+                match column_metadata.offset_index() {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set the offset and length for the offset index
+                        column_metadata.set_offset_index_offset(start_offset as i64);
+                        column_metadata
+                            .set_offset_index_length((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Serialize all the column indexes to the file
+    fn write_column_indexes(&mut self) -> Result<()> {
+        // iterate through every row group and every column,
+        // writing each column index to the file
+        for row_group in &mut self.row_groups {
+            for column_metadata in row_group.columns_mut() {
+                match column_metadata.column_index() {
+                    Some(column_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        column_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set the offset and length for the column index
+                        column_metadata.set_column_index_offset(start_offset as i64);
+                        column_metadata
+                            .set_column_index_length((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
@@ -200,6 +257,10 @@ impl<W: Write> SerializedFileWriter<W> {
             footer_signing_key_metadata: None,
         };
 
+        // Write column indexes and offset indexes
+        self.write_column_indexes()?;
+        self.write_offset_indexes()?;
+
         // Write file metadata
         let start_pos = self.buf.bytes_written();
         {
@@ -339,11 +400,11 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
             .build()?;
 
-        let metadata = Arc::new(row_group_metadata);
-        self.row_group_metadata = Some(metadata.clone());
+        let clone_row_group_metadata = row_group_metadata.clone();
+        self.row_group_metadata = Some(Arc::new(row_group_metadata));
 
         if let Some(on_close) = self.on_close.take() {
-            on_close(metadata)?
+            on_close(clone_row_group_metadata)?
         }
     }
 
@@ -448,6 +509,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
 
 impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
+        // write the page to disk
         let uncompressed_size = page.uncompressed_size();
         let compressed_size = page.compressed_size();
         let num_values = page.num_values();
@@ -466,6 +528,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
             data_page_header_v2: None,
         };
 
+        // TODO: column index
         match *page.compressed_page() {
             Page::DataPage {
                 def_level_encoding,
@@ -531,7 +594,6 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
         Ok(spec)
     }
 
-
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         self.serialize_column_chunk(metadata.to_thrift())
    }
@@ -1017,7 +1079,7 @@ mod tests {
             let last_group = row_group_writer.close().unwrap();
             let flushed = file_writer.flushed_row_groups();
             assert_eq!(flushed.len(), idx + 1);
-            assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
+            assert_eq!(&flushed[idx], last_group.as_ref());
         }
         file_writer.close().unwrap();
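
Usage sketch (illustrative, not part of the patch): the two builders added above are driven once per page by the column writer and consumed when the column chunk closes. The page sizes, offsets, and values below are invented for the example; the builder API itself (`new`, `append`, `append_row_count`, `append_offset_and_size`, `build_to_thrift`) is what this patch adds to `parquet::file::metadata`.

    use parquet::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};

    fn main() {
        let mut column_index_builder = ColumnIndexBuilder::new();
        let mut offset_index_builder = OffsetIndexBuilder::new();

        // page 0: 100 rows, 10 nulls, min/max taken from the page statistics
        column_index_builder.append(false, b"aaa", b"zzz", 10);
        offset_index_builder.append_row_count(100);
        offset_index_builder.append_offset_and_size(4, 1024);

        // page 1: all nulls, so min/max are byte[0] as the spec requires
        column_index_builder.append(true, &[0; 1], &[0; 1], 50);
        offset_index_builder.append_row_count(50);
        offset_index_builder.append_offset_and_size(1028, 64);

        // on column close the builders are consumed into the thrift structs
        // that close() attaches to the ColumnChunkMetaData
        let column_index = column_index_builder.build_to_thrift();
        let offset_index = offset_index_builder.build_to_thrift();
        assert_eq!(column_index.null_pages, vec![false, true]);
        assert_eq!(offset_index.page_locations[1].first_row_index, 100);
    }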
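
The `first_row_index` values recorded by `OffsetIndexBuilder` are what allow a reader to jump straight to the page containing a given row. A minimal sketch of that lookup, assuming the locations are sorted by `first_row_index` as the builder guarantees by construction; `page_for_row` is a hypothetical helper, not an API added by this patch:

    use parquet_format::PageLocation;

    /// Hypothetical helper: index of the page containing `row`, or None if
    /// the row precedes the first page.
    fn page_for_row(locations: &[PageLocation], row: i64) -> Option<usize> {
        if locations.is_empty() || row < locations[0].first_row_index {
            return None;
        }
        // first page whose first_row_index exceeds `row`, then step back one
        let idx = locations.partition_point(|loc| loc.first_row_index <= row);
        Some(idx - 1)
    }

    fn main() {
        let locations = vec![
            PageLocation::new(4, 1024, 0),    // page 0 covers rows [0, 100)
            PageLocation::new(1028, 64, 100), // page 1 starts at row 100
        ];
        assert_eq!(page_for_row(&locations, 42), Some(0));
        assert_eq!(page_for_row(&locations, 100), Some(1));
    }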