Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata_size_hint for optimistic fetching of parquet metadata #2946

Merged
Merged 6 commits on Jul 21, 2022
262 changes: 247 additions & 15 deletions datafusion/core/src/datasource/file_format/parquet.rs
Expand Up @@ -23,6 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -52,12 +53,14 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
#[derive(Debug)]
pub struct ParquetFormat {
// If true, pruning based on parquet statistics is enabled (see `enable_pruning`).
enable_pruning: bool,
// Optional user-supplied guess (in bytes) for the size of this file's
// metadata footer; lets `fetch_parquet_metadata` try a single read
// instead of two. `None` means no hint was provided.
metadata_size_hint: Option<usize>,
}

impl Default for ParquetFormat {
fn default() -> Self {
Self {
enable_pruning: true,
metadata_size_hint: None,
}
}
}
Expand All @@ -69,10 +72,24 @@ impl ParquetFormat {
self.enable_pruning = enable;
self
}

/// Provide a hint of the size of the file metadata. If a hint is provided,
/// the reader will optimistically fetch the last `size_hint` bytes of the
/// parquet file in a single request.
/// Without a hint, two reads are required: one to fetch the 8-byte parquet
/// footer, and a second to fetch the metadata whose length is encoded in
/// that footer.
pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
self.metadata_size_hint = Some(size_hint);
self
}
/// Return `true` if pruning is enabled for this format.
pub fn enable_pruning(&self) -> bool {
self.enable_pruning
}

/// Return the metadata size hint, if one was set via
/// [`Self::with_metadata_size_hint`]; `None` otherwise.
pub fn metadata_size_hint(&self) -> Option<usize> {
self.metadata_size_hint
}
}

#[async_trait]
Expand All @@ -88,7 +105,8 @@ impl FileFormat for ParquetFormat {
) -> Result<SchemaRef> {
let mut schemas = Vec::with_capacity(objects.len());
for object in objects {
let schema = fetch_schema(store.as_ref(), object).await?;
let schema =
fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
schemas.push(schema)
}
let schema = Schema::try_merge(schemas)?;
Expand All @@ -101,7 +119,13 @@ impl FileFormat for ParquetFormat {
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics> {
let stats = fetch_statistics(store.as_ref(), table_schema, object).await?;
let stats = fetch_statistics(
store.as_ref(),
table_schema,
object,
self.metadata_size_hint,
)
.await?;
Ok(stats)
}

Expand All @@ -119,7 +143,11 @@ impl FileFormat for ParquetFormat {
None
};

Ok(Arc::new(ParquetExec::new(conf, predicate)))
Ok(Arc::new(ParquetExec::new(
conf,
predicate,
self.metadata_size_hint(),
)))
}
}

Expand Down Expand Up @@ -290,6 +318,7 @@ fn summarize_min_max(
pub(crate) async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
size_hint: Option<usize>,
) -> Result<ParquetMetaData> {
if meta.size < 8 {
return Err(DataFusionError::Execution(format!(
Expand All @@ -298,13 +327,22 @@ pub(crate) async fn fetch_parquet_metadata(
)));
}

let footer_start = meta.size - 8;
// If a size hint is provided, read more than the minimum size
// to try and avoid a second fetch.
let footer_start = if let Some(size_hint) = size_hint {
thinkharderdev marked this conversation as resolved.
Show resolved Hide resolved
meta.size.saturating_sub(size_hint)
} else {
meta.size - 8
};

let suffix = store
.get_range(&meta.location, footer_start..meta.size)
.await?;

let suffix_len = suffix.len();

let mut footer = [0; 8];
footer.copy_from_slice(suffix.as_ref());
footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);

let length = decode_footer(&footer)?;

Expand All @@ -316,17 +354,35 @@ pub(crate) async fn fetch_parquet_metadata(
)));
}

let metadata_start = meta.size - length - 8;
let metadata = store
.get_range(&meta.location, metadata_start..footer_start)
.await?;
// Did not fetch the entire file metadata in the initial read, need to make a second request
if length > suffix_len - 8 {
thinkharderdev marked this conversation as resolved.
Show resolved Hide resolved
let metadata_start = meta.size - length - 8;
let remaining_metadata = store
.get_range(&meta.location, metadata_start..footer_start)
.await?;

let mut metadata = BytesMut::with_capacity(length);

Ok(decode_metadata(metadata.as_ref())?)
metadata.put(remaining_metadata.as_ref());
metadata.put(&suffix[..suffix_len - 8]);

Ok(decode_metadata(metadata.as_ref())?)
} else {
let metadata_start = meta.size - length - 8;

Ok(decode_metadata(
&suffix[metadata_start - footer_start..suffix_len - 8],
)?)
}
}

