add column index and offset index
liukun4515 committed Jun 27, 2022
1 parent 9059cbf commit 735f4ed
Showing 5 changed files with 296 additions and 15 deletions.
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -34,7 +34,7 @@ use super::schema::{

use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaDataPtr;
use crate::file::metadata::RowGroupMetaData;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -98,7 +98,7 @@ impl<W: Write> ArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.writer.flushed_row_groups()
}

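With this change, callers receive plain `RowGroupMetaData` values rather than `Arc`-wrapped pointers. A minimal sketch of the new call site, assuming the `arrow` and `parquet` crates of this era; the schema, batch, and `flush` call are illustrative, not part of this commit:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;

fn inspect_flushed_row_groups() -> parquet::errors::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();

    // Write one batch and flush it so at least one row group exists.
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, None)?;
    writer.write(&batch)?;
    writer.flush()?;

    // `flushed_row_groups` now yields `&[RowGroupMetaData]` directly.
    for rg in writer.flushed_row_groups() {
        println!("rows: {}, columns: {}", rg.num_rows(), rg.num_columns());
    }

    writer.close()?;
    Ok(())
}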
67 changes: 64 additions & 3 deletions parquet/src/column/writer.rs
@@ -29,6 +29,7 @@ use crate::encodings::{
levels::{max_buffer_size, LevelEncoder},
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
@@ -198,6 +199,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
_phantom: PhantomData<T>,
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
}

impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +265,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
num_column_nulls: 0,
column_distinct_count: None,
_phantom: PhantomData,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
}
}

@@ -421,10 +427,19 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.write_dictionary_page()?;
}
self.flush_data_pages()?;
let metadata = self.write_column_metadata()?;
let mut metadata = self.write_column_metadata()?;
self.dict_encoder = None;
self.page_writer.close()?;

if self.column_index_builder.valid() {
// build the column and offset index
let column_index = self.column_index_builder.build_to_thrift();
let offset_index = self.offset_index_builder.build_to_thrift();
// set the column and offset index to the column metadata
metadata.set_column_index(Some(column_index));
metadata.set_offset_index(Some(offset_index));
}

Ok((self.total_bytes_written, self.total_rows_written, metadata))
}

@@ -593,6 +608,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
Ok(())
}

/// Updates the column index and offset index when adding a data page
fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
// update the column index
let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
// If a page contains only null values, the writer has to set the
// corresponding entries in min_values and max_values to byte[0]
if null_page && self.column_index_builder.valid() {
self.column_index_builder.append(
null_page,
&[0; 1],
&[0; 1],
self.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
// Build the entry from the page statistics. If they are unavailable,
// drop the column/offset index for this whole column chunk.
match &page_statistics {
None => {
self.column_index_builder.to_invalid();
}
Some(stat) => {
self.column_index_builder.append(
null_page,
stat.min_bytes(),
stat.max_bytes(),
self.num_page_nulls as i64,
);
}
}
}

// update the offset index
self.offset_index_builder
.append_row_count(self.num_buffered_rows as i64);
}

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +673,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
None
};

// update column and offset index
self.update_column_offset_index(&page_statistics);

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
let mut buffer = vec![];
@@ -700,8 +754,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
buf: ByteBufferPtr::new(buffer),
num_values: self.num_buffered_values,
encoding,
num_nulls: self.num_buffered_values
- self.num_buffered_encoded_values,
// TODO: why not use the null_page_count
num_nulls: self.num_page_nulls as u32,
num_rows: self.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +884,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
// update offset index
// compressed_size = header_size + compressed_data_size
self.offset_index_builder.append_offset_and_size(
page_spec.offset as i64,
page_spec.compressed_size as i32,
);
self.update_metrics_for_page(page_spec);
Ok(())
}
@@ -865,6 +925,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

let page_spec = self.page_writer.write_page(compressed_page)?;
self.update_metrics_for_page(page_spec);
// The dictionary page does not need a column/offset index update.
Ok(())
}

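Taken together, `close()` above drives the two builders introduced in metadata.rs below: one column-index entry per data page (with a placeholder for all-null pages, and whole-chunk invalidation when a page lacks statistics), plus one offset-index entry per written page. A hedged sketch of that flow using only the builder API from this diff; the byte values, sizes, and offsets are made up:

use parquet::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};

fn build_page_indexes() {
    let mut column_index = ColumnIndexBuilder::new();
    let mut offset_index = OffsetIndexBuilder::new();

    // Page 1: 100 rows, min "aaa", max "zzz", 3 nulls.
    column_index.append(false, b"aaa", b"zzz", 3);
    offset_index.append_row_count(100);
    offset_index.append_offset_and_size(4, 1024); // file offset, compressed size

    // Page 2: all 50 rows are null, so min/max carry the byte[0] placeholder.
    column_index.append(true, &[0; 1], &[0; 1], 50);
    offset_index.append_row_count(50);
    offset_index.append_offset_and_size(1028, 64);

    // Only a still-valid column index is converted to thrift and attached to
    // the column chunk metadata via set_column_index / set_offset_index.
    if column_index.valid() {
        let ci = column_index.build_to_thrift();
        let oi = offset_index.build_to_thrift();
        let _ = (ci, oi);
    }
}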
159 changes: 158 additions & 1 deletion parquet/src/file/metadata.rs
@@ -35,7 +35,10 @@

use std::sync::Arc;

use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
use parquet_format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
RowGroup,
};

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
@@ -52,6 +55,7 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
// TODO: each Index here covers a single page; a column chunk should hold a list of page indexes
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
}
@@ -247,6 +251,11 @@ impl RowGroupMetaData {
&self.columns
}

/// Returns a mutable slice of column chunk metadata
pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
&mut self.columns
}

/// Number of rows in this row group.
pub fn num_rows(&self) -> i64 {
self.num_rows
@@ -386,6 +395,9 @@ pub struct ColumnChunkMetaData {
offset_index_length: Option<i32>,
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
// column index and offset index
column_index: Option<ColumnIndex>,
offset_index: Option<OffsetIndex>,
}

/// Represents common operations for a column chunk.
@@ -519,6 +531,46 @@ impl ColumnChunkMetaData {
self.offset_index_length
}

/// Returns the column index
pub fn column_index(&self) -> &Option<ColumnIndex> {
&self.column_index
}

/// Returns the offset index
pub fn offset_index(&self) -> &Option<OffsetIndex> {
&self.offset_index
}

/// Sets the column index for the column metadata
pub fn set_column_index(&mut self, column_index: Option<ColumnIndex>) {
self.column_index = column_index;
}

/// Sets the offset index for the column metadata
pub fn set_offset_index(&mut self, offset_index: Option<OffsetIndex>) {
self.offset_index = offset_index;
}

/// Sets the file offset of the ColumnChunk's OffsetIndex
pub fn set_offset_index_offset(&mut self, offset_index_offset: i64) {
self.offset_index_offset = Some(offset_index_offset);
}

/// Sets the size of the ColumnChunk's OffsetIndex, in bytes
pub fn set_offset_index_length(&mut self, offset_index_length: i32) {
self.offset_index_length = Some(offset_index_length);
}

/// Sets the file offset of the ColumnChunk's ColumnIndex
pub fn set_column_index_offset(&mut self, column_index_offset: i64) {
self.column_index_offset = Some(column_index_offset);
}

/// Sets the size of the ColumnChunk's ColumnIndex, in bytes
pub fn set_column_index_length(&mut self, column_index_length: i32) {
self.column_index_length = Some(column_index_length);
}

/// Method to convert from Thrift.
pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
if cc.meta_data.is_none() {
@@ -573,6 +625,8 @@ impl ColumnChunkMetaData {
offset_index_length,
column_index_offset,
column_index_length,
column_index: None,
offset_index: None,
};
Ok(result)
}
@@ -785,10 +839,113 @@ impl ColumnChunkMetaDataBuilder {
offset_index_length: self.offset_index_length,
column_index_offset: self.column_index_offset,
column_index_length: self.column_index_length,
column_index: None,
offset_index: None,
})
}
}

/// Builder for column index
pub struct ColumnIndexBuilder {
null_pages: Vec<bool>,
min_values: Vec<Vec<u8>>,
max_values: Vec<Vec<u8>>,
// TODO: calculate the boundary order across all pages in this column
boundary_order: BoundaryOrder,
null_counts: Vec<i64>,
// If an index cannot be built for even one page, the whole index for this column must be ignored
valid: bool,
}

impl ColumnIndexBuilder {
pub fn new() -> Self {
ColumnIndexBuilder {
null_pages: Vec::new(),
min_values: Vec::new(),
max_values: Vec::new(),
boundary_order: BoundaryOrder::Unordered,
null_counts: Vec::new(),
valid: true,
}
}

/// Appends the index entry for one data page
pub fn append(
&mut self,
null_page: bool,
min_value: &[u8],
max_value: &[u8],
null_count: i64,
) {
self.null_pages.push(null_page);
self.min_values.push(min_value.to_vec());
self.max_values.push(max_value.to_vec());
self.null_counts.push(null_count);
}

/// Marks the column index for this column chunk as unusable
pub fn to_invalid(&mut self) {
self.valid = false;
}

/// Returns whether a usable column index can still be built
pub fn valid(&self) -> bool {
self.valid
}

/// Build and get the thrift metadata of column index
pub fn build_to_thrift(self) -> ColumnIndex {
ColumnIndex::new(
self.null_pages,
self.min_values,
self.max_values,
self.boundary_order,
self.null_counts,
)
}
}

/// Builder for offset index
pub struct OffsetIndexBuilder {
offset_array: Vec<i64>,
compressed_page_size_array: Vec<i32>,
first_row_index_array: Vec<i64>,
current_first_row_index: i64,
}

impl OffsetIndexBuilder {
pub fn new() -> Self {
OffsetIndexBuilder {
offset_array: Vec::new(),
compressed_page_size_array: Vec::new(),
first_row_index_array: Vec::new(),
current_first_row_index: 0,
}
}

/// Records the first row index of the page being appended, then advances it by `row_count`
pub fn append_row_count(&mut self, row_count: i64) {
let current_page_row_index = self.current_first_row_index;
self.first_row_index_array.push(current_page_row_index);
self.current_first_row_index += row_count;
}

/// Records the file offset and compressed size of a written page
pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
self.offset_array.push(offset);
self.compressed_page_size_array.push(compressed_page_size);
}

/// Build and get the thrift metadata of offset index
pub fn build_to_thrift(self) -> OffsetIndex {
let locations = self
.offset_array
.iter()
.zip(self.compressed_page_size_array.iter())
.zip(self.first_row_index_array.iter())
.map(|((offset, size), row_index)| {
PageLocation::new(*offset, *size, *row_index)
})
.collect::<Vec<_>>();
OffsetIndex::new(locations)
}
}

#[cfg(test)]
mod tests {
use super::*;
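For context, the `PageLocation` list produced by `OffsetIndexBuilder::build_to_thrift` is what lets a reader jump to the page holding a given row: `first_row_index` increases monotonically, so the target is the last page whose first row is at or before the requested row. A small illustrative helper, not part of this commit:

use parquet_format::PageLocation;

/// Returns the page containing `row`, assuming `locations` is ordered by
/// `first_row_index` as OffsetIndexBuilder guarantees.
fn page_for_row(locations: &[PageLocation], row: i64) -> Option<&PageLocation> {
    locations
        .iter()
        .take_while(|loc| loc.first_row_index <= row)
        .last()
}

Callers would bound `row` by the row group's row count, since any row past the end still maps to the last page.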
1 change: 1 addition & 0 deletions parquet/src/file/serialized_reader.rs
@@ -250,6 +250,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
if options.enable_page_index {
// TODO: for now the test data `data_index_bloom_encoding_stats.parquet` has only
// one row group; support multiple row groups once multi-RG test data exists.
// TODO: only the first row group is read here, which is incorrect for files
// with more than one row group.
let cols = metadata.row_group(0);
let columns_indexes =
index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
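To exercise the `enable_page_index` path above, a reader opts in through the read options. A hedged sketch, assuming `ReadOptionsBuilder` exposes `with_page_index` for the flag checked in this hunk:

use std::fs::File;

use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn open_with_page_index(path: &str) -> parquet::errors::Result<()> {
    let file = File::open(path).expect("file exists");
    // `with_page_index` sets the `enable_page_index` option checked above,
    // triggering the column/offset index reads in the constructor.
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options)?;
    println!("row groups: {}", reader.metadata().num_row_groups());
    Ok(())
}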
