From db4e632075affe2650de4312064f5872bd9f696b Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Sat, 6 Aug 2022 11:48:58 +0200
Subject: [PATCH 01/23] add metadata_ext to part file, etc

---
 .../core/src/datasource/listing/helpers.rs    |  40 ++++---
 datafusion/core/src/datasource/listing/mod.rs |  12 ++
 .../src/physical_plan/file_format/avro.rs     |  14 ++-
 .../core/src/physical_plan/file_format/csv.rs |  14 ++-
 .../physical_plan/file_format/file_stream.rs  |  41 ++++---
 .../src/physical_plan/file_format/json.rs     |  14 ++-
 .../core/src/physical_plan/file_format/mod.rs |  28 +++++
 .../src/physical_plan/file_format/parquet.rs  | 111 +++++++++++++++---
 8 files changed, 203 insertions(+), 71 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 80fece684403..a5464ff06d56 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -183,24 +183,27 @@ pub async fn pruned_partition_list<'a>(
         // Note: We might avoid parsing the partition values if they are not used in any projection,
         // but the cost of parsing will likely be far dominated by the time to fetch the listing from
         // the object store.
-        Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
-            let parsed_path = parse_partitions_for_path(
-                table_path,
-                &file_meta.location,
-                table_partition_cols,
-            )
-            .map(|p| {
-                p.iter()
-                    .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
-                    .collect()
-            });
-
-            Ok(parsed_path.map(|partition_values| PartitionedFile {
-                partition_values,
-                object_meta: file_meta,
-                range: None,
-            }))
-        })))
+        Ok(Box::pin(list.try_filter_map(
+            move |object_meta| async move {
+                let parsed_path = parse_partitions_for_path(
+                    table_path,
+                    &object_meta.location,
+                    table_partition_cols,
+                )
+                .map(|p| {
+                    p.iter()
+                        .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
+                        .collect()
+                });
+
+                Ok(parsed_path.map(|partition_values| PartitionedFile {
+                    partition_values,
+                    object_meta,
+                    range: None,
+                    metadata_ext: None,
+                }))
+            },
+        )))
     } else {
         // parse the partition values and serde them as a RecordBatch to filter them
         let metas: Vec<_> = list.try_collect().await?;
@@ -317,6 +320,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
                     })
                     .collect(),
                 range: None,
+                metadata_ext: None,
             })
         })
     })
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 85d4b6f7d00d..cc91371675d2 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -27,6 +27,7 @@ use chrono::TimeZone;
 use datafusion_common::ScalarValue;
 use futures::Stream;
 use object_store::{path::Path, ObjectMeta};
+use std::collections::HashMap;
 use std::pin::Pin;
 
 pub use self::url::ListingTableUrl;
@@ -58,6 +59,8 @@ pub struct PartitionedFile {
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
+    /// An optional field for user defined per object metadata
+    pub metadata_ext: Option<FileMetaExt>,
 }
 
 impl PartitionedFile {
@@ -71,6 +74,7 @@ impl PartitionedFile {
             },
             partition_values: vec![],
             range: None,
+            metadata_ext: None,
         }
     }
 
@@ -84,6 +88,7 @@ impl PartitionedFile {
             },
             partition_values: vec![],
             range: Some(FileRange { start, end }),
+            metadata_ext: None,
         }
     }
 }
@@ -94,6 +99,13 @@ impl From<ObjectMeta> for PartitionedFile {
             object_meta,
             partition_values: vec![],
             range: None,
+            metadata_ext: None,
         }
     }
 }
+
+#[derive(Debug, Clone)]
+pub enum FileMetaExt {
+    Map(HashMap),
+    Array(Box<[u8]>),
+}
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index a1cef4a1ea50..e27480e0b98c 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -155,6 +155,7 @@ mod private {
     use super::*;
     use crate::datasource::listing::FileRange;
     use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
+    use crate::physical_plan::file_format::FileMeta;
     use bytes::Buf;
     use futures::StreamExt;
     use object_store::{GetResult, ObjectMeta, ObjectStore};
@@ -187,12 +188,11 @@ mod private {
         fn open(
             &self,
             store: Arc<dyn ObjectStore>,
-            file: ObjectMeta,
-            _range: Option<FileRange>,
-        ) -> FileOpenFuture {
+            file_meta: FileMeta,
+        ) -> Result<FileOpenFuture> {
             let config = self.config.clone();
-            Box::pin(async move {
-                match store.get(&file.location).await? {
+            let file_open_future = Box::pin(async move {
+                match store.get(file_meta.location()).await? {
                     GetResult::File(file, _) => {
                         let reader = config.open(file)?;
                         Ok(futures::stream::iter(reader).boxed())
@@ -203,7 +203,9 @@ mod private {
                         Ok(futures::stream::iter(reader).boxed())
                     }
                 }
-            })
+            });
+
+            Ok(file_open_future)
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 276dd0ed666b..4ed8efa59f2e 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
+use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
@@ -201,12 +202,11 @@ impl FileOpener for CsvOpener {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
-        file: ObjectMeta,
-        _range: Option<FileRange>,
-    ) -> FileOpenFuture {
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture> {
         let config = self.config.clone();
-        Box::pin(async move {
-            match store.get(&file.location).await? {
+        let file_open_future = Box::pin(async move {
+            match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
                     Ok(futures::stream::iter(config.open(file, true)).boxed())
                 }
@@ -222,7 +222,9 @@ impl FileOpener for CsvOpener {
                         .boxed())
                 }
             }
-        })
+        });
+
+        Ok(file_open_future)
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index ae4d549b0821..d32e8ce6eaa5 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -31,14 +31,17 @@ use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{ready, FutureExt, Stream, StreamExt};
+use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
 
-use datafusion_common::ScalarValue;
+use datafusion_common::{DataFusionError, ScalarValue};
 
-use crate::datasource::listing::{FileRange, PartitionedFile};
+use crate::datasource::listing::{FileMetaExt, FileRange, PartitionedFile};
 use crate::error::Result;
 use crate::execution::context::TaskContext;
-use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
+use crate::physical_plan::file_format::{
+    FileMeta, FileScanConfig, PartitionColumnProjector,
+};
 use crate::physical_plan::metrics::BaselineMetrics;
 use crate::physical_plan::RecordBatchStream;
 
@@ -50,9 +53,8 @@ pub trait FileOpener: Unpin {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
-        file: ObjectMeta,
-        range: Option<FileRange>,
-    ) -> FileOpenFuture;
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture>;
 }
 
 /// A stream that iterates record batch by record batch, file over file.
@@ -148,15 +150,23 @@ impl<F: FileOpener> FileStream<F> {
                         None => return Poll::Ready(None),
                     };
 
-                    let future = self.file_reader.open(
-                        self.object_store.clone(),
-                        file.object_meta,
-                        file.range,
-                    );
+                    let file_meta = FileMeta {
+                        object_meta: file.object_meta,
+                        range: file.range,
+                        metadata_ext: file.metadata_ext,
+                    };
 
-                    self.state = FileStreamState::Open {
-                        future,
-                        partition_values: file.partition_values,
+                    match self.file_reader.open(self.object_store.clone(), file_meta) {
+                        Ok(future) => {
+                            self.state = FileStreamState::Open {
+                                future,
+                                partition_values: file.partition_values,
+                            }
+                        }
+                        Err(e) => {
+                            // TODO : how to handle error
+                            self.state = FileStreamState::Error;
+                        }
                     }
                 }
                 FileStreamState::Open {
@@ -254,8 +264,7 @@ mod tests {
         fn open(
             &self,
             _store: Arc<dyn ObjectStore>,
-            _file: ObjectMeta,
-            _range: Option<FileRange>,
+            _file_meta: FileMeta,
         ) -> FileOpenFuture {
             let iterator = self.records.clone().into_iter().map(Ok);
             let stream = futures::stream::iter(iterator).boxed();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 6cc864312ded..da3576860bcc 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -27,6 +27,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
+use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
@@ -163,13 +164,12 @@ impl FileOpener for JsonOpener {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
-        file: ObjectMeta,
-        _range: Option<FileRange>,
-    ) -> FileOpenFuture {
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture> {
         let options = self.options.clone();
         let schema = self.file_schema.clone();
-        Box::pin(async move {
-            match store.get(&file.location).await? {
+        let file_open_future = Box::pin(async move {
+            match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
                     let reader = json::Reader::new(file, schema.clone(), options);
                     Ok(futures::stream::iter(reader).boxed())
@@ -188,7 +188,9 @@ impl FileOpener for JsonOpener {
                         .boxed())
                 }
             }
-        })
+        });
+
+        Ok(file_open_future)
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index d7d70cd4b824..3874289f3ed2 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -41,6 +41,7 @@ pub use avro::AvroExec;
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
 
+use crate::datasource::listing::{FileMetaExt, FileRange};
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
 use crate::{
     error::{DataFusionError, Result},
@@ -50,6 +51,8 @@ use arrow::array::{new_null_array, UInt16BufferBuilder};
 use arrow::record_batch::RecordBatchOptions;
 use lazy_static::lazy_static;
 use log::info;
+use object_store::path::Path;
+use object_store::ObjectMeta;
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter, Result as FmtResult},
@@ -401,6 +404,31 @@ fn create_dict_array(
     ))
 }
 
+pub struct FileMeta {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub object_meta: ObjectMeta,
+    /// An optional file range for a more fine-grained parallel execution
+    pub range: Option<FileRange>,
+    /// An optional field for user defined per object metadata
+    pub metadata_ext: Option<FileMetaExt>,
+}
+
+impl FileMeta {
+    pub fn location(&self) -> &Path {
+        &self.object_meta.location
+    }
+}
+
+impl From<ObjectMeta> for FileMeta {
+    fn from(object_meta: ObjectMeta) -> Self {
+        Self {
+            object_meta,
+            range: None,
+            metadata_ext: None,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 184214d6875d..cfd3abc770f7 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -19,6 +19,7 @@
 use fmt::Debug;
 use std::fmt;
+use std::fmt::{Display, Formatter};
 use std::fs;
 use std::ops::Range;
 use std::sync::Arc;
@@ -50,10 +51,11 @@ use datafusion_common::Column;
 use datafusion_expr::Expr;
 
 use crate::datasource::file_format::parquet::fetch_parquet_metadata;
-use crate::datasource::listing::FileRange;
+use crate::datasource::listing::{FileRange, PartitionedFile};
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
+use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::BaselineMetrics;
 use crate::{
     error::{DataFusionError, Result},
@@ -81,11 +83,13 @@ pub struct ParquetExec {
     pruning_predicate: Option<PruningPredicate>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
+    /// Optional user defined parquet file reader factory
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
 /// Stores metrics about the parquet execution for a particular parquet file
 #[derive(Debug, Clone)]
-struct ParquetFileMetrics {
+pub struct ParquetFileMetrics {
     /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: metrics::Count,
     /// Number of row groups pruned using
@@ -131,6 +135,7 @@ impl ParquetExec {
             metrics,
             pruning_predicate,
             metadata_size_hint,
+            parquet_file_reader_factory: None,
         }
     }
 
@@ -210,27 +215,39 @@ impl ExecutionPlan for ParquetExec {
     fn execute(
         &self,
         partition_index: usize,
-        context: Arc<TaskContext>,
+        ctx: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
 
+        let parquet_file_reader_factory =
+            if let Some(factory) = self.parquet_file_reader_factory.as_ref() {
+                Arc::clone(&factory)
+            } else {
+                let store = ctx
+                    .runtime_env()
+                    .object_store(&self.base_config.object_store_url)?;
+
+                Arc::new(DefaultParquetFileReaderFactory::new(store))
+            };
+
         let opener = ParquetOpener {
             partition_index,
             projection: Arc::from(projection),
-            batch_size: context.session_config().batch_size(),
+            batch_size: ctx.session_config().batch_size(),
             pruning_predicate: self.pruning_predicate.clone(),
             table_schema: self.base_config.file_schema.clone(),
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics.clone(),
+            parquet_file_reader_factory,
         };
 
         let stream = FileStream::new(
             &self.base_config,
             partition_index,
-            context,
+            ctx,
             opener,
             BaselineMetrics::new(&self.metrics, partition_index),
         )?;
@@ -285,34 +302,35 @@ struct ParquetOpener {
     table_schema: SchemaRef,
     metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
+    parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
 }
 
 impl FileOpener for ParquetOpener {
     fn open(
         &self,
-        store: Arc<dyn ObjectStore>,
-        meta: ObjectMeta,
-        range: Option<FileRange>,
-    ) -> FileOpenFuture {
+        _: Arc<dyn ObjectStore>,
+        file_meta: FileMeta,
+    ) -> Result<FileOpenFuture> {
+        let file_range = file_meta.range.clone();
+
         let metrics = ParquetFileMetrics::new(
             self.partition_index,
-            meta.location.as_ref(),
+            file_meta.location().as_ref(),
             &self.metrics,
         );
 
-        let reader = ParquetFileReader {
-            store,
-            meta,
-            metadata_size_hint: self.metadata_size_hint,
-            metrics: metrics.clone(),
-        };
+        let reader = self.parquet_file_reader_factory.create_reader(
+            file_meta,
+            self.metadata_size_hint,
+            metrics,
+        )?;
 
         let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let pruning_predicate = self.pruning_predicate.clone();
 
-        Box::pin(async move {
+        let file_open_future = Box::pin(async move {
             let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
             let adapted_projections =
                 schema_adapter.map_projections(builder.schema(), &projection)?;
@@ -323,7 +341,8 @@ impl FileOpener for ParquetOpener {
             );
 
             let groups = builder.metadata().row_groups();
-            let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics);
+            let row_groups =
+                prune_row_groups(groups, file_range, pruning_predicate, &metrics);
 
             let stream = builder
                 .with_projection(mask)
@@ -342,7 +361,30 @@ impl FileOpener for ParquetOpener {
             });
 
             Ok(adapted.boxed())
-        })
+        });
+
+        Ok(file_open_future)
+    }
+}
+
+pub trait ParquetFileReaderFactory:
+    std::fmt::Display + Debug + Send + Sync + 'static
+{
+    fn create_reader(
+        &self,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: ParquetFileMetrics,
+    ) -> Result<Box<dyn AsyncFileReader + Send>>;
+}
+
+pub struct DefaultParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+}
+
+impl DefaultParquetFileReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self { store }
     }
 }
 
@@ -390,6 +432,34 @@ impl AsyncFileReader for ParquetFileReader {
     }
 }
 
+impl Display for DefaultParquetFileReaderFactory {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "DefaultParquetFileReaderFactory")
"DefaultParquetFileReaderFactory") + } +} + +impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { + fn create_reader( + &self, + file_meta: FileMeta, + metadata_size_hint: Option, + metrics: ParquetFileMetrics, + ) -> Result> { + Ok(Box::new(ParquetFileReader { + meta: file_meta.object_meta, + store: Arc::clone(&self.store), + metadata_size_hint, + metrics, + })) + } +} + /// Wraps parquet statistics in a way /// that implements [`PruningStatistics`] struct RowGroupPruningStatistics<'a> { @@ -1087,6 +1157,7 @@ mod tests { object_meta: meta.clone(), partition_values: vec![], range: Some(FileRange { start, end }), + metadata_ext: None, } } @@ -1189,6 +1260,7 @@ mod tests { ScalarValue::Utf8(Some("26".to_owned())), ], range: None, + metadata_ext: None, }; let parquet_exec = ParquetExec::new( @@ -1251,6 +1323,7 @@ mod tests { }, partition_values: vec![], range: None, + metadata_ext: None, }; let parquet_exec = ParquetExec::new( From 12dee5e38d7ff0bb4139e17045a3e54f2a963573 Mon Sep 17 00:00:00 2001 From: Kamil Konior Date: Sat, 6 Aug 2022 11:57:49 +0200 Subject: [PATCH 02/23] handle error --- datafusion/core/src/physical_plan/file_format/file_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index d32e8ce6eaa5..6ccc26366d3a 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -164,8 +164,8 @@ impl FileStream { } } Err(e) => { - // TODO : how to handle error self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e.into()))); } } } From 926d25d7acd87b629a6731fecdce65b1b2976d5e Mon Sep 17 00:00:00 2001 From: Kamil Konior Date: Sat, 6 Aug 2022 14:00:04 +0200 Subject: [PATCH 03/23] fix compilation issues --- .../src/physical_plan/file_format/avro.rs | 6 ++-- .../core/src/physical_plan/file_format/csv.rs | 14 +++----- .../src/physical_plan/file_format/json.rs | 12 +++---- .../core/src/physical_plan/file_format/mod.rs | 34 +++++++++++++++++++ .../src/physical_plan/file_format/parquet.rs | 28 +++++++-------- 5 files changed, 59 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index e27480e0b98c..4f680a05f2d6 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -191,7 +191,7 @@ mod private { file_meta: FileMeta, ) -> Result { let config = self.config.clone(); - let file_open_future = Box::pin(async move { + Ok(Box::pin(async move { match store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { let reader = config.open(file)?; @@ -203,9 +203,7 @@ mod private { Ok(futures::stream::iter(reader).boxed()) } } - }); - - Ok(file_open_future) + })) } } } diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 4ed8efa59f2e..bdb60da7f521 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -20,17 +20,15 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, -}; - -use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; use crate::physical_plan::file_format::FileMeta; use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; +use crate::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, +}; use arrow::csv; use arrow::datatypes::SchemaRef; use bytes::Buf; @@ -205,7 +203,7 @@ impl FileOpener for CsvOpener { file_meta: FileMeta, ) -> Result { let config = self.config.clone(); - let file_open_future = Box::pin(async move { + Ok(Box::pin(async move { match store.get(file_meta.location()).await? { GetResult::File(file, _) => { Ok(futures::stream::iter(config.open(file, true)).boxed()) @@ -222,9 +220,7 @@ impl FileOpener for CsvOpener { .boxed()) } } - }); - - Ok(file_open_future) + })) } } diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index da3576860bcc..4fbb6db66e48 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -16,9 +16,6 @@ // under the License. //! Execution plan for reading line-delimited JSON files -use arrow::json::reader::DecoderOptions; - -use crate::datasource::listing::FileRange; use crate::error::{DataFusionError, Result}; use crate::execution::context::SessionState; use crate::execution::context::TaskContext; @@ -32,10 +29,11 @@ use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use arrow::json::reader::DecoderOptions; use arrow::{datatypes::SchemaRef, json}; use bytes::Buf; use futures::{StreamExt, TryStreamExt}; -use object_store::{GetResult, ObjectMeta, ObjectStore}; +use object_store::{GetResult, ObjectStore}; use std::any::Any; use std::fs; use std::path::Path; @@ -168,7 +166,7 @@ impl FileOpener for JsonOpener { ) -> Result { let options = self.options.clone(); let schema = self.file_schema.clone(); - let file_open_future = Box::pin(async move { + Ok(Box::pin(async move { match store.get(file_meta.location()).await? 
{ GetResult::File(file, _) => { let reader = json::Reader::new(file, schema.clone(), options); @@ -188,9 +186,7 @@ impl FileOpener for JsonOpener { .boxed()) } } - }); - - Ok(file_open_future) + })) } } diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 3874289f3ed2..9d26aaa3f739 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -47,12 +47,17 @@ use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, }; +use ::parquet::arrow::async_reader::AsyncFileReader; +use ::parquet::file::metadata::ParquetMetaData; use arrow::array::{new_null_array, UInt16BufferBuilder}; use arrow::record_batch::RecordBatchOptions; +use bytes::Bytes; +use futures::future::BoxFuture; use lazy_static::lazy_static; use log::info; use object_store::path::Path; use object_store::ObjectMeta; +use std::ops::Range; use std::{ collections::HashMap, fmt::{Display, Formatter, Result as FmtResult}, @@ -429,6 +434,35 @@ impl From for FileMeta { } } +pub struct ThinFileReader { + reader: Box, +} + +impl AsyncFileReader for ThinFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + self.reader.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> + where + Self: Send, + { + self.reader.get_byte_ranges(ranges) + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, parquet::errors::Result>> { + self.reader.get_metadata() + } +} + #[cfg(test)] mod tests { use crate::{ diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index cfd3abc770f7..b0fab2785824 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -55,7 +55,7 @@ use crate::datasource::listing::{FileRange, PartitionedFile}; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; -use crate::physical_plan::file_format::FileMeta; +use crate::physical_plan::file_format::{FileMeta, ThinFileReader}; use crate::physical_plan::metrics::BaselineMetrics; use crate::{ error::{DataFusionError, Result}, @@ -322,7 +322,7 @@ impl FileOpener for ParquetOpener { let reader = self.parquet_file_reader_factory.create_reader( file_meta, self.metadata_size_hint, - metrics, + metrics.clone(), )?; let schema_adapter = SchemaAdapter::new(self.table_schema.clone()); @@ -330,7 +330,7 @@ impl FileOpener for ParquetOpener { let projection = self.projection.clone(); let pruning_predicate = self.pruning_predicate.clone(); - let file_open_future = Box::pin(async move { + Ok(Box::pin(async move { let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; let adapted_projections = schema_adapter.map_projections(builder.schema(), &projection)?; @@ -361,9 +361,7 @@ impl FileOpener for ParquetOpener { }); Ok(adapted.boxed()) - }); - - Ok(file_open_future) + })) } } @@ -375,7 +373,7 @@ pub trait ParquetFileReaderFactory: file_meta: FileMeta, metadata_size_hint: Option, metrics: ParquetFileMetrics, - ) -> Result>; + ) -> Result; } pub struct DefaultParquetFileReaderFactory { @@ -450,13 +448,15 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { file_meta: FileMeta, metadata_size_hint: Option, metrics: ParquetFileMetrics, - ) -> Result> { - Ok(Box::new(ParquetFileReader { - meta: file_meta.object_meta, - 
store: Arc::clone(&self.store), - metadata_size_hint, - metrics, - })) + ) -> Result { + Ok(ThinFileReader { + reader: Box::new(ParquetFileReader { + meta: file_meta.object_meta, + store: Arc::clone(&self.store), + metadata_size_hint, + metrics, + }), + }) } } From 4e9eeba7e59f49a237e151883b547ef2176f048d Mon Sep 17 00:00:00 2001 From: Kamil Konior Date: Sat, 6 Aug 2022 14:01:27 +0200 Subject: [PATCH 04/23] rename var --- .../core/src/physical_plan/file_format/file_stream.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index 6ccc26366d3a..def57cdab43b 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -145,22 +145,22 @@ impl FileStream { loop { match &mut self.state { FileStreamState::Idle => { - let file = match self.file_iter.pop_front() { + let part_file = match self.file_iter.pop_front() { Some(file) => file, None => return Poll::Ready(None), }; let file_meta = FileMeta { - object_meta: file.object_meta, - range: file.range, - metadata_ext: file.metadata_ext, + object_meta: part_file.object_meta, + range: part_file.range, + metadata_ext: part_file.metadata_ext, }; match self.file_reader.open(self.object_store.clone(), file_meta) { Ok(future) => { self.state = FileStreamState::Open { future, - partition_values: file.partition_values, + partition_values: part_file.partition_values, } } Err(e) => { From cab87e84292f4d816431a876d0314e41d0089fb4 Mon Sep 17 00:00:00 2001 From: Kamil Konior Date: Sat, 6 Aug 2022 14:30:22 +0200 Subject: [PATCH 05/23] fix tests compilation issues --- datafusion/core/src/datasource/file_format/mod.rs | 1 + .../core/src/physical_plan/file_format/file_stream.rs | 4 ++-- datafusion/core/src/physical_plan/file_format/mod.rs | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 8a0d5b97ef65..18a332556a0c 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -108,6 +108,7 @@ pub(crate) mod test_util { object_meta: meta, partition_values: vec![], range: None, + metadata_ext: None, }]]; let exec = format diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index def57cdab43b..c68114a3b29e 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -265,10 +265,10 @@ mod tests { &self, _store: Arc, _file_meta: FileMeta, - ) -> FileOpenFuture { + ) -> Result { let iterator = self.records.clone().into_iter().map(Ok); let stream = futures::stream::iter(iterator).boxed(); - futures::future::ready(Ok(stream)).boxed() + Ok(futures::future::ready(Ok(stream)).boxed()) } } diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 9d26aaa3f739..fd7339f1fb74 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -442,14 +442,14 @@ impl AsyncFileReader for ThinFileReader { fn get_bytes( &mut self, range: Range, - ) -> BoxFuture<'_, parquet::errors::Result> { + ) -> BoxFuture<'_, ::parquet::errors::Result> { self.reader.get_bytes(range) } fn 
         &mut self,
         ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    ) -> BoxFuture<'_, ::parquet::errors::Result<Vec<Bytes>>>
     where
         Self: Send,
     {
@@ -458,7 +458,7 @@ impl AsyncFileReader for ThinFileReader {
 
     fn get_metadata(
         &mut self,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+    ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {
         self.reader.get_metadata()
     }
 }

From cecfa0abe993bc4b3926fff290f8d0ff322c65a7 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Sat, 6 Aug 2022 14:33:04 +0200
Subject: [PATCH 06/23] allow user to provide their own parquet reader factory

---
 datafusion/core/src/datasource/file_format/parquet.rs    | 1 +
 datafusion/core/src/physical_optimizer/repartition.rs    | 1 +
 datafusion/core/src/physical_plan/file_format/parquet.rs | 7 ++++++-
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index dfd08352dffb..8b941de5ba44 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -186,6 +186,7 @@ impl FileFormat for ParquetFormat {
             conf,
             predicate,
             self.metadata_size_hint(),
+            None,
         )))
     }
 }
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index ec2a1355ff26..50d5ee3158c0 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -272,6 +272,7 @@ mod tests {
             },
             None,
             None,
+            None,
         ))
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index b0fab2785824..a80cd47bd388 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -104,6 +104,7 @@ impl ParquetExec {
         base_config: FileScanConfig,
         predicate: Option<Expr>,
         metadata_size_hint: Option<usize>,
+        parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
     ) -> Self {
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -135,7 +136,7 @@ impl ParquetExec {
             metrics,
             pruning_predicate,
             metadata_size_hint,
-            parquet_file_reader_factory: None,
+            parquet_file_reader_factory,
         }
     }
 
@@ -791,6 +792,7 @@ mod tests {
             },
             predicate,
             None,
+            None,
         );
 
         let session_ctx = SessionContext::new();
@@ -1179,6 +1181,7 @@ mod tests {
             },
             None,
            None,
+            None,
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
         let results = parquet_exec.execute(0, task_ctx)?.next().await;
@@ -1280,6 +1283,7 @@ mod tests {
             },
             None,
             None,
+            None,
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
@@ -1338,6 +1342,7 @@ mod tests {
             },
             None,
             None,
+            None,
         );
 
         let mut results = parquet_exec.execute(0, task_ctx)?;

From 124eeb99a960e4ee69c25c471461c6d2214f0505 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Sat, 6 Aug 2022 21:03:00 +0200
Subject: [PATCH 07/23] move ThinFileReader into parquet module

---
 .../core/src/physical_plan/file_format/mod.rs | 29 ---------
 .../src/physical_plan/file_format/parquet.rs  | 61 ++++++++++++-----
 2 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index fd7339f1fb74..9ea88129f5b6 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -434,35 +434,6 @@ impl From<ObjectMeta> for FileMeta {
     }
 }
 
-pub struct ThinFileReader {
-    reader: Box<dyn AsyncFileReader + Send>,
-}
-
-impl AsyncFileReader for ThinFileReader {
-    fn get_bytes(
-        &mut self,
-        range: Range<usize>,
-    ) -> BoxFuture<'_, ::parquet::errors::Result<Bytes>> {
-        self.reader.get_bytes(range)
-    }
-
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, ::parquet::errors::Result<Vec<Bytes>>>
-    where
-        Self: Send,
-    {
-        self.reader.get_byte_ranges(ranges)
-    }
-
-    fn get_metadata(
-        &mut self,
-    ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {
-        self.reader.get_metadata()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use crate::{
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index a80cd47bd388..760d6cbb9d9c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -55,7 +55,7 @@ use crate::datasource::listing::{FileRange, PartitionedFile};
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
-use crate::physical_plan::file_format::{FileMeta, ThinFileReader};
+use crate::physical_plan::file_format::FileMeta;
 use crate::physical_plan::metrics::BaselineMetrics;
 use crate::{
     error::{DataFusionError, Result},
@@ -320,11 +320,13 @@ impl FileOpener for ParquetOpener {
             &self.metrics,
         );
 
-        let reader = self.parquet_file_reader_factory.create_reader(
-            file_meta,
-            self.metadata_size_hint,
-            metrics.clone(),
-        )?;
+        let reader = ThinFileReader {
+            reader: self.parquet_file_reader_factory.create_reader(
+                file_meta,
+                self.metadata_size_hint,
+                metrics.clone(),
+            )?,
+        };
 
         let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
         let batch_size = self.batch_size;
@@ -374,7 +376,7 @@ pub trait ParquetFileReaderFactory:
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: ParquetFileMetrics,
-    ) -> Result<ThinFileReader>;
+    ) -> Result<Box<dyn AsyncFileReader + Send>>;
 }
 
 pub struct DefaultParquetFileReaderFactory {
@@ -449,15 +451,42 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: ParquetFileMetrics,
-    ) -> Result<ThinFileReader> {
-        Ok(ThinFileReader {
-            reader: Box::new(ParquetFileReader {
-                meta: file_meta.object_meta,
-                store: Arc::clone(&self.store),
-                metadata_size_hint,
-                metrics,
-            }),
-        })
+    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+        Ok(Box::new(ParquetFileReader {
+            meta: file_meta.object_meta,
+            store: Arc::clone(&self.store),
+            metadata_size_hint,
+            metrics,
+        }))
+    }
+}
+
+struct ThinFileReader {
+    reader: Box<dyn AsyncFileReader + Send>,
+}
+
+impl AsyncFileReader for ThinFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, ::parquet::errors::Result<Bytes>> {
+        self.reader.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, ::parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        self.reader.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {
+        self.reader.get_metadata()
     }
 }
 

From 793d252f7d27dd385613a5eeb17c492e677ffffb Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Sat, 6 Aug 2022 21:03:34 +0200
Subject: [PATCH 08/23] rename field reader to delegate

---
 .../core/src/physical_plan/file_format/parquet.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 760d6cbb9d9c..facdf299714c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -321,7 +321,7 @@ impl FileOpener for ParquetOpener {
         );
 
         let reader = ThinFileReader {
-            reader: self.parquet_file_reader_factory.create_reader(
+            delegate: self.parquet_file_reader_factory.create_reader(
                 file_meta,
                 self.metadata_size_hint,
                 metrics.clone(),
@@ -462,7 +462,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
 }
 
 struct ThinFileReader {
-    reader: Box<dyn AsyncFileReader + Send>,
+    delegate: Box<dyn AsyncFileReader + Send>,
 }
 
 impl AsyncFileReader for ThinFileReader {
@@ -470,7 +470,7 @@ impl AsyncFileReader for ThinFileReader {
         &mut self,
         range: Range<usize>,
     ) -> BoxFuture<'_, ::parquet::errors::Result<Bytes>> {
-        self.reader.get_bytes(range)
+        self.delegate.get_bytes(range)
     }
 
     fn get_byte_ranges(
@@ -480,13 +480,13 @@ impl AsyncFileReader for ThinFileReader {
     where
         Self: Send,
     {
-        self.reader.get_byte_ranges(ranges)
+        self.delegate.get_byte_ranges(ranges)
     }
 
     fn get_metadata(
         &mut self,
     ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {
-        self.reader.get_metadata()
+        self.delegate.get_metadata()
     }
 }
 

From 8994e0c52dbbf040314e099bb06ee14141076071 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 11:56:54 +0200
Subject: [PATCH 09/23] convert if else to unwrap or else

---
 .../src/physical_plan/file_format/parquet.rs | 22 ++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index facdf299714c..f308deb046f1 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -223,16 +223,18 @@ impl ExecutionPlan for ParquetExec {
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
 
-        let parquet_file_reader_factory =
-            if let Some(factory) = self.parquet_file_reader_factory.as_ref() {
-                Arc::clone(&factory)
-            } else {
-                let store = ctx
-                    .runtime_env()
-                    .object_store(&self.base_config.object_store_url)?;
-
-                Arc::new(DefaultParquetFileReaderFactory::new(store))
-            };
+        let parquet_file_reader_factory = self
+            .parquet_file_reader_factory
+            .as_ref()
+            .map(|f| Ok(Arc::clone(&f)))
+            .unwrap_or_else(|| {
+                ctx.runtime_env()
+                    .object_store(&self.base_config.object_store_url)
+                    .map(|store| {
+                        Arc::new(DefaultParquetFileReaderFactory::new(store))
+                            as Arc<dyn ParquetFileReaderFactory>
+                    })
+            })?;
 
         let opener = ParquetOpener {

From cd1e5320440dbcc1c0d43400c638689cb779419c Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 12:08:11 +0200
Subject: [PATCH 10/23] rename ThinFileReader to BoxedAsyncFileReader, add doc

---
 .../src/physical_plan/file_format/parquet.rs | 23 ++++++++++---------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index f308deb046f1..7407d2c953f8 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -322,13 +322,12 @@ impl FileOpener for ParquetOpener {
         );
 
-        let reader = ThinFileReader {
-            delegate: self.parquet_file_reader_factory.create_reader(
+        let reader =
+            BoxedAsyncFileReader(self.parquet_file_reader_factory.create_reader(
                 file_meta,
                 self.metadata_size_hint,
                 metrics.clone(),
-            )?,
-        };
+            )?);
 
         let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
@@ -463,16 +462,18 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     }
 }
 
-struct ThinFileReader {
-    delegate: Box<dyn AsyncFileReader + Send>,
-}
+///
+/// BoxedAsyncFileReader has been created to satisfy type requirements of
+/// parquet stream builder constructor.
+///
+struct BoxedAsyncFileReader(Box<dyn AsyncFileReader + Send>);
 
-impl AsyncFileReader for ThinFileReader {
+impl AsyncFileReader for BoxedAsyncFileReader {
     fn get_bytes(
         &mut self,
         range: Range<usize>,
     ) -> BoxFuture<'_, ::parquet::errors::Result<Bytes>> {
-        self.delegate.get_bytes(range)
+        self.0.get_bytes(range)
     }
 
     fn get_byte_ranges(
@@ -482,13 +483,13 @@ impl AsyncFileReader for BoxedAsyncFileReader {
     where
         Self: Send,
     {
-        self.delegate.get_byte_ranges(ranges)
+        self.0.get_byte_ranges(ranges)
     }
 
     fn get_metadata(
         &mut self,
     ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {
-        self.delegate.get_metadata()
+        self.0.get_metadata()
     }
 }
 

From ef4e30e10a41968e8bfaa1bd3e47e4047c3c0d54 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 12:19:58 +0200
Subject: [PATCH 11/23] hide ParquetFileMetrics

---
 .../src/physical_plan/file_format/parquet.rs | 19 ++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 7407d2c953f8..c708e9c012fd 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -89,7 +89,7 @@ pub struct ParquetExec {
 
 /// Stores metrics about the parquet execution for a particular parquet file
 #[derive(Debug, Clone)]
-pub struct ParquetFileMetrics {
+struct ParquetFileMetrics {
     /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: metrics::Count,
     /// Number of row groups pruned using
@@ -324,9 +324,10 @@ impl FileOpener for ParquetOpener {
         let reader =
             BoxedAsyncFileReader(self.parquet_file_reader_factory.create_reader(
+                self.partition_index,
                 file_meta,
                 self.metadata_size_hint,
-                metrics.clone(),
+                &self.metrics,
             )?);
 
@@ -374,9 +375,10 @@ pub trait ParquetFileReaderFactory:
     fn create_reader(
         &self,
+        partition_index: usize,
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
-        metrics: ParquetFileMetrics,
+        metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>>;
 }
 
@@ -449,15 +451,22 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     fn create_reader(
         &self,
+        partition_index: usize,
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
-        metrics: ParquetFileMetrics,
+        metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
+        let parquet_file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+
         Ok(Box::new(ParquetFileReader {
             meta: file_meta.object_meta,
             store: Arc::clone(&self.store),
             metadata_size_hint,
-            metrics,
+            metrics: parquet_file_metrics,
         }))
     }
 }

From cfb168f0fc1e490a2e7dc89387b3a89b42ecc329 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 12:22:22 +0200
Subject: [PATCH 12/23] derive debug

---
 .../src/physical_plan/file_format/parquet.rs | 17 ++---------------
 1 file changed, 2 insertions(+), 15 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index c708e9c012fd..d93f70b3fb6b 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -370,9 +370,7 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-pub trait ParquetFileReaderFactory:
-    std::fmt::Display + Debug + Send + Sync + 'static
-{
+pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     fn create_reader(
         &self,
         partition_index: usize,
@@ -382,6 +380,7 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     ) -> Result<Box<dyn AsyncFileReader + Send>>;
 }
 
+#[derive(Debug)]
 pub struct DefaultParquetFileReaderFactory {
     store: Arc<dyn ObjectStore>,
 }
@@ -436,18 +435,6 @@ impl AsyncFileReader for ParquetFileReader {
     }
 }
 
-impl Display for DefaultParquetFileReaderFactory {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        write!(f, "DefaultParquetFileReaderFactory")
-    }
-}
-
-impl Debug for DefaultParquetFileReaderFactory {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        write!(f, "DefaultParquetFileReaderFactory")
-    }
-}
-
 impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     fn create_reader(
         &self,

From 8c6d702b17bf0e3fcefcc1dfa3c02feed7b6ac86 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 12:33:13 +0200
Subject: [PATCH 13/23] convert metadata_ext field into Any type

---
 datafusion/core/src/datasource/file_format/mod.rs |  2 +-
 datafusion/core/src/datasource/listing/helpers.rs |  4 ++--
 datafusion/core/src/datasource/listing/mod.rs     | 15 +++-----
 .../src/physical_plan/file_format/file_stream.rs  |  9 ++++-----
 .../core/src/physical_plan/file_format/mod.rs     | 13 +++---
 .../src/physical_plan/file_format/parquet.rs      |  6 +++---
 6 files changed, 20 insertions(+), 29 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 18a332556a0c..b16525c7a4f6 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -108,7 +108,7 @@ pub(crate) mod test_util {
             object_meta: meta,
             partition_values: vec![],
             range: None,
-            metadata_ext: None,
+            extensions: None,
         }]];
 
         let exec = format
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index a5464ff06d56..6c018eda3e76 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -200,7 +200,7 @@ pub async fn pruned_partition_list<'a>(
                     partition_values,
                     object_meta,
                     range: None,
-                    metadata_ext: None,
+                    extensions: None,
                 }))
             },
         )))
@@ -320,7 +320,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
                     })
                     .collect(),
                 range: None,
-                metadata_ext: None,
+                extensions: None,
             })
         })
     })
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index cc91371675d2..a7dd084a12fb 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -29,6 +29,7 @@ use futures::Stream;
 use object_store::{path::Path, ObjectMeta};
 use std::collections::HashMap;
 use std::pin::Pin;
+use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};
@@ -60,7 +61,7 @@ pub struct PartitionedFile {
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
     /// An optional field for user defined per object metadata
-    pub metadata_ext: Option<FileMetaExt>,
+    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
 
 impl PartitionedFile {
@@ -74,7 +75,7 @@ impl PartitionedFile {
             },
             partition_values: vec![],
             range: None,
-            metadata_ext: None,
+            extensions: None,
         }
     }
 
@@ -88,7 +89,7 @@ impl PartitionedFile {
             },
             partition_values: vec![],
             range: Some(FileRange { start, end }),
-            metadata_ext: None,
+            extensions: None,
         }
     }
 }
@@ -99,13 +100,7 @@ impl From<ObjectMeta> for PartitionedFile {
             object_meta,
             partition_values: vec![],
             range: None,
-            metadata_ext: None,
+            extensions: None,
         }
     }
 }
-
-#[derive(Debug, Clone)]
-pub enum FileMetaExt {
-    Map(HashMap),
-    Array(Box<[u8]>),
-}
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index c68114a3b29e..120334f6cc16 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -31,12 +31,11 @@ use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{ready, FutureExt, Stream, StreamExt};
-use hashbrown::HashMap;
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::ObjectStore;
 
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::ScalarValue;
 
-use crate::datasource::listing::{FileMetaExt, FileRange, PartitionedFile};
+use crate::datasource::listing::PartitionedFile;
 use crate::error::Result;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::file_format::{
     FileMeta, FileScanConfig, PartitionColumnProjector,
 };
@@ -153,7 +152,7 @@ impl<F: FileOpener> FileStream<F> {
                     let file_meta = FileMeta {
                         object_meta: part_file.object_meta,
                         range: part_file.range,
-                        metadata_ext: part_file.metadata_ext,
+                        extensions: part_file.extensions,
                     };
 
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 9ea88129f5b6..31f781633a40 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -41,23 +41,18 @@ pub use avro::AvroExec;
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
 
-use crate::datasource::listing::{FileMetaExt, FileRange};
+use crate::datasource::listing::FileRange;
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
 };
-use ::parquet::arrow::async_reader::AsyncFileReader;
-use ::parquet::file::metadata::ParquetMetaData;
 use arrow::array::{new_null_array, UInt16BufferBuilder};
 use arrow::record_batch::RecordBatchOptions;
-use bytes::Bytes;
-use futures::future::BoxFuture;
 use lazy_static::lazy_static;
 use log::info;
 use object_store::path::Path;
 use object_store::ObjectMeta;
-use std::ops::Range;
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter, Result as FmtResult},
@@ -409,16 +404,18 @@ fn create_dict_array(
     ))
 }
 
+/// A single file or part of a file that should be read, along with its schema, statistics
 pub struct FileMeta {
     /// Path for the file (e.g. URL, filesystem path, etc)
     pub object_meta: ObjectMeta,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
     /// An optional field for user defined per object metadata
-    pub metadata_ext: Option<FileMetaExt>,
+    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
 
 impl FileMeta {
+    /// The full path to the object
     pub fn location(&self) -> &Path {
         &self.object_meta.location
     }
@@ -429,7 +426,7 @@ impl From<ObjectMeta> for FileMeta {
         Self {
             object_meta,
             range: None,
-            metadata_ext: None,
+            extensions: None,
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d93f70b3fb6b..37a6247864dd 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1187,7 +1187,7 @@ mod tests {
             object_meta: meta.clone(),
             partition_values: vec![],
             range: Some(FileRange { start, end }),
-            metadata_ext: None,
+            extensions: None,
         }
     }
 
@@ -1291,7 +1291,7 @@ mod tests {
                 ScalarValue::Utf8(Some("26".to_owned())),
             ],
             range: None,
-            metadata_ext: None,
+            extensions: None,
         };
 
         let parquet_exec = ParquetExec::new(
@@ -1355,7 +1355,7 @@ mod tests {
             },
             partition_values: vec![],
             range: None,
-            metadata_ext: None,
+            extensions: None,
         };
 
         let parquet_exec = ParquetExec::new(

From 10c12da09ca7df7f112573adf87321a8d7375bda Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 14:01:59 +0200
Subject: [PATCH 14/23] add builder like method instead of modifying ctor

---
 .../src/datasource/file_format/parquet.rs     |  1 -
 datafusion/core/src/datasource/listing/mod.rs |  1 -
 .../src/physical_optimizer/repartition.rs     |  1 -
 .../src/physical_plan/file_format/parquet.rs  | 22 ++++++++++---
 4 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 8b941de5ba44..dfd08352dffb 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -186,7 +186,6 @@ impl FileFormat for ParquetFormat {
             conf,
             predicate,
             self.metadata_size_hint(),
-            None,
         )))
     }
 }
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index a7dd084a12fb..27cf14122690 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -27,7 +27,6 @@ use chrono::TimeZone;
 use datafusion_common::ScalarValue;
 use futures::Stream;
 use object_store::{path::Path, ObjectMeta};
-use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 50d5ee3158c0..ec2a1355ff26 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -272,7 +272,6 @@ mod tests {
             },
             None,
             None,
-            None,
         ))
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 37a6247864dd..d4a83b780f01 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -104,7 +104,6 @@ impl ParquetExec {
         base_config: FileScanConfig,
         predicate: Option<Expr>,
         metadata_size_hint: Option<usize>,
-        parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
     ) -> Self {
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -136,7 +135,7 @@ impl ParquetExec {
             metrics,
             pruning_predicate,
             metadata_size_hint,
-            parquet_file_reader_factory,
+            parquet_file_reader_factory: None,
         }
     }
 
@@ -149,6 +148,21 @@ impl ParquetExec {
     pub fn pruning_predicate(&self) -> Option<&PruningPredicate> {
         self.pruning_predicate.as_ref()
     }
+
+    /// Optional user defined parquet file reader factory.
+    ///
+    /// `ParquetFileReaderFactory` complements `TableProvider`. It enables users to provide a custom
+    /// implementation for data access operations.
+    ///
+    /// If a custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
+    /// to this factory instead of `ObjectStore`.
+    pub fn with_parquet_file_reader_factory(
+        mut self,
+        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Self {
+        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+        self
+    }
 }
 
@@ -820,7 +834,6 @@ mod tests {
             },
             predicate,
             None,
-            None,
         );
 
         let session_ctx = SessionContext::new();
@@ -1209,7 +1222,6 @@ mod tests {
             },
             None,
             None,
-            None,
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
         let results = parquet_exec.execute(0, task_ctx)?.next().await;
@@ -1311,7 +1323,6 @@ mod tests {
             },
             None,
             None,
-            None,
         );
 
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
@@ -1370,7 +1381,6 @@ mod tests {
             },
             None,
             None,
-            None,
         );
 
         let mut results = parquet_exec.execute(0, task_ctx)?;

From e6e50c2d09f3e4eb14f80512527e74c0b25bf090 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 14:10:56 +0200
Subject: [PATCH 15/23] make `ParquetFileReaderFactory` public to let users
 provide custom implementations

---
 datafusion/core/src/physical_plan/file_format/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 31f781633a40..851141d7d637 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -29,7 +29,7 @@ mod parquet;
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::CsvExec;
 pub(crate) use self::parquet::plan_to_parquet;
-pub use self::parquet::ParquetExec;
+pub use self::parquet::{ParquetExec, ParquetFileReaderFactory};
 use arrow::{
     array::{ArrayData, ArrayRef, DictionaryArray},
     buffer::Buffer,

From 6b33ea93ed022aa0575fb1de7c2340e9ea9963f3 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 14:44:38 +0200
Subject: [PATCH 16/23] imports cleanup and more docs

---
 datafusion/core/src/physical_plan/file_format/csv.rs | 2 +-
 .../core/src/physical_plan/file_format/parquet.rs    | 7 +++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index bdb60da7f521..09865f0ff211 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -33,7 +33,7 @@ use arrow::csv;
 use arrow::datatypes::SchemaRef;
 use bytes::Buf;
 use futures::{StreamExt, TryStreamExt};
-use object_store::{GetResult, ObjectMeta, ObjectStore};
+use object_store::{GetResult, ObjectStore};
 use std::any::Any;
 use std::fs;
 use std::path::Path;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d4a83b780f01..ff0e3d57ae61 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -19,7 +19,6 @@
 use fmt::Debug;
 use std::fmt;
-use std::fmt::{Display, Formatter};
 use std::fs;
 use std::ops::Range;
 use std::sync::Arc;
@@ -50,7 +50,7 @@ use datafusion_common::Column;
 use datafusion_expr::Expr;
 
 use crate::datasource::file_format::parquet::fetch_parquet_metadata;
-use crate::datasource::listing::{FileRange, PartitionedFile};
+use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
@@ -383,7 +383,11 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+/// Factory of parquet file readers.
+///
+/// Provides a means to implement a custom data access interface.
 pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
+    /// Provides an `AsyncFileReader` over the parquet file specified in `FileMeta`
     fn create_reader(
         &self,
         partition_index: usize,

From 2f2079a1d8a9a45ba556e3b5bd09dfa396f63e31 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Mon, 8 Aug 2022 16:37:00 +0200
Subject: [PATCH 17/23] try fix clippy failures

---
 .../core/src/physical_plan/file_format/avro.rs     |  3 +--
 .../core/src/physical_plan/file_format/parquet.rs  | 12 +-----------
 2 files changed, 2 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 4f680a05f2d6..ee3dc9af57b5 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -153,12 +153,11 @@ impl ExecutionPlan for AvroExec {
 #[cfg(feature = "avro")]
 mod private {
     use super::*;
-    use crate::datasource::listing::FileRange;
     use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
     use crate::physical_plan::file_format::FileMeta;
     use bytes::Buf;
     use futures::StreamExt;
-    use object_store::{GetResult, ObjectMeta, ObjectStore};
+    use object_store::{GetResult, ObjectStore};
 
     pub struct AvroConfig {
         pub schema: SchemaRef,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index ff0e3d57ae61..1a21fc7605cb 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -239,7 +239,7 @@ impl ExecutionPlan for ParquetExec {
         let parquet_file_reader_factory = self
             .parquet_file_reader_factory
             .as_ref()
-            .map(|f| Ok(Arc::clone(&f)))
+            .map(|f| Ok(Arc::clone(f)))
             .unwrap_or_else(|| {
                 ctx.runtime_env()
                     .object_store(&self.base_config.object_store_url)
@@ -489,16 +489,6 @@ impl AsyncFileReader for BoxedAsyncFileReader {
         self.0.get_bytes(range)
     }
 
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
-    where
-        Self: Send,
-    {
-        self.0.get_byte_ranges(ranges)
-    }
-
     fn get_metadata(
         &mut self,
     ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {

From cb96ca790c753ed2d9d80b86833f95298c19827d Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Mon, 8 Aug 2022 17:47:09 +0100
Subject: [PATCH 18/23] Disable where_clauses_object_safety

---
 datafusion/core/src/lib.rs                                |  2 ++
 .../core/src/physical_plan/file_format/parquet.rs         | 13 +++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index dcc508e65434..0dd665628399 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 #![warn(missing_docs, clippy::needless_borrow)]
+// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
+#![allow(where_clauses_object_safety)]
 
 //! [DataFusion](https://github.com/apache/arrow-datafusion)
 //! is an extensible query execution framework that uses
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 1a21fc7605cb..a3389809ddd0 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -479,6 +479,7 @@
 /// BoxedAsyncFileReader has been created to satisfy type requirements of
 /// parquet stream builder constructor.
 ///
+/// Temporary pending https://github.com/apache/arrow-rs/pull/2368
 struct BoxedAsyncFileReader(Box<dyn AsyncFileReader + Send>);
 
 impl AsyncFileReader for BoxedAsyncFileReader {
@@ -489,6 +490,18 @@ impl AsyncFileReader for BoxedAsyncFileReader {
         self.0.get_bytes(range)
     }
 
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    // TODO: This where bound forces us to enable #![allow(where_clauses_object_safety)] (#3081)
+    // Upstream issue https://github.com/apache/arrow-rs/issues/2372
+    where
+        Self: Send,
+    {
+        self.0.get_byte_ranges(ranges)
+    }
+
     fn get_metadata(
         &mut self,
     ) -> BoxFuture<'_, ::parquet::errors::Result<Arc<ParquetMetaData>>> {

From e387d23b0b10b3527e87e866c7f6a9658c86dec4 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Tue, 9 Aug 2022 15:53:40 +0200
Subject: [PATCH 19/23] add test

---
 .../src/datasource/file_format/parquet.rs     |  46 +++++++
 .../src/physical_plan/file_format/parquet.rs  | 116 ++++++++++++++++--
 2 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index dfd08352dffb..9f3c399e3470 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -522,8 +522,13 @@ pub(crate) mod test_util {
     use super::*;
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::record_batch::RecordBatch;
+    use bytes::Bytes;
+    use object_store::memory::InMemory;
+    use object_store::path::Path;
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
+    use std::io::Cursor;
+    use std::time::SystemTime;
     use tempfile::NamedTempFile;
 
     pub async fn store_parquet(
@@ -548,6 +553,47 @@ pub(crate) mod test_util {
         let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
         Ok((meta, files))
     }
+
+    pub async fn store_parquet_in_memory(
+        batches: Vec<RecordBatch>,
+    ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
+        let in_memory = InMemory::new();
+
+        let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
+            .into_iter()
+            .enumerate()
+            .map(|(offset, batch)| {
+                let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+                let mut output = Cursor::new(&mut buf);
+
+                let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+
+                let meta = ObjectMeta {
+                    location: Path::parse(format!("file-{offset}.parquet"))
+                        .expect("creating path"),
+                    last_modified: chrono::DateTime::from(SystemTime::now()),
+                    size: buf.len(),
+                };
+
+                (meta, Bytes::from(buf))
+            })
+            .collect();
+
+        let mut objects = Vec::with_capacity(parquet_batches.len());
+        for (meta, bytes) in
parquet_batches { + in_memory + .put(&meta.location, bytes) + .await + .expect("put parquet file into in memory object store"); + objects.push(meta); + } + + (Arc::new(in_memory), objects) + } } #[cfg(test)] diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 1a21fc7605cb..6f2718a5441b 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -759,20 +759,21 @@ pub async fn plan_to_parquet( #[cfg(test)] mod tests { - use crate::{ - assert_batches_sorted_eq, assert_contains, - datasource::file_format::{parquet::ParquetFormat, FileFormat}, - physical_plan::collect, - }; - use super::*; - use crate::datasource::file_format::parquet::test_util::store_parquet; + use crate::datasource::file_format::parquet::test_util::{ + store_parquet, store_parquet_in_memory, + }; use crate::datasource::file_format::test_util::scan_format; use crate::datasource::listing::{FileRange, PartitionedFile}; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::options::CsvReadOptions; use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use crate::test::object_store::local_unpartitioned_file; + use crate::{ + assert_batches_sorted_eq, assert_contains, + datasource::file_format::{parquet::ParquetFormat, FileFormat}, + physical_plan::collect, + }; use arrow::array::Float32Array; use arrow::record_batch::RecordBatch; use arrow::{ @@ -856,6 +857,107 @@ mod tests { ) } + const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata"; + + #[tokio::test] + async fn route_data_access_ops_to_parquet_file_reader_factory() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + let batch = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); + + let file_schema = batch.schema().clone(); + let (in_memory_object_store, parquet_files_meta) = + store_parquet_in_memory(vec![batch]).await; + let file_groups = parquet_files_meta + .into_iter() + .map(|meta| PartitionedFile { + object_meta: meta, + partition_values: vec![], + range: None, + extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))), + }) + .collect(); + + // prepare the scan + let parquet_exec = ParquetExec::new( + FileScanConfig { + // just any url that doesn't point to in memory object store + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: vec![file_groups], + file_schema, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }, + None, + None, + ) + .with_parquet_file_reader_factory(Arc::new( + InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)), + )); + + let session_ctx = SessionContext::new(); + + let task_ctx = session_ctx.task_ctx(); + let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); + + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "+-----+----+----+", + ]; + + assert_batches_sorted_eq!(expected, &read); + } + + #[derive(Debug)] + struct InMemoryParquetFileReaderFactory(Arc); + + impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: 
usize, + file_meta: FileMeta, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> Result> { + let metadata = file_meta + .extensions + .as_ref() + .expect("has user defined metadata"); + let metadata = metadata + .downcast_ref::() + .expect("has string metadata"); + + assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]); + + let parquet_file_metrics = ParquetFileMetrics::new( + partition_index, + file_meta.location().as_ref(), + metrics, + ); + + Ok(Box::new(ParquetFileReader { + meta: file_meta.object_meta, + store: Arc::clone(&self.0), + metadata_size_hint, + metrics: parquet_file_metrics, + })) + } + } + #[tokio::test] async fn evolved_schema() { let c1: ArrayRef = From bc66f6a48afeb7961554cd626b0197e4a39849f0 Mon Sep 17 00:00:00 2001 From: Kamil Konior Date: Tue, 9 Aug 2022 16:48:52 +0200 Subject: [PATCH 20/23] extract ParquetFileReaderFactory test to integration tests --- .../src/datasource/file_format/parquet.rs | 47 +--- .../core/src/physical_plan/file_format/mod.rs | 2 +- .../src/physical_plan/file_format/parquet.rs | 126 ++------- .../core/tests/custom_parquet_reader.rs | 242 ++++++++++++++++++ 4 files changed, 263 insertions(+), 154 deletions(-) create mode 100644 datafusion/core/tests/custom_parquet_reader.rs diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 9f3c399e3470..39e9c4ba27d4 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -354,7 +354,11 @@ fn summarize_min_max( } } -pub(crate) async fn fetch_parquet_metadata( +/// Fetches parquet metadata from ObjectStore for given object +/// +/// This component is a subject to **change** in near future and is exposed for low level integrations +/// through [ParquetFileReaderFactory]. 
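+///
+/// A minimal usage sketch (illustrative only; `store` and `object_meta` are assumed to be an
+/// `Arc<dyn ObjectStore>` and the `ObjectMeta` of an existing parquet object):
+///
+/// ```ignore
+/// let metadata =
+///     fetch_parquet_metadata(store.as_ref(), &object_meta, Some(64 * 1024)).await?;
+/// // the size hint lets the fetch read the footer and metadata in a single range request
+/// println!("row groups: {}", metadata.num_row_groups());
+/// ```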
+pub async fn fetch_parquet_metadata(
     store: &dyn ObjectStore,
     meta: &ObjectMeta,
     size_hint: Option<usize>,
@@ -553,47 +557,6 @@ pub(crate) mod test_util {
         let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
         Ok((meta, files))
     }
-
-    pub async fn store_parquet_in_memory(
-        batches: Vec<RecordBatch>,
-    ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
-        let in_memory = InMemory::new();
-
-        let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
-            .into_iter()
-            .enumerate()
-            .map(|(offset, batch)| {
-                let mut buf = Vec::<u8>::with_capacity(32 * 1024);
-                let mut output = Cursor::new(&mut buf);
-
-                let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
-                    .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-
-                let meta = ObjectMeta {
-                    location: Path::parse(format!("file-{offset}.parquet"))
-                        .expect("creating path"),
-                    last_modified: chrono::DateTime::from(SystemTime::now()),
-                    size: buf.len(),
-                };
-
-                (meta, Bytes::from(buf))
-            })
-            .collect();
-
-        let mut objects = Vec::with_capacity(parquet_batches.len());
-        for (meta, bytes) in parquet_batches {
-            in_memory
-                .put(&meta.location, bytes)
-                .await
-                .expect("put parquet file into in memory object store");
-            objects.push(meta);
-        }
-
-        (Arc::new(in_memory), objects)
-    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 851141d7d637..31fa1f2d8bcf 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -29,7 +29,7 @@ mod parquet;
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::CsvExec;
 pub(crate) use self::parquet::plan_to_parquet;
-pub use self::parquet::{ParquetExec, ParquetFileReaderFactory};
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 use arrow::{
     array::{ArrayData, ArrayRef, DictionaryArray},
     buffer::Buffer,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 6f2718a5441b..3e58e31e086a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -86,17 +86,6 @@ pub struct ParquetExec {
     parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
-/// Stores metrics about the parquet execution for a particular parquet file
-#[derive(Debug, Clone)]
-struct ParquetFileMetrics {
-    /// Number of times the predicate could not be evaluated
-    pub predicate_evaluation_errors: metrics::Count,
-    /// Number of row groups pruned using
-    pub row_groups_pruned: metrics::Count,
-    /// Total number of bytes scanned
-    pub bytes_scanned: metrics::Count,
-}
-
 impl ParquetExec {
     /// Create a new Parquet reader execution plan provided file list and schema.
     pub fn new(
@@ -164,6 +153,20 @@ impl ParquetExec {
     }
 }
 
+/// Stores metrics about the parquet execution for a particular parquet file.
+///
+/// This component is subject to **change** in the near future and is exposed for low-level
+/// integrations through [ParquetFileReaderFactory].
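+///
+/// A short sketch of the intended use from a custom [ParquetFileReaderFactory] (illustrative
+/// only; `partition_index`, `file_meta`, and `metrics` mirror the `create_reader` arguments):
+///
+/// ```ignore
+/// let file_metrics =
+///     ParquetFileMetrics::new(partition_index, file_meta.location().as_ref(), metrics);
+/// // record a 1 KiB read against this file's counters
+/// file_metrics.bytes_scanned.add(1024);
+/// ```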
+#[derive(Debug, Clone)]
+pub struct ParquetFileMetrics {
+    /// Number of times the predicate could not be evaluated
+    pub predicate_evaluation_errors: metrics::Count,
+    /// Number of row groups pruned using the pruning predicate
+    pub row_groups_pruned: metrics::Count,
+    /// Total number of bytes scanned
+    pub bytes_scanned: metrics::Count,
+}
+
 impl ParquetFileMetrics {
     /// Create new metrics
     pub fn new(
@@ -412,8 +415,8 @@ impl DefaultParquetFileReaderFactory {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
-    metadata_size_hint: Option<usize>,
     metrics: ParquetFileMetrics,
+    metadata_size_hint: Option<usize>,
 }
 
 impl AsyncFileReader for ParquetFileReader {
@@ -859,105 +862,6 @@ mod tests {
     const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
 
-    #[tokio::test]
-    async fn route_data_access_ops_to_parquet_file_reader_factory() {
-        let c1: ArrayRef =
-            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
-        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
-        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
-
-        let batch = create_batch(vec![
-            ("c1", c1.clone()),
-            ("c2", c2.clone()),
-            ("c3", c3.clone()),
-        ]);
-
-        let file_schema = batch.schema().clone();
-        let (in_memory_object_store, parquet_files_meta) =
-            store_parquet_in_memory(vec![batch]).await;
-        let file_groups = parquet_files_meta
-            .into_iter()
-            .map(|meta| PartitionedFile {
-                object_meta: meta,
-                partition_values: vec![],
-                range: None,
-                extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
-            })
-            .collect();
-
-        // prepare the scan
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                // just any url that doesn't point to in memory object store
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![file_groups],
-                file_schema,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            None,
-            None,
-        )
-        .with_parquet_file_reader_factory(Arc::new(
-            InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
-        ));
-
-        let session_ctx = SessionContext::new();
-
-        let task_ctx = session_ctx.task_ctx();
-        let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
-
-        let expected = vec![
-            "+-----+----+----+",
-            "| c1  | c2 | c3 |",
-            "+-----+----+----+",
-            "| Foo | 1  | 10 |",
-            "|     | 2  | 20 |",
-            "| bar |    |    |",
-            "+-----+----+----+",
-        ];
-
-        assert_batches_sorted_eq!(expected, &read);
-    }
-
-    #[derive(Debug)]
-    struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
-
-    impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
-        fn create_reader(
-            &self,
-            partition_index: usize,
-            file_meta: FileMeta,
-            metadata_size_hint: Option<usize>,
-            metrics: &ExecutionPlanMetricsSet,
-        ) -> Result<Box<dyn AsyncFileReader + Send>> {
-            let metadata = file_meta
-                .extensions
-                .as_ref()
-                .expect("has user defined metadata");
-            let metadata = metadata
-                .downcast_ref::<String>()
-                .expect("has string metadata");
-
-            assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
-
-            let parquet_file_metrics = ParquetFileMetrics::new(
-                partition_index,
-                file_meta.location().as_ref(),
-                metrics,
-            );
-
-            Ok(Box::new(ParquetFileReader {
-                meta: file_meta.object_meta,
-                store: Arc::clone(&self.0),
-                metadata_size_hint,
-                metrics: parquet_file_metrics,
-            }))
-        }
-    }
-
     #[tokio::test]
     async fn evolved_schema() {
         let c1: ArrayRef =
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
new file mode 100644
index 000000000000..ae4b05fa7f2c
--- /dev/null
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -0,0 +1,242 @@
+#[cfg(test)]
+mod tests {
+    use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
+    use arrow::datatypes::{Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use bytes::Bytes;
+    use datafusion::assert_batches_sorted_eq;
+    use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
+    use datafusion::datasource::listing::PartitionedFile;
+    use datafusion::datasource::object_store::ObjectStoreUrl;
+    use datafusion::physical_plan::file_format::{
+        FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics,
+        ParquetFileReaderFactory,
+    };
+    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+    use datafusion::physical_plan::{collect, Statistics};
+    use datafusion::prelude::SessionContext;
+    use datafusion_common::DataFusionError;
+    use futures::future::BoxFuture;
+    use futures::{StreamExt, TryFutureExt};
+    use object_store::memory::InMemory;
+    use object_store::path::Path;
+    use object_store::{ObjectMeta, ObjectStore};
+    use parquet::arrow::async_reader::AsyncFileReader;
+    use parquet::arrow::ArrowWriter;
+    use parquet::errors::ParquetError;
+    use parquet::file::metadata::ParquetMetaData;
+    use std::io::Cursor;
+    use std::ops::Range;
+    use std::sync::Arc;
+    use std::time::SystemTime;
+
+    const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
+
+    #[tokio::test]
+    async fn route_data_access_ops_to_parquet_file_reader_factory() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        let batch = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        let file_schema = batch.schema().clone();
+        let (in_memory_object_store, parquet_files_meta) =
+            store_parquet_in_memory(vec![batch]).await;
+        let file_groups = parquet_files_meta
+            .into_iter()
+            .map(|meta| PartitionedFile {
+                object_meta: meta,
+                partition_values: vec![],
+                range: None,
+                extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+            })
+            .collect();
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                // just any url that doesn't point to in memory object store
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+            None,
+        )
+        .with_parquet_file_reader_factory(Arc::new(
+            InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
+        ));
+
+        let session_ctx = SessionContext::new();
+
+        let task_ctx = session_ctx.task_ctx();
+        let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[derive(Debug)]
+    struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
+
+    impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
+        fn create_reader(
+            &self,
+            partition_index: usize,
+            file_meta: FileMeta,
+            metadata_size_hint: Option<usize>,
+            metrics: &ExecutionPlanMetricsSet,
+        ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
+            let metadata = file_meta
+                .extensions
+                .as_ref()
+                .expect("has user defined metadata");
+            let metadata = metadata
+                .downcast_ref::<String>()
+                .expect("has string metadata");
+
+            assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
+
+            let parquet_file_metrics = ParquetFileMetrics::new(
+                partition_index,
+                file_meta.location().as_ref(),
+                metrics,
+            );
+
+            Ok(Box::new(ParquetFileReader {
+                store: Arc::clone(&self.0),
+                meta: file_meta.object_meta,
+                metrics: parquet_file_metrics,
+                metadata_size_hint,
+            }))
+        }
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    async fn store_parquet_in_memory(
+        batches: Vec<RecordBatch>,
+    ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
+        let in_memory = InMemory::new();
+
+        let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
+            .into_iter()
+            .enumerate()
+            .map(|(offset, batch)| {
+                let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+                let mut output = Cursor::new(&mut buf);
+
+                let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+
+                let meta = ObjectMeta {
+                    location: Path::parse(format!("file-{offset}.parquet"))
+                        .expect("creating path"),
+                    last_modified: chrono::DateTime::from(SystemTime::now()),
+                    size: buf.len(),
+                };
+
+                (meta, Bytes::from(buf))
+            })
+            .collect();
+
+        let mut objects = Vec::with_capacity(parquet_batches.len());
+        for (meta, bytes) in parquet_batches {
+            in_memory
+                .put(&meta.location, bytes)
+                .await
+                .expect("put parquet file into in memory object store");
+            objects.push(meta);
+        }
+
+        (Arc::new(in_memory), objects)
+    }
+
+    /// Implements [`AsyncFileReader`] for a parquet file in object storage
+    struct ParquetFileReader {
+        store: Arc<dyn ObjectStore>,
+        meta: ObjectMeta,
+        metrics: ParquetFileMetrics,
+        metadata_size_hint: Option<usize>,
+    }
+
+    impl AsyncFileReader for ParquetFileReader {
+        fn get_bytes(
+            &mut self,
+            range: Range<usize>,
+        ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+            self.metrics.bytes_scanned.add(range.end - range.start);
+
+            self.store
+                .get_range(&self.meta.location, range)
+                .map_err(|e| {
+                    ParquetError::General(format!(
+                        "AsyncChunkReader::get_bytes error: {}",
+                        e
+                    ))
+                })
+                .boxed()
+        }
+
+        fn get_metadata(
+            &mut self,
+        ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+            Box::pin(async move {
+                let metadata = fetch_parquet_metadata(
+                    self.store.as_ref(),
+                    &self.meta,
+                    self.metadata_size_hint,
+                )
+                .await
+                .map_err(|e| {
+                    ParquetError::General(format!(
+                        "AsyncChunkReader::get_metadata error: {}",
+                        e
+                    ))
+                })?;
+                Ok(Arc::new(metadata))
+            })
+        }
+    }
+}

From 30d664aa7829b200c00d2f9ac77aa255a5247dea Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Tue, 9 Aug 2022 17:45:21 +0200
Subject: [PATCH 21/23] further cleanup

---
 datafusion/core/src/datasource/file_format/parquet.rs    | 5 -----
 datafusion/core/src/physical_plan/file_format/parquet.rs | 6 +-----
 datafusion/core/tests/custom_parquet_reader.rs           | 2 +-
 3 files changed, 2 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 39e9c4ba27d4..7e7a522e337e 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -526,13 +526,8 @@ pub(crate) mod test_util {
     use super::*;
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::record_batch::RecordBatch;
-    use bytes::Bytes;
-    use object_store::memory::InMemory;
-    use object_store::path::Path;
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
-    use std::io::Cursor;
-    use std::time::SystemTime;
     use tempfile::NamedTempFile;
 
     pub async fn store_parquet(
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index b0a8f7a004e0..ea40672b646a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -760,9 +760,7 @@ pub async fn plan_to_parquet(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datasource::file_format::parquet::test_util::{
-        store_parquet, store_parquet_in_memory,
-    };
+    use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
@@ -857,8 +855,6 @@ mod tests {
         )
     }
 
-    const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
-
     #[tokio::test]
     async fn evolved_schema() {
         let c1: ArrayRef =
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index ae4b05fa7f2c..d579791bab66 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -17,7 +17,7 @@ mod tests {
     use datafusion::prelude::SessionContext;
     use datafusion_common::DataFusionError;
     use futures::future::BoxFuture;
-    use futures::{StreamExt, TryFutureExt};
+    use futures::{FutureExt, TryFutureExt};
     use object_store::memory::InMemory;
     use object_store::path::Path;
     use object_store::{ObjectMeta, ObjectStore};

From e18d26fc86b308defd4870ae1656074987859d37 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 9 Aug 2022 12:18:12 -0400
Subject: [PATCH 22/23] fix: Add apache RAT license

---
 datafusion/core/tests/custom_parquet_reader.rs | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index d579791bab66..ca450ec92dfb 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
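+
+// A sketch of the pattern this integration test exercises (illustrative only; the names
+// mirror the test below, and `scan_config` / `object_store` stand in for values built in
+// the test): a custom `ParquetFileReaderFactory` is registered on the `ParquetExec`, so
+// all parquet data access bypasses the default `ObjectStore` path.
+//
+//     let parquet_exec = ParquetExec::new(scan_config, None, None)
+//         .with_parquet_file_reader_factory(Arc::new(
+//             InMemoryParquetFileReaderFactory(object_store),
+//         ));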
+
 #[cfg(test)]
 mod tests {
     use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};

From cec773f5653fcd893b495c895b99668aa743b9d4 Mon Sep 17 00:00:00 2001
From: Kamil Konior
Date: Tue, 9 Aug 2022 19:53:51 +0200
Subject: [PATCH 23/23] fix send

---
 datafusion/core/tests/custom_parquet_reader.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index ca450ec92dfb..ac8c98381447 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -1,3 +1,5 @@
+// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
+#![allow(where_clauses_object_safety)]
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
 // to you under the Apache License, Version 2.0 (the