From 732487340141a010b3d6a73be70d23d57a1b91e6 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 1 Jul 2022 19:55:37 +0100 Subject: [PATCH 1/5] Stub API for parquet record skipping --- arrow-flight/src/sql/google.protobuf.rs | 0 parquet/src/arrow/array_reader/byte_array.rs | 8 ++ .../array_reader/byte_array_dictionary.rs | 8 ++ .../array_reader/complex_object_array.rs | 7 ++ parquet/src/arrow/array_reader/empty_array.rs | 6 + parquet/src/arrow/array_reader/list_array.rs | 4 + parquet/src/arrow/array_reader/map_array.rs | 13 +++ parquet/src/arrow/array_reader/mod.rs | 3 + parquet/src/arrow/array_reader/null_array.rs | 4 + .../src/arrow/array_reader/primitive_array.rs | 4 + .../src/arrow/array_reader/struct_array.rs | 20 ++++ parquet/src/arrow/array_reader/test_util.rs | 5 + parquet/src/arrow/arrow_reader.rs | 108 +++++++++++++++++- parquet/src/arrow/async_reader.rs | 10 +- .../arrow/record_reader/definition_levels.rs | 16 ++- parquet/src/arrow/record_reader/mod.rs | 38 +++++- parquet/src/column/page.rs | 17 +++ parquet/src/column/reader.rs | 73 +++++++++++- parquet/src/column/reader/decoder.rs | 44 +++++++ parquet/src/file/serialized_reader.rs | 10 +- parquet/src/util/test_common/page_util.rs | 10 +- 21 files changed, 391 insertions(+), 17 deletions(-) create mode 100644 arrow-flight/src/sql/google.protobuf.rs diff --git a/arrow-flight/src/sql/google.protobuf.rs b/arrow-flight/src/sql/google.protobuf.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 95620d940b7..6907eef798e 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -122,6 +122,10 @@ impl ArrayReader for ByteArrayReader { Ok(buffer.into_array(null_buffer, self.data_type.clone())) } + fn skip_records(&mut self, num_records: usize) -> Result { + self.record_reader.skip_records(num_records) + } + fn get_def_levels(&self) -> Option<&[i16]> { self.def_levels_buffer .as_ref() @@ -210,6 +214,10 @@ impl ColumnValueDecoder decoder.read(out, range.end - range.start, self.dict.as_ref()) } + + fn skip_values(&mut self, _num_values: usize) -> Result { + todo!() + } } /// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`] diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 77f7916ed58..d13fd78bea9 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -184,6 +184,10 @@ where Ok(array) } + fn skip_records(&mut self, num_records: usize) -> Result { + self.record_reader.skip_records(num_records) + } + fn get_def_levels(&self) -> Option<&[i16]> { self.def_levels_buffer .as_ref() @@ -371,6 +375,10 @@ where } } } + + fn skip_values(&mut self, _num_values: usize) -> Result { + todo!() + } } #[cfg(test)] diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index b91fde5c427..6e7585ff944 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -163,6 +163,13 @@ where Ok(array) } + fn skip_records(&mut self, num_records: usize) -> Result { + match self.column_reader.as_mut() { + Some(reader) => reader.skip_records(num_records), + None => Ok(0), + } + } + fn get_def_levels(&self) -> Option<&[i16]> { self.def_levels_buffer.as_deref() } diff --git a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs index 54b77becba0..b06646cc1c6 100644 --- a/parquet/src/arrow/array_reader/empty_array.rs +++ b/parquet/src/arrow/array_reader/empty_array.rs @@ -65,6 +65,12 @@ impl ArrayReader for EmptyArrayReader { Ok(Arc::new(StructArray::from(data))) } + fn skip_records(&mut self, num_records: usize) -> Result { + let skipped = self.remaining_rows.min(num_records); + self.remaining_rows -= skipped; + Ok(skipped) + } + fn get_def_levels(&self) -> Option<&[i16]> { None } diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index e1cd71b9e1f..3d612facdd6 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -231,6 +231,10 @@ impl ArrayReader for ListArrayReader { Ok(Arc::new(result_array)) } + fn skip_records(&mut self, num_records: usize) -> Result { + self.item_reader.skip_records(num_records) + } + fn get_def_levels(&self) -> Option<&[i16]> { self.item_reader.get_def_levels() } diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs index 92487ebbcdb..00c3db41a37 100644 --- a/parquet/src/arrow/array_reader/map_array.rs +++ b/parquet/src/arrow/array_reader/map_array.rs @@ -149,6 +149,19 @@ impl ArrayReader for MapArrayReader { Ok(Arc::new(MapArray::from(array_data))) } + fn skip_records(&mut self, num_records: usize) -> Result { + let key_skipped = self.key_reader.skip_records(num_records)?; + let value_skipped = self.value_reader.skip_records(num_records)?; + if key_skipped != value_skipped { + return Err(general_err!( + "MapArrayReader out of sync, skipped {} keys and {} values", + key_skipped, + value_skipped + )); + } + Ok(key_skipped) + } + fn get_def_levels(&self) -> Option<&[i16]> { // Children definition levels should describe the same parent structure, // so return key_reader only diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index dd65a3626b7..e30c33bba35 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -64,6 +64,9 @@ pub trait ArrayReader: Send { /// Reads at most `batch_size` records into an arrow array and return it. fn next_batch(&mut self, batch_size: usize) -> Result; + /// Skips over `num_records` records, returning the number of rows skipped + fn skip_records(&mut self, num_records: usize) -> Result; + /// If this array has a non-zero definition level, i.e. has a nullable parent /// array, returns the definition levels of data from the last call of `next_batch` /// diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index 4b592025d6a..b207d8b2c56 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -96,6 +96,10 @@ where Ok(Arc::new(array)) } + fn skip_records(&mut self, num_records: usize) -> Result { + self.record_reader.skip_records(num_records) + } + fn get_def_levels(&self) -> Option<&[i16]> { self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) } diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index c25df89a681..cb41d1fba9c 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -233,6 +233,10 @@ where Ok(array) } + fn skip_records(&mut self, num_records: usize) -> Result { + self.record_reader.skip_records(num_records) + } + fn get_def_levels(&self) -> Option<&[i16]> { self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) } diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs index 30824d74203..602c598f826 100644 --- a/parquet/src/arrow/array_reader/struct_array.rs +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -157,6 +157,26 @@ impl ArrayReader for StructArrayReader { Ok(Arc::new(StructArray::from(array_data))) } + fn skip_records(&mut self, num_records: usize) -> Result { + let mut skipped = None; + for child in self.children.iter_mut() { + let child_skipped = child.skip_records(num_records)?; + match skipped { + Some(expected) => { + if expected != child_skipped { + return Err(general_err!( + "StructArrayReader out of sync, expected {} skipped, got {}", + expected, + child_skipped + )); + } + } + None => skipped = Some(child_skipped), + } + } + Ok(skipped.unwrap_or(0)) + } + fn get_def_levels(&self) -> Option<&[i16]> { // Children definition levels should describe the same // parent structure, so return first child's diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index 0c044eb2df6..04c0f6c68f3 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -170,6 +170,11 @@ impl ArrayReader for InMemoryArrayReader { Ok(self.array.slice(self.last_idx, read)) } + fn skip_records(&mut self, num_records: usize) -> Result { + let array = self.next_batch(num_records)?; + Ok(array.len()) + } + fn get_def_levels(&self) -> Option<&[i16]> { self.def_levels .as_ref() diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index c5d1f66e5bf..39adf7c1e7d 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -17,6 +17,7 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] +use std::collections::VecDeque; use std::sync::Arc; use arrow::array::Array; @@ -29,7 +30,7 @@ use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::schema::parquet_to_arrow_schema_by_columns; use crate::arrow::ProjectionMask; -use crate::errors::Result; +use crate::errors::{ParquetError, Result}; use crate::file::metadata::{KeyValue, ParquetMetaData}; use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader}; use crate::schema::types::SchemaDescriptor; @@ -70,9 +71,39 @@ pub trait ArrowReader { ) -> Result; } +/// [`RowSelection`] allows selecting or skipping a provided number of rows +/// when scanning the parquet file +#[derive(Debug, Clone, Copy)] +pub(crate) struct RowSelection { + /// The number of rows + pub row_count: usize, + + /// If true, skip `row_count` rows + pub skip: bool, +} + +impl RowSelection { + /// Select `row_count` rows + pub fn select(row_count: usize) -> Self { + Self { + row_count, + skip: false, + } + } + + /// Skip `row_count` rows + pub fn skip(row_count: usize) -> Self { + Self { + row_count, + skip: true, + } + } +} + #[derive(Debug, Clone, Default)] pub struct ArrowReaderOptions { skip_arrow_metadata: bool, + selection: Option>, } impl ArrowReaderOptions { @@ -90,6 +121,20 @@ impl ArrowReaderOptions { pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self { Self { skip_arrow_metadata, + ..self + } + } + + /// Scan rows from the parquet file according to the provided `selection` + /// + /// TODO: Make public once row selection fully implemented + pub(crate) fn with_row_selection( + self, + selection: impl Into>, + ) -> Self { + Self { + selection: Some(selection.into()), + ..self } } } @@ -139,7 +184,12 @@ impl ArrowReader for ParquetFileArrowReader { Box::new(self.file_reader.clone()), )?; - ParquetRecordBatchReader::try_new(batch_size, array_reader) + let selection = self.options.selection.clone().map(Into::into); + Ok(ParquetRecordBatchReader::new( + batch_size, + array_reader, + selection, + )) } } @@ -221,13 +271,47 @@ pub struct ParquetRecordBatchReader { batch_size: usize, array_reader: Box, schema: SchemaRef, + selection: Option>, } impl Iterator for ParquetRecordBatchReader { type Item = ArrowResult; fn next(&mut self) -> Option { - match self.array_reader.next_batch(self.batch_size) { + let to_read = match self.selection.as_mut() { + Some(selection) => loop { + let front = selection.pop_front()?; + if front.skip { + let skipped = match self.array_reader.skip_records(front.row_count) { + Ok(skipped) => skipped, + Err(e) => return Some(Err(e.into())), + }; + + if skipped != front.row_count { + return Some(Err(general_err!( + "failed to skip rows, expected {}, got {}", + front.row_count, + skipped + ) + .into())); + } + continue; + } + + let to_read = match front.row_count.checked_sub(self.batch_size) { + Some(remaining) => { + selection.push_front(RowSelection::skip(remaining)); + self.batch_size + } + None => front.row_count, + }; + + break to_read; + }, + None => self.batch_size, + }; + + match self.array_reader.next_batch(to_read) { Err(error) => Some(Err(error.into())), Ok(array) => { let struct_array = @@ -257,16 +341,30 @@ impl ParquetRecordBatchReader { batch_size: usize, array_reader: Box, ) -> Result { + Ok(Self::new(batch_size, array_reader, None)) + } + + /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at + /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None` + /// all rows will be returned + /// + /// TODO: Make public once row selection fully implemented + pub(crate) fn new( + batch_size: usize, + array_reader: Box, + selection: Option>, + ) -> Self { let schema = match array_reader.get_data_type() { ArrowType::Struct(ref fields) => Schema::new(fields.clone()), _ => unreachable!("Struct array reader's data type is not struct!"), }; - Ok(Self { + Self { batch_size, array_reader, schema: Arc::new(schema), - }) + selection, + } } } diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 2400a0b6a32..262bd4f2c9d 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -97,7 +97,7 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::ProjectionMask; use crate::basic::Compression; -use crate::column::page::{Page, PageIterator, PageReader}; +use crate::column::page::{Page, PageIterator, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; @@ -551,6 +551,14 @@ impl PageReader for InMemoryColumnChunkReader { // We are at the end of this column chunk and no more page left. Return None. Ok(None) } + + fn peek_next_page(&self) -> Result> { + todo!() + } + + fn skip_next_page(&mut self) -> Result<()> { + todo!() + } } /// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 9cca25c8ae5..d2276990f05 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -25,7 +25,7 @@ use crate::arrow::buffer::bit_util::count_set_bits; use crate::arrow::record_reader::buffer::BufferQueue; use crate::basic::Encoding; use crate::column::reader::decoder::{ - ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice, + ColumnLevelDecoder, ColumnLevelDecoderImpl, DefinitionLevelDecoder, LevelsBufferSlice, }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; @@ -146,7 +146,7 @@ impl LevelsBufferSlice for DefinitionLevelBuffer { } } -pub struct DefinitionLevelDecoder { +pub struct DefinitionLevelBufferDecoder { max_level: i16, encoding: Encoding, data: Option, @@ -154,7 +154,7 @@ pub struct DefinitionLevelDecoder { packed_decoder: Option, } -impl ColumnLevelDecoder for DefinitionLevelDecoder { +impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { type Slice = DefinitionLevelBuffer; fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { @@ -223,6 +223,16 @@ impl ColumnLevelDecoder for DefinitionLevelDecoder { } } +impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { + fn skip_def_levels( + &mut self, + _num_levels: usize, + _max_def_level: i16, + ) -> Result<(usize, usize)> { + todo!() + } +} + /// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1 /// directly into a bitmask /// diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index af75dbb4951..89e4e1c3e31 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -22,7 +22,7 @@ use arrow::buffer::Buffer; use crate::arrow::record_reader::{ buffer::{BufferQueue, ScalarBuffer, ValuesBuffer}, - definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder}, + definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder}, }; use crate::column::{ page::PageReader, @@ -56,8 +56,9 @@ pub struct GenericRecordReader { records: V, def_levels: Option, rep_levels: Option>, - column_reader: - Option>, + column_reader: Option< + GenericColumnReader, + >, /// Number of records accumulated in records num_records: usize, @@ -202,6 +203,37 @@ where Ok(records_read) } + /// Try to skip the next `num_records` rows + /// + /// # Returns + /// + /// Number of records skipped + pub fn skip_records(&mut self, num_records: usize) -> Result { + // First need to clear the buffer + let (buffered_records, buffered_values) = self.count_records(num_records); + self.num_records += buffered_records; + self.num_values += buffered_values; + + self.consume_def_levels(); + self.consume_rep_levels(); + self.consume_record_data(); + self.consume_bitmap(); + self.reset(); + + let remaining = buffered_records - num_records; + + if remaining == 0 { + return Ok(buffered_records); + } + + let skipped = match self.column_reader.as_mut() { + Some(column_reader) => column_reader.skip_records(remaining)?, + None => 0, + }; + + Ok(skipped + buffered_records) + } + /// Returns number of records stored in buffer. pub fn num_records(&self) -> usize { self.num_records diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 9364bd30fff..d667af7120a 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -187,12 +187,29 @@ impl PageWriteSpec { } } +/// Contains metadata for a page +pub struct PageMetadata { + /// The number of rows in this page + pub num_rows: usize, + + /// Returns true if the page is a dictionary page + pub is_dict: bool, +} + /// API for reading pages from a column chunk. /// This offers a iterator like API to get the next page. pub trait PageReader: Iterator> + Send { /// Gets the next page in the column chunk associated with this reader. /// Returns `None` if there are no pages left. fn get_next_page(&mut self) -> Result>; + + /// Gets metadata about the next page, returns an error if no + /// column index information + fn peek_next_page(&self) -> Result>; + + /// Skips reading the next page, returns an error if no + /// column index information + fn skip_next_page(&mut self) -> Result<()>; } /// API for writing pages in a column chunk. diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index a97787ccfa5..35e725b1959 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -22,7 +22,8 @@ use std::cmp::min; use super::page::{Page, PageReader}; use crate::basic::*; use crate::column::reader::decoder::{ - ColumnLevelDecoder, ColumnValueDecoder, LevelsBufferSlice, ValuesBufferSlice, + ColumnValueDecoder, DefinitionLevelDecoder, LevelsBufferSlice, + RepetitionLevelDecoder, ValuesBufferSlice, }; use crate::data_type::*; use crate::errors::{ParquetError, Result}; @@ -137,8 +138,8 @@ pub struct GenericColumnReader { impl GenericColumnReader where - R: ColumnLevelDecoder, - D: ColumnLevelDecoder, + R: RepetitionLevelDecoder, + D: DefinitionLevelDecoder, V: ColumnValueDecoder, { /// Creates new column reader based on column descriptor and page reader. @@ -271,6 +272,72 @@ where Ok((values_read, levels_read)) } + /// Skips over `num_records` records, where records are delimited by repetition levels of 0 + /// + /// # Returns + /// + /// Returns the number of records skipped + pub fn skip_records(&mut self, num_records: usize) -> Result { + let mut remaining = num_records; + while remaining != 0 { + if self.num_buffered_values == self.num_decoded_values { + let metadata = match self.page_reader.peek_next_page()? { + None => return Ok(num_records - remaining), + Some(metadata) => metadata, + }; + + // If dictionary, we must read it + if metadata.is_dict { + self.read_new_page()?; + continue; + } + + // If page has less rows than the remaining records to + // be skipped, skip entire page + if metadata.num_rows < remaining { + self.page_reader.skip_next_page()?; + remaining -= metadata.num_rows; + continue; + } + } + + let to_read = remaining + .min((self.num_buffered_values - self.num_decoded_values) as usize); + + let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() { + Some(decoder) => decoder.skip_rep_levels(to_read)?, + None => (to_read, to_read), + }; + + let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() { + Some(decoder) => decoder + .skip_def_levels(rep_levels_read, self.descr.max_def_level())?, + None => (rep_levels_read, rep_levels_read), + }; + + if rep_levels_read != def_levels_read { + return Err(general_err!( + "levels mismatch, read {} repetition levels and {} definition levels", + rep_levels_read, + def_levels_read + )); + } + + let values = self.values_decoder.skip_values(values_read)?; + if values != values_read { + return Err(general_err!( + "skipped {} values, expected {}", + values, + values_read + )); + } + + self.num_decoded_values += rep_levels_read as u32; + remaining -= records_read; + } + Ok(num_records - remaining) + } + /// Reads a new page and set up the decoders for levels, values or dictionary. /// Returns false if there's no page left. fn read_new_page(&mut self) -> Result { diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 9f1799133f3..9eea6393b1b 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -81,6 +81,25 @@ pub trait ColumnLevelDecoder { fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result; } +pub trait RepetitionLevelDecoder: ColumnLevelDecoder { + /// Skips over repetition level corresponding to `num_records` records, where a record + /// is delimited by a repetition level of 0 + /// + /// Returns the number of records skipped, and the number of levels skipped + fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)>; +} + +pub trait DefinitionLevelDecoder: ColumnLevelDecoder { + /// Skips over `num_levels` definition levels + /// + /// Returns the number of values skipped, and the number of levels skipped + fn skip_def_levels( + &mut self, + num_levels: usize, + max_def_level: i16, + ) -> Result<(usize, usize)>; +} + /// Decodes value data to a [`ValuesBufferSlice`] pub trait ColumnValueDecoder { type Slice: ValuesBufferSlice + ?Sized; @@ -126,6 +145,11 @@ pub trait ColumnValueDecoder { /// Implementations may panic if `range` overlaps with already written data /// fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result; + + /// Skips over `num_values` values + /// + /// Returns the number of values skipped + fn skip_values(&mut self, num_values: usize) -> Result; } /// An implementation of [`ColumnValueDecoder`] for `[T::T]` @@ -225,6 +249,10 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { current_decoder.get(&mut out[range]) } + + fn skip_values(&mut self, _num_values: usize) -> Result { + todo!() + } } /// An implementation of [`ColumnLevelDecoder`] for `[i16]` @@ -266,3 +294,19 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl { } } } + +impl DefinitionLevelDecoder for ColumnLevelDecoderImpl { + fn skip_def_levels( + &mut self, + _num_levels: usize, + _max_def_level: i16, + ) -> Result<(usize, usize)> { + todo!() + } +} + +impl RepetitionLevelDecoder for ColumnLevelDecoderImpl { + fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> { + todo!() + } +} diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 6ff73e041e8..27dc0e0bb5b 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -25,7 +25,7 @@ use parquet_format::{PageHeader, PageType}; use thrift::protocol::TCompactInputProtocol; use crate::basic::{Compression, Encoding, Type}; -use crate::column::page::{Page, PageReader}; +use crate::column::page::{Page, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::page_index::index_reader; @@ -555,6 +555,14 @@ impl PageReader for SerializedPageReader { // We are at the end of this column chunk and no more page left. Return None. Ok(None) } + + fn peek_next_page(&self) -> Result> { + todo!() + } + + fn skip_next_page(&mut self) -> Result<()> { + todo!() + } } #[cfg(test)] diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs index 3719d280a90..0b70c38ad0e 100644 --- a/parquet/src/util/test_common/page_util.rs +++ b/parquet/src/util/test_common/page_util.rs @@ -16,7 +16,7 @@ // under the License. use crate::basic::Encoding; -use crate::column::page::PageReader; +use crate::column::page::{PageMetadata, PageReader}; use crate::column::page::{Page, PageIterator}; use crate::data_type::DataType; use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; @@ -172,6 +172,14 @@ impl + Send> PageReader for InMemoryPageReader

