Avoid large over allocate buffer in async reader (#2537)
Ted-Jiang committed Aug 20, 2022
1 parent 1eb6c45 commit 98de2e3
Showing 2 changed files with 39 additions and 1 deletion.
4 changes: 4 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -115,7 +115,11 @@ impl<T> ArrowReaderBuilder<T> {
     }
 
     /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
+    /// If batch_size is larger than the file's row count, the row count is used instead.
     pub fn with_batch_size(self, batch_size: usize) -> Self {
+        // Avoid allocating a buffer larger than the number of rows in the file
+        let batch_size =
+            batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }
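
The change is just a `min` against the file's row count. A minimal standalone sketch of the idea (the `effective_batch_size` helper is hypothetical; the real code inlines the clamp inside `with_batch_size` as shown above):

    // Hypothetical helper mirroring the clamp added in this commit.
    fn effective_batch_size(requested: usize, file_rows: usize) -> usize {
        requested.min(file_rows)
    }

    fn main() {
        // A file with 8 rows never needs a 1024-row buffer.
        assert_eq!(effective_batch_size(1024, 8), 8);
        // Requests smaller than the file pass through unchanged.
        assert_eq!(effective_batch_size(4, 8), 4);
    }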
36 changes: 35 additions & 1 deletion parquet/src/arrow/async_reader.rs
@@ -329,6 +329,10 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
 
+        // Avoid allocating a buffer larger than the number of rows in the file
+        let batch_size = self
+            .batch_size
+            .min(self.metadata.file_metadata().num_rows() as usize);
         let reader = ReaderFactory {
             input: self.input.0,
             filter: self.filter,
@@ -338,7 +342,7 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
 
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
-            batch_size: self.batch_size,
+            batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
@@ -1133,4 +1137,34 @@ mod tests {
 
         assert_eq!(&requests[..], &expected_page_requests)
     }
+
+    #[tokio::test]
+    async fn test_batch_size_overallocate() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        // `alltypes_plain.parquet` only has 8 rows
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let file_rows = metadata.file_metadata().num_rows() as usize;
+        let metadata = Arc::new(metadata);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+
+        let stream = builder
+            .with_projection(ProjectionMask::all())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+        assert_ne!(1024, file_rows);
+        assert_eq!(stream.batch_size, file_rows);
+    }
 }
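
From the caller's side, nothing changes except the effective batch size. A usage sketch of the async reader after this commit (assuming a small local file at the hypothetical path `data.parquet`; `tokio` and `futures` as dependencies): requesting 1024 rows per batch against an 8-row file now sizes buffers for 8 rows, and each emitted batch holds at most 8 rows.

    use futures::TryStreamExt;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;
    use tokio::fs::File;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical path; any small parquet file demonstrates the clamp.
        let file = File::open("data.parquet").await?;
        let stream = ParquetRecordBatchStreamBuilder::new(file)
            .await?
            .with_batch_size(1024) // silently clamped to the file's row count
            .build()?;
        // Drain the stream; batch sizes are bounded by the file's row count.
        let batches: Vec<_> = stream.try_collect().await?;
        for batch in &batches {
            println!("batch with {} rows", batch.num_rows());
        }
        Ok(())
    }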
