add column index and offset index builder
liukun4515 committed Jun 24, 2022
1 parent 9059cbf commit e705846
Showing 8 changed files with 137 additions and 2 deletions.
17 changes: 17 additions & 0 deletions parquet/src/basic.rs
@@ -942,6 +942,23 @@ impl str::FromStr for LogicalType {
mod tests {
use super::*;

#[test]
fn test_boolean() {
    let value = true ^ false;
    assert!(value);

    let list = vec![1, 23, 4, 5, 56, 6];

    let result = list
        .iter()
        .filter_map(|v| if *v > 10 { Some(*v + 20) } else { Some(1) })
        .collect::<Vec<_>>();
    assert_eq!(result, vec![1, 43, 1, 1, 76, 1]);
}

#[test]
fn test_display_type() {
assert_eq!(Type::BOOLEAN.to_string(), "BOOLEAN");
3 changes: 3 additions & 0 deletions parquet/src/column/page.rs
@@ -212,10 +212,13 @@ pub trait PageWriter {
///
/// This method is called once before page writer is closed, normally when writes are
/// finalised in column writer.
/// TODO: remove this method. Once we have the column index, the column chunk
/// metadata can only be determined after the column index is written to disk.
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()>;

/// Closes resources and flushes underlying sink.
/// Page writer should not be used after this method is called.
/// TODO: close and return the column metadata.
fn close(&mut self) -> Result<()>;
}

1 change: 1 addition & 0 deletions parquet/src/column/writer.rs
@@ -785,6 +785,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
encodings.push(Encoding::RLE);

let statistics = self.make_column_statistics();
// TODO: build the column index and offset index here
let metadata = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
.set_encodings(encodings)
63 changes: 62 additions & 1 deletion parquet/src/file/metadata.rs
@@ -35,7 +35,7 @@

use std::sync::Arc;

use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
use parquet_format::{
    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
};

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
@@ -52,6 +52,7 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
// TODO: an `Index` here currently covers just one page; each column chunk should hold a list of page indexes
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
}
@@ -789,6 +790,66 @@ impl ColumnChunkMetaDataBuilder {
}
}

/// Builder for the column index
pub struct ColumnIndexBuilder {
    null_pages: Vec<bool>,
    min_values: Vec<Vec<u8>>,
    max_values: Vec<Vec<u8>>,
    boundary_order: BoundaryOrder,
    null_counts: Vec<i64>,
}

impl ColumnIndexBuilder {
    pub fn new() -> Self {
        ColumnIndexBuilder {
            null_pages: Vec::new(),
            min_values: Vec::new(),
            max_values: Vec::new(),
            boundary_order: BoundaryOrder::Unordered,
            null_counts: Vec::new(),
        }
    }

    pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
        self.boundary_order = boundary_order;
    }

    /// Append the index entry for one page: whether it is a null page, its
    /// min and max values, and its null count.
    pub fn append(
        &mut self,
        null_page: bool,
        min_value: &[u8],
        max_value: &[u8],
        null_count: i64,
    ) {
        self.null_pages.push(null_page);
        self.min_values.push(min_value.to_vec());
        self.max_values.push(max_value.to_vec());
        self.null_counts.push(null_count);
    }

    /// Build and return the thrift metadata of the column index
    pub fn build_to_thrift(self) -> ColumnIndex {
        ColumnIndex::new(
            self.null_pages,
            self.min_values,
            self.max_values,
            self.boundary_order,
            self.null_counts,
        )
    }
}
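To make the new builder concrete, here is a short usage sketch. The two pages, their plain-encoded little-endian `i32` min/max bytes, and the `parquet::file::metadata` import path are illustrative assumptions, not part of this diff:

```rust
use parquet::file::metadata::ColumnIndexBuilder; // assumed public path
use parquet_format::{BoundaryOrder, ColumnIndex};

fn build_demo_column_index() -> ColumnIndex {
    let mut builder = ColumnIndexBuilder::new();
    // Page 0: no nulls, values span [1, 10].
    builder.append(false, &1i32.to_le_bytes(), &10i32.to_le_bytes(), 0);
    // Page 1: all 100 values are null, so the min/max byte arrays stay empty.
    builder.append(true, &[], &[], 100);
    builder.set_boundary_order(BoundaryOrder::Ascending);
    builder.build_to_thrift()
}
```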

/// Builder for the offset index
pub struct OffsetIndexBuilder {
    page_locations: Vec<PageLocation>,
}

impl OffsetIndexBuilder {
    pub fn new() -> Self {
        OffsetIndexBuilder {
            page_locations: Vec::new(),
        }
    }

    /// Append the location of one page: its file offset, compressed size,
    /// and the index of its first row.
    pub fn append(&mut self, offset: i64, compressed_page_size: i32, first_row_index: i64) {
        self.page_locations.push(PageLocation::new(
            offset,
            compressed_page_size,
            first_row_index,
        ));
    }

    /// Build and return the thrift metadata of the offset index
    pub fn build_to_thrift(self) -> OffsetIndex {
        OffsetIndex::new(self.page_locations)
    }
}
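The offset-index counterpart, again with invented values (a data page at file offset 4 followed by one at offset 104):

```rust
use parquet::file::metadata::OffsetIndexBuilder; // assumed public path
use parquet_format::OffsetIndex;

fn build_demo_offset_index() -> OffsetIndex {
    let mut builder = OffsetIndexBuilder::new();
    // First data page: file offset 4, 100 compressed bytes, first row index 0.
    builder.append(4, 100, 0);
    // Second data page starts right after and begins at row 1000.
    builder.append(104, 120, 1000);
    builder.build_to_thrift()
}
```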

#[cfg(test)]
mod tests {
use super::*;
27 changes: 27 additions & 0 deletions parquet/src/file/page_index/index_writer.rs
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Serialize the column indexes to a byte array. TODO: implement.
pub fn write_column_indexes() {}

/// Serialize the offset indexes to a byte array. TODO: implement.
pub fn write_offset_indexes() {}
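Both functions are empty placeholders in this commit. As a hedged sketch of one possible shape, a single index could be serialized with the same thrift compact protocol that `writer.rs` already uses for the file footer; the function name and signature below are assumptions, not the eventual API:

```rust
use std::io::Write;

use parquet_format::ColumnIndex;
use thrift::protocol::TCompactOutputProtocol;

// Sketch: serialize one ColumnIndex into `sink` with the compact protocol.
// Recording the resulting file offset and length is left to the caller.
fn serialize_column_index<W: Write>(
    index: &ColumnIndex,
    sink: &mut W,
) -> thrift::Result<()> {
    let mut protocol = TCompactOutputProtocol::new(sink);
    index.write_to_out_protocol(&mut protocol)
}
```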

1 change: 1 addition & 0 deletions parquet/src/file/page_index/mod.rs
@@ -18,3 +18,4 @@
pub mod index;
pub mod index_reader;
pub(crate) mod range;
mod index_writer;
1 change: 1 addition & 0 deletions parquet/src/file/serialized_reader.rs
@@ -250,6 +250,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
if options.enable_page_index {
// TODO: for now the test data `data_index_bloom_encoding_stats.parquet` only
// has one row group; support multiple row groups once multi-RG test data exists.
// TODO: only the first row group is read here, which is wrong for files with
// more than one row group.
let cols = metadata.row_group(0);
let columns_indexes =
index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
26 changes: 25 additions & 1 deletion parquet/src/file/writer.rs
@@ -22,6 +22,7 @@ use std::{io::Write, sync::Arc};

use byteorder::{ByteOrder, LittleEndian};
use parquet_format as parquet;
use parquet_format::{ColumnIndex, OffsetIndex};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};

use crate::basic::PageType;
@@ -111,6 +112,8 @@ pub struct SerializedFileWriter<W: Write> {
props: WriterPropertiesPtr,
row_groups: Vec<RowGroupMetaDataPtr>,
row_group_index: usize,
column_indexes: Vec<Vec<ColumnIndex>>,
offset_indexes: Vec<Vec<OffsetIndex>>,
}

impl<W: Write> SerializedFileWriter<W> {
@@ -125,6 +128,8 @@ impl<W: Write> SerializedFileWriter<W> {
props: properties,
row_groups: vec![],
row_group_index: 0,
column_indexes: Vec::new(),
offset_indexes: Vec::new(),
})
}

@@ -177,8 +182,24 @@ impl<W: Write> SerializedFileWriter<W> {
Ok(())
}

/// Write the column indexes to the file
fn write_column_index(&mut self) {
    // TODO: for each row group, serialize the column index of every column chunk
}

/// Write the offset indexes to the file
fn write_offset_index(&mut self) {
    // TODO: for each row group, serialize the offset index of every column chunk
}
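For orientation, a sketch of the iteration these hooks will eventually need, assuming the new `column_indexes: Vec<Vec<ColumnIndex>>` field holds one inner vector per row group, and reusing the hypothetical `serialize_column_index` helper sketched for `index_writer.rs` above:

```rust
// Sketch only: write every collected column index, row group by row group,
// in column order. A real implementation would also record each index's
// file offset and length back into the matching column chunk metadata.
fn write_all_column_indexes<W: std::io::Write>(
    indexes: &[Vec<parquet_format::ColumnIndex>],
    sink: &mut W,
) -> thrift::Result<()> {
    for row_group_indexes in indexes {
        for index in row_group_indexes {
            serialize_column_index(index, sink)?;
        }
    }
    Ok(())
}
```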

/// Assembles and writes metadata at the end of the file.
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
// Before serializing the FileMetaData, write the column indexes and offset indexes.
self.write_column_index();
self.write_offset_index();

let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();

let row_groups = self
@@ -448,6 +469,8 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {

impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
// TODO: collect the column index info, then write the page to disk
let uncompressed_size = page.uncompressed_size();
let compressed_size = page.compressed_size();
let num_values = page.num_values();
@@ -466,6 +489,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
data_page_header_v2: None,
};

// TODO: update the column index and offset index builders here
match *page.compressed_page() {
Page::DataPage {
def_level_encoding,
@@ -528,10 +552,10 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 {
spec.num_values = num_values;
}
// TODO: collect the page statistics and append them to the column index builder

Ok(spec)
}

fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
self.serialize_column_chunk(metadata.to_thrift())
}
