Update thrift v0.16 vendor parquet-format (apache#2502)
tustvold committed Sep 1, 2022
1 parent 9f4d56d commit 33f7fa4
Showing 21 changed files with 5,585 additions and 317 deletions.
17 changes: 14 additions & 3 deletions parquet/CONTRIBUTING.md
@@ -60,7 +60,18 @@ Run `cargo bench` for benchmarks.
 To build documentation, run `cargo doc --no-deps`.
 To compile and view in the browser, run `cargo doc --no-deps --open`.
 
-## Update Supported Parquet Version
+## Update Parquet Format
 
-To update Parquet format to a newer version, check if [parquet-format](https://github.com/sunchao/parquet-format-rs)
-version is available. Then simply update version of `parquet-format` crate in Cargo.toml.
+To generate the parquet format code run
+
+```
+$ git clone https://github.com/apache/thrift
+$ cd thrift
+$ git checkout v0.16.0
+# docker build just builds a docker image with thrift dependencies
+$ docker build -t thrift build/docker/ubuntu-bionic
+# build/docker/scripts/cmake.sh actually compiles thrift
+$ docker run -v $(pwd):/thrift/src -it thrift build/docker/scripts/cmake.sh && wget https://raw.githubusercontent.com/apache/parquet-format/apache-parquet-format-2.9.0/src/main/thrift/parquet.thrift && ./cmake_build/compiler/cpp/bin/thrift --gen rs parquet.thrift
+```
+
+Then copy the generated `parquet.rs` into `src/format.rs` and commit changes.
3 changes: 1 addition & 2 deletions parquet/Cargo.toml
@@ -31,9 +31,8 @@ rust-version = "1.62"
 
 [dependencies]
 ahash = "0.8"
-parquet-format = { version = "4.0.0", default-features = false }
 bytes = { version = "1.1", default-features = false, features = ["std"] }
-thrift = { version = "0.13", default-features = false }
+thrift = { version = "0.16", default-features = false }
 snap = { version = "1.0", default-features = false, optional = true }
 brotli = { version = "3.3", default-features = false, features = ["std"], optional = true }
 flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -1662,7 +1662,7 @@ mod tests {
         schema: TypePtr,
         field: Option<Field>,
         opts: &TestOptions,
-    ) -> Result<parquet_format::FileMetaData> {
+    ) -> Result<crate::format::FileMetaData> {
         let mut writer_props = opts.writer_props();
         if let Some(field) = field {
             let arrow_schema = Schema::new(vec![field]);
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -121,7 +121,7 @@ impl RowSelection {
     #[cfg(any(test, feature = "async"))]
     pub(crate) fn scan_ranges(
         &self,
-        page_locations: &[parquet_format::PageLocation],
+        page_locations: &[crate::format::PageLocation],
     ) -> Vec<Range<usize>> {
         let mut ranges = vec![];
         let mut row_offset = 0;
@@ -302,7 +302,7 @@ impl From<RowSelection> for VecDeque<RowSelector> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use parquet_format::PageLocation;
+    use crate::format::PageLocation;
     use rand::{thread_rng, Rng};
 
     #[test]
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -230,7 +230,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 
     /// Close and finalize the underlying Parquet writer
-    pub fn close(mut self) -> Result<parquet_format::FileMetaData> {
+    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
         self.flush()?;
         self.writer.close()
     }
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader.rs
@@ -88,7 +88,7 @@ use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
-use parquet_format::OffsetIndex;
+use crate::format::OffsetIndex;
 use thrift::protocol::TCompactInputProtocol;
 
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
534 changes: 288 additions & 246 deletions parquet/src/basic.rs

Large diffs are not rendered by default.
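
Most of the 534 changed lines in `basic.rs` follow from thrift 0.16's new Rust codegen: generated "enums" become newtype wrappers over `i32` with `SCREAMING_SNAKE_CASE` associated constants (hence `PageType::DATA_PAGE` rather than `PageType::DataPage` throughout this diff), so unknown wire values are now representable and the crate's infallible `From` conversions have to become `TryFrom`. Below is a minimal sketch of that pattern with illustrative names only; `TPageType` stands in for the generated `crate::format::PageType` and is not the actual generated code.

```rust
// Stand-in for a thrift-0.16-generated "enum": a newtype over i32 with
// associated constants instead of a real Rust enum.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct TPageType(pub i32);

impl TPageType {
    pub const DATA_PAGE: TPageType = TPageType(0);
    pub const INDEX_PAGE: TPageType = TPageType(1);
    pub const DICTIONARY_PAGE: TPageType = TPageType(2);
    pub const DATA_PAGE_V2: TPageType = TPageType(3);
}

/// Simplified crate-side enum.
#[derive(Debug, PartialEq)]
pub enum PageType {
    DataPage,
    IndexPage,
    DictionaryPage,
    DataPageV2,
}

impl TryFrom<TPageType> for PageType {
    type Error = String;

    fn try_from(value: TPageType) -> Result<Self, Self::Error> {
        match value {
            TPageType::DATA_PAGE => Ok(PageType::DataPage),
            TPageType::INDEX_PAGE => Ok(PageType::IndexPage),
            TPageType::DICTIONARY_PAGE => Ok(PageType::DictionaryPage),
            TPageType::DATA_PAGE_V2 => Ok(PageType::DataPageV2),
            // Any i32 can now arrive off the wire, which is why an
            // infallible From conversion no longer fits.
            other => Err(format!("unexpected page type {}", other.0)),
        }
    }
}
```

Matching on the associated constants works because the derived `PartialEq`/`Eq` make them valid structural-match patterns.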

8 changes: 4 additions & 4 deletions parquet/src/column/page.rs
@@ -20,9 +20,9 @@
 use crate::basic::{Encoding, PageType};
 use crate::errors::{ParquetError, Result};
 use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
+use crate::format::PageHeader;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 use crate::util::memory::ByteBufferPtr;
-use parquet_format::PageHeader;
 
 /// Parquet Page definition.
 ///
@@ -209,15 +209,15 @@ impl TryFrom<&PageHeader> for PageMetadata {
 
     fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
         match value.type_ {
-            parquet_format::PageType::DataPage => Ok(PageMetadata {
+            crate::format::PageType::DATA_PAGE => Ok(PageMetadata {
                 num_rows: value.data_page_header.as_ref().unwrap().num_values as usize,
                 is_dict: false,
             }),
-            parquet_format::PageType::DictionaryPage => Ok(PageMetadata {
+            crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
                 num_rows: usize::MIN,
                 is_dict: true,
             }),
-            parquet_format::PageType::DataPageV2 => Ok(PageMetadata {
+            crate::format::PageType::DATA_PAGE_V2 => Ok(PageMetadata {
                 num_rows: value.data_page_header_v2.as_ref().unwrap().num_rows as usize,
                 is_dict: false,
             }),
6 changes: 3 additions & 3 deletions parquet/src/column/writer/mod.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 //! Contains column writer API.
-use parquet_format::{ColumnIndex, OffsetIndex};
+use crate::format::{ColumnIndex, OffsetIndex};
 use std::collections::{BTreeSet, VecDeque};
 
 use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
@@ -1089,8 +1089,8 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
 
 #[cfg(test)]
 mod tests {
+    use crate::format::BoundaryOrder;
     use bytes::Bytes;
-    use parquet_format::BoundaryOrder;
     use rand::distributions::uniform::SampleUniform;
     use std::sync::Arc;
 
@@ -2086,7 +2086,7 @@ mod tests {
         // column index
         assert_eq!(2, column_index.null_pages.len());
         assert_eq!(2, offset_index.page_locations.len());
-        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
         for idx in 0..2 {
             assert!(!column_index.null_pages[idx]);
             assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
4 changes: 2 additions & 2 deletions parquet/src/file/footer.rs
@@ -17,7 +17,7 @@
 
 use std::{io::Read, sync::Arc};
 
-use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
+use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
 use thrift::protocol::TCompactInputProtocol;
 
 use crate::basic::ColumnOrder;
@@ -150,7 +150,7 @@ mod tests {
     use crate::basic::SortOrder;
     use crate::basic::Type;
     use crate::schema::types::Type as SchemaType;
-    use parquet_format::TypeDefinedOrder;
+    use crate::format::TypeDefinedOrder;
 
     #[test]
     fn test_parse_metadata_size_smaller_than_footer() {
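
As an aside on the `TCompactInputProtocol` import kept above: the read path itself is unchanged under thrift 0.16. A hedged sketch of decoding a footer `FileMetaData`, assuming the `read_from_in_protocol` constructor that thrift's Rust backend generates for every struct; this is not the crate's actual footer code.

```rust
use thrift::protocol::TCompactInputProtocol;

// Decode a Thrift-compact-encoded FileMetaData from an in-memory buffer.
// &[u8] implements std::io::Read, which the thrift transport layer accepts.
fn decode_footer(bytes: &[u8]) -> thrift::Result<crate::format::FileMetaData> {
    let mut prot = TCompactInputProtocol::new(bytes);
    crate::format::FileMetaData::read_from_in_protocol(&mut prot)
}
```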
21 changes: 13 additions & 8 deletions parquet/src/file/metadata.rs
@@ -35,7 +35,7 @@
 
 use std::sync::Arc;
 
-use parquet_format::{
+use crate::format::{
     BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
     RowGroup,
 };
@@ -122,7 +122,7 @@ impl ParquetMetaData {
     }
 }
 
-pub type KeyValue = parquet_format::KeyValue;
+pub type KeyValue = crate::format::KeyValue;
 
 /// Reference counted pointer for [`FileMetaData`].
 pub type FileMetaDataPtr = Arc<FileMetaData>;
@@ -553,14 +553,14 @@ impl ColumnChunkMetaData {
             return Err(general_err!("Expected to have column metadata"));
         }
         let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
-        let column_type = Type::from(col_metadata.type_);
+        let column_type = Type::try_from(col_metadata.type_)?;
         let column_path = ColumnPath::new(col_metadata.path_in_schema);
         let encodings = col_metadata
             .encodings
             .drain(0..)
-            .map(Encoding::from)
-            .collect();
-        let compression = Compression::from(col_metadata.codec);
+            .map(Encoding::try_from)
+            .collect::<Result<_>>()?;
+        let compression = Compression::try_from(col_metadata.codec)?;
         let file_path = cc.file_path;
         let file_offset = cc.file_offset;
         let num_values = col_metadata.num_values;
@@ -573,7 +573,12 @@
         let encoding_stats = col_metadata
             .encoding_stats
             .as_ref()
-            .map(|vec| vec.iter().map(page_encoding_stats::from_thrift).collect());
+            .map(|vec| {
+                vec.iter()
+                    .map(page_encoding_stats::try_from_thrift)
+                    .collect::<Result<_>>()
+            })
+            .transpose()?;
         let bloom_filter_offset = col_metadata.bloom_filter_offset;
         let offset_index_offset = cc.offset_index_offset;
         let offset_index_length = cc.offset_index_length;
@@ -846,7 +851,7 @@ impl ColumnIndexBuilder {
             null_pages: Vec::new(),
             min_values: Vec::new(),
             max_values: Vec::new(),
-            boundary_order: BoundaryOrder::Unordered,
+            boundary_order: BoundaryOrder::UNORDERED,
             null_counts: Vec::new(),
             valid: true,
         }
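
The `metadata.rs` hunks above lean on two standard-library idioms worth calling out: collecting an iterator of `Result`s into `Result<Vec<_>, _>`, which short-circuits on the first error, and `Option::transpose`, which flips `Option<Result<T, E>>` into `Result<Option<T>, E>` so `?` applies to an optional field. A self-contained sketch, with illustrative names and `u8::try_from` standing in for the fallible enum conversions:

```rust
fn demo(
    raw: Vec<i32>,
    optional: Option<i32>,
) -> Result<(Vec<u8>, Option<u8>), std::num::TryFromIntError> {
    // Collecting into Result short-circuits on the first failed conversion.
    let required: Vec<u8> = raw
        .into_iter()
        .map(u8::try_from)
        .collect::<Result<_, _>>()?;
    // transpose() turns Option<Result<u8, _>> into Result<Option<u8>, _>.
    let maybe: Option<u8> = optional.map(u8::try_from).transpose()?;
    Ok((required, maybe))
}

fn main() {
    assert_eq!(demo(vec![1, 2], Some(3)).unwrap(), (vec![1, 2], Some(3)));
    assert!(demo(vec![1, 512], None).is_err()); // 512 does not fit in a u8
}
```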
17 changes: 10 additions & 7 deletions parquet/src/file/page_encoding_stats.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use crate::basic::{Encoding, PageType};
-use parquet_format::{
+use crate::errors::Result;
+use crate::format::{
     Encoding as TEncoding, PageEncodingStats as TPageEncodingStats, PageType as TPageType,
 };
 
@@ -32,16 +33,18 @@ pub struct PageEncodingStats {
 }
 
 /// Converts Thrift definition into `PageEncodingStats`.
-pub fn from_thrift(thrift_encoding_stats: &TPageEncodingStats) -> PageEncodingStats {
-    let page_type = PageType::from(thrift_encoding_stats.page_type);
-    let encoding = Encoding::from(thrift_encoding_stats.encoding);
+pub fn try_from_thrift(
+    thrift_encoding_stats: &TPageEncodingStats,
+) -> Result<PageEncodingStats> {
+    let page_type = PageType::try_from(thrift_encoding_stats.page_type)?;
+    let encoding = Encoding::try_from(thrift_encoding_stats.encoding)?;
     let count = thrift_encoding_stats.count;
 
-    PageEncodingStats {
+    Ok(PageEncodingStats {
         page_type,
         encoding,
         count,
-    }
+    })
 }
 
 /// Converts `PageEncodingStats` into Thrift definition.
@@ -70,6 +73,6 @@ mod tests {
             count: 1,
         };
 
-        assert_eq!(from_thrift(&to_thrift(&stats)), stats);
+        assert_eq!(try_from_thrift(&to_thrift(&stats)).unwrap(), stats);
     }
 }
2 changes: 1 addition & 1 deletion parquet/src/file/page_index/index.rs
@@ -20,7 +20,7 @@ use crate::data_type::private::ParquetValueType;
 use crate::data_type::Int96;
 use crate::errors::ParquetError;
 use crate::util::bit_util::from_le_slice;
-use parquet_format::{BoundaryOrder, ColumnIndex};
+use crate::format::{BoundaryOrder, ColumnIndex};
 use std::fmt::Debug;
 
 /// The statistics in one page
2 changes: 1 addition & 1 deletion parquet/src/file/page_index/index_reader.rs
@@ -21,7 +21,7 @@ use crate::errors::ParquetError;
 use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::page_index::index::{BooleanIndex, ByteArrayIndex, Index, NativeIndex};
 use crate::file::reader::ChunkReader;
-use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
 use std::io::{Cursor, Read};
 use thrift::protocol::TCompactInputProtocol;
 
6 changes: 3 additions & 3 deletions parquet/src/file/page_index/range.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 use crate::errors::ParquetError;
-use parquet_format::PageLocation;
+use crate::format::PageLocation;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::RangeInclusive;
@@ -284,7 +284,7 @@ mod tests {
     use crate::basic::Type::INT32;
     use crate::file::page_index::index::{NativeIndex, PageIndex};
    use crate::file::page_index::range::{compute_row_ranges, Range, RowRanges};
-    use parquet_format::{BoundaryOrder, PageLocation};
+    use crate::format::{BoundaryOrder, PageLocation};
 
     #[test]
     fn test_binary_search_overlap() {
@@ -445,7 +445,7 @@ mod tests {
                 null_count: Some(0),
             },
         ],
-        boundary_order: BoundaryOrder::Ascending,
+        boundary_order: BoundaryOrder::ASCENDING,
     };
     let locations = &[
         PageLocation {
