diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index cedd48e4d31..1b2bb6fd775 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -233,3 +233,8 @@ harness = false
 [[bench]]
 name = "decimal_validate"
 harness = false
+
+[[bench]]
+name = "row_format"
+harness = false
+required-features = ["test_utils"]
diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs
new file mode 100644
index 00000000000..2802aa6ece0
--- /dev/null
+++ b/arrow/benches/row_format.rs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+extern crate core;
+
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Int64Type, UInt64Type};
+use arrow::row::{RowConverter, SortField};
+use arrow::util::bench_util::{create_primitive_array, create_string_array_with_len};
+use criterion::{black_box, Criterion};
+use std::sync::Arc;
+
+fn row_bench(c: &mut Criterion) {
+    let cols = vec![Arc::new(create_primitive_array::<UInt64Type>(4096, 0.)) as ArrayRef];
+
+    c.bench_function("row_batch 4096 u64(0)", |b| {
+        b.iter(|| {
+            let mut converter = RowConverter::new(vec![SortField::new(DataType::UInt64)]);
+            black_box(converter.convert_columns(&cols))
+        });
+    });
+
+    let cols = vec![Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef];
+
+    c.bench_function("row_batch 4096 i64(0)", |b| {
+        b.iter(|| {
+            let mut converter = RowConverter::new(vec![SortField::new(DataType::Int64)]);
+            black_box(converter.convert_columns(&cols))
+        });
+    });
+
+    let cols =
+        vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 10)) as ArrayRef];
+
+    c.bench_function("row_batch 4096 string(10, 0)", |b| {
+        b.iter(|| {
+            let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
+            black_box(converter.convert_columns(&cols))
+        });
+    });
+
+    let cols =
+        vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as ArrayRef];
+
+    c.bench_function("row_batch 4096 string(30, 0)", |b| {
+        b.iter(|| {
+            let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
+            black_box(converter.convert_columns(&cols))
+        });
+    });
+
+    let cols =
+        vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as ArrayRef];
+
+    c.bench_function("row_batch 4096 string(100, 0)", |b| {
+        b.iter(|| {
+            let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
+            black_box(converter.convert_columns(&cols))
+        });
+    });
+
+    let cols =
+        vec![Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 100)) as ArrayRef];
+
+    c.bench_function("row_batch 4096 string(100, 0.5)", |b| {
+        b.iter(|| {
+            let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
+            black_box(converter.convert_columns(&cols))
+        });
+    });
+
+    let cols = [
+        Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 20)) as ArrayRef,
+        Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as ArrayRef,
+        Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as ArrayRef,
+        Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef,
+    ];
+
+    let fields = [
+        SortField::new(DataType::Utf8),
+        SortField::new(DataType::Utf8),
+        SortField::new(DataType::Utf8),
+        SortField::new(DataType::Int64),
+    ];
+
+    c.bench_function(
+        "row_batch 4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)",
+        |b| {
+            b.iter(|| {
+                let mut converter = RowConverter::new(fields.to_vec());
+                black_box(converter.convert_columns(&cols))
+            });
+        },
+    );
+}
+
+criterion_group!(benches, row_bench);
+criterion_main!(benches);
diff --git a/arrow/src/compute/kernels/sort.rs b/arrow/src/compute/kernels/sort.rs
index 7a2d47786af..e4eb3527906 100644
--- a/arrow/src/compute/kernels/sort.rs
+++ b/arrow/src/compute/kernels/sort.rs
@@ -1071,13 +1071,13 @@ type LexicographicalCompareItem<'a> = (
 
 /// A lexicographical comparator that wraps given array data (columns) and can lexicographically compare data
 /// at given two indices. The lifetime is the same at the data wrapped.
-pub(super) struct LexicographicalComparator<'a> {
+pub(crate) struct LexicographicalComparator<'a> {
     compare_items: Vec<LexicographicalCompareItem<'a>>,
 }
 
 impl LexicographicalComparator<'_> {
     /// lexicographically compare values at the wrapped columns with given indices.
-    pub(super) fn compare<'a, 'b>(
+    pub(crate) fn compare<'a, 'b>(
         &'a self,
         a_idx: &'b usize,
         b_idx: &'b usize,
@@ -1121,7 +1121,7 @@ impl LexicographicalComparator<'_> {
 
     /// Create a new lex comparator that will wrap the given sort columns and give comparison
     /// results with two indices.
-    pub(super) fn try_new(
+    pub(crate) fn try_new(
         columns: &[SortColumn],
     ) -> Result<LexicographicalComparator<'_>> {
         let compare_items = columns
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index d1fb0cae0da..87a4799e3e2 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -269,6 +269,7 @@ pub mod json;
 #[cfg(feature = "pyarrow")]
 pub mod pyarrow;
 pub mod record_batch;
+pub mod row;
 pub mod temporal_conversions;
 pub mod tensor;
 pub mod util;
diff --git a/arrow/src/row/fixed.rs b/arrow/src/row/fixed.rs
new file mode 100644
index 00000000000..78108274241
--- /dev/null
+++ b/arrow/src/row/fixed.rs
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::array::PrimitiveArray;
+use crate::compute::SortOptions;
+use crate::datatypes::ArrowPrimitiveType;
+use crate::row::Rows;
+use crate::util::decimal::{Decimal128, Decimal256};
+use half::f16;
+
+/// Encodes a value of a particular fixed width type into bytes according to the rules
+/// described on [`super::RowConverter`]
+pub trait FixedLengthEncoding<const N: usize>: Copy {
+    const ENCODED_LEN: usize = 1 + N;
+
+    fn encode(self) -> [u8; N];
+}
+
+impl FixedLengthEncoding<1> for bool {
+    fn encode(self) -> [u8; 1] {
+        [self as u8]
+    }
+}
+
+macro_rules! encode_signed {
+    ($n:expr, $t:ty) => {
+        impl FixedLengthEncoding<$n> for $t {
+            fn encode(self) -> [u8; $n] {
+                let mut b = self.to_be_bytes();
+                // Toggle top "sign" bit to ensure consistent sort order
+                b[0] ^= 0x80;
+                b
+            }
+        }
+    };
+}
+
+encode_signed!(1, i8);
+encode_signed!(2, i16);
+encode_signed!(4, i32);
+encode_signed!(8, i64);
+encode_signed!(16, i128);
+
+macro_rules! encode_unsigned {
+    ($n:expr, $t:ty) => {
+        impl FixedLengthEncoding<$n> for $t {
+            fn encode(self) -> [u8; $n] {
+                self.to_be_bytes()
+            }
+        }
+    };
+}
+
+encode_unsigned!(1, u8);
+encode_unsigned!(2, u16);
+encode_unsigned!(4, u32);
+encode_unsigned!(8, u64);
+
+impl FixedLengthEncoding<2> for f16 {
+    fn encode(self) -> [u8; 2] {
+        // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
+        let s = self.to_bits() as i16;
+        let val = s ^ (((s >> 15) as u16) >> 1) as i16;
+        val.encode()
+    }
+}
+
+impl FixedLengthEncoding<4> for f32 {
+    fn encode(self) -> [u8; 4] {
+        // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
+        let s = self.to_bits() as i32;
+        let val = s ^ (((s >> 31) as u32) >> 1) as i32;
+        val.encode()
+    }
+}
+
+impl FixedLengthEncoding<8> for f64 {
+    fn encode(self) -> [u8; 8] {
+        // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
+        let s = self.to_bits() as i64;
+        let val = s ^ (((s >> 63) as u64) >> 1) as i64;
+        val.encode()
+    }
+}
+
+impl FixedLengthEncoding<16> for Decimal128 {
+    fn encode(self) -> [u8; 16] {
+        let mut val = *self.raw_value();
+        // Convert to big endian representation
+        val.reverse();
+        // Toggle top "sign" bit to ensure consistent sort order
+        val[0] ^= 0x80;
+        val
+    }
+}
+
+impl FixedLengthEncoding<32> for Decimal256 {
+    fn encode(self) -> [u8; 32] {
+        let mut val = *self.raw_value();
+        // Convert to big endian representation
+        val.reverse();
+        // Toggle top "sign" bit to ensure consistent sort order
+        val[0] ^= 0x80;
+        val
+    }
+}
+
+/// Returns the total encoded length (including null byte) for a value of type `T::Native`
+pub const fn encoded_len<const N: usize, T>(_col: &PrimitiveArray<T>) -> usize
+where
+    T: ArrowPrimitiveType,
+    T::Native: FixedLengthEncoding<N>,
+{
+    T::Native::ENCODED_LEN
+}
+
+/// Fixed width types are encoded as
+///
+/// - 1 byte `0` if null or `1` if valid
+/// - bytes of [`FixedLengthEncoding`]
+pub fn encode<
+    const N: usize,
+    T: FixedLengthEncoding<N>,
+    I: IntoIterator<Item = Option<T>>,
+>(
+    out: &mut Rows,
+    i: I,
+    opts: SortOptions,
+) {
+    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
+        let end_offset = *offset + N + 1;
+        if let Some(val) = maybe_val {
+            let to_write = &mut out.buffer[*offset..end_offset];
+            to_write[0] = 1;
+            let mut encoded = val.encode();
+            if opts.descending {
+                // Flip bits to reverse order
+                encoded.iter_mut().for_each(|v| *v = !*v)
+            }
+            to_write[1..].copy_from_slice(&encoded)
+        } else if
!opts.nulls_first { + out.buffer[*offset] = 0xFF; + } + *offset = end_offset; + } +} diff --git a/arrow/src/row/interner.rs b/arrow/src/row/interner.rs new file mode 100644 index 00000000000..77edb97e8d1 --- /dev/null +++ b/arrow/src/row/interner.rs @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; +use std::cmp::Ordering; +use std::num::NonZeroU32; +use std::ops::Index; + +/// An interned value +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option` is 32 bits + +/// A byte array interner that generates normalized keys that are sorted with respect +/// to the interned values, e.g. `inter(a) < intern(b) => a < b` +#[derive(Debug, Default)] +pub struct OrderPreservingInterner { + /// Provides a lookup from [`Interned`] to the normalized key + keys: InternBuffer, + /// Provides a lookup from [`Interned`] to the normalized value + values: InternBuffer, + /// Key allocation data structure + bucket: Box, + + // A hash table used to perform faster re-keying, and detect duplicates + hasher: ahash::RandomState, + lookup: HashMap, +} + +impl OrderPreservingInterner { + /// Interns an iterator of values returning a list of [`Interned`] which can be + /// used with [`Self::normalized_key`] to retrieve the normalized keys with a + /// lifetime not tied to the mutable borrow passed to this method + pub fn intern(&mut self, input: I) -> Vec> + where + I: IntoIterator>, + V: AsRef<[u8]>, + { + let iter = input.into_iter(); + let capacity = iter.size_hint().0; + let mut out = Vec::with_capacity(capacity); + + // (index in output, hash value, value) + let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); + let mut to_intern_len = 0; + + for (idx, item) in iter.enumerate() { + let value: V = match item { + Some(value) => value, + None => { + out.push(None); + continue; + } + }; + + let v = value.as_ref(); + let hash = self.hasher.hash_one(v); + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == v); + + match entry { + RawEntryMut::Occupied(o) => out.push(Some(*o.key())), + RawEntryMut::Vacant(_) => { + // Push placeholder + out.push(None); + to_intern_len += v.len(); + to_intern.push((idx, hash, value)); + } + }; + } + + to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref())); + + self.keys.offsets.reserve(to_intern.len()); + self.keys.values.reserve(to_intern.len()); // Approximation + self.values.offsets.reserve(to_intern.len()); + self.values.values.reserve(to_intern_len); + + for (idx, hash, value) in to_intern { + let val = value.as_ref(); + + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == val); 
+ + match entry { + RawEntryMut::Occupied(o) => { + out[idx] = Some(*o.key()); + } + RawEntryMut::Vacant(v) => { + let val = value.as_ref(); + self.bucket + .insert(&mut self.values, val, &mut self.keys.values); + self.keys.values.push(0); + let interned = self.keys.append(); + + let hasher = &mut self.hasher; + let values = &self.values; + v.insert_with_hasher(hash, interned, (), |key| { + hasher.hash_one(&values[*key]) + }); + out[idx] = Some(interned); + } + } + } + + out + } + + /// Returns a null-terminated byte array that can be compared against other normalized_key + /// returned by this instance, to establish ordering of the interned values + pub fn normalized_key(&self, key: Interned) -> &[u8] { + &self.keys[key] + } +} + +/// A buffer of `[u8]` indexed by `[Interned]` +#[derive(Debug)] +struct InternBuffer { + /// Raw values + values: Vec, + /// The ith value is `&values[offsets[i]..offsets[i+1]]` + offsets: Vec, +} + +impl Default for InternBuffer { + fn default() -> Self { + Self { + values: Default::default(), + offsets: vec![0], + } + } +} + +impl InternBuffer { + /// Insert `data` returning the corresponding [`Interned`] + fn insert(&mut self, data: &[u8]) -> Interned { + self.values.extend_from_slice(data); + self.append() + } + + /// Appends the next value based on data written to `self.values` + /// returning the corresponding [`Interned`] + fn append(&mut self) -> Interned { + let idx: u32 = self.offsets.len().try_into().unwrap(); + let key = Interned(NonZeroU32::new(idx).unwrap()); + self.offsets.push(self.values.len()); + key + } +} + +impl Index for InternBuffer { + type Output = [u8]; + + fn index(&self, key: Interned) -> &Self::Output { + let index = key.0.get() as usize; + let end = self.offsets[index]; + let start = self.offsets[index - 1]; + // SAFETY: + // self.values is never reduced in size and values appended + // to self.offsets are always less than self.values at the time + unsafe { self.values.get_unchecked(start..end) } + } +} + +/// A slot corresponds to a single byte-value in the generated normalized key +/// +/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing +/// the next byte in the generated normalized key +#[derive(Debug, Default, Clone)] +struct Slot { + value: Option, + /// Child values less than `self.value` if any + child: Option>, +} + +/// Bucket is the root of the data-structure used to allocate normalized keys +/// +/// In particular it needs to generate keys that +/// +/// * Contain no `0` bytes other than the null terminator +/// * Compare lexicographically in the same manner as the encoded `data` +/// +/// The data structure consists of 255 slots, each of which can store a value. +/// Additionally each slot may contain a child bucket, containing values smaller +/// than the value within the slot +/// +/// # Allocation Strategy +/// +/// To find the insertion point within a Bucket we perform a binary search of the slots, but +/// capping the search range at 4. Visualizing this as a search tree, the root would have 64 +/// children, with subsequent non-leaf nodes each containing two children. +/// +/// The insertion point is the first empty slot we encounter, otherwise it is the first slot +/// that contains a value greater than the value being inserted +/// +/// For example, initially all slots are empty +/// +/// ```ignore +/// 0: +/// 1: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `1000` +/// +/// ```ignore +/// 0: +/// 1: +/// 2: +/// 3: 1000 <- 1. 
slot is empty, insert here +/// 4: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `500` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. slot is empty, insert here +/// 2: +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `600` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is empty, insert here +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `400` +/// +/// ```ignore +/// 0: 400 <- 3. slot is empty, insert here +/// 1: 500 <- 2. compare against slot value +/// 2: 600 +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `700` +/// +/// ```ignore +/// 0: 400 +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is occupied and end of search +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// In this case we reach the end of our search and need to insert a value between +/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat +/// the process for that bucket. +/// +/// The final key will consists of the slot indexes visited incremented by 1, +/// with the final value incremented by 2, followed by a null terminator. +/// +/// So in the above example we would have +/// +/// ```ignore +/// 400: &[2, 0] +/// 500: &[3, 0] +/// 600: &[4, 0] +/// 700: &[4, 5, 0] +/// 1000: &[5, 0] +/// ``` +/// +#[derive(Debug, Clone)] +struct Bucket { + slots: Box<[Slot]>, +} + +impl Default for Bucket { + fn default() -> Self { + let slots = (0..255).map(|_| Slot::default()).collect::>().into(); + Self { slots } + } +} + +impl Bucket { + /// Perform a skewed binary search to find the first slot that is empty or less + /// + /// Returns `Ok(idx)` if an exact match is found, otherwise returns `Err(idx)` + /// containing the slot index to insert at + fn insert_pos(&self, values_buf: &InternBuffer, data: &[u8]) -> Result { + let mut size = self.slots.len() - 1; + let mut left = 0; + let mut right = size; + while left < right { + // Skew binary search to leave gaps of at most 3 elements + let mid = left + (size / 2).min(3); + + let slot = &self.slots[mid]; + let val = match slot.value { + Some(val) => val, + None => return Err(mid), + }; + + let cmp = values_buf[val].cmp(data); + if cmp == Ordering::Less { + left = mid + 1; + } else if cmp == Ordering::Greater { + right = mid; + } else { + return Ok(mid); + } + + size = right - left; + } + Err(left) + } + + /// Insert `data` into this bucket or one of its children, appending the + /// normalized key to `out` as it is constructed + /// + /// # Panics + /// + /// Panics if the value already exists + fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec) { + match self.insert_pos(values_buf, data) { + Ok(_) => unreachable!("value already exists"), + Err(idx) => { + let slot = &mut self.slots[idx]; + // Cannot insert a value into slot 254 as would overflow byte, but also + // would prevent inserting any larger values, as the child bucket can + // only contain values less than the slot + if idx != 254 && slot.value.is_none() { + out.push(idx as u8 + 2); + slot.value = Some(values_buf.insert(data)) + } else { + out.push(idx as u8 + 1); + slot.child + .get_or_insert_with(Default::default) + .insert(values_buf, data, out); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::prelude::*; + + // Clippy isn't smart enough to understand dropping 
mutability + #[allow(clippy::needless_collect)] + fn test_intern_values(values: &[u64]) { + let mut interner = OrderPreservingInterner::default(); + + // Intern a single value at a time to check ordering + let interned: Vec<_> = values + .iter() + .flat_map(|v| interner.intern([Some(&v.to_be_bytes())])) + .map(Option::unwrap) + .collect(); + + let interned: Vec<_> = interned + .into_iter() + .map(|x| interner.normalized_key(x)) + .collect(); + + for (i, a) in interned.iter().enumerate() { + for (j, b) in interned.iter().enumerate() { + let interned_cmp = a.cmp(b); + let values_cmp = values[i].cmp(&values[j]); + assert_eq!( + interned_cmp, values_cmp, + "({:?} vs {:?}) vs ({} vs {})", + a, b, values[i], values[j] + ) + } + } + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_interner() { + test_intern_values(&[8, 6, 5, 7]); + + let mut values: Vec<_> = (0_u64..2000).collect(); + test_intern_values(&values); + + let mut rng = thread_rng(); + values.shuffle(&mut rng); + test_intern_values(&values); + } + + #[test] + fn test_intern_duplicates() { + // Unsorted with duplicates + let values = vec![0_u8, 1, 8, 4, 1, 0]; + let mut interner = OrderPreservingInterner::default(); + + let interned = interner.intern(values.iter().map(std::slice::from_ref).map(Some)); + let interned: Vec<_> = interned.into_iter().map(Option::unwrap).collect(); + + assert_eq!(interned[0], interned[5]); + assert_eq!(interned[1], interned[4]); + assert!( + interner.normalized_key(interned[0]) < interner.normalized_key(interned[1]) + ); + assert!( + interner.normalized_key(interned[1]) < interner.normalized_key(interned[2]) + ); + assert!( + interner.normalized_key(interned[1]) < interner.normalized_key(interned[3]) + ); + assert!( + interner.normalized_key(interned[3]) < interner.normalized_key(interned[2]) + ); + } +} diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs new file mode 100644 index 00000000000..88c8a916663 --- /dev/null +++ b/arrow/src/row/mod.rs @@ -0,0 +1,893 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A comparable row-oriented representation of a collection of [`Array`] + +use crate::array::{ + as_boolean_array, as_generic_binary_array, as_largestring_array, as_string_array, + Array, ArrayRef, Decimal128Array, Decimal256Array, +}; +use crate::compute::SortOptions; +use crate::datatypes::*; +use crate::error::{ArrowError, Result}; +use crate::row::interner::{Interned, OrderPreservingInterner}; +use crate::util::decimal::{Decimal128, Decimal256}; +use crate::{downcast_dictionary_array, downcast_primitive_array}; + +mod fixed; +mod interner; +mod variable; + +/// Converts [`ArrayRef`] columns into a row-oriented format that are [normalized for sorting]. +/// +/// In particular, a byte-wise comparison of the rows, e.g. 
[`memcmp`], is sufficient +/// to establish the ordering of two rows, allowing for extremely fast comparisons, +/// and permitting the use of [non-comparison sorts] such as [radix sort] +/// +/// Comparing [`Rows`] generated by different [`RowConverter`] is not guaranteed to +/// yield a meaningful ordering +/// +/// # Format +/// +/// The encoding of the row format should not be considered stable, but is documented here +/// for reference. +/// +/// ## Unsigned Integer Encoding +/// +/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding +/// to the integer's length +/// +/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the +/// integer +/// +/// ## Signed Integer Encoding +/// +/// Signed integers have their most significant sign bit flipped, and are then encoded in the +/// same manner as an unsigned integer +/// +/// ## Float Encoding +/// +/// Floats are converted from IEEE 754 representation to a signed integer representation +/// by flipping all bar the sign bit if they are negative. +/// +/// They are then encoded in the same manner as a signed integer +/// +/// ## Variable Length Bytes Encoding +/// +/// A null is encoded as a `0_u8` +/// +/// An empty byte array is encoded as `1_u8` +/// +/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array +/// encoded using a block based scheme described below. +/// +/// The byte array is broken up into 32-byte blocks, each block is written in turn +/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes +/// with `0_u8` and written to the output, followed by the un-padded length in bytes +/// of this final block as a `u8` +/// +/// This is loosely inspired by [COBS] encoding, and chosen over more traditional +/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. +/// +/// ## Dictionary Encoding +/// +/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and +/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse +/// the dictionary encoding, and encode the array values directly, however, this would lose +/// the benefits of dictionary encoding to reduce memory and CPU consumption. +/// +/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each +/// dictionary encoded column. As this is a variable-length encoding, new dictionary values +/// can be added whilst preserving the sort order. +/// +/// A null dictionary value is encoded as `0_u8`. +/// +/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array +/// key determined by the order-preserving dictionary encoding +/// +/// # Ordering +/// +/// ## Float Ordering +/// +/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined +/// in the IEEE 754 (2008 revision) floating point standard. +/// +/// The ordering established by this does not always agree with the +/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. 
For example, +/// they consider negative and positive zero equal, while this does not +/// +/// ## Null Ordering +/// +/// The encoding described above will order nulls first, this can be inverted by representing +/// nulls as `0xFF_u8` instead of `0_u8` +/// +/// ## Reverse Column Ordering +/// +/// The order of a given column can be reversed by negating the encoded bytes of non-null values +/// +/// ## Reconstruction +/// +/// Given a schema it would theoretically be possible to reconstruct the columnar data from +/// the row format, however, this is currently not implemented. It is recommended that the row +/// format is instead used to obtain a sorted list of row indices, which can then be used +/// with [`take`](crate::compute::take) to obtain a sorted [`Array`] +/// +/// [non-comparison sorts]:[https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts] +/// [radix sort]:[https://en.wikipedia.org/wiki/Radix_sort] +/// [normalized for sorting]:[https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf] +/// [`memcmp`]:[https://www.man7.org/linux/man-pages/man3/memcmp.3.html] +/// [COBS]:[https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing] +/// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] +#[derive(Debug)] +pub struct RowConverter { + fields: Vec, + /// interning state for column `i`, if column`i` is a dictionary + interners: Vec>>, +} + +/// Configure the data type and sort order for a given column +#[derive(Debug, Clone)] +pub struct SortField { + /// Sort options + options: SortOptions, + /// Data type + data_type: DataType, +} + +impl SortField { + /// Create a new column with the given data type + pub fn new(data_type: DataType) -> Self { + Self::new_with_options(data_type, Default::default()) + } + + /// Create a new column with the given data type and [`SortOptions`] + pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self { + Self { options, data_type } + } +} + +impl RowConverter { + /// Create a new [`RowConverter`] with the provided schema + pub fn new(fields: Vec) -> Self { + let interners = (0..fields.len()).map(|_| None).collect(); + Self { fields, interners } + } + + /// Convert [`ArrayRef`] columns into [`Rows`] + /// + /// See [`Row`] for information on when [`Row`] can be compared + /// + /// # Panics + /// + /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`] + pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result { + if columns.len() != self.fields.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Incorrect number of arrays provided to RowConverter, expected {} got {}", + self.fields.len(), + columns.len() + ))); + } + + let dictionaries = columns + .iter() + .zip(&mut self.interners) + .zip(&self.fields) + .map(|((column, interner), field)| { + if !column.data_type().equals_datatype(&field.data_type) { + return Err(ArrowError::InvalidArgumentError(format!( + "RowConverter column schema mismatch, expected {} got {}", + field.data_type, + column.data_type() + ))); + } + + let values = downcast_dictionary_array! { + column => column.values(), + _ => return Ok(None) + }; + + let interner = interner.get_or_insert_with(Default::default); + + let mapping: Vec<_> = compute_dictionary_mapping(interner, values)? 
+ .into_iter() + .map(|maybe_interned| { + maybe_interned.map(|interned| interner.normalized_key(interned)) + }) + .collect(); + + Ok(Some(mapping)) + }) + .collect::>>()?; + + let mut rows = new_empty_rows(columns, &dictionaries)?; + + for ((column, field), dictionary) in + columns.iter().zip(&self.fields).zip(dictionaries) + { + // We encode a column at a time to minimise dispatch overheads + encode_column(&mut rows, column, field.options, dictionary.as_deref()) + } + + if cfg!(debug_assertions) { + assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len()); + rows.offsets + .windows(2) + .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic")); + } + + Ok(rows) + } +} + +/// A row-oriented representation of arrow data, that is normalized for comparison +/// +/// See [`RowConverter`] +#[derive(Debug)] +pub struct Rows { + /// Underlying row bytes + buffer: Box<[u8]>, + /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]` + offsets: Box<[usize]>, +} + +impl Rows { + pub fn row(&self, row: usize) -> Row<'_> { + let end = self.offsets[row + 1]; + let start = self.offsets[row]; + Row(&self.buffer[start..end]) + } + + pub fn num_rows(&self) -> usize { + self.offsets.len() - 1 + } +} + +/// A comparable representation of a row +/// +/// Two [`Row`] can be compared if they both belong to [`Rows`] returned by calls to +/// [`RowConverter::convert_columns`] on the same [`RowConverter`] +/// +/// Otherwise any ordering established by comparing the [`Row`] is arbitrary +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Row<'a>(&'a [u8]); + +impl<'a> AsRef<[u8]> for Row<'a> { + fn as_ref(&self) -> &[u8] { + self.0 + } +} + +/// Computes the dictionary mapping for the given dictionary values +fn compute_dictionary_mapping( + interner: &mut OrderPreservingInterner, + values: &ArrayRef, +) -> Result>> { + use fixed::FixedLengthEncoding; + Ok(downcast_primitive_array! { + values => interner + .intern(values.iter().map(|x| x.map(|x| x.encode()))), + DataType::Binary => { + let iter = as_generic_binary_array::(values).iter(); + interner.intern(iter) + } + DataType::LargeBinary => { + let iter = as_generic_binary_array::(values).iter(); + interner.intern(iter) + } + DataType::Utf8 => { + let iter = as_string_array(values).iter().map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + DataType::LargeUtf8 => { + let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + t => return Err(ArrowError::NotYetImplemented(format!("dictionary value {} is not supported", t))), + }) +} + +/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] +fn new_empty_rows( + cols: &[ArrayRef], + dictionaries: &[Option>>], +) -> Result { + use fixed::FixedLengthEncoding; + + let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); + let mut lengths = vec![0; num_rows]; + + for (array, dict) in cols.iter().zip(dictionaries) { + downcast_primitive_array! 
{ + array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)), + DataType::Null => lengths.iter_mut().for_each(|x| *x += 1), + DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN), + DataType::Decimal128(_, _) => lengths.iter_mut().for_each(|x| *x += Decimal128::ENCODED_LEN), + DataType::Decimal256(_, _) => lengths.iter_mut().for_each(|x| *x += Decimal256::ENCODED_LEN), + DataType::Binary => as_generic_binary_array::(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::LargeBinary => as_generic_binary_array::(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::Utf8 => as_string_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::LargeUtf8 => as_largestring_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => { + let dict = dict.as_ref().unwrap(); + for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { + match v.and_then(|v| dict[v as usize]) { + Some(k) => *length += k.len() + 1, + None => *length += 1, + } + } + } + _ => unreachable!(), + } + t => return Err(ArrowError::NotYetImplemented(format!("not yet implemented: {}", t))) + } + } + + let mut offsets = Vec::with_capacity(num_rows + 1); + offsets.push(0); + + // We initialize the offsets shifted down by one row index. + // + // As the rows are appended to the offsets will be incremented to match + // + // For example, consider the case of 3 rows of length 3, 4, and 6 respectively. + // The offsets would be initialized to `0, 0, 3, 7` + // + // Writing the first row entirely would yield `0, 3, 3, 7` + // The second, `0, 3, 7, 7` + // The third, `0, 3, 7, 13` + // + // This would be the final offsets for reading + // + // In this way offsets tracks the position during writing whilst eventually serving + // as identifying the offsets of the written rows + let mut cur_offset = 0_usize; + for l in lengths { + offsets.push(cur_offset); + cur_offset = cur_offset.checked_add(l).expect("overflow"); + } + + let buffer = vec![0_u8; cur_offset]; + + Ok(Rows { + buffer: buffer.into(), + offsets: offsets.into(), + }) +} + +/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses +fn encode_column( + out: &mut Rows, + column: &ArrayRef, + opts: SortOptions, + dictionary: Option<&[Option<&[u8]>]>, +) { + downcast_primitive_array! 
{ + column => fixed::encode(out, column, opts), + DataType::Null => { + fixed::encode(out, std::iter::repeat(None::).take(column.len()), opts) + } + DataType::Boolean => fixed::encode(out, as_boolean_array(column), opts), + DataType::Decimal128(_, _) => fixed::encode( + out, + column.as_any().downcast_ref::().unwrap(), + opts, + ), + DataType::Decimal256(_, _) => fixed::encode( + out, + column.as_any().downcast_ref::().unwrap(), + opts, + ), + DataType::Binary => { + variable::encode(out, as_generic_binary_array::(column).iter(), opts) + } + DataType::LargeBinary => { + variable::encode(out, as_generic_binary_array::(column).iter(), opts) + } + DataType::Utf8 => variable::encode( + out, + as_string_array(column).iter().map(|x| x.map(|x| x.as_bytes())), + opts, + ), + DataType::LargeUtf8 => variable::encode( + out, + as_largestring_array(column) + .iter() + .map(|x| x.map(|x| x.as_bytes())), + opts, + ), + DataType::Dictionary(_, _) => downcast_dictionary_array! { + column => { + let dict = dictionary.unwrap(); + for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) { + match k.and_then(|k| dict[k as usize]) { + Some(v) => { + let end_offset = *offset + 1 + v.len(); + out.buffer[*offset] = 1; + out.buffer[*offset+1..end_offset].copy_from_slice(v); + if opts.descending { + out.buffer[*offset..end_offset].iter_mut().for_each(|v| *v = !*v) + } + *offset = end_offset; + } + None => { + if !opts.nulls_first { + out.buffer[*offset] = 0xFF; + } + *offset += 1; + } + } + } + }, + _ => unreachable!() + } + t => unimplemented!("not yet implemented: {}", t) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::array::{ + BinaryArray, BooleanArray, DictionaryArray, Float32Array, GenericStringArray, + Int16Array, Int32Array, OffsetSizeTrait, PrimitiveArray, + PrimitiveDictionaryBuilder, StringArray, + }; + use crate::compute::{LexicographicalComparator, SortColumn}; + use crate::util::display::array_value_to_string; + use rand::distributions::uniform::SampleUniform; + use rand::distributions::{Distribution, Standard}; + use rand::{thread_rng, Rng}; + use std::sync::Arc; + + #[test] + fn test_fixed_width() { + let cols = [ + Arc::new(Int16Array::from_iter([ + Some(1), + Some(2), + None, + Some(-5), + Some(2), + Some(2), + Some(0), + ])) as ArrayRef, + Arc::new(Float32Array::from_iter([ + Some(1.3), + Some(2.5), + None, + Some(4.), + Some(0.1), + Some(-4.), + Some(-0.), + ])) as ArrayRef, + ]; + + let mut converter = RowConverter::new(vec![ + SortField::new(DataType::Int16), + SortField::new(DataType::Float32), + ]); + let rows = converter.convert_columns(&cols).unwrap(); + + assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]); + assert_eq!( + rows.buffer.as_ref(), + &[ + 1, 128, 1, // + 1, 191, 166, 102, 102, // + 1, 128, 2, // + 1, 192, 32, 0, 0, // + 0, 0, 0, // + 0, 0, 0, 0, 0, // + 1, 127, 251, // + 1, 192, 128, 0, 0, // + 1, 128, 2, // + 1, 189, 204, 204, 205, // + 1, 128, 2, // + 1, 63, 127, 255, 255, // + 1, 128, 0, // + 1, 127, 255, 255, 255 // + ] + ); + + assert!(rows.row(3) < rows.row(6)); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(3) < rows.row(0)); + assert!(rows.row(4) < rows.row(1)); + assert!(rows.row(5) < rows.row(4)) + } + + #[test] + fn test_bool() { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]); + + let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); + let rows = converter.convert_columns(&[col]).unwrap(); + assert!(rows.row(2) > rows.row(1)); + assert!(rows.row(2) > 
rows.row(0)); + assert!(rows.row(1) > rows.row(0)); + + let mut converter = RowConverter::new(vec![SortField::new_with_options( + DataType::Boolean, + SortOptions { + descending: true, + nulls_first: false, + }, + )]); + + let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); + let rows = converter.convert_columns(&[col]).unwrap(); + assert!(rows.row(2) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(1) < rows.row(0)); + } + + #[test] + fn test_variable_width() { + let col = Arc::new(StringArray::from_iter([ + Some("hello"), + Some("he"), + None, + Some("foo"), + Some(""), + ])); + + let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]); + let rows = converter.convert_columns(&[col]).unwrap(); + + assert!(rows.row(1) < rows.row(0)); + assert!(rows.row(2) < rows.row(4)); + assert!(rows.row(3) < rows.row(0)); + assert!(rows.row(3) < rows.row(1)); + + let col = Arc::new(BinaryArray::from_iter([ + None, + Some(vec![0_u8; 0]), + Some(vec![0_u8; 6]), + Some(vec![0_u8; variable::BLOCK_SIZE]), + Some(vec![0_u8; variable::BLOCK_SIZE + 1]), + Some(vec![1_u8; 6]), + Some(vec![1_u8; variable::BLOCK_SIZE]), + Some(vec![1_u8; variable::BLOCK_SIZE + 1]), + Some(vec![0xFF_u8; 6]), + Some(vec![0xFF_u8; variable::BLOCK_SIZE]), + Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]), + ])) as ArrayRef; + + let mut converter = RowConverter::new(vec![SortField::new(DataType::Binary)]); + let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); + + for i in 0..rows.num_rows() { + for j in i + 1..rows.num_rows() { + assert!( + rows.row(i) < rows.row(j), + "{} < {} - {:?} < {:?}", + i, + j, + rows.row(i), + rows.row(j) + ); + } + } + + let mut converter = RowConverter::new(vec![SortField::new_with_options( + DataType::Binary, + SortOptions { + descending: true, + nulls_first: false, + }, + )]); + let rows = converter.convert_columns(&[col]).unwrap(); + + for i in 0..rows.num_rows() { + for j in i + 1..rows.num_rows() { + assert!( + rows.row(i) > rows.row(j), + "{} > {} - {:?} > {:?}", + i, + j, + rows.row(i), + rows.row(j) + ); + } + } + } + + #[test] + fn test_string_dictionary() { + let a = Arc::new(DictionaryArray::::from_iter([ + Some("foo"), + Some("hello"), + Some("he"), + None, + Some("hello"), + Some(""), + Some("hello"), + Some("hello"), + ])) as ArrayRef; + + let mut converter = + RowConverter::new(vec![SortField::new(a.data_type().clone())]); + let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); + + assert!(rows_a.row(3) < rows_a.row(5)); + assert!(rows_a.row(2) < rows_a.row(1)); + assert!(rows_a.row(0) < rows_a.row(1)); + assert!(rows_a.row(3) < rows_a.row(0)); + + assert_eq!(rows_a.row(1), rows_a.row(4)); + assert_eq!(rows_a.row(1), rows_a.row(6)); + assert_eq!(rows_a.row(1), rows_a.row(7)); + + let b = Arc::new(DictionaryArray::::from_iter([ + Some("hello"), + None, + Some("cupcakes"), + ])); + + let rows_b = converter.convert_columns(&[b]).unwrap(); + assert_eq!(rows_a.row(1), rows_b.row(0)); + assert_eq!(rows_a.row(3), rows_b.row(1)); + assert!(rows_b.row(2) < rows_a.row(0)); + + let mut converter = RowConverter::new(vec![SortField::new_with_options( + a.data_type().clone(), + SortOptions { + descending: true, + nulls_first: false, + }, + )]); + + let rows_c = converter.convert_columns(&[a]).unwrap(); + assert!(rows_c.row(3) > rows_c.row(5)); + assert!(rows_c.row(2) > rows_c.row(1)); + assert!(rows_c.row(0) > rows_c.row(1)); + assert!(rows_c.row(3) > rows_c.row(0)); + } + + #[test] + fn 
test_primitive_dictionary() { + let mut builder = PrimitiveDictionaryBuilder::::new(); + builder.append(2).unwrap(); + builder.append(3).unwrap(); + builder.append(0).unwrap(); + builder.append_null(); + builder.append(5).unwrap(); + builder.append(3).unwrap(); + builder.append(-1).unwrap(); + + let a = builder.finish(); + + let mut converter = + RowConverter::new(vec![SortField::new(a.data_type().clone())]); + let rows = converter.convert_columns(&[Arc::new(a)]).unwrap(); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(3) < rows.row(2)); + assert!(rows.row(6) < rows.row(2)); + assert!(rows.row(3) < rows.row(6)); + } + + #[test] + fn test_dictionary_nulls() { + let values = + Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data(); + let keys = + Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]) + .into_data(); + + let data_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32)); + let data = keys + .into_builder() + .data_type(data_type.clone()) + .child_data(vec![values]) + .build() + .unwrap(); + + let mut converter = RowConverter::new(vec![SortField::new(data_type)]); + let rows = converter + .convert_columns(&[Arc::new(DictionaryArray::::from(data))]) + .unwrap(); + + assert_eq!(rows.row(0), rows.row(1)); + assert_eq!(rows.row(3), rows.row(4)); + assert_eq!(rows.row(4), rows.row(5)); + assert!(rows.row(3) < rows.row(0)); + } + + fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray + where + K: ArrowPrimitiveType, + Standard: Distribution, + { + let mut rng = thread_rng(); + (0..len) + .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen())) + .collect() + } + + fn generate_strings( + len: usize, + valid_percent: f64, + ) -> GenericStringArray { + let mut rng = thread_rng(); + (0..len) + .map(|_| { + rng.gen_bool(valid_percent).then(|| { + let len = rng.gen_range(0..100); + let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect(); + String::from_utf8(bytes).unwrap() + }) + }) + .collect() + } + + fn generate_dictionary( + values: ArrayRef, + len: usize, + valid_percent: f64, + ) -> DictionaryArray + where + K: ArrowDictionaryKeyType, + K::Native: SampleUniform, + { + let mut rng = thread_rng(); + let min_key = K::Native::from_usize(0).unwrap(); + let max_key = K::Native::from_usize(values.len()).unwrap(); + let keys: PrimitiveArray = (0..len) + .map(|_| { + rng.gen_bool(valid_percent) + .then(|| rng.gen_range(min_key..max_key)) + }) + .collect(); + + let data_type = DataType::Dictionary( + Box::new(K::DATA_TYPE), + Box::new(values.data_type().clone()), + ); + + let data = keys + .into_data() + .into_builder() + .data_type(data_type) + .add_child_data(values.data().clone()) + .build() + .unwrap(); + + DictionaryArray::from(data) + } + + fn generate_column(len: usize) -> ArrayRef { + let mut rng = thread_rng(); + match rng.gen_range(0..9) { + 0 => Arc::new(generate_primitive_array::(len, 0.8)), + 1 => Arc::new(generate_primitive_array::(len, 0.8)), + 2 => Arc::new(generate_primitive_array::(len, 0.8)), + 3 => Arc::new(generate_primitive_array::(len, 0.8)), + 4 => Arc::new(generate_primitive_array::(len, 0.8)), + 5 => Arc::new(generate_primitive_array::(len, 0.8)), + 6 => Arc::new(generate_strings::(len, 0.8)), + 7 => Arc::new(generate_dictionary::( + // Cannot test dictionaries containing null values because of #2687 + Arc::new(generate_strings::(rng.gen_range(1..len), 1.0)), + len, + 0.8, + )), + 8 => Arc::new(generate_dictionary::( + // 
Cannot test dictionaries containing null values because of #2687 + Arc::new(generate_primitive_array::( + rng.gen_range(1..len), + 1.0, + )), + len, + 0.8, + )), + _ => unreachable!(), + } + } + + fn print_row(cols: &[SortColumn], row: usize) -> String { + let t: Vec<_> = cols + .iter() + .map(|x| array_value_to_string(&x.values, row).unwrap()) + .collect(); + t.join(",") + } + + fn print_col_types(cols: &[SortColumn]) -> String { + let t: Vec<_> = cols + .iter() + .map(|x| x.values.data_type().to_string()) + .collect(); + t.join(",") + } + + #[test] + #[cfg_attr(miri, ignore)] + fn fuzz_test() { + for _ in 0..100 { + let mut rng = thread_rng(); + let num_columns = rng.gen_range(1..5); + let len = rng.gen_range(5..100); + let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect(); + + let options: Vec<_> = (0..num_columns) + .map(|_| SortOptions { + descending: rng.gen_bool(0.5), + nulls_first: rng.gen_bool(0.5), + }) + .collect(); + + let sort_columns: Vec<_> = options + .iter() + .zip(&arrays) + .map(|(o, c)| SortColumn { + values: Arc::clone(c), + options: Some(*o), + }) + .collect(); + + let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); + + let columns = options + .into_iter() + .zip(&arrays) + .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) + .collect(); + + let mut converter = RowConverter::new(columns); + let rows = converter.convert_columns(&arrays).unwrap(); + + for i in 0..len { + for j in 0..len { + let row_i = rows.row(i); + let row_j = rows.row(j); + let row_cmp = row_i.cmp(&row_j); + let lex_cmp = comparator.compare(&i, &j); + assert_eq!( + row_cmp, + lex_cmp, + "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}", + print_row(&sort_columns, i), + print_row(&sort_columns, j), + row_i, + row_j, + print_col_types(&sort_columns) + ); + } + } + } + } +} diff --git a/arrow/src/row/variable.rs b/arrow/src/row/variable.rs new file mode 100644 index 00000000000..2213dad9e78 --- /dev/null +++ b/arrow/src/row/variable.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::compute::SortOptions; +use crate::row::Rows; +use crate::util::bit_util::ceil; + +/// The block size of the variable length encoding +pub const BLOCK_SIZE: usize = 32; + +/// Returns the length of the encoded representation of a byte array, including the null byte +pub fn encoded_len(a: Option<&[u8]>) -> usize { + match a { + Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1), + None => 1, + } +} + +/// Variable length values are encoded as +/// +/// - single `0_u8` if null +/// - single `1_u8` if empty array +/// - `2_u8` if not empty, followed by one or more blocks +/// +/// where a block is encoded as +/// +/// - [`BLOCK_SIZE`] bytes of string data, padded with 0s +/// - `0xFF_u8` if this is not the last block for this string +/// - otherwise the length of the block as a `u8` +pub fn encode<'a, I: Iterator>>( + out: &mut Rows, + i: I, + opts: SortOptions, +) { + for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) { + match maybe_val { + Some(val) if val.is_empty() => { + out.buffer[*offset] = match opts.descending { + true => !1, + false => 1, + }; + *offset += 1; + } + Some(val) => { + let block_count = ceil(val.len(), BLOCK_SIZE); + let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1); + let to_write = &mut out.buffer[*offset..end_offset]; + + // Write `2_u8` to demarcate as non-empty, non-null string + to_write[0] = 2; + + let chunks = val.chunks_exact(BLOCK_SIZE); + let remainder = chunks.remainder(); + for (input, output) in chunks + .clone() + .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1)) + { + let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap(); + let out_block: &mut [u8; BLOCK_SIZE] = + (&mut output[..BLOCK_SIZE]).try_into().unwrap(); + + *out_block = *input; + + // Indicate that there are further blocks to follow + output[BLOCK_SIZE] = u8::MAX; + } + + if !remainder.is_empty() { + let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1); + to_write[start_offset..start_offset + remainder.len()] + .copy_from_slice(remainder); + *to_write.last_mut().unwrap() = remainder.len() as u8; + } else { + // We must overwrite the continuation marker written by the loop above + *to_write.last_mut().unwrap() = BLOCK_SIZE as u8; + } + + *offset = end_offset; + + if opts.descending { + // Invert bits + to_write.iter_mut().for_each(|v| *v = !*v) + } + } + None => { + if !opts.nulls_first { + out.buffer[*offset] = 0xFF; + } + *offset += 1; + } + } + } +}
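
For readers of this diff, the sketch below shows one way the new row format could be wired into a lexicographic sort, following the recommendation in the `arrow::row` module docs to sort row indices and then gather with `take`. It is illustrative only: the function names and the use of the existing `arrow::compute::take` kernel are assumptions, not part of this change, and it relies on the `RowConverter`, `SortField`, `Rows::row` and `Row: Ord` APIs exactly as introduced above (where `RowConverter::new` is infallible).

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray, UInt32Array};
use arrow::compute::take;
use arrow::error::Result;
use arrow::row::{RowConverter, SortField};

/// Hypothetical helper: lexicographically sorts `cols` by converting them to the
/// row format, sorting row indices by byte-wise row comparison, then gathering
/// the original columns with `take`.
fn lexsort_columns(cols: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
    let fields = cols
        .iter()
        .map(|c| SortField::new(c.data_type().clone()))
        .collect();
    let mut converter = RowConverter::new(fields);
    let rows = converter.convert_columns(cols)?;

    // `Row` implements `Ord` over the encoded bytes, so a plain comparison sort
    // of the indices establishes the lexicographic order of the input columns
    let mut indices: Vec<u32> = (0..rows.num_rows() as u32).collect();
    indices.sort_unstable_by(|a, b| rows.row(*a as usize).cmp(&rows.row(*b as usize)));

    // Materialise the sorted columns
    let indices = UInt32Array::from(indices);
    cols.iter().map(|c| take(c.as_ref(), &indices, None)).collect()
}

fn main() -> Result<()> {
    let a = Arc::new(Int32Array::from(vec![Some(3), None, Some(1)])) as ArrayRef;
    let b = Arc::new(StringArray::from(vec![Some("x"), Some("y"), None])) as ArrayRef;
    let sorted = lexsort_columns(&[a, b])?;
    assert_eq!(sorted.len(), 2);
    Ok(())
}

Because the rows compare as plain bytes, the `sort_unstable_by` call above could in principle be replaced by a non-comparison sort (such as a radix sort) over the encoded rows, which is the main motivation for this format.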