Skip to content

Commit

Permalink
Access metadata of flushed row groups on write (#1691) (#1774)
Browse files Browse the repository at this point in the history
* Access metadata of flushed row groups on write (#1691)

* Add tests
  • Loading branch information
tustvold committed Jun 6, 2022
1 parent 25d5452 commit d4df1d9
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 3 deletions.
6 changes: 6 additions & 0 deletions parquet/src/arrow/arrow_writer.rs
Expand Up @@ -36,6 +36,7 @@ use super::schema::{
use crate::arrow::levels::calculate_array_levels;
use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaDataPtr;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::{data_type::*, file::writer::SerializedFileWriter};
Expand Down Expand Up @@ -95,6 +96,11 @@ impl<W: Write> ArrowWriter<W> {
})
}

/// Returns metadata for any flushed row groups
///
/// Delegates to the underlying `SerializedFileWriter`; the returned slice
/// contains one `RowGroupMetaDataPtr` (an `Arc<RowGroupMetaData>`) per row
/// group written out so far. Row groups buffered but not yet flushed are
/// not included — presumably the slice grows as batches cross the
/// `max_row_group_size` threshold or the writer is closed (TODO confirm
/// against the flush logic, which is outside this view).
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
    self.writer.flushed_row_groups()
}

/// Enqueues the provided `RecordBatch` to be written
///
/// If following this there are more than `max_row_group_size` rows buffered,
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/metadata.rs
Expand Up @@ -217,7 +217,7 @@ impl FileMetaData {
pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;

/// Metadata for a row group.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupMetaData {
columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
Expand Down
12 changes: 10 additions & 2 deletions parquet/src/file/writer.rs
Expand Up @@ -153,6 +153,11 @@ impl<W: Write> SerializedFileWriter<W> {
Ok(row_group_writer)
}

/// Returns metadata for any flushed row groups
///
/// Borrows the writer's internal `row_groups` list: one
/// `RowGroupMetaDataPtr` (an `Arc<RowGroupMetaData>`) per row group that
/// has been closed on this writer. Callers needing to retain an entry past
/// the borrow can clone the `Arc` cheaply.
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
    &self.row_groups
}

/// Closes and finalises file writer, returning the file metadata.
///
/// All row groups must be appended before this method is called.
Expand Down Expand Up @@ -1000,7 +1005,7 @@ mod tests {
);
let mut rows: i64 = 0;

for subset in &data {
for (idx, subset) in data.iter().enumerate() {
let mut row_group_writer = file_writer.next_row_group().unwrap();
if let Some(mut writer) = row_group_writer.next_column().unwrap() {
rows += writer
Expand All @@ -1009,7 +1014,10 @@ mod tests {
.unwrap() as i64;
writer.close().unwrap();
}
row_group_writer.close().unwrap();
let last_group = row_group_writer.close().unwrap();
let flushed = file_writer.flushed_row_groups();
assert_eq!(flushed.len(), idx + 1);
assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
}
file_writer.close().unwrap();

Expand Down
1 change: 1 addition & 0 deletions parquet/src/schema/types.rs
Expand Up @@ -838,6 +838,7 @@ impl ColumnDescriptor {

/// A schema descriptor. This encapsulates the top-level schemas for all the columns,
/// as well as all descriptors for all the primitive columns.
#[derive(PartialEq)]
pub struct SchemaDescriptor {
// The top-level schema (the "message" type).
// This must be a `GroupType` where each field is a root column type in the schema.
Expand Down

0 comments on commit d4df1d9

Please sign in to comment.