write ColumnMetadata after the column chunk data, not the ColumnChunk #1947

Merged
merged 1 commit into from Jun 28, 2022
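
As a rough illustration of the change in the title, the sketch below uses simplified stand-in types, not the Thrift-generated structs from the parquet crate. The field selection and the `write_column_metadata` helper are assumptions made for the example, and the byte encoding is a toy one rather than the Thrift compact protocol. It shows the shape of the refactor: the outer `ColumnChunk` is built by delegating to a method that produces only the inner `ColumnMetaData`, and the writer serializes just that inner struct.

```rust
use std::io::Write;

// Hypothetical stand-ins for the Thrift-generated structs; the real ones
// carry many more fields.
#[derive(Debug, Clone, PartialEq)]
struct ColumnMetaData {
    num_values: i64,
    data_page_offset: i64,
}

#[derive(Debug, Clone, PartialEq)]
struct ColumnChunk {
    file_offset: i64,
    meta_data: Option<ColumnMetaData>,
}

// Simplified version of the in-memory metadata type.
struct ColumnChunkMetaData {
    file_offset: i64,
    num_values: i64,
    data_page_offset: i64,
}

impl ColumnChunkMetaData {
    // Builds only the inner metadata struct.
    fn to_column_metadata_thrift(&self) -> ColumnMetaData {
        ColumnMetaData {
            num_values: self.num_values,
            data_page_offset: self.data_page_offset,
        }
    }

    // Builds the outer wrapper by delegating to the method above,
    // so the field mapping lives in one place.
    fn to_thrift(&self) -> ColumnChunk {
        ColumnChunk {
            file_offset: self.file_offset,
            meta_data: Some(self.to_column_metadata_thrift()),
        }
    }
}

// Toy encoder (assumption): writes the two fields as little-endian i64.
// This is NOT the Thrift compact protocol used by the real writer.
fn write_column_metadata<W: Write>(sink: &mut W, md: &ColumnMetaData) -> std::io::Result<()> {
    sink.write_all(&md.num_values.to_le_bytes())?;
    sink.write_all(&md.data_page_offset.to_le_bytes())?;
    sink.flush()
}
```

With these stand-ins, a caller would pass `metadata.to_column_metadata_thrift()` to the writer after the page data, while `to_thrift()` remains available for code that needs the full `ColumnChunk`.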
31 changes: 18 additions & 13 deletions parquet/src/file/metadata.rs
@@ -579,7 +579,24 @@ impl ColumnChunkMetaData {

     /// Method to convert to Thrift.
     pub fn to_thrift(&self) -> ColumnChunk {
-        let column_metadata = ColumnMetaData {
+        let column_metadata = self.to_column_metadata_thrift();
+
+        ColumnChunk {
+            file_path: self.file_path().map(|s| s.to_owned()),
+            file_offset: self.file_offset,
+            meta_data: Some(column_metadata),
+            offset_index_offset: self.offset_index_offset,
+            offset_index_length: self.offset_index_length,
+            column_index_offset: self.column_index_offset,
+            column_index_length: self.column_index_length,
+            crypto_metadata: None,
+            encrypted_column_metadata: None,
+        }
+    }
+
+    /// Method to convert to Thrift `ColumnMetaData`
+    pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
Contributor

Could we change to_thrift above to use this method?

Contributor Author

good catch

+        ColumnMetaData {
             type_: self.column_type.into(),
             encodings: self.encodings().iter().map(|&v| v.into()).collect(),
             path_in_schema: Vec::from(self.column_path.as_ref()),
@@ -597,18 +614,6 @@ impl ColumnChunkMetaData {
                 .as_ref()
                 .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
             bloom_filter_offset: self.bloom_filter_offset,
-        };
-
-        ColumnChunk {
-            file_path: self.file_path().map(|s| s.to_owned()),
-            file_offset: self.file_offset,
-            meta_data: Some(column_metadata),
-            offset_index_offset: self.offset_index_offset,
-            offset_index_length: self.offset_index_length,
-            column_index_offset: self.column_index_offset,
-            column_index_length: self.column_index_length,
-            crypto_metadata: None,
-            encrypted_column_metadata: None,
         }
     }
 }
17 changes: 6 additions & 11 deletions parquet/src/file/writer.rs
@@ -434,16 +434,6 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
         }
         Ok(self.sink.bytes_written() - start_pos)
     }
-
-    /// Serializes column chunk into Thrift.
-    /// Returns Ok() if there are not errors serializing and writing data into the sink.
-    #[inline]
-    fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> {
-        let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
-        chunk.write_to_out_protocol(&mut protocol)?;
-        protocol.flush()?;
-        Ok(())
-    }
 }

 impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
@@ -533,7 +523,12 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
     }

     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
-        self.serialize_column_chunk(metadata.to_thrift())
+        let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
+        metadata
+            .to_column_metadata_thrift()
+            .write_to_out_protocol(&mut protocol)?;
+        protocol.flush()?;
+        Ok(())
     }

     fn close(&mut self) -> Result<()> {