Skip to content

Commit

Permalink
Add page index reader test for all types and support empty index. (#2012
Browse files Browse the repository at this point in the history
)

* Add page index reader test and support empty index.

* update parquet-testing commit id

* refine code

* Update parquet/src/file/page_index/index.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* add comment and fix

* Update parquet/src/file/page_index/index.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* use `None` represent lack of index

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
Ted-Jiang and tustvold committed Jul 8, 2022
1 parent 9333a85 commit 373ac81
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 27 deletions.
14 changes: 8 additions & 6 deletions parquet/src/file/metadata.rs
Expand Up @@ -55,8 +55,10 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
/// Page index for all pages in each column chunk
page_indexes: Option<Vec<Vec<Index>>>,
/// Offset index for all pages in each column chunk
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
}

impl ParquetMetaData {
Expand All @@ -74,8 +76,8 @@ impl ParquetMetaData {
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
page_indexes: Option<Vec<Vec<Index>>>,
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
) -> Self {
ParquetMetaData {
file_metadata,
Expand Down Expand Up @@ -107,12 +109,12 @@ impl ParquetMetaData {
}

/// Returns page indexes in this file.
pub fn page_indexes(&self) -> Option<&Vec<Index>> {
pub fn page_indexes(&self) -> Option<&Vec<Vec<Index>>> {
self.page_indexes.as_ref()
}

/// Returns offset indexes in this file.
pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
pub fn offset_indexes(&self) -> Option<&Vec<Vec<Vec<PageLocation>>>> {
self.offset_indexes.as_ref()
}
}
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/file/page_index/index.rs
Expand Up @@ -56,6 +56,10 @@ pub enum Index {
DOUBLE(NativeIndex<f64>),
BYTE_ARRAY(ByteArrayIndex),
FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
/// Sometimes reading page index from parquet file
/// will only return pageLocations without min_max index,
/// `None` represents this lack of index information
None,
}

/// An index of a column of [`Type`] physical representation
Expand Down
11 changes: 4 additions & 7 deletions parquet/src/file/page_index/index_reader.rs
Expand Up @@ -101,13 +101,7 @@ fn get_index_offset_and_lengths(
.iter()
.map(|x| x.column_index_length())
.map(|maybe_length| {
let index_length = maybe_length.ok_or_else(|| {
ParquetError::General(
"The column_index_length must exist if offset_index_offset exists"
.to_string(),
)
})?;

let index_length = maybe_length.unwrap_or(0);
Ok(index_length.try_into().unwrap())
})
.collect::<Result<Vec<_>, ParquetError>>()?;
Expand Down Expand Up @@ -143,6 +137,9 @@ fn deserialize_column_index(
data: &[u8],
column_type: Type,
) -> Result<Index, ParquetError> {
if data.is_empty() {
return Ok(Index::None);
}
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);

Expand Down
236 changes: 223 additions & 13 deletions parquet/src/file/serialized_reader.rs
Expand Up @@ -248,21 +248,25 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}

if options.enable_page_index {
//Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
//support multi after create multi-RG test data.
let cols = metadata.row_group(0);
let columns_indexes =
index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
let pages_locations =
index_reader::read_pages_locations(&chunk_reader, cols.columns())?;
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in &filtered_row_groups {
let column_index =
index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index =
index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
filtered_row_groups,
Some(columns_indexes),
Some(pages_locations),
Some(offset_indexes),
),
})
} else {
Expand Down Expand Up @@ -569,9 +573,11 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
mod tests {
use super::*;
use crate::basic::{self, ColumnOrder};
use crate::file::page_index::index::Index;
use crate::data_type::private::ParquetValueType;
use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex};
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
use crate::util::bit_util::from_le_slice;
use crate::util::test_common::{get_test_file, get_test_path};
use parquet_format::BoundaryOrder;
use std::sync::Arc;
Expand Down Expand Up @@ -1085,7 +1091,7 @@ mod tests {

// only one row group
assert_eq!(page_indexes.len(), 1);
let index = if let Index::BYTE_ARRAY(index) = page_indexes.get(0).unwrap() {
let index = if let Index::BYTE_ARRAY(index) = &page_indexes[0][0] {
index
} else {
unreachable!()
Expand All @@ -1097,7 +1103,7 @@ mod tests {
//only one page group
assert_eq!(index_in_pages.len(), 1);

let page0 = index_in_pages.get(0).unwrap();
let page0 = &index_in_pages[0];
let min = page0.min.as_ref().unwrap();
let max = page0.max.as_ref().unwrap();
assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap());
Expand All @@ -1106,11 +1112,215 @@ mod tests {
let offset_indexes = metadata.offset_indexes().unwrap();
// only one row group
assert_eq!(offset_indexes.len(), 1);
let offset_index = offset_indexes.get(0).unwrap();
let page_offset = offset_index.get(0).unwrap();
let offset_index = &offset_indexes[0];
let page_offset = &offset_index[0][0];

assert_eq!(4, page_offset.offset);
assert_eq!(152, page_offset.compressed_page_size);
assert_eq!(0, page_offset.first_row_index);
}

#[test]
fn test_page_index_reader_all_type() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let builder = ReadOptionsBuilder::new();
//enable read page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();

// Test contents in Parquet metadata
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);

let page_indexes = metadata.page_indexes().unwrap();
let row_group_offset_indexes = &metadata.offset_indexes().unwrap()[0];

// only one row group
assert_eq!(page_indexes.len(), 1);
let row_group_metadata = metadata.row_group(0);

//col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
if let Index::INT32(index) = &page_indexes[0][0] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 0),
BoundaryOrder::Unordered,
);
assert_eq!(row_group_offset_indexes[0].len(), 325);
} else {
unreachable!()
};
//col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
if let Index::BOOLEAN(index) = &page_indexes[0][1] {
assert_eq!(index.indexes.len(), 82);
assert_eq!(row_group_offset_indexes[1].len(), 82);
} else {
unreachable!()
};
//col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
if let Index::INT32(index) = &page_indexes[0][2] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 2),
BoundaryOrder::Ascending,
);
assert_eq!(row_group_offset_indexes[2].len(), 325);
} else {
unreachable!()
};
//col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
if let Index::INT32(index) = &page_indexes[0][3] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 3),
BoundaryOrder::Ascending,
);
assert_eq!(row_group_offset_indexes[3].len(), 325);
} else {
unreachable!()
};
//col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
if let Index::INT32(index) = &page_indexes[0][4] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 4),
BoundaryOrder::Ascending,
);
assert_eq!(row_group_offset_indexes[4].len(), 325);
} else {
unreachable!()
};
//col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
if let Index::INT64(index) = &page_indexes[0][5] {
check_native_page_index(
index,
528,
get_row_group_min_max_bytes(row_group_metadata, 5),
BoundaryOrder::Unordered,
);
assert_eq!(row_group_offset_indexes[5].len(), 528);
} else {
unreachable!()
};
//col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
if let Index::FLOAT(index) = &page_indexes[0][6] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 6),
BoundaryOrder::Ascending,
);
assert_eq!(row_group_offset_indexes[6].len(), 325);
} else {
unreachable!()
};
//col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
if let Index::DOUBLE(index) = &page_indexes[0][7] {
check_native_page_index(
index,
528,
get_row_group_min_max_bytes(row_group_metadata, 7),
BoundaryOrder::Unordered,
);
assert_eq!(row_group_offset_indexes[7].len(), 528);
} else {
unreachable!()
};
//col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
if let Index::BYTE_ARRAY(index) = &page_indexes[0][8] {
check_bytes_page_index(
index,
974,
get_row_group_min_max_bytes(row_group_metadata, 8),
BoundaryOrder::Unordered,
);
assert_eq!(row_group_offset_indexes[8].len(), 974);
} else {
unreachable!()
};
//col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
if let Index::BYTE_ARRAY(index) = &page_indexes[0][9] {
check_bytes_page_index(
index,
352,
get_row_group_min_max_bytes(row_group_metadata, 9),
BoundaryOrder::Ascending,
);
assert_eq!(row_group_offset_indexes[9].len(), 352);
} else {
unreachable!()
};
//col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
//Notice: min_max values for each page for this col not exits.
if let Index::None = &page_indexes[0][10] {
assert_eq!(row_group_offset_indexes[10].len(), 974);
} else {
unreachable!()
};
//col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
if let Index::INT32(index) = &page_indexes[0][11] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 11),
BoundaryOrder::Ascending,
);
assert_eq!(row_group_offset_indexes[11].len(), 325);
} else {
unreachable!()
};
//col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
if let Index::INT32(index) = &page_indexes[0][12] {
check_native_page_index(
index,
325,
get_row_group_min_max_bytes(row_group_metadata, 12),
BoundaryOrder::Unordered,
);
assert_eq!(row_group_offset_indexes[12].len(), 325);
} else {
unreachable!()
};
}

