From 0268bba4c01c2b83986c023258ad4405c29cabff Mon Sep 17 00:00:00 2001
From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com>
Date: Mon, 10 Oct 2022 04:25:30 -0400
Subject: [PATCH] Fix page size on dictionary fallback (#2854)

* Fix page size on dictionary fallback

* Make test deterministic

* Comments and improve test
---
 parquet/src/arrow/arrow_writer/byte_array.rs |  5 +-
 parquet/src/arrow/arrow_writer/mod.rs        | 66 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index a25bd8d5c50..9ea3767a28e 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -551,7 +551,10 @@ where
 
     match &mut encoder.dict_encoder {
         Some(dict_encoder) => dict_encoder.encode(values, indices),
-        None => encoder.fallback.encode(values, indices),
+        None => {
+            encoder.num_values += indices.len();
+            encoder.fallback.encode(values, indices)
+        }
     }
 }
 
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 2c3d498bcca..b5c0b50127d 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -624,6 +624,7 @@ mod tests {
 
     use crate::basic::Encoding;
     use crate::file::metadata::ParquetMetaData;
+    use crate::file::page_index::index_reader::read_pages_locations;
     use crate::file::properties::WriterVersion;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
@@ -1108,6 +1109,71 @@ mod tests {
         roundtrip(batch, Some(SMALL_SIZE / 2));
     }
 
+    #[test]
+    fn arrow_writer_page_size() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
+
+        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
+
+        // Generate an array of 10 unique 10-character strings
+        for i in 0..10 {
+            let value = i
+                .to_string()
+                .repeat(10)
+                .chars()
+                .take(10)
+                .collect::<String>();
+
+            builder.append_value(value);
+        }
+
+        let array = Arc::new(builder.finish());
+
+        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        let file = tempfile::tempfile().unwrap();
+
+        // Set everything very low so we fall back to PLAIN encoding after the first row
+        let props = WriterProperties::builder()
+            .set_data_pagesize_limit(1)
+            .set_dictionary_pagesize_limit(1)
+            .set_write_batch_size(1)
+            .build();
+
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
+                .expect("Unable to write file");
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
+
+        let column = reader.metadata().row_group(0).columns();
+
+        assert_eq!(column.len(), 1);
+
+        // We should write one row before falling back to PLAIN encoding, so there should
+        // still be a dictionary page.
+        assert!(
+            column[0].dictionary_page_offset().is_some(),
+            "Expected a dictionary page"
+        );
+
+        let page_locations = read_pages_locations(&file, column).unwrap();
+
+        let offset_index = page_locations[0].clone();
+
+        // We should fall back to PLAIN encoding after the first row and our max page size is 1 byte,
+        // so we expect one dictionary-encoded page and then a page per row thereafter.
+        assert_eq!(
+            offset_index.len(),
+            10,
+            "Expected 10 pages but got {:#?}",
+            offset_index
+        );
+    }
+
     const SMALL_SIZE: usize = 7;
 
     fn roundtrip(