From 1c7ae00ebf77c0a26b4d6fcee70adf24fe507a6a Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Wed, 20 Jul 2022 15:45:57 -0400
Subject: [PATCH 1/4] Add get_byte_ranges method to AsyncFileReader trait

---
 parquet/src/arrow/async_reader.rs | 60 +++++++++++++++++++++++++++++++++++++------
 1 file changed, 50 insertions(+), 10 deletions(-)

diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 923f329eff2..3f437becfe0 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -77,6 +77,7 @@ use std::collections::VecDeque;
 use std::fmt::Formatter;
+
 use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
@@ -86,6 +87,7 @@ use std::task::{Context, Poll};

 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::stream::Stream;
+use futures::StreamExt;
 use parquet_format::PageType;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -111,6 +113,27 @@ pub trait AsyncFileReader {
     /// Retrieve the bytes in `range`
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

+    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        async move {
+            let mut result = Vec::with_capacity(ranges.len());
+
+            for range in ranges.into_iter() {
+                let data = self.get_bytes(range).await?;
+                result.push(data);
+            }
+
+            Ok(result)
+        }
+        .boxed()
+    }
+
     /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
     /// allowing fine-grained control over how metadata is sourced, in particular allowing
     /// for caching, pre-fetching, catalog metadata, etc...
@@ -366,25 +389,42 @@ where
             let mut column_chunks =
                 vec![None; row_group_metadata.columns().len()];

+            let mut fetch_ranges =
+                Vec::with_capacity(column_chunks.len());
+            let mut skip_idx = vec![];
             // TODO: Combine consecutive ranges
-            for (idx, chunk) in column_chunks.iter_mut().enumerate() {
+            for idx in 0..column_chunks.len() {
                 if !projection.leaf_included(idx) {
+                    skip_idx.push(idx);
                     continue;
                 }

                 let column = row_group_metadata.column(idx);
                 let (start, length) = column.byte_range();
-                let data = input
-                    .get_bytes(start as usize..(start + length) as usize)
-                    .await?;
+                fetch_ranges
+                    .push(start as usize..(start + length) as usize);
+            }
+
+            let mut chunks = VecDeque::from(
+                input.get_byte_ranges(fetch_ranges).await?,
+            );

-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in column_chunks.iter_mut().enumerate() {
+                if skip_idx.contains(&idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunks.pop_front() {
+                    let column = row_group_metadata.column(idx);
+
+                    *chunk = Some(InMemoryColumnChunk {
+                        num_values: column.num_values(),
+                        compression: column.compression(),
+                        physical_type: column.column_type(),
+                        data,
+                    });
+                }
             }

             Ok((
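The default implementation above issues its `get_bytes` calls one at a time. An implementation backed by remote storage can override `get_byte_ranges` to start every fetch before awaiting any of them. Below is a minimal sketch of such an override against the trait as it stands after this patch; the `InMemoryReader` type and its buffer-slicing fetches are hypothetical stand-ins for a real client whose per-range requests benefit from running concurrently.

    use std::ops::Range;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::future::{try_join_all, BoxFuture, FutureExt};
    use parquet::arrow::async_reader::AsyncFileReader;
    use parquet::errors::Result;
    use parquet::file::metadata::ParquetMetaData;

    // Hypothetical reader over a fully buffered file; a remote client would
    // issue one network request per range instead of slicing a local buffer.
    struct InMemoryReader {
        data: Bytes,
        metadata: Arc<ParquetMetaData>,
    }

    impl AsyncFileReader for InMemoryReader {
        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            let slice = self.data.slice(range);
            async move { Ok(slice) }.boxed()
        }

        // Override the sequential default: create one future per range up
        // front, then await them all together.
        fn get_byte_ranges(
            &mut self,
            ranges: Vec<Range<usize>>,
        ) -> BoxFuture<'_, Result<Vec<Bytes>>> {
            let fetches: Vec<_> = ranges
                .into_iter()
                .map(|range| {
                    let slice = self.data.slice(range);
                    async move { Ok(slice) }
                })
                .collect();
            async move { try_join_all(fetches).await }.boxed()
        }

        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
            let metadata = Arc::clone(&self.metadata);
            async move { Ok(metadata) }.boxed()
        }
    }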
From c8f171782a6313cbdf31dddf90f833f539f52596 Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Wed, 20 Jul 2022 16:15:30 -0400
Subject: [PATCH 2/4] Remove overhead

---
 parquet/src/arrow/async_reader.rs | 35 ++++++++++++++++++-----------------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 3f437becfe0..dcc33bb36c7 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -103,7 +103,7 @@ use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData};
 use crate::file::serialized_reader::{decode_page, read_page_header};
 use crate::file::FOOTER_SIZE;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -391,11 +391,14 @@ where
             let mut fetch_ranges =
                 Vec::with_capacity(column_chunks.len());
-            let mut skip_idx = vec![];
+
+            let mut update_chunks: Vec<(
+                &mut Option<InMemoryColumnChunk>,
+                &ColumnChunkMetaData,
+            )> = Vec::with_capacity(column_chunks.len());
             // TODO: Combine consecutive ranges
-            for idx in 0..column_chunks.len() {
+            for (idx, chunk) in column_chunks.iter_mut().enumerate() {
                 if !projection.leaf_included(idx) {
-                    skip_idx.push(idx);
                     continue;
                 }
@@ -404,21 +407,19 @@ where
                 let column = row_group_metadata.column(idx);
                 let (start, length) = column.byte_range();
                 fetch_ranges
                     .push(start as usize..(start + length) as usize);
-            }

-            let mut chunks = VecDeque::from(
-                input.get_byte_ranges(fetch_ranges).await?,
-            );
-
-            for (idx, chunk) in column_chunks.iter_mut().enumerate() {
-                if skip_idx.contains(&idx) {
-                    continue;
-                }
-
-                if let Some(data) = chunks.pop_front() {
-                    let column = row_group_metadata.column(idx);
+                update_chunks.push((chunk, column));
+            }

-                    *chunk = Some(InMemoryColumnChunk {
+            for (idx, data) in input
+                .get_byte_ranges(fetch_ranges)
+                .await?
+                .into_iter()
+                .enumerate()
+            {
+                if let Some((chunk, column)) = update_chunks.get_mut(idx)
+                {
+                    **chunk = Some(InMemoryColumnChunk {
                         num_values: column.num_values(),
                         compression: column.compression(),
                         physical_type: column.column_type(),

From a72522374466fe82f410927b72f09ffbb24a1418 Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Wed, 20 Jul 2022 16:36:27 -0400
Subject: [PATCH 3/4] linting

---
 parquet/src/arrow/async_reader.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index dcc33bb36c7..21bdb4533cc 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -87,7 +87,6 @@ use std::task::{Context, Poll};

 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::stream::Stream;
-use futures::StreamExt;
 use parquet_format::PageType;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
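Patch 2 above removes the `skip_idx` bookkeeping from patch 1, where `skip_idx.contains(&idx)` performs a linear scan on every iteration of the second loop. Instead it pairs each projected chunk slot with its metadata up front, so the fetched buffers can be zipped straight back into exactly those slots; patch 4 below reaches the same effect with a plain iterator. A self-contained sketch of the underlying pattern, with hypothetical names and `u64` payloads standing in for column data:

    // Collect the mutable slots that pass a filter, fetch one payload per
    // selected slot, then zip the payloads back into the slots. No per-index
    // `contains` scan is needed on the second pass.
    fn fill_selected<F: Fn(usize) -> bool>(slots: &mut [Option<u64>], selected: F) {
        let targets: Vec<&mut Option<u64>> = slots
            .iter_mut()
            .enumerate()
            .filter(|(idx, _)| selected(*idx))
            .map(|(_, slot)| slot)
            .collect();

        // Stand-in for `get_byte_ranges`: one fetched value per selected
        // slot, in the same order the ranges were collected.
        let fetched: Vec<u64> = (0..targets.len() as u64).map(|v| v * 100).collect();

        for (slot, value) in targets.into_iter().zip(fetched) {
            *slot = Some(value);
        }
    }

    fn main() {
        let mut slots = vec![None; 4];
        fill_selected(&mut slots, |idx| idx % 2 == 0);
        assert_eq!(slots, vec![Some(0), None, Some(100), None]);
    }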
From 76647dda6cb23fe89afefd6eed9f0621ea76f882 Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Thu, 21 Jul 2022 06:49:53 -0400
Subject: [PATCH 4/4] pr comments

---
 parquet/src/arrow/async_reader.rs | 44 ++++++++++++++++++++------------------------
 1 file changed, 20 insertions(+), 24 deletions(-)

diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 21bdb4533cc..19e1de9fc14 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -102,7 +102,7 @@ use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData};
+use crate::file::metadata::ParquetMetaData;
 use crate::file::serialized_reader::{decode_page, read_page_header};
 use crate::file::FOOTER_SIZE;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -388,37 +388,33 @@ where
             let mut column_chunks =
                 vec![None; row_group_metadata.columns().len()];

-            let mut fetch_ranges =
-                Vec::with_capacity(column_chunks.len());
-
-            let mut update_chunks: Vec<(
-                &mut Option<InMemoryColumnChunk>,
-                &ColumnChunkMetaData,
-            )> = Vec::with_capacity(column_chunks.len());
             // TODO: Combine consecutive ranges
+            let fetch_ranges = (0..column_chunks.len())
+                .into_iter()
+                .filter_map(|idx| {
+                    if !projection.leaf_included(idx) {
+                        None
+                    } else {
+                        let column = row_group_metadata.column(idx);
+                        let (start, length) = column.byte_range();
+
+                        Some(start as usize..(start + length) as usize)
+                    }
+                })
+                .collect();
+
+            let mut chunk_data =
+                input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
             for (idx, chunk) in column_chunks.iter_mut().enumerate() {
                 if !projection.leaf_included(idx) {
                     continue;
                 }

                 let column = row_group_metadata.column(idx);
-                let (start, length) = column.byte_range();
-                fetch_ranges
-                    .push(start as usize..(start + length) as usize);
-
-                update_chunks.push((chunk, column));
-            }
-
-            for (idx, data) in input
-                .get_byte_ranges(fetch_ranges)
-                .await?
-                .into_iter()
-                .enumerate()
-            {
-                if let Some((chunk, column)) = update_chunks.get_mut(idx)
-                {
-                    **chunk = Some(InMemoryColumnChunk {
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(InMemoryColumnChunk {
                         num_values: column.num_values(),
                         compression: column.compression(),
                         physical_type: column.column_type(),
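For reference, the call path this series optimizes is the async record batch stream: once a projection mask is set, the projected column chunks of each row group are now fetched through a single `get_byte_ranges` call instead of one `get_bytes` call per column. A minimal consumer sketch, with an illustrative file name and column indices:

    use futures::TryStreamExt;
    use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
    use tokio::fs::File;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // tokio::fs::File gets AsyncFileReader via the blanket impl for
        // AsyncRead + AsyncSeek types, so it uses the sequential default.
        let file = File::open("data.parquet").await?;
        let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

        // Project the first two leaf columns; their byte ranges are fetched
        // together for each row group.
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);
        let stream = builder.with_projection(mask).build()?;

        let batches = stream.try_collect::<Vec<_>>().await?;
        println!("read {} record batches", batches.len());
        Ok(())
    }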