Commit 03f24bf: update_parquet
ritchie46 committed Nov 27, 2022
1 parent 368aacc

Showing 4 changed files with 26 additions and 20 deletions.

Cargo.toml (9 additions & 3 deletions)

@@ -77,9 +77,6 @@ futures = { version = "0.3", optional = true }
 # to read IPC as a stream
 async-stream = { version = "0.3.2", optional = true }
 
-# parquet support
-parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }
-
 # avro support
 avro-schema = { version = "0.3", optional = true }
 
@@ -108,6 +105,15 @@ getrandom = { version = "0.2", features = ["js"] }
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 ahash = { version = "0.8", features=["runtime-rng"] }
 
+# parquet support
+[dependencies.parquet2]
+git = "https://github.com/jorgecarleitao/parquet2"
+rev = "6acad3677964682e0f2d3051773a518fc5112a18"
+version = "0.16"
+optional = true
+default_features = false
+features = ["async"]
+
 [dev-dependencies]
 criterion = "0.3"
 flate2 = "1"
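
Note: the table-style dependency added above is equivalent to a single inline entry; the following is Cargo's alternative syntax, shown for reference only, not part of the commit:

# parquet support, pinned to a specific upstream revision (equivalent inline form)
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "6acad3677964682e0f2d3051773a518fc5112a18", version = "0.16", optional = true, default_features = false, features = ["async"] }

Pinning a rev makes the resolved source reproducible; the version key is still checked against the crate version found at that revision.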

src/io/parquet/write/dictionary.rs (5 additions & 5 deletions)

@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::{hybrid_rle::encode_u32, Encoding},
-    page::{DictPage, EncodedPage},
+    page::{DictPage, Page},
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, ParquetStatistics},
     write::DynIter,
@@ -106,7 +106,7 @@ fn serialize_keys<K: DictionaryKey>(
     nested: &[Nested],
     statistics: ParquetStatistics,
     options: WriteOptions,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
     let mut buffer = vec![];
 
     // parquet only accepts a single validity - we "&" the validities into a single one
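
The comment above states the constraint: Parquet keeps a single definition-level stream per leaf column, so the validities of all nesting levels must be AND-ed into one. A minimal illustrative sketch of that merge, using plain bool slices rather than arrow2's packed Bitmap:

/// Merge two validity masks: a slot is valid only if every level marks it valid.
/// Illustrative only; the actual code operates on packed bitmaps.
fn combine_validities(a: &[bool], b: &[bool]) -> Vec<bool> {
    a.iter().zip(b.iter()).map(|(x, y)| *x && *y).collect()
}

// combine_validities(&[true, true, false], &[true, false, true])
// yields [true, false, false]
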
@@ -142,7 +142,7 @@ fn serialize_keys<K: DictionaryKey>(
         options,
         Encoding::RleDictionary,
     )
-    .map(EncodedPage::Data)
+    .map(Page::Data)
 }
 
 macro_rules! dyn_prim {
@@ -162,7 +162,7 @@ pub fn array_to_pages<K: DictionaryKey>(
     nested: &[Nested],
     options: WriteOptions,
     encoding: Encoding,
-) -> Result<DynIter<'static, Result<EncodedPage>>> {
+) -> Result<DynIter<'static, Result<Page>>> {
     match encoding {
         Encoding::PlainDictionary | Encoding::RleDictionary => {
             // write DictPage
@@ -230,7 +230,7 @@ pub fn array_to_pages<K: DictionaryKey>(
             )))
         }
     };
-    let dict_page = EncodedPage::Dict(dict_page);
+    let dict_page = Page::Dict(dict_page);
 
     // write DataPage pointing to DictPage
     let data_page = serialize_keys(array, type_, nested, statistics, options)?;
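
For orientation: dictionary encoding writes the distinct values once (the DictPage) and one integer key per row (the DataPage pointing at it). A self-contained sketch of that split; dictionary_encode is a hypothetical helper, not arrow2's implementation:

/// Split values into (dictionary, keys). Each key indexes into the dictionary.
fn dictionary_encode(values: &[&str]) -> (Vec<String>, Vec<u32>) {
    let mut dict: Vec<String> = Vec::new();
    let mut keys = Vec::with_capacity(values.len());
    for v in values {
        let key = match dict.iter().position(|d| d == v) {
            Some(i) => i as u32,
            None => {
                dict.push((*v).to_string());
                (dict.len() - 1) as u32
            }
        };
        keys.push(key);
    }
    (dict, keys)
}

// dictionary_encode(&["a", "b", "a"]) yields (["a", "b"], [0, 1, 0])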

src/io/parquet/write/mod.rs (9 additions & 9 deletions)

@@ -38,7 +38,7 @@ pub use parquet2::{
     encoding::Encoding,
     fallible_streaming_iterator,
     metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
-    page::{CompressedDataPage, CompressedPage, EncodedPage},
+    page::{CompressedDataPage, CompressedPage, Page},
     schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
     write::{
         compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
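
Because these are public re-exports, parquet2's rename of EncodedPage to Page is a breaking change for downstream users of this module. A minimal sketch of the migration, assuming the crate is consumed as arrow2 (count_data_pages is a hypothetical helper):

// Before this commit:
// use arrow2::io::parquet::write::EncodedPage;
// After this commit:
use arrow2::io::parquet::write::Page;

/// Count how many produced pages are data pages (as opposed to dictionary pages).
fn count_data_pages(pages: &[Page]) -> usize {
    pages.iter().filter(|p| matches!(p, Page::Data(_))).count()
}
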
@@ -130,15 +130,15 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
     )
 }
 
-/// Returns an iterator of [`EncodedPage`].
+/// Returns an iterator of [`Page`].
 #[allow(clippy::needless_collect)]
 pub fn array_to_pages(
     array: &dyn Array,
     type_: ParquetPrimitiveType,
     nested: &[Nested],
     options: WriteOptions,
     encoding: Encoding,
-) -> Result<DynIter<'static, Result<EncodedPage>>> {
+) -> Result<DynIter<'static, Result<Page>>> {
     // maximum page size is 2^31 e.g. i32::MAX
     // we split at 2^31 - 2^25 to err on the safe side
     // we also check for an array.len > 3 to prevent infinite recursion
@@ -175,7 +175,7 @@ pub fn array_to_pages(
             ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
         let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);
 
-        let vs: Vec<Result<EncodedPage>> = (0..array.len())
+        let vs: Vec<Result<Page>> = (0..array.len())
             .step_by(rows_per_page)
             .map(|offset| {
                 let length = if offset + rows_per_page > array.len() {
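
The splitting heuristic above is easy to check in isolation. A standalone sketch mirroring the arithmetic (split_rows is a hypothetical helper; the real code maps each chunk to a page):

/// Estimate bytes per row, derive rows per page, and yield (offset, length)
/// pairs covering all rows. The `+ 1` terms guard against division by zero
/// on empty or very small arrays.
fn split_rows(len: usize, array_byte_size: usize, page_size: usize) -> Vec<(usize, usize)> {
    let bytes_per_row = ((array_byte_size as f64) / ((len + 1) as f64)) as usize;
    let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);
    (0..len)
        .step_by(rows_per_page)
        .map(|offset| {
            let length = if offset + rows_per_page > len {
                len - offset
            } else {
                rows_per_page
            };
            (offset, length)
        })
        .collect()
}
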
@@ -202,7 +202,7 @@ pub fn array_to_page(
     nested: &[Nested],
     options: WriteOptions,
     encoding: Encoding,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
     if nested.len() == 1 {
         // special case where validity == def levels
         return array_to_page_simple(array, type_, options, encoding);
@@ -216,7 +216,7 @@ pub fn array_to_page_simple(
     type_: ParquetPrimitiveType,
     options: WriteOptions,
     encoding: Encoding,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
     let data_type = array.data_type();
     if !can_encode(data_type, encoding) {
         return Err(Error::InvalidArgumentError(format!(
@@ -439,7 +439,7 @@ pub fn array_to_page_simple(
             other
         ))),
     }
-    .map(EncodedPage::Data)
+    .map(Page::Data)
 }
 
 fn array_to_page_nested(
@@ -448,7 +448,7 @@ fn array_to_page_nested(
     nested: &[Nested],
     options: WriteOptions,
     _encoding: Encoding,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
     use DataType::*;
     match array.data_type().to_logical_type() {
         Null => {
@@ -520,7 +520,7 @@ fn array_to_page_nested(
             other
         ))),
     }
-    .map(EncodedPage::Data)
+    .map(Page::Data)
 }
 
 fn transverse_recursive<T, F: Fn(&DataType) -> T + Clone>(

src/io/parquet/write/pages.rs (3 additions & 3 deletions)

@@ -1,5 +1,5 @@
 use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
-use parquet2::{page::EncodedPage, write::DynIter};
+use parquet2::{page::Page, write::DynIter};
 
 use crate::array::{ListArray, Offset, StructArray};
 use crate::bitmap::Bitmap;
@@ -193,13 +193,13 @@ fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec<ParquetPrimitiveType>)
     }
 }
 
-/// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array
+/// Returns a vector of iterators of [`Page`], one per leaf column in the array
 pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
     array: A,
     type_: ParquetType,
     options: WriteOptions,
     encoding: &[Encoding],
-) -> Result<Vec<DynIter<'static, Result<EncodedPage>>>> {
+) -> Result<Vec<DynIter<'static, Result<Page>>>> {
     let array = array.as_ref();
     let nested = to_nested(array, &type_)?;
 