From c0bc36133d73a49808465daa765a457458e62a43 Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Fri, 1 Jul 2022 15:30:35 +0800
Subject: [PATCH] add test for filter offset index and fix conflict

---
 parquet/src/arrow/array_reader/builder.rs      |   2 +-
 parquet/src/arrow/array_reader/mod.rs          |   2 +-
 parquet/src/arrow/arrow_reader.rs              |   2 +-
 parquet/src/arrow/async_reader.rs              |   2 +-
 parquet/src/file/filer_offset_index.rs         | 142 ---------
 parquet/src/file/mod.rs                        |   1 -
 .../src/file/page_index/filer_offset_index.rs  | 293 ++++++++++++++++++
 parquet/src/file/page_index/mod.rs             |   1 +
 parquet/src/file/reader.rs                     |   2 +-
 parquet/src/file/serialized_reader.rs          |   2 +-
 10 files changed, 300 insertions(+), 149 deletions(-)
 delete mode 100644 parquet/src/file/filer_offset_index.rs
 create mode 100644 parquet/src/file/page_index/filer_offset_index.rs

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 59717a9d149..999042575f8 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -39,7 +39,7 @@ use crate::data_type::{
     Int96Type,
 };
 use crate::errors::Result;
-use crate::file::filer_offset_index::FilterOffsetIndex;
+use crate::file::page_index::filer_offset_index::FilterOffsetIndex;
 use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
 
 /// Create array reader from parquet schema, projection mask, and parquet file reader.
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 5d0fc7ce589..9bef5b77aa0 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -44,7 +44,7 @@ mod struct_array;
 #[cfg(test)]
 mod test_util;
 
-use crate::file::filer_offset_index::FilterOffsetIndex;
+use crate::file::page_index::filer_offset_index::FilterOffsetIndex;
 pub use builder::build_array_reader;
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 13fc317ed31..b8dbb89c59b 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -30,8 +30,8 @@ use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
 use crate::arrow::ProjectionMask;
 use crate::errors::{ParquetError, Result};
-use crate::file::filer_offset_index::FilterOffsetIndex;
 use crate::file::metadata::{KeyValue, ParquetMetaData};
