Fix page size on dictionary fallback #2854
@@ -551,7 +551,10 @@ where
     match &mut encoder.dict_encoder {
         Some(dict_encoder) => dict_encoder.encode(values, indices),
-        None => encoder.fallback.encode(values, indices),
+        None => {
+            encoder.num_values += indices.len();
+            encoder.fallback.encode(values, indices)
+        }
     }
 }

Review comment (on the added `encoder.num_values += indices.len();` line): Should we be doing this regardless of whether we've fallen back? I think currently this will fail to flush a dictionary-encoded data page even if it has reached sufficient size.

Reply: Maybe; when we do it that way it causes a panic, which may also be a bug.

Review comment: I think we need to reset num_values to 0 when we flush a data page.

Reply: I think it already does that, right?
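The exchange above is about the bookkeeping that decides when a data page is flushed. The following is a minimal sketch of that idea, not the arrow-rs implementation; the type and method names (PageState, record, should_flush, flush) are invented purely for illustration. The point is that a page is only flushed when the encoder reports both a non-zero value count and an estimated size over the limit, so failing to bump num_values on the fallback path, or failing to reset it after a flush, breaks the check.

// Illustrative sketch only; the struct and method names are hypothetical.
struct PageState {
    num_values: usize,
    estimated_data_page_size: usize,
}

impl PageState {
    // Record `n` encoded values occupying roughly `bytes` bytes.
    fn record(&mut self, n: usize, bytes: usize) {
        self.num_values += n;
        self.estimated_data_page_size += bytes;
    }

    // A page is flushed only when it actually holds values and has grown past
    // the configured limit; if `num_values` never increases on the fallback
    // path, this check never fires.
    fn should_flush(&self, data_pagesize_limit: usize) -> bool {
        self.num_values > 0 && self.estimated_data_page_size >= data_pagesize_limit
    }

    // Flushing must reset the per-page counters so the next check starts fresh.
    fn flush(&mut self) {
        // ...write the page...
        self.num_values = 0;
        self.estimated_data_page_size = 0;
    }
}

fn main() {
    let mut state = PageState { num_values: 0, estimated_data_page_size: 0 };
    state.record(10, 300);
    assert!(state.should_flush(256));
    state.flush();
    assert!(!state.should_flush(256));
}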
@@ -609,6 +609,7 @@ mod tests {
    use super::*;

    use bytes::Bytes;
    use rand::{thread_rng, Rng};
    use std::fs::File;
    use std::sync::Arc;

@@ -624,6 +625,7 @@ mod tests {
    use crate::basic::Encoding;
    use crate::file::metadata::ParquetMetaData;
    use crate::file::page_index::index_reader::read_pages_locations;
    use crate::file::properties::WriterVersion;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
@@ -1108,6 +1110,55 @@ mod tests {
        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_page_size() {
        let mut rng = thread_rng();

Review comment (on the use of thread_rng): I think we should either seed this or loosen the assert below. Otherwise I worry that, depending on what values are generated, we may end up with more or fewer pages (the dictionary page will only spill once it has seen sufficiently many different values, which could technically occur at any point).

(A seeding sketch follows the end of this diff hunk.)
        let schema =
            Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(1_000, 2 * 1_000);

        for _ in 0..10_000 {
            let value = (0..200)
                .map(|_| rng.gen_range(b'a'..=b'z') as char)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_max_row_group_size(usize::MAX)
            .set_data_pagesize_limit(256)

Review comment (on set_data_pagesize_limit(256)): You could potentially set the dictionary page size smaller to verify that as well, but up to you. (A sketch of this suggestion appears at the end of this review.)

Review comment: So I think there are still some issues here. It is still ignoring the size limit. It is at least respecting the

Reply: That is expected and, I believe, consistent with other parquet writers; the limit is best-effort.
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();

        let column = reader.metadata().row_group(0).columns();

        let page_locations = read_pages_locations(&file, column).unwrap();

        let offset_index = page_locations[0].clone();

        assert_eq!(
            offset_index.len(),
            5,
            "Expected more than two pages but got {:#?}",
            offset_index
        );
    }

    const SMALL_SIZE: usize = 7;

    fn roundtrip(
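As a companion to the review comment on thread_rng() above, here is a minimal sketch of seeding the generator so the generated strings, and therefore the point at which the dictionary spills and pages are flushed, are reproducible. This is not part of the PR; it assumes the rand crate's StdRng, and the seed value 42 is arbitrary and purely illustrative.

use rand::{rngs::StdRng, Rng, SeedableRng};

fn main() {
    // A fixed seed makes every test run generate the same values, so the
    // resulting page count is deterministic.
    let mut rng = StdRng::seed_from_u64(42);

    let value: String = (0..200)
        .map(|_| rng.gen_range(b'a'..=b'z') as char)
        .collect();

    assert_eq!(value.len(), 200);
}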
Review comment: I'm guessing the problem was that, whilst the estimated_data_page_size would increase, the lack of any values would cause it to erroneously not try to flush the page? In particular https://github.com/apache/arrow-rs/blob/master/parquet/src/column/writer/mod.rs#L567

Reply: Yep, exactly.
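The earlier suggestion to also constrain the dictionary page size could look roughly like the following. This is a sketch rather than part of the PR; it assumes the parquet crate's WriterProperties builder exposes set_dictionary_pagesize_limit alongside the builder methods already used in the test, and the 256-byte values are illustrative only.

use parquet::file::properties::WriterProperties;

fn main() {
    // Sketch of the reviewer's suggestion: in addition to the data page size
    // limit used in the test, set a small dictionary page size limit so the
    // dictionary encoder spills to the fallback encoder early. Both limits
    // are best-effort, as the review notes.
    let _props = WriterProperties::builder()
        .set_max_row_group_size(usize::MAX)
        .set_data_pagesize_limit(256)
        .set_dictionary_pagesize_limit(256) // illustrative value
        .build();
}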