fn check_native_page_index<T: ParquetValueType>(
row_group_index: &NativeIndex<T>,
page_size: usize,
min_max: (&[u8], &[u8]),
boundary_order: BoundaryOrder,
) {
assert_eq!(row_group_index.indexes.len(), page_size);
assert_eq!(row_group_index.boundary_order, boundary_order);
row_group_index.indexes.iter().all(|x| {
x.min.as_ref().unwrap() >= &from_le_slice::<T>(min_max.0)
&& x.max.as_ref().unwrap() <= &from_le_slice::<T>(min_max.1)
});
}

fn check_bytes_page_index(
row_group_index: &ByteArrayIndex,
page_size: usize,
min_max: (&[u8], &[u8]),
boundary_order: BoundaryOrder,
) {
assert_eq!(row_group_index.indexes.len(), page_size);
assert_eq!(row_group_index.boundary_order, boundary_order);
row_group_index.indexes.iter().all(|x| {
x.min.as_ref().unwrap().as_slice() >= min_max.0
&& x.max.as_ref().unwrap().as_slice() <= min_max.1
});
}

fn get_row_group_min_max_bytes(
r: &RowGroupMetaData,
col_num: usize,
) -> (&[u8], &[u8]) {
let statistics = r.column(col_num).statistics().unwrap();
(statistics.min_bytes(), statistics.max_bytes())
}
}

0 comments on commit 373ac81

Please sign in to comment.