Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add column index writer for parquet #1935

Merged
merged 1 commit into from Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
172 changes: 159 additions & 13 deletions parquet/src/column/writer.rs
Expand Up @@ -16,6 +16,7 @@
// under the License.

//! Contains column writer API.
use parquet_format::{ColumnIndex, OffsetIndex};
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
Expand All @@ -29,6 +30,7 @@ use crate::encodings::{
levels::{max_buffer_size, LevelEncoder},
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
Expand Down Expand Up @@ -162,6 +164,14 @@ pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
})
}

type ColumnCloseResult = (
liukun4515 marked this conversation as resolved.
Show resolved Hide resolved
u64,
u64,
ColumnChunkMetaData,
Option<ColumnIndex>,
Option<OffsetIndex>,
);

/// Typed column writer for a primitive column.
pub struct ColumnWriterImpl<'a, T: DataType> {
// Column writer properties
Expand Down Expand Up @@ -198,6 +208,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
_phantom: PhantomData<T>,
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
}

impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
Expand Down Expand Up @@ -261,6 +274,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
num_column_nulls: 0,
column_distinct_count: None,
_phantom: PhantomData,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
}
}

Expand Down Expand Up @@ -416,7 +431,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

/// Finalises writes and closes the column writer.
/// Returns total bytes written, total rows written and column chunk metadata.
pub fn close(mut self) -> Result<(u64, u64, ColumnChunkMetaData)> {
pub fn close(mut self) -> Result<ColumnCloseResult> {
if self.dict_encoder.is_some() {
self.write_dictionary_page()?;
}
Expand All @@ -425,7 +440,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.dict_encoder = None;
self.page_writer.close()?;

Ok((self.total_bytes_written, self.total_rows_written, metadata))
let (column_index, offset_index) = if self.column_index_builder.valid() {
// build the column and offset index
let column_index = self.column_index_builder.build_to_thrift();
let offset_index = self.offset_index_builder.build_to_thrift();
(Some(column_index), Some(offset_index))
} else {
(None, None)
};

Ok((
self.total_bytes_written,
self.total_rows_written,
metadata,
column_index,
offset_index,
))
}

/// Writes mini batch of values, definition and repetition levels.
Expand Down Expand Up @@ -593,6 +623,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
Ok(())
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
// update the column index
let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
// a page contains only null values,
// and writers have to set the corresponding entries in min_values and max_values to byte[0]
if null_page && self.column_index_builder.valid() {
self.column_index_builder.append(
null_page,
&[0; 1],
&[0; 1],
self.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
// from page statistics
// If can't get the page statistics, ignore this column/offset index for this column chunk
match &page_statistics {
None => {
self.column_index_builder.to_invalid();
}
Some(stat) => {
self.column_index_builder.append(
null_page,
stat.min_bytes(),
stat.max_bytes(),
self.num_page_nulls as i64,
);
}
}
}

// update the offset index
self.offset_index_builder
.append_row_count(self.num_buffered_rows as i64);
}

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
Expand Down Expand Up @@ -622,6 +688,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
None
};

// update column and offset index
self.update_column_offset_index(&page_statistics);

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
let mut buffer = vec![];
Expand Down Expand Up @@ -700,8 +769,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
buf: ByteBufferPtr::new(buffer),
num_values: self.num_buffered_values,
encoding,
num_nulls: self.num_buffered_values
- self.num_buffered_encoded_values,
num_nulls: self.num_page_nulls as u32,
num_rows: self.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
Expand Down Expand Up @@ -830,6 +898,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
// update offset index
// compressed_size = header_size + compressed_data_size
self.offset_index_builder.append_offset_and_size(
page_spec.offset as i64,
page_spec.compressed_size as i32,
);
self.update_metrics_for_page(page_spec);
Ok(())
}
Expand Down Expand Up @@ -865,6 +939,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

let page_spec = self.page_writer.write_page(compressed_page)?;
self.update_metrics_for_page(page_spec);
// For the directory page, don't need to update column/offset index.
Ok(())
}

