Remove deprecated parquet ArrowReader #4125

Merged · 2 commits · Apr 28, 2023
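This PR removes the long-deprecated `ArrowReader` trait and `ParquetFileArrowReader` struct; `ParquetRecordBatchReaderBuilder` is the replacement named in their deprecation notes. As a minimal migration sketch (the "data.parquet" path is a placeholder, not from this PR):

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder file name; any ChunkReader (File, Bytes, ...) works here.
    let file = File::open("data.parquet")?;

    // Replaces ParquetFileArrowReader::try_new + get_record_reader(batch_size).
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    println!("arrow schema: {:?}", builder.schema()); // replaces get_schema()

    let reader = builder.with_batch_size(1024).build()?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```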
182 changes: 4 additions & 178 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -29,12 +29,12 @@ use arrow_select::filter::prep_null_mask_filter;
 use crate::arrow::array_reader::{
     build_array_reader, ArrayReader, FileReaderRowGroupCollection, RowGroupCollection,
 };
-use crate::arrow::schema::{parquet_to_array_schema_and_fields, parquet_to_arrow_schema};
-use crate::arrow::schema::{parquet_to_arrow_schema_by_columns, ParquetField};
+use crate::arrow::schema::parquet_to_array_schema_and_fields;
+use crate::arrow::schema::ParquetField;
 use crate::arrow::ProjectionMask;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{KeyValue, ParquetMetaData};
-use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::{ChunkReader, SerializedFileReader};
 use crate::file::serialized_reader::ReadOptionsBuilder;
 use crate::schema::types::SchemaDescriptor;
 
@@ -198,43 +198,6 @@ impl<T> ArrowReaderBuilder<T> {
     }
 }
 
-/// Arrow reader api.
-/// With this api, user can get arrow schema from parquet file, and read parquet data
-/// into arrow arrays.
-#[deprecated(note = "Use ParquetRecordBatchReaderBuilder instead")]
-pub trait ArrowReader {
-    type RecordReader: RecordBatchReader;
-
-    /// Read parquet schema and convert it into arrow schema.
-    fn get_schema(&mut self) -> Result<Schema>;
-
-    /// Read parquet schema and convert it into arrow schema.
-    /// This schema only includes columns identified by `mask`.
-    fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema>;
-
-    /// Returns record batch reader from whole parquet file.
-    ///
-    /// # Arguments
-    ///
-    /// `batch_size`: The size of each record batch returned from this reader. Only the
-    /// last batch may contain records less than this size, otherwise record batches
-    /// returned from this reader should contains exactly `batch_size` elements.
-    fn get_record_reader(&mut self, batch_size: usize) -> Result<Self::RecordReader>;
-
-    /// Returns record batch reader whose record batch contains columns identified by
-    /// `mask`.
-    ///
-    /// # Arguments
-    ///
-    /// `mask`: The columns that should be included in record batches.
-    /// `batch_size`: Please refer to `get_record_reader`.
-    fn get_record_reader_by_columns(
-        &mut self,
-        mask: ProjectionMask,
-        batch_size: usize,
-    ) -> Result<Self::RecordReader>;
-}
-
 /// Options that control how metadata is read for a parquet file
 ///
 /// See [`ArrowReaderBuilder`] for how to configure how the column data
@@ -273,143 +236,6 @@ impl ArrowReaderOptions {
     }
 }
 
-/// An `ArrowReader` that can be used to synchronously read parquet data as [`RecordBatch`]
-///
-/// See [`crate::arrow::async_reader`] for an asynchronous interface
-#[deprecated(note = "Use ParquetRecordBatchReaderBuilder instead")]
-pub struct ParquetFileArrowReader {
-    file_reader: Arc<dyn FileReader>,
-
-    #[allow(deprecated)]
-    options: ArrowReaderOptions,
-}
-
-#[allow(deprecated)]
-impl ArrowReader for ParquetFileArrowReader {
-    type RecordReader = ParquetRecordBatchReader;
-
-    fn get_schema(&mut self) -> Result<Schema> {
-        let file_metadata = self.file_reader.metadata().file_metadata();
-        parquet_to_arrow_schema(file_metadata.schema_descr(), self.get_kv_metadata())
-    }
-
-    fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema> {
-        let file_metadata = self.file_reader.metadata().file_metadata();
-        parquet_to_arrow_schema_by_columns(
-            file_metadata.schema_descr(),
-            mask,
-            self.get_kv_metadata(),
-        )
-    }
-
-    fn get_record_reader(
-        &mut self,
-        batch_size: usize,
-    ) -> Result<ParquetRecordBatchReader> {
-        self.get_record_reader_by_columns(ProjectionMask::all(), batch_size)
-    }
-
-    fn get_record_reader_by_columns(
-        &mut self,
-        mask: ProjectionMask,
-        batch_size: usize,
-    ) -> Result<ParquetRecordBatchReader> {
-        let (_, field) = parquet_to_array_schema_and_fields(
-            self.parquet_schema(),
-            mask,
-            self.get_kv_metadata(),
-        )?;
-        let array_reader = build_array_reader(
-            field.as_ref(),
-            &ProjectionMask::all(),
-            &self.file_reader,
-        )?;
-
-        // Try to avoid allocate large buffer
-        let batch_size = self.file_reader.num_rows().min(batch_size);
-        Ok(ParquetRecordBatchReader::new(
-            batch_size,
-            array_reader,
-            None,
-        ))
-    }
-}
-
-#[allow(deprecated)]
-impl ParquetFileArrowReader {
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
-    ///
-    /// ```no_run
-    /// # use std::fs::File;
-    /// # use bytes::Bytes;
-    /// # use parquet::arrow::ParquetFileArrowReader;
-    ///
-    /// let file = File::open("file.parquet").unwrap();
-    /// let reader = ParquetFileArrowReader::try_new(file).unwrap();
-    ///
-    /// let bytes = Bytes::from(vec![]);
-    /// let reader = ParquetFileArrowReader::try_new(bytes).unwrap();
-    /// ```
-    pub fn try_new<R: ChunkReader + 'static>(chunk_reader: R) -> Result<Self> {
-        Self::try_new_with_options(chunk_reader, Default::default())
-    }
-
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
-    /// and [`ArrowReaderOptions`]
-    pub fn try_new_with_options<R: ChunkReader + 'static>(
-        chunk_reader: R,
-        options: ArrowReaderOptions,
-    ) -> Result<Self> {
-        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
-        Ok(Self::new_with_options(file_reader, options))
-    }
-
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
-    pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
-        Self::new_with_options(file_reader, Default::default())
-    }
-
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
-    /// and [`ArrowReaderOptions`]
-    pub fn new_with_options(
-        file_reader: Arc<dyn FileReader>,
-        options: ArrowReaderOptions,
-    ) -> Self {
-        Self {
-            file_reader,
-            options,
-        }
-    }
-
-    /// Expose the reader metadata
-    #[deprecated = "use metadata() instead"]
-    pub fn get_metadata(&mut self) -> ParquetMetaData {
-        self.file_reader.metadata().clone()
-    }
-
-    /// Returns the parquet metadata
-    pub fn metadata(&self) -> &ParquetMetaData {
-        self.file_reader.metadata()
-    }
-
-    /// Returns the parquet schema
-    pub fn parquet_schema(&self) -> &SchemaDescriptor {
-        self.file_reader.metadata().file_metadata().schema_descr()
-    }
-
-    /// Returns the key value metadata, returns `None` if [`ArrowReaderOptions::skip_arrow_metadata`]
-    fn get_kv_metadata(&self) -> Option<&Vec<KeyValue>> {
-        if self.options.skip_arrow_metadata {
-            return None;
-        }
-
-        self.file_reader
-            .metadata()
-            .file_metadata()
-            .key_value_metadata()
-    }
-}
-
 #[doc(hidden)]
 /// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
 pub struct SyncReader<T: ChunkReader>(SerializedFileReader<T>);
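The removed `get_record_reader_by_columns` / `get_schema_by_columns` path maps onto the builder's projection support; a sketch, where the leaf column indices 0 and 1 are placeholders:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn read_projected(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(path)?)?;

    // Select leaf columns 0 and 1 of the parquet schema (placeholder indices);
    // this replaces passing a ProjectionMask to get_record_reader_by_columns.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);

    let reader = builder
        .with_projection(mask)
        .with_batch_size(1024)
        .build()?;
    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```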
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -56,7 +56,8 @@ mod levels;
 /// # use bytes::Bytes;
 /// # use arrow_array::{ArrayRef, Int64Array};
 /// # use arrow_array::RecordBatch;
-/// # use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
+/// # use parquet::arrow::arrow_writer::ArrowWriter;
+/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 /// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
 /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
 ///
@@ -65,8 +66,7 @@ mod levels;
 /// writer.write(&to_write).unwrap();
 /// writer.close().unwrap();
 ///
-/// let mut reader = ParquetFileArrowReader::try_new(Bytes::from(buffer)).unwrap();
-/// let mut reader = reader.get_record_reader(1024).unwrap();
+/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
 /// let read = reader.next().unwrap().unwrap();
 ///
 /// assert_eq!(to_write, read);
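For reference, the updated doc example above assembles into a complete program along these lines (same APIs as the diff, wrapped in `main`):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let to_write = RecordBatch::try_from_iter([("col", col)])?;

    // Write the batch to an in-memory parquet file.
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None)?;
    writer.write(&to_write)?;
    writer.close()?;

    // Read it back with the non-deprecated reader, as in the updated docs.
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024)?;
    let read = reader.next().unwrap()?;
    assert_eq!(to_write, read);
    Ok(())
}
```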
2 changes: 0 additions & 2 deletions parquet/src/arrow/mod.rs
@@ -118,8 +118,6 @@ pub mod async_writer;
 mod record_reader;
 experimental!(mod schema);
 
-#[allow(deprecated)]
-pub use self::arrow_reader::{ArrowReader, ParquetFileArrowReader};
 pub use self::arrow_writer::ArrowWriter;
 #[cfg(feature = "async")]
 pub use self::async_reader::ParquetRecordBatchStreamBuilder;
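The `ParquetRecordBatchStreamBuilder` re-export kept above is the async counterpart to the builder used in the earlier examples; a sketch under the crate's `async` feature, assuming a tokio runtime and a placeholder file name:

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path; tokio::fs::File satisfies the async reader traits.
    let file = File::open("data.parquet").await?;

    // Same builder surface as the sync path, but build() yields a Stream
    // of RecordBatch instead of an Iterator.
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(1024)
        .build()?;

    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```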