Skip to content

Commit

Permalink
Add ParquetRecordBatchReaderBuilder (apache#2427)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 12, 2022
1 parent b235173 commit aeee9f8
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 327 deletions.
49 changes: 46 additions & 3 deletions parquet/src/arrow/array_reader/mod.rs
Expand Up @@ -124,6 +124,49 @@ impl RowGroupCollection for Arc<dyn FileReader> {
}
}

/// A [`RowGroupCollection`] backed by a shared [`FileReader`], optionally
/// limited to an explicit list of row groups.
pub(crate) struct FileReaderRowGroupCollection {
    /// Shared handle to the underlying file reader; metadata and page
    /// iterators are obtained through it.
    reader: Arc<dyn FileReader>,
    /// Indices into the reader's row group metadata that this collection
    /// covers. `None` means no explicit selection was supplied.
    row_groups: Option<Vec<usize>>,
}

impl FileReaderRowGroupCollection {
pub fn new(reader: Arc<dyn FileReader>, row_groups: Option<Vec<usize>>) -> Self {
Self { reader, row_groups }
}
}

impl RowGroupCollection for FileReaderRowGroupCollection {
    /// Returns the schema descriptor recorded in the reader's file metadata.
    fn schema(&self) -> SchemaDescPtr {
        let file_metadata = self.reader.metadata().file_metadata();
        file_metadata.schema_descr_ptr()
    }

    /// Returns the number of rows covered by this collection.
    ///
    /// With an explicit row group selection this is the sum of the row
    /// counts of the selected row groups; otherwise it is the row count
    /// reported by the file metadata.
    ///
    /// # Panics
    ///
    /// Panics if a selected index is out of bounds for the reader's row
    /// group metadata.
    fn num_rows(&self) -> usize {
        let metadata = self.reader.metadata();

        if let Some(selected) = &self.row_groups {
            let row_group_meta = metadata.row_groups();
            let mut total = 0usize;
            for &index in selected {
                total += row_group_meta[index].num_rows() as usize;
            }
            total
        } else {
            metadata.file_metadata().num_rows() as usize
        }
    }

    /// Creates a page iterator for column `i`.
    ///
    /// When a row group selection is present, the iterator is built with
    /// `FilePageIterator::with_row_groups` over a copy of the selected
    /// indices; otherwise `FilePageIterator::new` is used.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while constructing the
    /// `FilePageIterator`.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        let reader = Arc::clone(&self.reader);

        let pages = if let Some(selected) = &self.row_groups {
            // The iterator must own its indices, so the selection is cloned.
            FilePageIterator::with_row_groups(
                i,
                Box::new(selected.clone().into_iter()),
                reader,
            )?
        } else {
            FilePageIterator::new(i, reader)?
        };

        Ok(Box::new(pages))
    }
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than `batch_size` if
Expand Down Expand Up @@ -167,9 +210,9 @@ fn skip_records<V, CV>(
pages: &mut dyn PageIterator,
batch_size: usize,
) -> Result<usize>
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
let mut records_skipped = 0usize;
while records_skipped < batch_size {
Expand Down

0 comments on commit aeee9f8

Please sign in to comment.