diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index f93488f75f7..ebbb864d630 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -1529,8 +1529,7 @@ mod tests {
         assert_eq!(total_rows, expected_rows);
     }

-    #[test]
-    fn test_row_group_exact_multiple() {
+    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
         let schema = Arc::new(Schema::new(vec![Field::new(
             "list",
             ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Int32, true))),
@@ -1544,14 +1543,14 @@ mod tests {
             schema.clone(),
             Some(
                 WriterProperties::builder()
-                    .set_max_row_group_size(8)
+                    .set_max_row_group_size(row_group_size)
                     .build(),
             ),
         )
         .unwrap();
         for _ in 0..2 {
-            let mut list_builder = ListBuilder::new(Int32Builder::new(10));
-            for _ in 0..10 {
+            let mut list_builder = ListBuilder::new(Int32Builder::new(batch_size));
+            for _ in 0..(batch_size) {
                 list_builder.append(true).unwrap();
             }
             let batch = RecordBatch::try_new(
@@ -1564,9 +1563,27 @@ mod tests {
         writer.close().unwrap();

         let mut file_reader = ParquetFileArrowReader::try_new(Bytes::from(buf)).unwrap();
-        let mut record_reader = file_reader.get_record_reader(8).unwrap();
-        assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
-        assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
-        assert_eq!(4, record_reader.next().unwrap().unwrap().num_rows());
+        let mut record_reader = file_reader.get_record_reader(batch_size).unwrap();
+        assert_eq!(
+            batch_size,
+            record_reader.next().unwrap().unwrap().num_rows()
+        );
+        assert_eq!(
+            batch_size,
+            record_reader.next().unwrap().unwrap().num_rows()
+        );
+    }
+
+    #[test]
+    fn test_row_group_exact_multiple() {
+        use crate::arrow::record_reader::MIN_BATCH_SIZE;
+        test_row_group_batch(8, 8);
+        test_row_group_batch(10, 8);
+        test_row_group_batch(8, 10);
+        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE);
+        test_row_group_batch(MIN_BATCH_SIZE + 1, MIN_BATCH_SIZE);
+        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE + 1);
+        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
+        test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
     }
 }
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 30324fbe3e3..d2720aedeb8 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -38,7 +38,8 @@ use crate::schema::types::ColumnDescPtr;
 pub(crate) mod buffer;
 mod definition_levels;

-const MIN_BATCH_SIZE: usize = 1024;
+/// The minimum number of levels read when reading a repeated field
+pub(crate) const MIN_BATCH_SIZE: usize = 1024;

 /// A `RecordReader` is a stateful column reader that delimits semantic records.
 pub type RecordReader =