Skip to content

Commit

Permalink
Fix page size on dictionary fallback (#2854)
Browse files Browse the repository at this point in the history
* Fix page size on dictionary fallback

* Make test deterministic

* Comments and improve test
  • Loading branch information
thinkharderdev committed Oct 10, 2022
1 parent c3aac93 commit 0268bba
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
5 changes: 4 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
Expand Up @@ -551,7 +551,10 @@ where

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode(values, indices),
None => encoder.fallback.encode(values, indices),
None => {
encoder.num_values += indices.len();
encoder.fallback.encode(values, indices)
}
}
}

Expand Down
66 changes: 66 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Expand Up @@ -624,6 +624,7 @@ mod tests {

use crate::basic::Encoding;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index_reader::read_pages_locations;
use crate::file::properties::WriterVersion;
use crate::file::{
reader::{FileReader, SerializedFileReader},
Expand Down Expand Up @@ -1108,6 +1109,71 @@ mod tests {
roundtrip(batch, Some(SMALL_SIZE / 2));
}

#[test]
fn arrow_writer_page_size() {
    // Regression test: when the dictionary encoder falls back to PLAIN
    // encoding, the writer must still honor the configured data page size
    // limit (tracked via `encoder.num_values`).
    let schema =
        Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

    // Generate an array of 10 unique 10-character strings
    for i in 0..10 {
        let value = i
            .to_string()
            .repeat(10)
            .chars()
            .take(10)
            .collect::<String>();

        builder.append_value(value);
    }

    let array = Arc::new(builder.finish());

    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    let file = tempfile::tempfile().unwrap();

    // Set everything very low so we fallback to PLAIN encoding after the first row
    let props = WriterProperties::builder()
        .set_data_pagesize_limit(1)
        .set_dictionary_pagesize_limit(1)
        .set_write_batch_size(1)
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
            .expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();

    let column = reader.metadata().row_group(0).columns();

    assert_eq!(column.len(), 1);

    // We should write one row before falling back to PLAIN encoding so there should still be a
    // dictionary page.
    assert!(
        column[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    let page_locations = read_pages_locations(&file, column).unwrap();

    let offset_index = page_locations[0].clone();

    // We should fallback to PLAIN encoding after the first row and our max page size is 1 byte
    // so we expect one dictionary encoded page and then a page per row thereafter.
    assert_eq!(
        offset_index.len(),
        10,
        "Expected 10 pages but got {:#?}",
        offset_index
    );
}

const SMALL_SIZE: usize = 7;

fn roundtrip(
Expand Down

0 comments on commit 0268bba

Please sign in to comment.