From 25aceeb6b305aebea6bec5607e8d1878e29c4ec4 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 18 Aug 2022 19:57:30 +0800 Subject: [PATCH 1/3] add into_inner to take inner writer out --- parquet/src/arrow/arrow_writer/mod.rs | 5 +++++ parquet/src/file/writer.rs | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index e6fbccb8966..656810e1a93 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -223,6 +223,11 @@ impl ArrowWriter { Ok(()) } + /// Returns the underlying writer. + pub fn into_inner(self) -> W { + self.writer.into_inner() + } + /// Close and finalize the underlying Parquet writer pub fn close(mut self) -> Result { self.flush()?; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 87a9ae3e14e..cf55ac57fea 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -60,6 +60,11 @@ impl TrackedWrite { pub fn bytes_written(&self) -> usize { self.bytes_written } + + /// Returns the underlying writer. + pub fn into_inner(self) -> W { + self.inner + } } impl Write for TrackedWrite { @@ -306,6 +311,11 @@ impl SerializedFileWriter { Ok(()) } } + + /// Returns the underlying writer. + pub fn into_inner(self) -> W { + self.buf.into_inner() + } } /// Parquet row group writer API. From a0cb64a2bfe4e2e3bc28666a002bbd0b84b05d6e Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 18 Aug 2022 21:48:45 +0800 Subject: [PATCH 2/3] flush writer before into_inner --- parquet/src/arrow/arrow_writer/mod.rs | 69 +++++++++++++++++---------- parquet/src/file/writer.rs | 7 ++- 2 files changed, 48 insertions(+), 28 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 656810e1a93..bef91494a82 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -224,7 +224,8 @@ impl ArrowWriter { } /// Returns the underlying writer. - pub fn into_inner(self) -> W { + pub fn into_inner(mut self) -> Result { + self.flush()?; self.writer.into_inner() } @@ -649,6 +650,25 @@ mod tests { roundtrip(batch, Some(SMALL_SIZE / 2)); } + fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec { + let mut buffer = vec![]; + + let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap(); + writer.write(expected_batch).unwrap(); + writer.close().unwrap(); + + buffer + } + + fn get_bytes_by_into_inner( + schema: SchemaRef, + expected_batch: &RecordBatch, + ) -> Vec { + let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap(); + writer.write(expected_batch).unwrap(); + writer.into_inner().unwrap() + } + #[test] fn roundtrip_bytes() { // define schema @@ -665,31 +685,28 @@ mod tests { let expected_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap(); - let mut buffer = vec![]; - - { - let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap(); - writer.write(&expected_batch).unwrap(); - writer.close().unwrap(); - } - - let cursor = Bytes::from(buffer); - let mut record_batch_reader = - ParquetRecordBatchReader::try_new(cursor, 1024).unwrap(); - - let actual_batch = record_batch_reader - .next() - .expect("No batch found") - .expect("Unable to get batch"); - - assert_eq!(expected_batch.schema(), actual_batch.schema()); - assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); - assert_eq!(expected_batch.num_rows(), actual_batch.num_rows()); - for i in 0..expected_batch.num_columns() { - let expected_data = expected_batch.column(i).data().clone(); - let actual_data = actual_batch.column(i).data().clone(); - - assert_eq!(expected_data, actual_data); + for buffer in vec![ + get_bytes_after_close(schema.clone(), &expected_batch), + get_bytes_by_into_inner(schema, &expected_batch), + ] { + let cursor = Bytes::from(buffer); + let mut record_batch_reader = + ParquetRecordBatchReader::try_new(cursor, 1024).unwrap(); + + let actual_batch = record_batch_reader + .next() + .expect("No batch found") + .expect("Unable to get batch"); + + assert_eq!(expected_batch.schema(), actual_batch.schema()); + assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); + assert_eq!(expected_batch.num_rows(), actual_batch.num_rows()); + for i in 0..expected_batch.num_columns() { + let expected_data = expected_batch.column(i).data().clone(); + let actual_data = actual_batch.column(i).data().clone(); + + assert_eq!(expected_data, actual_data); + } } } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index cf55ac57fea..aa6759e886c 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -313,8 +313,11 @@ impl SerializedFileWriter { } /// Returns the underlying writer. - pub fn into_inner(self) -> W { - self.buf.into_inner() + pub fn into_inner(mut self) -> Result { + self.assert_previous_writer_closed()?; + let _ = self.write_metadata()?; + + Ok(self.buf.into_inner()) } } From 5d5fba293ba14b7f5f5e3e411d872f3a23fe18a2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 18 Aug 2022 17:30:00 +0100 Subject: [PATCH 3/3] Apply suggestions from code review --- parquet/src/arrow/arrow_writer/mod.rs | 2 +- parquet/src/file/writer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index bef91494a82..d09cb712ea2 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -223,7 +223,7 @@ impl ArrowWriter { Ok(()) } - /// Returns the underlying writer. + /// Flushes any outstanding data and returns the underlying writer. pub fn into_inner(mut self) -> Result { self.flush()?; self.writer.into_inner() diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index aa6759e886c..f11365c7699 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -312,7 +312,7 @@ impl SerializedFileWriter { } } - /// Returns the underlying writer. + /// Writes the file footer and returns the underlying writer. pub fn into_inner(mut self) -> Result { self.assert_previous_writer_closed()?; let _ = self.write_metadata()?;