add column index and offset index
liukun4515 committed Jun 28, 2022
1 parent 9059cbf commit f76ff65
Showing 4 changed files with 387 additions and 17 deletions.
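For context: the "page index" these changes populate is defined per column chunk in the Parquet thrift spec as a ColumnIndex (per-page min/max and null information) plus an OffsetIndex (per-page location). In the parquet_format crate's generated Rust bindings the types look roughly like the sketch below (field names follow the spec; derives and helper impls omitted — this is a sketch, not part of the diff).

pub struct ColumnIndex {
    pub null_pages: Vec<bool>,         // true where a page contains only nulls
    pub min_values: Vec<Vec<u8>>,      // per-page encoded min values
    pub max_values: Vec<Vec<u8>>,      // per-page encoded max values
    pub boundary_order: BoundaryOrder, // whether min/max are ordered across pages
    pub null_counts: Option<Vec<i64>>, // per-page null counts
}

pub struct OffsetIndex {
    pub page_locations: Vec<PageLocation>, // one entry per data page
}

pub struct PageLocation {
    pub offset: i64,               // file offset of the page header
    pub compressed_page_size: i32, // header size + compressed data size
    pub first_row_index: i64,      // row-group-relative index of the page's first row
}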
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -34,7 +34,7 @@ use super::schema::{

use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::RowGroupMetaDataPtr;
+use crate::file::metadata::RowGroupMetaData;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -98,7 +98,7 @@ impl<W: Write> ArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
-pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.writer.flushed_row_groups()
}
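Since callers now receive RowGroupMetaData values directly rather than the Arc-based RowGroupMetaDataPtr, a minimal usage sketch — assuming an in-memory Vec<u8> sink and an existing RecordBatch; not part of the diff — looks like:

use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

// Write one batch, flush it into a row group, and inspect the flushed
// metadata through the changed return type.
fn write_and_inspect(batch: RecordBatch) -> parquet::errors::Result<()> {
    let mut writer = ArrowWriter::try_new(Vec::new(), batch.schema(), None)?;
    writer.write(&batch)?;
    writer.flush()?;
    for row_group in writer.flushed_row_groups() {
        println!("flushed row group with {} rows", row_group.num_rows());
    }
    writer.close()?;
    Ok(())
}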

141 changes: 138 additions & 3 deletions parquet/src/column/writer.rs
@@ -29,6 +29,7 @@ use crate::encodings::{
levels::{max_buffer_size, LevelEncoder},
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
@@ -198,6 +199,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
_phantom: PhantomData<T>,
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
}
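The two builders are defined in parquet/src/file/metadata.rs — one of the four changed files, but not shown in this excerpt. Inferred from the call sites in this file, ColumnIndexBuilder plausibly accumulates one entry per data page and can mark itself invalid when statistics are missing. A sketch under those assumptions (the committed definition may differ in detail):

use parquet_format::{BoundaryOrder, ColumnIndex};

// Sketch of ColumnIndexBuilder inferred from its call sites; not the
// committed code.
pub struct ColumnIndexBuilder {
    null_pages: Vec<bool>,
    min_values: Vec<Vec<u8>>,
    max_values: Vec<Vec<u8>>,
    null_counts: Vec<i64>,
    valid: bool, // flips to false once any page lacks statistics
}

impl ColumnIndexBuilder {
    pub fn new() -> Self {
        Self {
            null_pages: Vec::new(),
            min_values: Vec::new(),
            max_values: Vec::new(),
            null_counts: Vec::new(),
            valid: true,
        }
    }

    /// Record one page's index entry.
    pub fn append(&mut self, null_page: bool, min: &[u8], max: &[u8], null_count: i64) {
        self.null_pages.push(null_page);
        self.min_values.push(min.to_vec());
        self.max_values.push(max.to_vec());
        self.null_counts.push(null_count);
    }

    pub fn to_invalid(&mut self) {
        self.valid = false;
    }

    pub fn valid(&self) -> bool {
        self.valid
    }

    /// Convert the accumulated entries into the thrift ColumnIndex.
    pub fn build_to_thrift(self) -> ColumnIndex {
        ColumnIndex {
            null_pages: self.null_pages,
            min_values: self.min_values,
            max_values: self.max_values,
            boundary_order: BoundaryOrder::Unordered,
            null_counts: Some(self.null_counts),
        }
    }
}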

impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +265,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
num_column_nulls: 0,
column_distinct_count: None,
_phantom: PhantomData,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
}
}

@@ -421,10 +427,19 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.write_dictionary_page()?;
}
self.flush_data_pages()?;
-let metadata = self.write_column_metadata()?;
+let mut metadata = self.write_column_metadata()?;
self.dict_encoder = None;
self.page_writer.close()?;

if self.column_index_builder.valid() {
// build the column and offset index
let column_index = self.column_index_builder.build_to_thrift();
let offset_index = self.offset_index_builder.build_to_thrift();
// attach the column and offset index to the column metadata
metadata.set_column_index(Some(column_index));
metadata.set_offset_index(Some(offset_index));
}

Ok((self.total_bytes_written, self.total_rows_written, metadata))
}
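Note that close() only attaches the two thrift structs to the returned ColumnChunkMetaData; actually persisting them is the file writer's job. The Parquet spec places the serialized indexes between the last row group and the footer, with their offsets and lengths recorded in each thrift ColumnChunk. A rough sketch of that serialization step, assuming the thrift crate's compact protocol — this is not the committed file-writer code:

use thrift::protocol::TCompactOutputProtocol;

// Sketch, not part of this diff: serialize a ColumnIndex into the file body
// ahead of the footer; the caller would note the start offset and written
// length and store them in the matching thrift ColumnChunk.
fn write_column_index<W: std::io::Write>(
    sink: &mut W,
    index: &parquet_format::ColumnIndex,
) -> thrift::Result<()> {
    let mut protocol = TCompactOutputProtocol::new(sink);
    index.write_to_out_protocol(&mut protocol)
}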

@@ -593,6 +608,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
Ok(())
}

/// Updates the column index and offset index when adding a data page
fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
// update the column index
let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
// If a page contains only null values, the Parquet spec requires writers to set
// the corresponding entries in min_values and max_values to byte[0]
if null_page && self.column_index_builder.valid() {
self.column_index_builder.append(
null_page,
&[0; 1],
&[0; 1],
self.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
// Derive the entry from the page statistics; if they are unavailable,
// skip the column/offset index for this column chunk altogether
match &page_statistics {
None => {
self.column_index_builder.to_invalid();
}
Some(stat) => {
self.column_index_builder.append(
null_page,
stat.min_bytes(),
stat.max_bytes(),
self.num_page_nulls as i64,
);
}
}
}

// update the offset index
self.offset_index_builder
.append_row_count(self.num_buffered_rows as i64);
}
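As a concrete illustration of the null-page branch above (values chosen for illustration, not taken from the commit): a column chunk whose single data page holds four rows, all null, would end up with a ColumnIndex like the one this sketch constructs, where min/max carry the single placeholder byte passed by the builder call above.

use parquet_format::{BoundaryOrder, ColumnIndex};

// Sketch, not part of the diff: the index entry contributed by one
// all-null page of four rows.
fn all_null_page_index() -> ColumnIndex {
    ColumnIndex {
        null_pages: vec![true],
        min_values: vec![vec![0u8]],
        max_values: vec![vec![0u8]],
        boundary_order: BoundaryOrder::Unordered,
        null_counts: Some(vec![4]),
    }
}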

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +673,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
None
};

// update column and offset index
self.update_column_offset_index(&page_statistics);

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
let mut buffer = vec![];
@@ -700,8 +754,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
buf: ByteBufferPtr::new(buffer),
num_values: self.num_buffered_values,
encoding,
-num_nulls: self.num_buffered_values
-    - self.num_buffered_encoded_values,
+num_nulls: self.num_page_nulls as u32,
num_rows: self.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +883,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
// update offset index
// compressed_size = header_size + compressed_data_size
self.offset_index_builder.append_offset_and_size(
page_spec.offset as i64,
page_spec.compressed_size as i32,
);
self.update_metrics_for_page(page_spec);
Ok(())
}
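OffsetIndexBuilder is the offset-index counterpart, also defined in metadata.rs and inferred here from its call sites: append_offset_and_size records each data page's start offset and total compressed size, append_row_count its row count, and build_to_thrift derives first_row_index as a running sum. A sketch under those assumptions, not the committed code:

use parquet_format::{OffsetIndex, PageLocation};

// Sketch of OffsetIndexBuilder inferred from its call sites; the committed
// definition may differ in detail.
pub struct OffsetIndexBuilder {
    offsets: Vec<i64>,
    compressed_page_sizes: Vec<i32>,
    row_counts: Vec<i64>,
}

impl OffsetIndexBuilder {
    pub fn new() -> Self {
        Self {
            offsets: Vec::new(),
            compressed_page_sizes: Vec::new(),
            row_counts: Vec::new(),
        }
    }

    pub fn append_row_count(&mut self, row_count: i64) {
        self.row_counts.push(row_count);
    }

    pub fn append_offset_and_size(&mut self, offset: i64, compressed_size: i32) {
        self.offsets.push(offset);
        self.compressed_page_sizes.push(compressed_size);
    }

    /// Convert the accumulated page records into the thrift OffsetIndex;
    /// each page's first_row_index is the cumulative row count of all
    /// preceding pages.
    pub fn build_to_thrift(self) -> OffsetIndex {
        let mut first_row_index = 0i64;
        let page_locations = self
            .offsets
            .into_iter()
            .zip(self.compressed_page_sizes)
            .zip(self.row_counts)
            .map(|((offset, compressed_page_size), rows)| {
                let location = PageLocation {
                    offset,
                    compressed_page_size,
                    first_row_index,
                };
                first_row_index += rows;
                location
            })
            .collect();
        OffsetIndex { page_locations }
    }
}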
@@ -865,6 +924,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

let page_spec = self.page_writer.write_page(compressed_page)?;
self.update_metrics_for_page(page_spec);
// Dictionary pages are not tracked by the column/offset index, so there is nothing to update here.
Ok(())
}

@@ -1133,6 +1193,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {

#[cfg(test)]
mod tests {
use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;

@@ -2068,6 +2129,80 @@ mod tests {
),);
}

#[test]
fn test_column_offset_index_metadata() {
// Write two pages of data, then check the resulting offset index and column index.
let page_writer = get_test_page_writer();
let props = Arc::new(WriterProperties::builder().build());
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
// first page
writer.flush_data_pages().unwrap();
// second page
writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

let (_, rows_written, metadata) = writer.close().unwrap();
let column_index = match metadata.column_index() {
None => {
panic!("Can't fine the column index");
}
Some(column_index) => column_index,
};
let offset_index = match metadata.offset_index() {
None => {
panic!("Can't find the offset index");
}
Some(offset_index) => offset_index,
};

assert_eq!(8, rows_written);

// column index
assert_eq!(2, column_index.null_pages.len());
assert_eq!(2, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
for idx in 0..2 {
assert_eq!(&false, column_index.null_pages.get(idx).unwrap());
assert_eq!(
&0,
column_index.null_counts.as_ref().unwrap().get(idx).unwrap()
);
}

if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::Int32(stats) = stats {
// first page values are [1,2,3,4]; second page values are [4,8,2,-5],
// so the chunk-level min (-5) and max (8) both come from the second page
assert_eq!(
stats.min_bytes(),
column_index.min_values.get(1).unwrap().as_slice()
);
assert_eq!(
stats.max_bytes(),
column_index.max_values.get(1).unwrap().as_slice()
);
} else {
panic!("expecting Statistics::Int32");
}
} else {
panic!("metadata missing statistics");
}

// page location
assert_eq!(
0,
offset_index.page_locations.get(0).unwrap().first_row_index
);
assert_eq!(
4,
offset_index.page_locations.get(1).unwrap().first_row_index
);
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
