add column index and offset index
liukun4515 committed Jun 29, 2022
1 parent 9059cbf commit cf7cbfc
Showing 4 changed files with 461 additions and 58 deletions.
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -34,7 +34,7 @@ use super::schema::{

use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaDataPtr;
use crate::file::metadata::RowGroupMetaData;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -98,7 +98,7 @@ impl<W: Write> ArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.writer.flushed_row_groups()
}
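Because flushed_row_groups() now hands back plain RowGroupMetaData values instead of Arc-wrapped pointers, callers can inspect the flushed groups directly. A minimal hypothetical sketch (not part of this commit), assuming writer is an open ArrowWriter and using the existing num_rows()/total_byte_size() accessors:

for rg in writer.flushed_row_groups() {
    // summarize each row group that has already been flushed to the sink
    println!("row group: {} rows, {} bytes", rg.num_rows(), rg.total_byte_size());
}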

172 changes: 159 additions & 13 deletions parquet/src/column/writer.rs
@@ -16,6 +16,7 @@
// under the License.

//! Contains column writer API.
use parquet_format::{ColumnIndex, OffsetIndex};
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
@@ -29,6 +30,7 @@ use crate::encodings::{
levels::{max_buffer_size, LevelEncoder},
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
@@ -162,6 +164,14 @@ pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
})
}

type ColumnCloseResult = (
u64,
u64,
ColumnChunkMetaData,
Option<ColumnIndex>,
Option<OffsetIndex>,
);

/// Typed column writer for a primitive column.
pub struct ColumnWriterImpl<'a, T: DataType> {
// Column writer properties
@@ -198,6 +208,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
_phantom: PhantomData<T>,
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
}

impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +274,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
num_column_nulls: 0,
column_distinct_count: None,
_phantom: PhantomData,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
}
}

@@ -416,7 +431,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

/// Finalises writes and closes the column writer.
/// Returns total bytes written, total rows written, column chunk metadata, and the
/// optional column index and offset index.
pub fn close(mut self) -> Result<(u64, u64, ColumnChunkMetaData)> {
pub fn close(mut self) -> Result<ColumnCloseResult> {
if self.dict_encoder.is_some() {
self.write_dictionary_page()?;
}
@@ -425,7 +440,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.dict_encoder = None;
self.page_writer.close()?;

Ok((self.total_bytes_written, self.total_rows_written, metadata))
let (column_index, offset_index) = if self.column_index_builder.valid() {
// build the column and offset index
let column_index = self.column_index_builder.build_to_thrift();
let offset_index = self.offset_index_builder.build_to_thrift();
(Some(column_index), Some(offset_index))
} else {
(None, None)
};

Ok((
self.total_bytes_written,
self.total_rows_written,
metadata,
column_index,
offset_index,
))
}
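For illustration (this sketch is not part of the commit), a caller of the widened close() might destructure the new ColumnCloseResult as below; the variable typed_writer is a hypothetical stand-in for any ColumnWriterImpl, and the last two fields are None whenever the column index could not be built, for example because page statistics were unavailable:

let (bytes_written, rows_written, metadata, column_index, offset_index) =
    typed_writer.close()?;
println!("wrote {} rows in {} bytes", rows_written, bytes_written);
if let (Some(ci), Some(oi)) = (column_index, offset_index) {
    // both indexes carry one entry per data page in the chunk
    assert_eq!(ci.null_pages.len(), oi.page_locations.len());
}
let _ = metadata; // column chunk metadata, unchanged from the previous API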

/// Writes mini batch of values, definition and repetition levels.
@@ -593,6 +623,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
Ok(())
}

/// Updates the column index and offset index when adding a data page.
fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
// update the column index
let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
// If the page contains only null values, writers have to set the corresponding
// entries in min_values and max_values to byte[0] (per the Parquet spec)
if null_page && self.column_index_builder.valid() {
self.column_index_builder.append(
null_page,
&[0; 1],
&[0; 1],
self.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
// Derive min/max from the page statistics.
// If the page statistics are unavailable, stop building the column/offset index
// for this column chunk.
match &page_statistics {
None => {
self.column_index_builder.to_invalid();
}
Some(stat) => {
self.column_index_builder.append(
null_page,
stat.min_bytes(),
stat.max_bytes(),
self.num_page_nulls as i64,
);
}
}
}

// update the offset index
self.offset_index_builder
.append_row_count(self.num_buffered_rows as i64);
}
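To make the null-page rule above concrete, here is a hypothetical sketch (not part of this commit) of how a built ColumnIndex for an INT32 column could be inspected; decoding min/max as little-endian i32 is an assumption based on Parquet's plain encoding of INT32 statistics:

use std::convert::TryInto;

fn describe_int32_column_index(ci: &parquet_format::ColumnIndex) {
    for (idx, is_null_page) in ci.null_pages.iter().enumerate() {
        if *is_null_page {
            // an all-null page carries only placeholder min/max bytes
            println!("page {}: all nulls", idx);
        } else {
            let min = i32::from_le_bytes(ci.min_values[idx].as_slice().try_into().unwrap());
            let max = i32::from_le_bytes(ci.max_values[idx].as_slice().try_into().unwrap());
            println!("page {}: min={} max={}", idx, min, max);
        }
    }
}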

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +688,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
None
};

// update column and offset index
self.update_column_offset_index(&page_statistics);

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
let mut buffer = vec![];
@@ -700,8 +769,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
buf: ByteBufferPtr::new(buffer),
num_values: self.num_buffered_values,
encoding,
num_nulls: self.num_buffered_values
- self.num_buffered_encoded_values,
num_nulls: self.num_page_nulls as u32,
num_rows: self.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +898,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
// update offset index
// compressed_size = header_size + compressed_data_size
self.offset_index_builder.append_offset_and_size(
page_spec.offset as i64,
page_spec.compressed_size as i32,
);
self.update_metrics_for_page(page_spec);
Ok(())
}
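As a hypothetical check (not part of this commit) of the bookkeeping above: each page's offset advances by at least the previous page's compressed size (header plus compressed data), and its first_row_index advances by the previous page's row count. Assuming offset_index is the Option<OffsetIndex> returned from close():

if let Some(oi) = &offset_index {
    for w in oi.page_locations.windows(2) {
        // the next page starts after the previous page's header and compressed data
        assert!(w[1].offset >= w[0].offset + w[0].compressed_page_size as i64);
        // and its first row index grows by the previous page's row count
        assert!(w[1].first_row_index > w[0].first_row_index);
    }
}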
@@ -865,6 +939,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

let page_spec = self.page_writer.write_page(compressed_page)?;
self.update_metrics_for_page(page_spec);
// The dictionary page does not contribute to the column/offset index, so there is nothing to update here.
Ok(())
}

@@ -1133,6 +1208,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {

#[cfg(test)]
mod tests {
use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;

@@ -1256,7 +1332,7 @@ mod tests {
.write_batch(&[true, false, true, false], None, None)
.unwrap();

let (bytes_written, rows_written, metadata) = writer.close().unwrap();
let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
// PlainEncoder uses bit writer to write boolean values, which all fit into 1
// byte.
assert_eq!(bytes_written, 1);
@@ -1529,7 +1605,7 @@ mod tests {
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

let (bytes_written, rows_written, metadata) = writer.close().unwrap();
let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
assert_eq!(bytes_written, 20);
assert_eq!(rows_written, 4);
assert_eq!(
@@ -1586,7 +1662,7 @@ mod tests {
None,
)
.unwrap();
let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::ByteArray(stats) = stats {
@@ -1620,7 +1696,7 @@ mod tests {
Int32Type,
>(page_writer, 0, 0, props);
writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
@@ -1651,7 +1727,7 @@ mod tests {
)
.unwrap();

let (bytes_written, rows_written, metadata) = writer.close().unwrap();
let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
assert_eq!(bytes_written, 20);
assert_eq!(rows_written, 4);
assert_eq!(
@@ -1835,7 +1911,7 @@ mod tests {
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(data, None, None).unwrap();
let (bytes_written, _, _) = writer.close().unwrap();
let (bytes_written, _, _, _, _) = writer.close().unwrap();

// Read pages and check the sequence
let source = FileSource::new(&file, 0, bytes_written as usize);
@@ -2068,6 +2144,75 @@ mod tests {
),);
}

#[test]
fn test_column_offset_index_metadata() {
// write data
// and check the offset index and column index
let page_writer = get_test_page_writer();
let props = Arc::new(WriterProperties::builder().build());
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
// first page
writer.flush_data_pages().unwrap();
// second page
writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

let (_, rows_written, metadata, column_index, offset_index) =
writer.close().unwrap();
let column_index = match column_index {
None => {
panic!("Can't fine the column index");
}
Some(column_index) => column_index,
};
let offset_index = match offset_index {
None => {
panic!("Can't find the offset index");
}
Some(offset_index) => offset_index,
};

assert_eq!(8, rows_written);

// column index
assert_eq!(2, column_index.null_pages.len());
assert_eq!(2, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
for idx in 0..2 {
assert!(!column_index.null_pages[idx]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
}

if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::Int32(stats) = stats {
// first page is [1,2,3,4]
// second page is [-5,2,4,8]
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
assert_eq!(
stats.max_bytes(),
column_index.max_values.get(1).unwrap().as_slice()
);
} else {
panic!("expecting Statistics::Int32");
}
} else {
panic!("metadata missing statistics");
}

// page location
assert_eq!(
0,
offset_index.page_locations.get(0).unwrap().first_row_index
);
assert_eq!(
4,
offset_index.page_locations.get(1).unwrap().first_row_index
);
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
@@ -2149,7 +2294,8 @@ mod tests {

let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
assert_eq!(values_written, values.len());
let (bytes_written, rows_written, column_metadata) = writer.close().unwrap();
let (bytes_written, rows_written, column_metadata, _, _) =
writer.close().unwrap();

let source = FileSource::new(&file, 0, bytes_written as usize);
let page_reader = Box::new(
@@ -2215,7 +2361,7 @@ mod tests {
let props = Arc::new(props);
let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();
let (_, _, metadata) = writer.close().unwrap();
let (_, _, metadata, _, _) = writer.close().unwrap();
metadata
}

@@ -2327,7 +2473,7 @@ mod tests {
let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();

let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
stats.clone()
} else {
