Skip to content

Commit

Permalink
Refactor parquet::arrow module (#1827)
Browse files (browse the repository at this point in the history)
* Refactor parquet::arrow module

* Fix doc

* Remove legacy benchmarks
  • Loading branch information
tustvold committed Jun 11, 2022
1 parent c08e532 commit 029203e
Show file tree
Hide file tree
Showing 17 changed files with 56 additions and 90 deletions.
60 changes: 3 additions & 57 deletions parquet/benches/arrow_reader.rs
Expand Up @@ -355,27 +355,6 @@ fn create_string_byte_array_dictionary_reader(
.unwrap()
}

/// Builds a boxed `ComplexObjectArrayReader` over `page_iterator` for the column
/// described by `column_desc`.
///
/// Values are converted with a `Utf8Converter`, and the reader is asked to produce
/// an arrow dictionary type with `Int32` keys and `Utf8` values.
///
/// # Panics
///
/// Panics if `ComplexObjectArrayReader::new` returns an error.
fn create_complex_object_byte_array_dictionary_reader(
    page_iterator: impl PageIterator + 'static,
    column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
    use parquet::arrow::array_reader::ComplexObjectArrayReader;
    use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};

    // Dictionary-encoded output: Int32 keys indexing into Utf8 values.
    let key_type = Box::new(DataType::Int32);
    let value_type = Box::new(DataType::Utf8);
    let dictionary_type = DataType::Dictionary(key_type, value_type);

    let utf8_converter = Utf8Converter::new(Utf8ArrayConverter {});
    let pages = Box::new(page_iterator);

    let reader = ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
        pages,
        column_desc,
        utf8_converter,
        Some(dictionary_type),
    )
    .unwrap();

    Box::new(reader)
}

fn bench_primitive<T>(
group: &mut BenchmarkGroup<WallTime>,
schema: &SchemaDescPtr,
Expand Down Expand Up @@ -678,18 +657,7 @@ fn add_benches(c: &mut Criterion) {

let mut group = c.benchmark_group("arrow_array_reader/StringDictionary");

group.bench_function("dictionary encoded, mandatory, no NULLs - old", |b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.bench_function("dictionary encoded, mandatory, no NULLs - new", |b| {
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
Expand All @@ -700,18 +668,7 @@ fn add_benches(c: &mut Criterion) {
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.bench_function("dictionary encoded, optional, no NULLs - old", |b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.bench_function("dictionary encoded, optional, no NULLs - new", |b| {
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
Expand All @@ -722,18 +679,7 @@ fn add_benches(c: &mut Criterion) {
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.bench_function("dictionary encoded, optional, half NULLs - old", |b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

group.bench_function("dictionary encoded, optional, half NULLs - new", |b| {
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/builder.rs
Expand Up @@ -25,7 +25,7 @@ use crate::arrow::array_reader::{
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
};
use crate::arrow::converter::{
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/byte_array.rs
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
Expand Down
23 changes: 11 additions & 12 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Expand Up @@ -24,12 +24,11 @@ use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};

use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
use crate::arrow::array_reader::{
byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
offset_buffer::OffsetBuffer,
};
use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::buffer::{
dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
};
use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
Expand Down Expand Up @@ -236,13 +235,13 @@ where
fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;

let value_type =
match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
(true, true) => ArrowType::LargeUtf8,
(true, false) => ArrowType::LargeBinary,
(false, true) => ArrowType::Utf8,
(false, false) => ArrowType::Binary,
};
let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8)
{
(true, true) => ArrowType::LargeUtf8,
(true, false) => ArrowType::LargeBinary,
(false, true) => ArrowType::Utf8,
(false, false) => ArrowType::Binary,
};

Self {
dict: None,
Expand Down
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! Logic for reading into arrow arrays

use std::any::Any;
use std::cmp::max;
use std::marker::PhantomData;
Expand All @@ -34,7 +36,7 @@ use arrow::datatypes::{
UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
};

use crate::arrow::converter::Converter;
use crate::arrow::buffer::converter::Converter;
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
use crate::arrow::schema::parquet_to_arrow_field;
Expand All @@ -50,11 +52,9 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
mod builder;
mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod empty_array;
mod list_array;
mod map_array;
mod offset_buffer;

#[cfg(test)]
mod test_util;
Expand Down Expand Up @@ -811,7 +811,7 @@ mod tests {
TimestampMillisecondType as ArrowTimestampMillisecondType,
};

use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::Page;
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
Expand Down Expand Up @@ -1384,8 +1384,7 @@ mod tests {
let mut all_values = Vec::with_capacity(num_pages * values_per_page);

for i in 0..num_pages {
let mut dict_encoder =
DictEncoder::<ByteArrayType>::new(column_desc.clone());
let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
// add data page
let mut values = Vec::with_capacity(values_per_page);

Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Contains reader which reads parquet data into arrow array.
//! Contains reader which reads parquet data into arrow [`RecordBatch`]

use std::sync::Arc;

Expand Down Expand Up @@ -294,7 +294,7 @@ mod tests {
use crate::arrow::arrow_reader::{
ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
};
use crate::arrow::converter::{
use crate::arrow::buffer::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
};
Expand Down
File renamed without changes.
Expand Up @@ -27,19 +27,20 @@ use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;

use super::levels::LevelInfo;
use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
decimal_length_from_precision,
};

use crate::arrow::levels::calculate_array_levels;
use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaDataPtr;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::{data_type::*, file::writer::SerializedFileWriter};
use levels::{calculate_array_levels, LevelInfo};

mod levels;

/// Arrow writer
///
Expand Down
File renamed without changes.
File renamed without changes.
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::{
BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
};
Expand Down
23 changes: 23 additions & 0 deletions parquet/src/arrow/buffer/mod.rs
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic for reading data into arrow buffers

pub mod bit_util;
pub mod converter;
pub mod dictionary_buffer;
pub mod offset_buffer;
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::bit_util::iter_set_bits_rev;
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
use crate::arrow::record_reader::buffer::{
BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
};
Expand Down Expand Up @@ -58,7 +58,7 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
/// the start of a UTF-8 codepoint
///
/// Note: This does not verify that the entirety of `data` is valid
/// UTF-8. This should be done by calling [`Self::values_as_str`] after
/// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
/// all data has been written
pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
if validate_utf8 {
Expand Down
6 changes: 2 additions & 4 deletions parquet/src/arrow/mod.rs
Expand Up @@ -122,14 +122,12 @@
experimental_mod!(array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod bit_util;
mod buffer;

#[cfg(feature = "async")]
pub mod async_reader;

experimental_mod!(converter);
pub(in crate::arrow) mod levels;
pub(in crate::arrow) mod record_reader;
mod record_reader;
experimental_mod!(schema);

pub use self::arrow_reader::ArrowReader;
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/record_reader/buffer.rs
Expand Up @@ -17,7 +17,7 @@

use std::marker::PhantomData;

use crate::arrow::bit_util::iter_set_bits_rev;
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::ToByteSlice;

Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/record_reader/definition_levels.rs
Expand Up @@ -21,7 +21,7 @@ use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;

use crate::arrow::bit_util::count_set_bits;
use crate::arrow::buffer::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
Expand Down
File renamed without changes.

0 comments on commit 029203e

Please sign in to comment.