diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 60489a26b93..a3b897aec20 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
@@ -120,6 +120,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 486dfe211e1..eba9e578f55 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -25,7 +25,7 @@ use arrow::buffer::Buffer;
 use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
 
 use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
 use crate::arrow::buffer::{
     dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
 };
@@ -181,6 +181,7 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs
index 6e7585ff944..1390866cf6a 100644
--- a/parquet/src/arrow/array_reader/complex_object_array.rs
+++ b/parquet/src/arrow/array_reader/complex_object_array.rs
@@ -166,7 +166,13 @@ where
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
         match self.column_reader.as_mut() {
             Some(reader) => reader.skip_records(num_records),
-            None => Ok(0),
+            None => {
+                if self.next_column_reader()? {
+                    self.column_reader.as_mut().unwrap().skip_records(num_records)
+                } else {
+                    Ok(0)
+                }
+            }
         }
     }
 
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index e30c33bba35..a9d8cc0faa6 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -144,3 +144,30 @@ where
     }
     Ok(records_read)
 }
+
+/// Uses `pages` to set up `record_reader`'s `column_reader`
+///
+/// If records are skipped before any read operation, the `column_reader`
+/// must first be set via `set_page_reader` so that the `def_level_decoder`
+/// and `rep_level_decoder` can be constructed.
+fn set_column_reader<V, CV>(
+    record_reader: &mut GenericRecordReader<V, CV>,
+    pages: &mut dyn PageIterator,
+) -> Result<bool>
+where
+    V: ValuesBuffer + Default,
+    CV: ColumnValueDecoder<Slice = V::Slice>,
+{
+    if record_reader.column_reader().is_none() {
+        // records are being skipped before any read operation, so the
+        // `column_reader` must be set up via `set_page_reader`
+        if let Some(page_reader) = pages.next() {
+            record_reader.set_page_reader(page_reader?)?;
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    } else {
+        Ok(true)
+    }
+}
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index b207d8b2c56..a8c50b87f7e 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::column::page::PageIterator;
@@ -97,6 +97,7 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index 0488e254c90..700b12b0a0b 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, set_column_reader, ArrayReader};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -222,6 +222,7 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 770477b025c..b019c890af3 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -33,6 +33,7 @@ use crate::arrow::ProjectionMask;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{KeyValue, ParquetMetaData};
 use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
+use crate::file::serialized_reader::ReadOptionsBuilder;
 use crate::schema::types::SchemaDescriptor;
 
 /// Arrow reader api.
@@ -217,7 +218,15 @@ impl ParquetFileArrowReader {
         chunk_reader: R,
         options: ArrowReaderOptions,
     ) -> Result<Self> {
-        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
+        let file_reader = if options.selection.is_some() {
+            let options = ReadOptionsBuilder::new().with_page_index().build();
+            Arc::new(SerializedFileReader::new_with_options(
+                chunk_reader,
+                options,
+            )?)
+        } else {
+            Arc::new(SerializedFileReader::new(chunk_reader)?)
+        };
 
         Ok(Self::new_with_options(file_reader, options))
     }
@@ -298,12 +307,15 @@ impl Iterator for ParquetRecordBatchReader {
                         continue;
                     }
 
+                    // try to read records
                    let to_read = match front.row_count.checked_sub(self.batch_size) {
-                        Some(remaining) => {
-                            selection.push_front(RowSelection::skip(remaining));
+                        Some(remaining) if remaining != 0 => {
+                            // push the unread remainder back as a *selection*; pushing
+                            // a zero-length selection would cause an infinite loop
+                            selection.push_front(RowSelection::select(remaining));
                             self.batch_size
                         }
-                        None => front.row_count,
+                        _ => front.row_count,
                     };
 
                     break to_read;
@@ -390,6 +402,7 @@ mod tests {
 
     use crate::arrow::arrow_reader::{
         ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
+        ParquetRecordBatchReader, RowSelection,
     };
     use crate::arrow::buffer::converter::{
         BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
@@ -1586,4 +1599,206 @@ mod tests {
         test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
         test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
     }
+
+    #[test]
+    fn test_scan_row_with_selection() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let test_file = File::open(&path).unwrap();
+
+        // total row count is 7300
+        // 1. test a selection length larger than one page's row count
+        let batch_size = 1000;
+        let expected_data = create_expect_batch(&test_file, batch_size);
+
+        let selections = create_test_selection(batch_size, 7300, false);
+        let skip_reader = create_skip_reader(&test_file, batch_size, selections);
+        let mut total_row_count = 0;
+        let mut index = 0;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            assert!(num == batch_size || num == 300);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 4000);
+
+        let selections = create_test_selection(batch_size, 7300, true);
+        let skip_reader = create_skip_reader(&test_file, batch_size, selections);
+        let mut total_row_count = 0;
+        let mut index = 1;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            // the last batch will have 300 rows
+            assert!(num == batch_size || num == 300);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 3300);
+
+        // 2. test a selection length smaller than one page's row count
+        let batch_size = 20;
+        let expected_data = create_expect_batch(&test_file, batch_size);
+        let selections = create_test_selection(batch_size, 7300, false);
+
+        let skip_reader = create_skip_reader(&test_file, batch_size, selections);
+        let mut total_row_count = 0;
+        let mut index = 0;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            assert_eq!(num, batch_size);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 3660);
+
+        let selections = create_test_selection(batch_size, 7300, true);
+        let skip_reader = create_skip_reader(&test_file, batch_size, selections);
+        let mut total_row_count = 0;
+        let mut index = 1;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            assert_eq!(num, batch_size);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 3640);
+
+        // 3. test a selection_len smaller than the batch_size
+        let batch_size = 20;
+        let selection_len = 5;
+        let expected_data_batch = create_expect_batch(&test_file, batch_size);
+        let expected_data_selection = create_expect_batch(&test_file, selection_len);
+        let selections = create_test_selection(selection_len, 7300, false);
+        let skip_reader = create_skip_reader(&test_file, batch_size, selections);
+
+        let mut total_row_count = 0;
+
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            let num = batch.num_rows();
+            assert!(num == batch_size || num == selection_len);
+            if num == batch_size {
+                assert_eq!(
+                    batch,
+                    expected_data_batch
+                        .get(total_row_count / batch_size)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += batch_size;
+            } else if num == selection_len {
+                assert_eq!(
+                    batch,
+                    expected_data_selection
+                        .get(total_row_count / selection_len)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += selection_len;
+            }
+            // account for the rows skipped between reads
+            total_row_count += selection_len;
+        }
+
+        // 4. test a selection_len larger than the batch_size
+        // If batch_size < selection_len, the reader divides selection(50, read) into
+        // selection(20, read), selection(20, read), selection(10, read)
+        let batch_size = 20;
+        let selection_len = 50;
+        let another_batch_size = 10;
+        let expected_data_batch = create_expect_batch(&test_file, batch_size);
+        let expected_data_batch2 = create_expect_batch(&test_file, another_batch_size);
+        let selections = create_test_selection(selection_len, 7300, false);
+        let skip_reader = create_skip_reader(&test_file, batch_size, selections);
+
+        let mut total_row_count = 0;
+
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            let num = batch.num_rows();
+            assert!(num == batch_size || num == another_batch_size);
+            if num == batch_size {
+                assert_eq!(
+                    batch,
+                    expected_data_batch
+                        .get(total_row_count / batch_size)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += batch_size;
+            } else if num == another_batch_size {
+                assert_eq!(
+                    batch,
+                    expected_data_batch2
+                        .get(total_row_count / another_batch_size)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += another_batch_size;
+                // account for the rows skipped between reads
+                total_row_count += selection_len;
+            }
+        }
+
+        fn create_skip_reader(
+            test_file: &File,
+            batch_size: usize,
+            selections: Vec<RowSelection>,
+        ) -> ParquetRecordBatchReader {
+            let arrow_reader_options =
+                ArrowReaderOptions::new().with_row_selection(selections);
+
+            let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(
+                test_file.try_clone().unwrap(),
+                arrow_reader_options,
+            )
+            .unwrap();
+            skip_arrow_reader.get_record_reader(batch_size).unwrap()
+        }
+
+        fn create_test_selection(
+            step_len: usize,
+            total_len: usize,
+            skip_first: bool,
+        ) -> Vec<RowSelection> {
+            let mut remaining = total_len;
+            let mut skip = skip_first;
+            let mut vec = vec![];
+            while remaining != 0 {
+                let step = if remaining > step_len {
+                    step_len
+                } else {
+                    remaining
+                };
+                vec.push(RowSelection {
+                    row_count: step,
+                    skip,
+                });
+                remaining -= step;
+                skip = !skip;
+            }
+            vec
+        }
+
+        fn create_expect_batch(test_file: &File, batch_size: usize) -> Vec<RecordBatch> {
+            let mut serial_arrow_reader =
+                ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap();
+            let serial_reader =
+                serial_arrow_reader.get_record_reader(batch_size).unwrap();
+            let mut expected_data = vec![];
+            for batch in serial_reader {
+                expected_data.push(batch.unwrap());
+            }
+            expected_data
+        }
+    }
 }
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 04499997e83..b68f59d514f 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -45,6 +45,9 @@ pub(crate) const MIN_BATCH_SIZE: usize = 1024;
 
 pub type RecordReader<T> =
     GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
 
+pub(crate) type ColumnReader<CV> =
+    GenericColumnReader<ColumnLevelDecoderImpl, ColumnLevelDecoderImpl, CV>;
+
 /// A generic stateful column reader that delimits semantic records
 ///
 /// This type is hidden from the docs, and relies on private traits with no
@@ -56,9 +59,7 @@ pub struct GenericRecordReader<V, CV> {
     records: V,
     def_levels: Option<DefinitionLevelBuffer>,
     rep_levels: Option<ScalarBuffer<i16>>,
-    column_reader: Option<
-        GenericColumnReader<ColumnLevelDecoderImpl, ColumnLevelDecoderImpl, CV>,
-    >,
+    column_reader: Option<ColumnReader<CV>>,
 
     /// Number of records accumulated in records
     num_records: usize,
@@ -278,6 +279,11 @@ where
             .map(|levels| levels.split_bitmask(self.num_values))
     }
 
+    /// Returns a reference to the column reader, if any.
+    pub(crate) fn column_reader(&self) -> Option<&ColumnReader<CV>> {
+        self.column_reader.as_ref()
+    }
+
     /// Try to read one batch of data.
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
         let rep_levels = self
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index ea00bcf1ba8..8e0fa5a4d5a 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -316,9 +316,15 @@ where
                 self.page_reader.skip_next_page()?;
                 remaining -= metadata.num_rows;
                 continue;
+            };
+            // `self.num_buffered_values == self.num_decoded_values` means the current
+            // page is exhausted: read a new page and set up the level decoders
+            if !self.read_new_page()? {
+                return Ok(num_records - remaining);
             }
         }
 
+        // start skipping values within the current page
        let to_read = remaining
            .min((self.num_buffered_values - self.num_decoded_values) as usize);
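
Not part of the patch: a minimal end-to-end sketch of the selection API this diff wires up, distilled from the `create_skip_reader` test helper above. The file path, batch size, and `main` wrapper are illustrative, and it assumes `RowSelection`, `ArrowReaderOptions`, and `ParquetFileArrowReader` are publicly reachable through `parquet::arrow::arrow_reader`, as in the test imports.

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{
    ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, RowSelection,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // select rows 0..100, skip rows 100..200, select rows 200..300
    let selections = vec![
        RowSelection::select(100),
        RowSelection::skip(100),
        RowSelection::select(100),
    ];

    // with a selection present, try_new_with_options builds the underlying
    // SerializedFileReader with the page index enabled (see the diff above)
    let file = File::open("data.parquet")?; // illustrative path
    let options = ArrowReaderOptions::new().with_row_selection(selections);
    let mut reader = ParquetFileArrowReader::try_new_with_options(file, options)?;

    for batch in reader.get_record_reader(1024)? {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```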
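The `checked_sub` fix in `ParquetRecordBatchReader::next` is easiest to verify on concrete numbers. The sketch below is a hypothetical, standalone mirror of that match arm (`split_front` is not part of the diff): a `selection(50, read)` front entry with `batch_size = 20` splits into reads of 20, 20, and 10, because the unread remainder is pushed back as a select rather than a skip, and a zero remainder falls through to the `_` arm instead of queueing an empty selection.

```rust
/// Hypothetical mirror of the fixed logic: given the front selection's row
/// count and the batch size, return (rows_to_read_now, remainder_pushed_back).
fn split_front(row_count: usize, batch_size: usize) -> (usize, Option<usize>) {
    match row_count.checked_sub(batch_size) {
        // more rows selected than one batch: read a full batch now and push
        // the rest back as a *select* (the old code wrongly pushed a skip)
        Some(remaining) if remaining != 0 => (batch_size, Some(remaining)),
        // row_count <= batch_size (or exactly equal): read it all at once,
        // never pushing back a zero-length selection
        _ => (row_count, None),
    }
}

fn main() {
    // selection(50, read) with batch_size 20 -> reads of 20, 20, 10
    let (mut row_count, batch_size) = (50, 20);
    let mut reads = vec![];
    loop {
        let (read, rest) = split_front(row_count, batch_size);
        reads.push(read);
        match rest {
            Some(r) => row_count = r,
            None => break,
        }
    }
    assert_eq!(reads, vec![20, 20, 10]);
}
```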