Skip to content

Commit

Permalink
PR comments: Guard against size_hint larger than file size and verify…
Browse files Browse the repository at this point in the history
… request counts in unit test
  • Loading branch information
thinkharderdev committed Jul 20, 2022
1 parent 6d2ca39 commit d9e7996
Showing 1 changed file with 134 additions and 14 deletions.
148 changes: 134 additions & 14 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ pub(crate) async fn fetch_parquet_metadata(
)));
}

// If a size hint is provided, read more than the minimum size
// If a size hint is provided, read more than the minimum size
// to try and avoid a second fetch.
let footer_start = if let Some(size_hint) = size_hint {
meta.size - size_hint
meta.size.saturating_sub(size_hint)
} else {
meta.size - 8
};
Expand Down Expand Up @@ -515,6 +515,9 @@ pub(crate) mod test_util {
mod tests {
use super::super::test_util::scan_format;
use crate::physical_plan::collect;
use std::fmt::{Display, Formatter};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};

use super::*;

Expand All @@ -526,9 +529,14 @@ mod tests {
StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::ScalarValue;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{GetResult, ListResult};

#[tokio::test]
async fn read_merged_batches() -> Result<()> {
Expand Down Expand Up @@ -567,6 +575,90 @@ mod tests {
Ok(())
}

#[derive(Debug)]
struct RequestCountingObjectStore {
inner: Arc<dyn ObjectStore>,
request_count: AtomicUsize,
}

impl Display for RequestCountingObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "RequestCounting({})", self.inner)
}
}

impl RequestCountingObjectStore {
pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
Self {
inner,
request_count: Default::default(),
}
}

pub fn request_count(&self) -> usize {
self.request_count.load(Ordering::SeqCst)
}

pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
self.clone()
}
}

#[async_trait]
impl ObjectStore for RequestCountingObjectStore {
async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
Err(object_store::Error::NotImplemented)
}

async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
self.request_count.fetch_add(1, Ordering::SeqCst);
self.inner.get_range(location, range).await
}

async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
Err(object_store::Error::NotImplemented)
}

async fn delete(&self, _location: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

async fn list(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
{
Err(object_store::Error::NotImplemented)
}

async fn list_with_delimiter(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
Err(object_store::Error::NotImplemented)
}

async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

async fn copy_if_not_exists(
&self,
_from: &Path,
_to: &Path,
) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}
}

#[tokio::test]
async fn fetch_metadata_with_size_hint() -> Result<()> {
let c1: ArrayRef =
Expand All @@ -577,46 +669,74 @@ mod tests {
let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();

let store = Arc::new(LocalFileSystem::new()) as _;
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;

// Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
// for the remaining metadata
let format = ParquetFormat::default().with_metadata_size_hint(9);
let schema = format.infer_schema(&store, &meta).await.unwrap();

fetch_parquet_metadata(store.as_ref(), &meta[0], Some(9))
fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
.await
.expect("error reading metadata with hint");

assert_eq!(store.request_count(), 2);

let format = ParquetFormat::default().with_metadata_size_hint(9);
let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();

let stats =
fetch_statistics(store.as_ref(), schema.clone(), &meta[0], Some(9)).await?;
fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
.await?;

assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));

let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));

// Use the file size as the hint so we can get the full metadata from the first fetch
let size_hint = meta[0].size;
let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
let schema = format.infer_schema(&store, &meta).await.unwrap();

fetch_parquet_metadata(store.as_ref(), &meta[0], Some(size_hint))
fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
.expect("error reading metadata with hint");

let stats =
fetch_statistics(store.as_ref(), schema.clone(), &meta[0], Some(size_hint))
.await?;
assert_eq!(store.request_count(), 1);

let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
let stats = fetch_statistics(
store.upcast().as_ref(),
schema.clone(),
&meta[0],
Some(size_hint),
)
.await?;

assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));

let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));

// Use the a size hint larger than the file size to make sure we don't panic
let size_hint = meta[0].size + 100;

fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
.expect("error reading metadata with hint");

assert_eq!(store.request_count(), 1);

Ok(())
}

Expand Down

0 comments on commit d9e7996

Please sign in to comment.