{ fn get_next_page(&mut self) -> Result> { Ok(self.page_iter.next()) } + + fn peek_next_page(&self) -> Result> { + unimplemented!() + } + + fn skip_next_page(&mut self) -> Result<()> { + unimplemented!() + } } impl + Send> Iterator for InMemoryPageReader

{ From 7996cd2085ba20ace26a6c8069176c35be9cecee Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 5 Jul 2022 09:23:42 -0400 Subject: [PATCH 2/5] Update parquet/src/arrow/record_reader/mod.rs Co-authored-by: Yang Jiang --- parquet/src/arrow/record_reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 89e4e1c3e31..046e01d4672 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -220,7 +220,7 @@ where self.consume_bitmap(); self.reset(); - let remaining = buffered_records - num_records; + let remaining = num_records - buffered_records; if remaining == 0 { return Ok(buffered_records); From 45cbee0b68d620863350783d2382d2fd12fe0515 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 5 Jul 2022 10:20:47 -0400 Subject: [PATCH 3/5] Remove empty google.protobuf.rs --- arrow-flight/src/sql/google.protobuf.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 arrow-flight/src/sql/google.protobuf.rs diff --git a/arrow-flight/src/sql/google.protobuf.rs b/arrow-flight/src/sql/google.protobuf.rs deleted file mode 100644 index e69de29bb2d..00000000000 From d8562402cb211a34f209c7cd72580ef70eda0d56 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Jul 2022 09:16:11 -0400 Subject: [PATCH 4/5] Replace todo with nyi_err --- parquet/src/arrow/array_reader/byte_array.rs | 2 +- parquet/src/arrow/array_reader/byte_array_dictionary.rs | 2 +- parquet/src/arrow/async_reader.rs | 4 ++-- parquet/src/arrow/record_reader/definition_levels.rs | 2 +- parquet/src/column/reader/decoder.rs | 6 +++--- parquet/src/file/serialized_reader.rs | 4 ++-- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 6907eef798e..b762236c4be 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -216,7 +216,7 @@ impl ColumnValueDecoder } fn skip_values(&mut self, _num_values: usize) -> Result { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index d13fd78bea9..bfe55749991 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -377,7 +377,7 @@ where } fn skip_values(&mut self, _num_values: usize) -> Result { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 262bd4f2c9d..b251c2a827e 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -553,11 +553,11 @@ impl PageReader for InMemoryColumnChunkReader { } fn peek_next_page(&self) -> Result> { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } fn skip_next_page(&mut self) -> Result<()> { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index d2276990f05..21526f21f6c 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -229,7 +229,7 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { _num_levels: usize, _max_def_level: i16, ) -> Result<(usize, usize)> { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 9eea6393b1b..6fefdca23e1 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -251,7 +251,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { } fn skip_values(&mut self, _num_values: usize) -> Result { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } @@ -301,12 +301,12 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl { _num_levels: usize, _max_def_level: i16, ) -> Result<(usize, usize)> { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } impl RepetitionLevelDecoder for ColumnLevelDecoderImpl { fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 27dc0e0bb5b..1dfd1eb45d2 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -557,11 +557,11 @@ impl PageReader for SerializedPageReader { } fn peek_next_page(&self) -> Result> { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } fn skip_next_page(&mut self) -> Result<()> { - todo!() + Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) } } From 2a572d7b3d40fb3285f02cc0016b63bbdc09a659 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Jul 2022 09:17:42 -0400 Subject: [PATCH 5/5] Update doc comment --- parquet/src/arrow/arrow_reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 39adf7c1e7d..6a3270762f8 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -127,7 +127,7 @@ impl ArrowReaderOptions { /// Scan rows from the parquet file according to the provided `selection` /// - /// TODO: Make public once row selection fully implemented + /// TODO: Make public once row selection fully implemented (#1792) pub(crate) fn with_row_selection( self, selection: impl Into>, @@ -348,7 +348,7 @@ impl ParquetRecordBatchReader { /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None` /// all rows will be returned /// - /// TODO: Make public once row selection fully implemented + /// TODO: Make public once row selection fully implemented (#1792) pub(crate) fn new( batch_size: usize, array_reader: Box,