Allow Overriding AsyncFileReader used by ParquetExec #3051

Merged: 25 commits, merged Aug 9, 2022
Changes from 14 commits

Commits (25)
db4e632: add metadata_ext to part file, etc (Cheappie, Aug 6, 2022)
12dee5e: handle error (Cheappie, Aug 6, 2022)
926d25d: fix compilation issues (Cheappie, Aug 6, 2022)
4e9eeba: rename var (Cheappie, Aug 6, 2022)
cab87e8: fix tests compilation issues (Cheappie, Aug 6, 2022)
cecfa0a: allow user to provide their own parquet reader factory (Cheappie, Aug 6, 2022)
124eeb9: move ThinFileReader into parquet module (Cheappie, Aug 6, 2022)
793d252: rename field reader to delegate (Cheappie, Aug 6, 2022)
8994e0c: convert if else to unwrap or else (Cheappie, Aug 8, 2022)
cd1e532: rename ThinFileReader to BoxedAsyncFileReader, add doc (Cheappie, Aug 8, 2022)
ef4e30e: hide ParquetFileMetrics (Cheappie, Aug 8, 2022)
cfb168f: derive debug (Cheappie, Aug 8, 2022)
8c6d702: convert metadata_ext field into Any type (Cheappie, Aug 8, 2022)
10c12da: add builder like method instead of modifying ctor (Cheappie, Aug 8, 2022)
e6e50c2: make `ParquetFileReaderFactory` public to let user's provide custom i… (Cheappie, Aug 8, 2022)
6b33ea9: imports cleanup and more docs (Cheappie, Aug 8, 2022)
2f2079a: try fix clippy failures (Cheappie, Aug 8, 2022)
cb96ca7: Disable where_clauses_object_safety (tustvold, Aug 8, 2022)
e387d23: add test (Cheappie, Aug 9, 2022)
bc66f6a: extract ParquetFileReaderFactory test to integration tests (Cheappie, Aug 9, 2022)
334edce: resolve conflicts (Cheappie, Aug 9, 2022)
30d664a: further cleanup (Cheappie, Aug 9, 2022)
9c2ccc3: Merge pull request #1 from tustvold/fix-send (Cheappie, Aug 9, 2022)
e18d26f: fix: Add apache RAT license (alamb, Aug 9, 2022)
cec773f: fix send (Cheappie, Aug 9, 2022)
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -108,6 +108,7 @@ pub(crate) mod test_util {
object_meta: meta,
partition_values: vec![],
range: None,
extensions: None,
}]];

let exec = format
40 changes: 22 additions & 18 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -183,24 +183,27 @@ pub async fn pruned_partition_list<'a>(
// Note: We might avoid parsing the partition values if they are not used in any projection,
// but the cost of parsing will likely be far dominated by the time to fetch the listing from
// the object store.
Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
let parsed_path = parse_partitions_for_path(
table_path,
&file_meta.location,
table_partition_cols,
)
.map(|p| {
p.iter()
.map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
.collect()
});

Ok(parsed_path.map(|partition_values| PartitionedFile {
partition_values,
object_meta: file_meta,
range: None,
}))
})))
Ok(Box::pin(list.try_filter_map(
move |object_meta| async move {
let parsed_path = parse_partitions_for_path(
table_path,
&object_meta.location,
table_partition_cols,
)
.map(|p| {
p.iter()
.map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
.collect()
});

Ok(parsed_path.map(|partition_values| PartitionedFile {
partition_values,
object_meta,
range: None,
extensions: None,
}))
},
)))
} else {
// parse the partition values and serde them as a RecordBatch to filter them
let metas: Vec<_> = list.try_collect().await?;
@@ -317,6 +320,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
})
.collect(),
range: None,
extensions: None,
})
})
})
6 changes: 6 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
@@ -28,6 +28,7 @@ use datafusion_common::ScalarValue;
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
use std::sync::Arc;

pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
@@ -58,6 +59,8 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl PartitionedFile {
@@ -71,6 +74,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: None,
extensions: None,
}
}

@@ -84,6 +88,7 @@
},
partition_values: vec![],
range: Some(FileRange { start, end }),
extensions: None,
}
}
}
@@ -94,6 +99,7 @@ impl From<ObjectMeta> for PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
extensions: None,
}
}
}
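
The new `extensions` field is deliberately type-erased: DataFusion stores it as `Arc<dyn Any + Send + Sync>` and never inspects it, so callers can carry arbitrary per-file metadata through the scan. A minimal sketch of the attach/downcast round trip; the `CachedFooter` type is hypothetical, purely for illustration:

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical per-file metadata a custom Parquet reader might want,
/// e.g. a pre-fetched footer location from an external index.
#[derive(Debug)]
struct CachedFooter {
    offset: u64,
    length: u64,
}

fn main() {
    // Attach the metadata exactly as the new `PartitionedFile::extensions`
    // field would hold it: behind a type-erased, thread-safe Arc.
    let extensions: Option<Arc<dyn Any + Send + Sync>> =
        Some(Arc::new(CachedFooter { offset: 1024, length: 256 }));

    // Later, e.g. inside a custom reader, downcast back to the
    // concrete type to recover the metadata.
    if let Some(ext) = &extensions {
        if let Some(footer) = ext.downcast_ref::<CachedFooter>() {
            println!("footer at byte {} ({} bytes)", footer.offset, footer.length);
        }
    }
}
```
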
12 changes: 6 additions & 6 deletions datafusion/core/src/physical_plan/file_format/avro.rs
@@ -155,6 +155,7 @@ mod private {
use super::*;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use crate::physical_plan::file_format::FileMeta;
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectMeta, ObjectStore};
@@ -187,12 +188,11 @@ mod private {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = config.open(file)?;
Ok(futures::stream::iter(reader).boxed())
Expand All @@ -203,7 +203,7 @@ mod private {
Ok(futures::stream::iter(reader).boxed())
}
}
})
}))
}
}
}
20 changes: 9 additions & 11 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -20,16 +20,15 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::{SessionState, TaskContext};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};

use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::csv;
use arrow::datatypes::SchemaRef;
use bytes::Buf;
@@ -201,12 +200,11 @@ impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
Ok(futures::stream::iter(config.open(file, true)).boxed())
}
Expand All @@ -222,7 +220,7 @@ impl FileOpener for CsvOpener {
.boxed())
}
}
})
}))
}
}

46 changes: 27 additions & 19 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -31,14 +31,16 @@ use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
use object_store::{ObjectMeta, ObjectStore};
use object_store::ObjectStore;

use datafusion_common::ScalarValue;

use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
use crate::physical_plan::file_format::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::RecordBatchStream;

@@ -50,9 +52,8 @@ pub trait FileOpener: Unpin {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
range: Option<FileRange>,
) -> FileOpenFuture;
file_meta: FileMeta,
) -> Result<FileOpenFuture>;
}

/// A stream that iterates record batch by record batch, file over file.
@@ -143,20 +144,28 @@ impl<F: FileOpener> FileStream<F> {
loop {
match &mut self.state {
FileStreamState::Idle => {
let file = match self.file_iter.pop_front() {
let part_file = match self.file_iter.pop_front() {
Some(file) => file,
None => return Poll::Ready(None),
};

let future = self.file_reader.open(
self.object_store.clone(),
file.object_meta,
file.range,
);
let file_meta = FileMeta {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
};

self.state = FileStreamState::Open {
future,
partition_values: file.partition_values,
match self.file_reader.open(self.object_store.clone(), file_meta) {
Ok(future) => {
self.state = FileStreamState::Open {
future,
partition_values: part_file.partition_values,
}
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e.into())));
}
}
}
FileStreamState::Open {
@@ -254,12 +263,11 @@ mod tests {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
_file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
_file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
futures::future::ready(Ok(stream)).boxed()
Ok(futures::future::ready(Ok(stream)).boxed())
}
}

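
Note the shape of the trait change above: `open` now takes a single `FileMeta` (bundling the `ObjectMeta`, the optional `FileRange`, and the new `extensions`) and returns `Result<FileOpenFuture>` instead of a bare future, so an opener can fail synchronously before any async work is scheduled. A sketch of a custom opener against the new signature; `NonEmptyFileOpener` and its empty-file check are hypothetical, and the code is written as it would sit inside the datafusion crate (hence the `crate::` paths):

```rust
use std::sync::Arc;

use futures::StreamExt;
use object_store::ObjectStore;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use crate::physical_plan::file_format::FileMeta;

/// Rejects empty files up front instead of inside the returned future.
struct NonEmptyFileOpener;

impl FileOpener for NonEmptyFileOpener {
    fn open(
        &self,
        store: Arc<dyn ObjectStore>,
        file_meta: FileMeta,
    ) -> Result<FileOpenFuture> {
        // Synchronous validation: under the old `-> FileOpenFuture` signature
        // this error could only surface once the future was polled.
        if file_meta.object_meta.size == 0 {
            return Err(DataFusionError::Execution(format!(
                "empty file: {}",
                file_meta.location()
            )));
        }
        Ok(Box::pin(async move {
            let _data = store.get(file_meta.location()).await?;
            // ... decode `_data` into record batches here ...
            Ok(futures::stream::empty().boxed())
        }))
    }
}
```

`FileStream` then maps an `Err` from `open` straight into the `FileStreamState::Error` state, as the hunk above shows.
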
18 changes: 8 additions & 10 deletions datafusion/core/src/physical_plan/file_format/json.rs
@@ -16,9 +16,6 @@
// under the License.

//! Execution plan for reading line-delimited JSON files
use arrow::json::reader::DecoderOptions;

use crate::datasource::listing::FileRange;
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::execution::context::TaskContext;
@@ -27,14 +24,16 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::json::reader::DecoderOptions;
use arrow::{datatypes::SchemaRef, json};
use bytes::Buf;
use futures::{StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectMeta, ObjectStore};
use object_store::{GetResult, ObjectStore};
use std::any::Any;
use std::fs;
use std::path::Path;
@@ -163,13 +162,12 @@ impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let options = self.options.clone();
let schema = self.file_schema.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = json::Reader::new(file, schema.clone(), options);
Ok(futures::stream::iter(reader).boxed())
Expand All @@ -188,7 +186,7 @@ impl FileOpener for JsonOpener {
.boxed())
}
}
})
}))
}
}

30 changes: 30 additions & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -41,6 +41,7 @@ pub use avro::AvroExec;
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;

use crate::datasource::listing::FileRange;
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
use crate::{
error::{DataFusionError, Result},
@@ -50,6 +51,8 @@ use arrow::array::{new_null_array, UInt16BufferBuilder};
use arrow::record_batch::RecordBatchOptions;
use lazy_static::lazy_static;
use log::info;
use object_store::path::Path;
use object_store::ObjectMeta;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
@@ -401,6 +404,33 @@ fn create_dict_array(
))
}

/// A single file or part of a file that should be read, along with its schema, statistics

[Review comment, Contributor] Maybe I am missing something but I don't see "schema" and "statistics" on this struct. Perhaps we should refer to "extensions" instead?

[Reply, Contributor Author] This description was copied from PartitionedFile. At first I had a similar impression that something was wrong, but on a second read it doesn't say that schema and statistics are part of the struct; rather, they should be read from the file along with its data.

pub struct FileMeta {

[Review comment, Contributor] This struct makes a lot of sense to me.


/// Path for the file (e.g. URL, filesystem path, etc)
pub object_meta: ObjectMeta,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl FileMeta {
/// The full path to the object
pub fn location(&self) -> &Path {
&self.object_meta.location
}
}

impl From<ObjectMeta> for FileMeta {
fn from(object_meta: ObjectMeta) -> Self {
Self {
object_meta,
range: None,
extensions: None,
}
}
}

#[cfg(test)]
mod tests {
use crate::{
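
Not shown in the hunks above, but the heart of this PR: parquet.rs gains a public `ParquetFileReaderFactory` trait plus a builder-style method on `ParquetExec`, so users can swap in their own `AsyncFileReader`. Treat the signature below as an assumption (it is my reading of what this PR introduces; the parquet.rs diff is not part of this view), and the delegating `TaggedReaderFactory` is hypothetical:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::file_format::{FileMeta, ParquetFileReaderFactory};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use parquet::arrow::async_reader::AsyncFileReader;

/// Wraps another factory, peeking at the per-file `extensions` before
/// delegating the actual reader construction.
#[derive(Debug)]
struct TaggedReaderFactory {
    inner: Arc<dyn ParquetFileReaderFactory>,
}

impl ParquetFileReaderFactory for TaggedReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        // Recover user-defined metadata attached via PartitionedFile::extensions.
        if let Some(tag) = file_meta
            .extensions
            .as_ref()
            .and_then(|ext| ext.downcast_ref::<String>())
        {
            println!("opening {} (tag: {tag})", file_meta.location());
        }
        self.inner
            .create_reader(partition_index, file_meta, metadata_size_hint, metrics)
    }
}
```
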