diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 04e48baef70..647a8dc6f39 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -355,27 +355,6 @@ fn create_string_byte_array_dictionary_reader(
     .unwrap()
 }
 
-fn create_complex_object_byte_array_dictionary_reader(
-    page_iterator: impl PageIterator + 'static,
-    column_desc: ColumnDescPtr,
-) -> Box<dyn ArrayReader> {
-    use parquet::arrow::array_reader::ComplexObjectArrayReader;
-    use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
-    let arrow_type =
-        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
-
-    let converter = Utf8Converter::new(Utf8ArrayConverter {});
-    Box::new(
-        ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-            Box::new(page_iterator),
-            column_desc,
-            converter,
-            Some(arrow_type),
-        )
-        .unwrap(),
-    )
-}
-
 fn bench_primitive<T>(
     group: &mut BenchmarkGroup<WallTime>,
     schema: &SchemaDescPtr,
@@ -678,18 +657,7 @@ fn add_benches(c: &mut Criterion) {
 
     let mut group = c.benchmark_group("arrow_array_reader/StringDictionary");
 
-    group.bench_function("dictionary encoded, mandatory, no NULLs - old", |b| {
-        b.iter(|| {
-            let array_reader = create_complex_object_byte_array_dictionary_reader(
-                dictionary_string_no_null_data.clone(),
-                mandatory_string_column_desc.clone(),
-            );
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
-
-    group.bench_function("dictionary encoded, mandatory, no NULLs - new", |b| {
+    group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
         b.iter(|| {
             let array_reader = create_string_byte_array_dictionary_reader(
                 dictionary_string_no_null_data.clone(),
@@ -700,18 +668,7 @@ fn add_benches(c: &mut Criterion) {
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    group.bench_function("dictionary encoded, optional, no NULLs - old", |b| {
-        b.iter(|| {
-            let array_reader = create_complex_object_byte_array_dictionary_reader(
-                dictionary_string_no_null_data.clone(),
-                optional_string_column_desc.clone(),
-            );
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
-
-    group.bench_function("dictionary encoded, optional, no NULLs - new", |b| {
+    group.bench_function("dictionary encoded, optional, no NULLs", |b| {
         b.iter(|| {
             let array_reader = create_string_byte_array_dictionary_reader(
                 dictionary_string_no_null_data.clone(),
@@ -722,18 +679,7 @@ fn add_benches(c: &mut Criterion) {
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    group.bench_function("dictionary encoded, optional, half NULLs - old", |b| {
-        b.iter(|| {
-            let array_reader = create_complex_object_byte_array_dictionary_reader(
-                dictionary_string_half_null_data.clone(),
-                optional_string_column_desc.clone(),
-            );
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
-
-    group.bench_function("dictionary encoded, optional, half NULLs - new", |b| {
+    group.bench_function("dictionary encoded, optional, half NULLs", |b| {
         b.iter(|| {
             let array_reader = create_string_byte_array_dictionary_reader(
                 dictionary_string_half_null_data.clone(),
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 7b9adfc23f2..e8c22f95aa0 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -25,7 +25,7 @@ use crate::arrow::array_reader::{
     ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
     PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
 };
-use crate::arrow::converter::{
+use crate::arrow::buffer::converter::{
     DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
     FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
     IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index b3606a7808b..2e29b609474 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
 use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index fe8448ffb31..0e64f0d25b7 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -24,12 +24,11 @@ use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
 use arrow::buffer::Buffer;
 use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
 
-use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
-use crate::arrow::array_reader::{
-    byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
-    offset_buffer::OffsetBuffer,
-};
+use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
 use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::buffer::{
+    dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
+};
 use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -236,13 +235,13 @@ where
     fn new(col: &ColumnDescPtr) -> Self {
         let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
 
-        let value_type =
-            match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
-                (true, true) => ArrowType::LargeUtf8,
-                (true, false) => ArrowType::LargeBinary,
-                (false, true) => ArrowType::Utf8,
-                (false, false) => ArrowType::Binary,
-            };
+        let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8)
+        {
+            (true, true) => ArrowType::LargeUtf8,
+            (true, false) => ArrowType::LargeBinary,
+            (false, true) => ArrowType::Utf8,
+            (false, false) => ArrowType::Binary,
+        };
 
         Self {
             dict: None,
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader/mod.rs
similarity index 99%
rename from parquet/src/arrow/array_reader.rs
rename to parquet/src/arrow/array_reader/mod.rs
index c70071dacf3..21c49b33878 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Logic for reading into arrow arrays
+
 use std::any::Any;
 use std::cmp::max;
 use std::marker::PhantomData;
@@ -34,7 +36,7 @@ use arrow::datatypes::{
     UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
 };
 
-use crate::arrow::converter::Converter;
+use crate::arrow::buffer::converter::Converter;
 use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
 use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -50,11 +52,9 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 mod builder;
 mod byte_array;
 mod byte_array_dictionary;
-mod dictionary_buffer;
 mod empty_array;
 mod list_array;
 mod map_array;
-mod offset_buffer;
 
 #[cfg(test)]
 mod test_util;
@@ -811,7 +811,7 @@ mod tests {
         TimestampMillisecondType as ArrowTimestampMillisecondType,
     };
 
-    use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
+    use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
     use crate::basic::{Encoding, Type as PhysicalType};
     use crate::column::page::Page;
     use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
@@ -1384,8 +1384,7 @@ mod tests {
         let mut all_values = Vec::with_capacity(num_pages * values_per_page);
 
         for i in 0..num_pages {
-            let mut dict_encoder =
-                DictEncoder::<ByteArrayType>::new(column_desc.clone());
+            let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
             // add data page
             let mut values = Vec::with_capacity(values_per_page);
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 1d56960cf16..92d4ff264c1 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Contains reader which reads parquet data into arrow array.
+//! Contains reader which reads parquet data into arrow [`RecordBatch`]
 
 use std::sync::Arc;
 
@@ -294,7 +294,7 @@ mod tests {
     use crate::arrow::arrow_reader::{
         ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
     };
-    use crate::arrow::converter::{
+    use crate::arrow::buffer::converter::{
         BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
         IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
     };
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
similarity index 100%
rename from parquet/src/arrow/levels.rs
rename to parquet/src/arrow/arrow_writer/levels.rs
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer/mod.rs
similarity index 99%
rename from parquet/src/arrow/arrow_writer.rs
rename to parquet/src/arrow/arrow_writer/mod.rs
index 334c7237d70..44631e57409 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -27,19 +27,20 @@ use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
 
-use super::levels::LevelInfo;
 use super::schema::{
     add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
     decimal_length_from_precision,
 };
-use crate::arrow::levels::calculate_array_levels;
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::RowGroupMetaDataPtr;
 use crate::file::properties::WriterProperties;
 use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
 use crate::{data_type::*, file::writer::SerializedFileWriter};
+use levels::{calculate_array_levels, LevelInfo};
+
+mod levels;
 
 /// Arrow writer
 ///
diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs
similarity index 100%
rename from parquet/src/arrow/bit_util.rs
rename to parquet/src/arrow/buffer/bit_util.rs
diff --git a/parquet/src/arrow/converter.rs b/parquet/src/arrow/buffer/converter.rs
similarity index 100%
rename from parquet/src/arrow/converter.rs
rename to parquet/src/arrow/buffer/converter.rs
diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
similarity index 99%
rename from parquet/src/arrow/array_reader/dictionary_buffer.rs
rename to parquet/src/arrow/buffer/dictionary_buffer.rs
index 6dc9cc80f39..7f445850700 100644
--- a/parquet/src/arrow/array_reader/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::record_reader::buffer::{
     BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
 };
diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs
new file mode 100644
index 00000000000..5ee89aa1a78
--- /dev/null
+++ b/parquet/src/arrow/buffer/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Logic for reading data into arrow buffers
+
+pub mod bit_util;
+pub mod converter;
+pub mod dictionary_buffer;
+pub mod offset_buffer;
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs
similarity index 98%
rename from parquet/src/arrow/array_reader/offset_buffer.rs
rename to parquet/src/arrow/buffer/offset_buffer.rs
index 23e7af7595c..2d73e3f146b 100644
--- a/parquet/src/arrow/array_reader/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::bit_util::iter_set_bits_rev;
+use crate::arrow::buffer::bit_util::iter_set_bits_rev;
 use crate::arrow::record_reader::buffer::{
     BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
 };
@@ -58,7 +58,7 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     /// the start of a UTF-8 codepoint
     ///
     /// Note: This does not verify that the entirety of `data` is valid
-    /// UTF-8. This should be done by calling [`Self::values_as_str`] after
+    /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
     /// all data has been written
     pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
         if validate_utf8 {
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index c9cc0ff6ce3..3aee7cf42cb 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -122,14 +122,12 @@ experimental_mod!(array_reader);
 pub mod arrow_reader;
 pub mod arrow_writer;
-mod bit_util;
+mod buffer;
 
 #[cfg(feature = "async")]
 pub mod async_reader;
 
-experimental_mod!(converter);
-pub(in crate::arrow) mod levels;
-pub(in crate::arrow) mod record_reader;
+mod record_reader;
 experimental_mod!(schema);
 
 pub use self::arrow_reader::ArrowReader;
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 4fa8213dedc..fa0f919916e 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -17,7 +17,7 @@
 
 use std::marker::PhantomData;
 
-use crate::arrow::bit_util::iter_set_bits_rev;
+use crate::arrow::buffer::bit_util::iter_set_bits_rev;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::ToByteSlice;
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 93de4006c10..9cca25c8ae5 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -21,7 +21,7 @@ use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::Buffer;
 
-use crate::arrow::bit_util::count_set_bits;
+use crate::arrow::buffer::bit_util::count_set_bits;
 use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::basic::Encoding;
 use crate::column::reader::decoder::{
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader/mod.rs
similarity index 100%
rename from parquet/src/arrow/record_reader.rs
rename to parquet/src/arrow/record_reader/mod.rs