add column index and offset index builder
liukun4515 committed Jun 22, 2022
1 parent 9059cbf commit 04f164a
Showing 7 changed files with 92 additions and 2 deletions.
17 changes: 17 additions & 0 deletions parquet/src/basic.rs
@@ -942,6 +942,23 @@ impl str::FromStr for LogicalType {
mod tests {
use super::*;

#[test]
fn test_boolean() {
    let value = true ^ false;
    assert!(value);

    let list = vec![1, 23, 4, 5, 56, 6];

    // Both closure branches returned `Some`, so `filter_map` never
    // filtered anything; a plain `map` expresses the same transform.
    let result = list
        .iter()
        .map(|v| if *v > 10 { *v + 20 } else { 1 })
        .collect::<Vec<_>>();
    assert_eq!(result, vec![1, 43, 1, 1, 76, 1]);
}

#[test]
fn test_display_type() {
assert_eq!(Type::BOOLEAN.to_string(), "BOOLEAN");
1 change: 1 addition & 0 deletions parquet/src/column/writer.rs
@@ -785,6 +785,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
encodings.push(Encoding::RLE);

let statistics = self.make_column_statistics();
// TODO: attach the column index and offset index built for this column chunk
let metadata = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
.set_encodings(encodings)
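For orientation, a hedged sketch (not part of this commit) of how the new builders might eventually hook into the column writer's page-flush path; the `IndexCollector` type and all parameter names are hypothetical:

use parquet::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};

// Hypothetical glue: collect per-page index data as pages are flushed,
// then build both thrift indexes when the column chunk is closed.
struct IndexCollector {
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: OffsetIndexBuilder,
}

impl IndexCollector {
    // `min`/`max` are the page's encoded min/max statistics; `offset` is
    // the page's byte position in the file; `first_row_index` is the row
    // number of the page's first row within the row group.
    fn on_page_flushed(
        &mut self,
        min: &[u8],
        max: &[u8],
        null_count: i64,
        offset: i64,
        compressed_page_size: i32,
        first_row_index: i64,
    ) {
        self.column_index_builder.append(false, min, max, null_count);
        self.offset_index_builder
            .append(offset, compressed_page_size, first_row_index);
    }
}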
63 changes: 62 additions & 1 deletion parquet/src/file/metadata.rs
@@ -35,7 +35,7 @@

use std::sync::Arc;

-use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
+use parquet_format::{BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup};

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
@@ -52,6 +52,7 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
// TODO: a single Index currently covers just one page; each column chunk should carry a list of page indexes
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
}
@@ -789,6 +790,66 @@ impl ColumnChunkMetaDataBuilder {
}
}


/// Builder for column index
pub struct ColumnIndexBuilder {
null_pages: Vec<bool>,
min_values: Vec<Vec<u8>>,
max_values: Vec<Vec<u8>>,
boundary_order: BoundaryOrder,
null_counts: Vec<i64>,
}

impl ColumnIndexBuilder {
    pub fn new() -> Self {
        ColumnIndexBuilder {
            null_pages: Vec::new(),
            min_values: Vec::new(),
            max_values: Vec::new(),
            boundary_order: BoundaryOrder::Unordered,
            null_counts: Vec::new(),
        }
    }

    pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
        self.boundary_order = boundary_order;
    }

    pub fn append(
        &mut self,
        null_page: bool,
        min_value: &[u8],
        max_value: &[u8],
        null_count: i64,
    ) {
        self.null_pages.push(null_page);
        self.min_values.push(min_value.to_vec());
        self.max_values.push(max_value.to_vec());
        self.null_counts.push(null_count);
    }

    /// Build and get the thrift metadata of the column index
    pub fn build_to_thrift(self) -> ColumnIndex {
        ColumnIndex::new(
            self.null_pages,
            self.min_values,
            self.max_values,
            self.boundary_order,
            self.null_counts,
        )
    }
}
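A minimal usage sketch for the builder above, written as a test; the test name is ours, and it assumes the caller already holds each page's plain-encoded min/max statistics as byte slices (`BoundaryOrder` comes from the import added in this commit):

#[test]
fn test_column_index_builder() {
    // One append per data page: (is_null_page, min, max, null_count).
    let mut builder = ColumnIndexBuilder::new();
    builder.append(false, &1i32.to_le_bytes(), &100i32.to_le_bytes(), 0);
    builder.append(true, &[], &[], 10); // an all-null page
    builder.set_boundary_order(BoundaryOrder::Ascending);
    let column_index = builder.build_to_thrift();
    assert_eq!(column_index.null_pages, vec![false, true]);
}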

/// Builder for offset index
pub struct OffsetIndexBuilder {
page_locations: Vec<PageLocation>,
}

impl OffsetIndexBuilder {
    pub fn new() -> Self {
        OffsetIndexBuilder {
            page_locations: Vec::new(),
        }
    }

    pub fn append(&mut self, offset: i64, compressed_page_size: i32, first_row_index: i64) {
        self.page_locations.push(PageLocation::new(
            offset,
            compressed_page_size,
            first_row_index,
        ));
    }

    /// Build and get the thrift metadata of the offset index
    pub fn build_to_thrift(self) -> OffsetIndex {
        OffsetIndex::new(self.page_locations)
    }
}
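And the companion sketch for the offset index; the offsets, sizes, and row indexes below are made up:

#[test]
fn test_offset_index_builder() {
    // One append per data page:
    // (byte offset of the page, compressed page size, index of its first row).
    let mut builder = OffsetIndexBuilder::new();
    builder.append(4, 1024, 0);
    builder.append(1028, 512, 1000);
    let offset_index = builder.build_to_thrift();
    assert_eq!(offset_index.page_locations.len(), 2);
}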

#[cfg(test)]
mod tests {
use super::*;
9 changes: 9 additions & 0 deletions parquet/src/file/page_index/index_writer.rs
@@ -0,0 +1,9 @@

/// Writes a thrift `ColumnIndex` for each column chunk.
/// Placeholder: the body is not implemented yet.
pub fn write_column_indexes() {}

/// Writes a thrift `OffsetIndex` for each column chunk.
/// Placeholder: the body is not implemented yet.
pub fn write_offset_indexes() {}
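These stubs will presumably serialize the thrift structs with the compact protocol, mirroring how the writer already serializes footer metadata. A hedged sketch of that step; the helper name is ours, not the crate's:

use parquet_format::ColumnIndex;
use thrift::protocol::TCompactOutputProtocol;

// Serialize one ColumnIndex into an in-memory buffer. A real writer would
// then write the buffer to the file and record its offset and length in
// the matching ColumnChunk metadata.
fn serialize_column_index(index: &ColumnIndex, buf: &mut Vec<u8>) -> thrift::Result<()> {
    let mut protocol = TCompactOutputProtocol::new(buf);
    index.write_to_out_protocol(&mut protocol)
}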

1 change: 1 addition & 0 deletions parquet/src/file/page_index/mod.rs
@@ -18,3 +18,4 @@
pub mod index;
pub mod index_reader;
pub(crate) mod range;
mod index_writer;
1 change: 1 addition & 0 deletions parquet/src/file/serialized_reader.rs
@@ -250,6 +250,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
if options.enable_page_index {
// TODO: the test file `data_index_bloom_encoding_stats.parquet` currently
// has only one row group; support multiple row groups once multi-RG test
// data exists. Until then only the first row group is read, which is wrong
// for files with more than one row group.
let cols = metadata.row_group(0);
let columns_indexes =
index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
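For reference, a sketch of how a caller reaches this code path, assuming the `ReadOptionsBuilder::with_page_index` option available in the crate around this time:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;

fn main() {
    let file = File::open("data_index_bloom_encoding_stats.parquet").expect("test file");
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options).expect("reader");
    // With the option enabled, the parsed metadata carries the page and
    // offset indexes read above (for the first row group only, per the TODO).
    println!("row groups: {}", reader.metadata().num_row_groups());
}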
2 changes: 1 addition & 1 deletion parquet/src/file/writer.rs
@@ -466,6 +466,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
data_page_header_v2: None,
};

// TODO: column indexes and offset indexes
match *page.compressed_page() {
Page::DataPage {
def_level_encoding,
@@ -531,7 +532,6 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {

Ok(spec)
}

fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
self.serialize_column_chunk(metadata.to_thrift())
}
