From c5b7d41cdef01103247acff1951ec166b4c948af Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Fri, 2 Dec 2022 05:12:25 +0100
Subject: [PATCH] Bumped parquet2 dependency (#1304)

---
 Cargo.toml                          | 10 +++++++---
 src/io/parquet/read/mod.rs          |  3 +--
 src/io/parquet/write/dictionary.rs  | 10 +++++-----
 src/io/parquet/write/mod.rs         | 18 +++++++++---------
 src/io/parquet/write/pages.rs       |  6 +++---
 tests/it/io/parquet/read_indexes.rs |  9 +++------
 6 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 7cf1ccb2381..f0f2714a0fb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,9 +77,6 @@ futures = { version = "0.3", optional = true }
 # to read IPC as a stream
 async-stream = { version = "0.3.2", optional = true }
 
-# parquet support
-parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }
-
 # avro support
 avro-schema = { version = "0.3", optional = true }
 
@@ -108,6 +105,13 @@ getrandom = { version = "0.2", features = ["js"] }
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 ahash = { version = "0.8", features=["runtime-rng"] }
 
+# parquet support
+[dependencies.parquet2]
+version = "0.17"
+optional = true
+default_features = false
+features = ["async"]
+
 [dev-dependencies]
 criterion = "0.3"
 flate2 = "1"
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index be0ddc9ac56..b8031a37580 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -22,8 +22,7 @@ pub use parquet2::{
         decompress, get_column_iterator, get_page_stream,
         read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata,
         read_metadata_async as _read_metadata_async, read_pages_locations, BasicDecompressor,
-        ColumnChunkIter, Decompressor, MutStreamingIterator, PageFilter, PageReader,
-        ReadColumnIterator, State,
+        Decompressor, MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
     },
     schema::types::{
         GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs
index 2abffd98a49..9f20d4692de 100644
--- a/src/io/parquet/write/dictionary.rs
+++ b/src/io/parquet/write/dictionary.rs
@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::{hybrid_rle::encode_u32, Encoding},
-    page::{DictPage, EncodedPage},
+    page::{DictPage, Page},
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, ParquetStatistics},
     write::DynIter,
@@ -106,7 +106,7 @@ fn serialize_keys<K: DictionaryKey>(
     nested: &[Nested],
     statistics: ParquetStatistics,
     options: WriteOptions,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
     let mut buffer = vec![];
 
     // parquet only accepts a single validity - we "&" the validities into a single one
@@ -142,7 +142,7 @@ fn serialize_keys<K: DictionaryKey>(
         options,
         Encoding::RleDictionary,
     )
-    .map(EncodedPage::Data)
+    .map(Page::Data)
 }
 
 macro_rules! dyn_prim {
dyn_prim { @@ -162,7 +162,7 @@ pub fn array_to_pages( nested: &[Nested], options: WriteOptions, encoding: Encoding, -) -> Result>> { +) -> Result>> { match encoding { Encoding::PlainDictionary | Encoding::RleDictionary => { // write DictPage @@ -230,7 +230,7 @@ pub fn array_to_pages( ))) } }; - let dict_page = EncodedPage::Dict(dict_page); + let dict_page = Page::Dict(dict_page); // write DataPage pointing to DictPage let data_page = serialize_keys(array, type_, nested, statistics, options)?; diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index f48c1a67ea6..84b4f1cab58 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -38,7 +38,7 @@ pub use parquet2::{ encoding::Encoding, fallible_streaming_iterator, metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData}, - page::{CompressedDataPage, CompressedPage, EncodedPage}, + page::{CompressedDataPage, CompressedPage, Page}, schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType}, write::{ compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter, @@ -130,7 +130,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { ) } -/// Returns an iterator of [`EncodedPage`]. +/// Returns an iterator of [`Page`]. #[allow(clippy::needless_collect)] pub fn array_to_pages( array: &dyn Array, @@ -138,7 +138,7 @@ pub fn array_to_pages( nested: &[Nested], options: WriteOptions, encoding: Encoding, -) -> Result>> { +) -> Result>> { // maximum page size is 2^31 e.g. i32::MAX // we split at 2^31 - 2^25 to err on the safe side // we also check for an array.len > 3 to prevent infinite recursion @@ -175,7 +175,7 @@ pub fn array_to_pages( ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize; let rows_per_page = (page_size / (bytes_per_row + 1)).max(1); - let vs: Vec> = (0..array.len()) + let vs: Vec> = (0..array.len()) .step_by(rows_per_page) .map(|offset| { let length = if offset + rows_per_page > array.len() { @@ -202,7 +202,7 @@ pub fn array_to_page( nested: &[Nested], options: WriteOptions, encoding: Encoding, -) -> Result { +) -> Result { if nested.len() == 1 { // special case where validity == def levels return array_to_page_simple(array, type_, options, encoding); @@ -216,7 +216,7 @@ pub fn array_to_page_simple( type_: ParquetPrimitiveType, options: WriteOptions, encoding: Encoding, -) -> Result { +) -> Result { let data_type = array.data_type(); if !can_encode(data_type, encoding) { return Err(Error::InvalidArgumentError(format!( @@ -439,7 +439,7 @@ pub fn array_to_page_simple( other ))), } - .map(EncodedPage::Data) + .map(Page::Data) } fn array_to_page_nested( @@ -448,7 +448,7 @@ fn array_to_page_nested( nested: &[Nested], options: WriteOptions, _encoding: Encoding, -) -> Result { +) -> Result { use DataType::*; match array.data_type().to_logical_type() { Null => { @@ -520,7 +520,7 @@ fn array_to_page_nested( other ))), } - .map(EncodedPage::Data) + .map(Page::Data) } fn transverse_recursive T + Clone>( diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index 5e0c80112da..caaa92866be 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -1,5 +1,5 @@ use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType}; -use parquet2::{page::EncodedPage, write::DynIter}; +use parquet2::{page::Page, write::DynIter}; use crate::array::{ListArray, Offset, StructArray}; use crate::bitmap::Bitmap; @@ -193,13 +193,13 @@ fn 
     }
 }
 
-/// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array
+/// Returns a vector of iterators of [`Page`], one per leaf column in the array
 pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
     array: A,
     type_: ParquetType,
     options: WriteOptions,
     encoding: &[Encoding],
-) -> Result<Vec<DynIter<'static, Result<EncodedPage>>>> {
+) -> Result<Vec<DynIter<'static, Result<Page>>>> {
     let array = array.as_ref();
     let nested = to_nested(array, &type_)?;
 
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index b17803b16fb..4e41bb2baf6 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -6,10 +6,7 @@ use arrow2::io::parquet::read::indexes;
 use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*};
 
 /// Returns 2 sets of pages with different the same number of rows distributed un-evenly
-fn pages(
-    arrays: &[&dyn Array],
-    encoding: Encoding,
-) -> Result<(Vec<EncodedPage>, Vec<EncodedPage>, Schema)> {
+fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
     // create pages with different number of rows
     let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4]);
     let array12 = PrimitiveArray::<i64>::from_slice([5]);
@@ -73,7 +70,7 @@ fn pages(
 
 /// Tests reading pages while skipping indexes
 fn read_with_indexes(
-    (pages1, pages2, schema): (Vec<EncodedPage>, Vec<EncodedPage>, Schema),
+    (pages1, pages2, schema): (Vec<Page>, Vec<Page>, Schema),
     expected: Box<dyn Array>,
 ) -> Result<()> {
     let options = WriteOptions {
@@ -83,7 +80,7 @@ fn read_with_indexes(
         data_pagesize_limit: None,
     };
 
-    let to_compressed = |pages: Vec<EncodedPage>| {
+    let to_compressed = |pages: Vec<Page>| {
         let encoded_pages = DynIter::new(pages.into_iter().map(Ok));
         let compressed_pages =
             Compressor::new(encoded_pages, options.compression, vec![]).map_err(Error::from);
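
For downstream code the change is mechanical: wherever parquet2 0.16's `EncodedPage` was named, parquet2 0.17 uses `Page`, and arrow2 simply re-exports the new name from `io::parquet::write`. A minimal caller-side sketch against the updated re-exports, mirroring the shape of the test helper above; `pages_to_iter` is a hypothetical function added here for illustration, not part of the crate:

use arrow2::error::Result;
use arrow2::io::parquet::write::{DynIter, Page};

// Wrap already-encoded pages (previously `EncodedPage`, now `Page`) into the
// fallible iterator the write APIs expect; only the type name changes.
fn pages_to_iter(pages: Vec<Page>) -> DynIter<'static, Result<Page>> {
    DynIter::new(pages.into_iter().map(Ok))
}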