Fix page size on dictionary fallback #2854

Merged (3 commits, Oct 10, 2022)
5 changes: 4 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
@@ -551,7 +551,10 @@ where

     match &mut encoder.dict_encoder {
         Some(dict_encoder) => dict_encoder.encode(values, indices),
-        None => encoder.fallback.encode(values, indices),
+        None => {
+            encoder.num_values += indices.len();
@tustvold (Contributor), Oct 9, 2022:

I'm guessing the problem was that whilst the estimated_data_page_size would increase, the lack of any values would cause it to erroneously not try to flush the page?

In particular https://github.com/apache/arrow-rs/blob/master/parquet/src/column/writer/mod.rs#L567

PR author (Contributor) replied:

yep, exactly
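
To make the failure mode concrete, here is a minimal, self-contained sketch of the kind of flush check being discussed. The struct and field names are illustrative, not the parquet crate's actual types; the real check is the one linked above in column/writer/mod.rs and may differ in detail.

```rust
// Hypothetical illustration of the flush condition discussed above. A data
// page is only flushed once it is over the size limit *and* has buffered
// values; before this fix the dictionary-fallback path never incremented
// `num_values`, so the size limit alone was never enough to trigger a flush.
struct PageState {
    num_values: usize,
    estimated_data_page_size: usize,
    data_page_size_limit: usize,
}

impl PageState {
    fn should_flush_data_page(&self) -> bool {
        self.num_values > 0 && self.estimated_data_page_size >= self.data_page_size_limit
    }
}

fn main() {
    // Before the fix: plenty of buffered bytes, but a zero value count.
    let before_fix = PageState {
        num_values: 0,
        estimated_data_page_size: 4096,
        data_page_size_limit: 1,
    };
    assert!(!before_fix.should_flush_data_page()); // page never flushes

    // After the fix: the fallback path bumps `num_values`, so the check fires.
    let after_fix = PageState {
        num_values: 3,
        estimated_data_page_size: 4096,
        data_page_size_limit: 1,
    };
    assert!(after_fix.should_flush_data_page());
}
```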

@tustvold (Contributor), Oct 9, 2022:

Should we be doing this regardless of whether we've fallen back? I think currently this will fail to flush a dictionary-encoded data page even if it has reached sufficient size?

PR author (Contributor) replied:

Maybe. When we do it that way it causes a panic, which may also be a bug:

General("Must flush data pages before flushing dictionary")

Reviewer (Contributor):

I think we need to reset num_values to 0 when we flush a data page

PR author (Contributor) replied:

I think it already does that, right?

num_values: std::mem::take(&mut self.num_values),
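
As a side note, here is a small standalone sketch of the `std::mem::take` pattern in the quoted line (the `Encoder` struct below is illustrative, not the writer's actual type): `take` returns the current counter and leaves the default value behind, which is exactly the reset being asked about.

```rust
// Illustrative only: `std::mem::take` yields the current value and resets
// the field to its Default (0 for usize), mirroring the quoted line.
struct Encoder {
    num_values: usize,
}

impl Encoder {
    fn flush_data_page(&mut self) -> usize {
        // Same pattern as the quoted `std::mem::take(&mut self.num_values)`:
        // the returned count goes into the flushed page, the field becomes 0.
        std::mem::take(&mut self.num_values)
    }
}

fn main() {
    let mut enc = Encoder { num_values: 7 };
    assert_eq!(enc.flush_data_page(), 7);
    assert_eq!(enc.num_values, 0);
}
```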

+            encoder.fallback.encode(values, indices)
+        }
     }
 }

66 changes: 66 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -624,6 +624,7 @@ mod tests {

use crate::basic::Encoding;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index_reader::read_pages_locations;
use crate::file::properties::WriterVersion;
use crate::file::{
reader::{FileReader, SerializedFileReader},
@@ -1108,6 +1109,71 @@ mod tests {
roundtrip(batch, Some(SMALL_SIZE / 2));
}

    #[test]
    fn arrow_writer_page_size() {
        let schema =
            Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

        // Generate an array of 10 unique 10-character strings
        for i in 0..10 {
            let value = i
                .to_string()
                .repeat(10)
                .chars()
                .take(10)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

        // Set everything very low so we fall back to PLAIN encoding after the first row
        let props = WriterProperties::builder()
            .set_data_pagesize_limit(1)
            .set_dictionary_pagesize_limit(1)
            .set_write_batch_size(1)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();

        let column = reader.metadata().row_group(0).columns();

        assert_eq!(column.len(), 1);

        // We should write one row before falling back to PLAIN encoding, so there should
        // still be a dictionary page.
        assert!(
            column[0].dictionary_page_offset().is_some(),
            "Expected a dictionary page"
        );

        let page_locations = read_pages_locations(&file, column).unwrap();

        let offset_index = page_locations[0].clone();

        // We should fall back to PLAIN encoding after the first row and our max page size is
        // 1 byte, so we expect one dictionary-encoded page and then a page per row thereafter.
        assert_eq!(
            offset_index.len(),
            10,
            "Expected 10 pages but got {:#?}",
            offset_index
        );
    }

const SMALL_SIZE: usize = 7;

fn roundtrip(