Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix record delimiting on row group boundaries (#2025) #2028

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {

// The output offsets for the computed ListArray
let mut list_offsets: Vec<OffsetSize> =
Vec::with_capacity(next_batch_array.len());
Vec::with_capacity(next_batch_array.len() + 1);

// The validity mask of the computed ListArray if nullable
let mut validity = self
Expand Down
53 changes: 53 additions & 0 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1528,4 +1528,57 @@ mod tests {

assert_eq!(total_rows, expected_rows);
}

#[test]
fn test_row_group_exact_multiple() {
let schema = Arc::new(Schema::new(vec![
Field::new("int", ArrowDataType::Int32, false),
Field::new(
"list",
ArrowDataType::List(Box::new(Field::new(
"item",
ArrowDataType::Int32,
true,
))),
true,
),
]));

let mut buf = Vec::with_capacity(1024);

let mut writer = ArrowWriter::try_new(
&mut buf,
schema.clone(),
Some(
WriterProperties::builder()
.set_max_row_group_size(8)
.build(),
),
)
.unwrap();
for _ in 0..2 {
let mut int_builder = Int32Builder::new(10);
let mut list_builder = ListBuilder::new(Int32Builder::new(10));
for i in 0..10 {
int_builder.append_value(i).unwrap();
list_builder.append(true).unwrap();
}
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(int_builder.finish()),
Arc::new(list_builder.finish()),
],
)
.unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();

let mut file_reader = ParquetFileArrowReader::try_new(Bytes::from(buf)).unwrap();
let mut record_reader = file_reader.get_record_reader(8).unwrap();
assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
assert_eq!(4, record_reader.next().unwrap().unwrap().num_rows());
}
}
88 changes: 66 additions & 22 deletions parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,32 +139,19 @@ where

let mut records_read = 0;

// Used to mark whether we have reached the end of current
// column chunk
let mut end_of_column = false;

loop {
// Try to find some records from buffers that has been read into memory
// but not counted as seen records.
let end_of_column = !self.column_reader.as_mut().unwrap().has_next()?;

let (record_count, value_count) =
self.count_records(num_records - records_read);
self.count_records(num_records - records_read, end_of_column);

self.num_records += record_count;
self.num_values += value_count;
records_read += record_count;

if records_read == num_records {
break;
}

if end_of_column {
// Since page reader contains complete records, if we reached end of a
// page reader, we should reach the end of a record
if self.rep_levels.is_some() {
self.num_records += 1;
self.num_values = self.values_written;
records_read += 1;
}
if records_read == num_records || end_of_column {
break;
}

Expand Down Expand Up @@ -194,10 +181,7 @@ where
};

// Try to more value from parquet pages
let values_read = self.read_one_batch(batch_size)?;
if values_read < batch_size {
end_of_column = true;
}
self.read_one_batch(batch_size)?;
}

Ok(records_read)
Expand Down Expand Up @@ -334,8 +318,15 @@ where
/// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
/// and returns the number of "complete" records along with the corresponding number of values
///
/// If `end_of_column` is true it indicates that there are no further values for this
/// column chunk beyond what is currently in the buffers
///
/// A "complete" record is one where the buffer contains a subsequent repetition level of 0
fn count_records(&self, records_to_read: usize) -> (usize, usize) {
fn count_records(
&self,
records_to_read: usize,
end_of_column: bool,
) -> (usize, usize) {
match self.rep_levels.as_ref() {
Some(buf) => {
let buf = buf.as_slice();
Expand All @@ -359,6 +350,15 @@ where
}
}

// If reached end of column chunk => end of a record
if records_read != records_to_read
&& end_of_column
&& self.values_written != 0
{
records_read += 1;
end_of_last_record = self.values_written;
}

(records_read, end_of_last_record - self.num_values)
}
None => {
Expand Down Expand Up @@ -731,4 +731,48 @@ mod tests {
assert_eq!(5000, record_reader.num_values());
}
}

#[test]
fn test_row_group_boundary() {
// Construct column schema
let message_type = "
message test_schema {
REPEATED Group test_struct {
REPEATED INT32 leaf;
}
}
";

let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();

let values = [1, 2, 3];
let def_levels = [1i16, 0i16, 1i16, 2i16, 2i16, 1i16, 2i16];
let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 0i16, 1i16];
let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
pb.add_rep_levels(2, &rep_levels);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
let page_reader = Box::new(InMemoryPageReader::new(vec![page.clone()]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(record_reader.read_records(4).unwrap(), 4);
assert_eq!(record_reader.num_records(), 4);
assert_eq!(record_reader.num_values(), 7);

let mut record_reader = RecordReader::<Int32Type>::new(desc);
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(record_reader.read_records(3).unwrap(), 3);
assert_eq!(record_reader.num_records(), 3);
assert_eq!(record_reader.num_values(), 5);

assert_eq!(record_reader.read_records(3).unwrap(), 1);
assert_eq!(record_reader.num_records(), 4);
assert_eq!(record_reader.num_values(), 7);
}
}
2 changes: 1 addition & 1 deletion parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ where
}

#[inline]
fn has_next(&mut self) -> Result<bool> {
pub(crate) fn has_next(&mut self) -> Result<bool> {
if self.num_buffered_values == 0
|| self.num_buffered_values == self.num_decoded_values
{
Expand Down