Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add page index reader test for all types and support empty index. #2012

Merged
merged 7 commits into from Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions parquet/src/file/metadata.rs
Expand Up @@ -55,8 +55,10 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
/// Page index for all pages in each column chunk
page_indexes: Option<Vec<Vec<Index>>>,
/// Offset index for all pages in each column chunk
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
}

impl ParquetMetaData {
Expand All @@ -74,8 +76,8 @@ impl ParquetMetaData {
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
page_indexes: Option<Vec<Vec<Index>>>,
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
) -> Self {
ParquetMetaData {
file_metadata,
Expand Down Expand Up @@ -107,12 +109,12 @@ impl ParquetMetaData {
}

/// Returns page indexes in this file.
pub fn page_indexes(&self) -> Option<&Vec<Index>> {
pub fn page_indexes(&self) -> Option<&Vec<Vec<Index>>> {
self.page_indexes.as_ref()
}

/// Returns offset indexes in this file.
pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
pub fn offset_indexes(&self) -> Option<&Vec<Vec<Vec<PageLocation>>>> {
self.offset_indexes.as_ref()
}
}
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/file/page_index/index.rs
Expand Up @@ -56,6 +56,10 @@ pub enum Index {
DOUBLE(NativeIndex<f64>),
BYTE_ARRAY(ByteArrayIndex),
FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
/// Sometimes reading the page index from a parquet file
/// only returns page locations without a min/max index;
/// `None` represents this lack of index information.
None,
}

/// An index of a column of [`Type`] physical representation
Expand Down
11 changes: 4 additions & 7 deletions parquet/src/file/page_index/index_reader.rs
Expand Up @@ -101,13 +101,7 @@ fn get_index_offset_and_lengths(
.iter()
.map(|x| x.column_index_length())
.map(|maybe_length| {
let index_length = maybe_length.ok_or_else(|| {
ParquetError::General(
"The column_index_length must exist if offset_index_offset exists"
.to_string(),
)
})?;

let index_length = maybe_length.unwrap_or(0);
Ok(index_length.try_into().unwrap())
})
.collect::<Result<Vec<_>, ParquetError>>()?;
Expand Down Expand Up @@ -143,6 +137,9 @@ fn deserialize_column_index(
data: &[u8],
column_type: Type,
) -> Result<Index, ParquetError> {
if data.is_empty() {
return Ok(Index::None);
}
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);

Expand Down
236 changes: 223 additions & 13 deletions parquet/src/file/serialized_reader.rs
Expand Up @@ -248,21 +248,25 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}

if options.enable_page_index {
//Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
//support multi after create multi-RG test data.
let cols = metadata.row_group(0);
let columns_indexes =
index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
let pages_locations =
index_reader::read_pages_locations(&chunk_reader, cols.columns())?;
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in &filtered_row_groups {
let column_index =
index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index =
index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
filtered_row_groups,
Some(columns_indexes),
Some(pages_locations),
Some(offset_indexes),
),
})
} else {
Expand Down Expand Up @@ -561,9 +565,11 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
mod tests {
use super::*;
use crate::basic::{self, ColumnOrder};
use crate::file::page_index::index::Index;
use crate::data_type::private::ParquetValueType;
use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex};
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
use crate::util::bit_util::from_le_slice;
use crate::util::test_common::{get_test_file, get_test_path};
use parquet_format::BoundaryOrder;
use std::sync::Arc;
Expand Down Expand Up @@ -1077,7 +1083,7 @@ mod tests {

// only one row group
assert_eq!(page_indexes.len(), 1);
let index = if let Index::BYTE_ARRAY(index) = page_indexes.get(0).unwrap() {
let index = if let Index::BYTE_ARRAY(index) = &page_indexes[0][0] {
index
} else {
unreachable!()
Expand All @@ -1089,7 +1095,7 @@ mod tests {
//only one page group
assert_eq!(index_in_pages.len(), 1);

let page0 = index_in_pages.get(0).unwrap();
let page0 = &index_in_pages[0];
let min = page0.min.as_ref().unwrap();
let max = page0.max.as_ref().unwrap();
assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap());
Expand All @@ -1098,11 +1104,215 @@ mod tests {
let offset_indexes = metadata.offset_indexes().unwrap();
// only one row group
assert_eq!(offset_indexes.len(), 1);
let offset_index = offset_indexes.get(0).unwrap();
let page_offset = offset_index.get(0).unwrap();
let offset_index = &offset_indexes[0];
let page_offset = &offset_index[0][0];

assert_eq!(4, page_offset.offset);
assert_eq!(152, page_offset.compressed_page_size);
assert_eq!(0, page_offset.first_row_index);
}

/// Reads `alltypes_tiny_pages_plain.parquet` with page-index loading enabled
/// and verifies, column by column, the kind of column index that was decoded,
/// the number of pages it describes, its boundary order, and that the offset
/// index lists the same number of pages.
#[test]
fn test_page_index_reader_all_type() {
    let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
    // Page indexes are only loaded when explicitly requested.
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();

    // The fixture is expected to hold exactly one row group.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let page_indexes = metadata.page_indexes().unwrap();
    let row_group_offset_indexes = &metadata.offset_indexes().unwrap()[0];

    // One entry per row group.
    assert_eq!(page_indexes.len(), 1);
    let row_group_metadata = metadata.row_group(0);
    let column_indexes = &page_indexes[0];

    // Column 0 - id (INT32).
    match &column_indexes[0] {
        Index::INT32(index) => {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 0),
                BoundaryOrder::Unordered,
            );
            assert_eq!(row_group_offset_indexes[0].len(), 325);
        }
        _ => unreachable!(),
    }

    // Column 1 - bool_col (BOOLEAN): only the page counts are checked.
    match &column_indexes[1] {
        Index::BOOLEAN(index) => {
            assert_eq!(index.indexes.len(), 82);
            assert_eq!(row_group_offset_indexes[1].len(), 82);
        }
        _ => unreachable!(),
    }

    // Column 2 - tinyint_col (stored as INT32).
    match &column_indexes[2] {
        Index::INT32(index) => {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 2),
                BoundaryOrder::Ascending,
            );
            assert_eq!(row_group_offset_indexes[2].len(), 325);
        }
        _ => unreachable!(),
    }

    // Column 3 - smallint_col (stored as INT32).
    match &column_indexes[3] {
        Index::INT32(index) => {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 3),
                BoundaryOrder::Ascending,
            );
            assert_eq!(row_group_offset_indexes[3].len(), 325);
        }
        _ => unreachable!(),
    }

    // Column 4 - INT32.
    // NOTE(review): the earlier comment here repeated smallint_col's metadata;
    // presumably this is a separate integer column - confirm against the file schema.
    match &column_indexes[4] {
        Index::INT32(index) => {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 4),
                BoundaryOrder::Ascending,
            );
            assert_eq!(row_group_offset_indexes[4].len(), 325);
        }
        _ => unreachable!(),
    }

    // Column 5 - bigint_col (INT64).
    match &column_indexes[5] {
        Index::INT64(index) => {
            check_native_page_index(
                index,
                528,
                get_row_group_min_max_bytes(row_group_metadata, 5),
                BoundaryOrder::Unordered,
            );
            assert_eq!(row_group_offset_indexes[5].len(), 528);
        }
        _ => unreachable!(),
    }

    // Column 6 - float_col (FLOAT).
    match &column_indexes[6] {
        Index::FLOAT(index) => {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 6),
                BoundaryOrder::Ascending,
            );
            assert_eq!(row_group_offset_indexes[6].len(), 325);
        }
        _ => unreachable!(),
    }

    // Column 7 - double_col (DOUBLE).
    match &column_indexes[7] {
        Index::DOUBLE(index) => {
            check_native_page_index(
                index,
                528,
                get_row_group_min_max_bytes(row_group_metadata, 7),
                BoundaryOrder::Unordered,
            );
            assert_eq!(row_group_offset_indexes[7].len(), 528);
        }
        _ => unreachable!(),
    }

    // Column 8 - date_string_col (BYTE_ARRAY).
    match &column_indexes[8] {
        Index::BYTE_ARRAY(index) => {
            check_bytes_page_index(
                index,
                974,
                get_row_group_min_max_bytes(row_group_metadata, 8),
                BoundaryOrder::Unordered,
            );
            assert_eq!(row_group_offset_indexes[8].len(), 974);
        }
        _ => unreachable!(),
    }

    // Column 9 - string_col (BYTE_ARRAY).
    match &column_indexes[9] {
        Index::BYTE_ARRAY(index) => {
            check_bytes_page_index(
                index,
                352,
                get_row_group_min_max_bytes(row_group_metadata, 9),
                BoundaryOrder::Ascending,
            );
            assert_eq!(row_group_offset_indexes[9].len(), 352);
        }
        _ => unreachable!(),
    }

    // Column 10 - timestamp_col (INT96).
    // No per-page min/max values exist for this column, so the reader yields
    // `Index::None`; the offset index is still expected to be populated.
    match &column_indexes[10] {
        Index::None => {
            assert_eq!(row_group_offset_indexes[10].len(), 974);
        }
        _ => unreachable!(),
    }

    // Column 11 - year (INT32).
    match &column_indexes[11] {
        Index::INT32(index) => {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 11),
                BoundaryOrder::Ascending,
            );
            assert_eq!(row_group_offset_indexes[11].len(), 325);
        }
        _ => unreachable!(),
    }

    // Column 12 - month (INT32).
    match &column_indexes[12] {
        Index::INT32(index) => {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 12),
                BoundaryOrder::Unordered,
            );
            assert_eq!(row_group_offset_indexes[12].len(), 325);
        }
        _ => unreachable!(),
    }
}

