From a85568bb395edb13dca02a80ea382643125cbb5f Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sun, 9 Oct 2022 09:42:07 -0400 Subject: [PATCH 1/3] Fix page size on dictionary fallback --- parquet/src/arrow/arrow_writer/byte_array.rs | 5 +- parquet/src/arrow/arrow_writer/mod.rs | 51 ++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index a25bd8d5c50..9ea3767a28e 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -551,7 +551,10 @@ where match &mut encoder.dict_encoder { Some(dict_encoder) => dict_encoder.encode(values, indices), - None => encoder.fallback.encode(values, indices), + None => { + encoder.num_values += indices.len(); + encoder.fallback.encode(values, indices) + } } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 2c3d498bcca..a872c5f85a4 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -609,6 +609,7 @@ mod tests { use super::*; use bytes::Bytes; + use rand::{thread_rng, Rng}; use std::fs::File; use std::sync::Arc; @@ -624,6 +625,7 @@ mod tests { use crate::basic::Encoding; use crate::file::metadata::ParquetMetaData; + use crate::file::page_index::index_reader::read_pages_locations; use crate::file::properties::WriterVersion; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -1108,6 +1110,55 @@ mod tests { roundtrip(batch, Some(SMALL_SIZE / 2)); } + #[test] + fn arrow_writer_page_size() { + let mut rng = thread_rng(); + let schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); + + let mut builder = StringBuilder::with_capacity(1_000, 2 * 1_000); + + for _ in 0..10_000 { + let value = (0..200) + .map(|_| rng.gen_range(b'a'..=b'z') as char) + .collect::(); + + builder.append_value(value); + } + + let array = Arc::new(builder.finish()); + + let batch = RecordBatch::try_new(schema, vec![array]).unwrap(); + + let file = tempfile::tempfile().unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(usize::MAX) + .set_data_pagesize_limit(256) + .build(); + + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)) + .expect("Unable to write file"); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap(); + + let column = reader.metadata().row_group(0).columns(); + + let page_locations = read_pages_locations(&file, column).unwrap(); + + let offset_index = page_locations[0].clone(); + + assert_eq!( + offset_index.len(), + 5, + "Expected more than two pages but got {:#?}", + offset_index + ); + } + const SMALL_SIZE: usize = 7; fn roundtrip( From b627724285b1b88a3e925d52f62552c111b9198a Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sun, 9 Oct 2022 11:32:42 -0400 Subject: [PATCH 2/3] Make test deterministic --- parquet/src/arrow/arrow_writer/mod.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index a872c5f85a4..edc07ec9ab0 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -609,7 +609,6 @@ mod tests { use super::*; use bytes::Bytes; - use rand::{thread_rng, Rng}; use std::fs::File; use std::sync::Arc; @@ -1112,16 +1111,13 @@ mod tests { #[test] fn arrow_writer_page_size() { - let mut rng = thread_rng(); let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); - let mut builder = StringBuilder::with_capacity(1_000, 2 * 1_000); + let mut builder = StringBuilder::with_capacity(10_000, 2 * 10_000); - for _ in 0..10_000 { - let value = (0..200) - .map(|_| rng.gen_range(b'a'..=b'z') as char) - .collect::(); + for i in 0..10_000 { + let value = i.to_string().repeat(100); builder.append_value(value); } @@ -1153,8 +1149,8 @@ mod tests { assert_eq!( offset_index.len(), - 5, - "Expected more than two pages but got {:#?}", + 8, + "Expected 9 pages but got {:#?}", offset_index ); } From 606e4f8bc824217090fafa24f27a8c93c2918cc7 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sun, 9 Oct 2022 11:55:22 -0400 Subject: [PATCH 3/3] Comments and improve test --- parquet/src/arrow/arrow_writer/mod.rs | 31 +++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index edc07ec9ab0..b5c0b50127d 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1114,10 +1114,16 @@ mod tests { let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); - let mut builder = StringBuilder::with_capacity(10_000, 2 * 10_000); + let mut builder = StringBuilder::with_capacity(100, 329 * 10_000); - for i in 0..10_000 { - let value = i.to_string().repeat(100); + // Generate an array of 10 unique 10 character string + for i in 0..10 { + let value = i + .to_string() + .repeat(10) + .chars() + .take(10) + .collect::(); builder.append_value(value); } @@ -1128,9 +1134,11 @@ mod tests { let file = tempfile::tempfile().unwrap(); + // Set everything very low so we fallback to PLAIN encoding after the first row let props = WriterProperties::builder() - .set_max_row_group_size(usize::MAX) - .set_data_pagesize_limit(256) + .set_data_pagesize_limit(1) + .set_dictionary_pagesize_limit(1) + .set_write_batch_size(1) .build(); let mut writer = @@ -1143,13 +1151,24 @@ mod tests { let column = reader.metadata().row_group(0).columns(); + assert_eq!(column.len(), 1); + + // We should write one row before falling back to PLAIN encoding so there should still be a + // dictionary page. + assert!( + column[0].dictionary_page_offset().is_some(), + "Expected a dictionary page" + ); + let page_locations = read_pages_locations(&file, column).unwrap(); let offset_index = page_locations[0].clone(); + // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes + // so we expect one dictionary encoded page and then a page per row thereafter. assert_eq!( offset_index.len(), - 8, + 10, "Expected 9 pages but got {:#?}", offset_index );