Expand Down Expand Up @@ -1133,6 +1208,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {

#[cfg(test)]
mod tests {
use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;

Expand Down Expand Up @@ -1256,7 +1332,7 @@ mod tests {
.write_batch(&[true, false, true, false], None, None)
.unwrap();

let (bytes_written, rows_written, metadata) = writer.close().unwrap();
let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
// PlainEncoder uses bit writer to write boolean values, which all fit into 1
// byte.
assert_eq!(bytes_written, 1);
Expand Down Expand Up @@ -1529,7 +1605,7 @@ mod tests {
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

let (bytes_written, rows_written, metadata) = writer.close().unwrap();
let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
assert_eq!(bytes_written, 20);
assert_eq!(rows_written, 4);
assert_eq!(
Expand Down Expand Up @@ -1586,7 +1662,7 @@ mod tests {
None,
)
.unwrap();
let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::ByteArray(stats) = stats {
Expand Down Expand Up @@ -1620,7 +1696,7 @@ mod tests {
Int32Type,
>(page_writer, 0, 0, props);
writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
Expand Down Expand Up @@ -1651,7 +1727,7 @@ mod tests {
)
.unwrap();

let (bytes_written, rows_written, metadata) = writer.close().unwrap();
let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
assert_eq!(bytes_written, 20);
assert_eq!(rows_written, 4);
assert_eq!(
Expand Down Expand Up @@ -1835,7 +1911,7 @@ mod tests {
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(data, None, None).unwrap();
let (bytes_written, _, _) = writer.close().unwrap();
let (bytes_written, _, _, _, _) = writer.close().unwrap();

// Read pages and check the sequence
let source = FileSource::new(&file, 0, bytes_written as usize);
Expand Down Expand Up @@ -2068,6 +2144,75 @@ mod tests {
),);
}

#[test]
fn test_column_offset_index_metadata() {
// write data
// and check the offset index and column index
let page_writer = get_test_page_writer();
let props = Arc::new(WriterProperties::builder().build());
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
// first page
writer.flush_data_pages().unwrap();
// second page
writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

let (_, rows_written, metadata, column_index, offset_index) =
writer.close().unwrap();
let column_index = match column_index {
None => {
panic!("Can't fine the column index");
}
Some(column_index) => column_index,
};
let offset_index = match offset_index {
None => {
panic!("Can't find the offset index");
}
Some(offset_index) => offset_index,
};

assert_eq!(8, rows_written);

// column index
assert_eq!(2, column_index.null_pages.len());
assert_eq!(2, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
for idx in 0..2 {
assert!(!column_index.null_pages[idx]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
}

if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::Int32(stats) = stats {
// first page is [1,2,3,4]
// second page is [-5,2,4,8]
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
assert_eq!(
stats.max_bytes(),
column_index.max_values.get(1).unwrap().as_slice()
);
} else {
panic!("expecting Statistics::Int32");
}
} else {
panic!("metadata missing statistics");
}

// page location
assert_eq!(
0,
offset_index.page_locations.get(0).unwrap().first_row_index
);
assert_eq!(
4,
offset_index.page_locations.get(1).unwrap().first_row_index
);
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
Expand Down Expand Up @@ -2149,7 +2294,8 @@ mod tests {

let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
assert_eq!(values_written, values.len());
let (bytes_written, rows_written, column_metadata) = writer.close().unwrap();
let (bytes_written, rows_written, column_metadata, _, _) =
writer.close().unwrap();

let source = FileSource::new(&file, 0, bytes_written as usize);
let page_reader = Box::new(
Expand Down Expand Up @@ -2215,7 +2361,7 @@ mod tests {
let props = Arc::new(props);
let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();
let (_, _, metadata) = writer.close().unwrap();
let (_, _, metadata, _, _) = writer.close().unwrap();
metadata
}

Expand Down Expand Up @@ -2327,7 +2473,7 @@ mod tests {
let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();

let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
stats.clone()
} else {
Expand Down