/// Asserts that a native-typed column index matches expectations.
///
/// * `row_group_index` - the decoded column index for one column chunk.
/// * `page_size` - the expected number of per-page entries in the index.
/// * `min_max` - the row group's `(min, max)` statistics as raw bytes, which
///   are decoded to `T` with `from_le_slice`.
/// * `boundary_order` - the expected boundary order of the index.
///
/// Panics (failing the test) if the entry count or boundary order differs,
/// if any page lacks a min or max value, or if any page's min/max falls
/// outside the row-group bounds.
fn check_native_page_index<T: ParquetValueType>(
    row_group_index: &NativeIndex<T>,
    page_size: usize,
    min_max: (&[u8], &[u8]),
    boundary_order: BoundaryOrder,
) {
    assert_eq!(row_group_index.indexes.len(), page_size);
    assert_eq!(row_group_index.boundary_order, boundary_order);

    // Decode the row-group bounds once instead of once per page.
    let lower = from_le_slice::<T>(min_max.0);
    let upper = from_le_slice::<T>(min_max.1);

    // The bounds check used to be a bare `Iterator::all(..)` whose boolean
    // result was dropped, so it could never fail. Assert per page so that a
    // violation fails the test and reports which page is at fault.
    for (page, page_index) in row_group_index.indexes.iter().enumerate() {
        let page_min = page_index.min.as_ref().unwrap();
        let page_max = page_index.max.as_ref().unwrap();
        assert!(
            page_min >= &lower,
            "page {} min is below the row group min",
            page
        );
        assert!(
            page_max <= &upper,
            "page {} max is above the row group max",
            page
        );
    }
}

