Skip to content

Commit

Permalink
Support reading PageIndex from parquet metadata, prepare for skipping…
Browse files Browse the repository at this point in the history
… pages at reading (#1762)

* Add read options for column index based filtering

* try to deserialize pageIndex from parquet

* enable reading pageIndex from parquet and add test

* fix fmt

* remove FixedLenByteIndex; use ByteIndex instead

* Apply suggestions from code review

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* fix comments

* use enum instead of trait object

* fix and add comment

* use from_le_slice instead of from_ne_slice

* use builder option

* remove useless trait

* fix from_le_slice

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
Ted-Jiang and tustvold committed May 31, 2022
1 parent 1859d6c commit ac5073f
Show file tree
Hide file tree
Showing 9 changed files with 559 additions and 20 deletions.
2 changes: 1 addition & 1 deletion parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub use parquet_format::{
/// control the on disk storage format.
/// For example INT16 is not included as a type since a good encoding of INT32
/// would handle this.
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Type {
BOOLEAN,
INT32,
Expand Down
20 changes: 13 additions & 7 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::column::reader::{ColumnReader, ColumnReaderImpl};
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
use crate::util::{
bit_util::{from_ne_slice, FromBytes},
bit_util::{from_le_slice, from_ne_slice, FromBytes},
memory::ByteBufferPtr,
};

Expand Down Expand Up @@ -1194,8 +1194,14 @@ make_type!(

impl FromBytes for Int96 {
type Buffer = [u8; 12];
fn from_le_bytes(_bs: Self::Buffer) -> Self {
unimplemented!()
fn from_le_bytes(bs: Self::Buffer) -> Self {
let mut i = Int96::new();
i.set_data(
from_le_slice(&bs[0..4]),
from_le_slice(&bs[4..8]),
from_le_slice(&bs[8..12]),
);
i
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unimplemented!()
Expand All @@ -1215,8 +1221,8 @@ impl FromBytes for Int96 {
// appear to actual be converted directly from bytes
impl FromBytes for ByteArray {
type Buffer = [u8; 8];
fn from_le_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
fn from_le_bytes(bs: Self::Buffer) -> Self {
ByteArray::from(bs.to_vec())
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
Expand All @@ -1229,8 +1235,8 @@ impl FromBytes for ByteArray {
impl FromBytes for FixedLenByteArray {
type Buffer = [u8; 8];

fn from_le_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
fn from_le_bytes(bs: Self::Buffer) -> Self {
Self(ByteArray::from(bs.to_vec()))
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
Expand Down
31 changes: 30 additions & 1 deletion parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@

use std::sync::Arc;

use parquet_format::{ColumnChunk, ColumnMetaData, RowGroup};
use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
use crate::file::statistics::{self, Statistics};
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
Expand All @@ -51,6 +52,8 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
}

impl ParquetMetaData {
Expand All @@ -60,6 +63,22 @@ impl ParquetMetaData {
ParquetMetaData {
file_metadata,
row_groups,
page_indexes: None,
offset_indexes: None,
}
}

/// Creates Parquet metadata from the given file metadata and row groups,
/// together with the optional page (column) indexes and offset indexes
/// read from the file.
pub fn new_with_page_index(
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    page_indexes: Option<Vec<Index>>,
    offset_indexes: Option<Vec<Vec<PageLocation>>>,
) -> Self {
    ParquetMetaData {
        file_metadata,
        row_groups,
        page_indexes,
        offset_indexes,
    }
}

Expand All @@ -83,6 +102,16 @@ impl ParquetMetaData {
pub fn row_groups(&self) -> &[RowGroupMetaData] {
&self.row_groups
}

/// Returns page indexes in this file.
pub fn page_indexes(&self) -> Option<&Vec<Index>> {
self.page_indexes.as_ref()
}

/// Returns offset indexes in this file.
pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
self.offset_indexes.as_ref()
}
}

pub type KeyValue = parquet_format::KeyValue;
Expand Down
1 change: 1 addition & 0 deletions parquet/src/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
pub mod footer;
pub mod metadata;
pub mod page_encoding_stats;
pub mod page_index;
pub mod properties;
pub mod reader;
pub mod serialized_reader;
Expand Down
209 changes: 209 additions & 0 deletions parquet/src/file/page_index/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::basic::Type;
use crate::data_type::private::ParquetValueType;
use crate::data_type::Int96;
use crate::errors::ParquetError;
use crate::util::bit_util::from_le_slice;
use parquet_format::{BoundaryOrder, ColumnIndex};
use std::fmt::Debug;

/// The statistics of a single data page, as stored in a column index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageIndex<T> {
    /// The minimum value in the page; `None` when all values in the page are null
    pub min: Option<T>,
    /// The maximum value in the page; `None` when all values in the page are null
    pub max: Option<T>,
    /// The number of null values in the page; `None` when the writer did not record it
    pub null_count: Option<i64>,
}

impl<T> PageIndex<T> {
    /// Returns the minimum value in the page, if known.
    pub fn min(&self) -> Option<&T> {
        self.min.as_ref()
    }
    /// Returns the maximum value in the page, if known.
    pub fn max(&self) -> Option<&T> {
        self.max.as_ref()
    }
    /// Returns the number of null values in the page, if known.
    pub fn null_count(&self) -> Option<i64> {
        self.null_count
    }
}

#[derive(Debug, Clone, PartialEq)]
pub enum Index {
BOOLEAN(BooleanIndex),
INT32(NativeIndex<i32>),
INT64(NativeIndex<i64>),
INT96(NativeIndex<Int96>),
FLOAT(NativeIndex<f32>),
DOUBLE(NativeIndex<f64>),
BYTE_ARRAY(ByteArrayIndex),
FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
}

/// An index of a column whose values have a native [`Type`] physical representation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NativeIndex<T: ParquetValueType> {
    /// The physical type of this column
    pub physical_type: Type,
    /// The per-page statistics, one item per data page
    pub indexes: Vec<PageIndex<T>>,
    /// The ordering of the min/max values across pages, as declared by the writer
    pub boundary_order: BoundaryOrder,
}

impl<T: ParquetValueType> NativeIndex<T> {
    /// Creates a new [`NativeIndex`] by decoding the little-endian min/max
    /// bytes of the raw Thrift [`ColumnIndex`].
    pub(crate) fn try_new(
        index: ColumnIndex,
        physical_type: Type,
    ) -> Result<Self, ParquetError> {
        let num_pages = index.min_values.len();

        // When the column index carries no null counts, record `None` per page.
        let null_counts = match index.null_counts {
            Some(counts) => counts.into_iter().map(Some).collect::<Vec<_>>(),
            None => vec![None; num_pages],
        };

        let mut indexes = Vec::with_capacity(num_pages);
        let pages = index
            .min_values
            .iter()
            .zip(index.max_values.into_iter())
            .zip(index.null_pages.into_iter())
            .zip(null_counts.into_iter());
        for (((min, max), is_null), null_count) in pages {
            // Pages consisting solely of nulls carry no min/max statistics.
            let (min, max) = if is_null {
                (None, None)
            } else {
                (
                    Some(from_le_slice::<T>(min.as_slice())),
                    Some(from_le_slice::<T>(max.as_slice())),
                )
            };
            indexes.push(PageIndex {
                min,
                max,
                null_count,
            });
        }

        Ok(Self {
            physical_type,
            indexes,
            boundary_order: index.boundary_order,
        })
    }
}

/// An index of a column of bytes type (BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY)
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ByteArrayIndex {
    /// The physical type of this column
    pub physical_type: Type,
    /// The per-page statistics, one item per data page; values are raw bytes
    pub indexes: Vec<PageIndex<Vec<u8>>>,
    /// The ordering of the min/max values across pages, as declared by the writer
    pub boundary_order: BoundaryOrder,
}

impl ByteArrayIndex {
    /// Creates a new [`ByteArrayIndex`] from the raw Thrift [`ColumnIndex`],
    /// keeping each page's min/max values as raw byte vectors.
    pub(crate) fn try_new(
        index: ColumnIndex,
        physical_type: Type,
    ) -> Result<Self, ParquetError> {
        let num_pages = index.min_values.len();

        // A missing null-count list means the statistic is unknown per page.
        let null_counts = match index.null_counts {
            Some(counts) => counts.into_iter().map(Some).collect::<Vec<_>>(),
            None => vec![None; num_pages],
        };

        let mut page_stats = Vec::with_capacity(num_pages);
        let pages = index
            .min_values
            .into_iter()
            .zip(index.max_values.into_iter())
            .zip(index.null_pages.into_iter())
            .zip(null_counts.into_iter());
        for (((min, max), is_null), null_count) in pages {
            // Pages that contain only nulls have no min/max bytes.
            let (min, max) = match is_null {
                true => (None, None),
                false => (Some(min), Some(max)),
            };
            page_stats.push(PageIndex {
                min,
                max,
                null_count,
            });
        }

        Ok(Self {
            physical_type,
            indexes: page_stats,
            boundary_order: index.boundary_order,
        })
    }
}

/// An index of a column of boolean physical type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BooleanIndex {
    /// The per-page statistics, one item per data page
    pub indexes: Vec<PageIndex<bool>>,
    /// The ordering of the min/max values across pages, as declared by the writer
    pub boundary_order: BoundaryOrder,
}

impl BooleanIndex {
    /// Creates a new [`BooleanIndex`] from the raw Thrift [`ColumnIndex`],
    /// decoding each page's single-byte min/max values as booleans.
    pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
        let len = index.min_values.len();

        // When the column index carries no null counts, record `None` per page.
        let null_counts = index
            .null_counts
            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
            .unwrap_or_else(|| vec![None; len]);

        let indexes = index
            .min_values
            .into_iter()
            .zip(index.max_values.into_iter())
            .zip(index.null_pages.into_iter())
            .zip(null_counts.into_iter())
            .map(|(((min, max), is_null), null_count)| {
                let (min, max) = if is_null {
                    // All-null pages carry no min/max statistics.
                    (None, None)
                } else {
                    // Decode both bounds the same way: any non-zero byte is
                    // `true`. (Previously `max` used `== 1`, so a malformed
                    // non-zero, non-one byte decoded as `true` for min but
                    // `false` for max.)
                    // NOTE(review): assumes min/max byte vectors are non-empty
                    // for non-null pages; a malformed file would panic here.
                    let min = min[0] != 0;
                    let max = max[0] != 0;
                    (Some(min), Some(max))
                };
                Ok(PageIndex {
                    min,
                    max,
                    null_count,
                })
            })
            .collect::<Result<Vec<_>, ParquetError>>()?;

        Ok(Self {
            indexes,
            boundary_order: index.boundary_order,
        })
    }
}

0 comments on commit ac5073f

Please sign in to comment.