
Update thrift v0.16 and vendor parquet-format (#2502) #2626

Merged
merged 9 commits into from Sep 10, 2022
Changes from 4 commits
7 changes: 1 addition & 6 deletions .gitattributes
@@ -1,6 +1 @@
-r/R/RcppExports.R linguist-generated=true
-r/R/arrowExports.R linguist-generated=true
-r/src/RcppExports.cpp linguist-generated=true
-r/src/arrowExports.cpp linguist-generated=true
-r/man/*.Rd linguist-generated=true

+parquet/src/format.rs linguist-generated
Contributor

👍

17 changes: 14 additions & 3 deletions parquet/CONTRIBUTING.md
@@ -60,7 +60,18 @@ Run `cargo bench` for benchmarks.
To build documentation, run `cargo doc --no-deps`.
To compile and view in the browser, run `cargo doc --no-deps --open`.

-## Update Supported Parquet Version
+## Update Parquet Format

-To update Parquet format to a newer version, check if [parquet-format](https://github.com/sunchao/parquet-format-rs)
-version is available. Then simply update version of `parquet-format` crate in Cargo.toml.
+To generate the parquet format code run

```
$ git clone https://github.com/apache/thrift
$ cd thrift
$ git checkout v0.16.0
# docker build just builds a docker image with thrift dependencies
$ docker build -t thrift build/docker/ubuntu-bionic
# build/docker/scripts/cmake.sh actually compiles thrift
$ docker run -v $(pwd):/thrift/src -it thrift build/docker/scripts/cmake.sh && wget https://raw.githubusercontent.com/apache/parquet-format/apache-parquet-format-2.9.0/src/main/thrift/parquet.thrift && ./cmake_build/compiler/cpp/bin/thrift --gen rs parquet.thrift
Contributor

This command did not complete successfully for me.

...
441: ----------------------------------------------------------------------
441: Ran 2 tests in 0.012s
441: 
441: OK
441: Traceback (most recent call last):
441:   File "/thrift/src/test/py/TestServer.py", line 403, in <module>
441:     from thrift.TMultiplexedProcessor import TMultiplexedProcessor
441:   File "/thrift/src/lib/py/build/lib.linux-x86_64-2.7/thrift/TMultiplexedProcessor.py", line 20, in <module>
441:     from thrift.Thrift import TProcessor, TMessageType
441: ImportError: No module named Thrift
441: t.py
441: ----
441: ----------------
441:  Executing Client/Server tests with various generated code directories
441:  Servers to be tested: TSimpleServer, TThreadedServer, TThreadPoolServer, TNonblockingServer, THttpServer, TProcessPoolServer, TForkingServer
441:  Directories to be tested: gen-py-default, gen-py-slots, gen-py-oldstyle, gen-py-no_utf8strings, gen-py-dynamic, gen-py-dynamicslots
441:  Protocols to be tested: accel, accelc, binary, compact, json, header
441:  Options to be tested: ZLIB(yes/no), SSL(yes/no)
441: ----------------
441: 
441: Test run #0:  (includes gen-py-default) Server=TSimpleServer,  Proto=accel,  zlib=False,  SSL=False
441: Testing server TSimpleServer: /usr/bin/python /thrift/src/test/py/TestServer.py --protocol=accel --port=9090 TSimpleServer
441: FAIL: Server process (/usr/bin/python /thrift/src/test/py/TestServer.py --protocol=accel --port=9090 TSimpleServer) failed with retcode 1
441: Traceback (most recent call last):
441:   File "/thrift/src/test/py/RunClientServer.py", line 323, in <module>
441:     sys.exit(main())
441:   File "/thrift/src/test/py/RunClientServer.py", line 315, in main
441:     tests.test_feature('gendir', generated_dirs)
441:   File "/thrift/src/test/py/RunClientServer.py", line 230, in test_feature
441:     if self.run(conf, test_count):
441:   File "/thrift/src/test/py/RunClientServer.py", line 219, in run
441:     runServiceTest(self.libdir, self.genbase, genpydir, try_server, try_proto, self.port, with_zlib, with_ssl, self.verbose)
441:   File "/thrift/src/test/py/RunClientServer.py", line 157, in runServiceTest
441:     ensureServerAlive()
441:   File "/thrift/src/test/py/RunClientServer.py", line 140, in ensureServerAlive
441:     % (server_class, ' '.join(server_args)))
441: Exception: Server subprocess TSimpleServer died, args: /usr/bin/python /thrift/src/test/py/TestServer.py --protocol=accel --port=9090 TSimpleServer
441/441 Test #441: python_test .......................***Failed   28.49 sec

99% tests passed, 6 tests failed out of 441

Total Test time (real) = 250.09 sec

The following tests FAILED:
	382 - TInterruptTest (Failed)
	401 - TNonblockingSSLServerTest (Failed)
	403 - SecurityTest (Failed)
	404 - SecurityFromBufferTest (Failed)
	433 - PythonTestSSLSocket (Failed)
	441 - python_test (Failed)
Errors while running CTest

Contributor Author

@tustvold Sep 9, 2022

Yeah, I ended up just using the version vended by Arch Linux... Perhaps I will just update the instructions to do that 🤔

Contributor

Yeah -- maybe docker run an Arch Linux image?

Contributor Author

Done, although of course the formatting is now different... But at least we now have a one-liner

```

Then copy the generated `parquet.rs` into `src/format.rs` and commit changes.
3 changes: 1 addition & 2 deletions parquet/Cargo.toml
@@ -31,9 +31,8 @@ rust-version = "1.62"

[dependencies]
ahash = "0.8"
-parquet-format = { version = "4.0.0", default-features = false }
bytes = { version = "1.1", default-features = false, features = ["std"] }
-thrift = { version = "0.13", default-features = false }
+thrift = { version = "0.16", default-features = false }
Contributor

👍

snap = { version = "1.0", default-features = false, optional = true }
brotli = { version = "3.3", default-features = false, features = ["std"], optional = true }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -1668,7 +1668,7 @@ mod tests {
schema: TypePtr,
field: Option<Field>,
opts: &TestOptions,
-) -> Result<parquet_format::FileMetaData> {
+) -> Result<crate::format::FileMetaData> {
let mut writer_props = opts.writer_props();
if let Some(field) = field {
let arrow_schema = Schema::new(vec![field]);
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -121,7 +121,7 @@ impl RowSelection {
#[cfg(any(test, feature = "async"))]
pub(crate) fn scan_ranges(
&self,
-page_locations: &[parquet_format::PageLocation],
+page_locations: &[crate::format::PageLocation],
) -> Vec<Range<usize>> {
let mut ranges = vec![];
let mut row_offset = 0;
@@ -302,7 +302,7 @@ impl From<RowSelection> for VecDeque<RowSelector> {
#[cfg(test)]
mod tests {
use super::*;
-use parquet_format::PageLocation;
+use crate::format::PageLocation;
use rand::{thread_rng, Rng};

#[test]
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -230,7 +230,7 @@ impl<W: Write> ArrowWriter<W> {
}

/// Close and finalize the underlying Parquet writer
-pub fn close(mut self) -> Result<parquet_format::FileMetaData> {
+pub fn close(mut self) -> Result<crate::format::FileMetaData> {
self.flush()?;
self.writer.close()
}
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader.rs
@@ -84,11 +84,11 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

+use crate::format::OffsetIndex;
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
-use parquet_format::OffsetIndex;
use thrift::protocol::TCompactInputProtocol;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
534 changes: 288 additions & 246 deletions parquet/src/basic.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions parquet/src/column/page.rs
@@ -20,9 +20,9 @@
use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
+use crate::format::PageHeader;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
-use parquet_format::PageHeader;

/// Parquet Page definition.
///
@@ -209,15 +209,15 @@ impl TryFrom<&PageHeader> for PageMetadata {

fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
match value.type_ {
-parquet_format::PageType::DataPage => Ok(PageMetadata {
+crate::format::PageType::DATA_PAGE => Ok(PageMetadata {
num_rows: value.data_page_header.as_ref().unwrap().num_values as usize,
is_dict: false,
}),
-parquet_format::PageType::DictionaryPage => Ok(PageMetadata {
+crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
}),
-parquet_format::PageType::DataPageV2 => Ok(PageMetadata {
+crate::format::PageType::DATA_PAGE_V2 => Ok(PageMetadata {
num_rows: value.data_page_header_v2.as_ref().unwrap().num_rows as usize,
is_dict: false,
}),
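Thrift 0.16 generates Parquet's enums as "open" i32 newtypes with associated constants (hence `DATA_PAGE` rather than `DataPage`), so values outside the known set can be represented and conversions must become fallible. A minimal sketch of that pattern, using hypothetical stand-in types rather than the crate's real definitions:

```rust
use std::convert::TryFrom;

// Stand-in for a thrift 0.16 generated "open" enum: an i32 newtype with
// associated constants instead of a closed Rust enum. (Hypothetical names
// for illustration only.)
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct TPageType(pub i32);

impl TPageType {
    pub const DATA_PAGE: TPageType = TPageType(0);
    pub const DICTIONARY_PAGE: TPageType = TPageType(2);
    pub const DATA_PAGE_V2: TPageType = TPageType(3);
}

// The crate-side closed enum.
#[derive(Debug, PartialEq)]
pub enum PageType {
    DataPage,
    DictionaryPage,
    DataPageV2,
}

// Conversion is fallible: an unrecognised i32 yields an Err instead of
// panicking, which is why `From` impls became `TryFrom` across the crate.
impl TryFrom<TPageType> for PageType {
    type Error = String;

    fn try_from(value: TPageType) -> Result<Self, Self::Error> {
        match value {
            TPageType::DATA_PAGE => Ok(PageType::DataPage),
            TPageType::DICTIONARY_PAGE => Ok(PageType::DictionaryPage),
            TPageType::DATA_PAGE_V2 => Ok(PageType::DataPageV2),
            TPageType(other) => Err(format!("unexpected page type {}", other)),
        }
    }
}
```

An unrecognised value now surfaces as an `Err` at the conversion boundary instead of failing deep inside the reader.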
6 changes: 3 additions & 3 deletions parquet/src/column/writer/mod.rs
@@ -16,7 +16,7 @@
// under the License.

//! Contains column writer API.
-use parquet_format::{ColumnIndex, OffsetIndex};
+use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
@@ -1089,8 +1089,8 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {

#[cfg(test)]
mod tests {
+use crate::format::BoundaryOrder;
use bytes::Bytes;
-use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;

@@ -2086,7 +2086,7 @@ mod tests {
// column index
assert_eq!(2, column_index.null_pages.len());
assert_eq!(2, offset_index.page_locations.len());
-assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
for idx in 0..2 {
assert!(!column_index.null_pages[idx]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
4 changes: 2 additions & 2 deletions parquet/src/file/footer.rs
@@ -17,7 +17,7 @@

use std::{io::Read, sync::Arc};

-use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
+use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
use thrift::protocol::TCompactInputProtocol;

use crate::basic::ColumnOrder;
@@ -149,8 +149,8 @@ mod tests {

use crate::basic::SortOrder;
use crate::basic::Type;
+use crate::format::TypeDefinedOrder;
use crate::schema::types::Type as SchemaType;
-use parquet_format::TypeDefinedOrder;

#[test]
fn test_parse_metadata_size_smaller_than_footer() {
21 changes: 13 additions & 8 deletions parquet/src/file/metadata.rs
@@ -35,7 +35,7 @@

use std::sync::Arc;

-use parquet_format::{
+use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
RowGroup,
};
@@ -122,7 +122,7 @@ impl ParquetMetaData {
}
}

-pub type KeyValue = parquet_format::KeyValue;
+pub type KeyValue = crate::format::KeyValue;

/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
@@ -553,14 +553,14 @@ impl ColumnChunkMetaData {
return Err(general_err!("Expected to have column metadata"));
}
let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
-let column_type = Type::from(col_metadata.type_);
+let column_type = Type::try_from(col_metadata.type_)?;
let column_path = ColumnPath::new(col_metadata.path_in_schema);
let encodings = col_metadata
.encodings
.drain(0..)
-.map(Encoding::from)
-.collect();
-let compression = Compression::from(col_metadata.codec);
+.map(Encoding::try_from)
+.collect::<Result<_>>()?;
+let compression = Compression::try_from(col_metadata.codec)?;
let file_path = cc.file_path;
let file_offset = cc.file_offset;
let num_values = col_metadata.num_values;
@@ -573,7 +573,12 @@
let encoding_stats = col_metadata
.encoding_stats
.as_ref()
-.map(|vec| vec.iter().map(page_encoding_stats::from_thrift).collect());
+.map(|vec| {
+    vec.iter()
+        .map(page_encoding_stats::try_from_thrift)
+        .collect::<Result<_>>()
+})
+.transpose()?;
let bloom_filter_offset = col_metadata.bloom_filter_offset;
let offset_index_offset = cc.offset_index_offset;
let offset_index_length = cc.offset_index_length;
@@ -846,7 +851,7 @@ impl ColumnIndexBuilder {
null_pages: Vec::new(),
min_values: Vec::new(),
max_values: Vec::new(),
-boundary_order: BoundaryOrder::Unordered,
+boundary_order: BoundaryOrder::UNORDERED,
null_counts: Vec::new(),
valid: true,
}
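The `encoding_stats` change above follows the standard `Option`-of-fallible-conversions shape: map the inner `Vec` through a fallible function, collect into `Result`, then `transpose` the resulting `Option<Result<_>>` into `Result<Option<_>>` so `?` can propagate the first error. A self-contained sketch, with `parse_stat` as a made-up stand-in for `page_encoding_stats::try_from_thrift`:

```rust
// Hypothetical fallible per-element conversion: rejects values that do
// not fit in the target type.
fn parse_stat(raw: i64) -> Result<u32, String> {
    u32::try_from(raw).map_err(|_| format!("value out of range: {}", raw))
}

// Option<Vec<i64>> -> Option<Result<Vec<u32>, _>> -> Result<Option<Vec<u32>>, _>
fn parse_stats(raw: Option<Vec<i64>>) -> Result<Option<Vec<u32>>, String> {
    raw.map(|vec| {
        vec.into_iter()
            .map(parse_stat)
            .collect::<Result<Vec<_>, _>>()
    })
    .transpose()
}
```

`None` stays `None`, a fully valid list becomes `Ok(Some(..))`, and a single bad element short-circuits into `Err`.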
17 changes: 10 additions & 7 deletions parquet/src/file/page_encoding_stats.rs
@@ -16,7 +16,8 @@
// under the License.

use crate::basic::{Encoding, PageType};
-use parquet_format::{
+use crate::errors::Result;
+use crate::format::{
Encoding as TEncoding, PageEncodingStats as TPageEncodingStats, PageType as TPageType,
};

@@ -32,16 +33,18 @@ pub struct PageEncodingStats {
}

/// Converts Thrift definition into `PageEncodingStats`.
-pub fn from_thrift(thrift_encoding_stats: &TPageEncodingStats) -> PageEncodingStats {
-    let page_type = PageType::from(thrift_encoding_stats.page_type);
-    let encoding = Encoding::from(thrift_encoding_stats.encoding);
+pub fn try_from_thrift(
+    thrift_encoding_stats: &TPageEncodingStats,
+) -> Result<PageEncodingStats> {
+    let page_type = PageType::try_from(thrift_encoding_stats.page_type)?;
+    let encoding = Encoding::try_from(thrift_encoding_stats.encoding)?;
let count = thrift_encoding_stats.count;

-PageEncodingStats {
+Ok(PageEncodingStats {
page_type,
encoding,
count,
-}
+})
}

/// Converts `PageEncodingStats` into Thrift definition.
@@ -70,6 +73,6 @@ mod tests {
count: 1,
};

-assert_eq!(from_thrift(&to_thrift(&stats)), stats);
+assert_eq!(try_from_thrift(&to_thrift(&stats)).unwrap(), stats);
}
}
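The updated test asserts that serialising with `to_thrift` and re-parsing with the fallible `try_from_thrift` round-trips. The same property in miniature, with hypothetical stand-in types rather than the real thrift definitions:

```rust
// Crate-side struct (stand-in for PageEncodingStats).
#[derive(Debug, PartialEq, Clone)]
struct Stats {
    encoding: i32,
    count: i32,
}

// Wire-side struct (stand-in for the thrift-generated TPageEncodingStats).
#[derive(Debug)]
struct TStats {
    encoding: i32,
    count: i32,
}

// Converting to the wire form is infallible.
fn to_thrift(s: &Stats) -> TStats {
    TStats { encoding: s.encoding, count: s.count }
}

// Converting back validates, so it returns Result.
fn try_from_thrift(t: &TStats) -> Result<Stats, String> {
    if t.count < 0 {
        return Err(format!("negative count: {}", t.count));
    }
    Ok(Stats { encoding: t.encoding, count: t.count })
}
```

Round-tripping a valid value reproduces it exactly, while an invalid wire value is rejected at parse time.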
2 changes: 1 addition & 1 deletion parquet/src/file/page_index/index.rs
@@ -19,8 +19,8 @@ use crate::basic::Type;
use crate::data_type::private::ParquetValueType;
use crate::data_type::Int96;
use crate::errors::ParquetError;
+use crate::format::{BoundaryOrder, ColumnIndex};
use crate::util::bit_util::from_le_slice;
-use parquet_format::{BoundaryOrder, ColumnIndex};
use std::fmt::Debug;

/// The statistics in one page
2 changes: 1 addition & 1 deletion parquet/src/file/page_index/index_reader.rs
@@ -21,7 +21,7 @@ use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::page_index::index::{BooleanIndex, ByteArrayIndex, Index, NativeIndex};
use crate::file::reader::ChunkReader;
-use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
use std::io::{Cursor, Read};
use thrift::protocol::TCompactInputProtocol;

6 changes: 3 additions & 3 deletions parquet/src/file/page_index/range.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
use crate::errors::ParquetError;
-use parquet_format::PageLocation;
+use crate::format::PageLocation;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::RangeInclusive;
@@ -284,7 +284,7 @@ mod tests {
use crate::basic::Type::INT32;
use crate::file::page_index::index::{NativeIndex, PageIndex};
use crate::file::page_index::range::{compute_row_ranges, Range, RowRanges};
-use parquet_format::{BoundaryOrder, PageLocation};
+use crate::format::{BoundaryOrder, PageLocation};

#[test]
fn test_binary_search_overlap() {
@@ -445,7 +445,7 @@ mod tests {
null_count: Some(0),
},
],
-boundary_order: BoundaryOrder::Ascending,
+boundary_order: BoundaryOrder::ASCENDING,
};
let locations = &[
PageLocation {