From f0be9da82cbd76da3042b426daf6c424c9560d93 Mon Sep 17 00:00:00 2001 From: askoa <112126368+askoa@users.noreply.github.com> Date: Wed, 25 Jan 2023 09:04:25 -0500 Subject: [PATCH] feat: Add `RunEndEncodedArray` (#3553) * Add `RunEndEncodedArray` * fix doctest and clippy issues * fix doc issues * fix doc issue * add validation for run_ends array and corresponding tests * PR comments * seal ArrowRunEndIndexType per PR suggestion * Fix PR suggestions * few more PR coments * run array name change * fix doc issues * doc change * lint fix * make append methods infallible * fix array.len and other minor changes * formatting fix * add validation of array len * fmt fix * PR comment and some documentation changes * pr suggestion * empty commit Co-authored-by: ask --- arrow-array/src/array/mod.rs | 18 + arrow-array/src/array/run_array.rs | 507 +++++++++++++++++ .../src/builder/generic_byte_run_builder.rs | 519 ++++++++++++++++++ arrow-array/src/builder/mod.rs | 4 + .../src/builder/primitive_run_builder.rs | 294 ++++++++++ arrow-array/src/types.rs | 25 + arrow-data/src/data.rs | 98 +++- arrow-data/src/equal/mod.rs | 1 + arrow-data/src/transform/mod.rs | 16 + arrow-integration-test/src/datatype.rs | 1 + arrow-ipc/src/convert.rs | 1 + arrow-schema/src/datatype.rs | 23 + arrow-schema/src/error.rs | 4 + arrow-schema/src/field.rs | 1 + parquet/src/arrow/arrow_writer/mod.rs | 2 +- parquet/src/arrow/schema/mod.rs | 1 + 16 files changed, 1511 insertions(+), 4 deletions(-) create mode 100644 arrow-array/src/array/run_array.rs create mode 100644 arrow-array/src/builder/generic_byte_run_builder.rs create mode 100644 arrow-array/src/builder/primitive_run_builder.rs diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 1e17e35d0f6..69f6ba4d8de 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -64,6 +64,9 @@ pub use struct_array::*; mod union_array; pub use union_array::*; +mod run_array; +pub use run_array::*; + /// Trait for dealing with different types of array at runtime when the type of the /// array is not known in advance. pub trait Array: std::fmt::Debug + Send + Sync { @@ -579,6 +582,20 @@ pub fn make_array(data: ArrayData) -> ArrayRef { } dt => panic!("Unexpected dictionary key type {:?}", dt), }, + DataType::RunEndEncoded(ref run_ends_type, _) => { + match run_ends_type.data_type() { + DataType::Int16 => { + Arc::new(RunArray::::from(data)) as ArrayRef + } + DataType::Int32 => { + Arc::new(RunArray::::from(data)) as ArrayRef + } + DataType::Int64 => { + Arc::new(RunArray::::from(data)) as ArrayRef + } + dt => panic!("Unexpected data type for run_ends array {:?}", dt), + } + } DataType::Null => Arc::new(NullArray::from(data)) as ArrayRef, DataType::Decimal128(_, _) => Arc::new(Decimal128Array::from(data)) as ArrayRef, DataType::Decimal256(_, _) => Arc::new(Decimal256Array::from(data)) as ArrayRef, @@ -737,6 +754,7 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef { new_null_sized_decimal(data_type, length, std::mem::size_of::()) } DataType::Decimal256(_, _) => new_null_sized_decimal(data_type, length, 32), + DataType::RunEndEncoded(_, _) => todo!(), } } diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs new file mode 100644 index 00000000000..0e39cd28834 --- /dev/null +++ b/arrow-array/src/array/run_array.rs @@ -0,0 +1,507 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use arrow_buffer::ArrowNativeType; +use arrow_data::{ArrayData, ArrayDataBuilder}; +use arrow_schema::{ArrowError, DataType, Field}; + +use crate::{ + builder::StringRunBuilder, + make_array, + types::{Int16Type, Int32Type, Int64Type, RunEndIndexType}, + Array, ArrayRef, PrimitiveArray, +}; + +/// +/// A run-end encoding (REE) is a variation of [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding). +/// +/// This encoding is good for representing data containing same values repeated consecutively. +/// +/// [`RunArray`] contains `run_ends` array and `values` array of same length. +/// The `run_ends` array stores the indexes at which the run ends. The `values` array +/// stores the value of each run. Below example illustrates how a logical array is represented in +/// [`RunArray`] +/// +/// +/// ```text +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐ +/// ┌─────────────────┐ ┌─────────┐ ┌─────────────────┐ +/// │ │ A │ │ 2 │ │ │ A │ +/// ├─────────────────┤ ├─────────┤ ├─────────────────┤ +/// │ │ D │ │ 3 │ │ │ A │ run length of 'A' = runs_ends[0] - 0 = 2 +/// ├─────────────────┤ ├─────────┤ ├─────────────────┤ +/// │ │ B │ │ 6 │ │ │ D │ run length of 'D' = run_ends[1] - run_ends[0] = 1 +/// └─────────────────┘ └─────────┘ ├─────────────────┤ +/// │ values run_ends │ │ B │ +/// ├─────────────────┤ +/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘ │ B │ +/// ├─────────────────┤ +/// RunArray │ B │ run length of 'B' = run_ends[2] - run_ends[1] = 3 +/// length = 3 └─────────────────┘ +/// +/// Logical array +/// Contents +/// ``` + +pub struct RunArray { + data: ArrayData, + run_ends: PrimitiveArray, + values: ArrayRef, +} + +impl RunArray { + // calculates the logical length of the array encoded + // by the given run_ends array. + fn logical_len(run_ends: &PrimitiveArray) -> usize { + let len = run_ends.len(); + if len == 0 { + return 0; + } + run_ends.value(len - 1).as_usize() + } + + /// Attempts to create RunArray using given run_ends (index where a run ends) + /// and the values (value of the run). Returns an error if the given data is not compatible + /// with RunEndEncoded specification. + pub fn try_new( + run_ends: &PrimitiveArray, + values: &dyn Array, + ) -> Result { + let run_ends_type = run_ends.data_type().clone(); + let values_type = values.data_type().clone(); + let ree_array_type = DataType::RunEndEncoded( + Box::new(Field::new("run_ends", run_ends_type, false)), + Box::new(Field::new("values", values_type, true)), + ); + let len = RunArray::logical_len(run_ends); + let builder = ArrayDataBuilder::new(ree_array_type) + .len(len) + .add_child_data(run_ends.data().clone()) + .add_child_data(values.data().clone()); + + // `build_unchecked` is used to avoid recursive validation of child arrays. + let array_data = unsafe { builder.build_unchecked() }; + + // Safety: `validate_data` checks below + // 1. The given array data has exactly two child arrays. + // 2. The first child array (run_ends) has valid data type. + // 3. run_ends array does not have null values + // 4. run_ends array has non-zero and strictly increasing values. + // 5. The length of run_ends array and values array are the same. + array_data.validate_data()?; + + Ok(array_data.into()) + } + + /// Returns a reference to run_ends array + /// + /// Note: any slicing of this array is not applied to the returned array + /// and must be handled separately + pub fn run_ends(&self) -> &PrimitiveArray { + &self.run_ends + } + + /// Returns a reference to values array + pub fn values(&self) -> &ArrayRef { + &self.values + } +} + +impl From for RunArray { + // The method assumes the caller already validated the data using `ArrayData::validate_data()` + fn from(data: ArrayData) -> Self { + match data.data_type() { + DataType::RunEndEncoded(_, _) => {} + _ => { + panic!("Invalid data type for RunArray. The data type should be DataType::RunEndEncoded"); + } + } + + let run_ends = PrimitiveArray::::from(data.child_data()[0].clone()); + let values = make_array(data.child_data()[1].clone()); + Self { + data, + run_ends, + values, + } + } +} + +impl From> for ArrayData { + fn from(array: RunArray) -> Self { + array.data + } +} + +impl Array for RunArray { + fn as_any(&self) -> &dyn Any { + self + } + + fn data(&self) -> &ArrayData { + &self.data + } + + fn into_data(self) -> ArrayData { + self.into() + } +} + +impl std::fmt::Debug for RunArray { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!( + f, + "RunArray {{run_ends: {:?}, values: {:?}}}", + self.run_ends, self.values + ) + } +} + +/// Constructs a `RunArray` from an iterator of optional strings. +/// +/// # Example: +/// ``` +/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type}; +/// +/// let test = vec!["a", "a", "b", "c", "c"]; +/// let array: RunArray = test +/// .iter() +/// .map(|&x| if x == "b" { None } else { Some(x) }) +/// .collect(); +/// assert_eq!( +/// "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 5,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", +/// format!("{:?}", array) +/// ); +/// ``` +impl<'a, T: RunEndIndexType> FromIterator> for RunArray { + fn from_iter>>(iter: I) -> Self { + let it = iter.into_iter(); + let (lower, _) = it.size_hint(); + let mut builder = StringRunBuilder::with_capacity(lower, 256); + it.for_each(|i| { + builder.append_option(i); + }); + + builder.finish() + } +} + +/// Constructs a `RunArray` from an iterator of strings. +/// +/// # Example: +/// +/// ``` +/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type}; +/// +/// let test = vec!["a", "a", "b", "c"]; +/// let array: RunArray = test.into_iter().collect(); +/// assert_eq!( +/// "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", +/// format!("{:?}", array) +/// ); +/// ``` +impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray { + fn from_iter>(iter: I) -> Self { + let it = iter.into_iter(); + let (lower, _) = it.size_hint(); + let mut builder = StringRunBuilder::with_capacity(lower, 256); + it.for_each(|i| { + builder.append_value(i); + }); + + builder.finish() + } +} + +/// +/// A [`RunArray`] array where run ends are stored using `i16` data type. +/// +/// # Example: Using `collect` +/// ``` +/// # use arrow_array::{Array, Int16RunArray, Int16Array, StringArray}; +/// # use std::sync::Arc; +/// +/// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); +/// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); +/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5])); +/// assert_eq!(array.values(), &values); +/// ``` +pub type Int16RunArray = RunArray; + +/// +/// A [`RunArray`] array where run ends are stored using `i32` data type. +/// +/// # Example: Using `collect` +/// ``` +/// # use arrow_array::{Array, Int32RunArray, Int32Array, StringArray}; +/// # use std::sync::Arc; +/// +/// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); +/// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); +/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5])); +/// assert_eq!(array.values(), &values); +/// ``` +pub type Int32RunArray = RunArray; + +/// +/// A [`RunArray`] array where run ends are stored using `i64` data type. +/// +/// # Example: Using `collect` +/// ``` +/// # use arrow_array::{Array, Int64RunArray, Int64Array, StringArray}; +/// # use std::sync::Arc; +/// +/// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); +/// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); +/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5])); +/// assert_eq!(array.values(), &values); +/// ``` +pub type Int64RunArray = RunArray; + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::builder::PrimitiveRunBuilder; + use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type}; + use crate::{Array, Int16Array, Int32Array, StringArray}; + + #[test] + fn test_run_array() { + // Construct a value array + let value_data = PrimitiveArray::::from_iter_values([ + 10_i8, 11, 12, 13, 14, 15, 16, 17, + ]); + + // Construct a run_ends array: + let run_ends_data = PrimitiveArray::::from_iter_values([ + 4_i16, 6, 7, 9, 13, 18, 20, 22, + ]); + + // Construct a run ends encoded array from the above two + let ree_array = + RunArray::::try_new(&run_ends_data, &value_data).unwrap(); + + assert_eq!(ree_array.len(), 22); + assert_eq!(ree_array.null_count(), 0); + + let values = ree_array.values(); + assert_eq!(&value_data.into_data(), values.data()); + assert_eq!(&DataType::Int8, values.data_type()); + + let run_ends = ree_array.run_ends(); + assert_eq!(&run_ends_data.into_data(), run_ends.data()); + assert_eq!(&DataType::Int16, run_ends.data_type()); + } + + #[test] + fn test_run_array_fmt_debug() { + let mut builder = PrimitiveRunBuilder::::with_capacity(3); + builder.append_value(12345678); + builder.append_null(); + builder.append_value(22345678); + let array = builder.finish(); + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 1,\n 2,\n 3,\n], values: PrimitiveArray\n[\n 12345678,\n null,\n 22345678,\n]}\n", + format!("{:?}", array) + ); + + let mut builder = PrimitiveRunBuilder::::with_capacity(20); + for _ in 0..20 { + builder.append_value(1); + } + let array = builder.finish(); + + assert_eq!(array.len(), 20); + assert_eq!(array.null_count(), 0); + + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 20,\n], values: PrimitiveArray\n[\n 1,\n]}\n", + format!("{:?}", array) + ); + } + + #[test] + fn test_run_array_from_iter() { + let test = vec!["a", "a", "b", "c"]; + let array: RunArray = test + .iter() + .map(|&x| if x == "b" { None } else { Some(x) }) + .collect(); + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", + format!("{:?}", array) + ); + + assert_eq!(array.len(), 4); + assert_eq!(array.null_count(), 0); + + let array: RunArray = test.into_iter().collect(); + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", + format!("{:?}", array) + ); + } + + #[test] + fn test_run_array_run_ends_as_primitive_array() { + let test = vec!["a", "b", "c", "a"]; + let array: RunArray = test.into_iter().collect(); + + assert_eq!(array.len(), 4); + assert_eq!(array.null_count(), 0); + + let run_ends = array.run_ends(); + assert_eq!(&DataType::Int16, run_ends.data_type()); + assert_eq!(0, run_ends.null_count()); + assert_eq!(&[1, 2, 3, 4], run_ends.values()); + } + + #[test] + fn test_run_array_as_primitive_array_with_null() { + let test = vec![Some("a"), None, Some("b"), None, None, Some("a")]; + let array: RunArray = test.into_iter().collect(); + + assert_eq!(array.len(), 6); + assert_eq!(array.null_count(), 0); + + let run_ends = array.run_ends(); + assert_eq!(&DataType::Int32, run_ends.data_type()); + assert_eq!(0, run_ends.null_count()); + assert_eq!(5, run_ends.len()); + assert_eq!(&[1, 2, 3, 5, 6], run_ends.values()); + + let values_data = array.values(); + assert_eq!(2, values_data.null_count()); + assert_eq!(5, values_data.len()); + } + + #[test] + fn test_run_array_all_nulls() { + let test = vec![None, None, None]; + let array: RunArray = test.into_iter().collect(); + + assert_eq!(array.len(), 3); + assert_eq!(array.null_count(), 0); + + let run_ends = array.run_ends(); + assert_eq!(1, run_ends.len()); + assert_eq!(&[3], run_ends.values()); + + let values_data = array.values(); + assert_eq!(1, values_data.null_count()); + } + + #[test] + fn test_run_array_try_new() { + let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = + [Some(1), Some(2), Some(3), Some(4)].into_iter().collect(); + + let array = RunArray::::try_new(&run_ends, &values).unwrap(); + assert_eq!(array.run_ends().data_type(), &DataType::Int32); + assert_eq!(array.values().data_type(), &DataType::Utf8); + + assert_eq!(array.null_count(), 0); + assert_eq!(array.len(), 4); + assert_eq!(array.run_ends.null_count(), 0); + assert_eq!(array.values().null_count(), 1); + + assert_eq!( + "RunArray {run_ends: PrimitiveArray\n[\n 1,\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n", + format!("{:?}", array) + ); + } + + #[test] + fn test_run_array_int16_type_definition() { + let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); + let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); + assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5])); + assert_eq!(array.values(), &values); + } + + #[test] + fn test_run_array_empty_string() { + let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect(); + let values: Arc = Arc::new(StringArray::from(vec!["a", "", "c"])); + assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5])); + assert_eq!(array.values(), &values); + } + + #[test] + fn test_run_array_length_mismatch() { + let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(1), Some(2), Some(3)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("The run_ends array length should be the same as values array length. Run_ends array length is 3, values array length is 4".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + fn test_run_array_run_ends_with_null() { + let values: StringArray = [Some("foo"), Some("bar"), Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(1), None, Some(3)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("Found null values in run_ends array. The run_ends array should not have null values.".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + fn test_run_array_run_ends_with_zeroes() { + let values: StringArray = [Some("foo"), Some("bar"), Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(0), Some(1), Some(3)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly positive. Found value 0 at index 0 that does not match the criteria.".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + fn test_run_array_run_ends_non_increasing() { + let values: StringArray = [Some("foo"), Some("bar"), Some("baz")] + .into_iter() + .collect(); + let run_ends: Int32Array = [Some(1), Some(4), Some(4)].into_iter().collect(); + + let actual = RunArray::::try_new(&run_ends, &values); + let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly increasing. Found value 4 at index 2 with previous value 4 that does not match the criteria.".to_string()); + assert_eq!(expected.to_string(), actual.err().unwrap().to_string()); + } + + #[test] + #[should_panic( + expected = "PrimitiveArray expected ArrayData with type Int64 got Int32" + )] + fn test_run_array_run_ends_data_type_mismatch() { + let a = RunArray::::from_iter(["32"]); + let _ = RunArray::::from(a.into_data()); + } +} diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs b/arrow-array/src/builder/generic_byte_run_builder.rs new file mode 100644 index 00000000000..c1ecbcb5dde --- /dev/null +++ b/arrow-array/src/builder/generic_byte_run_builder.rs @@ -0,0 +1,519 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::types::bytes::ByteArrayNativeType; +use std::{any::Any, sync::Arc}; + +use crate::{ + types::{ + BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, RunEndIndexType, + Utf8Type, + }, + ArrayRef, ArrowPrimitiveType, RunArray, +}; + +use super::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder}; + +use arrow_buffer::ArrowNativeType; + +/// Array builder for [`RunArray`] for String and Binary types. +/// +/// # Example: +/// +/// ``` +/// +/// # use arrow_array::builder::GenericByteRunBuilder; +/// # use arrow_array::{GenericByteArray, BinaryArray}; +/// # use arrow_array::types::{BinaryType, Int16Type}; +/// # use arrow_array::{Array, Int16Array}; +/// # use arrow_array::cast::as_generic_binary_array; +/// +/// let mut builder = +/// GenericByteRunBuilder::::new(); +/// builder.append_value(b"abc"); +/// builder.append_value(b"abc"); +/// builder.append_null(); +/// builder.append_value(b"def"); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(2), Some(3), Some(4)]) +/// ); +/// +/// let av = array.values(); +/// +/// assert!(!av.is_null(0)); +/// assert!(av.is_null(1)); +/// assert!(!av.is_null(2)); +/// +/// // Values are polymorphic and so require a downcast. +/// let ava: &BinaryArray = as_generic_binary_array(av.as_ref()); +/// +/// assert_eq!(ava.value(0), b"abc"); +/// assert_eq!(ava.value(2), b"def"); +/// ``` +#[derive(Debug)] +pub struct GenericByteRunBuilder +where + R: ArrowPrimitiveType, + V: ByteArrayType, +{ + run_ends_builder: PrimitiveBuilder, + values_builder: GenericByteBuilder, + current_value: Vec, + has_current_value: bool, + current_run_end_index: usize, + prev_run_end_index: usize, +} + +impl Default for GenericByteRunBuilder +where + R: ArrowPrimitiveType, + V: ByteArrayType, +{ + fn default() -> Self { + Self::new() + } +} + +impl GenericByteRunBuilder +where + R: ArrowPrimitiveType, + V: ByteArrayType, +{ + /// Creates a new `GenericByteRunBuilder` + pub fn new() -> Self { + Self { + run_ends_builder: PrimitiveBuilder::new(), + values_builder: GenericByteBuilder::::new(), + current_value: Vec::new(), + has_current_value: false, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } + + /// Creates a new `GenericByteRunBuilder` with the provided capacity + /// + /// `capacity`: the expected number of run-end encoded values. + /// `data_capacity`: the expected number of bytes of run end encoded values + pub fn with_capacity(capacity: usize, data_capacity: usize) -> Self { + Self { + run_ends_builder: PrimitiveBuilder::with_capacity(capacity), + values_builder: GenericByteBuilder::::with_capacity( + capacity, + data_capacity, + ), + current_value: Vec::new(), + has_current_value: false, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } +} + +impl ArrayBuilder for GenericByteRunBuilder +where + R: RunEndIndexType, + V: ByteArrayType, +{ + /// Returns the builder as a non-mutable `Any` reference. + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns the builder as a mutable `Any` reference. + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + /// Returns the boxed builder as a box of `Any`. + fn into_box_any(self: Box) -> Box { + self + } + + /// Returns the length of logical array encoded by + /// the eventual runs array. + fn len(&self) -> usize { + self.current_run_end_index + } + + /// Returns whether the number of array slots is zero + fn is_empty(&self) -> bool { + self.current_run_end_index == 0 + } + + /// Builds the array and reset this builder. + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } + + /// Builds the array without resetting the builder. + fn finish_cloned(&self) -> ArrayRef { + Arc::new(self.finish_cloned()) + } +} + +impl GenericByteRunBuilder +where + R: RunEndIndexType, + V: ByteArrayType, +{ + /// Appends optional value to the logical array encoded by the RunArray. + pub fn append_option(&mut self, input_value: Option>) { + match input_value { + Some(value) => self.append_value(value), + None => self.append_null(), + } + } + + /// Appends value to the logical array encoded by the RunArray. + pub fn append_value(&mut self, input_value: impl AsRef) { + let value: &[u8] = input_value.as_ref().as_ref(); + if !self.has_current_value { + self.append_run_end(); + self.current_value.extend_from_slice(value); + self.has_current_value = true; + } else if self.current_value.as_slice() != value { + self.append_run_end(); + self.current_value.clear(); + self.current_value.extend_from_slice(value); + } + self.current_run_end_index += 1; + } + + /// Appends null to the logical array encoded by the RunArray. + pub fn append_null(&mut self) { + if self.has_current_value { + self.append_run_end(); + self.current_value.clear(); + self.has_current_value = false; + } + self.current_run_end_index += 1; + } + + /// Creates the RunArray and resets the builder. + /// Panics if RunArray cannot be built. + pub fn finish(&mut self) -> RunArray { + // write the last run end to the array. + self.append_run_end(); + + // reset the run end index to zero. + self.current_value.clear(); + self.has_current_value = false; + self.current_run_end_index = 0; + self.prev_run_end_index = 0; + + // build the run encoded array by adding run_ends and values array as its children. + let run_ends_array = self.run_ends_builder.finish(); + let values_array = self.values_builder.finish(); + RunArray::::try_new(&run_ends_array, &values_array).unwrap() + } + + /// Creates the RunArray and without resetting the builder. + /// Panics if RunArray cannot be built. + pub fn finish_cloned(&self) -> RunArray { + let mut run_ends_array = self.run_ends_builder.finish_cloned(); + let mut values_array = self.values_builder.finish_cloned(); + + // Add current run if one exists + if self.prev_run_end_index != self.current_run_end_index { + let mut run_end_builder = run_ends_array.into_builder().unwrap(); + let mut values_builder = values_array.into_builder().unwrap(); + self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder); + run_ends_array = run_end_builder.finish(); + values_array = values_builder.finish(); + } + + RunArray::::try_new(&run_ends_array, &values_array).unwrap() + } + + // Appends the current run to the array. + fn append_run_end(&mut self) { + // empty array or the function called without appending any value. + if self.prev_run_end_index == self.current_run_end_index { + return; + } + let run_end_index = self.run_end_index_as_native(); + self.run_ends_builder.append_value(run_end_index); + if self.has_current_value { + let slice = self.current_value.as_slice(); + let native = unsafe { + // Safety: + // As self.current_value is created from V::Native. The value V::Native can be + // built back from the bytes without validations + V::Native::from_bytes_unchecked(slice) + }; + self.values_builder.append_value(native); + } else { + self.values_builder.append_null(); + } + self.prev_run_end_index = self.current_run_end_index; + } + + // Similar to `append_run_end` but on custom builders. + // Used in `finish_cloned` which is not suppose to mutate `self`. + fn append_run_end_with_builders( + &self, + run_ends_builder: &mut PrimitiveBuilder, + values_builder: &mut GenericByteBuilder, + ) { + let run_end_index = self.run_end_index_as_native(); + run_ends_builder.append_value(run_end_index); + if self.has_current_value { + let slice = self.current_value.as_slice(); + let native = unsafe { + // Safety: + // As self.current_value is created from V::Native. The value V::Native can be + // built back from the bytes without validations + V::Native::from_bytes_unchecked(slice) + }; + values_builder.append_value(native); + } else { + values_builder.append_null(); + } + } + + fn run_end_index_as_native(&self) -> R::Native { + R::Native::from_usize(self.current_run_end_index) + .unwrap_or_else(|| panic!( + "Cannot convert the value {} from `usize` to native form of arrow datatype {}", + self.current_run_end_index, + R::DATA_TYPE + )) + } +} + +/// Array builder for [`RunArray`] that encodes strings ([`Utf8Type`]). +/// +/// ``` +/// // Create a run-end encoded array with run-end indexes data type as `i16`. +/// // The encoded values are Strings. +/// +/// # use arrow_array::builder::StringRunBuilder; +/// # use arrow_array::{Int16Array, StringArray}; +/// # use arrow_array::types::Int16Type; +/// # use arrow_array::cast::as_string_array; +/// +/// let mut builder = StringRunBuilder::::new(); +/// +/// // The builder builds the dictionary value by value +/// builder.append_value("abc"); +/// builder.append_null(); +/// builder.append_value("def"); +/// builder.append_value("def"); +/// builder.append_value("abc"); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) +/// ); +/// +/// // Values are polymorphic and so require a downcast. +/// let av = array.values(); +/// let ava: &StringArray = as_string_array(av.as_ref()); +/// +/// assert_eq!(ava.value(0), "abc"); +/// assert!(av.is_null(1)); +/// assert_eq!(ava.value(2), "def"); +/// assert_eq!(ava.value(3), "abc"); +/// +/// ``` +pub type StringRunBuilder = GenericByteRunBuilder; + +/// Array builder for [`RunArray`] that encodes large strings ([`LargeUtf8Type`]). See [`StringRunBuilder`] for an example. +pub type LargeStringRunBuilder = GenericByteRunBuilder; + +/// Array builder for [`RunArray`] that encodes binary values([`BinaryType`]). +/// +/// ``` +/// // Create a run-end encoded array with run-end indexes data type as `i16`. +/// // The encoded data is binary values. +/// +/// # use arrow_array::builder::BinaryRunBuilder; +/// # use arrow_array::{BinaryArray, Int16Array}; +/// # use arrow_array::types::Int16Type; +/// # use arrow_array::cast::as_generic_binary_array; +/// +/// let mut builder = BinaryRunBuilder::::new(); +/// +/// // The builder builds the dictionary value by value +/// builder.append_value(b"abc"); +/// builder.append_null(); +/// builder.append_value(b"def"); +/// builder.append_value(b"def"); +/// builder.append_value(b"abc"); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) +/// ); +/// +/// // Values are polymorphic and so require a downcast. +/// let av = array.values(); +/// let ava: &BinaryArray = as_generic_binary_array::(av.as_ref()); +/// +/// assert_eq!(ava.value(0), b"abc"); +/// assert!(av.is_null(1)); +/// assert_eq!(ava.value(2), b"def"); +/// assert_eq!(ava.value(3), b"abc"); +/// +/// ``` +pub type BinaryRunBuilder = GenericByteRunBuilder; + +/// Array builder for [`RunArray`] that encodes large binary values([`LargeBinaryType`]). +/// See documentation of [`BinaryRunBuilder`] for an example. +pub type LargeBinaryRunBuilder = GenericByteRunBuilder; + +#[cfg(test)] +mod tests { + use super::*; + + use crate::array::Array; + use crate::types::Int16Type; + use crate::GenericByteArray; + use crate::Int16Array; + use crate::Int16RunArray; + + fn test_bytes_run_buider(values: Vec<&T::Native>) + where + T: ByteArrayType, + ::Native: PartialEq, + ::Native: AsRef<::Native>, + { + let mut builder = GenericByteRunBuilder::::new(); + builder.append_value(values[0]); + builder.append_value(values[0]); + builder.append_value(values[0]); + builder.append_null(); + builder.append_null(); + builder.append_value(values[1]); + builder.append_value(values[1]); + builder.append_value(values[2]); + builder.append_value(values[2]); + builder.append_value(values[2]); + builder.append_value(values[2]); + let array = builder.finish(); + + assert_eq!(array.len(), 11); + assert_eq!(array.null_count(), 0); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(3), Some(5), Some(7), Some(11)]) + ); + + // Values are polymorphic and so require a downcast. + let av = array.values(); + let ava: &GenericByteArray = + av.as_any().downcast_ref::>().unwrap(); + + assert_eq!(*ava.value(0), *values[0]); + assert!(ava.is_null(1)); + assert_eq!(*ava.value(2), *values[1]); + assert_eq!(*ava.value(3), *values[2]); + } + + #[test] + fn test_string_run_buider() { + test_bytes_run_buider::(vec!["abc", "def", "ghi"]); + } + + #[test] + fn test_string_run_buider_with_empty_strings() { + test_bytes_run_buider::(vec!["abc", "", "ghi"]); + } + + #[test] + fn test_binary_run_buider() { + test_bytes_run_buider::(vec![b"abc", b"def", b"ghi"]); + } + + fn test_bytes_run_buider_finish_cloned(values: Vec<&T::Native>) + where + T: ByteArrayType, + ::Native: PartialEq, + ::Native: AsRef<::Native>, + { + let mut builder = GenericByteRunBuilder::::new(); + + builder.append_value(values[0]); + builder.append_null(); + builder.append_value(values[1]); + builder.append_value(values[1]); + builder.append_value(values[0]); + let mut array: Int16RunArray = builder.finish_cloned(); + + assert_eq!(array.len(), 5); + assert_eq!(array.null_count(), 0); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) + ); + + // Values are polymorphic and so require a downcast. + let av = array.values(); + let ava: &GenericByteArray = + av.as_any().downcast_ref::>().unwrap(); + + assert_eq!(ava.value(0), values[0]); + assert!(ava.is_null(1)); + assert_eq!(ava.value(2), values[1]); + assert_eq!(ava.value(3), values[0]); + + // Append last value before `finish_cloned` (`value[0]`) again and ensure it has only + // one entry in final output. + builder.append_value(values[0]); + builder.append_value(values[0]); + builder.append_value(values[1]); + array = builder.finish(); + + assert_eq!(array.len(), 8); + assert_eq!(array.null_count(), 0); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(7), Some(8),]) + ); + + // Values are polymorphic and so require a downcast. + let av2 = array.values(); + let ava2: &GenericByteArray = + av2.as_any().downcast_ref::>().unwrap(); + + assert_eq!(ava2.value(0), values[0]); + assert!(ava2.is_null(1)); + assert_eq!(ava2.value(2), values[1]); + // The value appended before and after `finish_cloned` has only one entry. + assert_eq!(ava2.value(3), values[0]); + assert_eq!(ava2.value(4), values[1]); + } + + #[test] + fn test_string_run_buider_finish_cloned() { + test_bytes_run_buider_finish_cloned::(vec!["abc", "def", "ghi"]); + } + + #[test] + fn test_binary_run_buider_finish_cloned() { + test_bytes_run_buider_finish_cloned::(vec![b"abc", b"def", b"ghi"]); + } +} diff --git a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs index 820ecd23bc5..fc2454635d9 100644 --- a/arrow-array/src/builder/mod.rs +++ b/arrow-array/src/builder/mod.rs @@ -39,10 +39,14 @@ mod primitive_builder; pub use primitive_builder::*; mod primitive_dictionary_builder; pub use primitive_dictionary_builder::*; +mod primitive_run_builder; +pub use primitive_run_builder::*; mod struct_builder; pub use struct_builder::*; mod generic_bytes_dictionary_builder; pub use generic_bytes_dictionary_builder::*; +mod generic_byte_run_builder; +pub use generic_byte_run_builder::*; mod union_builder; pub use union_builder::*; diff --git a/arrow-array/src/builder/primitive_run_builder.rs b/arrow-array/src/builder/primitive_run_builder.rs new file mode 100644 index 00000000000..82c46abfa05 --- /dev/null +++ b/arrow-array/src/builder/primitive_run_builder.rs @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, sync::Arc}; + +use crate::{types::RunEndIndexType, ArrayRef, ArrowPrimitiveType, RunArray}; + +use super::{ArrayBuilder, PrimitiveBuilder}; + +use arrow_buffer::ArrowNativeType; + +/// Array builder for [`RunArray`] that encodes primitive values. +/// +/// # Example: +/// +/// ``` +/// +/// # use arrow_array::builder::PrimitiveRunBuilder; +/// # use arrow_array::cast::as_primitive_array; +/// # use arrow_array::types::{UInt32Type, Int16Type}; +/// # use arrow_array::{Array, UInt32Array, Int16Array}; +/// +/// let mut builder = +/// PrimitiveRunBuilder::::new(); +/// builder.append_value(1234); +/// builder.append_value(1234); +/// builder.append_value(1234); +/// builder.append_null(); +/// builder.append_value(5678); +/// builder.append_value(5678); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.run_ends(), +/// &Int16Array::from(vec![Some(3), Some(4), Some(6)]) +/// ); +/// +/// let av = array.values(); +/// +/// assert!(!av.is_null(0)); +/// assert!(av.is_null(1)); +/// assert!(!av.is_null(2)); +/// +/// // Values are polymorphic and so require a downcast. +/// let ava: &UInt32Array = as_primitive_array::(av.as_ref()); +/// +/// assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)])); +/// ``` +#[derive(Debug)] +pub struct PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + run_ends_builder: PrimitiveBuilder, + values_builder: PrimitiveBuilder, + current_value: Option, + current_run_end_index: usize, + prev_run_end_index: usize, +} + +impl Default for PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + fn default() -> Self { + Self::new() + } +} + +impl PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + /// Creates a new `PrimitiveRunBuilder` + pub fn new() -> Self { + Self { + run_ends_builder: PrimitiveBuilder::new(), + values_builder: PrimitiveBuilder::new(), + current_value: None, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } + + /// Creates a new `PrimitiveRunBuilder` with the provided capacity + /// + /// `capacity`: the expected number of run-end encoded values. + pub fn with_capacity(capacity: usize) -> Self { + Self { + run_ends_builder: PrimitiveBuilder::with_capacity(capacity), + values_builder: PrimitiveBuilder::with_capacity(capacity), + current_value: None, + current_run_end_index: 0, + prev_run_end_index: 0, + } + } +} + +impl ArrayBuilder for PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + /// Returns the builder as a non-mutable `Any` reference. + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns the builder as a mutable `Any` reference. + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + /// Returns the boxed builder as a box of `Any`. + fn into_box_any(self: Box) -> Box { + self + } + + /// Returns the length of logical array encoded by + /// the eventual runs array. + fn len(&self) -> usize { + self.current_run_end_index + } + + /// Returns whether the number of array slots is zero + fn is_empty(&self) -> bool { + self.current_run_end_index == 0 + } + + /// Builds the array and reset this builder. + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } + + /// Builds the array without resetting the builder. + fn finish_cloned(&self) -> ArrayRef { + Arc::new(self.finish_cloned()) + } +} + +impl PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + /// Appends optional value to the logical array encoded by the RunArray. + pub fn append_option(&mut self, value: Option) { + if self.current_run_end_index == 0 { + self.current_run_end_index = 1; + self.current_value = value; + return; + } + if self.current_value != value { + self.append_run_end(); + self.current_value = value; + } + + self.current_run_end_index += 1; + } + + /// Appends value to the logical array encoded by the run-ends array. + pub fn append_value(&mut self, value: V::Native) { + self.append_option(Some(value)) + } + + /// Appends null to the logical array encoded by the run-ends array. + pub fn append_null(&mut self) { + self.append_option(None) + } + + /// Creates the RunArray and resets the builder. + /// Panics if RunArray cannot be built. + pub fn finish(&mut self) -> RunArray { + // write the last run end to the array. + self.append_run_end(); + + // reset the run index to zero. + self.current_value = None; + self.current_run_end_index = 0; + + // build the run encoded array by adding run_ends and values array as its children. + let run_ends_array = self.run_ends_builder.finish(); + let values_array = self.values_builder.finish(); + RunArray::::try_new(&run_ends_array, &values_array).unwrap() + } + + /// Creates the RunArray and without resetting the builder. + /// Panics if RunArray cannot be built. + pub fn finish_cloned(&self) -> RunArray { + let mut run_ends_array = self.run_ends_builder.finish_cloned(); + let mut values_array = self.values_builder.finish_cloned(); + + // Add current run if one exists + if self.prev_run_end_index != self.current_run_end_index { + let mut run_end_builder = run_ends_array.into_builder().unwrap(); + let mut values_builder = values_array.into_builder().unwrap(); + self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder); + run_ends_array = run_end_builder.finish(); + values_array = values_builder.finish(); + } + + RunArray::try_new(&run_ends_array, &values_array).unwrap() + } + + // Appends the current run to the array. + fn append_run_end(&mut self) { + // empty array or the function called without appending any value. + if self.prev_run_end_index == self.current_run_end_index { + return; + } + let run_end_index = self.run_end_index_as_native(); + self.run_ends_builder.append_value(run_end_index); + self.values_builder.append_option(self.current_value); + self.prev_run_end_index = self.current_run_end_index; + } + + // Similar to `append_run_end` but on custom builders. + // Used in `finish_cloned` which is not suppose to mutate `self`. + fn append_run_end_with_builders( + &self, + run_ends_builder: &mut PrimitiveBuilder, + values_builder: &mut PrimitiveBuilder, + ) { + let run_end_index = self.run_end_index_as_native(); + run_ends_builder.append_value(run_end_index); + values_builder.append_option(self.current_value); + } + + fn run_end_index_as_native(&self) -> R::Native { + R::Native::from_usize(self.current_run_end_index) + .unwrap_or_else(|| panic!( + "Cannot convert `current_run_end_index` {} from `usize` to native form of arrow datatype {}", + self.current_run_end_index, + R::DATA_TYPE + )) + } +} + +#[cfg(test)] +mod tests { + use crate::builder::PrimitiveRunBuilder; + use crate::cast::as_primitive_array; + use crate::types::{Int16Type, UInt32Type}; + use crate::{Array, Int16Array, UInt32Array}; + + #[test] + fn test_primitive_ree_array_builder() { + let mut builder = PrimitiveRunBuilder::::new(); + builder.append_value(1234); + builder.append_value(1234); + builder.append_value(1234); + builder.append_null(); + builder.append_value(5678); + builder.append_value(5678); + + let array = builder.finish(); + + assert_eq!(array.null_count(), 0); + assert_eq!(array.len(), 6); + + assert_eq!( + array.run_ends(), + &Int16Array::from(vec![Some(3), Some(4), Some(6)]) + ); + + let av = array.values(); + + assert!(!av.is_null(0)); + assert!(av.is_null(1)); + assert!(!av.is_null(2)); + + // Values are polymorphic and so require a downcast. + let ava: &UInt32Array = as_primitive_array::(av.as_ref()); + + assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)])); + } +} diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index 7c41a469e30..fc02c0e5a3d 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -240,6 +240,31 @@ impl ArrowDictionaryKeyType for UInt32Type {} impl ArrowDictionaryKeyType for UInt64Type {} +mod run { + use super::*; + + pub trait RunEndTypeSealed {} + + impl RunEndTypeSealed for Int16Type {} + + impl RunEndTypeSealed for Int32Type {} + + impl RunEndTypeSealed for Int64Type {} +} + +/// A subtype of primitive type that is used as run-ends index +/// in `RunArray`. +/// See +/// +/// Note: The implementation of this trait is sealed to avoid accidental misuse. +pub trait RunEndIndexType: ArrowPrimitiveType + run::RunEndTypeSealed {} + +impl RunEndIndexType for Int16Type {} + +impl RunEndIndexType for Int32Type {} + +impl RunEndIndexType for Int64Type {} + /// A subtype of primitive type that represents temporal values. pub trait ArrowTemporalType: ArrowPrimitiveType {} diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 258ee082da1..07bbc664234 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -198,9 +198,9 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff ], _ => unreachable!(), }, - DataType::FixedSizeList(_, _) | DataType::Struct(_) => { - [empty_buffer, MutableBuffer::new(0)] - } + DataType::FixedSizeList(_, _) + | DataType::Struct(_) + | DataType::RunEndEncoded(_, _) => [empty_buffer, MutableBuffer::new(0)], DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => [ MutableBuffer::new(capacity * mem::size_of::()), empty_buffer, @@ -724,6 +724,12 @@ impl ArrayData { DataType::Dictionary(_, data_type) => { vec![Self::new_empty(data_type)] } + DataType::RunEndEncoded(run_ends, values) => { + vec![ + Self::new_empty(run_ends.data_type()), + Self::new_empty(values.data_type()), + ] + } }; // Data was constructed correctly above @@ -853,6 +859,19 @@ impl ArrayData { ))); } } + DataType::RunEndEncoded(run_ends_type, _) => { + if run_ends_type.is_nullable() { + return Err(ArrowError::InvalidArgumentError( + "The nullable should be set to false for the field defining run_ends array.".to_string() + )); + } + if !DataType::is_run_ends_type(run_ends_type.data_type()) { + return Err(ArrowError::InvalidArgumentError(format!( + "RunArray run_ends types must be Int16, Int32 or Int64, but was {}", + run_ends_type.data_type() + ))); + } + } _ => {} }; @@ -998,6 +1017,25 @@ impl ArrayData { } Ok(()) } + DataType::RunEndEncoded(run_ends_field, values_field) => { + self.validate_num_child_data(2)?; + let run_ends_data = + self.get_valid_child_data(0, run_ends_field.data_type())?; + let values_data = + self.get_valid_child_data(1, values_field.data_type())?; + if run_ends_data.len != values_data.len { + return Err(ArrowError::InvalidArgumentError(format!( + "The run_ends array length should be the same as values array length. Run_ends array length is {}, values array length is {}", + run_ends_data.len, values_data.len + ))); + } + if run_ends_data.null_count() > 0 { + return Err(ArrowError::InvalidArgumentError( + "Found null values in run_ends array. The run_ends array should not have null values.".to_string(), + )); + } + Ok(()) + } DataType::Union(fields, _, mode) => { self.validate_num_child_data(fields.len())?; @@ -1286,6 +1324,15 @@ impl ArrayData { _ => unreachable!(), } } + DataType::RunEndEncoded(run_ends, _values) => { + let run_ends_data = self.child_data()[0].clone(); + match run_ends.data_type() { + DataType::Int16 => run_ends_data.check_run_ends::(self.len()), + DataType::Int32 => run_ends_data.check_run_ends::(self.len()), + DataType::Int64 => run_ends_data.check_run_ends::(self.len()), + _ => unreachable!(), + } + } _ => { // No extra validation check required for other types Ok(()) @@ -1446,6 +1493,50 @@ impl ArrayData { }) } + /// Validates that each value in run_ends array is positive and strictly increasing. + fn check_run_ends(&self, array_len: usize) -> Result<(), ArrowError> + where + T: ArrowNativeType + TryInto + num::Num + std::fmt::Display, + { + let values = self.typed_buffer::(0, self.len())?; + let mut prev_value: i64 = 0_i64; + values.iter().enumerate().try_for_each(|(ix, &inp_value)| { + let value: i64 = inp_value.try_into().map_err(|_| { + ArrowError::InvalidArgumentError(format!( + "Value at position {} out of bounds: {} (can not convert to i64)", + ix, inp_value + )) + })?; + if value <= 0_i64 { + return Err(ArrowError::InvalidArgumentError(format!( + "The values in run_ends array should be strictly positive. Found value {} at index {} that does not match the criteria.", + value, + ix + ))); + } + if ix > 0 && value <= prev_value { + return Err(ArrowError::InvalidArgumentError(format!( + "The values in run_ends array should be strictly increasing. Found value {} at index {} with previous value {} that does not match the criteria.", + value, + ix, + prev_value + ))); + } + + prev_value = value; + Ok(()) + })?; + + if prev_value.as_usize() != array_len { + return Err(ArrowError::InvalidArgumentError(format!( + "The length of array does not match the last value in the run_ends array. The last value of run_ends array is {} and length of array is {}.", + prev_value, + array_len + ))); + } + Ok(()) + } + /// Returns true if this `ArrayData` is equal to `other`, using pointer comparisons /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may /// return false when the arrays are logically equal @@ -1542,6 +1633,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout { DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data DataType::LargeList(_) => DataTypeLayout::new_fixed_width(size_of::()), DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data, + DataType::RunEndEncoded(_, _) => DataTypeLayout::new_empty(), // all in child data, DataType::Union(_, _, mode) => { let type_ids = BufferSpec::FixedWidth { byte_width: size_of::(), diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs index 85c595cfed1..aff61e3d37e 100644 --- a/arrow-data/src/equal/mod.rs +++ b/arrow-data/src/equal/mod.rs @@ -137,6 +137,7 @@ fn equal_values( }, DataType::Float16 => primitive_equal::(lhs, rhs, lhs_start, rhs_start, len), DataType::Map(_, _) => list_equal::(lhs, rhs, lhs_start, rhs_start, len), + DataType::RunEndEncoded(_, _) => todo!(), } } diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index 6a8c89d25a2..2a24b1cc266 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -230,6 +230,7 @@ fn build_extend(array: &ArrayData) -> Extend { UnionMode::Sparse => union::build_extend_sparse(array), UnionMode::Dense => union::build_extend_dense(array), }, + DataType::RunEndEncoded(_, _) => todo!(), } } @@ -281,6 +282,7 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls { UnionMode::Sparse => union::extend_nulls_sparse, UnionMode::Dense => union::extend_nulls_dense, }, + DataType::RunEndEncoded(_, _) => todo!(), }) } @@ -473,6 +475,20 @@ impl<'a> MutableArrayData<'a> { }) .collect::>(), }, + DataType::RunEndEncoded(_, _) => { + let run_ends_child = arrays + .iter() + .map(|array| &array.child_data()[0]) + .collect::>(); + let value_child = arrays + .iter() + .map(|array| &array.child_data()[1]) + .collect::>(); + vec![ + MutableArrayData::new(run_ends_child, false, array_capacity), + MutableArrayData::new(value_child, use_nulls, array_capacity), + ] + } DataType::FixedSizeList(_, _) => { let childs = arrays .iter() diff --git a/arrow-integration-test/src/datatype.rs b/arrow-integration-test/src/datatype.rs index dd0b95b0a83..c2e326b4f2f 100644 --- a/arrow-integration-test/src/datatype.rs +++ b/arrow-integration-test/src/datatype.rs @@ -357,6 +357,7 @@ pub fn data_type_to_json(data_type: &DataType) -> serde_json::Value { DataType::Map(_, keys_sorted) => { json!({"name": "map", "keysSorted": keys_sorted}) } + DataType::RunEndEncoded(_, _) => todo!(), } } diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index a60a19b866c..305bb943cbb 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -711,6 +711,7 @@ pub(crate) fn get_fb_field_type<'a>( children: Some(fbb.create_vector(&children[..])), } } + RunEndEncoded(_, _) => todo!(), Map(map_field, keys_sorted) => { let child = build_field(fbb, map_field); let mut field_type = crate::MapBuilder::new(fbb); diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs index da1c20ddbd3..1e5c1321c95 100644 --- a/arrow-schema/src/datatype.rs +++ b/arrow-schema/src/datatype.rs @@ -242,6 +242,18 @@ pub enum DataType { /// child fields may be respectively "entries", "key", and "value", but this is /// not enforced. Map(Box, bool), + /// A run-end encoding (REE) is a variation of run-length encoding (RLE). These + /// encodings are well-suited for representing data containing sequences of the + /// same value, called runs. Each run is represented as a value and an integer giving + /// the index in the array where the run ends. + /// + /// A run-end encoded array has no buffers by itself, but has two child arrays. The + /// first child array, called the run ends array, holds either 16, 32, or 64-bit + /// signed integers. The actual values of each run are held in the second child array. + /// + /// These child arrays are prescribed the standard names of "run_ends" and "values" + /// respectively. + RunEndEncoded(Box, Box), } /// An absolute length of time in seconds, milliseconds, microseconds or nanoseconds. @@ -346,6 +358,13 @@ impl DataType { ) } + /// Returns true if this type is valid for run-ends array in RunArray + #[inline] + pub fn is_run_ends_type(&self) -> bool { + use DataType::*; + matches!(self, Int16 | Int32 | Int64) + } + /// Returns true if this type is nested (List, FixedSizeList, LargeList, Struct, Union, /// or Map), or a dictionary of a nested type pub fn is_nested(&self) -> bool { @@ -438,6 +457,10 @@ impl DataType { + (std::mem::size_of::() * fields.capacity()) } DataType::Dictionary(dt1, dt2) => dt1.size() + dt2.size(), + DataType::RunEndEncoded(run_ends, values) => { + run_ends.size() - std::mem::size_of_val(run_ends) + values.size() + - std::mem::size_of_val(values) + } } } } diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs index ea60572b3d4..6213af8bcf1 100644 --- a/arrow-schema/src/error.rs +++ b/arrow-schema/src/error.rs @@ -41,6 +41,7 @@ pub enum ArrowError { /// Error during import or export to/from the C Data Interface CDataInterface(String), DictionaryKeyOverflowError, + RunEndIndexOverflowError, } impl ArrowError { @@ -96,6 +97,9 @@ impl Display for ArrowError { ArrowError::DictionaryKeyOverflowError => { write!(f, "Dictionary key bigger than the key type") } + ArrowError::RunEndIndexOverflowError => { + write!(f, "Run end encoded array index overflow error") + } } } } diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs index a3275dcb335..dc3ab3d6237 100644 --- a/arrow-schema/src/field.rs +++ b/arrow-schema/src/field.rs @@ -410,6 +410,7 @@ impl Field { | DataType::List(_) | DataType::Map(_, _) | DataType::Dictionary(_, _) + | DataType::RunEndEncoded(_, _) | DataType::FixedSizeList(_, _) | DataType::FixedSizeBinary(_) | DataType::Utf8 diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 31198159371..c459d40d73b 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -360,7 +360,7 @@ fn write_leaves( ArrowDataType::Float16 => Err(ParquetError::ArrowError( "Float16 arrays not supported".to_string(), )), - ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) => { + ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) | ArrowDataType::RunEndEncoded(_, _) => { Err(ParquetError::NYI( format!( "Attempting to write an Arrow type {:?} to parquet that is not yet implemented", diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 2ca4b7ef8a7..d81d6a69bbb 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -507,6 +507,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { let dict_field = Field::new(name, *value.clone(), field.is_nullable()); arrow_to_parquet_type(&dict_field) } + DataType::RunEndEncoded(_, _) => Err(arrow_err!("Converting RunEndEncodedType to parquet not supported",)) } }