From aeee9f8048ac5a58a3f180e787fab42c1683d751 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 12 Aug 2022 14:24:51 +0100 Subject: [PATCH 1/4] Add ParquetRecordBatchReaderBuilder (#2427) --- parquet/src/arrow/array_reader/mod.rs | 49 ++- parquet/src/arrow/arrow_reader/mod.rs | 503 +++++++++++++++++--------- parquet/src/arrow/arrow_writer/mod.rs | 31 +- parquet/src/arrow/async_reader.rs | 125 +------ parquet/src/arrow/mod.rs | 4 +- parquet/src/arrow/schema.rs | 25 +- parquet/src/file/serialized_reader.rs | 16 +- 7 files changed, 426 insertions(+), 327 deletions(-) diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 54c45a336a3..ef627b2c2eb 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -124,6 +124,49 @@ impl RowGroupCollection for Arc { } } +pub(crate) struct FileReaderRowGroupCollection { + reader: Arc, + row_groups: Option>, +} + +impl FileReaderRowGroupCollection { + pub fn new(reader: Arc, row_groups: Option>) -> Self { + Self { reader, row_groups } + } +} + +impl RowGroupCollection for FileReaderRowGroupCollection { + fn schema(&self) -> SchemaDescPtr { + self.reader.metadata().file_metadata().schema_descr_ptr() + } + + fn num_rows(&self) -> usize { + match &self.row_groups { + None => self.reader.metadata().file_metadata().num_rows() as usize, + Some(row_groups) => { + let meta = self.reader.metadata().row_groups(); + row_groups + .iter() + .map(|x| meta[*x].num_rows() as usize) + .sum() + } + } + } + + fn column_chunks(&self, i: usize) -> Result> { + let iterator = match &self.row_groups { + Some(row_groups) => FilePageIterator::with_row_groups( + i, + Box::new(row_groups.clone().into_iter()), + Arc::clone(&self.reader), + )?, + None => FilePageIterator::new(i, Arc::clone(&self.reader))?, + }; + + Ok(Box::new(iterator)) + } +} + /// Uses `record_reader` to read up to `batch_size` records from `pages` /// /// Returns the number of records read, which can be less than `batch_size` if @@ -167,9 +210,9 @@ fn skip_records( pages: &mut dyn PageIterator, batch_size: usize, ) -> Result - where - V: ValuesBuffer + Default, - CV: ColumnValueDecoder, +where + V: ValuesBuffer + Default, + CV: ColumnValueDecoder, { let mut records_skipped = 0usize; while records_skipped < batch_size { diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e363919f651..66c84bf9617 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -27,7 +27,9 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; use arrow::{array::StructArray, error::ArrowError}; -use crate::arrow::array_reader::{build_array_reader, ArrayReader}; +use crate::arrow::array_reader::{ + build_array_reader, ArrayReader, FileReaderRowGroupCollection, +}; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::schema::parquet_to_arrow_schema_by_columns; use crate::arrow::ProjectionMask; @@ -48,9 +50,127 @@ pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; #[allow(unused_imports)] pub(crate) use selection::{RowSelection, RowSelector}; +/// A generic builder for constructing sync or async arrow parquet readers. 
This is not intended +/// to be used directly, instead you should use the specialization for the type of reader +/// you wish to use +/// +/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`] +/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`] +/// +/// [`ParquetRecordBatchStreamBuilder`]: [crate::arrow::async_reader::ParquetRecordBatchStreamBuilder] +pub struct ArrowReaderBuilder { + pub(crate) input: T, + + pub(crate) metadata: Arc, + + pub(crate) schema: SchemaRef, + + pub(crate) batch_size: usize, + + pub(crate) row_groups: Option>, + + pub(crate) projection: ProjectionMask, + + pub(crate) filter: Option, + + pub(crate) selection: Option, +} + +impl ArrowReaderBuilder { + pub(crate) fn new_builder( + input: T, + metadata: Arc, + options: ArrowReaderOptions, + ) -> Result { + let kv_metadata = match options.skip_arrow_metadata { + true => None, + false => metadata.file_metadata().key_value_metadata(), + }; + + let schema = Arc::new(parquet_to_arrow_schema( + metadata.file_metadata().schema_descr(), + kv_metadata, + )?); + + Ok(Self { + input, + metadata, + schema, + batch_size: 1024, + row_groups: None, + projection: ProjectionMask::all(), + filter: None, + selection: None, + }) + } + + /// Returns a reference to the [`ParquetMetaData`] for this parquet file + pub fn metadata(&self) -> &Arc { + &self.metadata + } + + /// Returns the parquet [`SchemaDescriptor`] for this parquet file + pub fn parquet_schema(&self) -> &SchemaDescriptor { + self.metadata.file_metadata().schema_descr() + } + + /// Returns the arrow [`SchemaRef`] for this parquet file + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// Set the size of [`RecordBatch`] to produce + pub fn with_batch_size(self, batch_size: usize) -> Self { + Self { batch_size, ..self } + } + + /// Only read data from the provided row group indexes + pub fn with_row_groups(self, row_groups: Vec) -> Self { + Self { + row_groups: Some(row_groups), + ..self + } + } + + /// Only read data from the provided column indexes + pub fn with_projection(self, mask: ProjectionMask) -> Self { + Self { + projection: mask, + ..self + } + } + + /// Provide a [`RowSelection] to filter out rows, and avoid fetching their + /// data into memory + /// + /// Row group filtering is applied prior to this, and rows from skipped + /// row groups should not be included in the [`RowSelection`] + /// + /// TODO: Make public once stable (#1792) + #[allow(unused)] + pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self { + Self { + selection: Some(selection), + ..self + } + } + + /// Provide a [`RowFilter`] to skip decoding rows + /// + /// TODO: Make public once stable (#1792) + #[allow(unused)] + pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self { + Self { + filter: Some(filter), + ..self + } + } +} + /// Arrow reader api. /// With this api, user can get arrow schema from parquet file, and read parquet data /// into arrow arrays. 
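(Not part of the diff: a rough usage sketch of the sync specialization of this builder, `ParquetRecordBatchReaderBuilder`, defined further down in this patch. The file path and column indices are placeholders, and the `?` calls assume an enclosing function returning `parquet::errors::Result`.)

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

// Placeholder file; any ChunkReader (e.g. File or Bytes) works here.
let file = File::open("data.parquet")?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

// Project two leaf columns, restrict the scan to the first row group,
// and produce batches of up to 1024 rows.
let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);
let reader = builder
    .with_projection(mask)
    .with_row_groups(vec![0])
    .with_batch_size(1024)
    .build()?;

// ParquetRecordBatchReader is an Iterator over ArrowResult<RecordBatch>
for batch in reader {
    println!("read {} rows", batch?.num_rows());
}
```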
+#[deprecated(note = "Use ParquetRecordBatchReaderBuilder instead")] pub trait ArrowReader { type RecordReader: RecordBatchReader; @@ -84,10 +204,14 @@ pub trait ArrowReader { ) -> Result; } +/// Options that control how metadata is read for a parquet file +/// +/// See [`ArrowReaderBuilder`] for how to configure how the column data +/// is then read from the file, including projection and filter pushdown #[derive(Debug, Clone, Default)] pub struct ArrowReaderOptions { skip_arrow_metadata: bool, - selection: Option, + page_index: bool, } impl ArrowReaderOptions { @@ -109,27 +233,29 @@ impl ArrowReaderOptions { } } - /// Scan rows from the parquet file according to the provided `selection` + /// Set this true to enable decoding of the [PageIndex] if present. This can be used + /// to push down predicates to the parquet scan, potentially eliminating unnecessary IO /// - /// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available - #[allow(unused)] - pub(crate) fn with_row_selection(self, selection: impl Into) -> Self { - Self { - selection: Some(selection.into()), - ..self - } + /// See [`RowFilter`] and [`RowSelection`] for more information + /// + /// [PageIndex]: [https://github.com/apache/parquet-format/blob/master/PageIndex.md] + pub fn with_page_index(self, page_index: bool) -> Self { + Self { page_index, ..self } } } /// An `ArrowReader` that can be used to synchronously read parquet data as [`RecordBatch`] /// /// See [`crate::arrow::async_reader`] for an asynchronous interface +#[deprecated(note = "Use ParquetRecordBatchReaderBuilder instead")] pub struct ParquetFileArrowReader { file_reader: Arc, + #[allow(deprecated)] options: ArrowReaderOptions, } +#[allow(deprecated)] impl ArrowReader for ParquetFileArrowReader { type RecordReader = ParquetRecordBatchReader; @@ -165,11 +291,12 @@ impl ArrowReader for ParquetFileArrowReader { Ok(ParquetRecordBatchReader::new( batch_size, array_reader, - self.options.selection.clone(), + None, )) } } +#[allow(deprecated)] impl ParquetFileArrowReader { /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`] /// @@ -194,15 +321,7 @@ impl ParquetFileArrowReader { chunk_reader: R, options: ArrowReaderOptions, ) -> Result { - let file_reader = if options.selection.is_some() { - let options = ReadOptionsBuilder::new().with_page_index().build(); - Arc::new(SerializedFileReader::new_with_options( - chunk_reader, - options, - )?) - } else { - Arc::new(SerializedFileReader::new(chunk_reader)?) 
- }; + let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?); Ok(Self::new_with_options(file_reader, options)) } @@ -252,6 +371,55 @@ impl ParquetFileArrowReader { } } +#[doc(hidden)] +/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async +pub struct SyncReader(SerializedFileReader); + +/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file +/// +/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`] +pub type ParquetRecordBatchReaderBuilder = ArrowReaderBuilder>; + +impl ArrowReaderBuilder> { + /// Create a new [`ParquetRecordBatchReaderBuilder`] + pub fn try_new(reader: T) -> Result { + Self::try_new_with_options(reader, Default::default()) + } + + /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`] + pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result { + let reader = match options.page_index { + true => { + let read_options = ReadOptionsBuilder::new().with_page_index().build(); + SerializedFileReader::new_with_options(reader, read_options)? + } + false => SerializedFileReader::new(reader)?, + }; + + let metadata = Arc::clone(reader.metadata_ref()); + Self::new_builder(SyncReader(reader), metadata, options) + } + + pub fn build(self) -> Result { + let reader = + FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups); + let array_reader = build_array_reader(self.schema, self.projection, &reader)?; + + if self.filter.is_some() { + // TODO: Support RowFilter within sync interface (#2431) + return Err(nyi_err!( + "RowFilter is currently not supported within the sync interface" + )); + } + + Ok(ParquetRecordBatchReader::new( + self.batch_size, + array_reader, + self.selection, + )) + } +} + /// An `Iterator>` that yields [`RecordBatch`] /// read from a parquet data source pub struct ParquetRecordBatchReader { @@ -339,6 +507,18 @@ impl RecordBatchReader for ParquetRecordBatchReader { } impl ParquetRecordBatchReader { + /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader + /// + /// See [`ParquetRecordBatchReaderBuilder`] for more options + pub fn try_new( + reader: T, + batch_size: usize, + ) -> Result { + ParquetRecordBatchReaderBuilder::try_new(reader)? + .with_batch_size(batch_size) + .build() + } + /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at /// a time from [`ArrayReader`] based on the configured `selection`. 
If `selection` is `None` /// all rows will be returned @@ -396,7 +576,6 @@ mod tests { use bytes::Bytes; use std::cmp::min; use std::collections::VecDeque; - use std::convert::TryFrom; use std::fs::File; use std::io::Seek; use std::path::PathBuf; @@ -412,8 +591,8 @@ mod tests { use arrow::record_batch::{RecordBatch, RecordBatchReader}; use crate::arrow::arrow_reader::{ - ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, - ParquetRecordBatchReader, RowSelection, RowSelector, + ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, + RowSelection, RowSelector, }; use crate::arrow::buffer::converter::{ Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter, @@ -427,7 +606,6 @@ mod tests { }; use crate::errors::Result; use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; - use crate::file::reader::{FileReader, SerializedFileReader}; use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; @@ -435,38 +613,29 @@ mod tests { #[test] fn test_arrow_reader_all_columns() { - let parquet_file_reader = - get_test_reader("parquet/generated_simple_numerics/blogs.parquet"); + let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet"); - let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader); - - let record_batch_reader = arrow_reader - .get_record_reader(60) - .expect("Failed to read into array!"); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let original_schema = Arc::clone(&builder.schema()); + let reader = builder.build().unwrap(); // Verify that the schema was correctly parsed - let original_schema = arrow_reader.get_schema().unwrap().fields().clone(); - assert_eq!(original_schema, *record_batch_reader.schema().fields()); + assert_eq!(original_schema.fields(), reader.schema().fields()); } #[test] fn test_arrow_reader_single_column() { - let parquet_file_reader = - get_test_reader("parquet/generated_simple_numerics/blogs.parquet"); - - let file_metadata = parquet_file_reader.metadata().file_metadata(); + let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet"); - let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [2]); - let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let original_schema = Arc::clone(&builder.schema()); - let record_batch_reader = arrow_reader - .get_record_reader_by_columns(mask, 60) - .expect("Failed to read into array!"); + let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]); + let reader = builder.with_projection(mask).build().unwrap(); // Verify that the schema was correctly parsed - let original_schema = arrow_reader.get_schema().unwrap().fields().clone(); - assert_eq!(1, record_batch_reader.schema().fields().len()); - assert_eq!(original_schema[1], record_batch_reader.schema().fields()[0]); + assert_eq!(1, reader.schema().fields().len()); + assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]); } #[test] @@ -493,9 +662,7 @@ mod tests { file.rewind().unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let record_reader = arrow_reader.get_record_reader(2).unwrap(); - + let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap(); let batches = record_reader.collect::>>().unwrap(); assert_eq!(batches.len(), 4); @@ -733,9 +900,7 @@ mod tests { 
writer.write(&written).unwrap(); writer.close().unwrap(); - let read = ParquetFileArrowReader::try_new(Bytes::from(buffer)) - .unwrap() - .get_record_reader(3) + let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3) .unwrap() .collect::>>() .unwrap(); @@ -771,9 +936,7 @@ mod tests { writer.write(&written).unwrap(); writer.close().unwrap(); - let read = ParquetFileArrowReader::try_new(Bytes::from(buffer)) - .unwrap() - .get_record_reader(3) + let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3) .unwrap() .collect::>>() .unwrap(); @@ -813,9 +976,7 @@ mod tests { writer.write(&written).unwrap(); writer.close().unwrap(); - let read = ParquetFileArrowReader::try_new(Bytes::from(buffer)) - .unwrap() - .get_record_reader(3) + let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3) .unwrap() .collect::>>() .unwrap(); @@ -838,9 +999,7 @@ mod tests { for (prefix, target_precision) in file_variants { let path = format!("{}/{}_decimal.parquet", testdata, prefix); let file = File::open(&path).unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - - let mut record_reader = arrow_reader.get_record_reader(32).unwrap(); + let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap(); let batch = record_reader.next().unwrap().unwrap(); assert_eq!(batch.num_rows(), 24); @@ -1173,35 +1332,42 @@ mod tests { file.rewind().unwrap(); - let mut arrow_reader; - let expected_data: Vec>; - if let Some((selections, row_count)) = opts.row_selections.clone() { - let options = - ArrowReaderOptions::new().with_row_selection(selections.clone()); - arrow_reader = - ParquetFileArrowReader::try_new_with_options(file, options).unwrap(); - let mut without_skip_data = gen_expected_data::(&def_levels, &values); - - let mut skip_data: Vec> = vec![]; - let selections: VecDeque = selections.into(); - for select in selections { - if select.skip { - without_skip_data.drain(0..select.row_count); - } else { - skip_data.extend(without_skip_data.drain(0..select.row_count)); + // TODO: Should be able to always enable page index (#2434) + let options = ArrowReaderOptions::new() + .with_page_index(opts.enabled_statistics == EnabledStatistics::Page); + + let mut builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); + + let expected_data = match opts.row_selections { + Some((selections, row_count)) => { + let mut without_skip_data = gen_expected_data::(&def_levels, &values); + + let mut skip_data: Vec> = vec![]; + let dequeue: VecDeque = selections.clone().into(); + for select in dequeue { + if select.skip { + without_skip_data.drain(0..select.row_count); + } else { + skip_data.extend(without_skip_data.drain(0..select.row_count)); + } } + builder = builder.with_row_selection(selections); + + assert_eq!(skip_data.len(), row_count); + skip_data } - expected_data = skip_data; - assert_eq!(expected_data.len(), row_count); - } else { - arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - //get flatten table data - expected_data = gen_expected_data::(&def_levels, &values); - assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups); - } + None => { + //get flatten table data + let expected_data = gen_expected_data::(&def_levels, &values); + assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups); + expected_data + } + }; - let mut record_reader = arrow_reader - .get_record_reader(opts.record_batch_size) + let mut record_reader = builder + .with_batch_size(opts.record_batch_size) + .build() 
.unwrap(); let mut total_read = 0; @@ -1284,15 +1450,6 @@ mod tests { writer.close() } - fn get_test_reader(file_name: &str) -> Arc> { - let file = get_test_file(file_name); - - let reader = - SerializedFileReader::new(file).expect("Failed to create serialized reader"); - - Arc::new(reader) - } - fn get_test_file(file_name: &str) -> File { let mut path = PathBuf::new(); path.push(arrow::util::test_util::arrow_test_data()); @@ -1309,20 +1466,21 @@ mod tests { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{}/nested_structs.rust.parquet", testdata); let file = File::open(&path).unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let record_batch_reader = arrow_reader - .get_record_reader(60) - .expect("Failed to read into array!"); + let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap(); for batch in record_batch_reader { batch.unwrap(); } - let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [3, 8, 10]); - let projected_reader = arrow_reader - .get_record_reader_by_columns(mask.clone(), 60) + let file = File::open(&path).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + + let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]); + let projected_reader = builder + .with_projection(mask) + .with_batch_size(60) + .build() .unwrap(); - let projected_schema = arrow_reader.get_schema_by_columns(mask).unwrap(); let expected_schema = Schema::new(vec![ Field::new( @@ -1345,12 +1503,11 @@ mod tests { ]); // Tests for #1652 and #1654 - assert_eq!(projected_reader.schema().as_ref(), &projected_schema); - assert_eq!(expected_schema, projected_schema); + assert_eq!(&expected_schema, projected_reader.schema().as_ref()); for batch in projected_reader { let batch = batch.unwrap(); - assert_eq!(batch.schema().as_ref(), &projected_schema); + assert_eq!(batch.schema().as_ref(), &expected_schema); } } @@ -1359,10 +1516,7 @@ mod tests { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{}/nested_maps.snappy.parquet", testdata); let file = File::open(&path).unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let record_batch_reader = arrow_reader - .get_record_reader(60) - .expect("Failed to read into array!"); + let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap(); for batch in record_batch_reader { batch.unwrap(); @@ -1406,10 +1560,10 @@ mod tests { writer.close().unwrap(); } - let mut reader = ParquetFileArrowReader::try_new(file).unwrap(); - let mask = ProjectionMask::leaves(reader.parquet_schema(), [0]); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]); - let reader = reader.get_record_reader_by_columns(mask, 1024).unwrap(); + let reader = builder.with_projection(mask).build().unwrap(); let expected_schema = Schema::new(vec![Field::new( "group", @@ -1443,10 +1597,8 @@ mod tests { ]; let file = Bytes::from(data); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let mut record_batch_reader = arrow_reader - .get_record_reader_by_columns(ProjectionMask::all(), 10) - .unwrap(); + let mut record_batch_reader = + ParquetRecordBatchReader::try_new(file, 10).unwrap(); let error = record_batch_reader.next().unwrap().unwrap_err(); @@ -1519,9 +1671,7 @@ mod tests { file.rewind().unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - - 
let record_reader = arrow_reader.get_record_reader(3).unwrap(); + let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap(); let batches = record_reader .collect::>>() @@ -1558,10 +1708,8 @@ mod tests { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{}/null_list.parquet", testdata); let file = File::open(&path).unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let mut record_batch_reader = arrow_reader - .get_record_reader(60) - .expect("Failed to read into array!"); + let mut record_batch_reader = + ParquetRecordBatchReader::try_new(file, 60).unwrap(); let batch = record_batch_reader.next().unwrap().unwrap(); assert_eq!(batch.num_rows(), 1); @@ -1584,8 +1732,7 @@ mod tests { fn test_null_schema_inference() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{}/null_list.parquet", testdata); - let reader = - Arc::new(SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap()); + let file = File::open(&path).unwrap(); let arrow_field = Field::new( "emptylist", @@ -1593,9 +1740,10 @@ mod tests { true, ); - let options = ArrowReaderOptions::default().with_skip_arrow_metadata(true); - let mut arrow_reader = ParquetFileArrowReader::new_with_options(reader, options); - let schema = arrow_reader.get_schema().unwrap(); + let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true); + let builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); + let schema = builder.schema(); assert_eq!(schema.fields().len(), 1); assert_eq!(schema.field(0), &arrow_field); } @@ -1637,40 +1785,38 @@ mod tests { file }; - let v1_reader = Arc::new( - SerializedFileReader::new(file(WriterVersion::PARQUET_1_0)).unwrap(), - ); - let v2_reader = Arc::new( - SerializedFileReader::new(file(WriterVersion::PARQUET_2_0)).unwrap(), - ); + let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true); - let mut arrow_reader = ParquetFileArrowReader::new(v1_reader.clone()); - assert_eq!( - &arrow_reader.get_schema().unwrap(), - schema_with_metadata.as_ref() - ); + let v1_reader = file(WriterVersion::PARQUET_1_0); + let v2_reader = file(WriterVersion::PARQUET_2_0); - let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true); - let mut arrow_reader = - ParquetFileArrowReader::new_with_options(v1_reader, options); - assert_eq!( - &arrow_reader.get_schema().unwrap(), - schema_without_metadata.as_ref() - ); + let arrow_reader = + ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024) + .unwrap(); + assert_eq!(arrow_reader.schema(), schema_with_metadata); - let mut arrow_reader = ParquetFileArrowReader::new(v2_reader.clone()); - assert_eq!( - &arrow_reader.get_schema().unwrap(), - schema_with_metadata.as_ref() - ); + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + v1_reader, + skip_options.clone(), + ) + .unwrap() + .build() + .unwrap(); + assert_eq!(reader.schema(), schema_without_metadata); - let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true); - let mut arrow_reader = - ParquetFileArrowReader::new_with_options(v2_reader, options); - assert_eq!( - &arrow_reader.get_schema().unwrap(), - schema_without_metadata.as_ref() - ); + let arrow_reader = + ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024) + .unwrap(); + assert_eq!(arrow_reader.schema(), schema_with_metadata); + + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + v2_reader, + skip_options, + 
) + .unwrap() + .build() + .unwrap(); + assert_eq!(reader.schema(), schema_without_metadata); } #[test] @@ -1679,13 +1825,16 @@ mod tests { let path = format!("{}/alltypes_plain.parquet", testdata); let file = File::open(&path).unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let file_metadata = arrow_reader.metadata().file_metadata(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let file_metadata = builder.metadata().file_metadata(); let expected_rows = file_metadata.num_rows() as usize; - let schema = file_metadata.schema_descr_ptr(); - let mask = ProjectionMask::leaves(&schema, []); - let batch_reader = arrow_reader.get_record_reader_by_columns(mask, 2).unwrap(); + let mask = ProjectionMask::leaves(builder.parquet_schema(), []); + let batch_reader = builder + .with_projection(mask) + .with_batch_size(2) + .build() + .unwrap(); let mut total_rows = 0; for maybe_batch in batch_reader { @@ -1731,8 +1880,8 @@ mod tests { } writer.close().unwrap(); - let mut file_reader = ParquetFileArrowReader::try_new(Bytes::from(buf)).unwrap(); - let mut record_reader = file_reader.get_record_reader(batch_size).unwrap(); + let mut record_reader = + ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap(); assert_eq!( batch_size, record_reader.next().unwrap().unwrap().num_rows() @@ -1847,9 +1996,8 @@ mod tests { let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata); let test_file = File::open(&path).unwrap(); - let mut serial_arrow_reader = - ParquetFileArrowReader::try_new(File::open(path).unwrap()).unwrap(); - let mut serial_reader = serial_arrow_reader.get_record_reader(7300).unwrap(); + let mut serial_reader = + ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 7300).unwrap(); let data = serial_reader.next().unwrap().unwrap(); let do_test = |batch_size: usize, selection_len: usize| { @@ -1889,15 +2037,14 @@ mod tests { batch_size: usize, selections: RowSelection, ) -> ParquetRecordBatchReader { - let arrow_reader_options = - ArrowReaderOptions::new().with_row_selection(selections); - - let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options( - test_file.try_clone().unwrap(), - arrow_reader_options, - ) - .unwrap(); - skip_arrow_reader.get_record_reader(batch_size).unwrap() + let options = ArrowReaderOptions::new().with_page_index(true); + let file = test_file.try_clone().unwrap(); + ParquetRecordBatchReaderBuilder::try_new_with_options(file, options) + .unwrap() + .with_batch_size(batch_size) + .with_row_selection(selections) + .build() + .unwrap() } } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 08f37c39565..e6fbccb8966 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -606,6 +606,9 @@ mod tests { use std::fs::File; use std::sync::Arc; + use crate::arrow::arrow_reader::{ + ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, + }; use arrow::datatypes::ToByteSlice; use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type}; use arrow::error::Result as ArrowResult; @@ -613,7 +616,6 @@ mod tests { use arrow::util::pretty::pretty_format_batches; use arrow::{array::*, buffer::Buffer}; - use crate::arrow::{ArrowReader, ParquetFileArrowReader}; use crate::basic::Encoding; use crate::file::metadata::ParquetMetaData; use crate::file::properties::WriterVersion; @@ -667,8 +669,8 @@ mod tests { } let cursor = Bytes::from(buffer); - let mut arrow_reader = 
ParquetFileArrowReader::try_new(cursor).unwrap(); - let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); + let mut record_batch_reader = + ParquetRecordBatchReader::try_new(cursor, 1024).unwrap(); let actual_batch = record_batch_reader .next() @@ -1116,9 +1118,8 @@ mod tests { writer.write(expected_batch).unwrap(); writer.close().unwrap(); - let mut arrow_reader = - ParquetFileArrowReader::try_new(file.try_clone().unwrap()).unwrap(); - let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); + let mut record_batch_reader = + ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap(); let actual_batch = record_batch_reader .next() @@ -1889,11 +1890,12 @@ mod tests { writer.close().unwrap(); - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[200, 200, 50]); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]); - let batches = arrow_reader - .get_record_reader(100) + let batches = builder + .with_batch_size(100) + .build() .unwrap() .collect::>>() .unwrap(); @@ -2034,11 +2036,12 @@ mod tests { // Should have written entire first batch and first row of second to the first row group // leaving a single row in the second row group - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[6, 1]); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]); - let batches = arrow_reader - .get_record_reader(2) + let batches = builder + .with_batch_size(2) + .build() .unwrap() .collect::>>() .unwrap(); diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 5c186d7aa76..e885a50df27 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -96,9 +96,9 @@ use arrow::record_batch::RecordBatch; use crate::arrow::array_reader::{build_array_reader, RowGroupCollection}; use crate::arrow::arrow_reader::{ - evaluate_predicate, ParquetRecordBatchReader, RowFilter, RowSelection, + evaluate_predicate, ArrowReaderBuilder, ParquetRecordBatchReader, RowFilter, + RowSelection, }; -use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::ProjectionMask; use crate::basic::Compression; use crate::column::page::{Page, PageIterator, PageMetadata, PageReader}; @@ -108,7 +108,7 @@ use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; use crate::file::serialized_reader::{decode_page, read_page_header}; use crate::file::FOOTER_SIZE; -use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor}; +use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files pub trait AsyncFileReader: Send { @@ -194,112 +194,23 @@ impl AsyncFileReader for T { } } +#[doc(hidden)] +/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async +pub struct AsyncReader(T); + /// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file /// /// In particular, this handles reading the parquet file metadata, allowing consumers /// to use this information to select what specific columns, row groups, etc... 
/// they wish to be read by the resulting stream /// -pub struct ParquetRecordBatchStreamBuilder { - input: T, - - metadata: Arc, - - schema: SchemaRef, - - batch_size: usize, - - row_groups: Option>, - - projection: ProjectionMask, - - filter: Option, - - selection: Option, -} +pub type ParquetRecordBatchStreamBuilder = ArrowReaderBuilder>; -impl ParquetRecordBatchStreamBuilder { +impl ArrowReaderBuilder> { /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file pub async fn new(mut input: T) -> Result { let metadata = input.get_metadata().await?; - - let schema = Arc::new(parquet_to_arrow_schema( - metadata.file_metadata().schema_descr(), - metadata.file_metadata().key_value_metadata(), - )?); - - Ok(Self { - input, - metadata, - schema, - batch_size: 1024, - row_groups: None, - projection: ProjectionMask::all(), - filter: None, - selection: None, - }) - } - - /// Returns a reference to the [`ParquetMetaData`] for this parquet file - pub fn metadata(&self) -> &Arc { - &self.metadata - } - - /// Returns the parquet [`SchemaDescriptor`] for this parquet file - pub fn parquet_schema(&self) -> &SchemaDescriptor { - self.metadata.file_metadata().schema_descr() - } - - /// Returns the arrow [`SchemaRef`] for this parquet file - pub fn schema(&self) -> &SchemaRef { - &self.schema - } - - /// Set the size of [`RecordBatch`] to produce - pub fn with_batch_size(self, batch_size: usize) -> Self { - Self { batch_size, ..self } - } - - /// Only read data from the provided row group indexes - pub fn with_row_groups(self, row_groups: Vec) -> Self { - Self { - row_groups: Some(row_groups), - ..self - } - } - - /// Only read data from the provided column indexes - pub fn with_projection(self, mask: ProjectionMask) -> Self { - Self { - projection: mask, - ..self - } - } - - /// Provide a [`RowSelection] to filter out rows, and avoid fetching their - /// data into memory - /// - /// Row group filtering is applied prior to this, and rows from skipped - /// row groups should not be included in the [`RowSelection`] - /// - /// TODO: Make public once stable (#1792) - #[allow(unused)] - pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self { - Self { - selection: Some(selection), - ..self - } - } - - /// Provide a [`RowFilter`] to skip decoding rows - /// - /// TODO: Make public once stable (#1792) - #[allow(unused)] - pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self { - Self { - filter: Some(filter), - ..self - } + Self::new_builder(AsyncReader(input), metadata, Default::default()) } /// Build a new [`ParquetRecordBatchStream`] @@ -321,7 +232,7 @@ impl ParquetRecordBatchStreamBuilder { }; let reader = ReaderFactory { - input: self.input, + input: self.input.0, filter: self.filter, metadata: self.metadata.clone(), schema: self.schema.clone(), @@ -806,8 +717,8 @@ impl PageIterator for ColumnChunkIterator { #[cfg(test)] mod tests { use super::*; - use crate::arrow::arrow_reader::ArrowPredicateFn; - use crate::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader}; + use crate::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder}; + use crate::arrow::ArrowWriter; use crate::file::footer::parse_metadata; use arrow::array::{Array, ArrayRef, Int32Array, StringArray}; use arrow::error::Result as ArrowResult; @@ -837,7 +748,7 @@ mod tests { let path = format!("{}/alltypes_plain.parquet", testdata); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = crate::file::footer::parse_metadata(&data).unwrap(); + let metadata = 
parse_metadata(&data).unwrap(); let metadata = Arc::new(metadata); assert_eq!(metadata.num_row_groups(), 1); @@ -862,9 +773,11 @@ mod tests { let async_batches: Vec<_> = stream.try_collect().await.unwrap(); - let mut sync_reader = ParquetFileArrowReader::try_new(data).unwrap(); - let sync_batches = sync_reader - .get_record_reader_by_columns(mask, 1024) + let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data) + .unwrap() + .with_projection(mask) + .with_batch_size(104) + .build() .unwrap() .collect::>>() .unwrap(); diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 18edd06d72c..dda6025898c 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -130,8 +130,8 @@ pub mod async_reader; mod record_reader; experimental!(mod schema); -pub use self::arrow_reader::ArrowReader; -pub use self::arrow_reader::ParquetFileArrowReader; +#[allow(deprecated)] +pub use self::arrow_reader::{ArrowReader, ParquetFileArrowReader}; pub use self::arrow_writer::ArrowWriter; #[cfg(feature = "async")] pub use self::async_reader::ParquetRecordBatchStreamBuilder; diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index 01aefcd48e1..70e7afc9c29 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -488,7 +488,7 @@ mod tests { use crate::file::metadata::KeyValue; use crate::{ - arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader}, + arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter}, schema::{parser::parse_message_type, types::SchemaDescriptor}, }; @@ -1662,14 +1662,9 @@ mod tests { writer.close()?; // read file back - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let read_schema = arrow_reader.get_schema()?; - assert_eq!(schema, read_schema); - - // read all fields by columns - let partial_read_schema = - arrow_reader.get_schema_by_columns(ProjectionMask::all())?; - assert_eq!(schema, partial_read_schema); + let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let read_schema = arrow_reader.schema(); + assert_eq!(&schema, read_schema.as_ref()); Ok(()) } @@ -1731,15 +1726,9 @@ mod tests { writer.close()?; // read file back - let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap(); - let read_schema = arrow_reader.get_schema()?; - assert_eq!(schema, read_schema); - - // read all fields by columns - let partial_read_schema = - arrow_reader.get_schema_by_columns(ProjectionMask::all())?; - assert_eq!(schema, partial_read_schema); - + let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let read_schema = arrow_reader.schema(); + assert_eq!(&schema, read_schema.as_ref()); Ok(()) } } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 0b7451f4bea..e8ef025ad81 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -135,7 +135,7 @@ impl IntoIterator for SerializedFileReader { /// A serialized implementation for Parquet [`FileReader`]. 
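(Not part of the diff: as the rewritten schema round-trip tests above suggest, the builder can also be used purely to inspect a file's schema and metadata without decoding any batches. A sketch with a placeholder path.)

```rust
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

let file = std::fs::File::open("data.parquet")?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

// Arrow schema derived from the parquet schema (plus any embedded arrow metadata).
println!("{:#?}", builder.schema());

// Low-level parquet metadata: total rows and row group count.
let metadata = builder.metadata();
println!(
    "{} rows in {} row groups",
    metadata.file_metadata().num_rows(),
    metadata.num_row_groups()
);
```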
pub struct SerializedFileReader { chunk_reader: Arc, - metadata: ParquetMetaData, + metadata: Arc, } /// A predicate for filtering row groups, invoked with the metadata and index @@ -208,7 +208,7 @@ impl SerializedFileReader { let metadata = footer::parse_metadata(&chunk_reader)?; Ok(Self { chunk_reader: Arc::new(chunk_reader), - metadata, + metadata: Arc::new(metadata), }) } @@ -248,23 +248,27 @@ impl SerializedFileReader { Ok(Self { chunk_reader: Arc::new(chunk_reader), - metadata: ParquetMetaData::new_with_page_index( + metadata: Arc::new(ParquetMetaData::new_with_page_index( metadata.file_metadata().clone(), filtered_row_groups, Some(columns_indexes), Some(offset_indexes), - ), + )), }) } else { Ok(Self { chunk_reader: Arc::new(chunk_reader), - metadata: ParquetMetaData::new( + metadata: Arc::new(ParquetMetaData::new( metadata.file_metadata().clone(), filtered_row_groups, - ), + )), }) } } + + pub(crate) fn metadata_ref(&self) -> &Arc { + &self.metadata + } } /// Get midpoint offset for a row group From 6177465334e3597eab695329f1442b5733f33adb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 12 Aug 2022 16:33:28 +0100 Subject: [PATCH 2/4] Clippy --- parquet/src/arrow/arrow_reader/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 66c84bf9617..c904c1a59b7 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -616,7 +616,7 @@ mod tests { let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet"); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); - let original_schema = Arc::clone(&builder.schema()); + let original_schema = Arc::clone(builder.schema()); let reader = builder.build().unwrap(); // Verify that the schema was correctly parsed @@ -628,7 +628,7 @@ mod tests { let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet"); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); - let original_schema = Arc::clone(&builder.schema()); + let original_schema = Arc::clone(builder.schema()); let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]); let reader = builder.with_projection(mask).build().unwrap(); From c53c6a523fc367ecb2e31d29f775ad9cc62e8f72 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 12 Aug 2022 16:51:01 +0100 Subject: [PATCH 3/4] Fix doc --- parquet/src/arrow/arrow_reader/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index c904c1a59b7..121e85dfe5e 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -236,8 +236,6 @@ impl ArrowReaderOptions { /// Set this true to enable decoding of the [PageIndex] if present. 
This can be used /// to push down predicates to the parquet scan, potentially eliminating unnecessary IO /// - /// See [`RowFilter`] and [`RowSelection`] for more information - /// /// [PageIndex]: [https://github.com/apache/parquet-format/blob/master/PageIndex.md] pub fn with_page_index(self, page_index: bool) -> Self { Self { page_index, ..self } From 5f2e37e87315134758e3916d0bea3b9e935af86c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 15 Aug 2022 15:01:16 +0100 Subject: [PATCH 4/4] Review feedback --- parquet/src/arrow/array_reader/mod.rs | 4 ++++ parquet/src/arrow/arrow_reader/mod.rs | 4 +++- parquet/src/arrow/async_reader.rs | 3 +++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index ef627b2c2eb..480b4d4dfad 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -125,11 +125,15 @@ impl RowGroupCollection for Arc { } pub(crate) struct FileReaderRowGroupCollection { + /// The underling file reader reader: Arc, + /// Optional list of row group indices to scan row_groups: Option>, } impl FileReaderRowGroupCollection { + /// Creates a new [`RowGroupCollection`] from a `FileReader` and an optional + /// list of row group indexes to scan pub fn new(reader: Arc, row_groups: Option>) -> Self { Self { reader, row_groups } } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 121e85dfe5e..e96b5d8fae7 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -119,7 +119,7 @@ impl ArrowReaderBuilder { &self.schema } - /// Set the size of [`RecordBatch`] to produce + /// Set the size of [`RecordBatch`] to produce. Defaults to 1024 pub fn with_batch_size(self, batch_size: usize) -> Self { Self { batch_size, ..self } } @@ -157,6 +157,8 @@ impl ArrowReaderBuilder { /// Provide a [`RowFilter`] to skip decoding rows /// + /// Row filters are applied after row group selection and row selection + /// /// TODO: Make public once stable (#1792) #[allow(unused)] pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self { diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index e885a50df27..abe34cf1e88 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -196,6 +196,9 @@ impl AsyncFileReader for T { #[doc(hidden)] /// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async +/// +/// Allows sharing the same builder for both the sync and async versions, whilst also not +/// breaking the pre-existing ParquetRecordBatchStreamBuilder API pub struct AsyncReader(T); /// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
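(Editorial sketch, not part of the patches: after this change the async `ParquetRecordBatchStreamBuilder` exposes the same configuration methods as the sync builder, via the shared `ArrowReaderBuilder`. The snippet assumes the `async` feature, a tokio runtime context, and a placeholder file path.)

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::ProjectionMask;

// tokio::fs::File implements AsyncRead + AsyncSeek, so the blanket
// AsyncFileReader impl applies.
let file = tokio::fs::File::open("data.parquet").await?;

let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

// Same configuration surface as the sync ParquetRecordBatchReaderBuilder.
let stream = builder
    .with_projection(mask)
    .with_batch_size(8192)
    .build()?;

let batches: Vec<_> = stream.try_collect().await?;
println!("read {} batches", batches.len());
```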