diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 59a0fe07b7d..29857137a5d 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -942,6 +942,23 @@ impl str::FromStr for LogicalType {
 mod tests {
     use super::*;
 
+    #[test]
+    fn test_boolean() {
+        let value = true ^ false;
+        println!("{}", value);
+
+        let list = vec![1,23,4,5,56,6];
+
+        let result = list.iter().filter_map(|v| {
+            if (*v > 10) {
+                Some(*v+20)
+            } else {
+                Some(1)
+            }
+        }).collect::<Vec<_>>();
+
+    }
+
     #[test]
     fn test_display_type() {
         assert_eq!(Type::BOOLEAN.to_string(), "BOOLEAN");
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 9364bd30fff..09a36002578 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -212,10 +212,13 @@ pub trait PageWriter {
     ///
     /// This method is called once before page writer is closed, normally when writes are
     /// finalised in column writer.
+    /// TODO remove this method: when we have the column index, the column chunk metadata will
+    /// be determined after the column index is written to the disk
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()>;
 
     /// Closes resources and flushes underlying sink.
     /// Page writer should not be used after this method is called.
+    /// TODO close and return the column metadata
     fn close(&mut self) -> Result<()>;
 }
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d80cafe0e0a..7dfb19dcb89 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -785,6 +785,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         encodings.push(Encoding::RLE);
 
         let statistics = self.make_column_statistics();
+        // TODO: column index: offset index and column index
         let metadata = ColumnChunkMetaData::builder(self.descr.clone())
             .set_compression(self.codec)
             .set_encodings(encodings)
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index a3477dd7577..9d3fc49c785 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,7 +35,7 @@
 use std::sync::Arc;
 
-use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
+use parquet_format::{BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup};
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
 use crate::errors::{ParquetError, Result};
@@ -52,6 +52,7 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
+    // TODO: one index is just one page; one column chunk should have a list of pages
     page_indexes: Option<Vec<Index>>,
     offset_indexes: Option<Vec<Vec<PageLocation>>>,
 }
@@ -789,6 +790,66 @@ impl ColumnChunkMetaDataBuilder {
     }
 }
 
+
+/// Builder for column index
+pub struct ColumnIndexBuilder {
+    null_pages: Vec<bool>,
+    min_values: Vec<Vec<u8>>,
+    max_values: Vec<Vec<u8>>,
+    boundary_order: BoundaryOrder,
+    null_counts: Vec<i64>,
+}
+
+impl ColumnIndexBuilder {
+    pub fn new() -> Self {
+        ColumnIndexBuilder {
+            null_pages: Vec::new(),
+            min_values: Vec::new(),
+            max_values: Vec::new(),
+            boundary_order: BoundaryOrder::Unordered,
+            null_counts: Vec::new(),
+        }
+    }
+    pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
+        self.boundary_order = boundary_order;
+    }
+    pub fn append(&mut self, null_page: bool, min_value: &[u8], max_value: &[u8], null_count: i64) {
+        self.null_pages.push(null_page);
+        self.min_values.push(min_value.to_vec());
+        self.max_values.push(max_value.to_vec());
+        self.null_counts.push(null_count);
+    }
+
+    /// Build and get the thrift metadata of column index
+    pub fn build_to_thrift(self) -> ColumnIndex {
+        ColumnIndex::new(self.null_pages,
+            self.min_values, self.max_values, self.boundary_order, self.null_counts)
+    }
+}
+
+/// Builder for offset index
+pub struct OffsetIndexBuilder {
+    page_locations: Vec<PageLocation>,
+}
+
+impl OffsetIndexBuilder {
+    pub fn new() -> Self {
+        OffsetIndexBuilder {
+            page_locations: Vec::new()
+        }
+    }
+    pub fn append(&mut self, offset: i64, compressed_page_size: i32, first_row_index: i64) {
+        self.page_locations.push(PageLocation::new(
+            offset, compressed_page_size, first_row_index,
+        ));
+    }
+
+    /// Build and get the thrift metadata of offset index
+    pub fn build_to_thrift(self) -> OffsetIndex {
+        OffsetIndex::new(self.page_locations)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/file/page_index/index_writer.rs b/parquet/src/file/page_index/index_writer.rs
new file mode 100644
index 00000000000..ea03f0e7312
--- /dev/null
+++ b/parquet/src/file/page_index/index_writer.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Serialize the column index to byte array.
+
+pub fn write_column_indexes() {
+
+}
+
+pub fn write_offset_indexes() {
+
+}
+
diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs
index fc87ef20448..777673adc53 100644
--- a/parquet/src/file/page_index/mod.rs
+++ b/parquet/src/file/page_index/mod.rs
@@ -18,3 +18,4 @@
 pub mod index;
 pub mod index_reader;
 pub(crate) mod range;
+mod index_writer;
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6ff73e041e8..cadbe6d9971 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -250,6 +250,7 @@ impl<R: ChunkReader> SerializedFileReader<R> {
         if options.enable_page_index {
             //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
             //support multi after create multi-RG test data.
+            // TODO: currently this just reads the first row group, which is incorrect
            let cols = metadata.row_group(0);
             let columns_indexes =
                 index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 0a8fc331e7e..2c907a55deb 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -22,6 +22,7 @@ use std::{io::Write, sync::Arc};
 
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format as parquet;
+use parquet_format::{ColumnIndex, OffsetIndex};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 
 use crate::basic::PageType;
@@ -111,6 +112,8 @@ pub struct SerializedFileWriter<W: Write> {
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaData>,
     row_group_index: usize,
+    column_indexes: Vec<Vec<ColumnIndex>>,
+    offset_indexes: Vec<Vec<OffsetIndex>>
 }
 
 impl<W: Write> SerializedFileWriter<W> {
@@ -125,6 +128,8 @@
             props: properties,
             row_groups: vec![],
             row_group_index: 0,
+            column_indexes: Vec::new(),
+            offset_indexes: Vec::new()
         })
     }
 
@@ -177,8 +182,24 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Write column index to the file
+    fn write_column_index(&mut self) {
+        // iter row group
+        //   iter column
+        //
+    }
+
+    /// Write offset index to the file
+    fn write_offset_index(&mut self) {
+
+    }
+
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
+        // Before serializing the FileMetaData, write the column index and offset index
+        self.write_column_index();
+        self.write_offset_index();
+
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
         let row_groups = self
@@ -448,6 +469,8 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
 
 impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
+        // TODO: collect the column index info
+        // write page to the disk
         let uncompressed_size = page.uncompressed_size();
         let compressed_size = page.compressed_size();
         let num_values = page.num_values();
@@ -466,6 +489,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
             data_page_header_v2: None,
         };
 
+        // TODO: column indexes and offset indexes
         match *page.compressed_page() {
             Page::DataPage {
                 def_level_encoding,
@@ -528,10 +552,10 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
         if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 {
             spec.num_values = num_values;
         }
+        // TODO: collect the page statistics info and append to the column index builder
         Ok(spec)
     }
-
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         self.serialize_column_chunk(metadata.to_thrift())
     }