/// Read and parse the schema of the Parquet file at location `path`
async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
let metadata = fetch_parquet_metadata(store, file).await?;
async fn fetch_schema(
store: &dyn ObjectStore,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<Schema> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();
let schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
Expand All @@ -340,8 +396,9 @@ async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
let metadata = fetch_parquet_metadata(store, file).await?;
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();

let file_schema = parquet_to_arrow_schema(
Expand Down Expand Up @@ -458,6 +515,9 @@ pub(crate) mod test_util {
mod tests {
use super::super::test_util::scan_format;
use crate::physical_plan::collect;
use std::fmt::{Display, Formatter};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};

use super::*;

Expand All @@ -469,9 +529,14 @@ mod tests {
StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::ScalarValue;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{GetResult, ListResult};

#[tokio::test]
async fn read_merged_batches() -> Result<()> {
Expand All @@ -489,15 +554,16 @@ mod tests {
let format = ParquetFormat::default();
let schema = format.infer_schema(&store, &meta).await.unwrap();

let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?;
let stats =
fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;

assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));

let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?;
let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
Expand All @@ -509,6 +575,172 @@ mod tests {
Ok(())
}

#[derive(Debug)]
struct RequestCountingObjectStore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

inner: Arc<dyn ObjectStore>,
request_count: AtomicUsize,
}

impl Display for RequestCountingObjectStore {
    /// Render as `RequestCounting(<inner store>)`, delegating to the
    /// wrapped store's own `Display` implementation.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "RequestCounting({})", self.inner)
    }
}

impl RequestCountingObjectStore {
pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
Self {
inner,
request_count: Default::default(),
}
}

pub fn request_count(&self) -> usize {
self.request_count.load(Ordering::SeqCst)
}

pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
self.clone()
}
}

// Test double: only `get_range` is exercised by the metadata-fetch tests,
// so it is the only operation that is forwarded (and counted); every other
// trait method deliberately returns `NotImplemented`.
#[async_trait]
impl ObjectStore for RequestCountingObjectStore {
// Not used by these tests.
async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

// Not used by these tests.
async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
Err(object_store::Error::NotImplemented)
}

// The one forwarded operation: bump the counter, then delegate the
// byte-range read to the inner store.
async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
self.request_count.fetch_add(1, Ordering::SeqCst);
self.inner.get_range(location, range).await
}

// Not used by these tests.
async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
Err(object_store::Error::NotImplemented)
}

// Not used by these tests.
async fn delete(&self, _location: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

// Not used by these tests.
async fn list(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
{
Err(object_store::Error::NotImplemented)
}

// Not used by these tests.
async fn list_with_delimiter(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
Err(object_store::Error::NotImplemented)
}

// Not used by these tests.
async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

// Not used by these tests.
async fn copy_if_not_exists(
&self,
_from: &Path,
_to: &Path,
) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}
}

/// Verify the `metadata_size_hint` behavior of `fetch_parquet_metadata`:
/// a hint smaller than the metadata needs a second fetch, a hint covering
/// the whole metadata coalesces into one fetch, and a hint larger than the
/// file must not panic.
///
/// NOTE(review): the original scraped text had web-page review-comment
/// artifacts embedded inside this function; they were removed here.
#[tokio::test]
async fn fetch_metadata_with_size_hint() -> Result<()> {
    let c1: ArrayRef =
        Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

    let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

    let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
    let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();

    let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
        LocalFileSystem::new(),
    )));
    let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;

    // Use a size hint larger than the parquet footer but smaller than the
    // actual metadata, requiring a second fetch for the remaining metadata
    fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
        .await
        .expect("error reading metadata with hint");

    assert_eq!(store.request_count(), 2);

    let format = ParquetFormat::default().with_metadata_size_hint(9);
    let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();

    let stats =
        fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
            .await?;

    assert_eq!(stats.num_rows, Some(3));
    let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
    let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
    assert_eq!(c1_stats.null_count, Some(1));
    assert_eq!(c2_stats.null_count, Some(3));

    let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
        LocalFileSystem::new(),
    )));

    // Use the file size as the hint so we can get the full metadata from the first fetch
    let size_hint = meta[0].size;

    fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
        .await
        .expect("error reading metadata with hint");

    // ensure the requests were coalesced into a single request
    assert_eq!(store.request_count(), 1);

    let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
    let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
    let stats = fetch_statistics(
        store.upcast().as_ref(),
        schema.clone(),
        &meta[0],
        Some(size_hint),
    )
    .await?;

    assert_eq!(stats.num_rows, Some(3));
    let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
    let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
    assert_eq!(c1_stats.null_count, Some(1));
    assert_eq!(c2_stats.null_count, Some(3));

    let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
        LocalFileSystem::new(),
    )));

    // Use a size hint larger than the file size to make sure we don't panic
    let size_hint = meta[0].size + 100;

    fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
        .await
        .expect("error reading metadata with hint");

    assert_eq!(store.request_count(), 1);

    Ok(())
}

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/repartition.rs
Expand Up @@ -271,6 +271,7 @@ mod tests {
table_partition_cols: vec![],
},
None,
None,
))
}

Expand Down