diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 9e0f83fa945..95620d940b7 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -113,10 +113,10 @@ impl ArrayReader for ByteArrayReader {
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
         read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
 
-        let buffer = self.record_reader.consume_record_data()?;
-        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        let buffer = self.record_reader.consume_record_data();
+        let null_buffer = self.record_reader.consume_bitmap_buffer();
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
         self.record_reader.reset();
         Ok(buffer.into_array(null_buffer, self.data_type.clone()))
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 0cd67206f00..77f7916ed58 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -173,12 +173,12 @@ where
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
         read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
 
-        let buffer = self.record_reader.consume_record_data()?;
-        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        let buffer = self.record_reader.consume_record_data();
+        let null_buffer = self.record_reader.consume_bitmap_buffer();
         let array = buffer.into_array(null_buffer, &self.data_type)?;
 
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
         self.record_reader.reset();
 
         Ok(array)
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index 53ac0852fab..4b592025d6a 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -86,11 +86,11 @@ where
         let array = arrow::array::NullArray::new(self.record_reader.num_values());
 
         // save definition and repetition buffers
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
 
         // Must consume bitmap buffer
-        self.record_reader.consume_bitmap_buffer()?;
+        self.record_reader.consume_bitmap_buffer();
 
         self.record_reader.reset();
         Ok(Arc::new(array))
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index 222b595c2e1..c25df89a681 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -148,7 +148,7 @@ where
         // Convert to arrays by using the Parquet physical type.
         // The physical types are then cast to Arrow types if necessary
-        let mut record_data = self.record_reader.consume_record_data()?;
+        let mut record_data = self.record_reader.consume_record_data();
 
         if T::get_physical_type() == PhysicalType::BOOLEAN {
             let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
@@ -162,7 +162,7 @@ where
         let array_data = ArrayDataBuilder::new(arrow_data_type)
             .len(self.record_reader.num_values())
             .add_buffer(record_data)
-            .null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
+            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
 
         let array_data = unsafe { array_data.build_unchecked() };
         let array = match T::get_physical_type() {
@@ -227,8 +227,8 @@ where
         };
 
         // save definition and repetition buffers
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
         self.record_reader.reset();
         Ok(array)
     }
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 89406cd616a..c5d1f66e5bf 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -86,7 +86,6 @@ impl ArrowReaderOptions {
     ///
     /// For example:[ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
     ///
-    /// Set `skip_arrow_metadata` to true, to skip decoding this
     pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
         Self {
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 023a538a274..af75dbb4951 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -218,32 +218,32 @@ where
     /// The implementation has side effects. It will create a new buffer to hold those
     /// definition level values that have already been read into memory but not counted
     /// as record values, e.g. those from `self.num_values` to `self.values_written`.
-    pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
-        Ok(match self.def_levels.as_mut() {
+    pub fn consume_def_levels(&mut self) -> Option<Buffer> {
+        match self.def_levels.as_mut() {
             Some(x) => x.split_levels(self.num_values),
             None => None,
-        })
+        }
     }
 
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
-        Ok(match self.rep_levels.as_mut() {
+    pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
+        match self.rep_levels.as_mut() {
             Some(x) => Some(x.split_off(self.num_values)),
             None => None,
-        })
+        }
     }
 
     /// Returns currently stored buffer data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_record_data(&mut self) -> Result<V::Output> {
-        Ok(self.records.split_off(self.num_values))
+    pub fn consume_record_data(&mut self) -> V::Output {
+        self.records.split_off(self.num_values)
     }
 
     /// Returns currently stored null bitmap data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
-        Ok(self.consume_bitmap()?.map(|b| b.into_buffer()))
+    pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
+        self.consume_bitmap().map(|b| b.into_buffer())
     }
 
     /// Reset state of record reader.
@@ -256,11 +256,10 @@ where
     }
 
     /// Returns bitmap data.
-    pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
-        Ok(self
-            .def_levels
+    pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
+        self.def_levels
             .as_mut()
-            .map(|levels| levels.split_bitmask(self.num_values)))
+            .map(|levels| levels.split_bitmask(self.num_values))
     }
 
     /// Try to read one batch of data.
@@ -296,7 +295,7 @@ where
         }
 
         let values_read = max(levels_read, values_read);
-        self.set_values_written(self.values_written + values_read)?;
+        self.set_values_written(self.values_written + values_read);
         Ok(values_read)
     }
 
@@ -339,8 +338,7 @@ where
         }
     }
 
-    #[allow(clippy::unnecessary_wraps)]
-    fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
+    fn set_values_written(&mut self, new_values_written: usize) {
        self.values_written = new_values_written;
         self.records.set_len(self.values_written);
 
@@ -351,8 +349,6 @@ where
         if let Some(ref mut buf) = self.def_levels {
             buf.set_len(self.values_written)
         };
-
-        Ok(())
     }
 }
 
@@ -365,42 +361,15 @@ mod tests {
     use arrow::buffer::Buffer;
 
     use crate::basic::Encoding;
-    use crate::column::page::Page;
-    use crate::column::page::PageReader;
     use crate::data_type::Int32Type;
-    use crate::errors::Result;
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::SchemaDescriptor;
-    use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
+    use crate::util::test_common::page_util::{
+        DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
+    };
 
     use super::RecordReader;
 
-    struct TestPageReader {
-        pages: Box<dyn Iterator<Item = Page> + Send>,
-    }
-
-    impl TestPageReader {
-        pub fn new(pages: Vec<Page>) -> Self {
-            Self {
-                pages: Box::new(pages.into_iter()),
-            }
-        }
-    }
-
-    impl PageReader for TestPageReader {
-        fn get_next_page(&mut self) -> Result<Option<Page>> {
-            Ok(self.pages.next())
-        }
-    }
-
-    impl Iterator for TestPageReader {
-        type Item = Result<Page>;
-
-        fn next(&mut self) -> Option<Self::Item> {
-            self.get_next_page().transpose()
-        }
-    }
-
     #[test]
     fn test_read_required_records() {
         // Construct column schema
@@ -436,7 +405,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let page_reader = Box::new(TestPageReader::new(vec![page]));
+        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
         record_reader.set_page_reader(page_reader).unwrap();
         assert_eq!(2, record_reader.read_records(2).unwrap());
         assert_eq!(2, record_reader.num_records());
@@ -459,7 +428,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let page_reader = Box::new(TestPageReader::new(vec![page]));
+        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
         record_reader.set_page_reader(page_reader).unwrap();
         assert_eq!(2, record_reader.read_records(10).unwrap());
         assert_eq!(7, record_reader.num_records());
@@ -469,12 +438,9 @@ mod tests {
         let mut bb = Int32BufferBuilder::new(7);
         bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
         let expected_buffer = bb.finish();
-        assert_eq!(
-            expected_buffer,
-            record_reader.consume_record_data().unwrap()
-        );
-        assert_eq!(None, record_reader.consume_def_levels().unwrap());
-        assert_eq!(None, record_reader.consume_bitmap().unwrap());
+        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(None, record_reader.consume_def_levels());
+        assert_eq!(None, record_reader.consume_bitmap());
     }
 
     #[test]
@@ -520,7 +486,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let page_reader = Box::new(TestPageReader::new(vec![page]));
+        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
         record_reader.set_page_reader(page_reader).unwrap();
         assert_eq!(2, record_reader.read_records(2).unwrap());
         assert_eq!(2, record_reader.num_records());
@@ -546,7 +512,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let page_reader = Box::new(TestPageReader::new(vec![page]));
+        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
         record_reader.set_page_reader(page_reader).unwrap();
         assert_eq!(2, record_reader.read_records(10).unwrap());
         assert_eq!(7, record_reader.num_records());
@@ -559,20 +525,17 @@ mod tests {
         let expected_def_levels = bb.finish();
         assert_eq!(
             Some(expected_def_levels),
-            record_reader.consume_def_levels().unwrap()
+            record_reader.consume_def_levels()
         );
 
         // Verify bitmap
         let expected_valid = &[false, true, false, true, true, false, true];
         let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
         let expected_bitmap = Bitmap::from(expected_buffer);
-        assert_eq!(
-            Some(expected_bitmap),
-            record_reader.consume_bitmap().unwrap()
-        );
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
 
         // Verify result record data
-        let actual = record_reader.consume_record_data().unwrap();
+        let actual = record_reader.consume_record_data();
         let actual_values = actual.typed_data::<i32>();
 
         let expected = &[0, 7, 0, 6, 3, 0, 8];
@@ -631,7 +594,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let page_reader = Box::new(TestPageReader::new(vec![page]));
+        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
         record_reader.set_page_reader(page_reader).unwrap();
 
         assert_eq!(1, record_reader.read_records(1).unwrap());
@@ -659,7 +622,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let page_reader = Box::new(TestPageReader::new(vec![page]));
+        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
         record_reader.set_page_reader(page_reader).unwrap();
 
         assert_eq!(1, record_reader.read_records(10).unwrap());
@@ -673,20 +636,17 @@ mod tests {
         let expected_def_levels = bb.finish();
         assert_eq!(
             Some(expected_def_levels),
-            record_reader.consume_def_levels().unwrap()
+            record_reader.consume_def_levels()
         );
 
         // Verify bitmap
         let expected_valid = &[true, false, false, true, true, true, true, true, true];
         let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
         let expected_bitmap = Bitmap::from(expected_buffer);
-        assert_eq!(
-            Some(expected_bitmap),
-            record_reader.consume_bitmap().unwrap()
-        );
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
 
         // Verify result record data
-        let actual = record_reader.consume_record_data().unwrap();
+        let actual = record_reader.consume_record_data();
         let actual_values = actual.typed_data::<i32>();
         let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
         assert_eq!(actual_values.len(), expected.len());
@@ -731,7 +691,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let page_reader = Box::new(TestPageReader::new(vec![page]));
+        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
         record_reader.set_page_reader(page_reader).unwrap();
 
         assert_eq!(1000, record_reader.read_records(1000).unwrap());
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index b5a52f6e23c..a97787ccfa5 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -17,7 +17,7 @@
 //! Contains column reader API.
 
-use std::cmp::{max, min};
+use std::cmp::min;
 
 use super::page::{Page, PageReader};
 use crate::basic::*;
@@ -163,26 +163,18 @@ where
         }
     }
 
-    /// Reads a batch of values of at most `batch_size`.
+    /// Reads a batch of values of at most `batch_size`, returning a tuple containing the
+    /// actual number of non-null values read, followed by the corresponding number of levels,
+    /// i.e, the total number of values including nulls, empty lists, etc...
     ///
-    /// This will try to read from the row group, and fills up at most `batch_size` values
-    /// for `def_levels`, `rep_levels` and `values`. It will stop either when the row
-    /// group is depleted or `batch_size` values has been read, or there is no space
-    /// in the input slices (values/definition levels/repetition levels).
+    /// If the max definition level is 0, `def_levels` will be ignored, otherwise it will be
+    /// populated with the number of levels read, with an error returned if it is `None`.
     ///
-    /// Note that in case the field being read is not required, `values` could contain
-    /// less values than `def_levels`. Also note that this will skip reading def / rep
-    /// levels if the field is required / not repeated, respectively.
+    /// If the max repetition level is 0, `rep_levels` will be ignored, otherwise it will be
+    /// populated with the number of levels read, with an error returned if it is `None`.
     ///
-    /// If `def_levels` or `rep_levels` is `None`, this will also skip reading the
-    /// respective levels. This is useful when the caller of this function knows in
-    /// advance that the field is required and non-repeated, therefore can avoid
-    /// allocating memory for the levels data. Note that if field has definition
-    /// levels, but caller provides None, there might be inconsistency between
-    /// levels/values (see comments below).
-    ///
-    /// Returns a tuple where the first element is the actual number of values read,
-    /// and the second element is the actual number of levels read.
+    /// `values` will be contiguously populated with the non-null values. Note that if the column
+    /// is not required, this may be less than either `batch_size` or the number of levels read
     #[inline]
     pub fn read_batch(
         &mut self,
@@ -205,84 +197,65 @@ where
 
         // Read exhaustively all pages until we read all batch_size values/levels
         // or there are no more values/levels to read.
-        while max(values_read, levels_read) < batch_size {
+        while levels_read < batch_size {
             if !self.has_next()? {
                 break;
             }
 
             // Batch size for the current iteration
-            let iter_batch_size = {
-                // Compute approximate value based on values decoded so far
-                let mut adjusted_size = min(
-                    batch_size,
-                    (self.num_buffered_values - self.num_decoded_values) as usize,
-                );
-
-                // Adjust batch size by taking into account how much data there
-                // to read. As batch_size is also smaller than value and level
-                // slices (if available), this ensures that available space is not
-                // exceeded.
-                adjusted_size = min(adjusted_size, batch_size - values_read);
-                adjusted_size = min(adjusted_size, batch_size - levels_read);
-
-                adjusted_size
-            };
+            let iter_batch_size = (batch_size - levels_read)
+                .min((self.num_buffered_values - self.num_decoded_values) as usize);
 
             // If the field is required and non-repeated, there are no definition levels
-            let (num_def_levels, null_count) = match def_levels.as_mut() {
-                Some(levels) if self.descr.max_def_level() > 0 => {
+            let null_count = match self.descr.max_def_level() > 0 {
+                true => {
+                    let levels = def_levels
+                        .as_mut()
+                        .ok_or_else(|| general_err!("must specify definition levels"))?;
+
                     let num_def_levels = self
                         .def_level_decoder
                         .as_mut()
                         .expect("def_level_decoder be set")
-                        .read(*levels, levels_read..levels_read + iter_batch_size)?;
+                        .read(levels, levels_read..levels_read + iter_batch_size)?;
+
+                    if num_def_levels != iter_batch_size {
+                        return Err(general_err!("insufficient definition levels read from column - expected {}, got {}", iter_batch_size, num_def_levels));
+                    }
 
-                    let null_count = levels.count_nulls(
+                    levels.count_nulls(
                         levels_read..levels_read + num_def_levels,
                         self.descr.max_def_level(),
-                    );
-                    (num_def_levels, null_count)
+                    )
                 }
-                _ => (0, 0),
+                false => 0,
             };
 
-            let num_rep_levels = match rep_levels.as_mut() {
-                Some(levels) if self.descr.max_rep_level() > 0 => self
+            if self.descr.max_rep_level() > 0 {
+                let levels = rep_levels
+                    .as_mut()
+                    .ok_or_else(|| general_err!("must specify repetition levels"))?;
+
+                let rep_levels = self
                     .rep_level_decoder
                     .as_mut()
                     .expect("rep_level_decoder be set")
-                    .read(levels, levels_read..levels_read + iter_batch_size)?,
-                _ => 0,
-            };
+                    .read(levels, levels_read..levels_read + iter_batch_size)?;
 
-            // At this point we have read values, definition and repetition levels.
-            // If both definition and repetition levels are defined, their counts
-            // should be equal. Values count is always less or equal to definition levels.
-            if num_def_levels != 0
-                && num_rep_levels != 0
-                && num_rep_levels != num_def_levels
-            {
-                return Err(general_err!(
-                    "inconsistent number of levels read - def: {}, rep: {}",
-                    num_def_levels,
-                    num_rep_levels
-                ));
+                if rep_levels != iter_batch_size {
+                    return Err(general_err!("insufficient repetition levels read from column - expected {}, got {}", iter_batch_size, rep_levels));
+                }
             }
 
-            // Note that if field is not required, but no definition levels are provided,
-            // we would read values of batch size and (if provided, of course) repetition
-            // levels of batch size - [!] they will not be synced, because only definition
-            // levels enforce number of non-null values to read.
-
             let values_to_read = iter_batch_size - null_count;
             let curr_values_read = self
                 .values_decoder
                 .read(values, values_read..values_read + values_to_read)?;
 
-            if num_def_levels != 0 && curr_values_read != num_def_levels - null_count {
+            if curr_values_read != values_to_read {
                 return Err(general_err!(
                     "insufficient values read from column - expected: {}, got: {}",
-                    num_def_levels - null_count,
+                    values_to_read,
                     curr_values_read
                 ));
             }
@@ -290,9 +263,8 @@ where
 
             // Update all "return" counters and internal state.
             // This is to account for when def or rep levels are not provided
-            let curr_levels_read = max(num_def_levels, num_rep_levels);
-            self.num_decoded_values += max(curr_levels_read, curr_values_read) as u32;
-            levels_read += curr_levels_read;
+            self.num_decoded_values += iter_batch_size as u32;
+            levels_read += iter_batch_size;
             values_read += curr_values_read;
         }
 
@@ -302,8 +274,7 @@ where
     /// Reads a new page and set up the decoders for levels, values or dictionary.
     /// Returns false if there's no page left.
     fn read_new_page(&mut self) -> Result<bool> {
-        #[allow(while_true)]
-        while true {
+        loop {
             match self.page_reader.get_next_page()? {
                 // No more page to read
                 None => return Ok(false),
@@ -433,8 +404,6 @@ where
                 }
             }
         }
-
-        Ok(true)
     }
 
     #[inline]
@@ -484,12 +453,12 @@ mod tests {
     use super::*;
 
     use rand::distributions::uniform::SampleUniform;
-    use std::{collections::VecDeque, sync::Arc, vec::IntoIter};
+    use std::{collections::VecDeque, sync::Arc};
 
     use crate::basic::Type as PhysicalType;
-    use crate::column::page::Page;
     use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
     use crate::util::test_common::make_pages;
+    use crate::util::test_common::page_util::InMemoryPageReader;
 
     const NUM_LEVELS: usize = 128;
     const NUM_PAGES: usize = 2;
@@ -1036,7 +1005,7 @@
         } else {
             0
         };
-        let max_rep_level = if def_levels.is_some() {
+        let max_rep_level = if rep_levels.is_some() {
             MAX_REP_LEVEL
         } else {
             0
@@ -1055,8 +1024,8 @@
             NUM_PAGES,
             NUM_LEVELS,
             batch_size,
-            std::i32::MIN,
-            std::i32::MAX,
+            i32::MIN,
+            i32::MAX,
             values,
             def_levels,
             rep_levels,
@@ -1235,7 +1204,8 @@
             use_v2,
         );
         let max_def_level = desc.max_def_level();
-        let page_reader = TestPageReader::new(Vec::from(pages));
+        let max_rep_level = desc.max_rep_level();
+        let page_reader = InMemoryPageReader::new(pages);
         let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
         let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
@@ -1276,7 +1246,8 @@
             "values content doesn't match"
         );
 
-        if let Some(ref levels) = def_levels {
+        if max_def_level > 0 {
+            let levels = def_levels.as_ref().unwrap();
             assert!(
                 levels.len() >= curr_levels_read,
                 "def_levels.len() >= levels_read"
@@ -1288,7 +1259,8 @@
             );
         }
 
-        if let Some(ref levels) = rep_levels {
+        if max_rep_level > 0 {
+            let levels = rep_levels.as_ref().unwrap();
             assert!(
                 levels.len() >= curr_levels_read,
                 "rep_levels.len() >= levels_read"
@@ -1300,44 +1272,10 @@
             );
         }
 
-        if def_levels.is_none() && rep_levels.is_none() {
-            assert!(
-                curr_levels_read == 0,
-                "expected to read 0 levels, found {}",
-                curr_levels_read
-            );
-        } else if def_levels.is_some() && max_def_level > 0 {
-            assert!(
-                curr_levels_read >= curr_values_read,
-                "expected levels read to be greater than values read"
-            );
-        }
-    }
-
-    struct TestPageReader {
-        pages: IntoIter<Page>,
-    }
-
-    impl TestPageReader {
-        pub fn new(pages: Vec<Page>) -> Self {
-            Self {
-                pages: pages.into_iter(),
-            }
-        }
-    }
-
-    impl PageReader for TestPageReader {
-        fn get_next_page(&mut self) -> Result<Option<Page>> {
-            Ok(self.pages.next())
-        }
-    }
-
-    impl Iterator for TestPageReader {
-        type Item = Result<Page>;
-
-        fn next(&mut self) -> Option<Self::Item> {
-            self.get_next_page().transpose()
+        assert!(
+            curr_levels_read >= curr_values_read,
+            "expected levels read to be greater than values read"
+        );
         }
     }
 }
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index ffa559f3fec..3719d280a90 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -100,11 +100,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
     }
 
     fn add_def_levels(&mut self, max_levels: i16, def_levels: &[i16]) {
-        assert!(
-            self.num_values == def_levels.len() as u32,
-            "Must call `add_rep_levels() first!`"
-        );
-
+        self.num_values = def_levels.len() as u32;
         self.def_levels_byte_len = self.add_levels(max_levels, def_levels);
     }
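
The net effect of the `RecordReader` changes is easiest to see at a call site: the `consume_*` accessors now hand their buffers back by value instead of wrapping them in `Result`, so only the page-reading path can still fail. A minimal sketch of the new calling convention, assuming an in-crate `RecordReader<Int32Type>` that has already been wired to a page reader as in the tests above (the `drain` helper itself is hypothetical, not part of this patch):

```rust
// Hypothetical helper, not part of this patch: drains a RecordReader after the change.
fn drain(record_reader: &mut RecordReader<Int32Type>) {
    // Decoding pages can still fail, so read_records keeps returning Result.
    let _records = record_reader.read_records(1024).unwrap();

    // The consume_* accessors are now infallible: no `?` or `.unwrap()` needed.
    let _data = record_reader.consume_record_data();       // was Result<Buffer>
    let _def_levels = record_reader.consume_def_levels();  // was Result<Option<Buffer>>
    let _nulls = record_reader.consume_bitmap_buffer();    // was Result<Option<Buffer>>
    record_reader.reset();
}
```

The `read_batch` rework in `column/reader.rs` follows the same idea: when a column actually has definition or repetition levels, the corresponding slices must now be supplied, each iteration decodes exactly `iter_batch_size` levels, and the returned `levels_read` counts every level rather than being inferred from whichever of the two slices happened to be passed.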