diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d80cafe0e0a..d589aef5a50 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Contains column writer API.
+use parquet_format::{ColumnIndex, OffsetIndex};
 use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};
 
 use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
@@ -29,6 +30,7 @@ use crate::encodings::{
     levels::{max_buffer_size, LevelEncoder},
 };
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::statistics::Statistics;
 use crate::file::{
     metadata::ColumnChunkMetaData,
@@ -162,6 +164,14 @@ pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
     })
 }
 
+type ColumnCloseResult = (
+    u64,
+    u64,
+    ColumnChunkMetaData,
+    Option<ColumnIndex>,
+    Option<OffsetIndex>,
+);
+
 /// Typed column writer for a primitive column.
 pub struct ColumnWriterImpl<'a, T: DataType> {
     // Column writer properties
@@ -198,6 +208,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
     _phantom: PhantomData<T>,
+    // column index and offset index
+    column_index_builder: ColumnIndexBuilder,
+    offset_index_builder: OffsetIndexBuilder,
 }
 
 impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +274,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             num_column_nulls: 0,
             column_distinct_count: None,
             _phantom: PhantomData,
+            column_index_builder: ColumnIndexBuilder::new(),
+            offset_index_builder: OffsetIndexBuilder::new(),
         }
     }
 
@@ -416,7 +431,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
 
     /// Finalises writes and closes the column writer.
     /// Returns total bytes written, total rows written and column chunk metadata.
-    pub fn close(mut self) -> Result<(u64, u64, ColumnChunkMetaData)> {
+    pub fn close(mut self) -> Result<ColumnCloseResult> {
         if self.dict_encoder.is_some() {
             self.write_dictionary_page()?;
         }
@@ -425,7 +440,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         self.dict_encoder = None;
         self.page_writer.close()?;
 
-        Ok((self.total_bytes_written, self.total_rows_written, metadata))
+        let (column_index, offset_index) = if self.column_index_builder.valid() {
+            // build the column and offset index
+            let column_index = self.column_index_builder.build_to_thrift();
+            let offset_index = self.offset_index_builder.build_to_thrift();
+            (Some(column_index), Some(offset_index))
+        } else {
+            (None, None)
+        };
+
+        Ok((
+            self.total_bytes_written,
+            self.total_rows_written,
+            metadata,
+            column_index,
+            offset_index,
+        ))
     }
 
     /// Writes mini batch of values, definition and repetition levels.
@@ -593,6 +623,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         Ok(())
     }
 
+    /// Updates the column index and offset index when adding a data page.
+    fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
+        // update the column index
+        let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
+        // A null page contains only null values, and the writer has to set the
+        // corresponding entries in min_values and max_values to byte[0].
+        if null_page && self.column_index_builder.valid() {
+            self.column_index_builder.append(
+                null_page,
+                &[0; 1],
+                &[0; 1],
+                self.num_page_nulls as i64,
+            );
+        } else if self.column_index_builder.valid() {
+            // Derive min/max from the page statistics.
+            // If the page statistics are missing, ignore the column/offset index for this column chunk.
+            match &page_statistics {
+                None => {
+                    self.column_index_builder.to_invalid();
+                }
+                Some(stat) => {
+                    self.column_index_builder.append(
+                        null_page,
+                        stat.min_bytes(),
+                        stat.max_bytes(),
+                        self.num_page_nulls as i64,
+                    );
+                }
+            }
+        }
+
+        // update the offset index
+        self.offset_index_builder
+            .append_row_count(self.num_buffered_rows as i64);
+    }
+
     /// Adds data page.
     /// Data page is either buffered in case of dictionary encoding or written directly.
     fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +688,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             None
         };
 
+        // update column and offset index
+        self.update_column_offset_index(&page_statistics);
+
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
                 let mut buffer = vec![];
@@ -700,8 +769,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                     buf: ByteBufferPtr::new(buffer),
                     num_values: self.num_buffered_values,
                     encoding,
-                    num_nulls: self.num_buffered_values
-                        - self.num_buffered_encoded_values,
+                    num_nulls: self.num_page_nulls as u32,
                     num_rows: self.num_buffered_rows,
                     def_levels_byte_len: def_levels_byte_len as u32,
                     rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +898,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
         let page_spec = self.page_writer.write_page(page)?;
+        // update offset index
+        // compressed_size = header_size + compressed_data_size
+        self.offset_index_builder.append_offset_and_size(
+            page_spec.offset as i64,
+            page_spec.compressed_size as i32,
+        );
         self.update_metrics_for_page(page_spec);
         Ok(())
     }
@@ -865,6 +939,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
 
         let page_spec = self.page_writer.write_page(compressed_page)?;
         self.update_metrics_for_page(page_spec);
+        // The column/offset index does not need to be updated for the dictionary page.
         Ok(())
     }
 
@@ -1133,6 +1208,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
 
 #[cfg(test)]
 mod tests {
+    use parquet_format::BoundaryOrder;
     use rand::distributions::uniform::SampleUniform;
     use std::sync::Arc;
 
@@ -1256,7 +1332,7 @@ mod tests {
             .write_batch(&[true, false, true, false], None, None)
             .unwrap();
 
-        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
         // PlainEncoder uses bit writer to write boolean values, which all fit into 1
         // byte.
         assert_eq!(bytes_written, 1);
@@ -1529,7 +1605,7 @@ mod tests {
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
         writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
 
-        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
         assert_eq!(bytes_written, 20);
         assert_eq!(rows_written, 4);
         assert_eq!(
@@ -1586,7 +1662,7 @@ mod tests {
                 None,
             )
             .unwrap();
-        let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
+        let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
         if let Some(stats) = metadata.statistics() {
             assert!(stats.has_min_max_set());
             if let Statistics::ByteArray(stats) = stats {
@@ -1620,7 +1696,7 @@ mod tests {
             Int32Type,
         >(page_writer, 0, 0, props);
         writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
-        let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
+        let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
         if let Some(stats) = metadata.statistics() {
             assert!(stats.has_min_max_set());
             if let Statistics::Int32(stats) = stats {
@@ -1651,7 +1727,7 @@ mod tests {
             )
             .unwrap();
 
-        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
         assert_eq!(bytes_written, 20);
         assert_eq!(rows_written, 4);
         assert_eq!(
@@ -1835,7 +1911,7 @@ mod tests {
         let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
-        let (bytes_written, _, _) = writer.close().unwrap();
+        let (bytes_written, _, _, _, _) = writer.close().unwrap();
 
         // Read pages and check the sequence
         let source = FileSource::new(&file, 0, bytes_written as usize);
@@ -2068,6 +2144,75 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // Write data, then check the column index and offset index
+        // returned from closing the column writer.
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata, column_index, offset_index) =
+            writer.close().unwrap();
+        let column_index = match column_index {
+            None => {
+                panic!("Can't find the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match offset_index {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert!(!column_index.null_pages[idx]);
+            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
+        }
+
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::Int32(stats) = stats {
+                // first page is [1, 2, 3, 4]
+                // second page is [4, 8, 2, -5]
+                assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
+                assert_eq!(
+                    stats.max_bytes(),
+                    column_index.max_values.get(1).unwrap().as_slice()
+                );
+            } else {
panic!("expecting Statistics::Int32"); + } + } else { + panic!("metadata missing statistics"); + } + + // page location + assert_eq!( + 0, + offset_index.page_locations.get(0).unwrap().first_row_index + ); + assert_eq!( + 4, + offset_index.page_locations.get(1).unwrap().first_row_index + ); + } + /// Performs write-read roundtrip with randomly generated values and levels. /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write /// for a column. @@ -2149,7 +2294,8 @@ mod tests { let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap(); assert_eq!(values_written, values.len()); - let (bytes_written, rows_written, column_metadata) = writer.close().unwrap(); + let (bytes_written, rows_written, column_metadata, _, _) = + writer.close().unwrap(); let source = FileSource::new(&file, 0, bytes_written as usize); let page_reader = Box::new( @@ -2215,7 +2361,7 @@ mod tests { let props = Arc::new(props); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer.write_batch(values, None, None).unwrap(); - let (_, _, metadata) = writer.close().unwrap(); + let (_, _, metadata, _, _) = writer.close().unwrap(); metadata } @@ -2327,7 +2473,7 @@ mod tests { let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer.write_batch(values, None, None).unwrap(); - let (_bytes_written, _rows_written, metadata) = writer.close().unwrap(); + let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap(); if let Some(stats) = metadata.statistics() { stats.clone() } else { diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 4d9842c0e89..7ec29de0173 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -35,7 +35,10 @@ use std::sync::Arc; -use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup}; +use parquet_format::{ + BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, + RowGroup, +}; use crate::basic::{ColumnOrder, Compression, Encoding, Type}; use crate::errors::{ParquetError, Result}; @@ -794,6 +797,107 @@ impl ColumnChunkMetaDataBuilder { } } +/// Builder for column index +pub struct ColumnIndexBuilder { + null_pages: Vec, + min_values: Vec>, + max_values: Vec>, + // TODO: calc the order for all pages in this column + boundary_order: BoundaryOrder, + null_counts: Vec, + // If one page can't get build index, need to ignore all index in this column + valid: bool, +} + +impl ColumnIndexBuilder { + pub fn new() -> Self { + ColumnIndexBuilder { + null_pages: Vec::new(), + min_values: Vec::new(), + max_values: Vec::new(), + boundary_order: BoundaryOrder::Unordered, + null_counts: Vec::new(), + valid: true, + } + } + + pub fn append( + &mut self, + null_page: bool, + min_value: &[u8], + max_value: &[u8], + null_count: i64, + ) { + self.null_pages.push(null_page); + self.min_values.push(min_value.to_vec()); + self.max_values.push(max_value.to_vec()); + self.null_counts.push(null_count); + } + + pub fn to_invalid(&mut self) { + self.valid = false; + } + + pub fn valid(&self) -> bool { + self.valid + } + + /// Build and get the thrift metadata of column index + pub fn build_to_thrift(self) -> ColumnIndex { + ColumnIndex::new( + self.null_pages, + self.min_values, + self.max_values, + self.boundary_order, + self.null_counts, + ) + } +} + +/// Builder for offset index +pub struct OffsetIndexBuilder { + offset_array: Vec, + compressed_page_size_array: Vec, + first_row_index_array: Vec, + current_first_row_index: i64, +} + 
+impl OffsetIndexBuilder {
+    pub fn new() -> Self {
+        OffsetIndexBuilder {
+            offset_array: Vec::new(),
+            compressed_page_size_array: Vec::new(),
+            first_row_index_array: Vec::new(),
+            current_first_row_index: 0,
+        }
+    }
+
+    pub fn append_row_count(&mut self, row_count: i64) {
+        let current_page_row_index = self.current_first_row_index;
+        self.first_row_index_array.push(current_page_row_index);
+        self.current_first_row_index += row_count;
+    }
+
+    pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
+        self.offset_array.push(offset);
+        self.compressed_page_size_array.push(compressed_page_size);
+    }
+
+    /// Build and get the thrift metadata of the offset index
+    pub fn build_to_thrift(self) -> OffsetIndex {
+        let locations = self
+            .offset_array
+            .iter()
+            .zip(self.compressed_page_size_array.iter())
+            .zip(self.first_row_index_array.iter())
+            .map(|((offset, size), row_index)| {
+                PageLocation::new(*offset, *size, *row_index)
+            })
+            .collect::<Vec<_>>();
+        OffsetIndex::new(locations)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index b503c264d46..10983c74135 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -22,6 +22,7 @@ use std::{io::Write, sync::Arc};
 
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format as parquet;
+use parquet_format::{ColumnIndex, OffsetIndex, RowGroup};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 
 use crate::basic::PageType;
@@ -78,14 +79,33 @@ impl<W: Write> Write for TrackedWrite<W> {
 /// - the number of bytes written
 /// - the number of rows written
 /// - the column chunk metadata
+/// - the column index
+/// - the offset index
 ///
-pub type OnCloseColumnChunk<'a> =
-    Box<dyn FnOnce(u64, u64, ColumnChunkMetaData) -> Result<()> + 'a>;
+pub type OnCloseColumnChunk<'a> = Box<
+    dyn FnOnce(
+            u64,
+            u64,
+            ColumnChunkMetaData,
+            Option<ColumnIndex>,
+            Option<OffsetIndex>,
+        ) -> Result<()>
+        + 'a,
+>;
 
 /// Callback invoked on closing a row group, arguments are:
 ///
 /// - the row group metadata
-pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> Result<()> + 'a>;
+/// - the column index for each column chunk
+/// - the offset index for each column chunk
+pub type OnCloseRowGroup<'a> = Box<
+    dyn FnOnce(
+            RowGroupMetaDataPtr,
+            Vec<Option<ColumnIndex>>,
+            Vec<Option<OffsetIndex>>,
+        ) -> Result<()>
+        + 'a,
+>;
 
 #[deprecated = "use std::io::Write"]
 pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
@@ -110,6 +130,8 @@ pub struct SerializedFileWriter<W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
+    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
+    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
 }
 
@@ -124,6 +146,8 @@ impl<W: Write> SerializedFileWriter<W> {
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
             row_groups: vec![],
+            column_indexes: Vec::new(),
+            offset_indexes: Vec::new(),
             row_group_index: 0,
         })
     }
@@ -139,8 +163,12 @@ impl<W: Write> SerializedFileWriter<W> {
         self.row_group_index += 1;
 
         let row_groups = &mut self.row_groups;
-        let on_close = |metadata| {
+        let row_column_indexes = &mut self.column_indexes;
+        let row_offset_indexes = &mut self.offset_indexes;
+        let on_close = |metadata, row_group_column_index, row_group_offset_index| {
             row_groups.push(metadata);
+            row_column_indexes.push(row_group_column_index);
+            row_offset_indexes.push(row_group_offset_index);
             Ok(())
         };
 
@@ -177,16 +205,74 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the offset indexes to the file
+    fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // Iterate over all the row groups,
+        // iterate over each column in the row group,
+        // and write the offset index to the file.
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+            {
+                match &self.offset_indexes[row_group_idx][column_idx] {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set the offset and length for the offset index
+                        column_metadata.offset_index_offset = Some(start_offset as i64);
+                        column_metadata.offset_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Serialize all the column indexes to the file
+    fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // Iterate over all the row groups,
+        // iterate over each column in the row group,
+        // and write the column index to the file.
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+            {
+                match &self.column_indexes[row_group_idx][column_idx] {
+                    Some(column_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        column_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set the offset and length for the column index
+                        column_metadata.column_index_offset = Some(start_offset as i64);
+                        column_metadata.column_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
-        let row_groups = self
+        let mut row_groups = self
             .row_groups
             .as_slice()
             .iter()
             .map(|v| v.to_thrift())
-            .collect();
+            .collect::<Vec<_>>();
+
+        // Write column indexes and offset indexes
+        self.write_column_indexes(&mut row_groups)?;
+        self.write_offset_indexes(&mut row_groups)?;
 
         let file_metadata = parquet::FileMetaData {
             num_rows,
@@ -247,6 +333,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     column_index: usize,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    column_indexes: Vec<Option<ColumnIndex>>,
+    offset_indexes: Vec<Option<OffsetIndex>>,
     on_close: Option<OnCloseRowGroup<'a>>,
 }
 
@@ -273,6 +361,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_index: 0,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            column_indexes: Vec::with_capacity(num_columns),
+            offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
         }
     }
@@ -297,25 +387,31 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         let total_bytes_written = &mut self.total_bytes_written;
         let total_rows_written = &mut self.total_rows_written;
         let column_chunks = &mut self.column_chunks;
-
-        let on_close = |bytes_written, rows_written, metadata| {
-            // Update row group writer metrics
-            *total_bytes_written += bytes_written;
-            column_chunks.push(metadata);
-            if let Some(rows) = *total_rows_written {
-                if rows != rows_written {
-                    return Err(general_err!(
-                        "Incorrect number of rows, expected {} != {} rows",
-                        rows,
-                        rows_written
-                    ));
+        let column_indexes = &mut self.column_indexes;
+        let offset_indexes = &mut self.offset_indexes;
+
+        let on_close =
+            |bytes_written, rows_written, metadata, column_index, offset_index| {
+                // Update row group writer metrics
+                *total_bytes_written += bytes_written;
+                column_chunks.push(metadata);
+                column_indexes.push(column_index);
+                offset_indexes.push(offset_index);
+
+                if let Some(rows) = *total_rows_written {
+                    if rows != rows_written {
+                        return Err(general_err!(
+                            "Incorrect number of rows, expected {} != {} rows",
+                            rows,
+                            rows_written
+                        ));
+                    }
+                } else {
+                    *total_rows_written = Some(rows_written);
                 }
-            } else {
-                *total_rows_written = Some(rows_written);
-            }
-            Ok(())
-        };
+                Ok(())
+            };
 
         Ok(Some(SerializedColumnWriter::new(
             column_writer,
@@ -343,7 +439,11 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             self.row_group_metadata = Some(metadata.clone());
 
             if let Some(on_close) = self.on_close.take() {
-                on_close(metadata)?
+                on_close(
+                    metadata,
+                    self.column_indexes.clone(),
+                    self.offset_indexes.clone(),
+                )?
             }
         }
 
@@ -389,19 +489,26 @@ impl<'a> SerializedColumnWriter<'a> {
 
     /// Close this [`SerializedColumnWriter]
     pub fn close(mut self) -> Result<()> {
-        let (bytes_written, rows_written, metadata) = match self.inner {
-            ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
-            ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
-            ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
-        };
+        let (bytes_written, rows_written, metadata, column_index, offset_index) =
+            match self.inner {
+                ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
+                ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
+                ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
+                ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
+                ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
+                ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
+                ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
+                ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
+            };
 
         if let Some(on_close) = self.on_close.take() {
-            on_close(bytes_written, rows_written, metadata)?
+            on_close(
+                bytes_written,
+                rows_written,
+                metadata,
+                column_index,
+                offset_index,
+            )?
         }
 
         Ok(())
@@ -521,7 +628,6 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
         Ok(spec)
     }
 
-
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
         metadata
@@ -982,7 +1088,10 @@ mod tests {
 
     /// File write-read roundtrip.
     /// `data` consists of arrays of values for each row group.
-    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) {
+    fn test_file_roundtrip(
+        file: File,
+        data: Vec<Vec<i32>>,
+    ) -> parquet_format::FileMetaData {
         let schema = Arc::new(
             types::Type::group_type_builder("schema")
                 .with_fields(&mut vec![Arc::new(
@@ -1014,7 +1123,7 @@ mod tests {
             assert_eq!(flushed.len(), idx + 1);
             assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
         }
-        file_writer.close().unwrap();
+        let file_metadata = file_writer.close().unwrap();
 
         let reader = assert_send(SerializedFileReader::new(file).unwrap());
         assert_eq!(reader.num_row_groups(), data.len());
@@ -1031,6 +1140,7 @@ mod tests {
                 .collect::<Vec<i32>>();
             assert_eq!(res, *item);
         }
+        file_metadata
     }
 
     fn assert_send<T: Send>(t: T) -> T {
@@ -1111,4 +1221,19 @@ mod tests {
             assert_eq!(res, *item);
         }
     }
+
+    #[test]
+    fn test_column_offset_index_file() {
+        let file = tempfile::tempfile().unwrap();
+        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
+        file_metadata.row_groups.iter().for_each(|row_group| {
+            row_group.columns.iter().for_each(|column_chunk| {
+                assert_ne!(None, column_chunk.column_index_offset);
+                assert_ne!(None, column_chunk.column_index_length);
+
+                assert_ne!(None, column_chunk.offset_index_offset);
+                assert_ne!(None, column_chunk.offset_index_length);
+            })
+        });
+    }
 }
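For reviewers, a rough usage sketch (not part of the patch) of how the two builders introduced in `parquet/src/file/metadata.rs` fit together: `append`/`append_row_count` are called once per data page while the column is written, `append_offset_and_size` once the page's position in the file is known, and `build_to_thrift` at column close. The page offsets, compressed sizes, and min/max bytes below are invented values for illustration only.

```rust
use parquet_format::BoundaryOrder;

use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};

// Hypothetical two-page Int32 column chunk; offsets and sizes are made-up numbers.
fn page_index_sketch() {
    let mut column_index_builder = ColumnIndexBuilder::new();
    let mut offset_index_builder = OffsetIndexBuilder::new();

    // Page 1: 4 rows, no nulls, min = 1, max = 4 (little-endian plain-encoded bytes).
    column_index_builder.append(false, &1i32.to_le_bytes(), &4i32.to_le_bytes(), 0);
    offset_index_builder.append_row_count(4);
    offset_index_builder.append_offset_and_size(4, 51);

    // Page 2: 4 rows, no nulls, min = -5, max = 8.
    column_index_builder.append(false, &(-5i32).to_le_bytes(), &8i32.to_le_bytes(), 0);
    offset_index_builder.append_row_count(4);
    offset_index_builder.append_offset_and_size(55, 51);

    // At ColumnWriterImpl::close() these become the thrift structures that
    // SerializedFileWriter serializes right before the footer.
    let column_index = column_index_builder.build_to_thrift();
    let offset_index = offset_index_builder.build_to_thrift();

    assert_eq!(column_index.boundary_order, BoundaryOrder::Unordered);
    assert_eq!(offset_index.page_locations[1].first_row_index, 4);
    assert_eq!(offset_index.page_locations[1].offset, 55);
}
```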