From be0d34d4fd37431ed57e7ab001da5f55a016a76e Mon Sep 17 00:00:00 2001 From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com> Date: Thu, 21 Jul 2022 08:05:21 -0400 Subject: [PATCH] Add get_byte_ranges method to AsyncFileReader trait (#2115) * Add get_byte_ranges method to AsyncFileReader trait * Remove overhead * linting * pr comments --- parquet/src/arrow/async_reader.rs | 60 ++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 12 deletions(-) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 923f329eff2..19e1de9fc14 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -77,6 +77,7 @@ use std::collections::VecDeque; use std::fmt::Formatter; + use std::io::{Cursor, SeekFrom}; use std::ops::Range; use std::pin::Pin; @@ -111,6 +112,27 @@ pub trait AsyncFileReader { /// Retrieve the bytes in `range` fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>; + /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially fn get_byte_ranges( &mut self, ranges: Vec<Range<usize>>, ) -> BoxFuture<'_, Result<Vec<Bytes>>> where Self: Send, { async move { let mut result = Vec::with_capacity(ranges.len()); for range in ranges.into_iter() { let data = self.get_bytes(range).await?; result.push(data); } Ok(result) } .boxed() } + /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file, /// allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, etc... 
@@ -367,24 +389,38 @@ where vec![None; row_group_metadata.columns().len()]; // TODO: Combine consecutive ranges + let fetch_ranges = (0..column_chunks.len()) + .into_iter() + .filter_map(|idx| { + if !projection.leaf_included(idx) { + None + } else { + let column = row_group_metadata.column(idx); + let (start, length) = column.byte_range(); + + Some(start as usize..(start + length) as usize) + } + }) + .collect(); + + let mut chunk_data = + input.get_byte_ranges(fetch_ranges).await?.into_iter(); + for (idx, chunk) in column_chunks.iter_mut().enumerate() { if !projection.leaf_included(idx) { continue; } let column = row_group_metadata.column(idx); - let (start, length) = column.byte_range(); - - let data = input - .get_bytes(start as usize..(start + length) as usize) - .await?; - - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + + if let Some(data) = chunk_data.next() { + *chunk = Some(InMemoryColumnChunk { + num_values: column.num_values(), + compression: column.compression(), + physical_type: column.column_type(), + data, + }); + } } Ok((