diff --git a/.github/workflows/miri.sh b/.github/workflows/miri.sh index 27c6f5eecc8..14c5efd7e3b 100755 --- a/.github/workflows/miri.sh +++ b/.github/workflows/miri.sh @@ -6,21 +6,13 @@ # rustup default nightly -export MIRIFLAGS="-Zmiri-disable-isolation" +# stacked borrows checking uses too much memory to run successfully in github actions +# re-enable if the CI is migrated to something more powerful (https://github.com/apache/arrow-rs/issues/1833) +# see also https://github.com/rust-lang/miri/issues/1367 +export MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-disable-stacked-borrows" cargo miri setup cargo clean -run_miri() { - # Currently only the arrow crate is tested with miri - # IO related tests and some unsupported tests are skipped - cargo miri test -p arrow -- --skip csv --skip ipc --skip json -} - -# If MIRI fails, automatically retry -# Seems like miri is occasionally killed by the github runner -# https://github.com/apache/arrow-rs/issues/879 -for i in `seq 1 5`; do - echo "Starting Arrow MIRI run..." - run_miri && break - echo "foo" > /tmp/data.txt -done +echo "Starting Arrow MIRI run..." +cargo miri test -p arrow -- --skip csv --skip ipc --skip json +echo "Miri finished with exit code $?" diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index bbca033fda3..c76469b39ce 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -229,7 +229,7 @@ pub mod flight_service_client { where T: tonic::client::GrpcService, T::Error: Into, - T::ResponseBody: Default + Body + Send + 'static, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { @@ -242,6 +242,7 @@ pub mod flight_service_client { ) -> FlightServiceClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -278,9 +279,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -307,9 +308,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -388,9 +389,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -417,9 +418,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -445,9 +446,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -474,9 +475,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -500,9 +501,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await diff --git 
a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index 26c09d64d5d..086b2183465 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -27,6 +27,7 @@ use arrow::array::{ArrayData, ArrayRef, Int64Array}; use arrow::compute::kernels; use arrow::datatypes::{DataType, Field, Schema}; use arrow::error::ArrowError; +use arrow::ffi_stream::ArrowArrayStreamReader; use arrow::pyarrow::PyArrowConvert; use arrow::record_batch::RecordBatch; @@ -111,6 +112,13 @@ fn round_trip_record_batch(obj: RecordBatch) -> PyResult { Ok(obj) } +#[pyfunction] +fn round_trip_record_batch_reader( + obj: ArrowArrayStreamReader, +) -> PyResult { + Ok(obj) +} + #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(double))?; @@ -122,5 +130,6 @@ fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> m.add_wrapped(wrap_pyfunction!(round_trip_schema))?; m.add_wrapped(wrap_pyfunction!(round_trip_array))?; m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?; + m.add_wrapped(wrap_pyfunction!(round_trip_record_batch_reader))?; Ok(()) } diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index 324956c9c6a..a17ba6d0613 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -303,3 +303,19 @@ def test_dictionary_python(): assert a == b del a del b + +def test_record_batch_reader(): + """ + Python -> Rust -> Python + """ + schema = pa.schema([('ints', pa.list_(pa.int32()))], metadata={b'key1': b'value1'}) + batches = [ + pa.record_batch([[[1], [2, 42]]], schema), + pa.record_batch([[None, [], [5, 6]]], schema), + ] + a = pa.RecordBatchReader.from_batches(schema, batches) + b = rust.round_trip_record_batch_reader(a) + + assert b.schema == schema + got_batches = list(b) + assert got_batches == batches diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index ebcdd9e7a85..a4c5599eb99 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -46,7 +46,7 @@ rand = { version = "0.8", optional = true } num = "0.4" half = "1.8" csv_crate = { version = "1.1", optional = true, package="csv" } -regex = "1.3" +regex = "1.5.6" lazy_static = "1.4" packed_simd = { version = "0.3", optional = true, package = "packed_simd_2" } chrono = { version = "0.4", default-features = false, features = ["clock"] } @@ -61,7 +61,6 @@ bitflags = "1.2.1" [features] default = ["csv", "ipc", "test_utils"] -avx512 = [] csv = ["csv_crate"] ipc = ["flatbuffers"] simd = ["packed_simd"] diff --git a/arrow/README.md b/arrow/README.md index 67de57ff0f6..28240e77dff 100644 --- a/arrow/README.md +++ b/arrow/README.md @@ -100,3 +100,17 @@ cargo run --example read_csv ``` [arrow]: https://arrow.apache.org/ + + +## Performance + +Most of the compute kernels benefit a lot from being optimized for a specific CPU target. +This is especially so on x86-64 since without specifying a target the compiler can only assume support for SSE2 vector instructions. +One of the following values as `-Ctarget-cpu=value` in `RUSTFLAGS` can therefore improve performance significantly: + + - `native`: Target the exact features of the cpu that the build is running on. + This should give the best performance when building and running locally, but should be used carefully for example when building in a CI pipeline or when shipping pre-compiled software. 
+ - `x86-64-v3`: Includes AVX2 support and is close to the intel `haswell` architecture released in 2013 and should be supported by any recent Intel or Amd cpu. + - `x86-64-v4`: Includes AVX512 support available on intel `skylake` server and `icelake`/`tigerlake`/`rocketlake` laptop and desktop processors. + +These flags should be used in addition to the `simd` feature, since they will also affect the code generated by the simd library. \ No newline at end of file diff --git a/arrow/benches/buffer_bit_ops.rs b/arrow/benches/buffer_bit_ops.rs index 063f39c9272..6c6bb0463b2 100644 --- a/arrow/benches/buffer_bit_ops.rs +++ b/arrow/benches/buffer_bit_ops.rs @@ -17,11 +17,14 @@ #[macro_use] extern crate criterion; -use criterion::Criterion; + +use criterion::{Criterion, Throughput}; extern crate arrow; -use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::buffer::{ + buffer_bin_and, buffer_bin_or, buffer_unary_not, Buffer, MutableBuffer, +}; /// Helper function to create arrays fn create_buffer(size: usize) -> Buffer { @@ -42,17 +45,59 @@ fn bench_buffer_or(left: &Buffer, right: &Buffer) { criterion::black_box((left | right).unwrap()); } +fn bench_buffer_not(buffer: &Buffer) { + criterion::black_box(!buffer); +} + +fn bench_buffer_and_with_offsets( + left: &Buffer, + left_offset: usize, + right: &Buffer, + right_offset: usize, + len: usize, +) { + criterion::black_box(buffer_bin_and(left, left_offset, right, right_offset, len)); +} + +fn bench_buffer_or_with_offsets( + left: &Buffer, + left_offset: usize, + right: &Buffer, + right_offset: usize, + len: usize, +) { + criterion::black_box(buffer_bin_or(left, left_offset, right, right_offset, len)); +} + +fn bench_buffer_not_with_offsets(buffer: &Buffer, offset: usize, len: usize) { + criterion::black_box(buffer_unary_not(buffer, offset, len)); +} + fn bit_ops_benchmark(c: &mut Criterion) { let left = create_buffer(512 * 10); let right = create_buffer(512 * 10); - c.bench_function("buffer_bit_ops and", |b| { - b.iter(|| bench_buffer_and(&left, &right)) - }); + c.benchmark_group("buffer_binary_ops") + .throughput(Throughput::Bytes(3 * left.len() as u64)) + .bench_function("and", |b| b.iter(|| bench_buffer_and(&left, &right))) + .bench_function("or", |b| b.iter(|| bench_buffer_or(&left, &right))) + .bench_function("and_with_offset", |b| { + b.iter(|| { + bench_buffer_and_with_offsets(&left, 1, &right, 2, left.len() * 8 - 5) + }) + }) + .bench_function("or_with_offset", |b| { + b.iter(|| { + bench_buffer_or_with_offsets(&left, 1, &right, 2, left.len() * 8 - 5) + }) + }); - c.bench_function("buffer_bit_ops or", |b| { - b.iter(|| bench_buffer_or(&left, &right)) - }); + c.benchmark_group("buffer_unary_ops") + .throughput(Throughput::Bytes(2 * left.len() as u64)) + .bench_function("not", |b| b.iter(|| bench_buffer_not(&left))) + .bench_function("not_with_offset", |b| { + b.iter(|| bench_buffer_not_with_offsets(&left, 1, left.len() * 8 - 5)) + }); } criterion_group!(benches, bit_ops_benchmark); diff --git a/arrow/src/arch/avx512.rs b/arrow/src/arch/avx512.rs deleted file mode 100644 index 264532f3594..00000000000 --- a/arrow/src/arch/avx512.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub(crate) const AVX512_U8X64_LANES: usize = 64; - -#[target_feature(enable = "avx512f")] -pub(crate) unsafe fn avx512_bin_and(left: &[u8], right: &[u8], res: &mut [u8]) { - use core::arch::x86_64::{__m512i, _mm512_and_si512, _mm512_loadu_epi64}; - - let l: __m512i = _mm512_loadu_epi64(left.as_ptr() as *const _); - let r: __m512i = _mm512_loadu_epi64(right.as_ptr() as *const _); - let f = _mm512_and_si512(l, r); - let s = &f as *const __m512i as *const u8; - let d = res.get_unchecked_mut(0) as *mut _ as *mut u8; - std::ptr::copy_nonoverlapping(s, d, std::mem::size_of::<__m512i>()); -} - -#[target_feature(enable = "avx512f")] -pub(crate) unsafe fn avx512_bin_or(left: &[u8], right: &[u8], res: &mut [u8]) { - use core::arch::x86_64::{__m512i, _mm512_loadu_epi64, _mm512_or_si512}; - - let l: __m512i = _mm512_loadu_epi64(left.as_ptr() as *const _); - let r: __m512i = _mm512_loadu_epi64(right.as_ptr() as *const _); - let f = _mm512_or_si512(l, r); - let s = &f as *const __m512i as *const u8; - let d = res.get_unchecked_mut(0) as *mut _ as *mut u8; - std::ptr::copy_nonoverlapping(s, d, std::mem::size_of::<__m512i>()); -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_bitwise_and_avx512() { - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { - avx512_bin_and(&buf1, &buf2, &mut buf3); - }; - for i in buf3.iter() { - assert_eq!(&0b00110000u8, i); - } - } - - #[test] - fn test_bitwise_or_avx512() { - let buf1 = [0b00010011u8; 64]; - let buf2 = [0b11100000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { - avx512_bin_or(&buf1, &buf2, &mut buf3); - }; - for i in buf3.iter() { - assert_eq!(&0b11110011u8, i); - } - } -} diff --git a/arrow/src/array/array.rs b/arrow/src/array/array.rs index f28aba59d73..c566ff99f12 100644 --- a/arrow/src/array/array.rs +++ b/arrow/src/array/array.rs @@ -873,7 +873,9 @@ mod tests { #[test] fn test_memory_size_primitive_nullable() { - let arr: PrimitiveArray = (0..128).map(Some).collect(); + let arr: PrimitiveArray = (0..128) + .map(|i| if i % 20 == 0 { Some(i) } else { None }) + .collect(); let empty_with_bitmap = PrimitiveArray::::from( ArrayData::builder(arr.data_type().clone()) .add_buffer(MutableBuffer::new(0).into()) diff --git a/arrow/src/array/array_binary.rs b/arrow/src/array/array_binary.rs index af55854a531..b1fc06d369a 100644 --- a/arrow/src/array/array_binary.rs +++ b/arrow/src/array/array_binary.rs @@ -700,6 +700,18 @@ impl From for FixedSizeBinaryArray { } } +impl From>> for FixedSizeBinaryArray { + fn from(v: Vec>) -> Self { + Self::try_from_sparse_iter(v.into_iter()).unwrap() + } +} + +impl From> for FixedSizeBinaryArray { + fn from(v: Vec<&[u8]>) -> Self { + Self::try_from_iter(v.into_iter()).unwrap() + } +} + impl fmt::Debug for FixedSizeBinaryArray { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "FixedSizeBinaryArray<{}>\n[\n", self.value_length())?; @@ -1713,6 +1725,64 @@ mod tests { assert_eq!(5, arr.len()) } + #[test] + fn test_fixed_size_binary_array_from_vec() { + let values = vec!["one".as_bytes(), b"two", b"six", b"ten"]; + 
let array = FixedSizeBinaryArray::from(values); + assert_eq!(array.len(), 4); + assert_eq!(array.null_count(), 0); + assert_eq!(array.value(0), b"one"); + assert_eq!(array.value(1), b"two"); + assert_eq!(array.value(2), b"six"); + assert_eq!(array.value(3), b"ten"); + assert!(!array.is_null(0)); + assert!(!array.is_null(1)); + assert!(!array.is_null(2)); + assert!(!array.is_null(3)); + } + + #[test] + #[should_panic(expected = "Nested array size mismatch: one is 3, and the other is 5")] + fn test_fixed_size_binary_array_from_vec_incorrect_length() { + let values = vec!["one".as_bytes(), b"two", b"three", b"four"]; + let _ = FixedSizeBinaryArray::from(values); + } + + #[test] + fn test_fixed_size_binary_array_from_opt_vec() { + let values = vec![ + Some("one".as_bytes()), + Some(b"two"), + None, + Some(b"six"), + Some(b"ten"), + ]; + let array = FixedSizeBinaryArray::from(values); + assert_eq!(array.len(), 5); + assert_eq!(array.value(0), b"one"); + assert_eq!(array.value(1), b"two"); + assert_eq!(array.value(3), b"six"); + assert_eq!(array.value(4), b"ten"); + assert!(!array.is_null(0)); + assert!(!array.is_null(1)); + assert!(array.is_null(2)); + assert!(!array.is_null(3)); + assert!(!array.is_null(4)); + } + + #[test] + #[should_panic(expected = "Nested array size mismatch: one is 3, and the other is 5")] + fn test_fixed_size_binary_array_from_opt_vec_incorrect_length() { + let values = vec![ + Some("one".as_bytes()), + Some(b"two"), + None, + Some(b"three"), + Some(b"four"), + ]; + let _ = FixedSizeBinaryArray::from(values); + } + #[test] fn test_binary_array_all_null() { let data = vec![None]; diff --git a/arrow/src/array/array_primitive.rs b/arrow/src/array/array_primitive.rs index 8893703aa85..6f496562f89 100644 --- a/arrow/src/array/array_primitive.rs +++ b/arrow/src/array/array_primitive.rs @@ -397,29 +397,35 @@ impl<'a, T: ArrowPrimitiveType, Ptr: Into>> FromIterator let iter = iter.into_iter(); let (lower, _) = iter.size_hint(); - let mut null_buf = BooleanBufferBuilder::new(lower); + let mut null_builder = BooleanBufferBuilder::new(lower); let buffer: Buffer = iter .map(|item| { if let Some(a) = item.into().native { - null_buf.append(true); + null_builder.append(true); a } else { - null_buf.append(false); + null_builder.append(false); // this ensures that null items on the buffer are not arbitrary. - // This is important because falible operations can use null values (e.g. a vectorized "add") + // This is important because fallible operations can use null values (e.g. a vectorized "add") // which may panic (e.g. overflow if the number on the slots happen to be very large). 
T::Native::default() } }) .collect(); + let len = null_builder.len(); + let null_buf: Buffer = null_builder.into(); + let valid_count = null_buf.count_set_bits(); + let null_count = len - valid_count; + let opt_null_buf = (null_count != 0).then(|| null_buf); + let data = unsafe { ArrayData::new_unchecked( T::DATA_TYPE, - null_buf.len(), - None, - Some(null_buf.into()), + len, + Some(null_count), + opt_null_buf, 0, vec![buffer], vec![], @@ -1025,6 +1031,16 @@ mod tests { assert_eq!(primitive_array.len(), 10); } + #[test] + fn test_primitive_array_from_non_null_iter() { + let iter = (0..10_i32).map(Some); + let primitive_array = PrimitiveArray::::from_iter(iter); + assert_eq!(primitive_array.len(), 10); + assert_eq!(primitive_array.null_count(), 0); + assert_eq!(primitive_array.data().null_buffer(), None); + assert_eq!(primitive_array.values(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + } + #[test] #[should_panic(expected = "PrimitiveArray data should contain a single buffer only \ (values buffer)")] diff --git a/arrow/src/array/array_union.rs b/arrow/src/array/array_union.rs index 5cfab0bbf85..bae771bb9e7 100644 --- a/arrow/src/array/array_union.rs +++ b/arrow/src/array/array_union.rs @@ -436,6 +436,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn test_dense_i32_large() { let mut builder = UnionBuilder::new_dense(1024); diff --git a/arrow/src/array/ffi.rs b/arrow/src/array/ffi.rs index 57329037bc4..12d6f440b78 100644 --- a/arrow/src/array/ffi.rs +++ b/arrow/src/array/ffi.rs @@ -74,6 +74,11 @@ mod tests { let result = &ArrayData::try_from(d1)?; assert_eq!(result, expected); + + unsafe { + Arc::from_raw(array); + Arc::from_raw(schema); + } Ok(()) } diff --git a/arrow/src/buffer/ops.rs b/arrow/src/buffer/ops.rs index e0086a1a820..b3571d1740b 100644 --- a/arrow/src/buffer/ops.rs +++ b/arrow/src/buffer/ops.rs @@ -15,110 +15,8 @@ // specific language governing permissions and limitations // under the License. -#[cfg(feature = "simd")] -use crate::util::bit_util; -#[cfg(feature = "simd")] -use packed_simd::u8x64; - -#[cfg(feature = "avx512")] -use crate::arch::avx512::*; -use crate::util::bit_util::ceil; -#[cfg(any(feature = "simd", feature = "avx512"))] -use std::borrow::BorrowMut; - use super::{Buffer, MutableBuffer}; - -/// Apply a bitwise operation `simd_op` / `scalar_op` to two inputs using simd instructions and return the result as a Buffer. -/// The `simd_op` functions gets applied on chunks of 64 bytes (512 bits) at a time -/// and the `scalar_op` gets applied to remaining bytes. -/// Contrary to the non-simd version `bitwise_bin_op_helper`, the offset and length is specified in bytes -/// and this version does not support operations starting at arbitrary bit offsets. 
-#[cfg(feature = "simd")] -pub fn bitwise_bin_op_simd_helper( - left: &Buffer, - left_offset: usize, - right: &Buffer, - right_offset: usize, - len: usize, - simd_op: SI, - scalar_op: SC, -) -> Buffer -where - SI: Fn(u8x64, u8x64) -> u8x64, - SC: Fn(u8, u8) -> u8, -{ - let mut result = MutableBuffer::new(len).with_bitset(len, false); - let lanes = u8x64::lanes(); - - let mut left_chunks = left.as_slice()[left_offset..].chunks_exact(lanes); - let mut right_chunks = right.as_slice()[right_offset..].chunks_exact(lanes); - let mut result_chunks = result.as_slice_mut().chunks_exact_mut(lanes); - - result_chunks - .borrow_mut() - .zip(left_chunks.borrow_mut().zip(right_chunks.borrow_mut())) - .for_each(|(res, (left, right))| { - unsafe { bit_util::bitwise_bin_op_simd(&left, &right, res, &simd_op) }; - }); - - result_chunks - .into_remainder() - .iter_mut() - .zip( - left_chunks - .remainder() - .iter() - .zip(right_chunks.remainder().iter()), - ) - .for_each(|(res, (left, right))| { - *res = scalar_op(*left, *right); - }); - - result.into() -} - -/// Apply a bitwise operation `simd_op` / `scalar_op` to one input using simd instructions and return the result as a Buffer. -/// The `simd_op` functions gets applied on chunks of 64 bytes (512 bits) at a time -/// and the `scalar_op` gets applied to remaining bytes. -/// Contrary to the non-simd version `bitwise_unary_op_helper`, the offset and length is specified in bytes -/// and this version does not support operations starting at arbitrary bit offsets. -#[cfg(feature = "simd")] -pub fn bitwise_unary_op_simd_helper( - left: &Buffer, - left_offset: usize, - len: usize, - simd_op: SI, - scalar_op: SC, -) -> Buffer -where - SI: Fn(u8x64) -> u8x64, - SC: Fn(u8) -> u8, -{ - let mut result = MutableBuffer::new(len).with_bitset(len, false); - let lanes = u8x64::lanes(); - - let mut left_chunks = left.as_slice()[left_offset..].chunks_exact(lanes); - let mut result_chunks = result.as_slice_mut().chunks_exact_mut(lanes); - - result_chunks - .borrow_mut() - .zip(left_chunks.borrow_mut()) - .for_each(|(res, left)| unsafe { - let data_simd = u8x64::from_slice_unaligned_unchecked(left); - let simd_result = simd_op(data_simd); - simd_result.write_to_slice_unaligned_unchecked(res); - }); - - result_chunks - .into_remainder() - .iter_mut() - .zip(left_chunks.remainder().iter()) - .for_each(|(res, left)| { - *res = scalar_op(*left); - }); - - result.into() -} +use crate::util::bit_util::ceil; /// Apply a bitwise operation `op` to two inputs and return the result as a Buffer. /// The inputs are treated as bitmaps, meaning that offsets and length are specified in number of bits. 
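With the SIMD/AVX512 specializations removed, `buffer_bin_and`, `buffer_bin_or` and `buffer_unary_not` below always delegate to these scalar helpers. A minimal usage sketch (not part of this patch), assuming the public `arrow::buffer` exports already imported by the benchmark above; offsets and lengths are expressed in bits:

use arrow::buffer::{buffer_bin_and, buffer_unary_not, Buffer, MutableBuffer};

fn main() {
    // Two 512-byte bitmaps, built the same way as the benchmark's helper.
    let left: Buffer = MutableBuffer::new(512).with_bitset(512, true).into();
    let right: Buffer = MutableBuffer::new(512).with_bitset(512, false).into();

    // AND the two bitmaps; because offsets and length are given in bits,
    // unaligned ranges (as in the new "and_with_offset" benchmark) work too.
    let anded = buffer_bin_and(&left, 1, &right, 2, 512 * 8 - 5);

    // Negate a bitmap starting at bit offset 1.
    let negated = buffer_unary_not(&left, 1, 512 * 8 - 5);

    assert_eq!(anded.len(), negated.len());
}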
@@ -189,100 +87,6 @@ where result.into() } -#[cfg(all(target_arch = "x86_64", feature = "avx512"))] -pub fn buffer_bin_and( - left: &Buffer, - left_offset_in_bits: usize, - right: &Buffer, - right_offset_in_bits: usize, - len_in_bits: usize, -) -> Buffer { - if left_offset_in_bits % 8 == 0 - && right_offset_in_bits % 8 == 0 - && len_in_bits % 8 == 0 - { - let len = len_in_bits / 8; - let left_offset = left_offset_in_bits / 8; - let right_offset = right_offset_in_bits / 8; - - let mut result = MutableBuffer::new(len).with_bitset(len, false); - - let mut left_chunks = - left.as_slice()[left_offset..].chunks_exact(AVX512_U8X64_LANES); - let mut right_chunks = - right.as_slice()[right_offset..].chunks_exact(AVX512_U8X64_LANES); - let mut result_chunks = - result.as_slice_mut().chunks_exact_mut(AVX512_U8X64_LANES); - - result_chunks - .borrow_mut() - .zip(left_chunks.borrow_mut().zip(right_chunks.borrow_mut())) - .for_each(|(res, (left, right))| unsafe { - avx512_bin_and(left, right, res); - }); - - result_chunks - .into_remainder() - .iter_mut() - .zip( - left_chunks - .remainder() - .iter() - .zip(right_chunks.remainder().iter()), - ) - .for_each(|(res, (left, right))| { - *res = *left & *right; - }); - - result.into() - } else { - bitwise_bin_op_helper( - &left, - left_offset_in_bits, - right, - right_offset_in_bits, - len_in_bits, - |a, b| a & b, - ) - } -} - -#[cfg(all(feature = "simd", not(feature = "avx512")))] -pub fn buffer_bin_and( - left: &Buffer, - left_offset_in_bits: usize, - right: &Buffer, - right_offset_in_bits: usize, - len_in_bits: usize, -) -> Buffer { - if left_offset_in_bits % 8 == 0 - && right_offset_in_bits % 8 == 0 - && len_in_bits % 8 == 0 - { - bitwise_bin_op_simd_helper( - &left, - left_offset_in_bits / 8, - &right, - right_offset_in_bits / 8, - len_in_bits / 8, - |a, b| a & b, - |a, b| a & b, - ) - } else { - bitwise_bin_op_helper( - &left, - left_offset_in_bits, - right, - right_offset_in_bits, - len_in_bits, - |a, b| a & b, - ) - } -} - -// Note: do not target specific features like x86 without considering -// other targets like wasm32, as those would fail to build -#[cfg(all(not(any(feature = "simd", feature = "avx512"))))] pub fn buffer_bin_and( left: &Buffer, left_offset_in_bits: usize, @@ -300,98 +104,6 @@ pub fn buffer_bin_and( ) } -#[cfg(all(target_arch = "x86_64", feature = "avx512"))] -pub fn buffer_bin_or( - left: &Buffer, - left_offset_in_bits: usize, - right: &Buffer, - right_offset_in_bits: usize, - len_in_bits: usize, -) -> Buffer { - if left_offset_in_bits % 8 == 0 - && right_offset_in_bits % 8 == 0 - && len_in_bits % 8 == 0 - { - let len = len_in_bits / 8; - let left_offset = left_offset_in_bits / 8; - let right_offset = right_offset_in_bits / 8; - - let mut result = MutableBuffer::new(len).with_bitset(len, false); - - let mut left_chunks = - left.as_slice()[left_offset..].chunks_exact(AVX512_U8X64_LANES); - let mut right_chunks = - right.as_slice()[right_offset..].chunks_exact(AVX512_U8X64_LANES); - let mut result_chunks = - result.as_slice_mut().chunks_exact_mut(AVX512_U8X64_LANES); - - result_chunks - .borrow_mut() - .zip(left_chunks.borrow_mut().zip(right_chunks.borrow_mut())) - .for_each(|(res, (left, right))| unsafe { - avx512_bin_or(left, right, res); - }); - - result_chunks - .into_remainder() - .iter_mut() - .zip( - left_chunks - .remainder() - .iter() - .zip(right_chunks.remainder().iter()), - ) - .for_each(|(res, (left, right))| { - *res = *left | *right; - }); - - result.into() - } else { - bitwise_bin_op_helper( - &left, - 
left_offset_in_bits, - right, - right_offset_in_bits, - len_in_bits, - |a, b| a | b, - ) - } -} - -#[cfg(all(feature = "simd", not(feature = "avx512")))] -pub fn buffer_bin_or( - left: &Buffer, - left_offset_in_bits: usize, - right: &Buffer, - right_offset_in_bits: usize, - len_in_bits: usize, -) -> Buffer { - if left_offset_in_bits % 8 == 0 - && right_offset_in_bits % 8 == 0 - && len_in_bits % 8 == 0 - { - bitwise_bin_op_simd_helper( - &left, - left_offset_in_bits / 8, - &right, - right_offset_in_bits / 8, - len_in_bits / 8, - |a, b| a | b, - |a, b| a | b, - ) - } else { - bitwise_bin_op_helper( - &left, - left_offset_in_bits, - right, - right_offset_in_bits, - len_in_bits, - |a, b| a | b, - ) - } -} - -#[cfg(all(not(any(feature = "simd", feature = "avx512"))))] pub fn buffer_bin_or( left: &Buffer, left_offset_in_bits: usize, @@ -414,20 +126,5 @@ pub fn buffer_unary_not( offset_in_bits: usize, len_in_bits: usize, ) -> Buffer { - // SIMD implementation if available and byte-aligned - #[cfg(feature = "simd")] - if offset_in_bits % 8 == 0 && len_in_bits % 8 == 0 { - return bitwise_unary_op_simd_helper( - &left, - offset_in_bits / 8, - len_in_bits / 8, - |a| !a, - |a| !a, - ); - } - // Default implementation - #[allow(unreachable_code)] - { - bitwise_unary_op_helper(left, offset_in_bits, len_in_bits, |a| !a) - } + bitwise_unary_op_helper(left, offset_in_bits, len_in_bits, |a| !a) } diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs index b5962511520..fabc4113d8f 100644 --- a/arrow/src/compute/kernels/filter.rs +++ b/arrow/src/compute/kernels/filter.rs @@ -1332,6 +1332,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn fuzz_test_slices_iterator() { let mut rng = thread_rng(); @@ -1401,6 +1402,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn fuzz_filter() { let mut rng = thread_rng(); diff --git a/arrow/src/compute/kernels/substring.rs b/arrow/src/compute/kernels/substring.rs index 1954307e9ac..625a37514d1 100644 --- a/arrow/src/compute/kernels/substring.rs +++ b/arrow/src/compute/kernels/substring.rs @@ -182,24 +182,69 @@ pub fn substring_by_char( start: i64, length: Option, ) -> Result> { - Ok(array - .iter() - .map(|val| { - val.map(|val| { - let char_count = val.chars().count(); - let start = if start >= 0 { - start.to_usize().unwrap().min(char_count) - } else { - char_count - (-start).to_usize().unwrap().min(char_count) - }; - let length = length.map_or(char_count - start, |length| { - length.to_usize().unwrap().min(char_count - start) - }); + let mut vals = BufferBuilder::::new({ + let offsets = array.value_offsets(); + (offsets[array.len()] - offsets[0]).to_usize().unwrap() + }); + let mut new_offsets = BufferBuilder::::new(array.len() + 1); + new_offsets.append(OffsetSize::zero()); + let length = length.map(|len| len.to_usize().unwrap()); + + array.iter().for_each(|val| { + if let Some(val) = val { + let char_count = val.chars().count(); + let start = if start >= 0 { + start.to_usize().unwrap() + } else { + char_count - (-start).to_usize().unwrap().min(char_count) + }; + let (start_offset, end_offset) = get_start_end_offset(val, start, length); + vals.append_slice(&val.as_bytes()[start_offset..end_offset]); + } + new_offsets.append(OffsetSize::from_usize(vals.len()).unwrap()); + }); + let data = unsafe { + ArrayData::new_unchecked( + GenericStringArray::::get_data_type(), + array.len(), + None, + array + .data_ref() + .null_buffer() + .map(|b| b.bit_slice(array.offset(), array.len())), + 0, + vec![new_offsets.finish(), vals.finish()], 
+ vec![], + ) + }; + Ok(GenericStringArray::::from(data)) +} - val.chars().skip(start).take(length).collect::() - }) - }) - .collect::>()) +/// * `val` - string +/// * `start` - the start char index of the substring +/// * `length` - the char length of the substring +/// +/// Return the `start` and `end` offset (by byte) of the substring +fn get_start_end_offset( + val: &str, + start: usize, + length: Option, +) -> (usize, usize) { + let len = val.len(); + let mut offset_char_iter = val.char_indices(); + let start_offset = offset_char_iter + .nth(start) + .map_or(len, |(offset, _)| offset); + let end_offset = length.map_or(len, |length| { + if length > 0 { + offset_char_iter + .nth(length - 1) + .map_or(len, |(offset, _)| offset) + } else { + start_offset + } + }); + (start_offset, end_offset) } fn binary_substring( diff --git a/arrow/src/compute/kernels/temporal.rs b/arrow/src/compute/kernels/temporal.rs index aa49462da86..2482b7b758b 100644 --- a/arrow/src/compute/kernels/temporal.rs +++ b/arrow/src/compute/kernels/temporal.rs @@ -17,7 +17,7 @@ //! Defines temporal kernels for time and date related functions. -use chrono::{Datelike, Timelike}; +use chrono::{Datelike, NaiveDate, NaiveDateTime, Timelike}; use crate::array::*; use crate::datatypes::*; @@ -112,6 +112,34 @@ macro_rules! return_compute_error_with { }; } +trait ChronoDateQuarter { + /// Returns a value in range `1..=4` indicating the quarter this date falls into + fn quarter(&self) -> u32; + + /// Returns a value in range `0..=3` indicating the quarter (zero-based) this date falls into + fn quarter0(&self) -> u32; +} + +impl ChronoDateQuarter for NaiveDateTime { + fn quarter(&self) -> u32 { + self.quarter0() + 1 + } + + fn quarter0(&self) -> u32 { + self.month0() / 3 + } +} + +impl ChronoDateQuarter for NaiveDate { + fn quarter(&self) -> u32 { + self.quarter0() + 1 + } + + fn quarter0(&self) -> u32 { + self.month0() / 3 + } +} + #[cfg(not(feature = "chrono-tz"))] pub fn using_chrono_tz_and_utc_naive_date_time( _tz: &str, @@ -183,6 +211,34 @@ where Ok(b.finish()) } +/// Extracts the quarter of a given temporal array as an array of integers +pub fn quarter(array: &PrimitiveArray) -> Result +where + T: ArrowTemporalType + ArrowNumericType, + i64: std::convert::From, +{ + let mut b = Int32Builder::new(array.len()); + match array.data_type() { + &DataType::Date32 | &DataType::Date64 | &DataType::Timestamp(_, None) => { + extract_component_from_array!(array, b, quarter, value_as_datetime) + } + &DataType::Timestamp(_, Some(ref tz)) => { + let mut scratch = Parsed::new(); + extract_component_from_array!( + array, + b, + quarter, + value_as_datetime_with_tz, + tz, + scratch + ) + } + dt => return_compute_error_with!("quarter does not support", dt), + } + + Ok(b.finish()) +} + /// Extracts the month of a given temporal array as an array of integers pub fn month(array: &PrimitiveArray) -> Result where @@ -389,6 +445,48 @@ mod tests { assert_eq!(2012, b.value(2)); } + #[test] + fn test_temporal_array_date64_quarter() { + //1514764800000 -> 2018-01-01 + //1566275025000 -> 2019-08-20 + let a: PrimitiveArray = + vec![Some(1514764800000), None, Some(1566275025000)].into(); + + let b = quarter(&a).unwrap(); + assert_eq!(1, b.value(0)); + assert!(!b.is_valid(1)); + assert_eq!(3, b.value(2)); + } + + #[test] + fn test_temporal_array_date32_quarter() { + let a: PrimitiveArray = vec![Some(1), None, Some(300)].into(); + + let b = quarter(&a).unwrap(); + assert_eq!(1, b.value(0)); + assert!(!b.is_valid(1)); + assert_eq!(4, b.value(2)); + } + + 
#[test] + fn test_temporal_array_timestamp_quarter_with_timezone() { + use std::sync::Arc; + + // 24 * 60 * 60 = 86400 + let a = Arc::new(TimestampSecondArray::from_vec( + vec![86400 * 90], + Some("+00:00".to_string()), + )); + let b = quarter(&a).unwrap(); + assert_eq!(2, b.value(0)); + let a = Arc::new(TimestampSecondArray::from_vec( + vec![86400 * 90], + Some("-10:00".to_string()), + )); + let b = quarter(&a).unwrap(); + assert_eq!(1, b.value(0)); + } + #[test] fn test_temporal_array_date64_month() { //1514764800000 -> 2018-01-01 @@ -416,7 +514,7 @@ mod tests { fn test_temporal_array_timestamp_month_with_timezone() { use std::sync::Arc; - // 24 * 60 * 60 = 8640 + // 24 * 60 * 60 = 86400 let a = Arc::new(TimestampSecondArray::from_vec( vec![86400 * 31], Some("+00:00".to_string()), @@ -435,7 +533,7 @@ mod tests { fn test_temporal_array_timestamp_day_with_timezone() { use std::sync::Arc; - // 24 * 60 * 60 = 8640 + // 24 * 60 * 60 = 86400 let a = Arc::new(TimestampSecondArray::from_vec( vec![86400], Some("+00:00".to_string()), diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index 8eb28837ce5..84905af20a6 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -79,6 +79,8 @@ //! unsafe { //! Box::from_raw(out_array_ptr); //! Box::from_raw(out_schema_ptr); +//! Arc::from_raw(array_ptr); +//! Arc::from_raw(schema_ptr); //! } //! //! Ok(()) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index ab4caea36f8..c06b5233344 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -198,13 +198,6 @@ impl ExportedArrayStream { } pub fn get_schema(&mut self, out: *mut FFI_ArrowSchema) -> i32 { - unsafe { - match (*out).release { - None => (), - Some(release) => release(out), - }; - }; - let mut private_data = self.get_private_data(); let reader = &private_data.batch_reader; @@ -224,18 +217,17 @@ impl ExportedArrayStream { } pub fn get_next(&mut self, out: *mut FFI_ArrowArray) -> i32 { - unsafe { - match (*out).release { - None => (), - Some(release) => release(out), - }; - }; - let mut private_data = self.get_private_data(); let reader = &mut private_data.batch_reader; let ret_code = match reader.next() { - None => 0, + None => { + // Marks ArrowArray released to indicate reaching the end of stream. + unsafe { + (*out).release = None; + } + 0 + } Some(next_batch) => { if let Ok(batch) = next_batch { let struct_array = StructArray::from(batch); @@ -275,7 +267,7 @@ fn get_error_code(err: &ArrowError) -> i32 { /// Struct used to fetch `RecordBatch` from the C Stream Interface. /// Its main responsibility is to expose `RecordBatchReader` functionality /// that requires [FFI_ArrowArrayStream]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ArrowArrayStreamReader { stream: Arc, schema: SchemaRef, @@ -508,6 +500,8 @@ mod tests { } assert_eq!(produced_batches, vec![batch.clone(), batch]); + + unsafe { Arc::from_raw(stream_ptr) }; Ok(()) } @@ -537,6 +531,8 @@ mod tests { } assert_eq!(produced_batches, vec![batch.clone(), batch]); + + unsafe { Arc::from_raw(stream_ptr) }; Ok(()) } diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs index 0cb77a36090..95c69ca0be6 100644 --- a/arrow/src/lib.rs +++ b/arrow/src/lib.rs @@ -225,14 +225,10 @@ //! [issue tracker]: https://github.com/apache/arrow-rs/issues //! 
-#![cfg_attr(feature = "avx512", feature(stdsimd))] -#![cfg_attr(feature = "avx512", feature(repr_simd))] -#![cfg_attr(feature = "avx512", feature(avx512_target_feature))] #![deny(clippy::redundant_clone)] #![warn(missing_debug_implementations)] pub mod alloc; -mod arch; pub mod array; pub mod bitmap; pub mod buffer; diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs index 62e6316b621..3ae5b3b9987 100644 --- a/arrow/src/pyarrow.rs +++ b/arrow/src/pyarrow.rs @@ -24,13 +24,16 @@ use std::sync::Arc; use pyo3::ffi::Py_uintptr_t; use pyo3::import_exception; use pyo3::prelude::*; -use pyo3::types::PyList; +use pyo3::types::{PyList, PyTuple}; use crate::array::{Array, ArrayData, ArrayRef}; use crate::datatypes::{DataType, Field, Schema}; use crate::error::ArrowError; use crate::ffi; use crate::ffi::FFI_ArrowSchema; +use crate::ffi_stream::{ + export_reader_into_raw, ArrowArrayStreamReader, FFI_ArrowArrayStream, +}; use crate::record_batch::RecordBatch; import_exception!(pyarrow, ArrowException); @@ -198,6 +201,42 @@ impl PyArrowConvert for RecordBatch { } } +impl PyArrowConvert for ArrowArrayStreamReader { + fn from_pyarrow(value: &PyAny) -> PyResult { + // prepare a pointer to receive the stream struct + let stream = Box::new(FFI_ArrowArrayStream::empty()); + let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; + + // make the conversion through PyArrow's private API + // this changes the pointer's memory and is thus unsafe. + // In particular, `_export_to_c` can go out of bounds + let args = PyTuple::new(value.py(), &[stream_ptr as Py_uintptr_t]); + value.call_method1("_export_to_c", args)?; + + let stream_reader = + unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() }; + + unsafe { + Box::from_raw(stream_ptr); + } + + Ok(stream_reader) + } + + fn to_pyarrow(&self, py: Python) -> PyResult { + let stream = Box::new(FFI_ArrowArrayStream::empty()); + let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; + + unsafe { export_reader_into_raw(Box::new(self.clone()), stream_ptr) }; + + let module = py.import("pyarrow")?; + let class = module.getattr("RecordBatchReader")?; + let args = PyTuple::new(py, &[stream_ptr as Py_uintptr_t]); + let reader = class.call_method1("_import_from_c", args)?; + Ok(PyObject::from(reader)) + } +} + macro_rules! 
add_conversion { ($typ:ty) => { impl<'source> FromPyObject<'source> for $typ { @@ -219,3 +258,4 @@ add_conversion!(Field); add_conversion!(Schema); add_conversion!(ArrayData); add_conversion!(RecordBatch); +add_conversion!(ArrowArrayStreamReader); diff --git a/arrow/src/util/bit_chunk_iterator.rs b/arrow/src/util/bit_chunk_iterator.rs index db5aca2a1b3..8730d5bd263 100644 --- a/arrow/src/util/bit_chunk_iterator.rs +++ b/arrow/src/util/bit_chunk_iterator.rs @@ -611,6 +611,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn fuzz_unaligned_bit_chunk_iterator() { let mut rng = thread_rng(); diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index 253c9a25e73..57b5211129f 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -34,7 +34,7 @@ logging = ["tracing-subscriber"] arrow = { path = "../arrow" } arrow-flight = { path = "../arrow-flight" } async-trait = "0.1.41" -clap = { version = "3", features = ["derive", "env"] } +clap = { version = "~3.1", features = ["derive", "env"] } futures = "0.3" hex = "0.4" prost = "0.10" diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 66e2797b28e..9a979343531 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -44,7 +44,7 @@ num = "0.4" num-bigint = "0.4" arrow = { path = "../arrow", version = "16.0.0", optional = true, default-features = false, features = ["ipc"] } base64 = { version = "0.13", optional = true } -clap = { version = "3", optional = true, features = ["derive", "env"] } +clap = { version = "~3.1", optional = true, features = ["derive", "env"] } serde_json = { version = "1.0", optional = true } rand = "0.8" futures = { version = "0.3", optional = true } diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 04e48baef70..647a8dc6f39 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -355,27 +355,6 @@ fn create_string_byte_array_dictionary_reader( .unwrap() } -fn create_complex_object_byte_array_dictionary_reader( - page_iterator: impl PageIterator + 'static, - column_desc: ColumnDescPtr, -) -> Box { - use parquet::arrow::array_reader::ComplexObjectArrayReader; - use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; - let arrow_type = - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - Box::new( - ComplexObjectArrayReader::::new( - Box::new(page_iterator), - column_desc, - converter, - Some(arrow_type), - ) - .unwrap(), - ) -} - fn bench_primitive( group: &mut BenchmarkGroup, schema: &SchemaDescPtr, @@ -678,18 +657,7 @@ fn add_benches(c: &mut Criterion) { let mut group = c.benchmark_group("arrow_array_reader/StringDictionary"); - group.bench_function("dictionary encoded, mandatory, no NULLs - old", |b| { - b.iter(|| { - let array_reader = create_complex_object_byte_array_dictionary_reader( - dictionary_string_no_null_data.clone(), - mandatory_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); - - group.bench_function("dictionary encoded, mandatory, no NULLs - new", |b| { + group.bench_function("dictionary encoded, mandatory, no NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_dictionary_reader( dictionary_string_no_null_data.clone(), @@ -700,18 +668,7 @@ fn add_benches(c: &mut Criterion) { assert_eq!(count, EXPECTED_VALUE_COUNT); }); - group.bench_function("dictionary encoded, optional, no NULLs - old", 
|b| { - b.iter(|| { - let array_reader = create_complex_object_byte_array_dictionary_reader( - dictionary_string_no_null_data.clone(), - optional_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); - - group.bench_function("dictionary encoded, optional, no NULLs - new", |b| { + group.bench_function("dictionary encoded, optional, no NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_dictionary_reader( dictionary_string_no_null_data.clone(), @@ -722,18 +679,7 @@ fn add_benches(c: &mut Criterion) { assert_eq!(count, EXPECTED_VALUE_COUNT); }); - group.bench_function("dictionary encoded, optional, half NULLs - old", |b| { - b.iter(|| { - let array_reader = create_complex_object_byte_array_dictionary_reader( - dictionary_string_half_null_data.clone(), - optional_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); - - group.bench_function("dictionary encoded, optional, half NULLs - new", |b| { + group.bench_function("dictionary encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_dictionary_reader( dictionary_string_half_null_data.clone(), diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 7b9adfc23f2..e8c22f95aa0 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -25,7 +25,7 @@ use crate::arrow::array_reader::{ ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, RowGroupCollection, StructArrayReader, }; -use crate::arrow::converter::{ +use crate::arrow::buffer::converter::{ DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index b3606a7808b..2e29b609474 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. 
-use crate::arrow::array_reader::offset_buffer::OffsetBuffer; use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::buffer::offset_buffer::OffsetBuffer; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index fe8448ffb31..0e64f0d25b7 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -24,12 +24,11 @@ use arrow::array::{Array, ArrayRef, OffsetSizeTrait}; use arrow::buffer::Buffer; use arrow::datatypes::{ArrowNativeType, DataType as ArrowType}; -use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer; -use crate::arrow::array_reader::{ - byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}, - offset_buffer::OffsetBuffer, -}; +use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}; use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::buffer::{ + dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer, +}; use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue}; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -236,13 +235,13 @@ where fn new(col: &ColumnDescPtr) -> Self { let validate_utf8 = col.converted_type() == ConvertedType::UTF8; - let value_type = - match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) { - (true, true) => ArrowType::LargeUtf8, - (true, false) => ArrowType::LargeBinary, - (false, true) => ArrowType::Utf8, - (false, false) => ArrowType::Binary, - }; + let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) + { + (true, true) => ArrowType::LargeUtf8, + (true, false) => ArrowType::LargeBinary, + (false, true) => ArrowType::Utf8, + (false, false) => ArrowType::Binary, + }; Self { dict: None, diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader/mod.rs similarity index 99% rename from parquet/src/arrow/array_reader.rs rename to parquet/src/arrow/array_reader/mod.rs index c70071dacf3..21c49b33878 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! 
Logic for reading into arrow arrays + use std::any::Any; use std::cmp::max; use std::marker::PhantomData; @@ -34,7 +36,7 @@ use arrow::datatypes::{ UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type, }; -use crate::arrow::converter::Converter; +use crate::arrow::buffer::converter::Converter; use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer}; use crate::arrow::record_reader::{GenericRecordReader, RecordReader}; use crate::arrow::schema::parquet_to_arrow_field; @@ -50,11 +52,9 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; mod builder; mod byte_array; mod byte_array_dictionary; -mod dictionary_buffer; mod empty_array; mod list_array; mod map_array; -mod offset_buffer; #[cfg(test)] mod test_util; @@ -811,7 +811,7 @@ mod tests { TimestampMillisecondType as ArrowTimestampMillisecondType, }; - use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; + use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter}; use crate::basic::{Encoding, Type as PhysicalType}; use crate::column::page::Page; use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type}; @@ -1384,8 +1384,7 @@ mod tests { let mut all_values = Vec::with_capacity(num_pages * values_per_page); for i in 0..num_pages { - let mut dict_encoder = - DictEncoder::::new(column_desc.clone()); + let mut dict_encoder = DictEncoder::::new(column_desc.clone()); // add data page let mut values = Vec::with_capacity(values_per_page); diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 1d56960cf16..92d4ff264c1 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Contains reader which reads parquet data into arrow array. +//! 
Contains reader which reads parquet data into arrow [`RecordBatch`] use std::sync::Arc; @@ -294,7 +294,7 @@ mod tests { use crate::arrow::arrow_reader::{ ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, }; - use crate::arrow::converter::{ + use crate::arrow::buffer::converter::{ BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter, }; diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs similarity index 100% rename from parquet/src/arrow/levels.rs rename to parquet/src/arrow/arrow_writer/levels.rs diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer/mod.rs similarity index 99% rename from parquet/src/arrow/arrow_writer.rs rename to parquet/src/arrow/arrow_writer/mod.rs index 334c7237d70..44631e57409 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -27,19 +27,20 @@ use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; -use super::levels::LevelInfo; use super::schema::{ add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, decimal_length_from_precision, }; -use crate::arrow::levels::calculate_array_levels; use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; use crate::file::metadata::RowGroupMetaDataPtr; use crate::file::properties::WriterProperties; use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter}; use crate::{data_type::*, file::writer::SerializedFileWriter}; +use levels::{calculate_array_levels, LevelInfo}; + +mod levels; /// Arrow writer /// diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 541c9810976..3f14114e3c6 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -77,7 +77,7 @@ use std::collections::VecDeque; use std::fmt::Formatter; -use std::io::SeekFrom; +use std::io::{Cursor, SeekFrom}; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; @@ -86,6 +86,7 @@ use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::stream::Stream; +use parquet_format::PageType; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow::datatypes::SchemaRef; @@ -96,11 +97,13 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::ProjectionMask; use crate::basic::Compression; -use crate::column::page::{PageIterator, PageReader}; +use crate::column::page::{Page, PageIterator, PageReader}; +use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::ParquetMetaData; use crate::file::reader::SerializedPageReader; +use crate::file::serialized_reader::{decode_page, read_page_header}; use crate::file::FOOTER_SIZE; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor}; @@ -433,6 +436,7 @@ where } } +/// An in-memory collection of column chunks struct InMemoryRowGroup { schema: SchemaDescPtr, column_chunks: Vec>, @@ -459,6 +463,7 @@ impl RowGroupCollection for InMemoryRowGroup { } } +/// Data for a single column chunk #[derive(Clone)] struct InMemoryColumnChunk { num_values: i64, @@ -480,6 +485,82 @@ impl InMemoryColumnChunk { } } +// A serialized implementation for Parquet [`PageReader`]. 
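+// It walks the chunk's in-memory `Bytes`, reading each page header with
+// `read_page_header` and decoding (and, when a codec is configured,
+// decompressing) the page body via `decode_page`.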
+struct InMemoryColumnChunkReader { + chunk: InMemoryColumnChunk, + decompressor: Option>, + offset: usize, + seen_num_values: i64, +} + +impl InMemoryColumnChunkReader { + /// Creates a new serialized page reader from file source. + pub fn new(chunk: InMemoryColumnChunk) -> Result { + let decompressor = create_codec(chunk.compression)?; + let result = Self { + chunk, + decompressor, + offset: 0, + seen_num_values: 0, + }; + Ok(result) + } +} + +impl Iterator for InMemoryColumnChunkReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +impl PageReader for InMemoryColumnChunkReader { + fn get_next_page(&mut self) -> Result> { + while self.seen_num_values < self.chunk.num_values { + let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); + let page_header = read_page_header(&mut cursor)?; + let compressed_size = page_header.compressed_page_size as usize; + + self.offset += cursor.position() as usize; + let start_offset = self.offset; + let end_offset = self.offset + compressed_size; + self.offset = end_offset; + + let buffer = self.chunk.data.slice(start_offset..end_offset); + + let result = match page_header.type_ { + PageType::DataPage | PageType::DataPageV2 => { + let decoded = decode_page( + page_header, + buffer.into(), + self.chunk.physical_type, + self.decompressor.as_mut(), + )?; + self.seen_num_values += decoded.num_values() as i64; + decoded + } + PageType::DictionaryPage => decode_page( + page_header, + buffer.into(), + self.chunk.physical_type, + self.decompressor.as_mut(), + )?, + _ => { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + } + }; + + return Ok(Some(result)); + } + + // We are at the end of this column chunk and no more page left. Return None. + Ok(None) + } +} + +/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] struct ColumnChunkIterator { schema: SchemaDescPtr, column_schema: ColumnDescPtr, diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs similarity index 100% rename from parquet/src/arrow/bit_util.rs rename to parquet/src/arrow/buffer/bit_util.rs diff --git a/parquet/src/arrow/converter.rs b/parquet/src/arrow/buffer/converter.rs similarity index 100% rename from parquet/src/arrow/converter.rs rename to parquet/src/arrow/buffer/converter.rs diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs similarity index 99% rename from parquet/src/arrow/array_reader/dictionary_buffer.rs rename to parquet/src/arrow/buffer/dictionary_buffer.rs index 6dc9cc80f39..7f445850700 100644 --- a/parquet/src/arrow/array_reader/dictionary_buffer.rs +++ b/parquet/src/arrow/buffer/dictionary_buffer.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::array_reader::offset_buffer::OffsetBuffer; +use crate::arrow::buffer::offset_buffer::OffsetBuffer; use crate::arrow::record_reader::buffer::{ BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer, }; diff --git a/arrow/src/arch/mod.rs b/parquet/src/arrow/buffer/mod.rs similarity index 79% rename from arrow/src/arch/mod.rs rename to parquet/src/arrow/buffer/mod.rs index 56d8f4c0e2c..5ee89aa1a78 100644 --- a/arrow/src/arch/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -/// -/// Arch module contains architecture specific code. 
-/// Be aware that not all machines have these specific operations available. -#[cfg(all(target_arch = "x86_64", feature = "avx512"))] -pub(crate) mod avx512; +//! Logic for reading data into arrow buffers + +pub mod bit_util; +pub mod converter; +pub mod dictionary_buffer; +pub mod offset_buffer; diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs similarity index 98% rename from parquet/src/arrow/array_reader/offset_buffer.rs rename to parquet/src/arrow/buffer/offset_buffer.rs index 23e7af7595c..2d73e3f146b 100644 --- a/parquet/src/arrow/array_reader/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::bit_util::iter_set_bits_rev; +use crate::arrow::buffer::bit_util::iter_set_bits_rev; use crate::arrow::record_reader::buffer::{ BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer, }; @@ -58,7 +58,7 @@ impl OffsetBuffer { /// the start of a UTF-8 codepoint /// /// Note: This does not verify that the entirety of `data` is valid - /// UTF-8. This should be done by calling [`Self::values_as_str`] after + /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after /// all data has been written pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { if validate_utf8 { diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index c9cc0ff6ce3..3aee7cf42cb 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -122,14 +122,12 @@ experimental_mod!(array_reader); pub mod arrow_reader; pub mod arrow_writer; -mod bit_util; +mod buffer; #[cfg(feature = "async")] pub mod async_reader; -experimental_mod!(converter); -pub(in crate::arrow) mod levels; -pub(in crate::arrow) mod record_reader; +mod record_reader; experimental_mod!(schema); pub use self::arrow_reader::ArrowReader; diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 4fa8213dedc..fa0f919916e 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -17,7 +17,7 @@ use std::marker::PhantomData; -use crate::arrow::bit_util::iter_set_bits_rev; +use crate::arrow::buffer::bit_util::iter_set_bits_rev; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::ToByteSlice; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 93de4006c10..9cca25c8ae5 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -21,7 +21,7 @@ use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; -use crate::arrow::bit_util::count_set_bits; +use crate::arrow::buffer::bit_util::count_set_bits; use crate::arrow::record_reader::buffer::BufferQueue; use crate::basic::Encoding; use crate::column::reader::decoder::{ diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader/mod.rs similarity index 100% rename from parquet/src/arrow/record_reader.rs rename to parquet/src/arrow/record_reader/mod.rs diff --git a/parquet/src/bin/parquet-fromcsv-help.txt b/parquet/src/bin/parquet-fromcsv-help.txt index a087b4fda8e..f4fe704ab26 100644 --- a/parquet/src/bin/parquet-fromcsv-help.txt +++ b/parquet/src/bin/parquet-fromcsv-help.txt @@ -1,4 +1,3 @@ -parquet 15.0.0 Apache Arrow Binary to convert csv to Parquet diff --git 
a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 03303fb8b43..aa1d50563cd 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -403,7 +403,10 @@ mod tests { let mut buffer_vec = Vec::new(); let mut buffer = std::io::Cursor::new(&mut buffer_vec); cmd.write_long_help(&mut buffer).unwrap(); - let actual = String::from_utf8(buffer_vec).unwrap(); + // Remove Parquet version string from the help text + let mut actual = String::from_utf8(buffer_vec).unwrap(); + let pos = actual.find('\n').unwrap() + 1; + actual = actual[pos..].to_string(); assert_eq!( expected, actual, "help text not match. please update to \n---\n{}\n---\n", diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 1194292354c..6ff73e041e8 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -358,6 +358,108 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> { } } +/// Reads a [`PageHeader`] from the provided [`Read`] +pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> { + let mut prot = TCompactInputProtocol::new(input); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) +} + +/// Decodes a [`Page`] from the provided `buffer` +pub(crate) fn decode_page( + page_header: PageHeader, + buffer: ByteBufferPtr, + physical_type: Type, + decompressor: Option<&mut Box<dyn Codec>>, +) -> Result<Page> { + // When processing data page v2, depending on enabled compression for the + // page, we should account for uncompressed data ('offset') of + // repetition and definition levels. + // + // We always use 0 offset for pages other than v2; the `true` flag means + // that compression will be applied if a decompressor is defined + let mut offset: usize = 0; + let mut can_decompress = true; + + if let Some(ref header_v2) = page_header.data_page_header_v2 { + offset = (header_v2.definition_levels_byte_length + + header_v2.repetition_levels_byte_length) as usize; + // When the is_compressed flag is missing the page is considered compressed + can_decompress = header_v2.is_compressed.unwrap_or(true); + } + + // TODO: page header could be huge because of statistics. We should set a + // maximum page header size and abort if that is exceeded.
+ let buffer = match decompressor { + Some(decompressor) if can_decompress => { + let uncompressed_size = page_header.uncompressed_page_size as usize; + let mut decompressed = Vec::with_capacity(uncompressed_size); + let compressed = &buffer.as_ref()[offset..]; + decompressed.extend_from_slice(&buffer.as_ref()[..offset]); + decompressor.decompress(compressed, &mut decompressed)?; + + if decompressed.len() != uncompressed_size { + return Err(general_err!( + "Actual decompressed size doesn't match the expected one ({} vs {})", + decompressed.len(), + uncompressed_size + )); + } + + ByteBufferPtr::new(decompressed) + } + _ => buffer, + }; + + let result = match page_header.type_ { + PageType::DictionaryPage => { + assert!(page_header.dictionary_page_header.is_some()); + let dict_header = page_header.dictionary_page_header.as_ref().unwrap(); + let is_sorted = dict_header.is_sorted.unwrap_or(false); + Page::DictionaryPage { + buf: buffer, + num_values: dict_header.num_values as u32, + encoding: Encoding::from(dict_header.encoding), + is_sorted, + } + } + PageType::DataPage => { + assert!(page_header.data_page_header.is_some()); + let header = page_header.data_page_header.unwrap(); + Page::DataPage { + buf: buffer, + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + def_level_encoding: Encoding::from(header.definition_level_encoding), + rep_level_encoding: Encoding::from(header.repetition_level_encoding), + statistics: statistics::from_thrift(physical_type, header.statistics), + } + } + PageType::DataPageV2 => { + assert!(page_header.data_page_header_v2.is_some()); + let header = page_header.data_page_header_v2.unwrap(); + let is_compressed = header.is_compressed.unwrap_or(true); + Page::DataPageV2 { + buf: buffer, + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + num_nulls: header.num_nulls as u32, + num_rows: header.num_rows as u32, + def_levels_byte_len: header.definition_levels_byte_length as u32, + rep_levels_byte_len: header.repetition_levels_byte_length as u32, + is_compressed, + statistics: statistics::from_thrift(physical_type, header.statistics), + } + } + _ => { + // Unknown page types (e.g., INDEX_PAGE) are not yet supported here. + unimplemented!("Page type {:?} is not supported", page_header.type_) + } + }; + + Ok(result) +} + /// A serialized implementation for Parquet [`PageReader`]. pub struct SerializedPageReader<T: Read> { // The file source buffer which references exactly the bytes for the column trunk @@ -395,13 +497,6 @@ impl<T: Read> SerializedPageReader<T> { }; Ok(result) } - - /// Reads Page header from Thrift. - fn read_page_header(&mut self) -> Result<PageHeader> { - let mut prot = TCompactInputProtocol::new(&mut self.buf); - let page_header = PageHeader::read_from_in_protocol(&mut prot)?; - Ok(page_header) - } } impl<T: Read> Iterator for SerializedPageReader<T> { @@ -415,108 +510,40 @@ impl<T: Read> PageReader for SerializedPageReader<T> { fn get_next_page(&mut self) -> Result<Option<Page>> { while self.seen_num_values < self.total_num_values { - let page_header = self.read_page_header()?; - - // When processing data page v2, depending on enabled compression for the - // page, we should account for uncompressed data ('offset') of - // repetition and definition levels.
- // - // We always use 0 offset for other pages other than v2, `true` flag means - // that compression will be applied if decompressor is defined - let mut offset: usize = 0; - let mut can_decompress = true; - - if let Some(ref header_v2) = page_header.data_page_header_v2 { - offset = (header_v2.definition_levels_byte_length - + header_v2.repetition_levels_byte_length) - as usize; - // When is_compressed flag is missing the page is considered compressed - can_decompress = header_v2.is_compressed.unwrap_or(true); - } - - let compressed_len = page_header.compressed_page_size as usize - offset; - let uncompressed_len = page_header.uncompressed_page_size as usize - offset; - // We still need to read all bytes from buffered stream - let mut buffer = vec![0; offset + compressed_len]; - self.buf.read_exact(&mut buffer)?; - - // TODO: page header could be huge because of statistics. We should set a - // maximum page header size and abort if that is exceeded. - if let Some(decompressor) = self.decompressor.as_mut() { - if can_decompress { - let mut decompressed_buffer = Vec::with_capacity(uncompressed_len); - let decompressed_size = decompressor - .decompress(&buffer[offset..], &mut decompressed_buffer)?; - if decompressed_size != uncompressed_len { - return Err(general_err!( - "Actual decompressed size doesn't match the expected one ({} vs {})", - decompressed_size, - uncompressed_len - )); - } - if offset == 0 { - buffer = decompressed_buffer; - } else { - // Prepend saved offsets to the buffer - buffer.truncate(offset); - buffer.append(&mut decompressed_buffer); - } - } + let page_header = read_page_header(&mut self.buf)?; + + let to_read = page_header.compressed_page_size as usize; + let mut buffer = Vec::with_capacity(to_read); + let read = (&mut self.buf) + .take(to_read as u64) + .read_to_end(&mut buffer)?; + + if read != to_read { + return Err(eof_err!( + "Expected to read {} bytes of page, read only {}", + to_read, + read + )); } + let buffer = ByteBufferPtr::new(buffer); let result = match page_header.type_ { - PageType::DictionaryPage => { - assert!(page_header.dictionary_page_header.is_some()); - let dict_header = - page_header.dictionary_page_header.as_ref().unwrap(); - let is_sorted = dict_header.is_sorted.unwrap_or(false); - Page::DictionaryPage { - buf: ByteBufferPtr::new(buffer), - num_values: dict_header.num_values as u32, - encoding: Encoding::from(dict_header.encoding), - is_sorted, - } - } - PageType::DataPage => { - assert!(page_header.data_page_header.is_some()); - let header = page_header.data_page_header.unwrap(); - self.seen_num_values += header.num_values as i64; - Page::DataPage { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - def_level_encoding: Encoding::from( - header.definition_level_encoding, - ), - rep_level_encoding: Encoding::from( - header.repetition_level_encoding, - ), - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } - } - PageType::DataPageV2 => { - assert!(page_header.data_page_header_v2.is_some()); - let header = page_header.data_page_header_v2.unwrap(); - let is_compressed = header.is_compressed.unwrap_or(true); - self.seen_num_values += header.num_values as i64; - Page::DataPageV2 { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - num_nulls: header.num_nulls as u32, - num_rows: header.num_rows as u32, - def_levels_byte_len: header.definition_levels_byte_length as u32, 
- rep_levels_byte_len: header.repetition_levels_byte_length as u32, - is_compressed, - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } + PageType::DataPage | PageType::DataPageV2 => { + let decoded = decode_page( + page_header, + buffer, + self.physical_type, + self.decompressor.as_mut(), + )?; + self.seen_num_values += decoded.num_values() as i64; + decoded } + PageType::DictionaryPage => decode_page( + page_header, + buffer, + self.physical_type, + self.decompressor.as_mut(), + )?, _ => { // For unknown page type (e.g., INDEX_PAGE), skip and read next. continue;
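For illustration only, and not part of the patch above: a minimal sketch of how the shared `read_page_header` / `decode_page` helpers could be driven over a fully buffered column chunk. The function name `read_all_pages` and its parameters are hypothetical; the real entry points remain `SerializedPageReader` and `InMemoryColumnChunkReader`.

// Hypothetical sketch; assumes the items already in scope in serialized_reader.rs
// (std::io::Cursor, Page, ByteBufferPtr, Type, Compression, create_codec, Result,
// plus the read_page_header / decode_page helpers added in this diff).
fn read_all_pages(
    chunk_bytes: &[u8],
    total_num_values: i64,
    physical_type: Type,
    compression: Compression,
) -> Result<Vec<Page>> {
    // A decompressor is only present when the chunk is compressed
    let mut decompressor = create_codec(compression)?;
    let mut offset = 0;
    let mut seen_num_values = 0i64;
    let mut pages = Vec::new();

    while seen_num_values < total_num_values {
        // Parse the thrift page header to learn how many bytes the page body occupies
        let mut cursor = Cursor::new(&chunk_bytes[offset..]);
        let page_header = read_page_header(&mut cursor)?;
        let header_len = cursor.position() as usize;
        let body_len = page_header.compressed_page_size as usize;
        let body = &chunk_bytes[offset + header_len..offset + header_len + body_len];
        offset += header_len + body_len;

        // Decompress and decode through the same shared code path as the readers above
        let page = decode_page(
            page_header,
            ByteBufferPtr::new(body.to_vec()),
            physical_type,
            decompressor.as_mut(),
        )?;

        // Only data pages advance the value count; a dictionary page does not
        if !matches!(page, Page::DictionaryPage { .. }) {
            seen_num_values += page.num_values() as i64;
        }
        pages.push(page);
    }

    Ok(pages)
}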