Skip to content

Commit

Permalink
try to deserialize pageIndex from parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang committed May 28, 2022
1 parent 684b660 commit 90cf78c
Show file tree
Hide file tree
Showing 9 changed files with 554 additions and 1 deletion.
2 changes: 1 addition & 1 deletion parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub use parquet_format::{
/// control the on disk storage format.
/// For example INT16 is not included as a type since a good encoding of INT32
/// would handle this.
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Type {
BOOLEAN,
INT32,
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,8 @@ pub(crate) mod private {
+ super::SliceAsBytes
+ PartialOrd
+ Send
+ Sync
+ 'static
+ crate::encodings::decoding::private::GetDecoder
{
/// Encode the value directly from a higher level encoder
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use parquet_format::{ColumnChunk, ColumnMetaData, RowGroup};
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
use crate::file::page_index::row_range::Range;
use crate::file::statistics::{self, Statistics};
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
Expand All @@ -51,6 +53,9 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes:Option<Vec<Arc<dyn Index>>>,
offset_indexes:Option<Vec<Range>>,

}

impl ParquetMetaData {
Expand All @@ -60,6 +65,8 @@ impl ParquetMetaData {
ParquetMetaData {
file_metadata,
row_groups,
page_indexes: None,
offset_indexes: None
}
}

Expand Down
1 change: 1 addition & 0 deletions parquet/src/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub mod reader;
pub mod serialized_reader;
pub mod statistics;
pub mod writer;
pub mod page_index;

const FOOTER_SIZE: usize = 8;
pub(crate) const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
Expand Down
340 changes: 340 additions & 0 deletions parquet/src/file/page_index/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::Debug;
use parquet_format::{BoundaryOrder, ColumnIndex};
use crate::basic::Type;
use crate::data_type::{Int96};
use crate::data_type::private::ParquetValueType;
use crate::errors::ParquetError;
use crate::util::bit_util::from_ne_slice;


/// The static in one page
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageIndex<T> {
/// The minimum value, It is None when all values are null
pub min: Option<T>,
/// The maximum value, It is None when all values are null
pub max: Option<T>,
/// Null values in the page
pub null_count: Option<i64>,
}

/// Trait object representing a [`ColumnIndex`]
///
/// See [`NativeIndex`], [`ByteIndex`] and [`FixedLenByteIndex`] for concrete implementations.
pub trait Index: Send + Sync + Debug {
fn as_any(&self) -> &dyn Any;

fn physical_type(&self) -> &Type;
}

impl PartialEq for dyn Index + '_ {
fn eq(&self, that: &dyn Index) -> bool {
equal(self, that)
}
}

impl Eq for dyn Index + '_ {}

fn equal(lhs: &dyn Index, rhs: &dyn Index) -> bool {
if lhs.physical_type() != rhs.physical_type() {
return false;
}

match lhs.physical_type() {
Type::BOOLEAN => {
lhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
== rhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
}
Type::INT32 => {
lhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
}
Type::INT64 => {
lhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
}
Type::INT96 => {
lhs.as_any()
.downcast_ref::<NativeIndex<Int96>>()
.unwrap()
== rhs
.as_any()
.downcast_ref::<NativeIndex<Int96>>()
.unwrap()
}
Type::FLOAT => {
lhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
}
Type::DOUBLE => {
lhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
}
Type::BYTE_ARRAY => {
lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
== rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
}
Type::FIXED_LEN_BYTE_ARRAY => {
lhs.as_any().downcast_ref::<FixedLenByteIndex>().unwrap()
== rhs.as_any().downcast_ref::<FixedLenByteIndex>().unwrap()
}
}
}

/// An index of a column of [`DataType`] physical representation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NativeIndex<T: ParquetValueType> {
/// The physical type
pub physical_type: Type,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<T>>,
/// the order
pub boundary_order: BoundaryOrder,
}

impl<T: ParquetValueType> NativeIndex<T> {
/// Creates a new [`NativeIndex`]
pub(crate) fn try_new(
index: ColumnIndex,
physical_type: Type,
) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
let min = min.as_slice();
let max = max.as_slice();
(Some(from_ne_slice::<T>(min)), Some(from_ne_slice::<T>(max)))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
physical_type,
indexes,
boundary_order: index.boundary_order,
})
}
}

impl<T: ParquetValueType> Index for NativeIndex<T> {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &Type {
&self.physical_type
}
}

/// An index of a column of bytes physical type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ByteIndex {
/// The physical type
pub physical_type: Type,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<Vec<u8>>>,
pub boundary_order: BoundaryOrder,
}

impl ByteIndex {
pub(crate) fn try_new(
index: ColumnIndex,
physical_type: Type,
) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
physical_type,
indexes,
boundary_order: index.boundary_order,
})
}
}

impl Index for ByteIndex {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &Type {
&self.physical_type
}
}

/// An index of a column of fixed len byte physical type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FixedLenByteIndex {
/// The physical type
pub physical_type: Type,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<Vec<u8>>>,
pub boundary_order: BoundaryOrder,
}

impl FixedLenByteIndex {
pub(crate) fn try_new(
index: ColumnIndex,
physical_type: Type,
) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
physical_type,
indexes,
boundary_order: index.boundary_order,
})
}
}

impl Index for FixedLenByteIndex {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &Type {
&self.physical_type
}
}

/// An index of a column of boolean physical type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BooleanIndex {
/// The indexes, one item per page
pub indexes: Vec<PageIndex<bool>>,
pub boundary_order: BoundaryOrder,
}

impl BooleanIndex {
pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
let min = min[0] == 1;
let max = max[0] == 1;
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
indexes,
boundary_order: index.boundary_order,
})
}
}

impl Index for BooleanIndex {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &Type {
&Type::BOOLEAN
}
}

0 comments on commit 90cf78c

Please sign in to comment.