Bumped parquet2 dependency (jorgecarleitao#1304)
ritchie46 committed Mar 29, 2023
1 parent 80cbfa0 · commit c5b7d41
Showing 6 changed files with 28 additions and 28 deletions.
Cargo.toml: 7 additions & 3 deletions
@@ -77,9 +77,6 @@ futures = { version = "0.3", optional = true }
# to read IPC as a stream
async-stream = { version = "0.3.2", optional = true }

-# parquet support
-parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }
-
# avro support
avro-schema = { version = "0.3", optional = true }

@@ -108,6 +105,13 @@ getrandom = { version = "0.2", features = ["js"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
ahash = { version = "0.8", features=["runtime-rng"] }

+# parquet support
+[dependencies.parquet2]
+version = "0.17"
+optional = true
+default_features = false
+features = ["async"]
+
[dev-dependencies]
criterion = "0.3"
flate2 = "1"
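Note: moving parquet2 from an inline entry to a [dependencies.parquet2] table is purely cosmetic; the substantive change is the bump from 0.16 to 0.17. The new table is equivalent to this single-line form:

parquet2 = { version = "0.17", optional = true, default_features = false, features = ["async"] }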
src/io/parquet/read/mod.rs: 1 addition & 2 deletions
@@ -22,8 +22,7 @@ pub use parquet2::{
decompress, get_column_iterator, get_page_stream,
read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata,
read_metadata_async as _read_metadata_async, read_pages_locations, BasicDecompressor,
-ColumnChunkIter, Decompressor, MutStreamingIterator, PageFilter, PageReader,
-ReadColumnIterator, State,
+Decompressor, MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
},
schema::types::{
GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
src/io/parquet/write/dictionary.rs: 5 additions & 5 deletions
@@ -1,6 +1,6 @@
use parquet2::{
encoding::{hybrid_rle::encode_u32, Encoding},
-page::{DictPage, EncodedPage},
+page::{DictPage, Page},
schema::types::PrimitiveType,
statistics::{serialize_statistics, ParquetStatistics},
write::DynIter,
@@ -106,7 +106,7 @@ fn serialize_keys<K: DictionaryKey>(
nested: &[Nested],
statistics: ParquetStatistics,
options: WriteOptions,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
let mut buffer = vec![];

// parquet only accepts a single validity - we "&" the validities into a single one
@@ -142,7 +142,7 @@ fn serialize_keys<K: DictionaryKey>(
options,
Encoding::RleDictionary,
)
-.map(EncodedPage::Data)
+.map(Page::Data)
}

macro_rules! dyn_prim {
@@ -162,7 +162,7 @@ pub fn array_to_pages<K: DictionaryKey>(
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
-) -> Result<DynIter<'static, Result<EncodedPage>>> {
+) -> Result<DynIter<'static, Result<Page>>> {
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
// write DictPage
@@ -230,7 +230,7 @@ pub fn array_to_pages<K: DictionaryKey>(
)))
}
};
-let dict_page = EncodedPage::Dict(dict_page);
+let dict_page = Page::Dict(dict_page);

// write DataPage pointing to DictPage
let data_page = serialize_keys(array, type_, nested, statistics, options)?;
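Note: the recurring rename in this commit is parquet2 0.17's EncodedPage becoming Page. A minimal sketch of dispatching on the two variants this diff constructs (Page::Data and Page::Dict); assuming these are the only variants in 0.17, the match is exhaustive:

use parquet2::page::Page;

// Name an uncompressed page by the variant used to build it above.
fn page_kind(page: &Page) -> &'static str {
    match page {
        Page::Data(_) => "data page",
        Page::Dict(_) => "dictionary page",
    }
}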
src/io/parquet/write/mod.rs: 9 additions & 9 deletions
@@ -38,7 +38,7 @@ pub use parquet2::{
encoding::Encoding,
fallible_streaming_iterator,
metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
-page::{CompressedDataPage, CompressedPage, EncodedPage},
+page::{CompressedDataPage, CompressedPage, Page},
schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
write::{
compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
@@ -130,15 +130,15 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
)
}

-/// Returns an iterator of [`EncodedPage`].
+/// Returns an iterator of [`Page`].
#[allow(clippy::needless_collect)]
pub fn array_to_pages(
array: &dyn Array,
type_: ParquetPrimitiveType,
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
-) -> Result<DynIter<'static, Result<EncodedPage>>> {
+) -> Result<DynIter<'static, Result<Page>>> {
// maximum page size is 2^31 e.g. i32::MAX
// we split at 2^31 - 2^25 to err on the safe side
// we also check for an array.len > 3 to prevent infinite recursion
@@ -175,7 +175,7 @@ pub fn array_to_pages(
((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);

-let vs: Vec<Result<EncodedPage>> = (0..array.len())
+let vs: Vec<Result<Page>> = (0..array.len())
.step_by(rows_per_page)
.map(|offset| {
let length = if offset + rows_per_page > array.len() {
@@ -202,7 +202,7 @@ pub fn array_to_page(
nested: &[Nested],
options: WriteOptions,
encoding: Encoding,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
if nested.len() == 1 {
// special case where validity == def levels
return array_to_page_simple(array, type_, options, encoding);
@@ -216,7 +216,7 @@ pub fn array_to_page_simple(
type_: ParquetPrimitiveType,
options: WriteOptions,
encoding: Encoding,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
let data_type = array.data_type();
if !can_encode(data_type, encoding) {
return Err(Error::InvalidArgumentError(format!(
@@ -439,7 +439,7 @@ pub fn array_to_page_simple(
other
))),
}
-.map(EncodedPage::Data)
+.map(Page::Data)
}

fn array_to_page_nested(
@@ -448,7 +448,7 @@ fn array_to_page_nested(
nested: &[Nested],
options: WriteOptions,
_encoding: Encoding,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
use DataType::*;
match array.data_type().to_logical_type() {
Null => {
@@ -520,7 +520,7 @@ fn array_to_page_nested(
other
))),
}
-.map(EncodedPage::Data)
+.map(Page::Data)
}

fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(
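Note: the page-splitting arithmetic visible in array_to_pages above is untouched by this commit apart from the return-type rename. A self-contained sketch of that heuristic, under illustrative names (split_offsets is not an arrow2 function) and assuming a 64-bit target:

// Split an array into (offset, length) chunks so each encoded page stays
// below 2^31 - 2^25 bytes, i.e. safely under the 2^31 (i32::MAX) page limit.
fn split_offsets(array_len: usize, array_byte_size: usize) -> Vec<(usize, usize)> {
    let page_size: usize = (1usize << 31) - (1usize << 25);
    // average bytes per row; the +1 terms guard against division by zero
    let bytes_per_row = ((array_byte_size as f64) / ((array_len + 1) as f64)) as usize;
    let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);
    (0..array_len)
        .step_by(rows_per_page)
        .map(|offset| {
            let length = if offset + rows_per_page > array_len {
                array_len - offset
            } else {
                rows_per_page
            };
            (offset, length)
        })
        .collect()
}

For example, 1_000_000 rows at roughly 8 bytes per row gives a rows_per_page of about 235 million, so the whole array fits in a single page.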
src/io/parquet/write/pages.rs: 3 additions & 3 deletions
@@ -1,5 +1,5 @@
use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
-use parquet2::{page::EncodedPage, write::DynIter};
+use parquet2::{page::Page, write::DynIter};

use crate::array::{ListArray, Offset, StructArray};
use crate::bitmap::Bitmap;
@@ -193,13 +193,13 @@ fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec<ParquetPrimiti
}
}

-/// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array
+/// Returns a vector of iterators of [`Page`], one per leaf column in the array
pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
array: A,
type_: ParquetType,
options: WriteOptions,
encoding: &[Encoding],
-) -> Result<Vec<DynIter<'static, Result<EncodedPage>>>> {
+) -> Result<Vec<DynIter<'static, Result<Page>>>> {
let array = array.as_ref();
let nested = to_nested(array, &type_)?;

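Note: array_to_columns yields one page iterator per leaf column, as computed by to_parquet_leafs_recursive above. A conceptual sketch of that depth-first flattening (illustrative types, not the parquet2 schema API):

// A group type flattens into one leaf per primitive, depth-first.
enum Ty {
    Primitive(&'static str),
    Group(Vec<Ty>),
}

fn leafs(t: &Ty, out: &mut Vec<&'static str>) {
    match t {
        Ty::Primitive(name) => out.push(name),
        Ty::Group(children) => {
            for child in children {
                leafs(child, out);
            }
        }
    }
}

A struct with fields a: i64 and b: struct { c: f64 } thus flattens to two leaves, a and c, and hence two page iterators.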
tests/it/io/parquet/read_indexes.rs: 3 additions & 6 deletions
@@ -6,10 +6,7 @@ use arrow2::io::parquet::read::indexes;
use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*};

/// Returns 2 sets of pages with the same number of rows, distributed un-evenly across pages
-fn pages(
-arrays: &[&dyn Array],
-encoding: Encoding,
-) -> Result<(Vec<EncodedPage>, Vec<EncodedPage>, Schema)> {
+fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
// create pages with different number of rows
let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4]);
let array12 = PrimitiveArray::<i64>::from_slice([5]);
@@ -73,7 +70,7 @@ fn pages(

/// Tests reading pages while skipping indexes
fn read_with_indexes(
-(pages1, pages2, schema): (Vec<EncodedPage>, Vec<EncodedPage>, Schema),
+(pages1, pages2, schema): (Vec<Page>, Vec<Page>, Schema),
expected: Box<dyn Array>,
) -> Result<()> {
let options = WriteOptions {
@@ -83,7 +80,7 @@ fn read_with_indexes(
data_pagesize_limit: None,
};

-let to_compressed = |pages: Vec<EncodedPage>| {
+let to_compressed = |pages: Vec<Page>| {
let encoded_pages = DynIter::new(pages.into_iter().map(Ok));
let compressed_pages =
Compressor::new(encoded_pages, options.compression, vec![]).map_err(Error::from);
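Note: the helpers above build pages with rows split un-evenly so the test can check that a reader guided by the page index fetches only the pages overlapping the requested rows. A conceptual sketch of that selection (pure Rust, not the arrow2 indexes API):

// Given each page's (first_row, num_rows) from the page index, keep the
// indices of pages intersecting the half-open row interval [start, end).
fn pages_to_read(page_rows: &[(usize, usize)], start: usize, end: usize) -> Vec<usize> {
    page_rows
        .iter()
        .enumerate()
        .filter(|&(_, &(first, count))| first < end && first + count > start)
        .map(|(i, _)| i)
        .collect()
}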