+use crate::file::page_index::filer_offset_index::FilterOffsetIndex;
 use crate::file::page_index::range::RowRanges;
 use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use crate::schema::types::SchemaDescriptor;
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index fff540243c2..a4fea20e1c1 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -100,9 +100,9 @@ use crate::basic::Compression;
 use crate::column::page::{Page, PageIterator, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
-use crate::file::filer_offset_index::FilterOffsetIndex;
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::filer_offset_index::FilterOffsetIndex;
 use crate::file::serialized_reader::{decode_page, read_page_header};
 use crate::file::FOOTER_SIZE;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
diff --git a/parquet/src/file/filer_offset_index.rs b/parquet/src/file/filer_offset_index.rs
deleted file mode 100644
index 6f1607e879f..00000000000
--- a/parquet/src/file/filer_offset_index.rs
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::file::page_index::range::{Range, RowRanges};
-use parquet_format::PageLocation;
-
-/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct FilterOffsetIndex {
-    // read from parquet file which before the footer.
-    offset_index: Vec<PageLocation>,
-
-    // use to keep needed page index.
-    index_map: Vec<usize>,
-}
-
-pub(crate) type OffsetRange = (Vec<usize>, Vec<usize>);
-
-impl FilterOffsetIndex {
-    pub(crate) fn try_new(
-        offset_index: &[PageLocation],
-        ranges: &RowRanges,
-        total_row_count: i64,
-    ) -> Self {
-        let mut index = vec![];
-        for i in 0..offset_index.len() {
-            let page_location: &PageLocation = &offset_index[i];
-            let page_range = if i == offset_index.len() - 1 {
-                Range::new(
-                    page_location.first_row_index as usize,
-                    total_row_count as usize,
-                )
-            } else {
-                let next_page_location: &PageLocation = &offset_index[i + 1];
-                Range::new(
-                    page_location.first_row_index as usize,
-                    (next_page_location.first_row_index - 1) as usize,
-                )
-            };
-            if ranges.is_overlapping(&page_range) {
-                index.push(i);
-            }
-        }
-
-        FilterOffsetIndex {
-            offset_index: offset_index.to_vec(),
-            index_map: index,
-        }
-    }
-
-    pub(crate) fn get_page_count(&self) -> usize {
-        self.index_map.len()
-    }
-
-    pub(crate) fn get_offset(&self, page_index: usize) -> i64 {
-        let index = self.index_map[page_index];
-        self.offset_index.get(index as usize).unwrap().offset
-    }
-
-    pub(crate) fn get_compressed_page_size(&self, page_index: usize) -> i32 {
-        let index = self.index_map[page_index];
-        self.offset_index
-            .get(index as usize)
-            .unwrap()
-            .compressed_page_size
-    }
-
-    pub(crate) fn get_first_row_index(&self, page_index: usize) -> i64 {
-        let index = self.index_map[page_index];
-        self.offset_index
-            .get(index as usize)
-            .unwrap()
-            .first_row_index
-    }
-
-    pub(crate) fn get_last_row_index(
-        &self,
-        page_index: usize,
-        total_row_count: i64,
-    ) -> i64 {
-        let next_index = self.index_map[page_index] + 1;
-        if next_index >= self.get_page_count() {
-            total_row_count
-        } else {
-            self.offset_index
-                .get(next_index as usize)
-                .unwrap()
-                .first_row_index
-                - 1
-        }
-    }
-
-    // Return the offset of needed both data page and dictionary page.
-    // need input `row_group_offset` as input for checking if there is one dictionary page
-    // in one column chunk.
-    pub(crate) fn calculate_offset_range(&self, row_group_offset: i64) -> OffsetRange {
-        let mut start_list = vec![];
-        let mut length_list = vec![];
-        let page_count = self.get_page_count();
-        if page_count > 0 {
-            let first_page_offset = self.get_offset(0);
-            // add dictionary page if required
-            if row_group_offset < first_page_offset {
-                start_list.push(row_group_offset as usize);
-                length_list.push((first_page_offset - 1) as usize);
-            }
-            let mut current_offset = self.get_offset(0);
-            let mut current_length = self.get_compressed_page_size(0);
-
-            for i in 1..page_count {
-                let offset = self.get_offset(i);
-                let length = self.get_compressed_page_size(i);
-
-                if (current_length + current_length) as i64 == offset {
-                    current_length += length;
-                } else {
-                    start_list.push(current_offset as usize);
-                    length_list.push(current_length as usize);
-                    current_offset = offset;
-                    current_length = length
-                }
-            }
-            start_list.push(current_offset as usize);
-            length_list.push(current_length as usize);
-        }
-        (start_list, length_list)
-    }
-}
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index d6d52921c31..66d8ce48e0a 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -95,7 +95,6 @@
 //!     println!("{}", row);
 //! }
 //! ```
-pub(crate) mod filer_offset_index;
 pub mod footer;
 pub mod metadata;
 pub mod page_encoding_stats;
diff --git a/parquet/src/file/page_index/filer_offset_index.rs b/parquet/src/file/page_index/filer_offset_index.rs
new file mode 100644
index 00000000000..700d86d2d79
--- /dev/null
+++ b/parquet/src/file/page_index/filer_offset_index.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// A filtered offset index containing only the pages that overlap the selected `RowRanges`.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // Page locations read from the offset index, which is stored before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // Indices (into `offset_index`) of the pages still needed after filtering.
+    index_map: Vec<usize>,
+}
+
+pub(crate) type OffsetRange = (Vec<usize>, Vec<usize>);
+
+impl FilterOffsetIndex {
+    pub(crate) fn try_new(
+        offset_index: &[PageLocation],
+        ranges: &RowRanges,
+        total_row_count: i64,
+    ) -> Self {
+        let mut index = vec![];
+        for i in 0..offset_index.len() {
+            let page_location: &PageLocation = &offset_index[i];
+            let page_range = if i == offset_index.len() - 1 {
+                Range::new(
+                    page_location.first_row_index as usize,
+                    total_row_count as usize,
+                )
+            } else {
+                let next_page_location: &PageLocation = &offset_index[i + 1];
+                Range::new(
+                    page_location.first_row_index as usize,
+                    (next_page_location.first_row_index - 1) as usize,
+                )
+            };
+            if ranges.is_overlapping(&page_range) {
+                index.push(i);
+            }
+        }
+
+        FilterOffsetIndex {
+            offset_index: offset_index.to_vec(),
+            index_map: index,
+        }
+    }
+
+    pub(crate) fn get_page_count(&self) -> usize {
+        self.index_map.len()
+    }
+
+    pub(crate) fn get_offset(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index.get(index as usize).unwrap().offset
+    }
+
+    pub(crate) fn get_compressed_page_size(&self, page_index: usize) -> i32 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .compressed_page_size
+    }
+
+    pub(crate) fn get_first_row_index(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .first_row_index
+    }
+
+    pub(crate) fn get_last_row_index(
+        &self,
+        page_index: usize,
+        total_row_count: i64,
+    ) -> i64 {
+        let next_index = self.index_map[page_index] + 1;
+        if next_index >= self.get_page_count() {
+            total_row_count
+        } else {
+            self.offset_index
+                .get(next_index as usize)
+                .unwrap()
+                .first_row_index
+                - 1
+        }
+    }
+
+    // Returns the byte ranges of all needed pages, including the dictionary page if
+    // one exists. `row_group_offset` is required to detect whether the column chunk
+    // starts with a dictionary page.
+    // Note: adjacent data pages are merged into a single `OffsetRange`
+    // to reduce the number of seeks.
+    pub(crate) fn calculate_offset_range(&self, row_group_offset: i64) -> OffsetRange {
+        let mut start_list = vec![];
+        let mut length_list = vec![];
+        let page_count = self.get_page_count();
+        if page_count > 0 {
+            // use the first page offset of the unfiltered index, not the filtered one
+            let first_page_offset = self.offset_index[0].offset;
+            // add dictionary page if required
+            if row_group_offset < first_page_offset {
+                start_list.push(row_group_offset as usize);
+                length_list.push((first_page_offset - row_group_offset) as usize);
+            }
+            let mut current_offset = self.get_offset(0);
+            let mut current_length = self.get_compressed_page_size(0);
+
+            for i in 1..page_count {
+                let offset = self.get_offset(i);
+                let length = self.get_compressed_page_size(i);
+
+                if current_offset + current_length as i64 == offset {
+                    current_length += length;
+                } else {
+                    start_list.push(current_offset as usize);
+                    length_list.push(current_length as usize);
+                    current_offset = offset;
+                    current_length = length
+                }
+            }
+            start_list.push(current_offset as usize);
+            length_list.push(current_length as usize);
+        }
+        (start_list, length_list)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::file::page_index::filer_offset_index::FilterOffsetIndex;
+    use crate::file::page_index::range::{Range, RowRanges};
+    use parquet_format::PageLocation;
+
+    #[test]
+    fn test_without_dictionary() {
+        //
+        // *    ┌──────┬──────┬
+        // *  0 │  p0  │      │
+        // *    ╞══════╡  p0  │
+        // * 10 │ p1(X)│      │
+        // *    ╞══════╪══════╡
+        // * 20 │ p2(X)│      │
+        // *    ╞══════╡ p1(X)│
+        // * 30 │ p3(X)│      │
+        // *    ╞══════╪══════╡
+        //
+        let total_row = 40;
+        let offset_index_c0 = vec![
+            // add col0 page0, offset starts at 0.
+            PageLocation::new(0, 10, 0),
+            // add col0 page1
+            PageLocation::new(10, 10, 10),
+            // add col0 page2
+            PageLocation::new(20, 10, 20),
+            // add col0 page3
+            PageLocation::new(30, 10, 30),
+        ];
+
+        let offset_index_c1 = vec![
+            // add col1 page0
+            PageLocation::new(0, 10, 0),
+            // add col1 page1
+            PageLocation::new(10, 10, 20),
+        ];
+
+        let mut selected_rows = RowRanges::new_empty();
+        // the filter applied on col0 selects only p1
+        let selected_range = Range::new(10, 19);
+        selected_rows.add(selected_range);
+
+        let c0_offset_index =
+            FilterOffsetIndex::try_new(&offset_index_c0, &selected_rows, total_row);
+        let (start_list, length_list) = c0_offset_index.calculate_offset_range(0);
+        assert_eq!(start_list.len(), 1);
+        assert_eq!(length_list.len(), 1);
+        // result should be col0 page1 offset
+        assert_eq!(start_list[0], 10);
+        assert_eq!(length_list[0], 10);
+
+        let c1_offset_index =
+            FilterOffsetIndex::try_new(&offset_index_c1, &selected_rows, total_row);
+        let (start_list, length_list) = c1_offset_index.calculate_offset_range(0);
+        // result should be col1 page0 offset
+        assert_eq!(start_list[0], 0);
+        assert_eq!(length_list[0], 10);
+    }
+
+    #[test]
+    fn test_with_dictionary() {
+        // * In col0 there is a dictionary page p0(d) (a dictionary page must be the first page in a column chunk)
+        // *    ┌──────┬──────┬
+        // *    │ p0(d)│      │
+        // *    │══════│      │
+        // *  0 │  p1  │      │
+        // *    ╞══════╡  p0  │
+        // * 10 │ p2(X)│      │
+        // *    ╞══════╪══════╡
+        // * 20 │ p3(X)│      │
+        // *    ╞══════╡ p1(X)│
+        // * 30 │ p4(X)│      │
+        // *    ╞══════╪══════╡
+        //
+        let total_row = 40;
+        let offset_index_c0 = vec![
+            // add col0 page0, offset starts at `10` because of the dictionary page
+            PageLocation::new(10, 10, 0),
+            // add col0 page1
+            PageLocation::new(20, 10, 10),
+            // add col0 page2
+            PageLocation::new(30, 10, 20),
+            // add col0 page3
+            PageLocation::new(40, 10, 30),
+        ];
+
+        let offset_index_c1 = vec![
+            // add col1 page0
+            PageLocation::new(0, 10, 0),
+            // add col1 page1
+            PageLocation::new(10, 10, 20),
+        ];
+
+        // TEST1 -> the filter applied on col1 selects p0 (rows 0..=19)
+        let mut selected_rows = RowRanges::new_empty();
+        let selected_range = Range::new(0, 19);
+        selected_rows.add(selected_range);
+
+        let c0_offset_index =
+            FilterOffsetIndex::try_new(&offset_index_c0, &selected_rows, total_row);
+        let (start_list, length_list) = c0_offset_index.calculate_offset_range(0);
+        // this would be 3 ranges, but `calculate_offset_range` merges the adjacent p1 and p2 into one range to reduce seeks
+        assert_eq!(start_list.len(), 2);
+        assert_eq!(length_list.len(), 2);
+        // result should be col0 page0(dictionary)
+        assert_eq!(start_list[0], 0);
+        assert_eq!(length_list[0], 10);
+
+        // page1 and page2(data) merged offset
+        assert_eq!(start_list[1], 10);
+        assert_eq!(length_list[1], 20);
+
+        let c1_offset_index =
+            FilterOffsetIndex::try_new(&offset_index_c1, &selected_rows, total_row);
+        let (start_list, length_list) = c1_offset_index.calculate_offset_range(0);
+        // result should be col1 page0 offset
+        assert_eq!(start_list[0], 0);
+        assert_eq!(length_list[0], 10);
+
+        // TEST2 -> the filter applied on col1 selects p1 (rows 20..=39)
+        let mut selected_rows = RowRanges::new_empty();
+        let selected_range = Range::new(20, 39);
+        selected_rows.add(selected_range);
+
+        let c0_offset_index =
+            FilterOffsetIndex::try_new(&offset_index_c0, &selected_rows, total_row);
+        let (start_list, length_list) = c0_offset_index.calculate_offset_range(0);
+
+        assert_eq!(start_list.len(), 2);
+        assert_eq!(length_list.len(), 2);
+        // result should be col0 page0(dictionary)
+        assert_eq!(start_list[0], 0);
+        assert_eq!(length_list[0], 10);
+
+        // page3 and page4(data) merged offset
+        assert_eq!(start_list[1], 30);
+        assert_eq!(length_list[1], 20);
+
+        let c1_offset_index =
+            FilterOffsetIndex::try_new(&offset_index_c1, &selected_rows, total_row);
+        let (start_list, length_list) = c1_offset_index.calculate_offset_range(0);
+        // result should be col1 page1 offset
+        assert_eq!(start_list[0], 10);
+        assert_eq!(length_list[0], 10);
+    }
+}
diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs
index fc87ef20448..04326c4a647 100644
--- a/parquet/src/file/page_index/mod.rs
+++ b/parquet/src/file/page_index/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub(crate) mod filer_offset_index;
 pub mod index;
 pub mod index_reader;
 pub(crate) mod range;
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index a4873fd1227..9ec9d3c328e 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -31,7 +31,7 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, Type as SchemaType};
 
 use crate::basic::Type;
 use crate::column::reader::ColumnReaderImpl;
-use crate::file::filer_offset_index::FilterOffsetIndex;
+use crate::file::page_index::filer_offset_index::FilterOffsetIndex;
 
 /// Length should return the total number of bytes in the input source.
 /// It's mainly used to read the metadata, which is at the end of the source.
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 70c34bf3e5b..3c91a21c47a 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -28,7 +28,7 @@ use crate::basic::{Compression, Encoding, Type};
 use crate::column::page::{Page, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
-use crate::file::filer_offset_index::FilterOffsetIndex;
+use crate::file::page_index::filer_offset_index::FilterOffsetIndex;
 use crate::file::page_index::index_reader;
 use crate::file::{footer, metadata::*, reader::*, statistics};
 use crate::record::reader::RowIter;