diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 8a0d5b97ef65..b16525c7a4f6 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -108,6 +108,7 @@ pub(crate) mod test_util { object_meta: meta, partition_values: vec![], range: None, + extensions: None, }]]; let exec = format diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index dfd08352dffb..7e7a522e337e 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -354,7 +354,11 @@ fn summarize_min_max( } } -pub(crate) async fn fetch_parquet_metadata( +/// Fetches parquet metadata from the ObjectStore for the given object +/// +/// This component is subject to **change** in the near future and is exposed for low-level integrations +/// through [ParquetFileReaderFactory]. +pub async fn fetch_parquet_metadata( store: &dyn ObjectStore, meta: &ObjectMeta, size_hint: Option<usize>, diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 80fece684403..6c018eda3e76 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -183,24 +183,27 @@ pub async fn pruned_partition_list<'a>( // Note: We might avoid parsing the partition values if they are not used in any projection, // but the cost of parsing will likely be far dominated by the time to fetch the listing from // the object store. - Ok(Box::pin(list.try_filter_map(move |file_meta| async move { - let parsed_path = parse_partitions_for_path( - table_path, - &file_meta.location, - table_partition_cols, - ) - .map(|p| { - p.iter() - .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned()))) - .collect() - }); - - Ok(parsed_path.map(|partition_values| PartitionedFile { - partition_values, - object_meta: file_meta, - range: None, - })) - }))) + Ok(Box::pin(list.try_filter_map( + move |object_meta| async move { + let parsed_path = parse_partitions_for_path( + table_path, + &object_meta.location, + table_partition_cols, + ) + .map(|p| { + p.iter() + .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned()))) + .collect() + }); + + Ok(parsed_path.map(|partition_values| PartitionedFile { + partition_values, + object_meta, + range: None, + extensions: None, + })) + }, + ))) } else { // parse the partition values and serde them as a RecordBatch to filter them let metas: Vec<_> = list.try_collect().await?; @@ -317,6 +320,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> { }) .collect(), range: None, + extensions: None, }) }) }) diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 85d4b6f7d00d..27cf14122690 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -28,6 +28,7 @@ use datafusion_common::ScalarValue; use futures::Stream; use object_store::{path::Path, ObjectMeta}; use std::pin::Pin; +use std::sync::Arc; pub use self::url::ListingTableUrl; pub use table::{ListingOptions, ListingTable, ListingTableConfig}; @@ -58,6 +59,8 @@ pub struct PartitionedFile { pub partition_values: Vec<ScalarValue>, /// An optional file range for a more fine-grained parallel execution pub range: Option<FileRange>, + /// An optional field for user-defined per-object metadata + pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>, } impl PartitionedFile { @@ -71,6 +74,7 @@ impl
PartitionedFile { }, partition_values: vec![], range: None, + extensions: None, } } @@ -84,6 +88,7 @@ impl PartitionedFile { }, partition_values: vec![], range: Some(FileRange { start, end }), + extensions: None, } } } @@ -94,6 +99,7 @@ impl From for PartitionedFile { object_meta, partition_values: vec![], range: None, + extensions: None, } } } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index dcc508e65434..0dd665628399 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. #![warn(missing_docs, clippy::needless_borrow)] +// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081) +#![allow(where_clauses_object_safety)] //! [DataFusion](https://github.com/apache/arrow-datafusion) //! is an extensible query execution framework that uses diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index a795a00f9d3e..0b7841d885d2 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -151,11 +151,11 @@ impl ExecutionPlan for AvroExec { #[cfg(feature = "avro")] mod private { use super::*; - use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener}; + use crate::physical_plan::file_format::FileMeta; use bytes::Buf; use futures::StreamExt; - use object_store::{GetResult, ObjectMeta, ObjectStore}; + use object_store::{GetResult, ObjectStore}; pub struct AvroConfig { pub schema: SchemaRef, @@ -185,12 +185,11 @@ mod private { fn open( &self, store: Arc, - file: ObjectMeta, - _range: Option, - ) -> FileOpenFuture { + file_meta: FileMeta, + ) -> Result { let config = self.config.clone(); - Box::pin(async move { - match store.get(&file.location).await? { + Ok(Box::pin(async move { + match store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { let reader = config.open(file)?; Ok(futures::stream::iter(reader).boxed()) @@ -201,7 +200,7 @@ mod private { Ok(futures::stream::iter(reader).boxed()) } } - }) + })) } } } diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 46e18e27f15e..885bea870bd8 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -20,21 +20,20 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, -}; - -use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; +use crate::physical_plan::file_format::FileMeta; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; +use crate::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, +}; use arrow::csv; use arrow::datatypes::SchemaRef; use bytes::Buf; use futures::{StreamExt, TryStreamExt}; -use object_store::{GetResult, ObjectMeta, ObjectStore}; +use object_store::{GetResult, ObjectStore}; use std::any::Any; use std::fs; use std::path::Path; @@ -201,12 +200,11 @@ impl FileOpener for CsvOpener { fn open( &self, store: Arc, - file: ObjectMeta, - _range: Option, - ) -> FileOpenFuture { + file_meta: FileMeta, + ) -> Result { let config = self.config.clone(); - Box::pin(async move { - match store.get(&file.location).await? { + Ok(Box::pin(async move { + match store.get(file_meta.location()).await? { GetResult::File(file, _) => { Ok(futures::stream::iter(config.open(file, true)).boxed()) } @@ -222,7 +220,7 @@ impl FileOpener for CsvOpener { .boxed()) } } - }) + })) } } diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index f095fe28d114..1a54d685228c 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -32,14 +32,16 @@ use arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{ready, FutureExt, Stream, StreamExt}; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::ObjectStore; use datafusion_common::ScalarValue; -use crate::datasource::listing::{FileRange, PartitionedFile}; +use crate::datasource::listing::PartitionedFile; use crate::error::Result; use crate::execution::context::TaskContext; -use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector}; +use crate::physical_plan::file_format::{ + FileMeta, FileScanConfig, PartitionColumnProjector, +}; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time, }; @@ -53,9 +55,8 @@ pub trait FileOpener: Unpin { fn open( &self, store: Arc, - file: ObjectMeta, - range: Option, - ) -> FileOpenFuture; + file_meta: FileMeta, + ) -> Result; } /// A stream that iterates record batch by record batch, file over file. 
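Note (not part of the diff): the reworked `FileOpener` trait above now receives a `FileMeta` instead of a bare `ObjectMeta`/`FileRange` pair and returns `Result<FileOpenFuture>`, so per-file setup errors can surface before any I/O future is polled. A minimal sketch of an opener written against the new signature, using the same crate-internal paths the built-in openers use; `EmptyOpener` is a hypothetical type that simply yields no batches:

use std::sync::Arc;

use futures::{FutureExt, StreamExt};
use object_store::ObjectStore;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use crate::physical_plan::file_format::FileMeta;

struct EmptyOpener;

impl FileOpener for EmptyOpener {
    fn open(
        &self,
        _store: Arc<dyn ObjectStore>,
        file_meta: FileMeta,
    ) -> Result<FileOpenFuture> {
        // Per-file setup that can fail is reported here, before the future is built.
        if file_meta.range.is_some() {
            return Err(DataFusionError::NotImplemented(
                "range scans are not supported by EmptyOpener".to_string(),
            ));
        }
        // Yield an empty stream of record batches.
        Ok(futures::future::ready(Ok(futures::stream::empty().boxed())).boxed())
    }
}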
@@ -205,21 +206,30 @@ impl FileStream { loop { match &mut self.state { FileStreamState::Idle => { - let file = match self.file_iter.pop_front() { + let part_file = match self.file_iter.pop_front() { Some(file) => file, None => return Poll::Ready(None), }; + let file_meta = FileMeta { + object_meta: part_file.object_meta, + range: part_file.range, + extensions: part_file.extensions, + }; + self.file_stream_metrics.time_opening.start(); - let future = self.file_reader.open( - self.object_store.clone(), - file.object_meta, - file.range, - ); - - self.state = FileStreamState::Open { - future, - partition_values: file.partition_values, + + match self.file_reader.open(self.object_store.clone(), file_meta) { + Ok(future) => { + self.state = FileStreamState::Open { + future, + partition_values: part_file.partition_values, + } + } + Err(e) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e.into()))); + } } } FileStreamState::Open { @@ -322,12 +332,11 @@ mod tests { fn open( &self, _store: Arc, - _file: ObjectMeta, - _range: Option, - ) -> FileOpenFuture { + _file_meta: FileMeta, + ) -> Result { let iterator = self.records.clone().into_iter().map(Ok); let stream = futures::stream::iter(iterator).boxed(); - futures::future::ready(Ok(stream)).boxed() + Ok(futures::future::ready(Ok(stream)).boxed()) } } diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 488de2208dff..10f148ad060f 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -16,9 +16,6 @@ // under the License. //! Execution plan for reading line-delimited JSON files -use arrow::json::reader::DecoderOptions; - -use crate::datasource::listing::FileRange; use crate::error::{DataFusionError, Result}; use crate::execution::context::SessionState; use crate::execution::context::TaskContext; @@ -27,14 +24,16 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; +use crate::physical_plan::file_format::FileMeta; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use arrow::json::reader::DecoderOptions; use arrow::{datatypes::SchemaRef, json}; use bytes::Buf; use futures::{StreamExt, TryStreamExt}; -use object_store::{GetResult, ObjectMeta, ObjectStore}; +use object_store::{GetResult, ObjectStore}; use std::any::Any; use std::fs; use std::path::Path; @@ -163,13 +162,12 @@ impl FileOpener for JsonOpener { fn open( &self, store: Arc, - file: ObjectMeta, - _range: Option, - ) -> FileOpenFuture { + file_meta: FileMeta, + ) -> Result { let options = self.options.clone(); let schema = self.file_schema.clone(); - Box::pin(async move { - match store.get(&file.location).await? { + Ok(Box::pin(async move { + match store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { let reader = json::Reader::new(file, schema.clone(), options); Ok(futures::stream::iter(reader).boxed()) } @@ -188,7 +186,7 @@ impl FileOpener for JsonOpener { .boxed()) } } - }) + })) } } diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index d7d70cd4b824..31fa1f2d8bcf 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -29,7 +29,7 @@ mod parquet; pub(crate) use self::csv::plan_to_csv; pub use self::csv::CsvExec; pub(crate) use self::parquet::plan_to_parquet; -pub use self::parquet::ParquetExec; +pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory}; use arrow::{ array::{ArrayData, ArrayRef, DictionaryArray}, buffer::Buffer, @@ -41,6 +41,7 @@ pub use avro::AvroExec; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; +use crate::datasource::listing::FileRange; use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{ error::{DataFusionError, Result}, @@ -50,6 +51,8 @@ use arrow::array::{new_null_array, UInt16BufferBuilder}; use arrow::record_batch::RecordBatchOptions; use lazy_static::lazy_static; use log::info; +use object_store::path::Path; +use object_store::ObjectMeta; use std::{ collections::HashMap, fmt::{Display, Formatter, Result as FmtResult}, @@ -401,6 +404,33 @@ fn create_dict_array( )) } +/// A single file or part of a file that should be read, along with its [ObjectMeta], +/// optional byte range and optional user-defined extensions +pub struct FileMeta { + /// [ObjectMeta] for the file, including its location (e.g. URL, filesystem path, etc) and size + pub object_meta: ObjectMeta, + /// An optional file range for a more fine-grained parallel execution + pub range: Option<FileRange>, + /// An optional field for user-defined per-object metadata + pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>, +} + +impl FileMeta { + /// The full path to the object + pub fn location(&self) -> &Path { + &self.object_meta.location + } +} + +impl From<ObjectMeta> for FileMeta { + fn from(object_meta: ObjectMeta) -> Self { + Self { + object_meta, + range: None, + extensions: None, + } + } +} + #[cfg(test)] mod tests { use crate::{ diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index f404e4de8578..8063ae4d5fab 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -24,6 +24,25 @@ use std::ops::Range; use std::sync::Arc; use std::{any::Any, convert::TryInto}; +use crate::datasource::file_format::parquet::fetch_parquet_metadata; +use crate::datasource::listing::FileRange; +use crate::physical_plan::file_format::file_stream::{ + FileOpenFuture, FileOpener, FileStream, +}; +use crate::physical_plan::file_format::FileMeta; +use crate::{ + error::{DataFusionError, Result}, + execution::context::{SessionState, TaskContext}, + physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, + physical_plan::{ + expressions::PhysicalSortExpr, + file_format::{FileScanConfig, SchemaAdapter}, + metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + Statistics, + }, + scalar::ScalarValue, +}; use arrow::datatypes::DataType; use arrow::{ array::ArrayRef, @@ -31,6 +50,8 @@ use arrow::{ error::ArrowError, }; use bytes::Bytes; +use datafusion_common::Column; +use datafusion_expr::Expr; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt,
TryFutureExt, TryStreamExt}; use log::debug; @@ -46,28 +67,6 @@ use parquet::file::{ }; use parquet::schema::types::ColumnDescriptor; -use datafusion_common::Column; -use datafusion_expr::Expr; - -use crate::datasource::file_format::parquet::fetch_parquet_metadata; -use crate::datasource::listing::FileRange; -use crate::physical_plan::file_format::file_stream::{ - FileOpenFuture, FileOpener, FileStream, -}; -use crate::{ - error::{DataFusionError, Result}, - execution::context::{SessionState, TaskContext}, - physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, - physical_plan::{ - expressions::PhysicalSortExpr, - file_format::{FileScanConfig, SchemaAdapter}, - metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, - }, - scalar::ScalarValue, -}; - /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { @@ -80,17 +79,8 @@ pub struct ParquetExec { pruning_predicate: Option<PruningPredicate>, /// Optional hint for the size of the parquet metadata metadata_size_hint: Option<usize>, -} - -/// Stores metrics about the parquet execution for a particular parquet file -#[derive(Debug, Clone)] -struct ParquetFileMetrics { - /// Number of times the predicate could not be evaluated - pub predicate_evaluation_errors: metrics::Count, - /// Number of row groups pruned using - pub row_groups_pruned: metrics::Count, - /// Total number of bytes scanned - pub bytes_scanned: metrics::Count, + /// Optional user-defined parquet file reader factory + parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>, } impl ParquetExec { @@ -130,6 +120,7 @@ impl ParquetExec { metrics, pruning_predicate, metadata_size_hint, + parquet_file_reader_factory: None, } } @@ -142,6 +133,35 @@ impl ParquetExec { pub fn pruning_predicate(&self) -> Option<&PruningPredicate> { self.pruning_predicate.as_ref() } + + /// Set an optional user-defined parquet file reader factory. + /// + /// `ParquetFileReaderFactory` complements `TableProvider`: it enables users to provide a + /// custom implementation for data access operations. + /// + /// If a custom `ParquetFileReaderFactory` is provided, data access operations are routed + /// to it instead of the `ObjectStore`. + pub fn with_parquet_file_reader_factory( + mut self, + parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>, + ) -> Self { + self.parquet_file_reader_factory = Some(parquet_file_reader_factory); + self + } +} + +/// Stores metrics about the parquet execution for a particular parquet file. +/// +/// This component is subject to **change** in the near future and is exposed for low-level integrations +/// through [ParquetFileReaderFactory].
+#[derive(Debug, Clone)] +pub struct ParquetFileMetrics { + /// Number of times the predicate could not be evaluated + pub predicate_evaluation_errors: metrics::Count, + /// Number of row groups pruned by the pruning predicate + pub row_groups_pruned: metrics::Count, + /// Total number of bytes scanned + pub bytes_scanned: metrics::Count, } impl ParquetFileMetrics { @@ -209,27 +229,41 @@ impl ExecutionPlan for ParquetExec { fn execute( &self, partition_index: usize, - context: Arc<TaskContext>, + ctx: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { let projection = match self.base_config.file_column_projection_indices() { Some(proj) => proj, None => (0..self.base_config.file_schema.fields().len()).collect(), }; + let parquet_file_reader_factory = self + .parquet_file_reader_factory + .as_ref() + .map(|f| Ok(Arc::clone(f))) + .unwrap_or_else(|| { + ctx.runtime_env() + .object_store(&self.base_config.object_store_url) + .map(|store| { + Arc::new(DefaultParquetFileReaderFactory::new(store)) + as Arc<dyn ParquetFileReaderFactory> + }) + })?; + let opener = ParquetOpener { partition_index, projection: Arc::from(projection), - batch_size: context.session_config().batch_size(), + batch_size: ctx.session_config().batch_size(), pruning_predicate: self.pruning_predicate.clone(), table_schema: self.base_config.file_schema.clone(), metadata_size_hint: self.metadata_size_hint, metrics: self.metrics.clone(), + parquet_file_reader_factory, }; let stream = FileStream::new( &self.base_config, partition_index, - context, + ctx, opener, self.metrics.clone(), )?; @@ -284,34 +318,37 @@ struct ParquetOpener { table_schema: SchemaRef, metadata_size_hint: Option<usize>, metrics: ExecutionPlanMetricsSet, + parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>, } impl FileOpener for ParquetOpener { fn open( &self, - store: Arc<dyn ObjectStore>, - meta: ObjectMeta, - range: Option<FileRange>, - ) -> FileOpenFuture { + _: Arc<dyn ObjectStore>, + file_meta: FileMeta, + ) -> Result<FileOpenFuture> { + let file_range = file_meta.range.clone(); + let metrics = ParquetFileMetrics::new( self.partition_index, - meta.location.as_ref(), + file_meta.location().as_ref(), &self.metrics, ); - let reader = ParquetFileReader { - store, - meta, - metadata_size_hint: self.metadata_size_hint, - metrics: metrics.clone(), - }; + let reader = + BoxedAsyncFileReader(self.parquet_file_reader_factory.create_reader( + self.partition_index, + file_meta, + self.metadata_size_hint, + &self.metrics, + )?); let schema_adapter = SchemaAdapter::new(self.table_schema.clone()); let batch_size = self.batch_size; let projection = self.projection.clone(); let pruning_predicate = self.pruning_predicate.clone(); - Box::pin(async move { + Ok(Box::pin(async move { let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; let adapted_projections = schema_adapter.map_projections(builder.schema(), &projection)?; @@ -322,7 +359,8 @@ impl FileOpener for ParquetOpener { ); let groups = builder.metadata().row_groups(); - let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics); + let row_groups = + prune_row_groups(groups, file_range, pruning_predicate, &metrics); let stream = builder .with_projection(mask) @@ -341,7 +379,32 @@ impl FileOpener for ParquetOpener { }); Ok(adapted.boxed()) - }) + })) } } + +/// Factory of parquet file readers. +/// +/// Provides a means to implement a custom data access interface.
+pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static { + /// Provides an `AsyncFileReader` for the parquet file specified in `FileMeta` + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option<usize>, + metrics: &ExecutionPlanMetricsSet, + ) -> Result<Box<dyn AsyncFileReader + Send>>; +} + +#[derive(Debug)] +pub struct DefaultParquetFileReaderFactory { + store: Arc<dyn ObjectStore>, +} + +impl DefaultParquetFileReaderFactory { + pub fn new(store: Arc<dyn ObjectStore>) -> Self { + Self { store } } } @@ -349,8 +412,8 @@ impl FileOpener for ParquetOpener { struct ParquetFileReader { store: Arc<dyn ObjectStore>, meta: ObjectMeta, - metadata_size_hint: Option<usize>, metrics: ParquetFileMetrics, + metadata_size_hint: Option<usize>, } impl AsyncFileReader for ParquetFileReader { @@ -389,6 +452,63 @@ impl AsyncFileReader for ParquetFileReader { } } +impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option<usize>, + metrics: &ExecutionPlanMetricsSet, + ) -> Result<Box<dyn AsyncFileReader + Send>> { + let parquet_file_metrics = ParquetFileMetrics::new( + partition_index, + file_meta.location().as_ref(), + metrics, + ); + + Ok(Box::new(ParquetFileReader { + meta: file_meta.object_meta, + store: Arc::clone(&self.store), + metadata_size_hint, + metrics: parquet_file_metrics, + })) + } +} + +/// +/// BoxedAsyncFileReader has been created to satisfy the type requirements of the +/// parquet stream builder constructor. +/// +/// Temporary; pending https://github.com/apache/arrow-rs/pull/2368 +struct BoxedAsyncFileReader(Box<dyn AsyncFileReader + Send>); + +impl AsyncFileReader for BoxedAsyncFileReader { + fn get_bytes( + &mut self, + range: Range<usize>, + ) -> BoxFuture<'_, ::parquet::errors::Result<Bytes>> { + self.0.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec<Range<usize>>, + ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> + // TODO: This where bound forces us to enable #![allow(where_clauses_object_safety)] (#3081) + // Upstream issue https://github.com/apache/arrow-rs/issues/2372 + where + Self: Send, + { + self.0.get_byte_ranges(ranges) + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> { + self.0.get_metadata() + } +} + /// Wraps parquet statistics in a way /// that implements [`PruningStatistics`] struct RowGroupPruningStatistics<'a> { @@ -652,12 +772,6 @@ pub async fn plan_to_parquet( #[cfg(test)] mod tests { - use crate::{ - assert_batches_sorted_eq, assert_contains, - datasource::file_format::{parquet::ParquetFormat, FileFormat}, - physical_plan::collect, - }; - use super::*; use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::datasource::file_format::test_util::scan_format; @@ -666,6 +780,11 @@ mod tests { use crate::execution::options::CsvReadOptions; use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use crate::test::object_store::local_unpartitioned_file; + use crate::{ + assert_batches_sorted_eq, assert_contains, + datasource::file_format::{parquet::ParquetFormat, FileFormat}, + physical_plan::collect, + }; use arrow::array::Float32Array; use arrow::record_batch::RecordBatch; use arrow::{ @@ -1086,6 +1205,7 @@ mod tests { object_meta: meta.clone(), partition_values: vec![], range: Some(FileRange { start, end }), + extensions: None, } } @@ -1188,6 +1308,7 @@ mod tests { ScalarValue::Utf8(Some("26".to_owned())), ], range: None, + extensions: None, }; let parquet_exec = ParquetExec::new( @@ -1250,6 +1371,7 @@ mod tests { }, partition_values: vec![], range: None, + extensions: None, }; let parquet_exec = 
ParquetExec::new( diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs new file mode 100644 index 000000000000..ac8c98381447 --- /dev/null +++ b/datafusion/core/tests/custom_parquet_reader.rs @@ -0,0 +1,261 @@ +// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081) +#![allow(where_clauses_object_safety)] +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod tests { + use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray}; + use arrow::datatypes::{Field, Schema}; + use arrow::record_batch::RecordBatch; + use bytes::Bytes; + use datafusion::assert_batches_sorted_eq; + use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; + use datafusion::datasource::listing::PartitionedFile; + use datafusion::datasource::object_store::ObjectStoreUrl; + use datafusion::physical_plan::file_format::{ + FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, + ParquetFileReaderFactory, + }; + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; + use datafusion::physical_plan::{collect, Statistics}; + use datafusion::prelude::SessionContext; + use datafusion_common::DataFusionError; + use futures::future::BoxFuture; + use futures::{FutureExt, TryFutureExt}; + use object_store::memory::InMemory; + use object_store::path::Path; + use object_store::{ObjectMeta, ObjectStore}; + use parquet::arrow::async_reader::AsyncFileReader; + use parquet::arrow::ArrowWriter; + use parquet::errors::ParquetError; + use parquet::file::metadata::ParquetMetaData; + use std::io::Cursor; + use std::ops::Range; + use std::sync::Arc; + use std::time::SystemTime; + + const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata"; + + #[tokio::test] + async fn route_data_access_ops_to_parquet_file_reader_factory() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + let batch = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); + + let file_schema = batch.schema().clone(); + let (in_memory_object_store, parquet_files_meta) = + store_parquet_in_memory(vec![batch]).await; + let file_groups = parquet_files_meta + .into_iter() + .map(|meta| PartitionedFile { + object_meta: meta, + partition_values: vec![], + range: None, + extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))), + }) + .collect(); + + // prepare the scan + let parquet_exec = ParquetExec::new( + FileScanConfig { + // just any url that doesn't point to in memory object store + object_store_url: 
ObjectStoreUrl::local_filesystem(), + file_groups: vec![file_groups], + file_schema, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }, + None, + None, + ) + .with_parquet_file_reader_factory(Arc::new( + InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)), + )); + + let session_ctx = SessionContext::new(); + + let task_ctx = session_ctx.task_ctx(); + let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); + + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "+-----+----+----+", + ]; + + assert_batches_sorted_eq!(expected, &read); + } + + #[derive(Debug)] + struct InMemoryParquetFileReaderFactory(Arc); + + impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> Result, DataFusionError> { + let metadata = file_meta + .extensions + .as_ref() + .expect("has user defined metadata"); + let metadata = metadata + .downcast_ref::() + .expect("has string metadata"); + + assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]); + + let parquet_file_metrics = ParquetFileMetrics::new( + partition_index, + file_meta.location().as_ref(), + metrics, + ); + + Ok(Box::new(ParquetFileReader { + store: Arc::clone(&self.0), + meta: file_meta.object_meta, + metrics: parquet_file_metrics, + metadata_size_hint, + })) + } + } + + fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch { + columns.into_iter().fold( + RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()), + ) + } + + fn add_to_batch( + batch: &RecordBatch, + field_name: &str, + array: ArrayRef, + ) -> RecordBatch { + let mut fields = batch.schema().fields().clone(); + fields.push(Field::new(field_name, array.data_type().clone(), true)); + let schema = Arc::new(Schema::new(fields)); + + let mut columns = batch.columns().to_vec(); + columns.push(array); + RecordBatch::try_new(schema, columns).expect("error; creating record batch") + } + + async fn store_parquet_in_memory( + batches: Vec, + ) -> (Arc, Vec) { + let in_memory = InMemory::new(); + + let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches + .into_iter() + .enumerate() + .map(|(offset, batch)| { + let mut buf = Vec::::with_capacity(32 * 1024); + let mut output = Cursor::new(&mut buf); + + let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None) + .expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + + let meta = ObjectMeta { + location: Path::parse(format!("file-{offset}.parquet")) + .expect("creating path"), + last_modified: chrono::DateTime::from(SystemTime::now()), + size: buf.len(), + }; + + (meta, Bytes::from(buf)) + }) + .collect(); + + let mut objects = Vec::with_capacity(parquet_batches.len()); + for (meta, bytes) in parquet_batches { + in_memory + .put(&meta.location, bytes) + .await + .expect("put parquet file into in memory object store"); + objects.push(meta); + } + + (Arc::new(in_memory), objects) + } + + /// Implements [`AsyncFileReader`] for a parquet file in object storage + struct ParquetFileReader { + store: Arc, + meta: ObjectMeta, + metrics: ParquetFileMetrics, + metadata_size_hint: Option, + } + + impl AsyncFileReader for ParquetFileReader { + fn get_bytes( + &mut self, + 
range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + self.metrics.bytes_scanned.add(range.end - range.start); + + self.store + .get_range(&self.meta.location, range) + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_bytes error: {}", + e + )) + }) + .boxed() + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, parquet::errors::Result>> { + Box::pin(async move { + let metadata = fetch_parquet_metadata( + self.store.as_ref(), + &self.meta, + self.metadata_size_hint, + ) + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_metadata error: {}", + e + )) + })?; + Ok(Arc::new(metadata)) + }) + } + } +}
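For reference, the new public surface introduced by this change can be wired up roughly as the integration test above does. A condensed sketch, not part of the diff: `meta`, `file_schema`, and `MyReaderFactory` are assumed to already exist, with `MyReaderFactory` implementing the new `ParquetFileReaderFactory` trait.

// Attach arbitrary per-file data that the custom factory can downcast later.
let file = PartitionedFile {
    object_meta: meta,
    partition_values: vec![],
    range: None,
    extensions: Some(Arc::new(String::from("some-user-defined-metadata"))),
};

let parquet_exec = ParquetExec::new(
    FileScanConfig {
        object_store_url: ObjectStoreUrl::local_filesystem(),
        file_groups: vec![vec![file]],
        file_schema,
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
    },
    None, // no pruning predicate
    None, // no metadata size hint
)
// Route all data access for this scan through the custom factory
// instead of the ObjectStore registered for `object_store_url`.
.with_parquet_file_reader_factory(Arc::new(MyReaderFactory));

As the test demonstrates, the factory receives the `FileMeta` (including `extensions`) for every file it opens, which is where per-file credentials, caches, or pre-fetched metadata can be plugged in.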