diff --git a/Cargo.toml b/Cargo.toml index 44832bdd997..a076210d4b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,8 @@ hash_hasher = "^2.0.3" simdutf8 = "0.1.3" # faster hashing ahash = { version = "0.7" } +# A Rust port of SwissTable +hashbrown = { version = "0.12", default-features = false, optional = true } # for timezone support chrono-tz = { version = "0.6", optional = true } @@ -207,7 +209,7 @@ compute_merge_sort = ["itertools", "compute_sort"] compute_nullif = ["compute_comparison"] compute_partition = ["compute_sort"] compute_regex_match = ["regex"] -compute_sort = ["compute_take"] +compute_sort = ["compute_take", "hashbrown"] compute_substring = [] compute_take = [] compute_temporal = [] @@ -244,6 +246,9 @@ benchmarks = ["rand"] serde_types = ["serde", "serde_derive"] simd = [] +[build-dependencies] +rustc_version = "0.4.0" + [package.metadata.cargo-all-features] allowlist = ["compute", "compute_sort", "compute_hash", "compute_nullif"] diff --git a/build.rs b/build.rs new file mode 100644 index 00000000000..2f4aca2e626 --- /dev/null +++ b/build.rs @@ -0,0 +1,7 @@ +use rustc_version::{version_meta, Channel}; + +fn main() { + if version_meta().unwrap().channel == Channel::Nightly { + println!("cargo:rustc-cfg=nightly_build"); + } +} diff --git a/src/compute/sort/mod.rs b/src/compute/sort/mod.rs index 6f10f4bdb49..be0f15a1ae2 100644 --- a/src/compute/sort/mod.rs +++ b/src/compute/sort/mod.rs @@ -17,6 +17,7 @@ mod lex_sort; mod primitive; mod utf8; +pub mod row; pub(crate) use lex_sort::build_compare; pub use lex_sort::{lexsort, lexsort_to_indices, lexsort_to_indices_impl, SortColumn}; @@ -290,7 +291,7 @@ pub fn can_sort(data_type: &DataType) -> bool { } /// Options that define how sort kernels should behave -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct SortOptions { /// Whether to sort in descending order pub descending: bool, diff --git a/src/compute/sort/row/dictionary.rs b/src/compute/sort/row/dictionary.rs 
new file mode 100644 index 00000000000..e97a71aba7e --- /dev/null +++ b/src/compute/sort/row/dictionary.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + array::{Array, BinaryArray, DictionaryArray, DictionaryKey, PrimitiveArray, Utf8Array}, + compute::sort::SortOptions, + datatypes::PhysicalType, + error::*, + with_match_primitive_without_interval_type, +}; + +use super::{ + fixed::FixedLengthEncoding, + interner::{Interned, OrderPreservingInterner}, + null_sentinel, Rows, +}; + +/// Computes the dictionary mapping for the given dictionary values +pub fn compute_dictionary_mapping( + interner: &mut OrderPreservingInterner, + values: &Box, +) -> Result>> { + Ok(match values.data_type().to_physical_type() { + PhysicalType::Primitive(primitive) => { + with_match_primitive_without_interval_type!(primitive, |$T| { + let values = values + .as_any() + .downcast_ref::>() + .unwrap(); + interner.intern(values.iter().map(|x| x.map(|x| x.encode()))) + }) + } + PhysicalType::Binary => { + let iter = values + .as_any() + .downcast_ref::>() + .unwrap() + .iter(); + interner.intern(iter) + } + PhysicalType::LargeBinary => { + let iter = values + .as_any() + .downcast_ref::>() + .unwrap() + 
.iter(); + interner.intern(iter) + } + PhysicalType::Utf8 => { + let iter = values + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + PhysicalType::LargeUtf8 => { + let iter = values + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .map(|x| x.map(|x| x.as_bytes())); + interner.intern(iter) + } + t => { + return Err(Error::NotYetImplemented(format!( + "dictionary value {:?} is not supported", + t + ))) + } + }) +} + +/// Dictionary types are encoded as +/// +/// - single `0_u8` if null +/// - the bytes of the corresponding normalized key including the null terminator +pub fn encode_dictionary( + out: &mut Rows, + column: &DictionaryArray, + normalized_keys: &[Option<&[u8]>], + opts: SortOptions, +) { + for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) { + match k.and_then(|k| normalized_keys[unsafe { k.as_usize() }]) { + Some(normalized_key) => { + let end_offset = *offset + 1 + normalized_key.len(); + out.buffer[*offset] = 1; + out.buffer[*offset + 1..end_offset].copy_from_slice(normalized_key); + // Negate if descending + if opts.descending { + out.buffer[*offset..end_offset] + .iter_mut() + .for_each(|v| *v = !*v) + } + *offset = end_offset; + } + None => { + out.buffer[*offset] = null_sentinel(opts); + *offset += 1; + } + } + } +} diff --git a/src/compute/sort/row/fixed.rs b/src/compute/sort/row/fixed.rs new file mode 100644 index 00000000000..cb7992a676f --- /dev/null +++ b/src/compute/sort/row/fixed.rs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + array::PrimitiveArray, + compute::sort::SortOptions, + types::{f16, i256, NativeType}, +}; + +use super::{null_sentinel, Rows}; + +pub trait FromSlice { + fn from_slice(slice: &[u8], invert: bool) -> Self; +} + +impl FromSlice for [u8; N] { + #[inline] + fn from_slice(slice: &[u8], invert: bool) -> Self { + let mut t: Self = slice.try_into().unwrap(); + if invert { + t.iter_mut().for_each(|o| *o = !*o); + } + t + } +} + +/// Encodes a value of a particular fixed width type into bytes according to the rules +/// described on [`super::RowConverter`] +pub trait FixedLengthEncoding: Copy { + const ENCODED_LEN: usize = 1 + std::mem::size_of::(); + + type Encoded: Sized + Copy + FromSlice + AsRef<[u8]> + AsMut<[u8]>; + + fn encode(self) -> Self::Encoded; + + fn decode(encoded: Self::Encoded) -> Self; +} + +impl FixedLengthEncoding for bool { + type Encoded = [u8; 1]; + + fn encode(self) -> [u8; 1] { + [self as u8] + } + + fn decode(encoded: Self::Encoded) -> Self { + encoded[0] != 0 + } +} + +macro_rules! 
encode_signed { + ($n:expr, $t:ty) => { + impl FixedLengthEncoding for $t { + type Encoded = [u8; $n]; + + fn encode(self) -> [u8; $n] { + let mut b = self.to_be_bytes(); + // Toggle top "sign" bit to ensure consistent sort order + b[0] ^= 0x80; + b + } + + fn decode(mut encoded: Self::Encoded) -> Self { + // Toggle top "sign" bit + encoded[0] ^= 0x80; + Self::from_be_bytes(encoded) + } + } + }; +} + +encode_signed!(1, i8); +encode_signed!(2, i16); +encode_signed!(4, i32); +encode_signed!(8, i64); +encode_signed!(16, i128); +encode_signed!(32, i256); + +macro_rules! encode_unsigned { + ($n:expr, $t:ty) => { + impl FixedLengthEncoding for $t { + type Encoded = [u8; $n]; + + fn encode(self) -> [u8; $n] { + self.to_be_bytes() + } + + fn decode(encoded: Self::Encoded) -> Self { + Self::from_be_bytes(encoded) + } + } + }; +} + +encode_unsigned!(1, u8); +encode_unsigned!(2, u16); +encode_unsigned!(4, u32); +encode_unsigned!(8, u64); + +impl FixedLengthEncoding for f16 { + type Encoded = [u8; 2]; + + fn encode(self) -> [u8; 2] { + // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260 + let s = self.to_bits() as i16; + let val = s ^ (((s >> 15) as u16) >> 1) as i16; + val.encode() + } + + fn decode(encoded: Self::Encoded) -> Self { + let bits = i16::decode(encoded); + let val = bits ^ (((bits >> 15) as u16) >> 1) as i16; + Self::from_bits(val as u16) + } +} + +impl FixedLengthEncoding for f32 { + type Encoded = [u8; 4]; + + fn encode(self) -> [u8; 4] { + // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260 + let s = self.to_bits() as i32; + let val = s ^ (((s >> 31) as u32) >> 1) as i32; + val.encode() + } + + fn decode(encoded: Self::Encoded) -> Self { + let bits = i32::decode(encoded); + let val = bits ^ (((bits >> 31) as u32) >> 1) as i32; + Self::from_bits(val as u32) + } +} + +impl FixedLengthEncoding for f64 { + type Encoded = 
[u8; 8]; + + fn encode(self) -> [u8; 8] { + // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260 + let s = self.to_bits() as i64; + let val = s ^ (((s >> 63) as u64) >> 1) as i64; + val.encode() + } + + fn decode(encoded: Self::Encoded) -> Self { + let bits = i64::decode(encoded); + let val = bits ^ (((bits >> 63) as u64) >> 1) as i64; + Self::from_bits(val as u64) + } +} + +/// Returns the total encoded length (including null byte) for a value of type `T::Native` +pub const fn encoded_len(_col: &PrimitiveArray) -> usize +where + T: NativeType + FixedLengthEncoding, +{ + T::ENCODED_LEN +} + +/// Fixed width types are encoded as +/// +/// - 1 byte `0` if null or `1` if valid +/// - bytes of [`FixedLengthEncoding`] +pub fn encode>>( + out: &mut Rows, + i: I, + opts: SortOptions, +) { + for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) { + let end_offset = *offset + T::ENCODED_LEN; + if let Some(val) = maybe_val { + let to_write = &mut out.buffer[*offset..end_offset]; + to_write[0] = 1; + let mut encoded = val.encode(); + if opts.descending { + // Flip bits to reverse order + encoded.as_mut().iter_mut().for_each(|v| *v = !*v) + } + to_write[1..].copy_from_slice(encoded.as_ref()) + } else { + out.buffer[*offset] = null_sentinel(opts); + } + *offset = end_offset; + } +} diff --git a/src/compute/sort/row/interner.rs b/src/compute/sort/row/interner.rs new file mode 100644 index 00000000000..77c53a06843 --- /dev/null +++ b/src/compute/sort/row/interner.rs @@ -0,0 +1,431 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hash::{BuildHasher, Hash}; +use std::num::NonZeroU32; +use std::ops::Index; + +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; + +/// An interned value of 32 bits. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option` is 32 bits. + +/// A byte array interner that generates normalized keys that are sorted with respect +/// to the interned values, e.g. `intern(a) < intern(b) => a < b` +#[derive(Debug, Default, Clone)] +pub struct OrderPreservingInterner { + /// Provides a lookup from [`Interned`] to the normalized key + keys: InternBuffer, + /// Provides a lookup from [`Interned`] to the normalized value + values: InternBuffer, + /// Key allocation data structure + bucket: Box, + + // A hash table used to perform faster re-keying, and detect duplicates + hasher: ahash::RandomState, + // A SwissTable hashmap. + lookup: HashMap, +} + +trait HashSingle: BuildHasher { + /// Calculates the hash of a single value. + #[inline] + fn hash_single(&self, x: T) -> u64 + where + Self: Sized, + { + // Rewrite as `hasher.hash_one(&x)` after + // https://github.com/rust-lang/rust/issues/86161 is merged. 
+ #[cfg(feature = "nightly_build")] + { + self.hash_one(x) + } + #[cfg(not(feature = "nightly_build"))] + { + use std::hash::Hasher; + let mut hasher = self.build_hasher(); + x.hash(&mut hasher); + hasher.finish() + } + } +} + +impl HashSingle for ahash::RandomState {} + +impl OrderPreservingInterner { + /// Interns an iterator of values returning a list of [`Interned`] which can be + /// used with [`Self::normalized_key`] to retrieve the normalized keys with a + /// lifetime not tied to the mutable borrow passed to this method + pub fn intern(&mut self, input: I) -> Vec> + where + I: IntoIterator>, + V: AsRef<[u8]>, + { + let iter = input.into_iter(); + let capacity = iter.size_hint().0; + let mut out = Vec::with_capacity(capacity); + + // (index in output, hash value, value) + let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); + let mut to_intern_len = 0; + + for (idx, item) in iter.enumerate() { + let value: V = match item { + Some(value) => value, + None => { + out.push(None); + continue; + } + }; + + let v = value.as_ref(); + let hash = self.hasher.hash_single(v); + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == v); + + match entry { + RawEntryMut::Occupied(o) => out.push(Some(*o.key())), + RawEntryMut::Vacant(_) => { + // Push placeholder + out.push(None); + to_intern_len += v.len(); + to_intern.push((idx, hash, value)); + } + }; + } + + to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref())); + + self.keys.offsets.reserve(to_intern.len()); + self.keys.values.reserve(to_intern.len()); // Approximation + self.values.offsets.reserve(to_intern.len()); + self.values.values.reserve(to_intern_len); + + for (idx, hash, value) in to_intern { + let val = value.as_ref(); + + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == val); + + match entry { + RawEntryMut::Occupied(o) => { + out[idx] = Some(*o.key()); + } + RawEntryMut::Vacant(v) => { + let 
val = value.as_ref(); + self.bucket + .insert(&mut self.values, val, &mut self.keys.values); + self.keys.values.push(0); + let interned = self.keys.append(); + + let values = &self.values; + v.insert_with_hasher(hash, interned, (), |key| { + self.hasher.hash_single(&values[*key]) + }); + out[idx] = Some(interned); + } + } + } + + out + } + + /// Returns a null-terminated byte array that can be compared against other normalized_key + /// returned by this instance, to establish ordering of the interned values + pub fn normalized_key(&self, key: Interned) -> &[u8] { + self.keys.index(key) + } + + #[cfg(test)] + /// Converts a normalized key returned by [`Self::normalized_key`] to [`Interned`] + /// returning `None` if it cannot be found + pub fn lookup(&self, normalized_key: &[u8]) -> Option { + let len = normalized_key.len(); + if len <= 1 { + // `normalized_key` should terminate with a 0. + return None; + } + + let mut bucket = self.bucket.as_ref(); + if len > 2 { + for v in normalized_key.iter().take(len - 2) { + if *v == 255 { + bucket = bucket.next.as_ref()?; + } else { + let bucket_idx = v.checked_sub(1)?; + bucket = bucket.slots.get(bucket_idx as usize)?.child.as_ref()?; + } + } + } + + let slot_idx = normalized_key[len - 2].checked_sub(2)?; + Some(bucket.slots.get(slot_idx as usize)?.value) + } + + #[cfg(test)] + /// Returns the interned value for a given [`Interned`] + pub fn value(&self, key: Interned) -> &[u8] { + self.values.index(key) + } +} + +/// A buffer of `[u8]` indexed by `[Interned]` +#[derive(Debug, Clone)] +struct InternBuffer { + /// Raw values + values: Vec, + /// The ith value is `&values[offsets[i]..offsets[i+1]]` + offsets: Vec, +} + +impl Default for InternBuffer { + fn default() -> Self { + Self { + values: Default::default(), + offsets: vec![0], + } + } +} + +impl InternBuffer { + /// Insert `data` returning the corresponding [`Interned`] + fn insert(&mut self, data: &[u8]) -> Interned { + self.values.extend_from_slice(data); + 
self.append() + } + + /// Appends the next value based on data written to `self.values` + /// returning the corresponding [`Interned`] + fn append(&mut self) -> Interned { + let idx: u32 = self.offsets.len().try_into().unwrap(); + let key = Interned(NonZeroU32::new(idx).unwrap()); + self.offsets.push(self.values.len()); + key + } +} + +impl Index for InternBuffer { + type Output = [u8]; + + fn index(&self, key: Interned) -> &Self::Output { + let index = key.0.get() as usize; + let end = self.offsets[index]; + let start = self.offsets[index - 1]; + // SAFETY: + // self.values is never reduced in size and values appended + // to self.offsets are always less than self.values at the time + unsafe { self.values.get_unchecked(start..end) } + } +} + +/// A slot corresponds to a single byte-value in the generated normalized key +/// +/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing +/// the next byte in the generated normalized key +#[derive(Debug, Clone)] +struct Slot { + value: Interned, + /// Child values less than `self.value` if any + child: Option>, +} + +/// Bucket is the root of the data-structure used to allocate normalized keys +/// +/// In particular it needs to generate keys that +/// +/// * Contain no `0` bytes other than the null terminator +/// * Compare lexicographically in the same manner as the encoded `data` +/// +/// The data structure consists of 254 slots, each of which can store a value. +/// Additionally each slot may contain a child bucket, containing values smaller +/// than the value within the slot. +/// +/// Each bucket also may contain a child bucket, containing values greater than +/// all values in the current bucket +/// +/// # Allocation Strategy +/// +/// The contiguous slice of slots containing values is searched to find the insertion +/// point for the new value, according to the sort order. 
+/// +/// If the insertion position exceeds 254, the number of slots, the value is inserted +/// into the child bucket of the current bucket. +/// +/// If the insertion position already contains a value, the value is inserted into the +/// child bucket of that slot. +/// +/// If the slot is not occupied, the value is inserted into that slot. +/// +/// The final key consists of the slot indexes visited incremented by 1, +/// with the final value incremented by 2, followed by a null terminator. +/// +/// Consider the case of the integers `[8, 6, 5, 7]` inserted in that order +/// +/// ```ignore +/// 8: &[2, 0] +/// 6: &[1, 2, 0] +/// 5: &[1, 1, 2, 0] +/// 7: &[1, 3, 0] +/// ``` +/// +/// Note: this allocation strategy is optimised for interning values in sorted order +/// +#[derive(Debug, Clone)] +struct Bucket { + slots: Vec, + /// Bucket containing values larger than all of `slots` + next: Option>, +} + +impl Default for Bucket { + fn default() -> Self { + Self { + slots: Vec::with_capacity(254), + next: None, + } + } +} + +impl Bucket { + /// Insert `data` into this bucket or one of its children, appending the + /// normalized key to `out` as it is constructed + /// + /// # Panics + /// + /// Panics if the value already exists + fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec) { + let slots_len = self.slots.len() as u8; + // We optimize the case of inserting a value directly after those already inserted + // as [`OrderPreservingInterner::intern`] sorts values prior to interning them + match self.slots.last() { + Some(slot) => { + if &values_buf[slot.value] < data { + if slots_len == 254 { + out.push(255); + self.next + .get_or_insert_with(Default::default) + .insert(values_buf, data, out) + } else { + out.push(slots_len + 2); + let value = values_buf.insert(data); + self.slots.push(Slot { value, child: None }); + } + } else { + // Find insertion point + match self + .slots + .binary_search_by(|slot| values_buf[slot.value].cmp(data)) 
+ { + Ok(_) => unreachable!("value already exists"), + Err(idx) => { + out.push(idx as u8 + 1); + self.slots[idx] + .child + .get_or_insert_with(Default::default) + .insert(values_buf, data, out) + } + } + } + } + None => { + out.push(2); + let value = values_buf.insert(data); + self.slots.push(Slot { value, child: None }) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::{seq::SliceRandom, thread_rng}; + + // Clippy isn't smart enough to understand dropping mutability + #[allow(clippy::needless_collect)] + fn test_intern_values(values: &[u64]) { + let mut interner = OrderPreservingInterner::default(); + + // Intern a single value at a time to check ordering + let interned: Vec<_> = values + .iter() + .flat_map(|v| interner.intern([Some(&v.to_be_bytes())])) + .map(Option::unwrap) + .collect(); + + for (value, interned) in values.iter().zip(&interned) { + assert_eq!(interner.value(*interned), &value.to_be_bytes()); + } + + let normalized_keys: Vec<_> = interned + .iter() + .map(|x| interner.normalized_key(*x)) + .collect(); + + for (interned, normalized) in interned.iter().zip(&normalized_keys) { + assert_eq!(*interned, interner.lookup(normalized).unwrap()); + } + + for (i, a) in normalized_keys.iter().enumerate() { + for (j, b) in normalized_keys.iter().enumerate() { + let interned_cmp = a.cmp(b); + let values_cmp = values[i].cmp(&values[j]); + assert_eq!( + interned_cmp, values_cmp, + "({:?} vs {:?}) vs ({} vs {})", + a, b, values[i], values[j] + ) + } + } + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_interner() { + test_intern_values(&[8, 6, 5, 7]); + + let mut values: Vec<_> = (0_u64..2000).collect(); + test_intern_values(&values); + + let mut rng = thread_rng(); + values.shuffle(&mut rng); + test_intern_values(&values); + } + + #[test] + fn test_intern_duplicates() { + // Unsorted with duplicates + let values = vec![0_u8, 1, 8, 4, 1, 0]; + let mut interner = OrderPreservingInterner::default(); + + let interned = 
interner.intern(values.iter().map(std::slice::from_ref).map(Some)); + let interned: Vec<_> = interned.into_iter().map(Option::unwrap).collect(); + + assert_eq!(interned[0], interned[5]); + assert_eq!(interned[1], interned[4]); + assert!(interner.normalized_key(interned[0]) < interner.normalized_key(interned[1])); + assert!(interner.normalized_key(interned[1]) < interner.normalized_key(interned[2])); + assert!(interner.normalized_key(interned[1]) < interner.normalized_key(interned[3])); + assert!(interner.normalized_key(interned[3]) < interner.normalized_key(interned[2])); + } +} diff --git a/src/compute/sort/row/mod.rs b/src/compute/sort/row/mod.rs new file mode 100644 index 00000000000..8d4833af089 --- /dev/null +++ b/src/compute/sort/row/mod.rs @@ -0,0 +1,829 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A comparable row-oriented representation of a collection of [`Array`]. +//! +//! **This module is an arrow2 version of [arrow::row]:[https://docs.rs/arrow/latest/arrow/row/index.html]** +//! +//! As [`Row`] are [normalized for sorting], they can be very efficiently [compared](PartialOrd), +//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort]. This +//! 
makes the row format ideal for implementing efficient multi-column sorting, +//! grouping, aggregation, windowing and more. +//! +//! _Comparing [`Rows`] generated by different [`RowConverter`] is not guaranteed to +//! yield a meaningful ordering_ +//! +//! [non-comparison sorts]:[https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts] +//! [radix sort]:[https://en.wikipedia.org/wiki/Radix_sort] +//! [normalized for sorting]:[https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf] +//! [`memcmp`]:[https://www.man7.org/linux/man-pages/man3/memcmp.3.html] +use std::{ + cmp::Ordering, + hash::{Hash, Hasher}, + sync::Arc, +}; + +use crate::{ + array::{Array, BinaryArray, BooleanArray, DictionaryArray, PrimitiveArray, Utf8Array}, + datatypes::PhysicalType, + error::*, +}; +use crate::{compute::sort::SortOptions, datatypes::DataType}; + +use self::{ + dictionary::{compute_dictionary_mapping, encode_dictionary}, + interner::OrderPreservingInterner, +}; + +mod dictionary; +mod fixed; +mod interner; +mod variable; + +/// Converts `Box` columns into a row-oriented format. +/// +/// # Format +/// +/// The encoding of the row format should not be considered stable, but is documented here +/// for reference. +/// +/// ## Unsigned Integer Encoding +/// +/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding +/// to the integer's length +/// +/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the +/// integer +/// +/// ## Signed Integer Encoding +/// +/// Signed integers have their most significant sign bit flipped, and are then encoded in the +/// same manner as an unsigned integer +/// +/// ## Float Encoding +/// +/// Floats are converted from IEEE 754 representation to a signed integer representation +/// by flipping all bar the sign bit if they are negative. 
+/// +/// They are then encoded in the same manner as a signed integer +/// +/// ## Variable Length Bytes Encoding +/// +/// A null is encoded as a `0_u8` +/// +/// An empty byte array is encoded as `1_u8` +/// +/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array +/// encoded using a block based scheme described below. +/// +/// The byte array is broken up into 32-byte blocks, each block is written in turn +/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes +/// with `0_u8` and written to the output, followed by the un-padded length in bytes +/// of this final block as a `u8` +/// +/// This is loosely inspired by [COBS] encoding, and chosen over more traditional +/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. +/// +/// ## Dictionary Encoding +/// +/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and +/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse +/// the dictionary encoding, and encode the array values directly, however, this would lose +/// the benefits of dictionary encoding to reduce memory and CPU consumption. +/// +/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each +/// dictionary encoded column. As this is a variable-length encoding, new dictionary values +/// can be added whilst preserving the sort order. +/// +/// A null dictionary value is encoded as `0_u8`. +/// +/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array +/// key determined by the order-preserving dictionary encoding +/// +/// # Ordering +/// +/// ## Float Ordering +/// +/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined +/// in the IEEE 754 (2008 revision) floating point standard. 
+/// +/// The ordering established by this does not always agree with the +/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example, +/// they consider negative and positive zero equal, while this does not +/// +/// ## Null Ordering +/// +/// The encoding described above will order nulls first, this can be inverted by representing +/// nulls as `0xFF_u8` instead of `0_u8` +/// +/// ## Reverse Column Ordering +/// +/// The order of a given column can be reversed by negating the encoded bytes of non-null values +/// +/// [COBS]:[https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing] +/// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] +#[derive(Debug)] +pub struct RowConverter { + /// Sort fields + fields: Arc<[SortField]>, + /// interning state for column `i`, if column `i` is a dictionary + interners: Vec>>, +} + +/// Configure the data type and sort order for a given column +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SortField { + /// Sort options + options: SortOptions, + /// Data type + data_type: DataType, +} + +impl SortField { + /// Create a new column with the given data type + pub fn new(data_type: DataType) -> Self { + Self::new_with_options(data_type, SortOptions::default()) + } + + /// Create a new column with the given data type and [`SortOptions`] + pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self { + Self { options, data_type } + } +} + +impl RowConverter { + /// Create a new [`RowConverter`] with the provided schema + pub fn new(fields: Vec) -> Self { + let interners = vec![None; fields.len()]; + Self { + fields: fields.into(), + interners, + } + } + + /// Convert a slice of [`Box`] columns into [`Rows`] + /// + /// See [`Row`] for information on when [`Row`] can be compared + /// + /// # Errors + /// + /// Returns [`Error::InvalidArgumentError`] if the schema of `columns` does not match that provided to [`RowConverter::new`] + pub fn convert_columns(&mut self, columns: 
&[Box]) -> Result { + if columns.len() != self.fields.len() { + return Err(Error::InvalidArgumentError(format!( + "Incorrect number of arrays provided to RowConverter, expected {} got {}", + self.fields.len(), + columns.len() + ))); + } + + let dictionaries = columns + .iter() + .zip(&mut self.interners) + .zip(self.fields.iter()) + .map(|((column, interner), field)| { + if column.data_type() != &field.data_type { + return Err(Error::InvalidArgumentError(format!( + "RowConverter column schema mismatch, expected {:?} got {:?}", + field.data_type, + column.data_type() + ))); + } + + let values = match column.data_type().to_logical_type() { + DataType::Dictionary(k, _, _) => match_integer_type!(k, |$T| { + let column = column + .as_any() + .downcast_ref::>() + .unwrap(); + column.values() + }), + _ => return Ok(None), + }; + + let interner = interner.get_or_insert_with(Default::default); + + let mapping = compute_dictionary_mapping(interner, values)? + .into_iter() + .map(|maybe_interned| { + maybe_interned.map(|interned| interner.normalized_key(interned)) + }) + .collect::>(); + + Ok(Some(mapping)) + }) + .collect::>>()?; + + let mut rows = new_empty_rows(columns, &dictionaries)?; + + // jorgecarleitao's comments in PR#1287: + // This seems to be embarrassingly parallel. + // Given that this is a transpose of O(N x C) where N is length and C number of columns, I wonder if we could split this so users can parallelize. + // This is almost parallelizable - it is changing rows. + // However, there is still an optimization since modifying rows is O(1) but encoding is O(C). + // Will continue to think about this. 
+ for ((column, field), dictionary) in + columns.iter().zip(self.fields.iter()).zip(dictionaries) + { + // We encode a column at a time to minimise dispatch overheads + encode_column(&mut rows, column, field.options, dictionary.as_deref()) + } + + Ok(rows) + } +} + +/// A row-oriented representation of arrow data, that is normalized for comparison +/// +/// See [`RowConverter`] +#[derive(Debug)] +pub struct Rows { + /// Underlying row bytes + buffer: Box<[u8]>, + /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]` + offsets: Box<[usize]>, +} + +impl Rows { + /// Get a reference to a certain row. + /// + /// # Panics + /// + /// Panics if `row >= self.len()`, because the offset lookups are bounds-checked + pub fn row(&self, row: usize) -> Row<'_> { + let end = self.offsets[row + 1]; + let start = self.offsets[row]; + Row { + data: unsafe { self.buffer.get_unchecked(start..end) }, + } + } + + /// Get a reference to a certain row without checking that `row` is in bounds. + /// + /// NOTE(review): this is a safe function that performs unchecked indexing; callers must + /// guarantee `row < self.len()`, otherwise the offset lookup reads out of bounds + pub fn row_unchecked(&self, row: usize) -> Row<'_> { + let data = unsafe { + let end = *self.offsets.get_unchecked(row + 1); + let start = *self.offsets.get_unchecked(row); + self.buffer.get_unchecked(start..end) + }; + Row { data } + } + + /// Returns the number of rows + #[inline] + pub fn len(&self) -> usize { + self.offsets.len() - 1 + } + + #[inline] + /// Returns an iterator over the rows, in order + pub fn iter(&self) -> RowsIter<'_> { + self.into_iter() + } +} + +impl<'a> IntoIterator for &'a Rows { + type Item = Row<'a>; + type IntoIter = RowsIter<'a>; + + #[inline] + fn into_iter(self) -> Self::IntoIter { + RowsIter { + rows: self, + start: 0, + end: self.len(), + } + } +} + +/// An iterator over [`Rows`], yielding the rows in the half-open index range `start..end` +#[derive(Debug)] +pub struct RowsIter<'a> { + rows: &'a Rows, + start: usize, + end: usize, +} + +impl<'a> Iterator for RowsIter<'a> { + type Item = Row<'a>; + + #[inline] + fn next(&mut self) -> Option { + if self.start < self.end { + let row = self.rows.row_unchecked(self.start); + self.start += 1; + Some(row) + } else { + None + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, 
Some(len)) + } +} + +impl<'a> ExactSizeIterator for RowsIter<'a> { + #[inline] + fn len(&self) -> usize { + self.end - self.start + } +} + +impl<'a> DoubleEndedIterator for RowsIter<'a> { + fn next_back(&mut self) -> Option { + if self.end == self.start { + return None; + } + // `end` is exclusive, so step back first: the last remaining row is `end - 1` + self.end -= 1; + let row = self.rows.row(self.end); + Some(row) + } +} + +unsafe impl<'a> crate::trusted_len::TrustedLen for RowsIter<'a> {} + +/// A comparable representation of a row +/// +/// Two [`Row`] can be compared if they both belong to [`Rows`] returned by calls to +/// [`RowConverter::convert_columns`] on the same [`RowConverter`] +/// +/// Otherwise any ordering established by comparing the [`Row`] is arbitrary +#[derive(Debug, Copy, Clone)] +pub struct Row<'a> { + data: &'a [u8], +} + +// Comparison, equality and hashing are implemented by hand and delegate to the encoded bytes + +impl<'a> PartialEq for Row<'a> { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.data.eq(other.data) + } +} + +impl<'a> Eq for Row<'a> {} + +impl<'a> PartialOrd for Row<'a> { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + self.data.partial_cmp(other.data) + } +} + +impl<'a> Ord for Row<'a> { + #[inline] + fn cmp(&self, other: &Self) -> Ordering { + self.data.cmp(other.data) + } +} + +impl<'a> Hash for Row<'a> { + #[inline] + fn hash(&self, state: &mut H) { + self.data.hash(state) + } +} + +impl<'a> AsRef<[u8]> for Row<'a> { + #[inline] + fn as_ref(&self) -> &[u8] { + self.data + } +} + +/// Returns the null sentinel byte: `0` when nulls sort first, `0xFF` otherwise +#[inline] +fn null_sentinel(options: SortOptions) -> u8 { + match options.nulls_first { + true => 0, + false => 0xFF, + } +} + +/// Match `PrimitiveType` to standard Rust types +#[macro_export] +macro_rules! with_match_primitive_without_interval_type {( + $key_type:expr, | $_:tt $T:ident | $($body:tt)* +) => ({ + macro_rules! 
__with_ty__ {( $_ $T:ident ) => ( $($body)* )} + use $crate::datatypes::PrimitiveType::*; + use $crate::types::{f16, i256}; + match $key_type { + Int8 => __with_ty__! { i8 }, + Int16 => __with_ty__! { i16 }, + Int32 => __with_ty__! { i32 }, + Int64 => __with_ty__! { i64 }, + Int128 => __with_ty__! { i128 }, + Int256 => __with_ty__! { i256 }, + UInt8 => __with_ty__! { u8 }, + UInt16 => __with_ty__! { u16 }, + UInt32 => __with_ty__! { u32 }, + UInt64 => __with_ty__! { u64 }, + Float16 => __with_ty__! { f16 }, + Float32 => __with_ty__! { f32 }, + Float64 => __with_ty__! { f64 }, + _ => unimplemented!("Unsupported type: {:?}", $key_type), + } +})} + +/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] +fn new_empty_rows( + cols: &[Box], + dictionaries: &[Option>>], +) -> Result { + use fixed::FixedLengthEncoding; + + let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); + let mut lengths = vec![0; num_rows]; + + for (array, dict) in cols.iter().zip(dictionaries) { + match array.data_type().to_physical_type() { + PhysicalType::Primitive(primitive) => { + with_match_primitive_without_interval_type!(primitive, |$T| { + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)) + }) + } + PhysicalType::Null => {} + PhysicalType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN), + PhysicalType::Binary => array + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + PhysicalType::LargeBinary => array + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + PhysicalType::Utf8 => array + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) 
+ }), + PhysicalType::LargeUtf8 => array + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| { + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + let dict = dict.as_ref().unwrap(); + for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { + match v.and_then(|v| dict[*v as usize]) { + Some(k) => *length += k.len() + 1, + None => *length += 1, + } + } + }), + t => { + return Err(Error::NotYetImplemented(format!( + "not yet implemented: {:?}", + t + ))) + } + } + } + + let mut offsets = Vec::with_capacity(num_rows + 1); + offsets.push(0); + + // We initialize the offsets shifted down by one row index. + // + // As the rows are appended to the offsets will be incremented to match + // + // For example, consider the case of 3 rows of length 3, 4, and 6 respectively. + // The offsets would be initialized to `0, 0, 3, 7` + // + // Writing the first row entirely would yield `0, 3, 3, 7` + // The second, `0, 3, 7, 7` + // The third, `0, 3, 7, 13` + // + // This would be the final offsets for reading + // + // In this way offsets tracks the position during writing whilst eventually serving + // as identifying the offsets of the written rows + let mut cur_offset = 0_usize; + for l in lengths { + offsets.push(cur_offset); + cur_offset = cur_offset.checked_add(l).expect("overflow"); + } + + let buffer = vec![0_u8; cur_offset]; + + Ok(Rows { + buffer: buffer.into(), + offsets: offsets.into(), + }) +} + +/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses +fn encode_column( + out: &mut Rows, + column: &Box, + opts: SortOptions, + dictionary: Option<&[Option<&[u8]>]>, +) { + match column.data_type().to_physical_type() { + PhysicalType::Primitive(primitive) => { + with_match_primitive_without_interval_type!(primitive, |$T| { + let 
column = column + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .map(|v| v.map(|v| *v)); + fixed::encode(out, column, opts); + }) + } + PhysicalType::Null => {} + PhysicalType::Boolean => fixed::encode( + out, + column.as_any().downcast_ref::().unwrap(), + opts, + ), + PhysicalType::Binary => { + variable::encode( + out, + column + .as_any() + .downcast_ref::>() + .unwrap() + .iter(), + opts, + ); + } + PhysicalType::LargeBinary => { + variable::encode( + out, + column + .as_any() + .downcast_ref::>() + .unwrap() + .iter(), + opts, + ); + } + PhysicalType::Utf8 => variable::encode( + out, + column + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .map(|x| x.map(|x| x.as_bytes())), + opts, + ), + PhysicalType::LargeUtf8 => variable::encode( + out, + column + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .map(|x| x.map(|x| x.as_bytes())), + opts, + ), + PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| { + let column = column + .as_any() + .downcast_ref::>() + .unwrap(); + encode_dictionary(out, column, dictionary.unwrap(), opts); + }), + t => unimplemented!("not yet implemented: {:?}", t), + } +} + +#[cfg(test)] +mod tests { + use std::fmt::Debug; + + use rand::{ + distributions::{uniform::SampleUniform, Distribution, Standard}, + thread_rng, Rng, + }; + + use super::*; + use crate::{ + array::{Array, DictionaryKey, Float32Array, Int16Array, NullArray, Offset}, + compute::sort::build_compare, + datatypes::DataType, + types::NativeType, + }; + + #[test] + fn test_fixed_width() { + let cols = [ + Int16Array::from_iter([Some(1), Some(2), None, Some(-5), Some(2), Some(2), Some(0)]) + .to_boxed(), + Float32Array::from_iter([ + Some(1.3), + Some(2.5), + None, + Some(4.), + Some(0.1), + Some(-4.), + Some(-0.), + ]) + .to_boxed(), + ]; + + let mut converter = RowConverter::new(vec![ + SortField::new(DataType::Int16), + SortField::new(DataType::Float32), + ]); + let rows = converter.convert_columns(&cols).unwrap(); + + 
assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]); + assert_eq!( + rows.buffer.as_ref(), + &[ + 1, 128, 1, // + 1, 191, 166, 102, 102, // + 1, 128, 2, // + 1, 192, 32, 0, 0, // + 0, 0, 0, // + 0, 0, 0, 0, 0, // + 1, 127, 251, // + 1, 192, 128, 0, 0, // + 1, 128, 2, // + 1, 189, 204, 204, 205, // + 1, 128, 2, // + 1, 63, 127, 255, 255, // + 1, 128, 0, // + 1, 127, 255, 255, 255 // + ] + ); + + assert!(rows.row(3) < rows.row(6)); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(3) < rows.row(0)); + assert!(rows.row(4) < rows.row(1)); + assert!(rows.row(5) < rows.row(4)); + } + + #[test] + fn test_null_encoding() { + let col = NullArray::new(DataType::Null, 10).to_boxed(); + let mut converter = RowConverter::new(vec![SortField::new(DataType::Null)]); + let rows = converter.convert_columns(&[col]).unwrap(); + assert_eq!(rows.len(), 10); + assert_eq!(rows.row(1).data.len(), 0); + } + + fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray + where + K: NativeType, + Standard: Distribution, + { + let mut rng = thread_rng(); + (0..len) + .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen())) + .collect() + } + + fn generate_strings(len: usize, valid_percent: f64) -> Utf8Array { + let mut rng = thread_rng(); + (0..len) + .map(|_| { + rng.gen_bool(valid_percent).then(|| { + let len = rng.gen_range(0..100); + let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect(); + String::from_utf8(bytes).unwrap() + }) + }) + .collect() + } + + fn generate_dictionary( + values: Box, + len: usize, + valid_percent: f64, + ) -> DictionaryArray + where + K: DictionaryKey + Ord + SampleUniform, + >::Error: Debug, + { + let mut rng = thread_rng(); + let min_key = 0_usize.try_into().unwrap(); + let max_key = values.len().try_into().unwrap(); + let keys: PrimitiveArray = (0..len) + .map(|_| { + rng.gen_bool(valid_percent) + .then(|| rng.gen_range(min_key..max_key)) + }) + .collect(); + + DictionaryArray::try_from_keys(keys, values).unwrap() 
+ } + + fn generate_column(len: usize) -> Box { + let mut rng = thread_rng(); + match rng.gen_range(0..9) { + 0 => Box::new(generate_primitive_array::(len, 0.8)), + 1 => Box::new(generate_primitive_array::(len, 0.8)), + 2 => Box::new(generate_primitive_array::(len, 0.8)), + 3 => Box::new(generate_primitive_array::(len, 0.8)), + 4 => Box::new(generate_primitive_array::(len, 0.8)), + 5 => Box::new(generate_primitive_array::(len, 0.8)), + 6 => Box::new(generate_strings::(len, 0.8)), + 7 => Box::new(generate_dictionary::( + // Cannot test dictionaries containing null values because of #2687 + Box::new(generate_strings::(rng.gen_range(1..len), 1.0)), + len, + 0.8, + )), + 8 => Box::new(generate_dictionary::( + // Cannot test dictionaries containing null values because of #2687 + Box::new(generate_primitive_array::(rng.gen_range(1..len), 1.0)), + len, + 0.8, + )), + _ => unreachable!(), + } + } + + #[test] + #[cfg_attr(miri, ignore)] + fn fuzz_test() { + for _ in 0..100 { + let mut rng = thread_rng(); + let num_columns = rng.gen_range(1..5); + let len = rng.gen_range(5..100); + let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect(); + + let options: Vec<_> = (0..num_columns) + .map(|_| SortOptions { + descending: rng.gen_bool(0.5), + nulls_first: rng.gen_bool(0.5), + }) + .collect(); + + let comparators = arrays + .iter() + .zip(options.iter()) + .map(|(a, o)| build_compare(&**a, *o).unwrap()) + .collect::>(); + + let columns = options + .into_iter() + .zip(&arrays) + .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) + .collect(); + + let mut converter = RowConverter::new(columns); + let rows = converter.convert_columns(&arrays).unwrap(); + let cmp = |i, j| { + for cmp in comparators.iter() { + let cmp = cmp(i, j); + if cmp != Ordering::Equal { + return cmp; + } + } + Ordering::Equal + }; + + for i in 0..len { + for j in 0..len { + let row_i = rows.row(i); + let row_j = rows.row(j); + let row_cmp = row_i.cmp(&row_j); + let 
lex_cmp = cmp(i, j); + assert_eq!(row_cmp, lex_cmp); + } + } + } + } +} diff --git a/src/compute/sort/row/variable.rs b/src/compute/sort/row/variable.rs new file mode 100644 index 00000000000..40fd5c17735 --- /dev/null +++ b/src/compute/sort/row/variable.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::compute::sort::SortOptions; + +use super::{null_sentinel, Rows}; + +/// The block size of the variable length encoding +pub const BLOCK_SIZE: usize = 32; + +/// The continuation token +pub const BLOCK_CONTINUATION: u8 = 0xFF; + +/// Indicates an empty string +pub const EMPTY_SENTINEL: u8 = 1; + +/// Indicates a non-empty string +pub const NON_EMPTY_SENTINEL: u8 = 2; + +/// Returns the ceil of `value`/`divisor` +#[inline] +fn div_ceil(value: usize, divisor: usize) -> usize { + // Rewrite as `value.div_ceil(&divisor)` after + // https://github.com/rust-lang/rust/issues/88581 is merged. 
+ value / divisor + (0 != value % divisor) as usize +} + +/// Returns the length of the encoded representation of a byte array, including the null byte +pub fn encoded_len(a: Option<&[u8]>) -> usize { + match a { + Some(a) => 1 + div_ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1), + None => 1, + } +} + +/// Variable length values are encoded as +/// +/// - single `0_u8` if null +/// - single `1_u8` if empty array +/// - `2_u8` if not empty, followed by one or more blocks +/// +/// where a block is encoded as +/// +/// - [`BLOCK_SIZE`] bytes of string data, padded with 0s +/// - `0xFF_u8` if this is not the last block for this string +/// - otherwise the length of the block as a `u8` +pub fn encode<'a, I: Iterator>>(out: &mut Rows, i: I, opts: SortOptions) { + for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) { + match maybe_val { + Some(val) if val.is_empty() => { + out.buffer[*offset] = match opts.descending { + true => !EMPTY_SENTINEL, + false => EMPTY_SENTINEL, + }; + *offset += 1; + } + Some(val) => { + let block_count = div_ceil(val.len(), BLOCK_SIZE); + let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1); + let to_write = &mut out.buffer[*offset..end_offset]; + + // Write `2_u8` to demarcate as non-empty, non-null string + to_write[0] = NON_EMPTY_SENTINEL; + + let chunks = val.chunks_exact(BLOCK_SIZE); + let remainder = chunks.remainder(); + for (input, output) in chunks + .clone() + .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1)) + { + let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap(); + let out_block: &mut [u8; BLOCK_SIZE] = + (&mut output[..BLOCK_SIZE]).try_into().unwrap(); + + *out_block = *input; + + // Indicate that there are further blocks to follow + output[BLOCK_SIZE] = BLOCK_CONTINUATION; + } + + if !remainder.is_empty() { + let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1); + to_write[start_offset..start_offset + remainder.len()] + .copy_from_slice(remainder); + *to_write.last_mut().unwrap() = 
remainder.len() as u8; + } else { + // We must overwrite the continuation marker written by the loop above + *to_write.last_mut().unwrap() = BLOCK_SIZE as u8; + } + + *offset = end_offset; + + if opts.descending { + // Invert bits + to_write.iter_mut().for_each(|v| *v = !*v) + } + } + None => { + out.buffer[*offset] = null_sentinel(opts); + *offset += 1; + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index e479cb7d08b..64771b954cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ #![allow(clippy::type_complexity)] #![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(feature = "simd", feature(portable_simd))] +#![cfg_attr(nightly_build, feature(build_hasher_simple_hash_one))] #[macro_use] pub mod array; diff --git a/src/types/native.rs b/src/types/native.rs index 8c82452e895..19c1697d1cf 100644 --- a/src/types/native.rs +++ b/src/types/native.rs @@ -357,6 +357,18 @@ impl f16 { self.0 & 0x7FFFu16 > 0x7C00u16 } + /// Casts from u16. + #[inline] + pub const fn from_bits(bits: u16) -> f16 { + f16(bits) + } + + /// Casts to u16. 
+ #[inline] + pub const fn to_bits(self) -> u16 { + self.0 + } + /// Casts this `f16` to `f32` pub fn to_f32(self) -> f32 { let i = self.0; diff --git a/tests/it/compute/sort/mod.rs b/tests/it/compute/sort/mod.rs index 7618c041352..2ede8873640 100644 --- a/tests/it/compute/sort/mod.rs +++ b/tests/it/compute/sort/mod.rs @@ -1,4 +1,5 @@ mod lex_sort; +mod row; use arrow2::array::*; use arrow2::compute::sort::*; diff --git a/tests/it/compute/sort/row/mod.rs b/tests/it/compute/sort/row/mod.rs new file mode 100644 index 00000000000..d82d548623c --- /dev/null +++ b/tests/it/compute/sort/row/mod.rs @@ -0,0 +1,278 @@ +use arrow2::{ + array::{ + Array, BinaryArray, BooleanArray, DictionaryArray, Float32Array, Int128Array, Int16Array, + Int256Array, Int32Array, MutableDictionaryArray, MutablePrimitiveArray, MutableUtf8Array, + NullArray, TryExtend, TryPush, Utf8Array, + }, + compute::sort::{ + row::{RowConverter, SortField}, + SortOptions, + }, + datatypes::{DataType, IntegerType}, + types::i256, +}; + +#[test] +fn test_fixed_width() { + let cols = [ + Int16Array::from_iter([Some(1), Some(2), None, Some(-5), Some(2), Some(2), Some(0)]) + .to_boxed(), + Float32Array::from_iter([ + Some(1.3), + Some(2.5), + None, + Some(4.), + Some(0.1), + Some(-4.), + Some(-0.), + ]) + .to_boxed(), + ]; + + let mut converter = RowConverter::new(vec![ + SortField::new(DataType::Int16), + SortField::new(DataType::Float32), + ]); + let rows = converter.convert_columns(&cols).unwrap(); + + assert!(rows.row(3) < rows.row(6)); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(3) < rows.row(0)); + assert!(rows.row(4) < rows.row(1)); + assert!(rows.row(5) < rows.row(4)); +} + +#[test] +fn test_decimal128() { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Decimal(38, 7))]); + let col = Int128Array::from_iter([ + None, + Some(i128::MIN), + Some(-13), + Some(46_i128), + Some(5456_i128), + Some(i128::MAX), + ]) + .to(DataType::Decimal(38, 7)) + .to_boxed(); + + let rows = 
converter.convert_columns(&[col]).unwrap(); + for i in 0..rows.len() - 1 { + assert!(rows.row(i) < rows.row(i + 1)); + } +} + +#[test] +fn test_decimal256() { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(76, 7))]); + let col = Int256Array::from_iter([ + None, + Some(i256::from_words(i128::MIN, i128::MIN)), + Some(i256::from_words(0, -1)), + Some(i256::from_words(i128::MAX, -1)), + Some(i256::from_words(i128::MAX, 0)), + Some(i256::from_words(0, 46_i128)), + Some(i256::from_words(5, 46_i128)), + Some(i256::from_words(i128::MAX, i128::MAX)), + ]) + .to(DataType::Decimal256(76, 7)) + .to_boxed(); + + let rows = converter.convert_columns(&[col]).unwrap(); + for i in 0..rows.len() - 1 { + assert!(rows.row(i) < rows.row(i + 1)); + } +} + +#[test] +fn test_bool() { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]); + + let col = BooleanArray::from_iter([None, Some(false), Some(true)]).to_boxed(); + + let rows = converter.convert_columns(&[Box::clone(&col)]).unwrap(); + assert!(rows.row(2) > rows.row(1)); + assert!(rows.row(2) > rows.row(0)); + assert!(rows.row(1) > rows.row(0)); + + let mut converter = RowConverter::new(vec![SortField::new_with_options( + DataType::Boolean, + SortOptions { + descending: true, + nulls_first: false, + }, + )]); + + let rows = converter.convert_columns(&[Box::clone(&col)]).unwrap(); + assert!(rows.row(2) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(1) < rows.row(0)); +} + +#[test] +fn test_null_encoding() { + let col = NullArray::new(DataType::Null, 10).to_boxed(); + let mut converter = RowConverter::new(vec![SortField::new(DataType::Null)]); + let rows = converter.convert_columns(&[col]).unwrap(); + assert_eq!(rows.len(), 10); +} + +#[test] +fn test_variable_width() { + let col = + Utf8Array::::from([Some("hello"), Some("he"), None, Some("foo"), Some("")]).to_boxed(); + + let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]); 
+ let rows = converter.convert_columns(&[Box::clone(&col)]).unwrap(); + + assert!(rows.row(1) < rows.row(0)); + assert!(rows.row(2) < rows.row(4)); + assert!(rows.row(3) < rows.row(0)); + assert!(rows.row(3) < rows.row(1)); + + let col = BinaryArray::::from_iter([ + None, + Some(vec![0_u8; 0]), + Some(vec![0_u8; 6]), + Some(vec![0_u8; 32]), + Some(vec![0_u8; 33]), + Some(vec![1_u8; 6]), + Some(vec![1_u8; 32]), + Some(vec![1_u8; 33]), + Some(vec![0xFF_u8; 6]), + Some(vec![0xFF_u8; 32]), + Some(vec![0xFF_u8; 33]), + ]) + .to_boxed(); + + let mut converter = RowConverter::new(vec![SortField::new(DataType::Binary)]); + let rows = converter.convert_columns(&[Box::clone(&col)]).unwrap(); + + for i in 0..rows.len() { + for j in i + 1..rows.len() { + assert!( + rows.row(i) < rows.row(j), + "{} < {} - {:?} < {:?}", + i, + j, + rows.row(i), + rows.row(j) + ); + } + } + + let mut converter = RowConverter::new(vec![SortField::new_with_options( + DataType::Binary, + SortOptions { + descending: true, + nulls_first: false, + }, + )]); + let rows = converter.convert_columns(&[Box::clone(&col)]).unwrap(); + + for i in 0..rows.len() { + for j in i + 1..rows.len() { + assert!( + rows.row(i) > rows.row(j), + "{} > {} - {:?} > {:?}", + i, + j, + rows.row(i), + rows.row(j) + ); + } + } +} + +#[test] +fn test_string_dictionary() { + let data = vec![ + Some("foo"), + Some("hello"), + Some("he"), + None, + Some("hello"), + Some(""), + Some("hello"), + Some("hello"), + ]; + let mut array = MutableDictionaryArray::>::new(); + array.try_extend(data).unwrap(); + let a: DictionaryArray = array.into(); + let a = a.to_boxed(); + + let mut converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]); + let rows_a = converter.convert_columns(&[Box::clone(&a)]).unwrap(); + + assert!(rows_a.row(3) < rows_a.row(5)); + assert!(rows_a.row(2) < rows_a.row(1)); + assert!(rows_a.row(0) < rows_a.row(1)); + assert!(rows_a.row(3) < rows_a.row(0)); + + assert_eq!(rows_a.row(1), rows_a.row(4)); + 
assert_eq!(rows_a.row(1), rows_a.row(6)); + assert_eq!(rows_a.row(1), rows_a.row(7)); + + let data = vec![Some("hello"), None, Some("cupcakes")]; + let mut array = MutableDictionaryArray::>::new(); + array.try_extend(data).unwrap(); + let b: DictionaryArray = array.into(); + + let rows_b = converter.convert_columns(&[b.to_boxed()]).unwrap(); + assert_eq!(rows_a.row(1), rows_b.row(0)); + assert_eq!(rows_a.row(3), rows_b.row(1)); + assert!(rows_b.row(2) < rows_a.row(0)); + + let mut converter = RowConverter::new(vec![SortField::new_with_options( + a.data_type().clone(), + SortOptions { + descending: true, + nulls_first: false, + }, + )]); + + let rows_c = converter.convert_columns(&[Box::clone(&a)]).unwrap(); + assert!(rows_c.row(3) > rows_c.row(5)); + assert!(rows_c.row(2) > rows_c.row(1)); + assert!(rows_c.row(0) > rows_c.row(1)); + assert!(rows_c.row(3) > rows_c.row(0)); +} + +#[test] +fn test_primitive_dictionary() { + let mut builder = MutableDictionaryArray::>::new(); + builder.try_push(Some(2)).unwrap(); + builder.try_push(Some(3)).unwrap(); + builder.try_push(Some(0)).unwrap(); + builder.push_null(); + builder.try_push(Some(5)).unwrap(); + builder.try_push(Some(3)).unwrap(); + builder.try_push(Some(-1)).unwrap(); + + let a: DictionaryArray = builder.into(); + + let mut converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]); + let rows = converter.convert_columns(&[a.to_boxed()]).unwrap(); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(3) < rows.row(2)); + assert!(rows.row(6) < rows.row(2)); + assert!(rows.row(3) < rows.row(6)); +} + +#[test] +fn test_dictionary_nulls() { + let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]); + let keys = Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]); + + let data_type = DataType::Dictionary(IntegerType::Int32, Box::new(DataType::Int32), false); + let data = DictionaryArray::try_from_keys(keys, 
values.to_boxed()).unwrap(); + + let mut converter = RowConverter::new(vec![SortField::new(data_type)]); + let rows = converter.convert_columns(&[data.to_boxed()]).unwrap(); + + assert_eq!(rows.row(0), rows.row(1)); + assert_eq!(rows.row(3), rows.row(4)); + assert_eq!(rows.row(4), rows.row(5)); + assert!(rows.row(3) < rows.row(0)); +}