From d4df1d98b0abfe9a8ae1c322f935b4fbf9825feb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 6 Jun 2022 13:57:55 +0100 Subject: [PATCH] Access metadata of flushed row groups on write (#1691) (#1774) * Access metadata of flushed row groups on write (#1691) * Add tests --- parquet/src/arrow/arrow_writer.rs | 6 ++++++ parquet/src/file/metadata.rs | 2 +- parquet/src/file/writer.rs | 12 ++++++++++-- parquet/src/schema/types.rs | 1 + 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index b02a916da8b..334c7237d70 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -36,6 +36,7 @@ use super::schema::{ use crate::arrow::levels::calculate_array_levels; use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; +use crate::file::metadata::RowGroupMetaDataPtr; use crate::file::properties::WriterProperties; use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter}; use crate::{data_type::*, file::writer::SerializedFileWriter}; @@ -95,6 +96,11 @@ impl ArrowWriter { }) } + /// Returns metadata for any flushed row groups + pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + self.writer.flushed_row_groups() + } + /// Enqueues the provided `RecordBatch` to be written /// /// If following this there are more than `max_row_group_size` rows buffered, diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 2a385be515d..d96e95a9550 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -217,7 +217,7 @@ impl FileMetaData { pub type RowGroupMetaDataPtr = Arc; /// Metadata for a row group. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct RowGroupMetaData { columns: Vec, num_rows: i64, diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 3108baddefa..0a8fc331e7e 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -153,6 +153,11 @@ impl SerializedFileWriter { Ok(row_group_writer) } + /// Returns metadata for any flushed row groups + pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + &self.row_groups + } + /// Closes and finalises file writer, returning the file metadata. /// /// All row groups must be appended before this method is called. @@ -1000,7 +1005,7 @@ mod tests { ); let mut rows: i64 = 0; - for subset in &data { + for (idx, subset) in data.iter().enumerate() { let mut row_group_writer = file_writer.next_row_group().unwrap(); if let Some(mut writer) = row_group_writer.next_column().unwrap() { rows += writer @@ -1009,7 +1014,10 @@ mod tests { .unwrap() as i64; writer.close().unwrap(); } - row_group_writer.close().unwrap(); + let last_group = row_group_writer.close().unwrap(); + let flushed = file_writer.flushed_row_groups(); + assert_eq!(flushed.len(), idx + 1); + assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); } file_writer.close().unwrap(); diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 9cef93a69b2..8d624fe3d18 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -838,6 +838,7 @@ impl ColumnDescriptor { /// A schema descriptor. This encapsulates the top-level schemas for all the columns, /// as well as all descriptors for all the primitive columns. +#[derive(PartialEq)] pub struct SchemaDescriptor { // The top-level schema (the "message" type). // This must be a `GroupType` where each field is a root column type in the schema.