From 04f164a4afc8b6efce452b7b9b2e9f83250ac940 Mon Sep 17 00:00:00 2001
From: liukun4515
Date: Tue, 21 Jun 2022 10:31:42 +0800
Subject: [PATCH] add column index and offset index builder

---
 parquet/src/basic.rs                        | 17 ++++++
 parquet/src/column/writer.rs                |  1 +
 parquet/src/file/metadata.rs                | 63 ++++++++++++++++++++-
 parquet/src/file/page_index/index_writer.rs |  9 +++
 parquet/src/file/page_index/mod.rs          |  1 +
 parquet/src/file/serialized_reader.rs       |  1 +
 parquet/src/file/writer.rs                  |  2 +-
 7 files changed, 92 insertions(+), 2 deletions(-)
 create mode 100644 parquet/src/file/page_index/index_writer.rs

diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 59a0fe07b7d..29857137a5d 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -942,6 +942,23 @@ impl str::FromStr for LogicalType {
 mod tests {
     use super::*;
 
+    #[test]
+    fn test_boolean() {
+        let value = true ^ false;
+        assert!(value);
+
+        let list = vec![1, 23, 4, 5, 56, 6];
+
+        // Map values greater than 10 to value + 20 and everything else to 1.
+        let result = list
+            .iter()
+            .map(|v| if *v > 10 { *v + 20 } else { 1 })
+            .collect::<Vec<i32>>();
+        assert_eq!(result, vec![1, 43, 1, 1, 76, 1]);
+    }
+
     #[test]
     fn test_display_type() {
         assert_eq!(Type::BOOLEAN.to_string(), "BOOLEAN");
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d80cafe0e0a..7dfb19dcb89 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -785,6 +785,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         encodings.push(Encoding::RLE);
 
         let statistics = self.make_column_statistics();
+        // TODO: also build the column index and offset index for this column chunk
         let metadata = ColumnChunkMetaData::builder(self.descr.clone())
             .set_compression(self.codec)
             .set_encodings(encodings)
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index a3477dd7577..9d3fc49c785 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,7 +35,7 @@
 use std::sync::Arc;
 
-use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
+use parquet_format::{BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup};
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
 use crate::errors::{ParquetError, Result};
@@ -52,6 +52,7 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
+    // TODO: one Index currently covers just one page; each column chunk should hold a list of per-page indexes
     page_indexes: Option<Vec<Index>>,
     offset_indexes: Option<Vec<Vec<PageLocation>>>,
 }
@@ -789,6 +790,66 @@ impl ColumnChunkMetaDataBuilder {
     }
 }
 
+/// Builder for a column index: per-page null flags and min/max statistics of one column chunk
+pub struct ColumnIndexBuilder {
+    null_pages: Vec<bool>,
+    min_values: Vec<Vec<u8>>,
+    max_values: Vec<Vec<u8>>,
+    boundary_order: BoundaryOrder,
+    null_counts: Vec<i64>,
+}
+
+impl ColumnIndexBuilder {
+    pub fn new() -> Self {
+        ColumnIndexBuilder {
+            null_pages: Vec::new(),
+            min_values: Vec::new(),
+            max_values: Vec::new(),
+            boundary_order: BoundaryOrder::Unordered,
+            null_counts: Vec::new(),
+        }
+    }
+
+    pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
+        self.boundary_order = boundary_order;
+    }
+
+    /// Append the statistics of one page to the index
+    pub fn append(
+        &mut self,
+        null_page: bool,
+        min_value: &[u8],
+        max_value: &[u8],
+        null_count: i64,
+    ) {
+        self.null_pages.push(null_page);
+        self.min_values.push(min_value.to_vec());
+        self.max_values.push(max_value.to_vec());
+        self.null_counts.push(null_count);
+    }
+
+    /// Build and get the thrift metadata of the column index
+    pub fn build_to_thrift(self) -> ColumnIndex {
+        ColumnIndex::new(
+            self.null_pages,
+            self.min_values,
+            self.max_values,
+            self.boundary_order,
+            self.null_counts,
+        )
+    }
+}
+
+/// Builder for an offset index: the location of every page in one column chunk
+pub struct OffsetIndexBuilder {
+    page_locations: Vec<PageLocation>,
+}
+
+impl OffsetIndexBuilder {
+    pub fn new() -> Self {
+        OffsetIndexBuilder {
+            page_locations: Vec::new(),
+        }
+    }
+
+    /// Append the location of one page to the index
+    pub fn append(&mut self, offset: i64, compressed_page_size: i32, first_row_index: i64) {
+        self.page_locations.push(PageLocation::new(
+            offset,
+            compressed_page_size,
+            first_row_index,
+        ));
+    }
+
+    /// Build and get the thrift metadata of the offset index
+    pub fn build_to_thrift(self) -> OffsetIndex {
+        OffsetIndex::new(self.page_locations)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/file/page_index/index_writer.rs b/parquet/src/file/page_index/index_writer.rs
new file mode 100644
index 00000000000..26344fd0609
--- /dev/null
+++ b/parquet/src/file/page_index/index_writer.rs
@@ -0,0 +1,9 @@
+/// Serialize the column indexes of all row groups to the output stream (stub)
+pub fn write_column_indexes() {
+    // TODO: write the ColumnIndex structs produced by ColumnIndexBuilder
+}
+
+/// Serialize the offset indexes of all row groups to the output stream (stub)
+pub fn write_offset_indexes() {
+    // TODO: write the OffsetIndex structs produced by OffsetIndexBuilder
+}
diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs
index fc87ef20448..777673adc53 100644
--- a/parquet/src/file/page_index/mod.rs
+++ b/parquet/src/file/page_index/mod.rs
@@ -18,3 +18,4 @@
 pub mod index;
 pub mod index_reader;
 pub(crate) mod range;
+mod index_writer;
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6ff73e041e8..cadbe6d9971 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -250,6 +250,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         if options.enable_page_index {
             // TODO: for now the test data `data_index_bloom_encoding_stats.parquet`
             // only has one row group; support multiple after creating multi-RG test data.
+            // TODO: reading only the first row group here is wrong for multi-row-group files
             let cols = metadata.row_group(0);
             let columns_indexes =
                 index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 0a8fc331e7e..630f204373d 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -466,6 +466,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
             data_page_header_v2: None,
         };
 
+        // TODO: also write the column index and offset index entries for this page
        match *page.compressed_page() {
             Page::DataPage {
                 def_level_encoding,
@@ -531,7 +532,6 @@
         Ok(spec)
     }
 
-
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         self.serialize_column_chunk(metadata.to_thrift())
     }
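
--

A minimal usage sketch of the two builders added in parquet/src/file/metadata.rs, not part of the patch. The helper name build_page_indexes, the page statistics, and the page locations are invented for illustration; only ColumnIndexBuilder, OffsetIndexBuilder, and the parquet_format thrift types come from the change above, and the min/max encoding assumes plain little-endian INT32 values.

use parquet_format::{BoundaryOrder, ColumnIndex, OffsetIndex};

use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};

// Hypothetical helper: build the page indexes for a column chunk that
// flushed two data pages, using made-up statistics and locations.
fn build_page_indexes() -> (ColumnIndex, OffsetIndex) {
    let mut column_index_builder = ColumnIndexBuilder::new();
    let mut offset_index_builder = OffsetIndexBuilder::new();

    // One append per page: (null_page, min, max, null_count).
    // INT32 min/max values are assumed to be stored as little-endian bytes.
    column_index_builder.append(false, &1i32.to_le_bytes(), &42i32.to_le_bytes(), 0);
    column_index_builder.append(false, &50i32.to_le_bytes(), &99i32.to_le_bytes(), 3);
    // The per-page min/max ranges above increase from page to page,
    // so the pages are in ascending order.
    column_index_builder.set_boundary_order(BoundaryOrder::Ascending);

    // One append per page: (file offset, compressed page size, first row index).
    offset_index_builder.append(4, 1024, 0);
    offset_index_builder.append(1028, 980, 100);

    // Consuming the builders yields the thrift structs that the
    // write_column_indexes / write_offset_indexes stubs would serialize.
    (
        column_index_builder.build_to_thrift(),
        offset_index_builder.build_to_thrift(),
    )
}

Note that the appends take &mut self while build_to_thrift consumes self, so a builder accumulates pages incrementally and cannot be reused once its thrift struct has been produced.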