Update thrift v0.16 and vendor parquet-format (#2502) #2626

Merged · 9 commits · Sep 10, 2022
7 changes: 1 addition & 6 deletions .gitattributes
@@ -1,6 +1 @@
r/R/RcppExports.R linguist-generated=true
r/R/arrowExports.R linguist-generated=true
r/src/RcppExports.cpp linguist-generated=true
r/src/arrowExports.cpp linguist-generated=true
r/man/*.Rd linguist-generated=true

parquet/src/format.rs linguist-generated
Contributor: 👍

9 changes: 6 additions & 3 deletions parquet/CONTRIBUTING.md
@@ -60,7 +60,10 @@ Run `cargo bench` for benchmarks.
To build documentation, run `cargo doc --no-deps`.
To compile and view in the browser, run `cargo doc --no-deps --open`.

## Update Supported Parquet Version
## Update Parquet Format

To update Parquet format to a newer version, check if [parquet-format](https://github.com/sunchao/parquet-format-rs)
version is available. Then simply update version of `parquet-format` crate in Cargo.toml.
To generate the parquet format (thrift definitions) code, run the following from the repository root:

```
$ docker run -v $(pwd):/thrift/src -it archlinux pacman -Sy --noconfirm thrift && wget https://raw.githubusercontent.com/apache/parquet-format/apache-parquet-format-2.9.0/src/main/thrift/parquet.thrift -O /tmp/parquet.thrift && thrift --gen rs /tmp/parquet.thrift && sed -i '/use thrift::server::TProcessor;/d' parquet.rs && mv parquet.rs parquet/src/format.rs
```

Contributor: I tried this and it works great:

```
(arrow_dev) alamb@MacBook-Pro-8:~/Software/arrow-rs$ diff parquet.rs parquet/src/format.rs
26d25
< use thrift::server::TProcessor;
```
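The `sed -i` step in the regeneration command above deletes the unused `TProcessor` server import that the thrift generator emits. A minimal standalone illustration of just that deletion (the demo file path and its contents are made up for this sketch, not actual generated output):

```shell
# Simulate a fragment of thrift-generated Rust output (hypothetical content).
cat > /tmp/parquet_gen_demo.rs <<'EOF'
use thrift::protocol::TCompactInputProtocol;
use thrift::server::TProcessor;
pub struct FileMetaData;
EOF

# The same in-place deletion the regeneration one-liner performs.
sed -i '/use thrift::server::TProcessor;/d' /tmp/parquet_gen_demo.rs

cat /tmp/parquet_gen_demo.rs
```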
3 changes: 1 addition & 2 deletions parquet/Cargo.toml
@@ -31,9 +31,8 @@ rust-version = "1.62"

[dependencies]
ahash = "0.8"
parquet-format = { version = "4.0.0", default-features = false }
bytes = { version = "1.1", default-features = false, features = ["std"] }
thrift = { version = "0.13", default-features = false }
thrift = { version = "0.16", default-features = false }
Contributor: 👍

snap = { version = "1.0", default-features = false, optional = true }
brotli = { version = "3.3", default-features = false, features = ["std"], optional = true }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -1668,7 +1668,7 @@ mod tests {
schema: TypePtr,
field: Option<Field>,
opts: &TestOptions,
) -> Result<parquet_format::FileMetaData> {
) -> Result<crate::format::FileMetaData> {
let mut writer_props = opts.writer_props();
if let Some(field) = field {
let arrow_schema = Schema::new(vec![field]);
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -121,7 +121,7 @@ impl RowSelection {
#[cfg(any(test, feature = "async"))]
pub(crate) fn scan_ranges(
&self,
page_locations: &[parquet_format::PageLocation],
page_locations: &[crate::format::PageLocation],
) -> Vec<Range<usize>> {
let mut ranges = vec![];
let mut row_offset = 0;
@@ -302,7 +302,7 @@ impl From<RowSelection> for VecDeque<RowSelector> {
#[cfg(test)]
mod tests {
use super::*;
use parquet_format::PageLocation;
use crate::format::PageLocation;
use rand::{thread_rng, Rng};

#[test]
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -230,7 +230,7 @@ impl<W: Write> ArrowWriter<W> {
}

/// Close and finalize the underlying Parquet writer
pub fn close(mut self) -> Result<parquet_format::FileMetaData> {
pub fn close(mut self) -> Result<crate::format::FileMetaData> {
self.flush()?;
self.writer.close()
}
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader.rs
@@ -84,11 +84,11 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::format::OffsetIndex;
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use parquet_format::OffsetIndex;
use thrift::protocol::TCompactInputProtocol;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
544 changes: 293 additions & 251 deletions parquet/src/basic.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions parquet/src/column/page.rs
@@ -20,9 +20,9 @@
use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
use crate::format::PageHeader;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
use parquet_format::PageHeader;

/// Parquet Page definition.
///
@@ -209,15 +209,15 @@ impl TryFrom<&PageHeader> for PageMetadata {

fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
match value.type_ {
parquet_format::PageType::DataPage => Ok(PageMetadata {
crate::format::PageType::DATA_PAGE => Ok(PageMetadata {
num_rows: value.data_page_header.as_ref().unwrap().num_values as usize,
is_dict: false,
}),
parquet_format::PageType::DictionaryPage => Ok(PageMetadata {
crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
}),
parquet_format::PageType::DataPageV2 => Ok(PageMetadata {
crate::format::PageType::DATA_PAGE_V2 => Ok(PageMetadata {
num_rows: value.data_page_header_v2.as_ref().unwrap().num_rows as usize,
is_dict: false,
}),
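The renamed match arms above (`DataPage` → `DATA_PAGE`, etc.) reflect how the vendored thrift 0.16 codegen models Parquet enums: as `i32` newtype structs with `SCREAMING_SNAKE_CASE` associated constants rather than Rust enums, so unknown values survive deserialization and must be rejected explicitly. A minimal sketch of that shape (the type, constants, and `describe` helper here are illustrative assumptions, not the vendored code):

```rust
// Assumed shape of the vendored thrift 0.16 output: an enum becomes an
// i32 newtype with associated constants, not a closed Rust enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PageType(pub i32);

impl PageType {
    pub const DATA_PAGE: PageType = PageType(0);
    pub const INDEX_PAGE: PageType = PageType(1);
    pub const DICTIONARY_PAGE: PageType = PageType(2);
    pub const DATA_PAGE_V2: PageType = PageType(3);
}

/// Classify a page type, rejecting values this reader does not recognize,
/// mirroring the From -> TryFrom move in this PR.
fn describe(value: PageType) -> Result<&'static str, String> {
    match value {
        PageType::DATA_PAGE => Ok("data page"),
        PageType::DICTIONARY_PAGE => Ok("dictionary page"),
        PageType::DATA_PAGE_V2 => Ok("data page v2"),
        other => Err(format!("unexpected page type: {}", other.0)),
    }
}

fn main() {
    assert_eq!(describe(PageType::DATA_PAGE), Ok("data page"));
    assert!(describe(PageType(42)).is_err());
}
```

Because the wrapped `i32` can hold any value a future format revision defines, infallible `From` conversions are no longer sound; the fallible path surfaces unknown discriminants as errors instead of panicking or silently mislabeling them.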
6 changes: 3 additions & 3 deletions parquet/src/column/writer/mod.rs
@@ -16,7 +16,7 @@
// under the License.

//! Contains column writer API.
use parquet_format::{ColumnIndex, OffsetIndex};
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
@@ -1089,8 +1089,8 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {

#[cfg(test)]
mod tests {
use crate::format::BoundaryOrder;
use bytes::Bytes;
use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;

@@ -2086,7 +2086,7 @@ mod tests {
// column index
assert_eq!(2, column_index.null_pages.len());
assert_eq!(2, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
for idx in 0..2 {
assert!(!column_index.null_pages[idx]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
4 changes: 2 additions & 2 deletions parquet/src/file/footer.rs
@@ -17,7 +17,7 @@

use std::{io::Read, sync::Arc};

use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
use thrift::protocol::TCompactInputProtocol;

use crate::basic::ColumnOrder;
@@ -149,8 +149,8 @@ mod tests {

use crate::basic::SortOrder;
use crate::basic::Type;
use crate::format::TypeDefinedOrder;
use crate::schema::types::Type as SchemaType;
use parquet_format::TypeDefinedOrder;

#[test]
fn test_parse_metadata_size_smaller_than_footer() {
21 changes: 13 additions & 8 deletions parquet/src/file/metadata.rs
@@ -35,7 +35,7 @@

use std::sync::Arc;

use parquet_format::{
use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
RowGroup,
};
@@ -122,7 +122,7 @@ impl ParquetMetaData {
}
}

pub type KeyValue = parquet_format::KeyValue;
pub type KeyValue = crate::format::KeyValue;

/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
@@ -553,14 +553,14 @@ impl ColumnChunkMetaData {
return Err(general_err!("Expected to have column metadata"));
}
let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
let column_type = Type::from(col_metadata.type_);
let column_type = Type::try_from(col_metadata.type_)?;
let column_path = ColumnPath::new(col_metadata.path_in_schema);
let encodings = col_metadata
.encodings
.drain(0..)
.map(Encoding::from)
.collect();
let compression = Compression::from(col_metadata.codec);
.map(Encoding::try_from)
.collect::<Result<_>>()?;
let compression = Compression::try_from(col_metadata.codec)?;
let file_path = cc.file_path;
let file_offset = cc.file_offset;
let num_values = col_metadata.num_values;
@@ -573,7 +573,12 @@
let encoding_stats = col_metadata
.encoding_stats
.as_ref()
.map(|vec| vec.iter().map(page_encoding_stats::from_thrift).collect());
.map(|vec| {
vec.iter()
.map(page_encoding_stats::try_from_thrift)
.collect::<Result<_>>()
})
.transpose()?;
let bloom_filter_offset = col_metadata.bloom_filter_offset;
let offset_index_offset = cc.offset_index_offset;
let offset_index_length = cc.offset_index_length;
@@ -846,7 +851,7 @@ impl ColumnIndexBuilder {
null_pages: Vec::new(),
min_values: Vec::new(),
max_values: Vec::new(),
boundary_order: BoundaryOrder::Unordered,
boundary_order: BoundaryOrder::UNORDERED,
null_counts: Vec::new(),
valid: true,
}
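The `map` / `collect::<Result<_>>` / `transpose` chain introduced in the hunk above is the standard idiom for applying a now-fallible conversion across an optional list: the inner `collect` short-circuits on the first error, and `transpose` turns the resulting `Option<Result<Vec<_>, _>>` into `Result<Option<Vec<_>>, _>` so `?` can propagate the failure. A small standalone sketch (the `parse_all` helper and its inputs are invented for illustration):

```rust
use std::num::ParseIntError;

// Apply a fallible conversion to every element of an optional list,
// turning Option<Result<Vec<_>, _>> inside-out with transpose.
fn parse_all(input: Option<Vec<&str>>) -> Result<Option<Vec<i32>>, ParseIntError> {
    input
        .map(|v| {
            v.iter()
                .map(|s| s.parse::<i32>())
                .collect::<Result<Vec<_>, _>>()
        })
        .transpose()
}

fn main() {
    assert_eq!(parse_all(None), Ok(None)); // absent input stays absent
    assert_eq!(parse_all(Some(vec!["1", "2"])), Ok(Some(vec![1, 2])));
    assert!(parse_all(Some(vec!["oops"])).is_err()); // first failure propagates
}
```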
17 changes: 10 additions & 7 deletions parquet/src/file/page_encoding_stats.rs
@@ -16,7 +16,8 @@
// under the License.

use crate::basic::{Encoding, PageType};
use parquet_format::{
use crate::errors::Result;
use crate::format::{
Encoding as TEncoding, PageEncodingStats as TPageEncodingStats, PageType as TPageType,
};

@@ -32,16 +33,18 @@ pub struct PageEncodingStats {
}

/// Converts Thrift definition into `PageEncodingStats`.
pub fn from_thrift(thrift_encoding_stats: &TPageEncodingStats) -> PageEncodingStats {
let page_type = PageType::from(thrift_encoding_stats.page_type);
let encoding = Encoding::from(thrift_encoding_stats.encoding);
pub fn try_from_thrift(
thrift_encoding_stats: &TPageEncodingStats,
) -> Result<PageEncodingStats> {
let page_type = PageType::try_from(thrift_encoding_stats.page_type)?;
let encoding = Encoding::try_from(thrift_encoding_stats.encoding)?;
let count = thrift_encoding_stats.count;

PageEncodingStats {
Ok(PageEncodingStats {
page_type,
encoding,
count,
}
})
}

/// Converts `PageEncodingStats` into Thrift definition.
@@ -70,6 +73,6 @@ mod tests {
count: 1,
};

assert_eq!(from_thrift(&to_thrift(&stats)), stats);
assert_eq!(try_from_thrift(&to_thrift(&stats)).unwrap(), stats);
}
}
2 changes: 1 addition & 1 deletion parquet/src/file/page_index/index.rs
@@ -19,8 +19,8 @@ use crate::basic::Type;
use crate::data_type::private::ParquetValueType;
use crate::data_type::Int96;
use crate::errors::ParquetError;
use crate::format::{BoundaryOrder, ColumnIndex};
use crate::util::bit_util::from_le_slice;
use parquet_format::{BoundaryOrder, ColumnIndex};
use std::fmt::Debug;

/// The statistics in one page
2 changes: 1 addition & 1 deletion parquet/src/file/page_index/index_reader.rs
@@ -21,7 +21,7 @@ use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::page_index::index::{BooleanIndex, ByteArrayIndex, Index, NativeIndex};
use crate::file::reader::ChunkReader;
use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
use std::io::{Cursor, Read};
use thrift::protocol::TCompactInputProtocol;

6 changes: 3 additions & 3 deletions parquet/src/file/page_index/range.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
use crate::errors::ParquetError;
use parquet_format::PageLocation;
use crate::format::PageLocation;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::RangeInclusive;
@@ -284,7 +284,7 @@ mod tests {
use crate::basic::Type::INT32;
use crate::file::page_index::index::{NativeIndex, PageIndex};
use crate::file::page_index::range::{compute_row_ranges, Range, RowRanges};
use parquet_format::{BoundaryOrder, PageLocation};
use crate::format::{BoundaryOrder, PageLocation};

#[test]
fn test_binary_search_overlap() {
@@ -445,7 +445,7 @@
null_count: Some(0),
},
],
boundary_order: BoundaryOrder::Ascending,
boundary_order: BoundaryOrder::ASCENDING,
};
let locations = &[
PageLocation {