/// Asserts that a byte-array column index matches expectations.
///
/// * `row_group_index` - the decoded column index for one column chunk.
/// * `page_size` - the expected number of per-page entries in the index.
/// * `min_max` - the row group's `(min, max)` statistics as raw bytes; page
///   values are compared against them as byte slices.
/// * `boundary_order` - the expected boundary order of the index.
///
/// Panics (failing the test) if the entry count or boundary order differs,
/// if any page lacks a min or max value, or if any page's min/max falls
/// outside the row-group bounds.
fn check_bytes_page_index(
    row_group_index: &ByteArrayIndex,
    page_size: usize,
    min_max: (&[u8], &[u8]),
    boundary_order: BoundaryOrder,
) {
    assert_eq!(row_group_index.indexes.len(), page_size);
    assert_eq!(row_group_index.boundary_order, boundary_order);

    let (lower, upper) = min_max;

    // The bounds check used to be a bare `Iterator::all(..)` whose boolean
    // result was dropped, so it could never fail. Assert per page so that a
    // violation fails the test and reports which page is at fault.
    for (page, page_index) in row_group_index.indexes.iter().enumerate() {
        let page_min = page_index.min.as_ref().unwrap().as_slice();
        let page_max = page_index.max.as_ref().unwrap().as_slice();
        assert!(
            page_min >= lower,
            "page {} min is below the row group min",
            page
        );
        assert!(
            page_max <= upper,
            "page {} max is above the row group max",
            page
        );
    }
}

/// Returns the `(min, max)` statistics bytes recorded for column `col_num`
/// of row group `r`.
///
/// Panics if that column chunk carries no statistics.
fn get_row_group_min_max_bytes(
    r: &RowGroupMetaData,
    col_num: usize,
) -> (&[u8], &[u8]) {
    let column_chunk = r.column(col_num);
    let stats = column_chunk.statistics().unwrap();
    let lower = stats.min_bytes();
    let upper = stats.max_bytes();
    (lower, upper)
}
}