From b701d227a8c9c251d5615ed92e0e43abb832136f Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 19 Jul 2022 16:21:02 +0800 Subject: [PATCH 1/9] add test --- parquet/src/arrow/arrow_reader.rs | 45 ++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 770477b025c..75a6dd1ce95 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -33,6 +33,7 @@ use crate::arrow::ProjectionMask; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{KeyValue, ParquetMetaData}; use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader}; +use crate::file::serialized_reader::ReadOptionsBuilder; use crate::schema::types::SchemaDescriptor; /// Arrow reader api. @@ -217,7 +218,12 @@ impl ParquetFileArrowReader { chunk_reader: R, options: ArrowReaderOptions, ) -> Result { - let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?); + let file_reader= if options.selection.is_some() { + let options = ReadOptionsBuilder::new().with_page_index().build(); + Arc::new(SerializedFileReader::new_with_options(chunk_reader,options)?) + }else { + Arc::new(SerializedFileReader::new(chunk_reader)?) 
+ }; Ok(Self::new_with_options(file_reader, options)) } @@ -388,9 +394,7 @@ mod tests { use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; - use crate::arrow::arrow_reader::{ - ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, - }; + use crate::arrow::arrow_reader::{ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, RowSelection}; use crate::arrow::buffer::converter::{ BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter, @@ -405,6 +409,7 @@ mod tests { use crate::errors::Result; use crate::file::properties::{WriterProperties, WriterVersion}; use crate::file::reader::{FileReader, SerializedFileReader}; + use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; @@ -1586,4 +1591,36 @@ mod tests { test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1); test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE); } + + #[test] + fn test_scan_row_with_selection() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata); + let test_file = File::open(&path).unwrap(); + // total row count 7300 + // 1. 
test selection size more than one page row count + let selections = create_test_selection(50, 7300); + let arrow_reader_options = ArrowReaderOptions::new().with_row_selection(selections); + let mut arrow_reader = ParquetFileArrowReader::try_new_with_options(test_file, arrow_reader_options).unwrap(); + let reader = arrow_reader.get_record_reader(50).unwrap(); + + for batch in reader { + assert_eq!(batch.unwrap().num_rows(), 50) + } + } + + fn create_test_selection(step_len: usize, total_len: usize) -> Vec { + let mut cur_offset = 0; + let mut skip = false; + let mut vec = vec![]; + while cur_offset < total_len { + vec.push(RowSelection{ + row_count: step_len, + skip + }); + cur_offset += step_len; + skip = !skip; + } + vec + } } From af8b54f127611cc89aa203b7a2e7caa73e77620f Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 20 Jul 2022 19:27:30 +0800 Subject: [PATCH 2/9] fix some skip bug. --- parquet/src/arrow/arrow_reader.rs | 17 ++++++++++++----- parquet/src/column/reader.rs | 13 ++++++++++--- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 75a6dd1ce95..5bf7345be19 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -304,9 +304,14 @@ impl Iterator for ParquetRecordBatchReader { continue; } + // try to read record let to_read = match front.row_count.checked_sub(self.batch_size) { Some(remaining) => { - selection.push_front(RowSelection::skip(remaining)); + // if page row count less than batch_size we must set batch size to page row count. 
+ // add check avoid dead loop + if remaining != 0 { + selection.push_front(RowSelection::select(remaining)); + } self.batch_size } None => front.row_count, @@ -409,7 +414,6 @@ mod tests { use crate::errors::Result; use crate::file::properties::{WriterProperties, WriterVersion}; use crate::file::reader::{FileReader, SerializedFileReader}; - use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; @@ -1599,13 +1603,16 @@ mod tests { let test_file = File::open(&path).unwrap(); // total row count 7300 // 1. test selection size more than one page row count - let selections = create_test_selection(50, 7300); + let selections = create_test_selection(1000, 7300); let arrow_reader_options = ArrowReaderOptions::new().with_row_selection(selections); let mut arrow_reader = ParquetFileArrowReader::try_new_with_options(test_file, arrow_reader_options).unwrap(); - let reader = arrow_reader.get_record_reader(50).unwrap(); + let reader = arrow_reader.get_record_reader(1000).unwrap(); + let mut batch_count = 0; for batch in reader { - assert_eq!(batch.unwrap().num_rows(), 50) + println!("batch_count: {}", batch_count); + assert_eq!(batch.unwrap().num_rows(), 1000); + batch_count += 1; } } diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index ea00bcf1ba8..ad5b7adc42d 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -299,7 +299,7 @@ where let mut remaining = num_records; while remaining != 0 { if self.num_buffered_values == self.num_decoded_values { - let metadata = match self.page_reader.peek_next_page()? { + let mut metadata = match self.page_reader.peek_next_page()? 
{ None => return Ok(num_records - remaining), Some(metadata) => metadata, }; @@ -312,13 +312,20 @@ where // If page has less rows than the remaining records to // be skipped, skip entire page - if metadata.num_rows < remaining { + while metadata.num_rows < remaining { self.page_reader.skip_next_page()?; remaining -= metadata.num_rows; - continue; + metadata = match self.page_reader.peek_next_page()? { + None => return Ok(num_records - remaining), + Some(metadata) => metadata, + }; } + // because self.num_buffered_values == self.num_decoded_values means + // we need reads a new page and set up the decoders for levels + self.read_new_page()?; } + // start skip values in page level let to_read = remaining .min((self.num_buffered_values - self.num_decoded_values) as usize); From 333288dfa73917848c6d48375b9f19a4bdf2e9de Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 20 Jul 2022 20:15:47 +0800 Subject: [PATCH 3/9] add it. --- parquet/src/arrow/arrow_reader.rs | 105 ++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 28 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 5bf7345be19..d50b3b0ff82 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -218,10 +218,10 @@ impl ParquetFileArrowReader { chunk_reader: R, options: ArrowReaderOptions, ) -> Result { - let file_reader= if options.selection.is_some() { + let file_reader = if options.selection.is_some() { let options = ReadOptionsBuilder::new().with_page_index().build(); - Arc::new(SerializedFileReader::new_with_options(chunk_reader,options)?) - }else { + Arc::new(SerializedFileReader::new_with_options(chunk_reader, options)?) + } else { Arc::new(SerializedFileReader::new(chunk_reader)?) 
}; Ok(Self::new_with_options(file_reader, options)) @@ -299,7 +299,7 @@ impl Iterator for ParquetRecordBatchReader { front.row_count, skipped ) - .into())); + .into())); } continue; } @@ -496,7 +496,7 @@ mod tests { Some(Field::new("int32", ArrowDataType::Null, true)), &Default::default(), ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -941,8 +941,8 @@ mod tests { std::iter::from_fn(|| { Some((rng.next_u32() as usize % 100 >= *null_percent) as i16) }) - .take(opts.num_rows) - .collect() + .take(opts.num_rows) + .collect() }) .collect(); (Repetition::OPTIONAL, Some(def_levels)) @@ -996,7 +996,7 @@ mod tests { arrow_field, &opts, ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1228,7 +1228,7 @@ mod tests { schema, writer_props, ) - .unwrap(); + .unwrap(); { let mut row_group_writer = writer.next_row_group().unwrap(); @@ -1355,7 +1355,7 @@ mod tests { Some(arrow_field), &opts, ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1471,7 +1471,7 @@ mod tests { batch.schema(), Some(props), ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); file @@ -1556,7 +1556,7 @@ mod tests { .build(), ), ) - .unwrap(); + .unwrap(); for _ in 0..2 { let mut list_builder = ListBuilder::new(Int32Builder::new(batch_size)); for _ in 0..(batch_size) { @@ -1566,7 +1566,7 @@ mod tests { schema.clone(), vec![Arc::new(list_builder.finish())], ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); } writer.close().unwrap(); @@ -1603,29 +1603,78 @@ mod tests { let test_file = File::open(&path).unwrap(); // total row count 7300 // 1. 
test selection size more than one page row count - let selections = create_test_selection(1000, 7300); + let batch_size = 1000; + let selections = create_test_selection(batch_size, 7300); + let arrow_reader_options = ArrowReaderOptions::new().with_row_selection(selections); + + let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(test_file.try_clone().unwrap(), arrow_reader_options).unwrap(); + let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); + + let mut serial_arrow_reader = ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); + let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); + + let mut expected_data = vec![]; + for batch in serial_reader { + expected_data.push(batch.unwrap()); + } + + let mut total_row_count = 0; + let mut index = 0; + for batch in skip_reader { + let batch = batch.unwrap(); + assert!(batch.eq(expected_data.get(index).unwrap())); + index += 2; + let num = batch.num_rows(); + assert!(num == 1000 || num == 300); + total_row_count += num; + } + assert_eq!(total_row_count, 4000); + + // 2. 
test selection size less than one page row count + let batch_size = 10; + let selections = create_test_selection(batch_size, 7300); let arrow_reader_options = ArrowReaderOptions::new().with_row_selection(selections); - let mut arrow_reader = ParquetFileArrowReader::try_new_with_options(test_file, arrow_reader_options).unwrap(); - let reader = arrow_reader.get_record_reader(1000).unwrap(); - - let mut batch_count = 0; - for batch in reader { - println!("batch_count: {}", batch_count); - assert_eq!(batch.unwrap().num_rows(), 1000); - batch_count += 1; + + let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(test_file.try_clone().unwrap(), arrow_reader_options).unwrap(); + let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); + + let mut serial_arrow_reader = ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); + let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); + + let mut expected_data = vec![]; + for batch in serial_reader { + expected_data.push(batch.unwrap()); + } + + let mut total_row_count = 0; + let mut index = 0; + for batch in skip_reader { + let batch = batch.unwrap(); + assert!(batch.eq(expected_data.get(index).unwrap())); + index += 2; + let num = batch.num_rows(); + assert_eq!(num, 10); + total_row_count += num; } + assert_eq!(total_row_count, 3650); + } fn create_test_selection(step_len: usize, total_len: usize) -> Vec { - let mut cur_offset = 0; + let mut remaining = total_len; let mut skip = false; let mut vec = vec![]; - while cur_offset < total_len { - vec.push(RowSelection{ - row_count: step_len, - skip + while remaining != 0 { + let step = if remaining > step_len { + step_len + } else { + remaining + }; + vec.push(RowSelection { + row_count: step, + skip, }); - cur_offset += step_len; + remaining -= step; skip = !skip; } vec From f277b24e397631c4bfc605d01c33018132702055 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 25 Jul 2022 12:06:06 +0800 Subject: 
[PATCH 4/9] fix skip in head. --- parquet/src/arrow/array_reader/byte_array.rs | 2 +- .../array_reader/byte_array_dictionary.rs | 2 +- .../array_reader/complex_object_array.rs | 8 ++- parquet/src/arrow/array_reader/null_array.rs | 2 +- .../src/arrow/array_reader/primitive_array.rs | 2 +- parquet/src/arrow/arrow_reader.rs | 54 ++++++++++++------- parquet/src/arrow/record_reader/mod.rs | 18 ++++++- 7 files changed, 62 insertions(+), 26 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 60489a26b93..05e802b9719 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -120,7 +120,7 @@ impl ArrayReader for ByteArrayReader { } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records) + self.record_reader.skip_records(num_records, self.pages.as_mut()) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 486dfe211e1..c5b2aad98fb 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -181,7 +181,7 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records) + self.record_reader.skip_records(num_records, self.pages.as_mut()) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index 6e7585ff944..1390866cf6a 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -166,7 +166,13 @@ where fn skip_records(&mut self, num_records: usize) -> Result { match self.column_reader.as_mut() { Some(reader) => reader.skip_records(num_records), - None => Ok(0), + None => 
{ + if self.next_column_reader()? { + self.column_reader.as_mut().unwrap().skip_records(num_records) + }else { + Ok(0) + } + } } } diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index b207d8b2c56..a25fc08a791 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -97,7 +97,7 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records) + self.record_reader.skip_records(num_records, self.pages.as_mut()) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 0488e254c90..fcd2fc24f7d 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -222,7 +222,7 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records) + self.record_reader.skip_records(num_records, self.pages.as_mut()) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index d50b3b0ff82..d330026f4f3 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -220,7 +220,10 @@ impl ParquetFileArrowReader { ) -> Result { let file_reader = if options.selection.is_some() { let options = ReadOptionsBuilder::new().with_page_index().build(); - Arc::new(SerializedFileReader::new_with_options(chunk_reader, options)?) + Arc::new(SerializedFileReader::new_with_options( + chunk_reader, + options, + )?) } else { Arc::new(SerializedFileReader::new(chunk_reader)?) 
}; @@ -299,7 +302,7 @@ impl Iterator for ParquetRecordBatchReader { front.row_count, skipped ) - .into())); + .into())); } continue; } @@ -399,7 +402,9 @@ mod tests { use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; - use crate::arrow::arrow_reader::{ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, RowSelection}; + use crate::arrow::arrow_reader::{ + ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, RowSelection, + }; use crate::arrow::buffer::converter::{ BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter, @@ -496,7 +501,7 @@ mod tests { Some(Field::new("int32", ArrowDataType::Null, true)), &Default::default(), ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -941,8 +946,8 @@ mod tests { std::iter::from_fn(|| { Some((rng.next_u32() as usize % 100 >= *null_percent) as i16) }) - .take(opts.num_rows) - .collect() + .take(opts.num_rows) + .collect() }) .collect(); (Repetition::OPTIONAL, Some(def_levels)) @@ -996,7 +1001,7 @@ mod tests { arrow_field, &opts, ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1228,7 +1233,7 @@ mod tests { schema, writer_props, ) - .unwrap(); + .unwrap(); { let mut row_group_writer = writer.next_row_group().unwrap(); @@ -1355,7 +1360,7 @@ mod tests { Some(arrow_field), &opts, ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1471,7 +1476,7 @@ mod tests { batch.schema(), Some(props), ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); file @@ -1556,7 +1561,7 @@ mod tests { .build(), ), ) - .unwrap(); + .unwrap(); for _ in 0..2 { let mut list_builder = ListBuilder::new(Int32Builder::new(batch_size)); for _ in 0..(batch_size) { @@ -1566,7 +1571,7 @@ mod tests { schema.clone(), vec![Arc::new(list_builder.finish())], ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); } writer.close().unwrap(); @@ -1605,12 +1610,18 @@ mod tests 
{ // 1. test selection size more than one page row count let batch_size = 1000; let selections = create_test_selection(batch_size, 7300); - let arrow_reader_options = ArrowReaderOptions::new().with_row_selection(selections); + let arrow_reader_options = + ArrowReaderOptions::new().with_row_selection(selections); - let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(test_file.try_clone().unwrap(), arrow_reader_options).unwrap(); + let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options( + test_file.try_clone().unwrap(), + arrow_reader_options, + ) + .unwrap(); let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); - let mut serial_arrow_reader = ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); + let mut serial_arrow_reader = + ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); let mut expected_data = vec![]; @@ -1633,12 +1644,18 @@ mod tests { // 2. 
test selection size less than one page row count let batch_size = 10; let selections = create_test_selection(batch_size, 7300); - let arrow_reader_options = ArrowReaderOptions::new().with_row_selection(selections); + let arrow_reader_options = + ArrowReaderOptions::new().with_row_selection(selections); - let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(test_file.try_clone().unwrap(), arrow_reader_options).unwrap(); + let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options( + test_file.try_clone().unwrap(), + arrow_reader_options, + ) + .unwrap(); let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); - let mut serial_arrow_reader = ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); + let mut serial_arrow_reader = + ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); let mut expected_data = vec![]; @@ -1657,7 +1674,6 @@ mod tests { total_row_count += num; } assert_eq!(total_row_count, 3650); - } fn create_test_selection(step_len: usize, total_len: usize) -> Vec { diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 04499997e83..195671db1cd 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -24,6 +24,7 @@ use crate::arrow::record_reader::{ buffer::{BufferQueue, ScalarBuffer, ValuesBuffer}, definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder}, }; +use crate::column::page::PageIterator; use crate::column::{ page::PageReader, reader::{ @@ -184,11 +185,24 @@ where /// # Returns /// /// Number of records skipped - pub fn skip_records(&mut self, num_records: usize) -> Result { + pub fn skip_records( + &mut self, + num_records: usize, + pages: &mut dyn PageIterator, + ) -> Result { // First need to clear the buffer let end_of_column = match self.column_reader.as_mut() { 
Some(reader) => !reader.has_next()?, - None => return Ok(0), + None => { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = pages.next() { + self.set_page_reader(page_reader?)?; + false + } else { + return Ok(0); + } + } }; let (buffered_records, buffered_values) = From 4b2728004f787675142e0d07a90eb96e7a6cf262 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 25 Jul 2022 14:44:41 +0800 Subject: [PATCH 5/9] refine test case --- parquet/src/arrow/arrow_reader.rs | 222 ++++++++++++++++++++---------- 1 file changed, 148 insertions(+), 74 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index d330026f4f3..6a522161521 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -302,7 +302,7 @@ impl Iterator for ParquetRecordBatchReader { front.row_count, skipped ) - .into())); + .into())); } continue; } @@ -402,9 +402,7 @@ mod tests { use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; - use crate::arrow::arrow_reader::{ - ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, RowSelection, - }; + use crate::arrow::arrow_reader::{ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, ParquetRecordBatchReader, RowSelection}; use crate::arrow::buffer::converter::{ BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter, @@ -501,7 +499,7 @@ mod tests { Some(Field::new("int32", ArrowDataType::Null, true)), &Default::default(), ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -946,8 +944,8 @@ mod tests { std::iter::from_fn(|| { Some((rng.next_u32() as usize % 100 >= *null_percent) as i16) }) - .take(opts.num_rows) - .collect() + .take(opts.num_rows) + .collect() }) .collect(); (Repetition::OPTIONAL, Some(def_levels)) @@ -1001,7 +999,7 @@ mod tests { arrow_field, &opts, ) 
- .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1233,7 +1231,7 @@ mod tests { schema, writer_props, ) - .unwrap(); + .unwrap(); { let mut row_group_writer = writer.next_row_group().unwrap(); @@ -1360,7 +1358,7 @@ mod tests { Some(arrow_field), &opts, ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1476,7 +1474,7 @@ mod tests { batch.schema(), Some(props), ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); file @@ -1561,7 +1559,7 @@ mod tests { .build(), ), ) - .unwrap(); + .unwrap(); for _ in 0..2 { let mut list_builder = ListBuilder::new(Int32Builder::new(batch_size)); for _ in 0..(batch_size) { @@ -1571,7 +1569,7 @@ mod tests { schema.clone(), vec![Arc::new(list_builder.finish())], ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); } writer.close().unwrap(); @@ -1606,93 +1604,169 @@ mod tests { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata); let test_file = File::open(&path).unwrap(); + // total row count 7300 - // 1. test selection size more than one page row count + // 1. 
test selection len more than one page row count let batch_size = 1000; - let selections = create_test_selection(batch_size, 7300); - let arrow_reader_options = - ArrowReaderOptions::new().with_row_selection(selections); + let expected_data = create_expect_batch(&test_file, batch_size); - let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options( - test_file.try_clone().unwrap(), - arrow_reader_options, - ) - .unwrap(); - let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); - - let mut serial_arrow_reader = - ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); - let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); + let selections = create_test_selection(batch_size, 7300, false); + let skip_reader = create_skip_reader(&test_file, batch_size, selections); + let mut total_row_count = 0; + let mut index = 0; + for batch in skip_reader { + let batch = batch.unwrap(); + assert_eq!(batch, expected_data.get(index).unwrap().clone()); + index += 2; + let num = batch.num_rows(); + assert!(num == batch_size || num == 300); + total_row_count += num; + } + assert_eq!(total_row_count, 4000); - let mut expected_data = vec![]; - for batch in serial_reader { - expected_data.push(batch.unwrap()); + let selections = create_test_selection(batch_size, 7300, true); + let skip_reader = create_skip_reader(&test_file, batch_size, selections); + let mut total_row_count = 0; + let mut index = 1; + for batch in skip_reader { + let batch = batch.unwrap(); + assert_eq!(batch, expected_data.get(index).unwrap().clone()); + index += 2; + let num = batch.num_rows(); + //the lase batch will be 300 + assert!(num == batch_size || num == 300); + total_row_count += num; } + assert_eq!(total_row_count, 3300); + + // 2. 
test selection len less than one page row count + let batch_size = 20; + let expected_data = create_expect_batch(&test_file, batch_size); + let selections = create_test_selection(batch_size, 7300, false); + let skip_reader = create_skip_reader(&test_file, batch_size, selections); let mut total_row_count = 0; let mut index = 0; for batch in skip_reader { let batch = batch.unwrap(); - assert!(batch.eq(expected_data.get(index).unwrap())); + assert_eq!(batch, expected_data.get(index).unwrap().clone()); index += 2; let num = batch.num_rows(); - assert!(num == 1000 || num == 300); + assert_eq!(num, batch_size); total_row_count += num; } - assert_eq!(total_row_count, 4000); + assert_eq!(total_row_count, 3660); - // 2. test selection size less than one page row count - let batch_size = 10; - let selections = create_test_selection(batch_size, 7300); - let arrow_reader_options = - ArrowReaderOptions::new().with_row_selection(selections); + let selections = create_test_selection(batch_size, 7300, true); + let skip_reader = create_skip_reader(&test_file, batch_size, selections); + let mut total_row_count = 0; + let mut index = 1; + for batch in skip_reader { + let batch = batch.unwrap(); + assert_eq!(batch, expected_data.get(index).unwrap().clone()); + index += 2; + let num = batch.num_rows(); + assert_eq!(num, batch_size); + total_row_count += num; + } + assert_eq!(total_row_count, 3640); - let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options( - test_file.try_clone().unwrap(), - arrow_reader_options, - ) - .unwrap(); - let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); + // 3. 
test selection_len less than batch_size + let batch_size = 20; + let selection_len = 5; + let expected_data_batch = create_expect_batch(&test_file, batch_size); + let expected_data_selection = create_expect_batch(&test_file, selection_len); + let selections = create_test_selection(selection_len, 7300, false); + let skip_reader = create_skip_reader(&test_file, batch_size, selections); - let mut serial_arrow_reader = - ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); - let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); + let mut total_row_count = 0; - let mut expected_data = vec![]; - for batch in serial_reader { - expected_data.push(batch.unwrap()); + for batch in skip_reader { + let batch = batch.unwrap(); + let num = batch.num_rows(); + assert!(num == batch_size || num == selection_len); + if num == batch_size { + assert_eq!(batch, expected_data_batch.get(total_row_count / batch_size).unwrap().clone()); + total_row_count += batch_size; + } else if num == selection_len { + assert_eq!(batch, expected_data_selection.get(total_row_count / selection_len).unwrap().clone()); + total_row_count += selection_len; + } + // add skip offset + total_row_count += selection_len; } + // 4. 
test selection_len more than batch_size + // If batch_size < selection_len will divide selection(50, read) -> + // selection(20, read), selection(20, read), selection(10, read) + let batch_size = 20; + let selection_len = 50; + let another_batch_size = 10; + let expected_data_batch = create_expect_batch(&test_file, batch_size); + let expected_data_batch2 = create_expect_batch(&test_file, another_batch_size); + let selections = create_test_selection(selection_len, 7300, false); + let skip_reader = create_skip_reader(&test_file, batch_size, selections); + let mut total_row_count = 0; - let mut index = 0; + for batch in skip_reader { let batch = batch.unwrap(); - assert!(batch.eq(expected_data.get(index).unwrap())); - index += 2; let num = batch.num_rows(); - assert_eq!(num, 10); - total_row_count += num; + assert!(num == batch_size || num == another_batch_size); + if num == batch_size { + assert_eq!(batch, expected_data_batch.get(total_row_count / batch_size).unwrap().clone()); + total_row_count += batch_size; + } else if num == another_batch_size { + assert_eq!(batch, expected_data_batch2.get(total_row_count / another_batch_size).unwrap().clone()); + total_row_count += 10; + // add skip offset + total_row_count += selection_len; + } } - assert_eq!(total_row_count, 3650); - } - fn create_test_selection(step_len: usize, total_len: usize) -> Vec { - let mut remaining = total_len; - let mut skip = false; - let mut vec = vec![]; - while remaining != 0 { - let step = if remaining > step_len { - step_len - } else { - remaining - }; - vec.push(RowSelection { - row_count: step, - skip, - }); - remaining -= step; - skip = !skip; + fn create_skip_reader(test_file: &File, batch_size: usize, selections: Vec) -> ParquetRecordBatchReader { + let arrow_reader_options = + ArrowReaderOptions::new().with_row_selection(selections); + + let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options( + test_file.try_clone().unwrap(), + arrow_reader_options, + ) + .unwrap(); + + 
let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); + skip_reader + } + + fn create_test_selection(step_len: usize, total_len: usize, skip_first: bool) -> Vec { + let mut remaining = total_len; + let mut skip = skip_first; + let mut vec = vec![]; + while remaining != 0 { + let step = if remaining > step_len { + step_len + } else { + remaining + }; + vec.push(RowSelection { + row_count: step, + skip, + }); + remaining -= step; + skip = !skip; + } + vec + } + + fn create_expect_batch(test_file: &File, batch_size: usize) -> Vec { + let mut serial_arrow_reader = + ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); + let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); + let mut expected_data = vec![]; + for batch in serial_reader { + expected_data.push(batch.unwrap()); + } + expected_data } - vec } } From 7153bee8a3b6128358df326bd1e993db2fe151b6 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 25 Jul 2022 15:10:54 +0800 Subject: [PATCH 6/9] fix fmt. 
--- parquet/src/arrow/arrow_reader.rs | 74 +++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 19 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 6a522161521..826de3d00c9 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -302,7 +302,7 @@ impl Iterator for ParquetRecordBatchReader { front.row_count, skipped ) - .into())); + .into())); } continue; } @@ -402,7 +402,10 @@ mod tests { use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; - use crate::arrow::arrow_reader::{ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, ParquetRecordBatchReader, RowSelection}; + use crate::arrow::arrow_reader::{ + ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, + ParquetRecordBatchReader, RowSelection, + }; use crate::arrow::buffer::converter::{ BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter, @@ -499,7 +502,7 @@ mod tests { Some(Field::new("int32", ArrowDataType::Null, true)), &Default::default(), ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -944,8 +947,8 @@ mod tests { std::iter::from_fn(|| { Some((rng.next_u32() as usize % 100 >= *null_percent) as i16) }) - .take(opts.num_rows) - .collect() + .take(opts.num_rows) + .collect() }) .collect(); (Repetition::OPTIONAL, Some(def_levels)) @@ -999,7 +1002,7 @@ mod tests { arrow_field, &opts, ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1231,7 +1234,7 @@ mod tests { schema, writer_props, ) - .unwrap(); + .unwrap(); { let mut row_group_writer = writer.next_row_group().unwrap(); @@ -1358,7 +1361,7 @@ mod tests { Some(arrow_field), &opts, ) - .unwrap(); + .unwrap(); file.rewind().unwrap(); @@ -1474,7 +1477,7 @@ mod tests { batch.schema(), Some(props), ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); file @@ -1559,7 +1562,7 @@ 
mod tests { .build(), ), ) - .unwrap(); + .unwrap(); for _ in 0..2 { let mut list_builder = ListBuilder::new(Int32Builder::new(batch_size)); for _ in 0..(batch_size) { @@ -1569,7 +1572,7 @@ mod tests { schema.clone(), vec![Arc::new(list_builder.finish())], ) - .unwrap(); + .unwrap(); writer.write(&batch).unwrap(); } writer.close().unwrap(); @@ -1686,10 +1689,22 @@ mod tests { let num = batch.num_rows(); assert!(num == batch_size || num == selection_len); if num == batch_size { - assert_eq!(batch, expected_data_batch.get(total_row_count / batch_size).unwrap().clone()); + assert_eq!( + batch, + expected_data_batch + .get(total_row_count / batch_size) + .unwrap() + .clone() + ); total_row_count += batch_size; } else if num == selection_len { - assert_eq!(batch, expected_data_selection.get(total_row_count / selection_len).unwrap().clone()); + assert_eq!( + batch, + expected_data_selection + .get(total_row_count / selection_len) + .unwrap() + .clone() + ); total_row_count += selection_len; } // add skip offset @@ -1714,17 +1729,33 @@ mod tests { let num = batch.num_rows(); assert!(num == batch_size || num == another_batch_size); if num == batch_size { - assert_eq!(batch, expected_data_batch.get(total_row_count / batch_size).unwrap().clone()); + assert_eq!( + batch, + expected_data_batch + .get(total_row_count / batch_size) + .unwrap() + .clone() + ); total_row_count += batch_size; } else if num == another_batch_size { - assert_eq!(batch, expected_data_batch2.get(total_row_count / another_batch_size).unwrap().clone()); + assert_eq!( + batch, + expected_data_batch2 + .get(total_row_count / another_batch_size) + .unwrap() + .clone() + ); total_row_count += 10; // add skip offset total_row_count += selection_len; } } - fn create_skip_reader(test_file: &File, batch_size: usize, selections: Vec) -> ParquetRecordBatchReader { + fn create_skip_reader( + test_file: &File, + batch_size: usize, + selections: Vec, + ) -> ParquetRecordBatchReader { let arrow_reader_options = 
ArrowReaderOptions::new().with_row_selection(selections); @@ -1732,13 +1763,17 @@ mod tests { test_file.try_clone().unwrap(), arrow_reader_options, ) - .unwrap(); + .unwrap(); let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); skip_reader } - fn create_test_selection(step_len: usize, total_len: usize, skip_first: bool) -> Vec { + fn create_test_selection( + step_len: usize, + total_len: usize, + skip_first: bool, + ) -> Vec { let mut remaining = total_len; let mut skip = skip_first; let mut vec = vec![]; @@ -1761,7 +1796,8 @@ mod tests { fn create_expect_batch(test_file: &File, batch_size: usize) -> Vec { let mut serial_arrow_reader = ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap(); - let serial_reader = serial_arrow_reader.get_record_reader(batch_size).unwrap(); + let serial_reader = + serial_arrow_reader.get_record_reader(batch_size).unwrap(); let mut expected_data = vec![]; for batch in serial_reader { expected_data.push(batch.unwrap()); From 25cb93d842adda9941169791ebf4597c4aa9727c Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 25 Jul 2022 15:35:47 +0800 Subject: [PATCH 7/9] fix clippy --- parquet/src/arrow/arrow_reader.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 826de3d00c9..f03a44c511e 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -1764,9 +1764,7 @@ mod tests { arrow_reader_options, ) .unwrap(); - - let skip_reader = skip_arrow_reader.get_record_reader(batch_size).unwrap(); - skip_reader + skip_arrow_reader.get_record_reader(batch_size).unwrap() } fn create_test_selection( From 881752c0110b9291ed915030439e41817d351443 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 26 Jul 2022 19:20:50 +0800 Subject: [PATCH 8/9] fix comment. 
--- parquet/src/arrow/array_reader/byte_array.rs | 11 ++++++- .../array_reader/byte_array_dictionary.rs | 11 ++++++- parquet/src/arrow/array_reader/null_array.rs | 11 ++++++- .../src/arrow/array_reader/primitive_array.rs | 11 ++++++- parquet/src/arrow/arrow_reader.rs | 7 ++--- parquet/src/arrow/record_reader/mod.rs | 30 +++++++------------ 6 files changed, 54 insertions(+), 27 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 05e802b9719..4be5269a37b 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -120,7 +120,16 @@ impl ArrayReader for ByteArrayReader { } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index c5b2aad98fb..3afbcaa61a7 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -181,7 +181,16 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) 
} fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index a25fc08a791..6d7ed1bb547 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -97,7 +97,16 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index fcd2fc24f7d..a3c6cc7b1b1 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -222,7 +222,16 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index f03a44c511e..9dcf5d7bf8d 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -309,14 +309,13 @@ impl Iterator for ParquetRecordBatchReader { // try to read record let to_read = match front.row_count.checked_sub(self.batch_size) { - 
Some(remaining) => { + Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - if remaining != 0 { - selection.push_front(RowSelection::select(remaining)); - } + selection.push_front(RowSelection::select(remaining)); self.batch_size } + Some(_) => self.batch_size, None => front.row_count, }; diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 195671db1cd..b68f59d514f 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -24,7 +24,6 @@ use crate::arrow::record_reader::{ buffer::{BufferQueue, ScalarBuffer, ValuesBuffer}, definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder}, }; -use crate::column::page::PageIterator; use crate::column::{ page::PageReader, reader::{ @@ -46,6 +45,9 @@ pub(crate) const MIN_BATCH_SIZE: usize = 1024; pub type RecordReader = GenericRecordReader::T>, ColumnValueDecoderImpl>; +pub(crate) type ColumnReader = + GenericColumnReader; + /// A generic stateful column reader that delimits semantic records /// /// This type is hidden from the docs, and relies on private traits with no @@ -57,9 +59,7 @@ pub struct GenericRecordReader { records: V, def_levels: Option, rep_levels: Option>, - column_reader: Option< - GenericColumnReader, - >, + column_reader: Option>, /// Number of records accumulated in records num_records: usize, @@ -185,24 +185,11 @@ where /// # Returns /// /// Number of records skipped - pub fn skip_records( - &mut self, - num_records: usize, - pages: &mut dyn PageIterator, - ) -> Result { + pub fn skip_records(&mut self, num_records: usize) -> Result { // First need to clear the buffer let end_of_column = match self.column_reader.as_mut() { Some(reader) => !reader.has_next()?, - None => { - // If we skip records before all read operation - // we need set `column_reader` by `set_page_reader` - if let Some(page_reader) = 
pages.next() { - self.set_page_reader(page_reader?)?; - false - } else { - return Ok(0); - } - } + None => return Ok(0), }; let (buffered_records, buffered_values) = @@ -292,6 +279,11 @@ where .map(|levels| levels.split_bitmask(self.num_values)) } + /// Returns column reader. + pub(crate) fn column_reader(&self) -> Option<&ColumnReader> { + self.column_reader.as_ref() + } + /// Try to read one batch of data. fn read_one_batch(&mut self, batch_size: usize) -> Result { let rep_levels = self From 3f991d4b45d0b992b2142e9b67595ac2040505e9 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 27 Jul 2022 11:29:20 +0800 Subject: [PATCH 9/9] fix comment --- parquet/src/arrow/array_reader/byte_array.rs | 12 ++------- .../array_reader/byte_array_dictionary.rs | 12 ++------- parquet/src/arrow/array_reader/mod.rs | 27 +++++++++++++++++++ parquet/src/arrow/array_reader/null_array.rs | 12 ++------- .../src/arrow/array_reader/primitive_array.rs | 12 ++------- parquet/src/arrow/arrow_reader.rs | 3 +-- parquet/src/column/reader.rs | 15 +++++------ 7 files changed, 43 insertions(+), 50 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 4be5269a37b..a3b897aec20 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader}; use crate::arrow::buffer::offset_buffer::OffsetBuffer; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::GenericRecordReader; @@ -120,15 +120,7 @@ impl ArrayReader for ByteArrayReader { } fn skip_records(&mut self, num_records: usize) -> Result { - if self.record_reader.column_reader().is_none() { - // If we skip records before all read operation - // we need set `column_reader` by `set_page_reader` - if let Some(page_reader) = self.pages.next() { - self.record_reader.set_page_reader(page_reader?)?; - } else { - return Ok(0); - } - } + set_column_reader(&mut self.record_reader, self.pages.as_mut())?; self.record_reader.skip_records(num_records) } diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 3afbcaa61a7..eba9e578f55 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -25,7 +25,7 @@ use arrow::buffer::Buffer; use arrow::datatypes::{ArrowNativeType, DataType as ArrowType}; use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}; -use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader}; use crate::arrow::buffer::{ dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer, }; @@ -181,15 +181,7 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - if self.record_reader.column_reader().is_none() { - // If we skip records before all read operation - // we need set `column_reader` by `set_page_reader` - if let Some(page_reader) = self.pages.next() { - self.record_reader.set_page_reader(page_reader?)?; - } else { - return Ok(0); - } - } + set_column_reader(&mut self.record_reader, 
self.pages.as_mut())?; self.record_reader.skip_records(num_records) } diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index e30c33bba35..a9d8cc0faa6 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -144,3 +144,30 @@ where } Ok(records_read) } + +/// Uses `pages` to set up `record_reader`'s `column_reader` +/// +/// If we skip records before any read operation, +/// we need to set the `column_reader` via `set_page_reader` +/// for constructing `def_level_decoder` and `rep_level_decoder`. +fn set_column_reader( + record_reader: &mut GenericRecordReader, + pages: &mut dyn PageIterator, +) -> Result +where + V: ValuesBuffer + Default, + CV: ColumnValueDecoder, +{ + return if record_reader.column_reader().is_none() { + // If we skip records before any read operation + // we need to set the `column_reader` via `set_page_reader` + if let Some(page_reader) = pages.next() { + record_reader.set_page_reader(page_reader?)?; + Ok(true) + } else { + Ok(false) + } + } else { + Ok(true) + }; +} diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index 6d7ed1bb547..a8c50b87f7e 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader}; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::RecordReader; use crate::column::page::PageIterator; @@ -97,15 +97,7 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - if self.record_reader.column_reader().is_none() { - // If we skip records before all read operation - // we need set `column_reader` by `set_page_reader` - if let Some(page_reader) = self.pages.next() { - self.record_reader.set_page_reader(page_reader?)?; - } else { - return Ok(0); - } - } + set_column_reader(&mut self.record_reader, self.pages.as_mut())?; self.record_reader.skip_records(num_records) } diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index a3c6cc7b1b1..700b12b0a0b 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::array_reader::{read_records, set_column_reader, ArrayReader}; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -222,15 +222,7 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - if self.record_reader.column_reader().is_none() { - // If we skip records before all read operation - // we need set `column_reader` by `set_page_reader` - if let Some(page_reader) = self.pages.next() { - self.record_reader.set_page_reader(page_reader?)?; - } else { - return Ok(0); - } - } + set_column_reader(&mut self.record_reader, self.pages.as_mut())?; self.record_reader.skip_records(num_records) } diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 9dcf5d7bf8d..b019c890af3 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -315,8 +315,7 @@ impl Iterator for ParquetRecordBatchReader { selection.push_front(RowSelection::select(remaining)); self.batch_size } - Some(_) => self.batch_size, - None => front.row_count, + _ => front.row_count, }; break to_read; diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index ad5b7adc42d..8e0fa5a4d5a 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -299,7 +299,7 @@ where let mut remaining = num_records; while remaining != 0 { if self.num_buffered_values == self.num_decoded_values { - let mut metadata = match self.page_reader.peek_next_page()? { + let metadata = match self.page_reader.peek_next_page()? 
{ None => return Ok(num_records - remaining), Some(metadata) => metadata, }; @@ -312,17 +312,16 @@ where // If page has less rows than the remaining records to // be skipped, skip entire page - while metadata.num_rows < remaining { + if metadata.num_rows < remaining { self.page_reader.skip_next_page()?; remaining -= metadata.num_rows; - metadata = match self.page_reader.peek_next_page()? { - None => return Ok(num_records - remaining), - Some(metadata) => metadata, - }; - } + continue; + }; // because self.num_buffered_values == self.num_decoded_values means // we need reads a new page and set up the decoders for levels - self.read_new_page()?; + if !self.read_new_page()? { + return Ok(num_records - remaining); + } } // start skip values in page level