Remove deprecated parquet ArrowReader (#4125)
* Remove deprecated parquet ArrowReader

* Update doctest
tustvold committed Apr 28, 2023
1 parent 1434d1f commit 6c3688b
Showing 3 changed files with 7 additions and 183 deletions.
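
For readers migrating off the removed API, here is a minimal sketch of the replacement read path using `ParquetRecordBatchReaderBuilder`, the type the deprecation note pointed to. The file path is illustrative and not part of this commit:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Before this commit: ParquetFileArrowReader::try_new(file)?.get_record_reader(1024)?
    let file = File::open("data.parquet")?; // illustrative path
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(1024)
        .build()?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```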
182 changes: 4 additions & 178 deletions parquet/src/arrow/arrow_reader/mod.rs
```diff
@@ -29,12 +29,12 @@ use arrow_select::filter::prep_null_mask_filter;
 use crate::arrow::array_reader::{
     build_array_reader, ArrayReader, FileReaderRowGroupCollection, RowGroupCollection,
 };
-use crate::arrow::schema::{parquet_to_array_schema_and_fields, parquet_to_arrow_schema};
-use crate::arrow::schema::{parquet_to_arrow_schema_by_columns, ParquetField};
+use crate::arrow::schema::parquet_to_array_schema_and_fields;
+use crate::arrow::schema::ParquetField;
 use crate::arrow::ProjectionMask;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{KeyValue, ParquetMetaData};
-use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::{ChunkReader, SerializedFileReader};
 use crate::file::serialized_reader::ReadOptionsBuilder;
 use crate::schema::types::SchemaDescriptor;
@@ -198,43 +198,6 @@ impl<T> ArrowReaderBuilder<T> {
     }
 }
 
-/// Arrow reader api.
-/// With this api, user can get arrow schema from parquet file, and read parquet data
-/// into arrow arrays.
-#[deprecated(note = "Use ParquetRecordBatchReaderBuilder instead")]
-pub trait ArrowReader {
-    type RecordReader: RecordBatchReader;
-
-    /// Read parquet schema and convert it into arrow schema.
-    fn get_schema(&mut self) -> Result<Schema>;
-
-    /// Read parquet schema and convert it into arrow schema.
-    /// This schema only includes columns identified by `mask`.
-    fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema>;
-
-    /// Returns record batch reader from whole parquet file.
-    ///
-    /// # Arguments
-    ///
-    /// `batch_size`: The size of each record batch returned from this reader. Only the
-    /// last batch may contain records less than this size, otherwise record batches
-    /// returned from this reader should contains exactly `batch_size` elements.
-    fn get_record_reader(&mut self, batch_size: usize) -> Result<Self::RecordReader>;
-
-    /// Returns record batch reader whose record batch contains columns identified by
-    /// `mask`.
-    ///
-    /// # Arguments
-    ///
-    /// `mask`: The columns that should be included in record batches.
-    /// `batch_size`: Please refer to `get_record_reader`.
-    fn get_record_reader_by_columns(
-        &mut self,
-        mask: ProjectionMask,
-        batch_size: usize,
-    ) -> Result<Self::RecordReader>;
-}
-
 /// Options that control how metadata is read for a parquet file
 ///
 /// See [`ArrowReaderBuilder`] for how to configure how the column data
@@ -273,143 +236,6 @@ impl ArrowReaderOptions {
     }
 }
 
-/// An `ArrowReader` that can be used to synchronously read parquet data as [`RecordBatch`]
-///
-/// See [`crate::arrow::async_reader`] for an asynchronous interface
-#[deprecated(note = "Use ParquetRecordBatchReaderBuilder instead")]
-pub struct ParquetFileArrowReader {
-    file_reader: Arc<dyn FileReader>,
-
-    #[allow(deprecated)]
-    options: ArrowReaderOptions,
-}
-
-#[allow(deprecated)]
-impl ArrowReader for ParquetFileArrowReader {
-    type RecordReader = ParquetRecordBatchReader;
-
-    fn get_schema(&mut self) -> Result<Schema> {
-        let file_metadata = self.file_reader.metadata().file_metadata();
-        parquet_to_arrow_schema(file_metadata.schema_descr(), self.get_kv_metadata())
-    }
-
-    fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema> {
-        let file_metadata = self.file_reader.metadata().file_metadata();
-        parquet_to_arrow_schema_by_columns(
-            file_metadata.schema_descr(),
-            mask,
-            self.get_kv_metadata(),
-        )
-    }
-
-    fn get_record_reader(
-        &mut self,
-        batch_size: usize,
-    ) -> Result<ParquetRecordBatchReader> {
-        self.get_record_reader_by_columns(ProjectionMask::all(), batch_size)
-    }
-
-    fn get_record_reader_by_columns(
-        &mut self,
-        mask: ProjectionMask,
-        batch_size: usize,
-    ) -> Result<ParquetRecordBatchReader> {
-        let (_, field) = parquet_to_array_schema_and_fields(
-            self.parquet_schema(),
-            mask,
-            self.get_kv_metadata(),
-        )?;
-        let array_reader = build_array_reader(
-            field.as_ref(),
-            &ProjectionMask::all(),
-            &self.file_reader,
-        )?;
-
-        // Try to avoid allocate large buffer
-        let batch_size = self.file_reader.num_rows().min(batch_size);
-        Ok(ParquetRecordBatchReader::new(
-            batch_size,
-            array_reader,
-            None,
-        ))
-    }
-}
-
-#[allow(deprecated)]
-impl ParquetFileArrowReader {
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
-    ///
-    /// ```no_run
-    /// # use std::fs::File;
-    /// # use bytes::Bytes;
-    /// # use parquet::arrow::ParquetFileArrowReader;
-    ///
-    /// let file = File::open("file.parquet").unwrap();
-    /// let reader = ParquetFileArrowReader::try_new(file).unwrap();
-    ///
-    /// let bytes = Bytes::from(vec![]);
-    /// let reader = ParquetFileArrowReader::try_new(bytes).unwrap();
-    /// ```
-    pub fn try_new<R: ChunkReader + 'static>(chunk_reader: R) -> Result<Self> {
-        Self::try_new_with_options(chunk_reader, Default::default())
-    }
-
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
-    /// and [`ArrowReaderOptions`]
-    pub fn try_new_with_options<R: ChunkReader + 'static>(
-        chunk_reader: R,
-        options: ArrowReaderOptions,
-    ) -> Result<Self> {
-        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
-        Ok(Self::new_with_options(file_reader, options))
-    }
-
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
-    pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
-        Self::new_with_options(file_reader, Default::default())
-    }
-
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
-    /// and [`ArrowReaderOptions`]
-    pub fn new_with_options(
-        file_reader: Arc<dyn FileReader>,
-        options: ArrowReaderOptions,
-    ) -> Self {
-        Self {
-            file_reader,
-            options,
-        }
-    }
-
-    /// Expose the reader metadata
-    #[deprecated = "use metadata() instead"]
-    pub fn get_metadata(&mut self) -> ParquetMetaData {
-        self.file_reader.metadata().clone()
-    }
-
-    /// Returns the parquet metadata
-    pub fn metadata(&self) -> &ParquetMetaData {
-        self.file_reader.metadata()
-    }
-
-    /// Returns the parquet schema
-    pub fn parquet_schema(&self) -> &SchemaDescriptor {
-        self.file_reader.metadata().file_metadata().schema_descr()
-    }
-
-    /// Returns the key value metadata, returns `None` if [`ArrowReaderOptions::skip_arrow_metadata`]
-    fn get_kv_metadata(&self) -> Option<&Vec<KeyValue>> {
-        if self.options.skip_arrow_metadata {
-            return None;
-        }
-
-        self.file_reader
-            .metadata()
-            .file_metadata()
-            .key_value_metadata()
-    }
-}
-
 #[doc(hidden)]
 /// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
 pub struct SyncReader<T: ChunkReader>(SerializedFileReader<T>);
```
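
The removed accessors and projection methods map onto the builder as well. A hedged sketch of the correspondence, assuming the same illustrative file and an arbitrary leaf column index; `try_new_with_options` and `with_skip_arrow_metadata` are my reading of the replacement for the removed `ArrowReaderOptions` handling, not something this commit demonstrates:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Skipping the embedded arrow schema corresponds to the short-circuit in the
    // removed get_kv_metadata; options were previously passed through
    // ParquetFileArrowReader::try_new_with_options.
    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let file = File::open("data.parquet")?; // illustrative path
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;

    // Replaces ArrowReader::get_schema and ParquetFileArrowReader::{metadata, parquet_schema}
    println!("arrow schema: {:?}", builder.schema());
    println!("row groups: {}", builder.metadata().num_row_groups());
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]); // illustrative column

    // Replaces get_record_reader_by_columns(mask, batch_size)
    let reader = builder.with_projection(mask).with_batch_size(1024).build()?;
    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```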
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
```diff
@@ -56,7 +56,8 @@ mod levels;
 /// # use bytes::Bytes;
 /// # use arrow_array::{ArrayRef, Int64Array};
 /// # use arrow_array::RecordBatch;
-/// # use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
+/// # use parquet::arrow::arrow_writer::ArrowWriter;
+/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 /// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
 /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
 ///
@@ -65,8 +66,7 @@ mod levels;
 /// writer.write(&to_write).unwrap();
 /// writer.close().unwrap();
 ///
-/// let mut reader = ParquetFileArrowReader::try_new(Bytes::from(buffer)).unwrap();
-/// let mut reader = reader.get_record_reader(1024).unwrap();
+/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
 /// let read = reader.next().unwrap().unwrap();
 ///
 /// assert_eq!(to_write, read);
```
2 changes: 0 additions & 2 deletions parquet/src/arrow/mod.rs
```diff
@@ -118,8 +118,6 @@ pub mod async_writer;
 mod record_reader;
 experimental!(mod schema);
 
-#[allow(deprecated)]
-pub use self::arrow_reader::{ArrowReader, ParquetFileArrowReader};
 pub use self::arrow_writer::ArrowWriter;
 #[cfg(feature = "async")]
 pub use self::async_reader::ParquetRecordBatchStreamBuilder;
```
