From b104d64ddf4a69a14cc20249d76264ea6468cdc0 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sat, 25 Jun 2022 15:50:55 +0800 Subject: [PATCH] fix bug: write column metadata after the column chunk data --- parquet/src/file/metadata.rs | 23 +++++++++++++++++++++++ parquet/src/file/writer.rs | 11 +++++++---- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index a3477dd7577..f6f343adf3c 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -611,6 +611,29 @@ impl ColumnChunkMetaData { encrypted_column_metadata: None, } } + + /// Method to convert to Thrift `ColumnMetaData` + pub fn to_column_metadata_thrift(&self) -> ColumnMetaData { + ColumnMetaData { + type_: self.column_type.into(), + encodings: self.encodings().iter().map(|&v| v.into()).collect(), + path_in_schema: Vec::from(self.column_path.as_ref()), + codec: self.compression.into(), + num_values: self.num_values, + total_uncompressed_size: self.total_uncompressed_size, + total_compressed_size: self.total_compressed_size, + key_value_metadata: None, + data_page_offset: self.data_page_offset, + index_page_offset: self.index_page_offset, + dictionary_page_offset: self.dictionary_page_offset, + statistics: statistics::to_thrift(self.statistics.as_ref()), + encoding_stats: self + .encoding_stats + .as_ref() + .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()), + bloom_filter_offset: self.bloom_filter_offset, + } + } } /// Builder for column chunk metadata. diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 0a8fc331e7e..050cf020f42 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -435,12 +435,15 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> { Ok(self.sink.bytes_written() - start_pos) } - /// Serializes column chunk into Thrift. + /// Serializes column metadata into Thrift. 
/// Returns Ok() if there are not errors serializing and writing data into the sink. #[inline] - fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> { + fn serialize_column_chunk( + &mut self, + column_metadata: parquet::ColumnMetaData, + ) -> Result<()> { let mut protocol = TCompactOutputProtocol::new(&mut self.sink); - chunk.write_to_out_protocol(&mut protocol)?; + column_metadata.write_to_out_protocol(&mut protocol)?; protocol.flush()?; Ok(()) } @@ -533,7 +536,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> { } fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> { - self.serialize_column_chunk(metadata.to_thrift()) + self.serialize_column_chunk(metadata.to_column_metadata_thrift()) } fn close(&mut self) -> Result<()> {